This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 9640570a0f301f81e013f522f7e14ea3f5282bfb
Author: seanyinx <[email protected]>
AuthorDate: Fri Dec 29 09:00:01 2017 +0800

    SCB-98 included compensation method signature in omega callback
    
    Signed-off-by: seanyinx <[email protected]>
---
 .../servicecomb/saga/alpha/core/OmegaCallback.java |  2 +-
 .../saga/alpha/core/TxConsistentService.java       |  2 +-
 .../io/servicecomb/saga/alpha/core/TxEvent.java    | 14 +++-
 .../saga/alpha/core/TxConsistentServiceTest.java   | 90 +++++++++++++++-----
 .../servicecomb/saga/alpha/server/AlphaConfig.java |  3 +-
 .../alpha/server/SwiftTxEventEndpointImpl.java     |  1 +
 .../saga/alpha/server/TxEventEnvelope.java         | 13 ++-
 .../alpha/server/TxEventEnvelopeRepository.java    |  2 +-
 .../saga/alpha/server/AlphaIntegrationTest.java    | 95 +++++++++++++++-------
 9 files changed, 162 insertions(+), 60 deletions(-)

diff --git 
a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java
 
b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java
index 7302016..5ebfb72 100644
--- 
a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java
+++ 
b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java
@@ -18,5 +18,5 @@
 package io.servicecomb.saga.alpha.core;
 
 public interface OmegaCallback {
-  void compensate(String globalTxId, byte[] message);
+  void compensate(String globalTxId, String localTxId, String 
compensationMethod, byte[] message);
 }
diff --git 
a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
 
b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
index 22605f8..15f5099 100644
--- 
a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
+++ 
b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -49,6 +49,6 @@ public class TxConsistentService {
   // TODO: 2017/12/27 we must define a way to find which service to 
compensate, to avoid sending to all
   private void compensate(TxEvent event) {
     List<TxEvent> events = 
eventRepository.findStartedTransactions(event.globalTxId(), 
TxStartedEvent.name());
-    events.forEach(evt -> omegaCallback.compensate(evt.globalTxId(), 
evt.payloads()));
+    // each started transaction is compensated with the method recorded on its own event
+    events.forEach(evt -> omegaCallback.compensate(evt.globalTxId(), 
evt.localTxId(), evt.compensationMethod(), evt.payloads()));
   }
 }
