This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f79ce0  SCB-140 compensated only distinct events SCB-141 supported 
multiple sub tx within the same global tx on a single service
1f79ce0 is described below

commit 1f79ce08db38f496ee06da5e17f1e0504c4c0ee3
Author: seanyinx <sean....@huawei.com>
AuthorDate: Thu Dec 28 10:28:16 2017 +0800

    SCB-140 compensated only distinct events
    SCB-141 supported multiple sub tx within the same global tx on a single 
service
    
    Signed-off-by: seanyinx <sean....@huawei.com>
---
 .../saga/alpha/core/TxConsistentService.java       |  3 +-
 .../saga/alpha/core/TxEventRepository.java         |  2 +-
 .../saga/alpha/core/TxConsistentServiceTest.java   |  6 ++--
 .../saga/alpha/server/SpringTxEventRepository.java |  2 +-
 .../saga/alpha/server/TxEventEnvelope.java         |  6 ++++
 .../alpha/server/TxEventEnvelopeRepository.java    |  5 ++++
 .../saga/alpha/server/AlphaIntegrationTest.java    | 26 ++++++++++++++---
 omega/omega-context/pom.xml                        |  4 +++
 .../saga/omega/context/OmegaContext.java           | 34 +++++++++++++---------
 .../spring/TransactionInterceptionTest.java        |  8 +++--
 .../saga/omega/transaction/TransactionAspect.java  |  1 +
 11 files changed, 70 insertions(+), 27 deletions(-)

diff --git 
a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
 
b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
index af02d74..22605f8 100644
--- 
a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
+++ 
b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -18,7 +18,6 @@
 package io.servicecomb.saga.alpha.core;
 
 import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
-import static io.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
 import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
 
 import java.util.HashMap;
@@ -49,7 +48,7 @@ public class TxConsistentService {
 
   // TODO: 2017/12/27 we must define a way to find which service to 
compensate, to avoid sending to all
   private void compensate(TxEvent event) {
-    List<TxEvent> events = 
eventRepository.findCompletedEvents(event.globalTxId(), TxEndedEvent.name());
+    List<TxEvent> events = 
eventRepository.findStartedTransactions(event.globalTxId(), 
TxStartedEvent.name());
     events.forEach(evt -> omegaCallback.compensate(evt.globalTxId(), 
evt.payloads()));
   }
 }
diff --git 
a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
 
b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
index cb44f77..9eed4ea 100644
--- 
a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
+++ 
b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -22,5 +22,5 @@ import java.util.List;
 public interface TxEventRepository {
   void save(TxEvent event);
 
-  List<TxEvent> findCompletedEvents(String globalTxId, String type);
+  List<TxEvent> findStartedTransactions(String globalTxId, String type);
 }
diff --git 
a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
 
b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 64ed62c..d7e66c3 100644
--- 
a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ 
b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -50,7 +50,7 @@ public class TxConsistentServiceTest {
     }
 
     @Override
