This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
The following commit(s) were added to refs/heads/master by this push: new 1f79ce0 SCB-140 compensated only distinct events SCB-141 supported multiple sub tx within the same global tx on a single service 1f79ce0 is described below commit 1f79ce08db38f496ee06da5e17f1e0504c4c0ee3 Author: seanyinx <sean....@huawei.com> AuthorDate: Thu Dec 28 10:28:16 2017 +0800 SCB-140 compensated only distinct events SCB-141 supported multiple sub tx within the same global tx on a single service Signed-off-by: seanyinx <sean....@huawei.com> --- .../saga/alpha/core/TxConsistentService.java | 3 +- .../saga/alpha/core/TxEventRepository.java | 2 +- .../saga/alpha/core/TxConsistentServiceTest.java | 6 ++-- .../saga/alpha/server/SpringTxEventRepository.java | 2 +- .../saga/alpha/server/TxEventEnvelope.java | 6 ++++ .../alpha/server/TxEventEnvelopeRepository.java | 5 ++++ .../saga/alpha/server/AlphaIntegrationTest.java | 26 ++++++++++++++--- omega/omega-context/pom.xml | 4 +++ .../saga/omega/context/OmegaContext.java | 34 +++++++++++++--------- .../spring/TransactionInterceptionTest.java | 8 +++-- .../saga/omega/transaction/TransactionAspect.java | 1 + 11 files changed, 70 insertions(+), 27 deletions(-) diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java index af02d74..22605f8 100644 --- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java +++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java @@ -18,7 +18,6 @@ package io.servicecomb.saga.alpha.core; import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent; -import static io.servicecomb.saga.alpha.core.EventType.TxEndedEvent; import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent; import java.util.HashMap; @@ -49,7 +48,7 @@ public class TxConsistentService { // TODO: 2017/12/27 we must define a way to find which service to compensate, to avoid sending to all private void compensate(TxEvent event) { - List<TxEvent> events = eventRepository.findCompletedEvents(event.globalTxId(), TxEndedEvent.name()); + List<TxEvent> events = eventRepository.findStartedTransactions(event.globalTxId(), TxStartedEvent.name()); events.forEach(evt -> omegaCallback.compensate(evt.globalTxId(), evt.payloads())); } } diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java index cb44f77..9eed4ea 100644 --- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java +++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java @@ -22,5 +22,5 @@ import java.util.List; public interface TxEventRepository { void save(TxEvent event); - List<TxEvent> findCompletedEvents(String globalTxId, String type); + List<TxEvent> findStartedTransactions(String globalTxId, String type); } diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java index 64ed62c..d7e66c3 100644 --- a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java +++ b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java @@ -50,7 +50,7 @@ public class TxConsistentServiceTest { } @Override - public List<TxEvent> findCompletedEvents(String globalTxId, String type) { + public List<TxEvent> findStartedTransactions(String globalTxId, String type) { return events.stream() .filter(event -> globalTxId.equals(event.globalTxId()) && type.equals(event.type())) .collect(Collectors.toList()); @@ -85,9 +85,9 @@ public class TxConsistentServiceTest { @Test public void compensateGlobalTx_OnAnyLocalTxFailure() throws Exception { events.add(eventOf(TxStartedEvent, "service a".getBytes())); - events.add(eventOf(TxEndedEvent, "service a".getBytes())); + events.add(eventOf(TxEndedEvent, new byte[0])); events.add(eventOf(TxStartedEvent, "service b".getBytes())); - events.add(eventOf(TxEndedEvent, "service b".getBytes())); + events.add(eventOf(TxEndedEvent, new byte[0])); TxEvent abortEvent = newEvent(TxAbortedEvent); diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SpringTxEventRepository.java index 7b59d29..e8c8058 100644 --- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SpringTxEventRepository.java +++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SpringTxEventRepository.java @@ -36,7 +36,7 @@ class SpringTxEventRepository implements TxEventRepository { } @Override - public List<TxEvent> findCompletedEvents(String globalTxId, String type) { + public List<TxEvent> findStartedTransactions(String globalTxId, String type) { return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type) .stream() .map(TxEventEnvelope::event) diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java index 152edfb..b027754 100644 --- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java +++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java @@ -17,6 +17,8 @@ package io.servicecomb.saga.alpha.server; +import java.util.Date; + import javax.persistence.Embedded; import javax.persistence.Entity; import javax.persistence.GeneratedValue; @@ -40,6 +42,10 @@ class TxEventEnvelope { this.event = event; } + public TxEventEnvelope(String globalTxId, String localTxId, String parentTxId, String type, byte[] payloads) { + this.event = new TxEvent(new Date(), globalTxId, localTxId, parentTxId, type, payloads); + } + public long creationTime() { return event.creationTime().getTime(); } diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java index cd3cbc7..04ff836 100644 --- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java +++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java @@ -19,10 +19,15 @@ package io.servicecomb.saga.alpha.server; import java.util.List; +import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.CrudRepository; interface TxEventEnvelopeRepository extends CrudRepository<TxEventEnvelope, Long> { TxEventEnvelope findByEventGlobalTxId(String globalTxId); + @Query("SELECT DISTINCT new io.servicecomb.saga.alpha.server.TxEventEnvelope(" + + "t.event.globalTxId, t.event.localTxId, t.event.parentTxId, t.event.type, t.event.payloads" + + ") FROM TxEventEnvelope t " + + "WHERE t.event.globalTxId = ?1 AND t.event.type = ?2") List<TxEventEnvelope> findByEventGlobalTxIdAndEventType(String globalTxId, String type); } diff --git a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java index e9c9a98..d932072 100644 --- a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java +++ b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java @@ -109,9 +109,23 @@ public class AlphaIntegrationTest { @Test public void compensateOnFailure() throws Exception { eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service a".getBytes())); - eventRepo.save(eventEnvelopeOf(TxEndedEvent, "service a".getBytes())); + eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0])); eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service b".getBytes())); - eventRepo.save(eventEnvelopeOf(TxEndedEvent, "service b".getBytes())); + eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0])); + + endpoint.handle(someEvent(TxAbortedEvent)); + + await().atMost(1, SECONDS).until(() -> callbackArgs.getOrDefault(globalTxId, emptyList()).size() > 1); + assertThat(stringOf(callbackArgs.get(globalTxId)), containsInAnyOrder("service a", "service b")); + } + + @Test + public void doNotCompensateDuplicateTxOnFailure() throws Exception { + eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes())); + eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes())); + eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0])); + eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service b".getBytes())); + eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0])); endpoint.handle(someEvent(TxAbortedEvent)); @@ -136,10 +150,14 @@ public class AlphaIntegrationTest { } private TxEventEnvelope eventEnvelopeOf(EventType eventType, byte[] payloads) { + return eventEnvelopeOf(eventType, UUID.randomUUID().toString(), UUID.randomUUID().toString(), payloads); + } + + private TxEventEnvelope eventEnvelopeOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads) { return new TxEventEnvelope(new TxEvent(new Date(), globalTxId, - UUID.randomUUID().toString(), - UUID.randomUUID().toString(), + localTxId, + parentTxId, eventType.name(), payloads)); } diff --git a/omega/omega-context/pom.xml b/omega/omega-context/pom.xml index 5bba67c..f6cd0c6 100644 --- a/omega/omega-context/pom.xml +++ b/omega/omega-context/pom.xml @@ -29,6 +29,10 @@ <artifactId>omega-context</artifactId> <dependencies> <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> </dependency> diff --git a/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java index d8cca65..1fe8661 100644 --- a/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java +++ b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java @@ -17,19 +17,24 @@ package io.servicecomb.saga.omega.context; +import java.lang.invoke.MethodHandles; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class OmegaContext { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); public static final String GLOBAL_TX_ID_KEY = "X-Pack-Global-Transaction-Id"; public static final String LOCAL_TX_ID_KEY = "X-Pack-Local-Transaction-Id"; private final ThreadLocal<String> globalTxId = new ThreadLocal<>(); private final ThreadLocal<String> localTxId = new ThreadLocal<>(); private final ThreadLocal<String> parentTxId = new ThreadLocal<>(); - private final Map<String, CompensationContext> compensationContexts = new ConcurrentHashMap<>(); + private final Map<String, Map<String, CompensationContext>> compensationContexts = new ConcurrentHashMap<>(); private final IdGenerator<String> idGenerator; public OmegaContext(IdGenerator<String> idGenerator) { @@ -73,8 +78,9 @@ public class OmegaContext { } // TODO: 2017/12/23 remove this context entry by the end of its corresponding global tx - public void addContext(String id, Object target, String compensationMethod, Object... args) { - compensationContexts.put(id, new CompensationContext(target, compensationMethod, args)); + public void addContext(String globalTxId, String localTxId, Object target, String compensationMethod, Object... args) { + compensationContexts.computeIfAbsent(globalTxId, k -> new ConcurrentHashMap<>()) + .put(localTxId, new CompensationContext(target, compensationMethod, args)); } public boolean containsContext(String globalTxId) { @@ -82,17 +88,17 @@ public class OmegaContext { } public void compensate(String globalTxId) { - CompensationContext compensationContext = compensationContexts.get(globalTxId); - - try { - invokeMethod(compensationContext); - } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { - throw new IllegalStateException( - "Pre-checking for compensate method " + compensationContext.compensationMethod - + " was somehow skipped, did you forget to configure compensable method checking on service startup?", - e); - } finally { - compensationContexts.remove(globalTxId); + Map<String, CompensationContext> contexts = compensationContexts.remove(globalTxId); + + for (CompensationContext compensationContext : contexts.values()) { + try { + invokeMethod(compensationContext); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + LOG.error( + "Pre-checking for compensate method " + compensationContext.compensationMethod + + " was somehow skipped, did you forget to configure compensable method checking on service startup?", + e); + } } } diff --git a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java index 67fdff9..83e57b0 100644 --- a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java +++ b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java @@ -105,10 +105,14 @@ public class TransactionInterceptionTest { public void compensateOnTransactionException() throws Exception { User user = userService.add(new User(username, email)); + // another sub transaction to the same service within the same global transaction + omegaContext.newLocalTxId(); + User anotherUser = userService.add(new User(uniquify("Jack"), uniquify("j...@gmail.com"))); + messageHandler.onReceive("to be compensated".getBytes()); - User actual = userRepository.findOne(user.id()); - assertThat(actual, is(nullValue())); + assertThat(userRepository.findOne(user.id()), is(nullValue())); + assertThat(userRepository.findOne(anotherUser.id()), is(nullValue())); assertThat(omegaContext.containsContext(globalTxId), is(false)); } diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java index f78ed31..ecce0ee 100644 --- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java +++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java @@ -49,6 +49,7 @@ public class TransactionAspect { LOG.debug("Intercepting compensable method {} with context {}", method.toString(), context); context.addContext(context.globalTxId(), + context.localTxId(), joinPoint.getTarget(), compensable.compensationMethod(), joinPoint.getArgs()); -- To stop receiving notification emails like this one, please contact ['"commits@servicecomb.apache.org" <commits@servicecomb.apache.org>'].