diff --git 
a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java 
b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java
index 2d0a19b..da46db8 100644
--- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java
@@ -25,17 +25,25 @@ public class TxEvent {
   private String localTxId;
   private String parentTxId;
   private String type;
+  private String compensationMethod;
   private byte[] payloads;
 
   private TxEvent() {
   }
 
-  public TxEvent(Date creationTime, String globalTxId, String localTxId, 
String parentTxId, String type, byte[] payloads) {
+  public TxEvent(Date creationTime,
+      String globalTxId,
+      String localTxId,
+      String parentTxId,
+      String type,
+      String compensationMethod,
+      byte[] payloads) {
     this.creationTime = creationTime;
     this.globalTxId = globalTxId;
     this.localTxId = localTxId;
     this.parentTxId = parentTxId;
     this.type = type;
+    this.compensationMethod = compensationMethod;
     this.payloads = payloads;
   }
 
@@ -59,6 +67,10 @@ public class TxEvent {
     return type;
   }
 
+  public String compensationMethod() {
+    return compensationMethod;
+  }
+
   public byte[] payloads() {
     return payloads;
   }
diff --git 
a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
 
b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index d7e66c3..13a2674 100644
--- 
a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ 
b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -23,7 +23,6 @@ import static 
io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
 import static io.servicecomb.saga.alpha.core.EventType.TxCompensatedEvent;
 import static io.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
 import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
-import static java.util.Collections.emptyList;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.Matchers.contains;
@@ -32,10 +31,10 @@ import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
@@ -61,8 +60,12 @@ public class TxConsistentServiceTest {
   private final String localTxId = UUID.randomUUID().toString();
   private final String parentTxId = UUID.randomUUID().toString();
 
-  private final Map<String, List<byte[]>> callbackArgs = new HashMap<>();
-  private final OmegaCallback omegaCallback = (key, value) -> 
callbackArgs.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
+  private final String compensationMethod = getClass().getCanonicalName();
+  private final List<CompensationContext> compensationContexts = new 
ArrayList<>();
+
+  private final OmegaCallback omegaCallback = (globalTxId, localTxId, 
compensationMethod, payloads) ->
+      compensationContexts.add(new CompensationContext(globalTxId, localTxId, 
compensationMethod, payloads));
+
   private final TxConsistentService consistentService = new 
TxConsistentService(eventRepository, omegaCallback);
 
   @Test
@@ -79,40 +82,85 @@ public class TxConsistentServiceTest {
     }
 
     assertThat(this.events, contains(events));
-    assertThat(callbackArgs.isEmpty(), is(true));
+    assertThat(compensationContexts.isEmpty(), is(true));
   }
 
   @Test
   public void compensateGlobalTx_OnAnyLocalTxFailure() throws Exception {
-    events.add(eventOf(TxStartedEvent, "service a".getBytes()));
-    events.add(eventOf(TxEndedEvent, new byte[0]));
-    events.add(eventOf(TxStartedEvent, "service b".getBytes()));
-    events.add(eventOf(TxEndedEvent, new byte[0]));
+    String localTxId1 = UUID.randomUUID().toString();
+    events.add(eventOf(TxStartedEvent, "service a".getBytes(), localTxId1));
+    events.add(eventOf(TxEndedEvent, new byte[0], localTxId1));
+
+    String localTxId2 = UUID.randomUUID().toString();
+    events.add(eventOf(TxStartedEvent, "service b".getBytes(), localTxId2));
+    events.add(eventOf(TxEndedEvent, new byte[0], localTxId2));
 
     TxEvent abortEvent = newEvent(TxAbortedEvent);
 
     consistentService.handle(abortEvent);
 
-    await().atMost(1, SECONDS).until(() -> 
callbackArgs.getOrDefault(globalTxId, emptyList()).size() > 1);
-    assertThat(stringOf(callbackArgs.get(globalTxId)), 
containsInAnyOrder("service a", "service b"));
-  }
-
-  private List<String> stringOf(List<byte[]> bytes) {
-    return bytes.stream()
-        .map(String::new)
-        .collect(Collectors.toList());
+    await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 1);
+    assertThat(compensationContexts, containsInAnyOrder(
+        new CompensationContext(globalTxId, localTxId1, compensationMethod, 
"service a".getBytes()),
+        new CompensationContext(globalTxId, localTxId2, compensationMethod, 
"service b".getBytes())
+    ));
   }
 
   private TxEvent newEvent(EventType eventType) {
-    return new TxEvent(new Date(), globalTxId, localTxId, parentTxId, 
eventType.name(), "yeah".getBytes());
+    return new TxEvent(new Date(), globalTxId, localTxId, parentTxId, 
eventType.name(), compensationMethod, "yeah".getBytes());
   }
 
-  private TxEvent eventOf(EventType eventType, byte[] payloads) {
+  private TxEvent eventOf(EventType eventType, byte[] payloads, String 
localTxId) {
     return new TxEvent(new Date(),
         globalTxId,
-        UUID.randomUUID().toString(),
+        localTxId,
         UUID.randomUUID().toString(),
         eventType.name(),
+        compensationMethod,
         payloads);
   }
+
+  private static class CompensationContext {
+    private final String globalTxId;
+    private final String localTxId;
+    private final String compensationMethod;
+    private final byte[] message;
+
+    private CompensationContext(String globalTxId, String localTxId, String 
compensationMethod, byte[] message) {
+      this.globalTxId = globalTxId;
+      this.localTxId = localTxId;
+      this.compensationMethod = compensationMethod;
+      this.message = message;
+    }
+
+    @Override
+    public String toString() {
+      return "CompensationContext{" +
+          "globalTxId='" + globalTxId + '\'' +
+          ", localTxId='" + localTxId + '\'' +
+          ", compensationMethod='" + compensationMethod + '\'' +
+          ", message=" + Arrays.toString(message) +
+          '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      CompensationContext that = (CompensationContext) o;
+      return Objects.equals(globalTxId, that.globalTxId) &&
+          Objects.equals(localTxId, that.localTxId) &&
+          Objects.equals(compensationMethod, that.compensationMethod) &&
+          Arrays.equals(message, that.message);
+    }
+
+    @Override
+    public int hashCode() {
+      // hash the byte[] by content so hashCode() agrees with the Arrays.equals check in equals()
+      return 31 * Objects.hash(globalTxId, localTxId, compensationMethod) + Arrays.hashCode(message);
+    }
+  }
 }
diff --git 
a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
 
b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
index 94b024e..d443fa7 100644
--- 
a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
+++ 
b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -33,8 +33,7 @@ class AlphaConfig {
   @Bean
   OmegaCallback omegaCallback() {
     // TODO: 2017/12/27 to be replaced with actual callback on completion of 
SCB-138
-    return (globalTxId, message) -> {
-    };
+    return (globalTxId, localTxId, compensationMethod, message) -> {};
   }
   
   @Bean
diff --git 
a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
 
b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
index 9ce7c80..f1f8e40 100644
--- 
a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
+++ 
b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
@@ -40,6 +40,7 @@ class SwiftTxEventEndpointImpl implements 
SwiftTxEventEndpoint {
         message.localTxId(),
         message.parentTxId(),
         message.type(),
+        message.compensationMethod(),
         message.payloads()
     ));
   }
