This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 8814789a3070b35a17cbbc1e7638c9e7502beb83 Author: Willem Jiang <jiangni...@huawei.com> AuthorDate: Tue Aug 21 10:25:13 2018 +0800 SCB-818 Omega supports of TCC (WIP) --- .../omega/transaction/tcc/TccEventService.java | 25 ++++++++++++++++++++- .../transaction/tcc/TccParticipatorAspect.java | 14 ++++++++++-- .../tcc/TccStartAnnotationProcessor.java | 26 ++++++++++------------ .../saga/omega/transaction/tcc/TccStartAspect.java | 4 ++-- .../transaction/tcc/events/ParticipatedEvent.java | 10 ++++++--- .../transaction/tcc/events/TccEndedEvent.java | 4 ++-- .../transaction/tcc/events/TccStartedEvent.java | 4 ++-- 7 files changed, 61 insertions(+), 26 deletions(-) diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccEventService.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccEventService.java index 1649620..ae011bc 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccEventService.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccEventService.java @@ -1,4 +1,27 @@ package org.apache.servicecomb.saga.omega.transaction.tcc; -public class TccService { +import org.apache.servicecomb.saga.omega.transaction.AlphaResponse; +import org.apache.servicecomb.saga.omega.transaction.TxEvent; +import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent; +import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent; +import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent; + +public interface TccEventService { + + void onConnected(); + + void onDisconnected(); + + void close(); + + String target(); + + AlphaResponse participate(ParticipatedEvent participateEvent); + + AlphaResponse TccTransactionStart(TccStartedEvent tccStartEvent); + + AlphaResponse TccTransactionStop(TccEndedEvent tccEndEvent); + + AlphaResponse send(TxEvent event); + } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspect.java index 02adac2..e64bc2a 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspect.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspect.java @@ -20,6 +20,7 @@ package org.apache.servicecomb.saga.omega.transaction.tcc; import java.lang.invoke.MethodHandles; import java.lang.reflect.Method; +import org.apache.servicecomb.saga.common.TransactionStatus; import org.apache.servicecomb.saga.omega.context.OmegaContext; import org.apache.servicecomb.saga.omega.transaction.MessageSender; import org.apache.servicecomb.saga.omega.transaction.OmegaException; @@ -27,6 +28,7 @@ import org.apache.servicecomb.saga.omega.transaction.RecoveryPolicy; import org.apache.servicecomb.saga.omega.transaction.RecoveryPolicyFactory; import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable; import org.apache.servicecomb.saga.omega.transaction.annotations.Participate; +import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; @@ -39,9 +41,11 @@ public class TccParticipatorAspect { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final OmegaContext context; + private final TccEventService tccEventService; - public TccParticipatorAspect(MessageSender sender, OmegaContext context) { + public TccParticipatorAspect(TccEventService tccEventService, OmegaContext context) { this.context = context; + this.tccEventService = tccEventService; } @Around("execution(@org.apache.servicecomb.saga.omega.transaction.annotations.Participate * *(..)) && @annotation(participate)") @@ -49,6 +53,7 @@ public class TccParticipatorAspect { Method method = ((MethodSignature) joinPoint.getSignature()).getMethod(); String localTxId = context.localTxId(); String cancelMethod = participate.cancelMethod(); + String confirmMethod = participate.confirmMethod(); context.newLocalTxId(); LOG.debug("Updated context {} for participate method {} ", context, method.toString()); @@ -56,13 +61,18 @@ public class TccParticipatorAspect { try { Object result = joinPoint.proceed(); // Send the participate message back + tccEventService.participate(new ParticipatedEvent(context.globalTxId(), context.localTxId(), localTxId, cancelMethod, confirmMethod, + TransactionStatus.Succeed)); LOG.debug("Participate Transaction with context {} has finished.", context); - return result; } catch (Throwable throwable) { // Now we don't handle the error message + tccEventService.participate(new ParticipatedEvent(context.globalTxId(), context.localTxId(), localTxId, cancelMethod, + confirmMethod, TransactionStatus.Failed)); LOG.error("Participate Transaction with context {} failed.", context, throwable); throw throwable; + } finally { + context.setLocalTxId(localTxId); } } } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java index b93373f..26621d1 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java @@ -18,30 +18,30 @@ package org.apache.servicecomb.saga.omega.transaction.tcc; import javax.transaction.TransactionalException; +import org.apache.servicecomb.saga.common.TransactionStatus; import org.apache.servicecomb.saga.omega.context.OmegaContext; import org.apache.servicecomb.saga.omega.transaction.AlphaResponse; import org.apache.servicecomb.saga.omega.transaction.EventAwareInterceptor; -import org.apache.servicecomb.saga.omega.transaction.MessageSender; import org.apache.servicecomb.saga.omega.transaction.OmegaException; -import org.apache.servicecomb.saga.omega.transaction.SagaEndedEvent; -import org.apache.servicecomb.saga.omega.transaction.SagaStartedEvent; import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent; +import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent; +import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent; public class TccStartAnnotationProcessor implements EventAwareInterceptor { private final OmegaContext omegaContext; - private final MessageSender sender; + private final TccEventService eventService; - TccStartAnnotationProcessor(OmegaContext omegaContext, MessageSender sender) { + TccStartAnnotationProcessor(OmegaContext omegaContext, TccEventService eventService) { this.omegaContext = omegaContext; - this.sender = sender; + this.eventService = eventService; } @Override public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod, int retries, Object... message) { try { - return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), timeout)); + return eventService.TccTransactionStart(new TccStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId())); } catch (OmegaException e) { throw new TransactionalException(e.getMessage(), e.getCause()); } @@ -49,17 +49,15 @@ public class TccStartAnnotationProcessor implements EventAwareInterceptor { @Override public void postIntercept(String parentTxId, String compensationMethod) { - // Send the confirm event - /*AlphaResponse response = sender.send(new SagaEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId())); - if (response.aborted()) { - throw new OmegaException("transaction " + parentTxId + " is aborted"); - }*/ + eventService.TccTransactionStop(new TccEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), + TransactionStatus.Succeed)); } @Override public void onError(String parentTxId, String compensationMethod, Throwable throwable) { // Send the cancel event - String globalTxId = omegaContext.globalTxId(); - sender.send(new TxAbortedEvent(globalTxId, omegaContext.localTxId(), null, compensationMethod, throwable)); + // Do we need to wait for the alpha finish all the transaction + eventService.TccTransactionStop(new TccEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), + TransactionStatus.Failed)); } } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspect.java index 90728e3..f6d1d77 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspect.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspect.java @@ -39,9 +39,9 @@ public class TccStartAspect { private final OmegaContext context; - public TccStartAspect(MessageSender sender, OmegaContext context) { + public TccStartAspect(TccEventService tccEventServicer, OmegaContext context) { this.context = context; - this.tccStartAnnotationProcessor = new TccStartAnnotationProcessor(context, sender); + this.tccStartAnnotationProcessor = new TccStartAnnotationProcessor(context, tccEventServicer); } @Around("execution(@org.apache.servicecomb.saga.omega.context.annotations.TccStart * *(..)) && @annotation(tccStart)") diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ParticipatedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ParticipatedEvent.java index 73ede9c..3372f8e 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ParticipatedEvent.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ParticipatedEvent.java @@ -17,21 +17,25 @@ package org.apache.servicecomb.saga.omega.transaction.tcc.events; -public class ParticipateEvent { +import org.apache.servicecomb.saga.common.TransactionStatus; + +public class ParticipatedEvent { private final String globalTxId; private final String localTxId; private final String parentTxId; private final String confirmMethod; private final String cancelMethod; + private final TransactionStatus status; - public ParticipateEvent(String globalTxId, String localTxId, String parentTxId, String confirmMethod, - String cancelMethod) { + public ParticipatedEvent(String globalTxId, String localTxId, String parentTxId, String confirmMethod, + String cancelMethod, TransactionStatus status) { this.globalTxId = globalTxId; this.localTxId = localTxId; this.parentTxId = parentTxId; this.confirmMethod = confirmMethod; this.cancelMethod = cancelMethod; + this.status = status; } } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccEndedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccEndedEvent.java index 9107ab0..7c666b2 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccEndedEvent.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccEndedEvent.java @@ -18,13 +18,13 @@ package org.apache.servicecomb.saga.omega.transaction.tcc.events; import org.apache.servicecomb.saga.common.TransactionStatus; -public class TccEndEvent { +public class TccEndedEvent { private final String globalTxId; private final String localTxId; private final TransactionStatus status; - public TccEndEvent(String globalTxId, String localTxId, + public TccEndedEvent(String globalTxId, String localTxId, TransactionStatus status) { this.globalTxId = globalTxId; this.localTxId = localTxId; diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccStartedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccStartedEvent.java index 64db3ea..edd0333 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccStartedEvent.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccStartedEvent.java @@ -16,7 +16,7 @@ */ package org.apache.servicecomb.saga.omega.transaction.tcc.events; -public class TccStartEvent { +public class TccStartedEvent { private final String globalTxId; private final String localTxId; @@ -31,7 +31,7 @@ public class TccStartEvent { - public TccStartEvent(String globalTxId, String localTxId) { + public TccStartedEvent(String globalTxId, String localTxId) { this.globalTxId = globalTxId; this.localTxId = localTxId; }