This is an automated email from the ASF dual-hosted git repository. seanyinx pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 8190d30d8896d5aba923040a255628f6a202856a Author: seanyinx <[email protected]> AuthorDate: Sat Jan 13 09:47:39 2018 +0800 SCB-220 compensated only ended events Signed-off-by: seanyinx <[email protected]> --- .../saga/alpha/core/TxConsistentService.java | 46 +++++++++------- .../servicecomb/saga/alpha/core/TxEvent.java | 12 +++++ .../saga/alpha/core/TxEventRepository.java | 4 ++ .../saga/alpha/core/TxConsistentServiceTest.java | 39 ++++++++++++-- .../saga/alpha/server/SpringTxEventRepository.java | 16 ++++-- .../alpha/server/TxEventEnvelopeRepository.java | 24 +++++++-- .../saga/alpha/server/AlphaIntegrationTest.java | 62 ++++++++++++++++++---- 7 files changed, 164 insertions(+), 39 deletions(-) diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java index fa93752..9c592d8 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java @@ -17,6 +17,7 @@ package org.apache.servicecomb.saga.alpha.core; +import static java.util.Collections.emptySet; import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent; import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent; import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent; @@ -25,11 +26,10 @@ import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Consumer; @@ -42,11 +42,12 @@ public class TxConsistentService { private final TxEventRepository eventRepository; private final OmegaCallback omegaCallback; private final Map<String, Consumer<TxEvent>> eventCallbacks = new HashMap<String, Consumer<TxEvent>>() {{ + put(TxEndedEvent.name(), (event) -> compensateIfAlreadyAborted(event)); put(TxAbortedEvent.name(), (event) -> compensate(event)); put(TxCompensatedEvent.name(), (event) -> updateCompensateStatus(event)); }}; - private final Map<String, Set<String>> eventsToCompensate = new ConcurrentHashMap<>(); + private final Map<String, Set<String>> eventsToCompensate = new HashMap<>(); private final ExecutorService executor = Executors.newSingleThreadExecutor(); public TxConsistentService(TxEventRepository eventRepository, OmegaCallback omegaCallback) { @@ -61,27 +62,38 @@ public class TxConsistentService { eventRepository.save(event); - executor.execute(() -> { - if (isTxEndedEvent(event) && isGlobalTxAborted(event)) { - omegaCallback.compensate(event); - } + executor.execute(() -> eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event)); + } + + private void compensateIfAlreadyAborted(TxEvent event) { + if (!isCompensationScheduled(event) && isGlobalTxAborted(event)) { + eventsToCompensate.computeIfAbsent(event.globalTxId(), k -> new HashSet<>()).add(event.localTxId()); + TxEvent correspondingStartedEvent = eventRepository + .findFirstTransaction(event.globalTxId(), event.localTxId(), TxStartedEvent.name()); + + omegaCallback.compensate(correspondingStartedEvent); + } + } - eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event); - }); + private boolean isCompensationScheduled(TxEvent event) { + return eventsToCompensate.getOrDefault(event.globalTxId(), emptySet()).contains(event.localTxId()); return true; } private void compensate(TxEvent event) { - List<TxEvent> events = eventRepository.findTransactions(event.globalTxId(), TxStartedEvent.name()); - eventsToCompensate.computeIfAbsent(event.globalTxId(), (v) -> { - Set<String> eventSet = new ConcurrentSkipListSet<>(); - events.forEach(e -> eventSet.add(e.localTxId())); - return eventSet; - }); + List<TxEvent> events = eventRepository.findTransactionsToCompensate(event.globalTxId()); + + events.removeIf(this::isCompensationScheduled); + + Set<String> localTxIds = eventsToCompensate.computeIfAbsent(event.globalTxId(), k -> new HashSet<>()); + events.forEach(e -> localTxIds.add(e.localTxId())); + events.forEach(omegaCallback::compensate); } + // TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late, + // unless we ask user to specify a name for each participant in the global TX in @Compensable private void updateCompensateStatus(TxEvent event) { Set<String> events = eventsToCompensate.get(event.globalTxId()); if (events != null) { @@ -102,8 +114,4 @@ public class TxConsistentService { private boolean isGlobalTxAborted(TxEvent event) { return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty(); } - - private boolean isTxEndedEvent(TxEvent event) { - return TxEndedEvent.name().equals(event.type()); - } } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java index 9a2cea4..ebea44c 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java @@ -36,6 +36,18 @@ public class TxEvent { public TxEvent( String serviceName, String instanceId, + String globalTxId, + String localTxId, + String parentTxId, + String type, + String compensationMethod, + byte[] payloads) { + this(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads); + } + + public TxEvent( + String serviceName, + String instanceId, Date creationTime, String globalTxId, String localTxId, diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java index 3a8387b..cf5706b 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java @@ -23,4 +23,8 @@ public interface TxEventRepository { void save(TxEvent event); List<TxEvent> findTransactions(String globalTxId, String type); + + TxEvent findFirstTransaction(String globalTxId, String localTxId, String type); + + List<TxEvent> findTransactionsToCompensate(String globalTxId); } diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java index 9318a06..99667e7 100644 --- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java +++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java @@ -56,6 +56,40 @@ public class TxConsistentServiceTest { .filter(event -> globalTxId.equals(event.globalTxId()) && type.equals(event.type())) .collect(Collectors.toList()); } + + @Override + public TxEvent findFirstTransaction(String globalTxId, String localTxId, String type) { + return events.stream() + .filter(event -> globalTxId.equals(event.globalTxId()) && localTxId.equals(event.localTxId()) && type.equals(event.type())) + .findFirst() + .get(); + } + + @Override + public List<TxEvent> findTransactionsToCompensate(String globalTxId) { + return events.stream() + .filter(event -> globalTxId.equals(event.globalTxId()) + && event.type().equals(TxStartedEvent.name()) + && isCompleted(globalTxId, event) + && !isCompensated(globalTxId, event)) + .collect(Collectors.toList()); + } + + private boolean isCompleted(String globalTxId, TxEvent event) { + return events.stream() + .filter(e -> globalTxId.equals(e.globalTxId()) + && e.localTxId().equals(event.localTxId()) + && e.type().equals(TxEndedEvent.name())) + .count() > 0; + } + + private boolean isCompensated(String globalTxId, TxEvent event) { + return events.stream() + .filter(e -> globalTxId.equals(e.globalTxId()) + && e.localTxId().equals(event.localTxId()) + && e.type().equals(TxCompensatedEvent.name())) + .count() > 0; + } }; private final String globalTxId = UUID.randomUUID().toString(); @@ -121,17 +155,16 @@ public class TxConsistentServiceTest { @Test public void compensateTxEndedEventImmediately_IfGlobalTxAlreadyFailed() throws Exception { - String localTxId1 = UUID.randomUUID().toString(); events.add(newEvent(TxStartedEvent)); events.add(newEvent(TxAbortedEvent)); - TxEvent event = eventOf(TxEndedEvent, "service x".getBytes(), localTxId1, "method x"); + TxEvent event = eventOf(TxEndedEvent, new byte[0], localTxId, compensationMethod); consistentService.handle(event); await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 0); assertThat(compensationContexts, containsInAnyOrder( - new CompensationContext(globalTxId, localTxId1, "method x", "service x".getBytes()) + new CompensationContext(globalTxId, localTxId, compensationMethod, "yeah".getBytes()) )); } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java index 3bf6e03..00d10d2 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java @@ -18,7 +18,6 @@ package org.apache.servicecomb.saga.alpha.server; import java.util.List; -import java.util.stream.Collectors; import org.apache.servicecomb.saga.alpha.core.TxEvent; import org.apache.servicecomb.saga.alpha.core.TxEventRepository; @@ -37,9 +36,16 @@ class SpringTxEventRepository implements TxEventRepository { @Override public List<TxEvent> findTransactions(String globalTxId, String type) { - return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type) - .stream() - .map(TxEventEnvelope::event) - .collect(Collectors.toList()); + return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type); + } + + @Override + public TxEvent findFirstTransaction(String globalTxId, String localTxId, String type) { + return eventRepo.findFirstByEventGlobalTxIdAndEventLocalTxIdAndEventType(globalTxId, localTxId, type).event(); + } + + @Override + public List<TxEvent> findTransactionsToCompensate(String globalTxId) { + return eventRepo.findStartedEventsWithMatchingEndedButNotCompensatedEvents(globalTxId); } } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java index fe05c1e..1e1859c 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java @@ -19,15 +19,33 @@ package org.apache.servicecomb.saga.alpha.server; import java.util.List; +import org.apache.servicecomb.saga.alpha.core.TxEvent; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.CrudRepository; interface TxEventEnvelopeRepository extends CrudRepository<TxEventEnvelope, Long> { - TxEventEnvelope findByEventGlobalTxId(String globalTxId); + List<TxEventEnvelope> findByEventGlobalTxId(String globalTxId); - @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.server.TxEventEnvelope(" + @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent(" + "t.event.serviceName, t.event.instanceId, t.event.globalTxId, t.event.localTxId, t.event.parentTxId, t.event.type, t.event.compensationMethod, t.event.payloads" + ") FROM TxEventEnvelope t " + "WHERE t.event.globalTxId = ?1 AND t.event.type = ?2") - List<TxEventEnvelope> findByEventGlobalTxIdAndEventType(String globalTxId, String type); + List<TxEvent> findByEventGlobalTxIdAndEventType(String globalTxId, String type); + + TxEventEnvelope findFirstByEventGlobalTxIdAndEventLocalTxIdAndEventType(String globalTxId, String localTxId, String type); + + @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent(" + + "t.event.serviceName, t.event.instanceId, t.event.globalTxId, t.event.localTxId, t.event.parentTxId, t.event.type, t.event.compensationMethod, t.event.payloads" + + ") FROM TxEventEnvelope t " + + "WHERE t.event.globalTxId = ?1 AND t.event.type = 'TxStartedEvent' AND EXISTS ( " + + " FROM TxEventEnvelope t1 " + + " WHERE t1.event.globalTxId = ?1 " + + " AND t1.event.localTxId = t.event.localTxId " + + " AND t1.event.type = 'TxEndedEvent'" + + ") AND NOT EXISTS ( " + + " FROM TxEventEnvelope t2 " + + " WHERE t2.event.globalTxId = ?1 " + + " AND t2.event.localTxId = t.event.localTxId " + + " AND t2.event.type = 'TxCompensatedEvent')") + List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId); } diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java index 6cf1893..ab9c7ec 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java @@ -20,6 +20,7 @@ package org.apache.servicecomb.saga.alpha.server; import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent; +import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent; import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent; import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent; import static org.awaitility.Awaitility.await; @@ -119,11 +120,11 @@ public class AlphaIntegrationTest { asyncStub.onConnected(serviceConfig, compensateResponseObserver); blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent)); // use the asynchronous stub need to wait for some time - await().atMost(1, SECONDS).until(() -> eventRepo.findByEventGlobalTxId(globalTxId) != null); + await().atMost(1, SECONDS).until(() -> !eventRepo.findByEventGlobalTxId(globalTxId).isEmpty()); assertThat(receivedCommands.isEmpty(), is(true)); - TxEventEnvelope envelope = eventRepo.findByEventGlobalTxId(globalTxId); + TxEventEnvelope envelope = eventRepo.findByEventGlobalTxId(globalTxId).get(0); assertThat(envelope.serviceName(), is(serviceName)); assertThat(envelope.instanceId(), is(instanceId)); @@ -179,6 +180,7 @@ public class AlphaIntegrationTest { public void removeCallbackOnClientDown() throws Exception { asyncStub.onConnected(serviceConfig, compensateResponseObserver); blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent)); + blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent)); omegaCallbacks.get(serviceName).get(instanceId).disconnect(); @@ -188,17 +190,34 @@ public class AlphaIntegrationTest { } @Test + public void compensateImmediatelyWhenGlobalTxAlreadyAborted() throws Exception { + asyncStub.onConnected(serviceConfig, compensateResponseObserver); + blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent)); + blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent)); + + blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId, parentTxId, new byte[0], compensationMethod)); + await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty()); + + GrpcCompensateCommand command = receivedCommands.poll(); + assertThat(command.getGlobalTxId(), is(globalTxId)); + assertThat(command.getLocalTxId(), is(localTxId)); + assertThat(command.getParentTxId(), is(parentTxId)); + assertThat(command.getCompensateMethod(), is(compensationMethod)); + assertThat(command.getPayloads().toByteArray(), is(payload.getBytes())); + } + + @Test public void doNotCompensateDuplicateTxOnFailure() { // duplicate events with same content but different timestamp asyncStub.onConnected(serviceConfig, compensateResponseObserver); blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a")); blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a")); - blockingStub.onTxEvent(eventOf(TxEndedEvent, new byte[0], "method a")); + blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId, parentTxId, new byte[0], "method a")); String localTxId1 = UUID.randomUUID().toString(); String parentTxId1 = UUID.randomUUID().toString(); blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId1, parentTxId1, "service b".getBytes(), "method b")); - blockingStub.onTxEvent(eventOf(TxEndedEvent, new byte[0], "method b")); + blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId1, parentTxId1, new byte[0], "method b")); blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent)); await().atMost(1, SECONDS).until(() -> receivedCommands.size() > 1); @@ -215,7 +234,8 @@ public class AlphaIntegrationTest { public void getCompensateCommandOnFailure() { asyncStub.onConnected(serviceConfig, compensateResponseObserver); blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent)); - await().atMost(1, SECONDS).until(() -> eventRepo.findByEventGlobalTxId(globalTxId) != null); + blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent)); + await().atMost(1, SECONDS).until(() -> !eventRepo.findByEventGlobalTxId(globalTxId).isEmpty()); blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent)); await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty()); @@ -232,6 +252,7 @@ public class AlphaIntegrationTest { public void compensateOnlyFailedGlobalTransaction() { asyncStub.onConnected(serviceConfig, compensateResponseObserver); blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent)); + blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent)); // simulates connection from another service with different globalTxId GrpcServiceConfig anotherServiceConfig = someServiceConfig(); @@ -240,7 +261,7 @@ public class AlphaIntegrationTest { TxEventServiceBlockingStub anotherBlockingStub = TxEventServiceGrpc.newBlockingStub(clientChannel); anotherBlockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, UUID.randomUUID().toString())); - await().atMost(1, SECONDS).until(() -> eventRepo.count() == 2); + await().atMost(1, SECONDS).until(() -> eventRepo.count() == 3); blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent)); await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty()); @@ -269,6 +290,29 @@ public class AlphaIntegrationTest { assertThat(result.getAborted(), is(true)); } + @Test + public void compensateOnlyCompletedTransactions() { + asyncStub.onConnected(serviceConfig, compensateResponseObserver); + blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent)); + blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent)); + + String anotherLocalTxId1 = UUID.randomUUID().toString(); + blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, globalTxId, anotherLocalTxId1)); + blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent, globalTxId, anotherLocalTxId1)); + blockingStub.onTxEvent(someGrpcEvent(TxCompensatedEvent, globalTxId, anotherLocalTxId1)); + + String anotherLocalTxId2 = UUID.randomUUID().toString(); + blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, globalTxId, anotherLocalTxId2)); + + await().atMost(1, SECONDS).until(() -> eventRepo.count() == 6); + + blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent, globalTxId, anotherLocalTxId2)); + await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty()); + + assertThat(receivedCommands.size(), is(1)); + assertThat(receivedCommands.poll().getGlobalTxId(), is(globalTxId)); + } + private GrpcServiceConfig someServiceConfig() { return GrpcServiceConfig.newBuilder() .setServiceName(uniquify("serviceName")) @@ -294,11 +338,11 @@ public class AlphaIntegrationTest { } private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId) { - return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName()); + return someGrpcEvent(type, globalTxId, localTxId); } - private GrpcTxEvent eventOf(EventType eventType, byte[] payloads, String compensationMethod) { - return eventOf(eventType, UUID.randomUUID().toString(), UUID.randomUUID().toString(), payloads, compensationMethod); + private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId, String localTxId) { + return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName()); } private GrpcTxEvent eventOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads, String compensationMethod) { -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