diff --git 
a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
 
b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
index b027754..fa282b4 100644
--- 
a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
+++ 
b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
@@ -42,8 +42,13 @@ class TxEventEnvelope {
     this.event = event;
   }
 
-  public TxEventEnvelope(String globalTxId, String localTxId, String 
parentTxId, String type, byte[] payloads) {
-    this.event = new TxEvent(new Date(), globalTxId, localTxId, parentTxId, 
type, payloads);
+  public TxEventEnvelope(String globalTxId,
+      String localTxId,
+      String parentTxId,
+      String type,
+      String compensationMethod,
+      byte[] payloads) {
+    this.event = new TxEvent(new Date(), globalTxId, localTxId, parentTxId, 
type, compensationMethod, payloads);
   }
 
   public long creationTime() {
@@ -66,6 +71,10 @@ class TxEventEnvelope {
     return event.type();
   }
 
+  String compensationMethod() {
+    return event.compensationMethod();
+  }
+
   byte[] payloads() {
     return event.payloads();
   }
diff --git 
a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
 
b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 04ff836..5f929a1 100644
--- 
a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ 
b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -26,7 +26,7 @@ interface TxEventEnvelopeRepository extends 
CrudRepository<TxEventEnvelope, Long
   TxEventEnvelope findByEventGlobalTxId(String globalTxId);
 
   @Query("SELECT DISTINCT new 
io.servicecomb.saga.alpha.server.TxEventEnvelope("
-      + "t.event.globalTxId, t.event.localTxId, t.event.parentTxId, 
t.event.type, t.event.payloads"
+      + "t.event.globalTxId, t.event.localTxId, t.event.parentTxId, 
t.event.type, t.event.compensationMethod, t.event.payloads"
       + ") FROM TxEventEnvelope t "
       + "WHERE t.event.globalTxId = ?1 AND t.event.type = ?2")
   List<TxEventEnvelope> findByEventGlobalTxIdAndEventType(String globalTxId, 
String type);
diff --git 
a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
 
b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index d932072..f7ec33c 100644
--- 
a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ 
b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -21,7 +21,6 @@ import static com.google.common.net.HostAndPort.fromParts;
 import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
 import static io.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
 import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
-import static java.util.Collections.emptyList;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -29,12 +28,11 @@ import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
-import java.util.stream.Collectors;
 
 import org.junit.After;
 import org.junit.AfterClass;
@@ -68,12 +66,13 @@ public class AlphaIntegrationTest {
   private final String globalTxId = UUID.randomUUID().toString();
   private final String localTxId = UUID.randomUUID().toString();
   private final String parentTxId = UUID.randomUUID().toString();
+  private final String compensationMethod = getClass().getCanonicalName();
 
   @Autowired
   private TxEventEnvelopeRepository eventRepo;
 
   @Autowired
-  private Map<String, List<byte[]>> callbackArgs;
+  private List<CompensationContext> compensationContexts;
 
   private final FramedClientConnector connector = new 
FramedClientConnector(fromParts("localhost", port));
   private SwiftTxEventEndpoint endpoint;
@@ -103,40 +102,28 @@ public class AlphaIntegrationTest {
     assertThat(envelope.localTxId(), is(localTxId));
     assertThat(envelope.parentTxId(), is(parentTxId));
     assertThat(envelope.type(), is(TxStartedEvent.name()));
+    assertThat(envelope.compensationMethod(), is(compensationMethod));
     assertThat(envelope.payloads(), is(payload.getBytes()));
   }
 
   @Test
-  public void compensateOnFailure() throws Exception {
-    eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service a".getBytes()));
-    eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0]));
-    eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service b".getBytes()));
-    eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0]));
-
-    endpoint.handle(someEvent(TxAbortedEvent));
-
-    await().atMost(1, SECONDS).until(() -> 
callbackArgs.getOrDefault(globalTxId, emptyList()).size() > 1);
-    assertThat(stringOf(callbackArgs.get(globalTxId)), 
containsInAnyOrder("service a", "service b"));
-  }
-
-  @Test
   public void doNotCompensateDuplicateTxOnFailure() throws Exception {
+    // duplicate events with same content but different timestamp
     eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, 
"service a".getBytes()));
     eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, 