-    public List<TxEvent> findCompletedEvents(String globalTxId, String type) {
+    public List<TxEvent> findStartedTransactions(String globalTxId, String 
type) {
       return events.stream()
           .filter(event -> globalTxId.equals(event.globalTxId()) && 
type.equals(event.type()))
           .collect(Collectors.toList());
@@ -85,9 +85,9 @@ public class TxConsistentServiceTest {
   @Test
   public void compensateGlobalTx_OnAnyLocalTxFailure() throws Exception {
     events.add(eventOf(TxStartedEvent, "service a".getBytes()));
-    events.add(eventOf(TxEndedEvent, "service a".getBytes()));
+    events.add(eventOf(TxEndedEvent, new byte[0]));
     events.add(eventOf(TxStartedEvent, "service b".getBytes()));
-    events.add(eventOf(TxEndedEvent, "service b".getBytes()));
+    events.add(eventOf(TxEndedEvent, new byte[0]));
 
     TxEvent abortEvent = newEvent(TxAbortedEvent);
 
diff --git 
a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SpringTxEventRepository.java
 
b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index 7b59d29..e8c8058 100644
--- 
a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ 
b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -36,7 +36,7 @@ class SpringTxEventRepository implements TxEventRepository {
   }
 
   @Override
-  public List<TxEvent> findCompletedEvents(String globalTxId, String type) {
+  public List<TxEvent> findStartedTransactions(String globalTxId, String type) 
{
     return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type)
         .stream()
         .map(TxEventEnvelope::event)
diff --git 
a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
 
b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
index 152edfb..b027754 100644
--- 
a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
+++ 
b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
@@ -17,6 +17,8 @@
 
 package io.servicecomb.saga.alpha.server;
 
+import java.util.Date;
+
 import javax.persistence.Embedded;
 import javax.persistence.Entity;
 import javax.persistence.GeneratedValue;
@@ -40,6 +42,10 @@ class TxEventEnvelope {
     this.event = event;
   }
 
+  public TxEventEnvelope(String globalTxId, String localTxId, String 
parentTxId, String type, byte[] payloads) {
+    this.event = new TxEvent(new Date(), globalTxId, localTxId, parentTxId, 
type, payloads);
+  }
+
   public long creationTime() {
     return event.creationTime().getTime();
   }
diff --git 
a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
 
b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index cd3cbc7..04ff836 100644
--- 
a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ 
b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -19,10 +19,15 @@ package io.servicecomb.saga.alpha.server;
 
 import java.util.List;
 
+import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.repository.CrudRepository;
 
 interface TxEventEnvelopeRepository extends CrudRepository<TxEventEnvelope, 
Long> {
   TxEventEnvelope findByEventGlobalTxId(String globalTxId);
 
+  @Query("SELECT DISTINCT new 
io.servicecomb.saga.alpha.server.TxEventEnvelope("
+      + "t.event.globalTxId, t.event.localTxId, t.event.parentTxId, 
t.event.type, t.event.payloads"
+      + ") FROM TxEventEnvelope t "
+      + "WHERE t.event.globalTxId = ?1 AND t.event.type = ?2")
   List<TxEventEnvelope> findByEventGlobalTxIdAndEventType(String globalTxId, 
String type);
 }
diff --git 
a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
 
b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index e9c9a98..d932072 100644
--- 
a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ 
b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -109,9 +109,23 @@ public class AlphaIntegrationTest {
   @Test
   public void compensateOnFailure() throws Exception {
     eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service a".getBytes()));
-    eventRepo.save(eventEnvelopeOf(TxEndedEvent, "service a".getBytes()));
+    eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0]));
     eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service b".getBytes()));
-    eventRepo.save(eventEnvelopeOf(TxEndedEvent, "service b".getBytes()));
+    eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0]));
+
+    endpoint.handle(someEvent(TxAbortedEvent));
+
+    await().atMost(1, SECONDS).until(() -> 
callbackArgs.getOrDefault(globalTxId, emptyList()).size() > 1);
+    assertThat(stringOf(callbackArgs.get(globalTxId)), 
containsInAnyOrder("service a", "service b"));
+  }
+
+  @Test
+  public void doNotCompensateDuplicateTxOnFailure() throws Exception {
+    eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, 
"service a".getBytes()));
+    eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, 
"service a".getBytes()));
+    eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0]));
+    eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service b".getBytes()));
+    eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0]));
 
     endpoint.handle(someEvent(TxAbortedEvent));
 
@@ -136,10 +150,14 @@ public class AlphaIntegrationTest {
   }
 
   private TxEventEnvelope eventEnvelopeOf(EventType eventType, byte[] 
payloads) {
+    return eventEnvelopeOf(eventType, UUID.randomUUID().toString(), 
UUID.randomUUID().toString(), payloads);
+  }
+
+  private TxEventEnvelope eventEnvelopeOf(EventType eventType, String 
localTxId, String parentTxId, byte[] payloads) {
     return new TxEventEnvelope(new TxEvent(new Date(),
         globalTxId,
-        UUID.randomUUID().toString(),
-        UUID.randomUUID().toString(),
+        localTxId,
+        parentTxId,
         eventType.name(),
         payloads));
   }
diff --git a/omega/omega-context/pom.xml b/omega/omega-context/pom.xml
index 5bba67c..f6cd0c6 100644
--- a/omega/omega-context/pom.xml
+++ b/omega/omega-context/pom.xml
@@ -29,6 +29,10 @@
   <artifactId>omega-context</artifactId>
   <dependencies>
     <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
     </dependency>
diff --git 
a/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java
 
