This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch SCB-665 in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit a9d52d6e4e824f349ae7b9e7fe46441c49be2c0f Author: Willem Jiang <jiangni...@huawei.com> AuthorDate: Sun Aug 12 11:16:22 2018 +0800 SCB-817 Added TCC events in Omega part (WIP) --- .../saga/omega/context/annotations/TccStart.java | 15 +++++ .../saga/omega/transaction/annotations/TCC.java | 64 ++++++++++++++++++++++ .../saga/omega/transaction/tcc/TccAspect.java | 48 ++++++++++++++++ .../saga/omega/transaction/tcc/TccInterceptor.java | 4 ++ .../tcc/TccStartAnnotationProcessor.java | 47 ++++++++++++++++ .../saga/omega/transaction/tcc/TccStartAspect.java | 60 ++++++++++++++++++++ .../omega/transaction/tcc/events/CancelEvent.java | 20 +++++++ .../omega/transaction/tcc/events/ConfirmEvent.java | 20 +++++++ .../transaction/tcc/events/ParticipateEvent.java | 21 +++++++ .../tcc/events/TransactionEndEvent.java | 4 ++ .../tcc/events/TransationStartEvent.java | 4 ++ 11 files changed, 307 insertions(+) diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/annotations/TccStart.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/annotations/TccStart.java new file mode 100644 index 0000000..c9154f9 --- /dev/null +++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/annotations/TccStart.java @@ -0,0 +1,15 @@ +package org.apache.servicecomb.saga.omega.context.annotations; + +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +/** + * Indicates the annotated method will start a TCC . + */ +@Retention(RUNTIME) +@Target(METHOD) +public @interface TccStart { +} diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/TCC.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/TCC.java new file mode 100644 index 0000000..a095978 --- /dev/null +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/TCC.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.saga.omega.transaction.annotations; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.METHOD}) +/** + * Indicates the annotated method will start a sub-transaction. <br> + * A <code>@TCC</code> method should satisfy below requirements: + * <ol> + * <li>all parameters are serialized</li> + * <li>is idempotent</li> + * <li>the object instance which @TCC method resides in should be stateless</li> + * </ol> + */ +public @interface TCC { + /** + * Confirm method name.<br> + * A confirm method should satisfy below requirements: + * <ol> + * <li>has same parameter list as @TCC method's</li> + * <li>all parameters are serialized</li> + * <li>is idempotent</li> + * <li>be in the same class as @TCC method is in</li> + * </ol> + * + * @return + */ + String confirmMethod() default ""; + + /** + * Cancel method name.<br> + * A cancel method should satisfy below requirements: + * <ol> + * <li>has same parameter list as @TCC method's</li> + * <li>all parameters are serialized</li> + * <li>is idempotent</li> + * <li>be in the same class as @TCC method is in</li> + * </ol> + * + * @return + */ + String cancelMethod() default ""; + +} diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccAspect.java new file mode 100644 index 0000000..173cb78 --- /dev/null +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccAspect.java @@ -0,0 +1,48 @@ +package org.apache.servicecomb.saga.omega.transaction.tcc; + +import java.lang.invoke.MethodHandles; +import java.lang.reflect.Method; + +import org.apache.servicecomb.saga.omega.context.OmegaContext; +import org.apache.servicecomb.saga.omega.transaction.CompensableInterceptor; +import org.apache.servicecomb.saga.omega.transaction.MessageSender; +import org.apache.servicecomb.saga.omega.transaction.RecoveryPolicy; +import org.apache.servicecomb.saga.omega.transaction.RecoveryPolicyFactory; +import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.reflect.MethodSignature; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Aspect +public class TccAspect { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private final OmegaContext context; + + private final CompensableInterceptor interceptor; + + public TccAspect(MessageSender sender, OmegaContext context) { + this.context = context; + this.interceptor = new CompensableInterceptor(context, sender); + } + + @Around("execution(@org.apache.servicecomb.saga.omega.transaction.annotations.Compensable * *(..)) && @annotation(compensable)") + Object advise(ProceedingJoinPoint joinPoint, Compensable compensable) throws Throwable { + Method method = ((MethodSignature) joinPoint.getSignature()).getMethod(); + String localTxId = context.localTxId(); + context.newLocalTxId(); + LOG.debug("Updated context {} for compensable method {} ", context, method.toString()); + + int retries = compensable.retries(); + RecoveryPolicy recoveryPolicy = RecoveryPolicyFactory.getRecoveryPolicy(retries); + try { + return recoveryPolicy.apply(joinPoint, compensable, interceptor, context, localTxId, retries); + } finally { + context.setLocalTxId(localTxId); + LOG.debug("Restored context back to {}", context); + } + } +} diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccInterceptor.java new file mode 100644 index 0000000..5ada269 --- /dev/null +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccInterceptor.java @@ -0,0 +1,4 @@ +package org.apache.servicecomb.saga.omega.transaction.tcc; + +public class TccInterceptor { +} diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java new file mode 100644 index 0000000..3e708cd --- /dev/null +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java @@ -0,0 +1,47 @@ +package org.apache.servicecomb.saga.omega.transaction.tcc; + +import javax.transaction.TransactionalException; + +import org.apache.servicecomb.saga.omega.context.OmegaContext; +import org.apache.servicecomb.saga.omega.transaction.AlphaResponse; +import org.apache.servicecomb.saga.omega.transaction.EventAwareInterceptor; +import org.apache.servicecomb.saga.omega.transaction.MessageSender; +import org.apache.servicecomb.saga.omega.transaction.OmegaException; +import org.apache.servicecomb.saga.omega.transaction.SagaEndedEvent; +import org.apache.servicecomb.saga.omega.transaction.SagaStartedEvent; +import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent; + +public class TccStartAnnotationProcessor implements EventAwareInterceptor { + + private final OmegaContext omegaContext; + private final MessageSender sender; + + TccStartAnnotationProcessor(OmegaContext omegaContext, MessageSender sender) { + this.omegaContext = omegaContext; + this.sender = sender; + } + + @Override + public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod, + int retries, Object... message) { + try { + return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), timeout)); + } catch (OmegaException e) { + throw new TransactionalException(e.getMessage(), e.getCause()); + } + } + + @Override + public void postIntercept(String parentTxId, String compensationMethod) { + AlphaResponse response = sender.send(new SagaEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId())); + if (response.aborted()) { + throw new OmegaException("transaction " + parentTxId + " is aborted"); + } + } + + @Override + public void onError(String parentTxId, String compensationMethod, Throwable throwable) { + String globalTxId = omegaContext.globalTxId(); + sender.send(new TxAbortedEvent(globalTxId, omegaContext.localTxId(), null, compensationMethod, throwable)); + } +} diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspect.java new file mode 100644 index 0000000..5b380db --- /dev/null +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspect.java @@ -0,0 +1,60 @@ +package org.apache.servicecomb.saga.omega.transaction.tcc; + +import java.lang.invoke.MethodHandles; +import java.lang.reflect.Method; + +import org.apache.servicecomb.saga.omega.context.OmegaContext; +import org.apache.servicecomb.saga.omega.context.annotations.SagaStart; +import org.apache.servicecomb.saga.omega.transaction.MessageSender; +import org.apache.servicecomb.saga.omega.transaction.OmegaException; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.reflect.MethodSignature; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Aspect +public class TccStartAspect { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private final TccStartAnnotationProcessor tccStartAnnotationProcessor; + + private final OmegaContext context; + + public TccStartAspect(MessageSender sender, OmegaContext context) { + this.context = context; + this.tccStartAnnotationProcessor = new TccStartAnnotationProcessor(context, sender); + } + + @Around("execution(@org.apache.servicecomb.saga.omega.context.annotations.TccStart * *(..)) && @annotation(sagaStart)") + Object advise(ProceedingJoinPoint joinPoint, SagaStart sagaStart) throws Throwable { + initializeOmegaContext(); + Method method = ((MethodSignature) joinPoint.getSignature()).getMethod(); + + tccStartAnnotationProcessor.preIntercept(context.globalTxId(), method.toString(), sagaStart.timeout(), "", 0); + LOG.debug("Initialized context {} before execution of method {}", context, method.toString()); + + try { + Object result = joinPoint.proceed(); + + tccStartAnnotationProcessor.postIntercept(context.globalTxId(), method.toString()); + LOG.debug("Transaction with context {} has finished.", context); + + return result; + } catch (Throwable throwable) { + // We don't need to handle the OmegaException here + if (!(throwable instanceof OmegaException)) { + tccStartAnnotationProcessor.onError(context.globalTxId(), method.toString(), throwable); + LOG.error("Transaction {} failed.", context.globalTxId()); + } + throw throwable; + } finally { + context.clear(); + } + } + + private void initializeOmegaContext() { + context.setLocalTxId(context.newGlobalTxId()); + } +} diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CancelEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CancelEvent.java new file mode 100644 index 0000000..7659812 --- /dev/null +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CancelEvent.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.saga.omega.transaction.tcc.events; + +public class CancelEvent { +} diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ConfirmEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ConfirmEvent.java new file mode 100644 index 0000000..f01675b --- /dev/null +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ConfirmEvent.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.saga.omega.transaction.tcc.events; + +public class ConfirmEvent { +} diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ParticipateEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ParticipateEvent.java new file mode 100644 index 0000000..83e910b --- /dev/null +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ParticipateEvent.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.saga.omega.transaction.tcc.events; + + +public class ParticipateEvent { +} diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TransactionEndEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TransactionEndEvent.java new file mode 100644 index 0000000..9613602 --- /dev/null +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TransactionEndEvent.java @@ -0,0 +1,4 @@ +package org.apache.servicecomb.saga.omega.transaction.tcc.events; + +public class TransactionEndEvent { +} diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TransationStartEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TransationStartEvent.java new file mode 100644 index 0000000..54858c6 --- /dev/null +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TransationStartEvent.java @@ -0,0 +1,4 @@ +package org.apache.servicecomb.saga.omega.transaction.tcc.events; + +public class TransationStartEvent { +}