This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 70bafcf53fd739467c0be6d7a57319f378c16566 Author: seanyinx <sean....@huawei.com> AuthorDate: Sat Dec 23 10:11:27 2017 +0800 SCB-96 added local and parent tx id to keep track of entire tx graph Signed-off-by: seanyinx <sean....@huawei.com> --- .../saga/omega/context/OmegaContext.java | 37 +++++++++++++--- .../saga/omega/context/OmegaContextTest.java | 49 +++++++++++++++++++--- .../spring/TransactionInterceptionTest.java | 28 +++++++++---- .../transaction/PreTransactionInterceptor.java | 4 +- .../saga/omega/transaction/TransactionAspect.java | 9 +++- .../saga/omega/transaction/TxEvent.java | 22 +++++++--- .../saga/omega/transaction/TxStartedEvent.java | 4 +- .../transaction/PreTransactionInterceptorTest.java | 16 +++---- 8 files changed, 133 insertions(+), 36 deletions(-) diff --git a/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java index 2c33e41..9884349 100644 --- a/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java +++ b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java @@ -18,14 +18,41 @@ package io.servicecomb.saga.omega.context; public class OmegaContext { - private final ThreadLocal<Long> transactionId = new ThreadLocal<>(); + private final ThreadLocal<String> globalTxId = new ThreadLocal<>(); + private final ThreadLocal<String> localTxId = new ThreadLocal<>(); + private final ThreadLocal<String> parentTxId = new ThreadLocal<>(); - public void setTxId(long txId) { - transactionId.set(txId); + public void setGlobalTxId(String txId) { + globalTxId.set(txId); } - public long txId() { - return transactionId.get(); + public String globalTxId() { + return globalTxId.get(); + } + + public void setLocalTxId(String localTxId) { + this.localTxId.set(localTxId); + } + + public String localTxId() { + return localTxId.get(); + } + + public String parentTxId() { + return parentTxId.get(); + } + + public void setParentTxId(String parentTxId) { + this.parentTxId.set(parentTxId); + } + + @Override + public String toString() { + return "OmegaContext{" + + "globalTxId=" + globalTxId.get() + + ", localTxId=" + localTxId.get() + + ", parentTxId=" + parentTxId.get() + + '}'; } } diff --git a/omega/omega-context/src/test/java/io/servicecomb/saga/omega/context/OmegaContextTest.java b/omega/omega-context/src/test/java/io/servicecomb/saga/omega/context/OmegaContextTest.java index 82837d1..e890c18 100644 --- a/omega/omega-context/src/test/java/io/servicecomb/saga/omega/context/OmegaContextTest.java +++ b/omega/omega-context/src/test/java/io/servicecomb/saga/omega/context/OmegaContextTest.java @@ -17,10 +17,11 @@ package io.servicecomb.saga.omega.context; -import static com.seanyinx.github.unit.scaffolding.Randomness.nextId; import static org.hamcrest.core.Is.is; -import static org.junit.Assert.*; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CyclicBarrier; @@ -31,15 +32,51 @@ public class OmegaContextTest { private final OmegaContext omegaContext = new OmegaContext(); @Test - public void eachThreadGetsDifferentId() throws Exception { + public void eachThreadGetsDifferentGlobalTxId() throws Exception { CyclicBarrier barrier = new CyclicBarrier(2); Runnable runnable = exceptionalRunnable(() -> { - long txId = nextId(); - omegaContext.setTxId(txId); + String txId = UUID.randomUUID().toString(); + omegaContext.setGlobalTxId(txId); barrier.await(); - assertThat(omegaContext.txId(), is(txId)); + assertThat(omegaContext.globalTxId(), is(txId)); + }); + + CompletableFuture<Void> future1 = CompletableFuture.runAsync(runnable); + CompletableFuture<Void> future2 = CompletableFuture.runAsync(runnable); + + CompletableFuture.allOf(future1, future2).join(); + } + + @Test + public void eachThreadGetsDifferentLocalTxId() throws Exception { + CyclicBarrier barrier = new CyclicBarrier(2); + + Runnable runnable = exceptionalRunnable(() -> { + String spanId = UUID.randomUUID().toString(); + omegaContext.setLocalTxId(spanId); + barrier.await(); + + assertThat(omegaContext.localTxId(), is(spanId)); + }); + + CompletableFuture<Void> future1 = CompletableFuture.runAsync(runnable); + CompletableFuture<Void> future2 = CompletableFuture.runAsync(runnable); + + CompletableFuture.allOf(future1, future2).join(); + } + + @Test + public void eachThreadGetsDifferentParentTxId() throws Exception { + CyclicBarrier barrier = new CyclicBarrier(2); + + Runnable runnable = exceptionalRunnable(() -> { + String parentId = UUID.randomUUID().toString(); + omegaContext.setParentTxId(parentId); + barrier.await(); + + assertThat(omegaContext.parentTxId(), is(parentId)); }); CompletableFuture<Void> future1 = CompletableFuture.runAsync(runnable); diff --git a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java index e0afebb..c94a4cb 100644 --- a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java +++ b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java @@ -17,13 +17,13 @@ package io.servicecomb.saga.omega.transaction.spring; -import static com.seanyinx.github.unit.scaffolding.Randomness.nextId; import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; import static org.hamcrest.collection.IsIterableContainingInOrder.contains; import static org.junit.Assert.assertThat; import java.util.ArrayList; import java.util.List; +import java.util.UUID; import org.junit.Before; import org.junit.Test; @@ -42,7 +42,9 @@ import io.servicecomb.saga.omega.transaction.spring.TransactionInterceptionTest. @RunWith(SpringRunner.class) @SpringBootTest(classes = {TransactionTestMain.class, MessageConfig.class}) public class TransactionInterceptionTest { - private final long txId = nextId(); + private final String globalTxId = UUID.randomUUID().toString(); + private final String localTxId = UUID.randomUUID().toString(); + private final String parentTxId = UUID.randomUUID().toString(); private final String username = uniquify("username"); private final String email = uniquify("email"); @@ -57,14 +59,16 @@ public class TransactionInterceptionTest { @Before public void setUp() throws Exception { - omegaContext.setTxId(txId); + omegaContext.setGlobalTxId(globalTxId); + omegaContext.setLocalTxId(localTxId); + omegaContext.setParentTxId(parentTxId); } @Test public void sendsUserToRemote_BeforeTransaction() throws Exception { userService.add(new User(username, email)); - assertThat(messages, contains(serialize(txId, "TxStartedEvent", username, email))); + assertThat(messages, contains(serialize(globalTxId, localTxId, parentTxId, "TxStartedEvent", username, email))); } @Configuration @@ -86,14 +90,24 @@ public class TransactionInterceptionTest { return event -> { if (event.payloads()[0] instanceof User) { User user = ((User) event.payloads()[0]); - return serialize(event.txId(), event.type(), user.username(), user.email()); + return serialize(event.globalTxId(), + event.localTxId(), + event.parentTxId(), + event.type(), + user.username(), + user.email()); } throw new IllegalArgumentException("Expected instance of User, but was " + event.getClass()); }; } } - private static byte[] serialize(long txId, String eventType, String username, String email) { - return (txId + ":" + eventType + ":" + username + ":" + email).getBytes(); + private static byte[] serialize(String globalTxId, + String localTxId, + String parentTxId, + String eventType, + String username, + String email) { + return (globalTxId + ":" + localTxId + ":" + parentTxId + ":" + eventType + ":" + username + ":" + email).getBytes(); } } diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptor.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptor.java index 1934280..951d21f 100644 --- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptor.java +++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptor.java @@ -26,7 +26,7 @@ class PreTransactionInterceptor { this.serializer = serializer; } - void intercept(long txId, Object... message) { - sender.send(serializer.serialize(new TxStartedEvent(txId, message))); + void intercept(String globalTxId, String localTxId, String parentTxId, Object... message) { + sender.send(serializer.serialize(new TxStartedEvent(globalTxId, localTxId, parentTxId, message))); } } diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java index 1701b98..cafc61a 100644 --- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java +++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java @@ -43,9 +43,14 @@ public class TransactionAspect { @Around("execution(@javax.transaction.Transactional * *(..))") Object advise(ProceedingJoinPoint joinPoint) throws Throwable { Method method = ((MethodSignature) joinPoint.getSignature()).getMethod(); - LOG.debug("Intercepting transactional method {}", method.toString()); + LOG.debug("Intercepting transactional method {} with context {}", method.toString(), context); + + preTransactionInterceptor.intercept( + context.globalTxId(), + context.localTxId(), + context.parentTxId(), + joinPoint.getArgs()); - preTransactionInterceptor.intercept(context.txId(), joinPoint.getArgs()); return joinPoint.proceed(); } } diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEvent.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEvent.java index 5347c0d..e011aeb 100644 --- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEvent.java +++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEvent.java @@ -18,16 +18,28 @@ package io.servicecomb.saga.omega.transaction; public abstract class TxEvent { - private final long txId; + private final String globalTxId; + private final String localTxId; + private final String parentTxId; private final Object[] payloads; - TxEvent(Object[] payloads, long txId) { + TxEvent(String globalTxId, String localTxId, String parentTxId, Object[] payloads) { + this.localTxId = localTxId; + this.parentTxId = parentTxId; this.payloads = payloads; - this.txId = txId; + this.globalTxId = globalTxId; } - public long txId() { - return txId; + public String globalTxId() { + return globalTxId; + } + + public String localTxId() { + return localTxId; + } + + public String parentTxId() { + return parentTxId; } public Object[] payloads() { diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxStartedEvent.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxStartedEvent.java index 2839afa..525da10 100644 --- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxStartedEvent.java +++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxStartedEvent.java @@ -19,8 +19,8 @@ package io.servicecomb.saga.omega.transaction; class TxStartedEvent extends TxEvent { - TxStartedEvent(long txId, Object[] payloads) { - super(payloads, txId); + TxStartedEvent(String globalTxId, String localTxId, String parentTxId, Object[] payloads) { + super(globalTxId, localTxId, parentTxId, payloads); } @Override diff --git a/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java b/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java index b20cd3c..7d335ad 100644 --- a/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java +++ b/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java @@ -17,25 +17,27 @@ package io.servicecomb.saga.omega.transaction; -import static com.seanyinx.github.unit.scaffolding.Randomness.nextId; import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; import static org.hamcrest.collection.IsIterableContainingInOrder.contains; import static org.junit.Assert.assertThat; import java.util.ArrayList; import java.util.List; +import java.util.UUID; import org.junit.Test; public class PreTransactionInterceptorTest { private final List<byte[]> messages = new ArrayList<>(); - private final long txId = nextId(); + private final String globalTxId = UUID.randomUUID().toString(); + private final String localTxId = UUID.randomUUID().toString(); + private final String parentTxId = UUID.randomUUID().toString(); private final MessageSender sender = messages::add; private final MessageSerializer serializer = event -> { if (event.payloads()[0] instanceof String) { String message = (String) event.payloads()[0]; - return serialize(txId, message); + return serialize(globalTxId, localTxId, parentTxId, message); } throw new IllegalArgumentException("Expected instance of String, but was " + event.getClass()); }; @@ -43,14 +45,14 @@ public class PreTransactionInterceptorTest { private final String message = uniquify("message"); private final PreTransactionInterceptor interceptor = new PreTransactionInterceptor(sender, serializer); - private byte[] serialize(long txId, String message) { - return (txId + ":" + message).getBytes(); + private byte[] serialize(String globalTxId, String localTxId, String parentTxId, String message) { + return (globalTxId + ":" + localTxId + ":" + parentTxId + ":" + message).getBytes(); } @Test public void sendsSerializedMessage() throws Exception { - interceptor.intercept(txId, message); + interceptor.intercept(globalTxId, localTxId, parentTxId, message); - assertThat(messages, contains(serialize(txId, message))); + assertThat(messages, contains(serialize(globalTxId, localTxId, parentTxId, message))); } } -- To stop receiving notification emails like this one, please contact "commits@servicecomb.apache.org" <commits@servicecomb.apache.org>.