This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
The following commit(s) were added to refs/heads/master by this push:
new 1f79ce0 SCB-140 compensated only distinct events SCB-141 supported
multiple sub tx within the same global tx on a single service
1f79ce0 is described below
commit 1f79ce08db38f496ee06da5e17f1e0504c4c0ee3
Author: seanyinx <[email protected]>
AuthorDate: Thu Dec 28 10:28:16 2017 +0800
SCB-140 compensated only distinct events
SCB-141 supported multiple sub tx within the same global tx on a single
service
Signed-off-by: seanyinx <[email protected]>
---
.../saga/alpha/core/TxConsistentService.java | 3 +-
.../saga/alpha/core/TxEventRepository.java | 2 +-
.../saga/alpha/core/TxConsistentServiceTest.java | 6 ++--
.../saga/alpha/server/SpringTxEventRepository.java | 2 +-
.../saga/alpha/server/TxEventEnvelope.java | 6 ++++
.../alpha/server/TxEventEnvelopeRepository.java | 5 ++++
.../saga/alpha/server/AlphaIntegrationTest.java | 26 ++++++++++++++---
omega/omega-context/pom.xml | 4 +++
.../saga/omega/context/OmegaContext.java | 34 +++++++++++++---------
.../spring/TransactionInterceptionTest.java | 8 +++--
.../saga/omega/transaction/TransactionAspect.java | 1 +
11 files changed, 70 insertions(+), 27 deletions(-)
diff --git
a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
index af02d74..22605f8 100644
---
a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
+++
b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -18,7 +18,6 @@
package io.servicecomb.saga.alpha.core;
import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
-import static io.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
import java.util.HashMap;
@@ -49,7 +48,7 @@ public class TxConsistentService {
// TODO: 2017/12/27 we must define a way to find which service to
compensate, to avoid sending to all
private void compensate(TxEvent event) {
- List<TxEvent> events =
eventRepository.findCompletedEvents(event.globalTxId(), TxEndedEvent.name());
+ List<TxEvent> events =
eventRepository.findStartedTransactions(event.globalTxId(),
TxStartedEvent.name());
events.forEach(evt -> omegaCallback.compensate(evt.globalTxId(),
evt.payloads()));
}
}
diff --git
a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
index cb44f77..9eed4ea 100644
---
a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
+++
b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -22,5 +22,5 @@ import java.util.List;
public interface TxEventRepository {
void save(TxEvent event);
- List<TxEvent> findCompletedEvents(String globalTxId, String type);
+ List<TxEvent> findStartedTransactions(String globalTxId, String type);
}
diff --git
a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 64ed62c..d7e66c3 100644
---
a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++
b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -50,7 +50,7 @@ public class TxConsistentServiceTest {
}
@Override
- public List<TxEvent> findCompletedEvents(String globalTxId, String type) {
+ public List<TxEvent> findStartedTransactions(String globalTxId, String
type) {
return events.stream()
.filter(event -> globalTxId.equals(event.globalTxId()) &&
type.equals(event.type()))
.collect(Collectors.toList());
@@ -85,9 +85,9 @@ public class TxConsistentServiceTest {
@Test
public void compensateGlobalTx_OnAnyLocalTxFailure() throws Exception {
events.add(eventOf(TxStartedEvent, "service a".getBytes()));
- events.add(eventOf(TxEndedEvent, "service a".getBytes()));
+ events.add(eventOf(TxEndedEvent, new byte[0]));
events.add(eventOf(TxStartedEvent, "service b".getBytes()));
- events.add(eventOf(TxEndedEvent, "service b".getBytes()));
+ events.add(eventOf(TxEndedEvent, new byte[0]));
TxEvent abortEvent = newEvent(TxAbortedEvent);
diff --git
a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SpringTxEventRepository.java
b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index 7b59d29..e8c8058 100644
---
a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++
b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -36,7 +36,7 @@ class SpringTxEventRepository implements TxEventRepository {
}
@Override
- public List<TxEvent> findCompletedEvents(String globalTxId, String type) {
+ public List<TxEvent> findStartedTransactions(String globalTxId, String type)
{
return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type)
.stream()
.map(TxEventEnvelope::event)
diff --git
a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
index 152edfb..b027754 100644
---
a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
+++
b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
@@ -17,6 +17,8 @@
package io.servicecomb.saga.alpha.server;
+import java.util.Date;
+
import javax.persistence.Embedded;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
@@ -40,6 +42,10 @@ class TxEventEnvelope {
this.event = event;
}
+ public TxEventEnvelope(String globalTxId, String localTxId, String
parentTxId, String type, byte[] payloads) {
+ this.event = new TxEvent(new Date(), globalTxId, localTxId, parentTxId,
type, payloads);
+ }
+
public long creationTime() {
return event.creationTime().getTime();
}
diff --git
a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index cd3cbc7..04ff836 100644
---
a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++
b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -19,10 +19,15 @@ package io.servicecomb.saga.alpha.server;
import java.util.List;
+import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
interface TxEventEnvelopeRepository extends CrudRepository<TxEventEnvelope,
Long> {
TxEventEnvelope findByEventGlobalTxId(String globalTxId);
+ @Query("SELECT DISTINCT new
io.servicecomb.saga.alpha.server.TxEventEnvelope("
+ + "t.event.globalTxId, t.event.localTxId, t.event.parentTxId,
t.event.type, t.event.payloads"
+ + ") FROM TxEventEnvelope t "
+ + "WHERE t.event.globalTxId = ?1 AND t.event.type = ?2")
List<TxEventEnvelope> findByEventGlobalTxIdAndEventType(String globalTxId,
String type);
}
diff --git
a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index e9c9a98..d932072 100644
---
a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++
b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -109,9 +109,23 @@ public class AlphaIntegrationTest {
@Test
public void compensateOnFailure() throws Exception {
eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service a".getBytes()));
- eventRepo.save(eventEnvelopeOf(TxEndedEvent, "service a".getBytes()));
+ eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0]));
eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service b".getBytes()));
- eventRepo.save(eventEnvelopeOf(TxEndedEvent, "service b".getBytes()));
+ eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0]));
+
+ endpoint.handle(someEvent(TxAbortedEvent));
+
+ await().atMost(1, SECONDS).until(() ->
callbackArgs.getOrDefault(globalTxId, emptyList()).size() > 1);
+ assertThat(stringOf(callbackArgs.get(globalTxId)),
containsInAnyOrder("service a", "service b"));
+ }
+
+ @Test
+ public void doNotCompensateDuplicateTxOnFailure() throws Exception {
+ eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId,
"service a".getBytes()));
+ eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId,
"service a".getBytes()));
+ eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0]));
+ eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service b".getBytes()));
+ eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0]));
endpoint.handle(someEvent(TxAbortedEvent));
@@ -136,10 +150,14 @@ public class AlphaIntegrationTest {
}
private TxEventEnvelope eventEnvelopeOf(EventType eventType, byte[]
payloads) {
+ return eventEnvelopeOf(eventType, UUID.randomUUID().toString(),
UUID.randomUUID().toString(), payloads);
+ }
+
+ private TxEventEnvelope eventEnvelopeOf(EventType eventType, String
localTxId, String parentTxId, byte[] payloads) {
return new TxEventEnvelope(new TxEvent(new Date(),
globalTxId,
- UUID.randomUUID().toString(),
- UUID.randomUUID().toString(),
+ localTxId,
+ parentTxId,
eventType.name(),
payloads));
}
diff --git a/omega/omega-context/pom.xml b/omega/omega-context/pom.xml
index 5bba67c..f6cd0c6 100644
--- a/omega/omega-context/pom.xml
+++ b/omega/omega-context/pom.xml
@@ -29,6 +29,10 @@
<artifactId>omega-context</artifactId>
<dependencies>
<dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
diff --git
a/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java
b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java
index d8cca65..1fe8661 100644
---
a/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java
+++
b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java
@@ -17,19 +17,24 @@
package io.servicecomb.saga.omega.context;
+import java.lang.invoke.MethodHandles;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class OmegaContext {
+ private static final Logger LOG =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String GLOBAL_TX_ID_KEY = "X-Pack-Global-Transaction-Id";
public static final String LOCAL_TX_ID_KEY = "X-Pack-Local-Transaction-Id";
private final ThreadLocal<String> globalTxId = new ThreadLocal<>();
private final ThreadLocal<String> localTxId = new ThreadLocal<>();
private final ThreadLocal<String> parentTxId = new ThreadLocal<>();
- private final Map<String, CompensationContext> compensationContexts = new
ConcurrentHashMap<>();
+ private final Map<String, Map<String, CompensationContext>>
compensationContexts = new ConcurrentHashMap<>();
private final IdGenerator<String> idGenerator;
public OmegaContext(IdGenerator<String> idGenerator) {
@@ -73,8 +78,9 @@ public class OmegaContext {
}
// TODO: 2017/12/23 remove this context entry by the end of its
corresponding global tx
- public void addContext(String id, Object target, String compensationMethod,
Object... args) {
- compensationContexts.put(id, new CompensationContext(target,
compensationMethod, args));
+ public void addContext(String globalTxId, String localTxId, Object target,
String compensationMethod, Object... args) {
+ compensationContexts.computeIfAbsent(globalTxId, k -> new
ConcurrentHashMap<>())
+ .put(localTxId, new CompensationContext(target, compensationMethod,
args));
}
public boolean containsContext(String globalTxId) {
@@ -82,17 +88,17 @@ public class OmegaContext {
}
public void compensate(String globalTxId) {
- CompensationContext compensationContext =
compensationContexts.get(globalTxId);
-
- try {
- invokeMethod(compensationContext);
- } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException e) {
- throw new IllegalStateException(
- "Pre-checking for compensate method " +
compensationContext.compensationMethod
- + " was somehow skipped, did you forget to configure compensable
method checking on service startup?",
- e);
- } finally {
- compensationContexts.remove(globalTxId);
+ Map<String, CompensationContext> contexts =
compensationContexts.remove(globalTxId);
+
+ for (CompensationContext compensationContext : contexts.values()) {
+ try {
+ invokeMethod(compensationContext);
+ } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException e) {
+ LOG.error(
+ "Pre-checking for compensate method " +
compensationContext.compensationMethod
+ + " was somehow skipped, did you forget to configure
compensable method checking on service startup?",
+ e);
+ }
}
}
diff --git
a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 67fdff9..83e57b0 100644
---
a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++
b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -105,10 +105,14 @@ public class TransactionInterceptionTest {
public void compensateOnTransactionException() throws Exception {
User user = userService.add(new User(username, email));
+ // another sub transaction to the same service within the same global
transaction
+ omegaContext.newLocalTxId();
+ User anotherUser = userService.add(new User(uniquify("Jack"),
uniquify("[email protected]")));
+
messageHandler.onReceive("to be compensated".getBytes());
- User actual = userRepository.findOne(user.id());
- assertThat(actual, is(nullValue()));
+ assertThat(userRepository.findOne(user.id()), is(nullValue()));
+ assertThat(userRepository.findOne(anotherUser.id()), is(nullValue()));
assertThat(omegaContext.containsContext(globalTxId), is(false));
}
diff --git
a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java
b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java
index f78ed31..ecce0ee 100644
---
a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java
+++
b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -49,6 +49,7 @@ public class TransactionAspect {
LOG.debug("Intercepting compensable method {} with context {}",
method.toString(), context);
context.addContext(context.globalTxId(),
+ context.localTxId(),
joinPoint.getTarget(),
compensable.compensationMethod(),
joinPoint.getArgs());
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].