This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit e717f952fda1f52f64e9a2159a2f9faad152ae1a Author: seanyinx <[email protected]> AuthorDate: Thu Dec 28 16:14:12 2017 +0800 SCB-98 supported compensation context from remote instead of local caching Signed-off-by: seanyinx <[email protected]> --- .../saga/omega/context/OmegaContext.java | 63 ++++++---------------- .../spring/CompensableAnnotationProcessor.java | 10 +++- .../spring/CompensableMethodCheckingCallback.java | 10 ++-- .../spring/TransactionAspectConfig.java | 4 +- .../spring/TransactionInterceptionTest.java | 11 ++-- .../saga/omega/transaction/MessageHandler.java | 2 +- .../saga/omega/transaction/TransactionAspect.java | 6 --- 7 files changed, 42 insertions(+), 64 deletions(-) diff --git a/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java index 1fe8661..ccc3738 100644 --- a/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java +++ b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java @@ -20,8 +20,8 @@ package io.servicecomb.saga.omega.context; import java.lang.invoke.MethodHandles; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,8 +34,8 @@ public class OmegaContext { private final ThreadLocal<String> globalTxId = new ThreadLocal<>(); private final ThreadLocal<String> localTxId = new ThreadLocal<>(); private final ThreadLocal<String> parentTxId = new ThreadLocal<>(); - private final Map<String, Map<String, CompensationContext>> compensationContexts = new ConcurrentHashMap<>(); private final IdGenerator<String> idGenerator; + private final Map<String, CompensationContext> compensationContexts = new HashMap<>(); public OmegaContext(IdGenerator<String> idGenerator) { 
this.idGenerator = idGenerator; @@ -77,50 +77,23 @@ public class OmegaContext { this.parentTxId.set(parentTxId); } - // TODO: 2017/12/23 remove this context entry by the end of its corresponding global tx - public void addContext(String globalTxId, String localTxId, Object target, String compensationMethod, Object... args) { - compensationContexts.computeIfAbsent(globalTxId, k -> new ConcurrentHashMap<>()) - .put(localTxId, new CompensationContext(target, compensationMethod, args)); + public void addCompensationContext(Method compensationMethod, Object target) { + compensationMethod.setAccessible(true); + compensationContexts.put(compensationMethod.toString(), new CompensationContext(target, compensationMethod)); } - public boolean containsContext(String globalTxId) { - return compensationContexts.containsKey(globalTxId); - } - - public void compensate(String globalTxId) { - Map<String, CompensationContext> contexts = compensationContexts.remove(globalTxId); - - for (CompensationContext compensationContext : contexts.values()) { - try { - invokeMethod(compensationContext); - } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { - LOG.error( - "Pre-checking for compensate method " + compensationContext.compensationMethod - + " was somehow skipped, did you forget to configure compensable method checking on service startup?", - e); - } - } - } - - private void invokeMethod(CompensationContext compensationContext) - throws NoSuchMethodException, IllegalAccessException, InvocationTargetException { + public void compensate(String globalTxId, String localTxId, String compensationMethod, Object[] payloads) { + CompensationContext compensationContext = compensationContexts.get(compensationMethod); - Method method = compensationContext.target - .getClass() - .getDeclaredMethod(compensationContext.compensationMethod, argClasses(compensationContext)); - method.setAccessible(true); - - method.invoke(compensationContext.target, 
compensationContext.args); - } - - private Class<?>[] argClasses(CompensationContext compensationContext) { - Class<?>[] classes = new Class<?>[compensationContext.args.length]; - - for (int i = 0; i < compensationContext.args.length; i++) { - classes[i] = compensationContext.args[i].getClass(); + try { + compensationContext.compensationMethod.invoke(compensationContext.target, payloads); + LOG.info("Compensated transaction with global tx id [{}], local tx id [{}]", globalTxId, localTxId); + } catch (IllegalAccessException | InvocationTargetException e) { + LOG.error( + "Pre-checking for compensate method " + compensationContext.compensationMethod.toString() + + " was somehow skipped, did you forget to configure compensable method checking on service startup?", + e); } - - return classes; } @Override @@ -134,13 +107,11 @@ public class OmegaContext { private static final class CompensationContext { private final Object target; - private final String compensationMethod; - private final Object[] args; + private final Method compensationMethod; - private CompensationContext(Object target, String compensationMethod, Object... 
args) { + private CompensationContext(Object target, Method compensationMethod) { this.target = target; this.compensationMethod = compensationMethod; - this.args = args; } } } diff --git a/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java b/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java index 81318ad..e97d4d3 100644 --- a/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java +++ b/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java @@ -21,8 +21,16 @@ import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.util.ReflectionUtils; +import io.servicecomb.saga.omega.context.OmegaContext; + class CompensableAnnotationProcessor implements BeanPostProcessor { + private final OmegaContext omegaContext; + + CompensableAnnotationProcessor(OmegaContext omegaContext) { + this.omegaContext = omegaContext; + } + @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { checkMethod(bean); @@ -35,6 +43,6 @@ class CompensableAnnotationProcessor implements BeanPostProcessor { } private void checkMethod(Object bean) { - ReflectionUtils.doWithMethods(bean.getClass(), new CompensableMethodCheckingCallback(bean)); + ReflectionUtils.doWithMethods(bean.getClass(), new CompensableMethodCheckingCallback(bean, omegaContext)); } } diff --git a/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java b/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java index 23352f4..64779d7 100644 --- 
a/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java +++ b/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java @@ -24,16 +24,19 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.ReflectionUtils.MethodCallback; +import io.servicecomb.saga.omega.context.OmegaContext; import io.servicecomb.saga.omega.transaction.OmegaException; import io.servicecomb.saga.omega.transaction.annotations.Compensable; class CompensableMethodCheckingCallback implements MethodCallback { - private static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final Object bean; + private final OmegaContext omegaContext; - CompensableMethodCheckingCallback(Object bean) { + CompensableMethodCheckingCallback(Object bean, OmegaContext omegaContext) { this.bean = bean; + this.omegaContext = omegaContext; } @Override @@ -45,7 +48,8 @@ class CompensableMethodCheckingCallback implements MethodCallback { String compensationMethod = method.getAnnotation(Compensable.class).compensationMethod(); try { - bean.getClass().getDeclaredMethod(compensationMethod, method.getParameterTypes()); + Method signature = bean.getClass().getDeclaredMethod(compensationMethod, method.getParameterTypes()); + omegaContext.addCompensationContext(signature, bean); LOG.debug("Found compensation method [{}] in {}", compensationMethod, bean.getClass().getCanonicalName()); } catch (NoSuchMethodException e) { throw new OmegaException( diff --git a/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java b/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java index 9fe5956..5982109 100644 --- 
a/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java +++ b/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java @@ -35,7 +35,7 @@ public class TransactionAspectConfig { } @Bean - CompensableAnnotationProcessor compensableAnnotationProcessor() { - return new CompensableAnnotationProcessor(); + CompensableAnnotationProcessor compensableAnnotationProcessor(OmegaContext omegaContext) { + return new CompensableAnnotationProcessor(omegaContext); } } diff --git a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java index 83e57b0..8b873b8 100644 --- a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java +++ b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java @@ -106,15 +106,16 @@ public class TransactionInterceptionTest { User user = userService.add(new User(username, email)); // another sub transaction to the same service within the same global transaction - omegaContext.newLocalTxId(); + String localTxId = omegaContext.newLocalTxId(); User anotherUser = userService.add(new User(uniquify("Jack"), uniquify("[email protected]"))); - messageHandler.onReceive("to be compensated".getBytes()); + String compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString(); + + messageHandler.onReceive(globalTxId, this.localTxId, compensationMethod, user); + messageHandler.onReceive(globalTxId, localTxId, compensationMethod, anotherUser); assertThat(userRepository.findOne(user.id()), is(nullValue())); assertThat(userRepository.findOne(anotherUser.id()), is(nullValue())); - - assertThat(omegaContext.containsContext(globalTxId), 
is(false)); } private List<String> toString(List<byte[]> messages) { @@ -158,7 +159,7 @@ public class TransactionInterceptionTest { @Bean MessageHandler handler(OmegaContext omegaContext) { - return bytes -> omegaContext.compensate(globalTxId); + return omegaContext::compensate; } } diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageHandler.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageHandler.java index e954381..caf2da8 100644 --- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageHandler.java +++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageHandler.java @@ -18,5 +18,5 @@ package io.servicecomb.saga.omega.transaction; public interface MessageHandler { - void onReceive(byte[] message); + void onReceive(String globalTxId, String localTxId, String compensationMethod, Object... payloads); } diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java index ecce0ee..e4de9c9 100644 --- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java +++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java @@ -48,12 +48,6 @@ public class TransactionAspect { Method method = ((MethodSignature) joinPoint.getSignature()).getMethod(); LOG.debug("Intercepting compensable method {} with context {}", method.toString(), context); - context.addContext(context.globalTxId(), - context.localTxId(), - joinPoint.getTarget(), - compensable.compensationMethod(), - joinPoint.getArgs()); - preIntercept(joinPoint); Object result = joinPoint.proceed(); postIntercept(); -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
