This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 9640570a0f301f81e013f522f7e14ea3f5282bfb Author: seanyinx <[email protected]> AuthorDate: Fri Dec 29 09:00:01 2017 +0800 SCB-98 included compensation method signature in omega callback Signed-off-by: seanyinx <[email protected]> --- .../servicecomb/saga/alpha/core/OmegaCallback.java | 2 +- .../saga/alpha/core/TxConsistentService.java | 2 +- .../io/servicecomb/saga/alpha/core/TxEvent.java | 14 +++- .../saga/alpha/core/TxConsistentServiceTest.java | 90 +++++++++++++++----- .../servicecomb/saga/alpha/server/AlphaConfig.java | 3 +- .../alpha/server/SwiftTxEventEndpointImpl.java | 1 + .../saga/alpha/server/TxEventEnvelope.java | 13 ++- .../alpha/server/TxEventEnvelopeRepository.java | 2 +- .../saga/alpha/server/AlphaIntegrationTest.java | 95 +++++++++++++++------- 9 files changed, 162 insertions(+), 60 deletions(-) diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java index 7302016..5ebfb72 100644 --- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java +++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java @@ -18,5 +18,5 @@ package io.servicecomb.saga.alpha.core; public interface OmegaCallback { - void compensate(String globalTxId, byte[] message); + void compensate(String globalTxId, String localTxId, String compensationMethod, byte[] message); } diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java index 22605f8..15f5099 100644 --- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java +++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java @@ -49,6 +49,6 @@ public class TxConsistentService { // TODO: 2017/12/27 we must define a way to find which service to compensate, to avoid sending to all private void compensate(TxEvent event) { List<TxEvent> events = eventRepository.findStartedTransactions(event.globalTxId(), TxStartedEvent.name()); - events.forEach(evt -> omegaCallback.compensate(evt.globalTxId(), evt.payloads())); + events.forEach(evt -> omegaCallback.compensate(evt.globalTxId(), evt.localTxId(), event.compensationMethod(), evt.payloads())); } } diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java index 2d0a19b..da46db8 100644 --- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java +++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java @@ -25,17 +25,25 @@ public class TxEvent { private String localTxId; private String parentTxId; private String type; + private String compensationMethod; private byte[] payloads; private TxEvent() { } - public TxEvent(Date creationTime, String globalTxId, String localTxId, String parentTxId, String type, byte[] payloads) { + public TxEvent(Date creationTime, + String globalTxId, + String localTxId, + String parentTxId, + String type, + String compensationMethod, + byte[] payloads) { this.creationTime = creationTime; this.globalTxId = globalTxId; this.localTxId = localTxId; this.parentTxId = parentTxId; this.type = type; + this.compensationMethod = compensationMethod; this.payloads = payloads; } @@ -59,6 +67,10 @@ public class TxEvent { return type; } + public String compensationMethod() { + return compensationMethod; + } + public byte[] payloads() { return payloads; } diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java index d7e66c3..13a2674 100644 --- a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java +++ b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java @@ -23,7 +23,6 @@ import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent; import static io.servicecomb.saga.alpha.core.EventType.TxCompensatedEvent; import static io.servicecomb.saga.alpha.core.EventType.TxEndedEvent; import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent; -import static java.util.Collections.emptyList; import static java.util.concurrent.TimeUnit.SECONDS; import static org.awaitility.Awaitility.await; import static org.hamcrest.Matchers.contains; @@ -32,10 +31,10 @@ import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; -import java.util.HashMap; import java.util.List; -import java.util.Map; +import java.util.Objects; import java.util.UUID; import java.util.stream.Collectors; @@ -61,8 +60,12 @@ public class TxConsistentServiceTest { private final String localTxId = UUID.randomUUID().toString(); private final String parentTxId = UUID.randomUUID().toString(); - private final Map<String, List<byte[]>> callbackArgs = new HashMap<>(); - private final OmegaCallback omegaCallback = (key, value) -> callbackArgs.computeIfAbsent(key, k -> new ArrayList<>()).add(value); + private final String compensationMethod = getClass().getCanonicalName(); + private final List<CompensationContext> compensationContexts = new ArrayList<>(); + + private final OmegaCallback omegaCallback = (globalTxId, localTxId, compensationMethod, payloads) -> + compensationContexts.add(new CompensationContext(globalTxId, localTxId, compensationMethod, payloads)); + private final TxConsistentService consistentService = new TxConsistentService(eventRepository, omegaCallback); @Test @@ -79,40 +82,85 @@ public class TxConsistentServiceTest { } assertThat(this.events, contains(events)); - assertThat(callbackArgs.isEmpty(), is(true)); + assertThat(compensationContexts.isEmpty(), is(true)); } @Test public void compensateGlobalTx_OnAnyLocalTxFailure() throws Exception { - events.add(eventOf(TxStartedEvent, "service a".getBytes())); - events.add(eventOf(TxEndedEvent, new byte[0])); - events.add(eventOf(TxStartedEvent, "service b".getBytes())); - events.add(eventOf(TxEndedEvent, new byte[0])); + String localTxId1 = UUID.randomUUID().toString(); + events.add(eventOf(TxStartedEvent, "service a".getBytes(), localTxId1)); + events.add(eventOf(TxEndedEvent, new byte[0], localTxId1)); + + String localTxId2 = UUID.randomUUID().toString(); + events.add(eventOf(TxStartedEvent, "service b".getBytes(), localTxId2)); + events.add(eventOf(TxEndedEvent, new byte[0], localTxId2)); TxEvent abortEvent = newEvent(TxAbortedEvent); consistentService.handle(abortEvent); - await().atMost(1, SECONDS).until(() -> callbackArgs.getOrDefault(globalTxId, emptyList()).size() > 1); - assertThat(stringOf(callbackArgs.get(globalTxId)), containsInAnyOrder("service a", "service b")); - } - - private List<String> stringOf(List<byte[]> bytes) { - return bytes.stream() - .map(String::new) - .collect(Collectors.toList()); + await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 1); + assertThat(compensationContexts, containsInAnyOrder( + new CompensationContext(globalTxId, localTxId1, compensationMethod, "service a".getBytes()), + new CompensationContext(globalTxId, localTxId2, compensationMethod, "service b".getBytes()) + )); } private TxEvent newEvent(EventType eventType) { - return new TxEvent(new Date(), globalTxId, localTxId, parentTxId, eventType.name(), "yeah".getBytes()); + return new TxEvent(new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, "yeah".getBytes()); } - private TxEvent eventOf(EventType eventType, byte[] payloads) { + private TxEvent eventOf(EventType eventType, byte[] payloads, String localTxId) { return new TxEvent(new Date(), globalTxId, - UUID.randomUUID().toString(), + localTxId, UUID.randomUUID().toString(), eventType.name(), + compensationMethod, payloads); } + + private static class CompensationContext { + private final String globalTxId; + private final String localTxId; + private final String compensationMethod; + private final byte[] message; + + private CompensationContext(String globalTxId, String localTxId, String compensationMethod, byte[] message) { + this.globalTxId = globalTxId; + this.localTxId = localTxId; + this.compensationMethod = compensationMethod; + this.message = message; + } + + @Override + public String toString() { + return "CompensationContext{" + + "globalTxId='" + globalTxId + '\'' + + ", localTxId='" + localTxId + '\'' + + ", compensationMethod='" + compensationMethod + '\'' + + ", message=" + Arrays.toString(message) + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CompensationContext that = (CompensationContext) o; + return Objects.equals(globalTxId, that.globalTxId) && + Objects.equals(localTxId, that.localTxId) && + Objects.equals(compensationMethod, that.compensationMethod) && + Arrays.equals(message, that.message); + } + + @Override + public int hashCode() { + return Objects.hash(globalTxId, localTxId, compensationMethod, message); + } + } } diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java index 94b024e..d443fa7 100644 --- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java +++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java @@ -33,8 +33,7 @@ class AlphaConfig { @Bean OmegaCallback omegaCallback() { // TODO: 2017/12/27 to be replaced with actual callback on completion of SCB-138 - return (globalTxId, message) -> { - }; + return (globalTxId, localTxId, compensationMethod, message) -> {}; } @Bean diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java index 9ce7c80..f1f8e40 100644 --- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java +++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java @@ -40,6 +40,7 @@ class SwiftTxEventEndpointImpl implements SwiftTxEventEndpoint { message.localTxId(), message.parentTxId(), message.type(), + message.compensationMethod(), message.payloads() )); } diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java index b027754..fa282b4 100644 --- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java +++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java @@ -42,8 +42,13 @@ class TxEventEnvelope { this.event = event; } - public TxEventEnvelope(String globalTxId, String localTxId, String parentTxId, String type, byte[] payloads) { - this.event = new TxEvent(new Date(), globalTxId, localTxId, parentTxId, type, payloads); + public TxEventEnvelope(String globalTxId, + String localTxId, + String parentTxId, + String type, + String compensationMethod, + byte[] payloads) { + this.event = new TxEvent(new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads); } public long creationTime() { @@ -66,6 +71,10 @@ class TxEventEnvelope { return event.type(); } + String compensationMethod() { + return event.compensationMethod(); + } + byte[] payloads() { return event.payloads(); } diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java index 04ff836..5f929a1 100644 --- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java +++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java @@ -26,7 +26,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEventEnvelope, Long TxEventEnvelope findByEventGlobalTxId(String globalTxId); @Query("SELECT DISTINCT new io.servicecomb.saga.alpha.server.TxEventEnvelope(" - + "t.event.globalTxId, t.event.localTxId, t.event.parentTxId, t.event.type, t.event.payloads" + + "t.event.globalTxId, t.event.localTxId, t.event.parentTxId, t.event.type, t.event.compensationMethod, t.event.payloads" + ") FROM TxEventEnvelope t " + "WHERE t.event.globalTxId = ?1 AND t.event.type = ?2") List<TxEventEnvelope> findByEventGlobalTxIdAndEventType(String globalTxId, String type); diff --git a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java index d932072..f7ec33c 100644 --- a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java +++ b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java @@ -21,7 +21,6 @@ import static com.google.common.net.HostAndPort.fromParts; import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent; import static io.servicecomb.saga.alpha.core.EventType.TxEndedEvent; import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent; -import static java.util.Collections.emptyList; import static java.util.concurrent.TimeUnit.SECONDS; import static org.awaitility.Awaitility.await; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -29,12 +28,11 @@ import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; -import java.util.HashMap; import java.util.List; -import java.util.Map; +import java.util.Objects; import java.util.UUID; -import java.util.stream.Collectors; import org.junit.After; import org.junit.AfterClass; @@ -68,12 +66,13 @@ public class AlphaIntegrationTest { private final String globalTxId = UUID.randomUUID().toString(); private final String localTxId = UUID.randomUUID().toString(); private final String parentTxId = UUID.randomUUID().toString(); + private final String compensationMethod = getClass().getCanonicalName(); @Autowired private TxEventEnvelopeRepository eventRepo; @Autowired - private Map<String, List<byte[]>> callbackArgs; + private List<CompensationContext> compensationContexts; private final FramedClientConnector connector = new FramedClientConnector(fromParts("localhost", port)); private SwiftTxEventEndpoint endpoint; @@ -103,40 +102,28 @@ public class AlphaIntegrationTest { assertThat(envelope.localTxId(), is(localTxId)); assertThat(envelope.parentTxId(), is(parentTxId)); assertThat(envelope.type(), is(TxStartedEvent.name())); + assertThat(envelope.compensationMethod(), is(compensationMethod)); assertThat(envelope.payloads(), is(payload.getBytes())); } @Test - public void compensateOnFailure() throws Exception { - eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service a".getBytes())); - eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0])); - eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service b".getBytes())); - eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0])); - - endpoint.handle(someEvent(TxAbortedEvent)); - - await().atMost(1, SECONDS).until(() -> callbackArgs.getOrDefault(globalTxId, emptyList()).size() > 1); - assertThat(stringOf(callbackArgs.get(globalTxId)), containsInAnyOrder("service a", "service b")); - } - - @Test public void doNotCompensateDuplicateTxOnFailure() throws Exception { + // duplicate events with same content but different timestamp eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes())); eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes())); eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0])); - eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service b".getBytes())); + + String localTxId1 = UUID.randomUUID().toString(); + eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId1, UUID.randomUUID().toString(), "service b".getBytes())); eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0])); endpoint.handle(someEvent(TxAbortedEvent)); - await().atMost(1, SECONDS).until(() -> callbackArgs.getOrDefault(globalTxId, emptyList()).size() > 1); - assertThat(stringOf(callbackArgs.get(globalTxId)), containsInAnyOrder("service a", "service b")); - } - - private List<String> stringOf(List<byte[]> bytes) { - return bytes.stream() - .map(String::new) - .collect(Collectors.toList()); + await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 1); + assertThat(compensationContexts, containsInAnyOrder( + new CompensationContext(globalTxId, this.localTxId, compensationMethod, "service a".getBytes()), + new CompensationContext(globalTxId, localTxId1, compensationMethod, "service b".getBytes()) + )); } private SwiftTxEvent someEvent(EventType type) { @@ -146,6 +133,7 @@ public class AlphaIntegrationTest { this.localTxId, this.parentTxId, type.name(), + compensationMethod, payload.getBytes()); } @@ -159,22 +147,67 @@ public class AlphaIntegrationTest { localTxId, parentTxId, eventType.name(), + compensationMethod, payloads)); } @Configuration static class OmegaCallbackConfig { - private final Map<String, List<byte[]>> callbackArgs = new HashMap<>(); + private final List<CompensationContext> compensationContexts = new ArrayList<>(); @Bean - Map<String, List<byte[]>> callbackArgs() { - return callbackArgs; + List<CompensationContext> compensationContexts() { + return compensationContexts; } @Bean OmegaCallback omegaCallback() { - return (key, value) -> callbackArgs.computeIfAbsent(key, k -> new ArrayList<>()).add(value); + return (globalTxId, localTxId, compensationMethod, payloads) -> + compensationContexts.add(new CompensationContext(globalTxId, localTxId, compensationMethod, payloads)); } } + private static class CompensationContext { + private final String globalTxId; + private final String localTxId; + private final String compensationMethod; + private final byte[] message; + + private CompensationContext(String globalTxId, String localTxId, String compensationMethod, byte[] message) { + this.globalTxId = globalTxId; + this.localTxId = localTxId; + this.compensationMethod = compensationMethod; + this.message = message; + } + + @Override + public String toString() { + return "CompensationContext{" + + "globalTxId='" + globalTxId + '\'' + + ", localTxId='" + localTxId + '\'' + + ", compensationMethod='" + compensationMethod + '\'' + + ", message=" + Arrays.toString(message) + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CompensationContext that = (CompensationContext) o; + return Objects.equals(globalTxId, that.globalTxId) && + Objects.equals(localTxId, that.localTxId) && + Objects.equals(compensationMethod, that.compensationMethod) && + Arrays.equals(message, that.message); + } + + @Override + public int hashCode() { + return Objects.hash(globalTxId, localTxId, compensationMethod, message); + } + } } -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