b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java
index d8cca65..1fe8661 100644
--- 
a/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java
+++ 
b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java
@@ -17,19 +17,24 @@
 
 package io.servicecomb.saga.omega.context;
 
+import java.lang.invoke.MethodHandles;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class OmegaContext {
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   public static final String GLOBAL_TX_ID_KEY = "X-Pack-Global-Transaction-Id";
   public static final String LOCAL_TX_ID_KEY = "X-Pack-Local-Transaction-Id";
 
   private final ThreadLocal<String> globalTxId = new ThreadLocal<>();
   private final ThreadLocal<String> localTxId = new ThreadLocal<>();
   private final ThreadLocal<String> parentTxId = new ThreadLocal<>();
-  private final Map<String, CompensationContext> compensationContexts = new 
ConcurrentHashMap<>();
+  private final Map<String, Map<String, CompensationContext>> 
compensationContexts = new ConcurrentHashMap<>();
   private final IdGenerator<String> idGenerator;
 
   public OmegaContext(IdGenerator<String> idGenerator) {
@@ -73,8 +78,9 @@ public class OmegaContext {
   }
 
   // TODO: 2017/12/23 remove this context entry by the end of its 
corresponding global tx
-  public void addContext(String id, Object target, String compensationMethod, 
Object... args) {
-    compensationContexts.put(id, new CompensationContext(target, 
compensationMethod, args));
+  public void addContext(String globalTxId, String localTxId, Object target, 
String compensationMethod, Object... args) {
+    compensationContexts.computeIfAbsent(globalTxId, k -> new 
ConcurrentHashMap<>())
+        .put(localTxId, new CompensationContext(target, compensationMethod, 
args));
   }
 
   public boolean containsContext(String globalTxId) {
@@ -82,17 +88,17 @@ public class OmegaContext {
   }
 
   public void compensate(String globalTxId) {
-    CompensationContext compensationContext = 
compensationContexts.get(globalTxId);
-
-    try {
-      invokeMethod(compensationContext);
-    } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException e) {
-      throw new IllegalStateException(
-          "Pre-checking for compensate method " + 
compensationContext.compensationMethod
-              + " was somehow skipped, did you forget to configure compensable 
method checking on service startup?",
-          e);
-    } finally {
-      compensationContexts.remove(globalTxId);
+    Map<String, CompensationContext> contexts = 
compensationContexts.remove(globalTxId);
+
+    for (CompensationContext compensationContext : contexts.values()) {
+      try {
+        invokeMethod(compensationContext);
+      } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException e) {
+        LOG.error(
+            "Pre-checking for compensate method " + 
compensationContext.compensationMethod
+                + " was somehow skipped, did you forget to configure 
compensable method checking on service startup?",
+            e);
+      }
     }
   }
 
diff --git 
a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
 
b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 67fdff9..83e57b0 100644
--- 
a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ 
b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -105,10 +105,14 @@ public class TransactionInterceptionTest {
   public void compensateOnTransactionException() throws Exception {
     User user = userService.add(new User(username, email));
 
+    // another sub transaction to the same service within the same global 
transaction
+    omegaContext.newLocalTxId();
+    User anotherUser = userService.add(new User(uniquify("Jack"), 
uniquify("j...@gmail.com")));
+
     messageHandler.onReceive("to be compensated".getBytes());
 
-    User actual = userRepository.findOne(user.id());
-    assertThat(actual, is(nullValue()));
+    assertThat(userRepository.findOne(user.id()), is(nullValue()));
+    assertThat(userRepository.findOne(anotherUser.id()), is(nullValue()));
 
     assertThat(omegaContext.containsContext(globalTxId), is(false));
   }
diff --git 
a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java
 
b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java
index f78ed31..ecce0ee 100644
--- 
a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ 
b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -49,6 +49,7 @@ public class TransactionAspect {
     LOG.debug("Intercepting compensable method {} with context {}", 
method.toString(), context);
 
     context.addContext(context.globalTxId(),
+        context.localTxId(),
         joinPoint.getTarget(),
         compensable.compensationMethod(),
         joinPoint.getArgs());

-- 
To stop receiving notification emails like this one, please contact
['"commits@servicecomb.apache.org" <commits@servicecomb.apache.org>'].

Reply via email to