"service a".getBytes()));
     eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0]));
-    eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service b".getBytes()));
+
+    String localTxId1 = UUID.randomUUID().toString();
+    eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId1, 
UUID.randomUUID().toString(), "service b".getBytes()));
     eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0]));
 
     endpoint.handle(someEvent(TxAbortedEvent));
 
-    await().atMost(1, SECONDS).until(() -> 
callbackArgs.getOrDefault(globalTxId, emptyList()).size() > 1);
-    assertThat(stringOf(callbackArgs.get(globalTxId)), 
containsInAnyOrder("service a", "service b"));
-  }
-
-  private List<String> stringOf(List<byte[]> bytes) {
-    return bytes.stream()
-        .map(String::new)
-        .collect(Collectors.toList());
+    await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 1);
+    assertThat(compensationContexts, containsInAnyOrder(
+        new CompensationContext(globalTxId, this.localTxId, 
compensationMethod, "service a".getBytes()),
+        new CompensationContext(globalTxId, localTxId1, compensationMethod, 
"service b".getBytes())
+    ));
   }
 
   private SwiftTxEvent someEvent(EventType type) {
@@ -146,6 +133,7 @@ public class AlphaIntegrationTest {
         this.localTxId,
         this.parentTxId,
         type.name(),
+        compensationMethod,
         payload.getBytes());
   }
 
@@ -159,22 +147,67 @@ public class AlphaIntegrationTest {
         localTxId,
         parentTxId,
         eventType.name(),
+        compensationMethod,
         payloads));
   }
 
   @Configuration
   static class OmegaCallbackConfig {
-    private final Map<String, List<byte[]>> callbackArgs = new HashMap<>();
+    private final List<CompensationContext> compensationContexts = new 
ArrayList<>();
 
     @Bean
-    Map<String, List<byte[]>> callbackArgs() {
-      return callbackArgs;
+    List<CompensationContext> compensationContexts() {
+      return compensationContexts;
     }
 
     @Bean
     OmegaCallback omegaCallback() {
-      return (key, value) -> callbackArgs.computeIfAbsent(key, k -> new 
ArrayList<>()).add(value);
+      return (globalTxId, localTxId, compensationMethod, payloads) ->
+          compensationContexts.add(new CompensationContext(globalTxId, 
localTxId, compensationMethod, payloads));
     }
   }
 
+  private static class CompensationContext {
+    private final String globalTxId;
+    private final String localTxId;
+    private final String compensationMethod;
+    private final byte[] message;
+
+    private CompensationContext(String globalTxId, String localTxId, String 
compensationMethod, byte[] message) {
+      this.globalTxId = globalTxId;
+      this.localTxId = localTxId;
+      this.compensationMethod = compensationMethod;
+      this.message = message;
+    }
+
+    @Override
+    public String toString() {
+      return "CompensationContext{" +
+          "globalTxId='" + globalTxId + '\'' +
+          ", localTxId='" + localTxId + '\'' +
+          ", compensationMethod='" + compensationMethod + '\'' +
+          ", message=" + Arrays.toString(message) +
+          '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      CompensationContext that = (CompensationContext) o;
+      return Objects.equals(globalTxId, that.globalTxId) &&
+          Objects.equals(localTxId, that.localTxId) &&
+          Objects.equals(compensationMethod, that.compensationMethod) &&
+          Arrays.equals(message, that.message);
+    }
+
+    @Override
+    public int hashCode() {
+      // hash the byte[] by content so hashCode() agrees with the Arrays.equals check in equals()
+      return 31 * Objects.hash(globalTxId, localTxId, compensationMethod) + Arrays.hashCode(message);
+    }
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.

Reply via email to