This is an automated email from the ASF dual-hosted git repository. zhanglei pushed a commit to branch SCB-1321 in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit ea7c70aaf254f6ba6b2198f2bc48b1e8b393af71 Author: Lei Zhang <[email protected]> AuthorDate: Mon Jul 8 20:14:00 2019 +0800 SCB-1321 Refactoring interrupt Omega timeout thread --- .../omega/transaction/AbstractRecoveryPolicy.java | 5 +- .../omega/transaction/CompensableInterceptor.java | 2 +- .../transaction/RecoveryPolicyTimeoutWrapper.java | 226 --------------------- .../pack/omega/transaction/SagaAbortedEvent.java | 42 ++++ .../transaction/SagaStartAnnotationProcessor.java | 22 +- .../pack/omega/transaction/SagaStartAspect.java | 35 +--- .../wrapper/RecoveryPolicyTimeoutWrapper.java | 222 ++++++++++++++++++++ ...SagaStartAnnotationProcessorTimeoutWrapper.java | 107 ++++++++++ .../SagaStartAnnotationProcessorWrapper.java} | 32 +-- .../omega/transaction/wrapper/TimeoutProb.java | 92 +++++++++ .../transaction/wrapper/TimeoutProbManager.java | 100 +++++++++ .../omega/transaction/SagaStartAspectTest.java | 44 +++- 12 files changed, 639 insertions(+), 290 deletions(-) diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/AbstractRecoveryPolicy.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/AbstractRecoveryPolicy.java index 7de6470..8526581 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/AbstractRecoveryPolicy.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/AbstractRecoveryPolicy.java @@ -19,6 +19,7 @@ package org.apache.servicecomb.pack.omega.transaction; import org.apache.servicecomb.pack.omega.context.OmegaContext; import org.apache.servicecomb.pack.omega.transaction.annotations.Compensable; +import org.apache.servicecomb.pack.omega.transaction.wrapper.RecoveryPolicyTimeoutWrapper; import org.aspectj.lang.ProceedingJoinPoint; public abstract class AbstractRecoveryPolicy implements RecoveryPolicy { @@ -32,8 +33,8 @@ public abstract class AbstractRecoveryPolicy implements RecoveryPolicy { CompensableInterceptor interceptor, OmegaContext context, String parentTxId, int retries) throws Throwable { if(compensable.timeout()>0){ - return RecoveryPolicyTimeoutWrapper - .getInstance().wrapper(this).applyTo(joinPoint, compensable, interceptor, context, parentTxId, retries); + RecoveryPolicyTimeoutWrapper wrapper = new RecoveryPolicyTimeoutWrapper(this); + return wrapper.applyTo(joinPoint, compensable, interceptor, context, parentTxId, retries); }else{ return this.applyTo(joinPoint, compensable, interceptor, context, parentTxId, retries); } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java index 7186147..08ad7f7 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java @@ -19,7 +19,7 @@ package org.apache.servicecomb.pack.omega.transaction; import org.apache.servicecomb.pack.omega.context.OmegaContext; -class CompensableInterceptor implements EventAwareInterceptor { +public class CompensableInterceptor implements EventAwareInterceptor { private final OmegaContext context; private final SagaMessageSender sender; diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/RecoveryPolicyTimeoutWrapper.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/RecoveryPolicyTimeoutWrapper.java deleted file mode 100644 index 3fe60ae..0000000 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/RecoveryPolicyTimeoutWrapper.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.servicecomb.pack.omega.transaction; - -import java.lang.invoke.MethodHandles; -import java.lang.reflect.Method; -import java.nio.channels.ClosedByInterruptException; -import java.util.Set; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import org.apache.servicecomb.pack.omega.context.OmegaContext; -import org.apache.servicecomb.pack.omega.transaction.annotations.Compensable; -import org.aspectj.lang.ProceedingJoinPoint; -import org.aspectj.lang.reflect.MethodSignature; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * RecoveryPolicy Wrapper - * 1.Use this wrapper to send a request if the @Compensable timeout>0 - * 2.Terminate thread execution if execution time is greater than the timeout of @Compensable - * - * Exception - * 1.If the interrupt succeeds, a TransactionTimeoutException is thrown and the local transaction is rollback - * 2.If the interrupt fails, throw an OmegaException - * - * Note: Omega end thread coding advice - * 1.add short sleep to while true loop. Otherwise, the thread may not be able to terminate. - * 2.Replace the synchronized with ReentrantLock, Otherwise, the thread may not be able to terminate. - * */ - -public class RecoveryPolicyTimeoutWrapper { - - private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private static RecoveryPolicyTimeoutWrapper instance = new RecoveryPolicyTimeoutWrapper(100); - private AbstractRecoveryPolicy recoveryPolicy; - private final transient Set<TimeoutProb> timeoutProbs = new ConcurrentSkipListSet<TimeoutProb>(); - - public static RecoveryPolicyTimeoutWrapper getInstance() { - return instance; - } - - public RecoveryPolicyTimeoutWrapper(int delay) { - this.interrupter.scheduleWithFixedDelay( - new Runnable() { - @Override - public void run() { - try { - RecoveryPolicyTimeoutWrapper.this.interrupt(); - } catch (Exception e) { - LOG.error("The overtime thread interrupt fail",e); - } - } - }, - 0, delay, TimeUnit.MICROSECONDS - ); - } - - /** - * Configuration timeout probe thread - */ - private final transient ScheduledExecutorService interrupter = - Executors.newSingleThreadScheduledExecutor( - new TimeoutProbeThreadFactory() - ); - - /** - * Loop detection of all thread timeout probes, remove probe if the thread has terminated - */ - private void interrupt() { - synchronized (this.interrupter) { - for (TimeoutProb timeoutProb : this.timeoutProbs) { - if (timeoutProb.interruptFailureException == null) { - if (timeoutProb.expired()) { - if (timeoutProb.interrupted()) { - this.timeoutProbs.remove(timeoutProb); - } - } - } - } - } - } - - public RecoveryPolicyTimeoutWrapper wrapper(AbstractRecoveryPolicy recoveryPolicy) { - this.recoveryPolicy = recoveryPolicy; - return this; - } - - public Object applyTo(ProceedingJoinPoint joinPoint, Compensable compensable, - CompensableInterceptor interceptor, OmegaContext context, String parentTxId, int retries) - throws Throwable { - final TimeoutProb timeoutProb = new TimeoutProb(joinPoint, compensable); - this.timeoutProbs.add(timeoutProb); - Object output; - try { - output = this.recoveryPolicy - .applyTo(joinPoint, compensable, interceptor, context, parentTxId, retries); - if (timeoutProb.getInterruptFailureException() != null) { - throw new OmegaException(timeoutProb.interruptFailureException); - } - } catch (InterruptedException e) { - if (timeoutProb.getInterruptFailureException() != null) { - throw new OmegaException(timeoutProb.interruptFailureException); - }else{ - throw new TransactionTimeoutException(e.getMessage(),e); - } - } catch (IllegalMonitorStateException e) { - if (timeoutProb.getInterruptFailureException() != null) { - throw new OmegaException(timeoutProb.interruptFailureException); - }else{ - throw new TransactionTimeoutException(e.getMessage(),e); - } - } catch (ClosedByInterruptException e) { - if (timeoutProb.getInterruptFailureException() != null) { - throw new OmegaException(timeoutProb.interruptFailureException); - }else{ - throw new TransactionTimeoutException(e.getMessage(),e); - } - } catch (Throwable e) { - throw e; - } finally { - this.timeoutProbs.remove(timeoutProb); - } - return output; - } - - /** - * Define timeout probe - */ - private static final class TimeoutProb implements - Comparable<TimeoutProb> { - - private final transient Thread thread = Thread.currentThread(); - private final transient long startTime = System.currentTimeMillis(); - private final transient long expireTime; - private Exception interruptFailureException = null; - private final transient ProceedingJoinPoint joinPoint; - - public TimeoutProb(final ProceedingJoinPoint pnt, Compensable compensable) { - this.joinPoint = pnt; - this.expireTime = this.startTime + TimeUnit.SECONDS.toMillis(compensable.timeout()); - } - - @Override - public int compareTo(final TimeoutProb obj) { - int compare; - if (this.expireTime > obj.expireTime) { - compare = 1; - } else if (this.expireTime < obj.expireTime) { - compare = -1; - } else { - compare = 0; - } - return compare; - } - - public Exception getInterruptFailureException() { - return interruptFailureException; - } - - /** - * - * @return Returns TRUE if expired - */ - public boolean expired() { - return this.expireTime < System.currentTimeMillis(); - } - - /** - * Interrupt thread - * - * @return Returns TRUE if the thread has been interrupted - */ - public boolean interrupted() { - boolean interrupted; - if (this.thread.isAlive()) { - // 如果当前线程是活动状态,则发送线程中断信号 - try { - this.thread.interrupt(); - } catch (Exception e) { - this.interruptFailureException = e; - LOG.info("Failed to interrupt the thread " + this.thread.getName(), e); - throw e; - } - final Method method = MethodSignature.class.cast(this.joinPoint.getSignature()).getMethod(); - LOG.warn("{}: interrupted on {}ms timeout (over {}ms)", - new Object[]{method, System.currentTimeMillis() - this.startTime, - this.expireTime - this.startTime} - ); - interrupted = false; - } else { - interrupted = true; - } - return interrupted; - } - } - - public class TimeoutProbeThreadFactory implements ThreadFactory { - - public Thread newThread(Runnable runnable) { - Thread thread = new Thread(new ThreadGroup("recovery-policy-timeout-wrapper"), runnable, - "probe"); - thread.setPriority(Thread.MAX_PRIORITY); - thread.setDaemon(true); - return thread; - } - } -} diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaAbortedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaAbortedEvent.java new file mode 100644 index 0000000..fcc77e6 --- /dev/null +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaAbortedEvent.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.pack.omega.transaction; + +import java.io.PrintWriter; +import java.io.StringWriter; +import org.apache.servicecomb.pack.common.EventType; + +public class SagaAbortedEvent extends TxEvent { + + private static final int PAYLOADS_MAX_LENGTH = 10240; + + public SagaAbortedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) { + super(EventType.SagaAbortedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, "", 0, + stackTrace(throwable)); + } + + private static String stackTrace(Throwable e) { + StringWriter writer = new StringWriter(); + e.printStackTrace(new PrintWriter(writer)); + String stackTrace = writer.toString(); + if (stackTrace.length() > PAYLOADS_MAX_LENGTH) { + stackTrace = stackTrace.substring(0, PAYLOADS_MAX_LENGTH); + } + return stackTrace; + } +} \ No newline at end of file diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAnnotationProcessor.java index 87b7808..767171b 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAnnotationProcessor.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAnnotationProcessor.java @@ -20,17 +20,17 @@ package org.apache.servicecomb.pack.omega.transaction; import javax.transaction.TransactionalException; import org.apache.servicecomb.pack.omega.context.OmegaContext; -class SagaStartAnnotationProcessor { +public class SagaStartAnnotationProcessor { private final OmegaContext omegaContext; private final SagaMessageSender sender; - SagaStartAnnotationProcessor(OmegaContext omegaContext, SagaMessageSender sender) { + public SagaStartAnnotationProcessor(OmegaContext omegaContext, SagaMessageSender sender) { this.omegaContext = omegaContext; this.sender = sender; } - AlphaResponse preIntercept(int timeout) { + public AlphaResponse preIntercept(int timeout) { try { return sender .send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), timeout)); @@ -39,7 +39,7 @@ class SagaStartAnnotationProcessor { } } - void postIntercept(String parentTxId) { + public void postIntercept(String parentTxId) { AlphaResponse response = sender .send(new SagaEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId())); if (response.aborted()) { @@ -47,10 +47,16 @@ class SagaStartAnnotationProcessor { } } - void onError(String compensationMethod, Throwable throwable) { + public void onError(String compensationMethod, Throwable throwable) { String globalTxId = omegaContext.globalTxId(); - sender.send( - new TxAbortedEvent(globalTxId, omegaContext.localTxId(), null, compensationMethod, - throwable)); + if(omegaContext.isAlphaFeatureAkkaEnabled()){ + sender.send( + new SagaAbortedEvent(globalTxId, omegaContext.localTxId(), null, compensationMethod, + throwable)); + }else{ + sender.send( + new TxAbortedEvent(globalTxId, omegaContext.localTxId(), null, compensationMethod, + throwable)); + } } } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspect.java index ec4441e..51eba27 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspect.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspect.java @@ -17,20 +17,16 @@ package org.apache.servicecomb.pack.omega.transaction; -import java.lang.invoke.MethodHandles; -import java.lang.reflect.Method; import org.apache.servicecomb.pack.omega.context.OmegaContext; import org.apache.servicecomb.pack.omega.context.annotations.SagaStart; +import org.apache.servicecomb.pack.omega.transaction.wrapper.SagaStartAnnotationProcessorTimeoutWrapper; +import org.apache.servicecomb.pack.omega.transaction.wrapper.SagaStartAnnotationProcessorWrapper; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; -import org.aspectj.lang.reflect.MethodSignature; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @Aspect public class SagaStartAspect { - private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor; @@ -44,27 +40,12 @@ public class SagaStartAspect { @Around("execution(@org.apache.servicecomb.pack.omega.context.annotations.SagaStart * *(..)) && @annotation(sagaStart)") Object advise(ProceedingJoinPoint joinPoint, SagaStart sagaStart) throws Throwable { initializeOmegaContext(); - Method method = ((MethodSignature) joinPoint.getSignature()).getMethod(); - - sagaStartAnnotationProcessor.preIntercept(sagaStart.timeout()); - LOG.debug("Initialized context {} before execution of method {}", context, method.toString()); - - try { - Object result = joinPoint.proceed(); - - sagaStartAnnotationProcessor.postIntercept(context.globalTxId()); - LOG.debug("Transaction with context {} has finished.", context); - - return result; - } catch (Throwable throwable) { - // We don't need to handle the OmegaException here - if (!(throwable instanceof OmegaException)) { - sagaStartAnnotationProcessor.onError(method.toString(), throwable); - LOG.error("Transaction {} failed.", context.globalTxId()); - } - throw throwable; - } finally { - context.clear(); + if(context.isAlphaFeatureAkkaEnabled() && sagaStart.timeout()>0){ + SagaStartAnnotationProcessorTimeoutWrapper wrapper = new SagaStartAnnotationProcessorTimeoutWrapper(this.sagaStartAnnotationProcessor); + return wrapper.apply(joinPoint,sagaStart,context); + }else{ + SagaStartAnnotationProcessorWrapper wrapper = new SagaStartAnnotationProcessorWrapper(this.sagaStartAnnotationProcessor); + return wrapper.apply(joinPoint,sagaStart,context); } } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/RecoveryPolicyTimeoutWrapper.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/RecoveryPolicyTimeoutWrapper.java new file mode 100644 index 0000000..0a59acb --- /dev/null +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/RecoveryPolicyTimeoutWrapper.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.pack.omega.transaction.wrapper; + +import java.lang.invoke.MethodHandles; +import java.nio.channels.ClosedByInterruptException; +import org.apache.servicecomb.pack.omega.context.OmegaContext; +import org.apache.servicecomb.pack.omega.transaction.AbstractRecoveryPolicy; +import org.apache.servicecomb.pack.omega.transaction.CompensableInterceptor; +import org.apache.servicecomb.pack.omega.transaction.OmegaException; +import org.apache.servicecomb.pack.omega.transaction.TransactionTimeoutException; +import org.apache.servicecomb.pack.omega.transaction.annotations.Compensable; +import org.aspectj.lang.ProceedingJoinPoint; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * RecoveryPolicy Wrapper + * 1.Use this wrapper to send a request if the @Compensable timeout>0 + * 2.Terminate thread execution if execution time is greater than the timeout of @Compensable + * + * Exception + * 1.If the interrupt succeeds, a TransactionTimeoutException is thrown and the local transaction is rollback + * 2.If the interrupt fails, throw an OmegaException + * + * Note: Omega end thread coding advice + * 1.add short sleep to while true loop. Otherwise, the thread may not be able to terminate. + * 2.Replace the synchronized with ReentrantLock, Otherwise, the thread may not be able to terminate. + * */ + +public class RecoveryPolicyTimeoutWrapper { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + //private static RecoveryPolicyTimeoutWrapper instance = new RecoveryPolicyTimeoutWrapper(); + private AbstractRecoveryPolicy recoveryPolicy; + //private final transient Set<TimeoutProb> timeoutProbs = new ConcurrentSkipListSet<TimeoutProb>(); + +// public static RecoveryPolicyTimeoutWrapper getInstance() { +// return instance; +// } + + public RecoveryPolicyTimeoutWrapper(AbstractRecoveryPolicy recoveryPolicy) { + this.recoveryPolicy = recoveryPolicy; +// this.interrupter.scheduleWithFixedDelay( +// new Runnable() { +// @Override +// public void run() { +// try { +// RecoveryPolicyTimeoutWrapper.this.interrupt(); +// } catch (Exception e) { +// LOG.error("The overtime thread interrupt fail",e); +// } +// } +// }, +// 0, delay, TimeUnit.MICROSECONDS +// ); + } + + /** + * Configuration timeout probe thread + */ +// private final transient ScheduledExecutorService interrupter = +// Executors.newSingleThreadScheduledExecutor( +// new TimeoutProbeThreadFactory() +// ); + + /** + * Loop detection of all thread timeout probes, remove probe if the thread has terminated + */ +// private void interrupt() { +// synchronized (this.interrupter) { +// for (TimeoutProb timeoutProb : this.timeoutProbs) { +// if (timeoutProb.interruptFailureException == null) { +// if (timeoutProb.expired()) { +// if (timeoutProb.interrupted()) { +// this.timeoutProbs.remove(timeoutProb); +// } +// } +// } +// } +// } +// } + +// public RecoveryPolicyTimeoutWrapper wrapper(AbstractRecoveryPolicy recoveryPolicy) { +// this.recoveryPolicy = recoveryPolicy; +// return this; +// } + + public Object applyTo(ProceedingJoinPoint joinPoint, Compensable compensable, + CompensableInterceptor interceptor, OmegaContext context, String parentTxId, int retries) + throws Throwable { + final TimeoutProb timeoutProb = TimeoutProbManager.getInstance().addTimeoutProb(compensable.timeout()); + Object output; + try { + output = this.recoveryPolicy + .applyTo(joinPoint, compensable, interceptor, context, parentTxId, retries); + if (timeoutProb.getInterruptFailureException() != null) { + throw new OmegaException(timeoutProb.getInterruptFailureException()); + } + } catch (InterruptedException e) { + if (timeoutProb.getInterruptFailureException() != null) { + throw new OmegaException(timeoutProb.getInterruptFailureException()); + }else{ + throw new TransactionTimeoutException(e.getMessage(),e); + } + } catch (IllegalMonitorStateException e) { + if (timeoutProb.getInterruptFailureException() != null) { + throw new OmegaException(timeoutProb.getInterruptFailureException()); + }else{ + throw new TransactionTimeoutException(e.getMessage(),e); + } + } catch (ClosedByInterruptException e) { + if (timeoutProb.getInterruptFailureException() != null) { + throw new OmegaException(timeoutProb.getInterruptFailureException()); + }else{ + throw new TransactionTimeoutException(e.getMessage(),e); + } + } catch (Throwable e) { + throw e; + } finally { + TimeoutProbManager.getInstance().removeTimeoutProb(timeoutProb); + } + return output; + } + + /** + * Define timeout probe + */ +// private static final class TimeoutProb implements +// Comparable<TimeoutProb> { +// +// private final transient Thread thread = Thread.currentThread(); +// private final transient long startTime = System.currentTimeMillis(); +// private final transient long expireTime; +// private Exception interruptFailureException = null; +// private final transient ProceedingJoinPoint joinPoint; +// +// public TimeoutProb(final ProceedingJoinPoint pnt, Compensable compensable) { +// this.joinPoint = pnt; +// this.expireTime = this.startTime + TimeUnit.SECONDS.toMillis(compensable.timeout()); +// } +// +// @Override +// public int compareTo(final TimeoutProb obj) { +// int compare; +// if (this.expireTime > obj.expireTime) { +// compare = 1; +// } else if (this.expireTime < obj.expireTime) { +// compare = -1; +// } else { +// compare = 0; +// } +// return compare; +// } +// +// public Exception getInterruptFailureException() { +// return interruptFailureException; +// } +// +// /** +// * +// * @return Returns TRUE if expired +// */ +// public boolean expired() { +// return this.expireTime < System.currentTimeMillis(); +// } +// +// /** +// * Interrupt thread +// * +// * @return Returns TRUE if the thread has been interrupted +// */ +// public boolean interrupted() { +// boolean interrupted; +// if (this.thread.isAlive()) { +// // 如果当前线程是活动状态,则发送线程中断信号 +// try { +// this.thread.interrupt(); +// } catch (Exception e) { +// this.interruptFailureException = e; +// LOG.info("Failed to interrupt the thread " + this.thread.getName(), e); +// throw e; +// } +// final Method method = MethodSignature.class.cast(this.joinPoint.getSignature()).getMethod(); +// LOG.warn("{}: interrupted on {}ms timeout (over {}ms)", +// new Object[]{method, System.currentTimeMillis() - this.startTime, +// this.expireTime - this.startTime} +// ); +// interrupted = false; +// } else { +// interrupted = true; +// } +// return interrupted; +// } +// } +// +// public class TimeoutProbeThreadFactory implements ThreadFactory { +// +// public Thread newThread(Runnable runnable) { +// Thread thread = new Thread(new ThreadGroup("recovery-policy-timeout-wrapper"), runnable, +// "probe"); +// thread.setPriority(Thread.MAX_PRIORITY); +// thread.setDaemon(true); +// return thread; +// } +// } +} diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/SagaStartAnnotationProcessorTimeoutWrapper.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/SagaStartAnnotationProcessorTimeoutWrapper.java new file mode 100644 index 0000000..5d7d7d6 --- /dev/null +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/SagaStartAnnotationProcessorTimeoutWrapper.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.pack.omega.transaction.wrapper; + +import java.lang.invoke.MethodHandles; +import java.lang.reflect.Method; +import java.nio.channels.ClosedByInterruptException; +import org.apache.servicecomb.pack.omega.context.OmegaContext; +import org.apache.servicecomb.pack.omega.context.annotations.SagaStart; +import org.apache.servicecomb.pack.omega.transaction.OmegaException; +import org.apache.servicecomb.pack.omega.transaction.SagaStartAnnotationProcessor; +import org.apache.servicecomb.pack.omega.transaction.TransactionTimeoutException; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.reflect.MethodSignature; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SagaStartAnnotationProcessorTimeoutWrapper { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor; + + public SagaStartAnnotationProcessorTimeoutWrapper( + SagaStartAnnotationProcessor sagaStartAnnotationProcessor) { + this.sagaStartAnnotationProcessor = sagaStartAnnotationProcessor; + } + + public Object apply(ProceedingJoinPoint joinPoint, SagaStart sagaStart, OmegaContext context) + throws Throwable { + final TimeoutProb timeoutProb = TimeoutProbManager.getInstance() + .addTimeoutProb(sagaStart.timeout()); + Object output; + try { + Method method = ((MethodSignature) joinPoint.getSignature()).getMethod(); + sagaStartAnnotationProcessor.preIntercept(sagaStart.timeout()); + if (LOG.isDebugEnabled()) { + LOG.debug("Initialized context {} before execution of method {}", context, + method.toString()); + } + try { + output = joinPoint.proceed(); + if (timeoutProb.getInterruptFailureException() != null) { + throw new OmegaException(timeoutProb.getInterruptFailureException()); + } + sagaStartAnnotationProcessor.postIntercept(context.globalTxId()); + if (LOG.isDebugEnabled()) { + LOG.debug("Transaction with context {} has finished.", context); + } + return output; + } catch (Throwable throwable) { + // TODO We still need to intercept some exceptions that we can't judge the state of the child transaction. + // At this point, we don't need to send SagaAbortEvent, just need to throw a TransactionTimeoutException + // For example, java.net.SocketTimeoutException, etc. + if (LOG.isDebugEnabled()) { + LOG.debug("TimeoutWrapper exception {}", throwable.getClass().getName()); + } + if (timeoutProb.getInterruptFailureException() != null) { + LOG.info("TimeoutProb interrupt fail"); + throw timeoutProb.getInterruptFailureException(); + } else if (isThreadInterruptException(throwable)) { + // We don't have to send an SagaAbortEvent + // Because the SagaActor state automatically change to suspended when timeout. + throw new TransactionTimeoutException("Timeout interrupt", throwable); + } else { + // We don't need to handle the OmegaException here + if (!(throwable instanceof OmegaException)) { + LOG.info("TimeoutWrapper Exception {}", throwable.getClass().getName()); + sagaStartAnnotationProcessor.onError(method.toString(), throwable); + LOG.error("Transaction {} failed.", context.globalTxId()); + } + } + throw throwable; + } + } finally { + context.clear(); + TimeoutProbManager.getInstance().removeTimeoutProb(timeoutProb); + } + } + + private boolean isThreadInterruptException(Throwable throwable) { + if (throwable instanceof InterruptedException || + throwable instanceof IllegalMonitorStateException || + throwable instanceof ClosedByInterruptException || + throwable.getCause() instanceof InterruptedException || + throwable.getCause() instanceof IllegalMonitorStateException || + throwable.getCause() instanceof ClosedByInterruptException) { + return true; + } else { + return false; + } + } +} diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/SagaStartAnnotationProcessorWrapper.java similarity index 74% copy from omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspect.java copy to omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/SagaStartAnnotationProcessorWrapper.java index ec4441e..2837c7f 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspect.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/SagaStartAnnotationProcessorWrapper.java @@ -15,46 +15,38 @@ * limitations under the License. */ -package org.apache.servicecomb.pack.omega.transaction; +package org.apache.servicecomb.pack.omega.transaction.wrapper; import java.lang.invoke.MethodHandles; import java.lang.reflect.Method; import org.apache.servicecomb.pack.omega.context.OmegaContext; import org.apache.servicecomb.pack.omega.context.annotations.SagaStart; +import org.apache.servicecomb.pack.omega.transaction.OmegaException; +import org.apache.servicecomb.pack.omega.transaction.SagaStartAnnotationProcessor; import org.aspectj.lang.ProceedingJoinPoint; -import org.aspectj.lang.annotation.Around; -import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.reflect.MethodSignature; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@Aspect -public class SagaStartAspect { - private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); +public class SagaStartAnnotationProcessorWrapper { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor; - private final OmegaContext context; - - public SagaStartAspect(SagaMessageSender sender, OmegaContext context) { - this.context = context; - this.sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context, sender); + public SagaStartAnnotationProcessorWrapper( + SagaStartAnnotationProcessor sagaStartAnnotationProcessor) { + this.sagaStartAnnotationProcessor = sagaStartAnnotationProcessor; } - @Around("execution(@org.apache.servicecomb.pack.omega.context.annotations.SagaStart * *(..)) && @annotation(sagaStart)") - Object advise(ProceedingJoinPoint joinPoint, SagaStart sagaStart) throws Throwable { - initializeOmegaContext(); + public Object apply(ProceedingJoinPoint joinPoint, SagaStart sagaStart, OmegaContext context) + throws Throwable { Method method = ((MethodSignature) joinPoint.getSignature()).getMethod(); - sagaStartAnnotationProcessor.preIntercept(sagaStart.timeout()); LOG.debug("Initialized context {} before execution of method {}", context, method.toString()); - try { Object result = joinPoint.proceed(); - sagaStartAnnotationProcessor.postIntercept(context.globalTxId()); LOG.debug("Transaction with context {} has finished.", context); - return result; } catch (Throwable throwable) { // We don't need to handle the OmegaException here @@ -67,8 +59,4 @@ public class SagaStartAspect { context.clear(); } } - - private void initializeOmegaContext() { - context.setLocalTxId(context.newGlobalTxId()); - } } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/TimeoutProb.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/TimeoutProb.java new file mode 100644 index 0000000..dc70aef --- /dev/null +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/TimeoutProb.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.pack.omega.transaction.wrapper; + +import java.lang.invoke.MethodHandles; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Define timeout probe + */ +public class TimeoutProb implements Comparable<TimeoutProb> { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final transient Thread thread = Thread.currentThread(); + private final transient long startTime = System.currentTimeMillis(); + private final transient long expireTime; + private Exception interruptFailureException = null; + private boolean interruptSent = false; + public TimeoutProb(int timeout) { + this.expireTime = this.startTime + TimeUnit.SECONDS.toMillis(timeout); + } + + @Override + public int compareTo(final TimeoutProb obj) { + int compare; + if (this.expireTime > obj.expireTime) { + compare = 1; + } else if (this.expireTime < obj.expireTime) { + compare = -1; + } else { + compare = 0; + } + return compare; + } + + public Exception getInterruptFailureException() { + return interruptFailureException; + } + + /** + * @return Returns TRUE if expired + */ + public boolean expired() { + return this.expireTime < System.currentTimeMillis(); + } + + /** + * Interrupt thread + * + * @return Returns TRUE if the thread has been interrupted + */ + public boolean interrupted() { + boolean interrupted; + if (this.thread.isAlive()) { + // 如果当前线程是活动状态,则发送线程中断信号 + try { + this.thread.interrupt(); + if(!interruptSent){ + LOG.warn("Thread interrupted on {}ms timeout (over {}ms)", + new Object[]{System.currentTimeMillis() - this.startTime, + this.expireTime - this.startTime} + ); + } + interruptSent = true; + } catch (Exception e) { + this.interruptFailureException = e; + LOG.info("Failed to interrupt the thread " + this.thread.getName(), e); + throw e; + } + interrupted = false; + } else { + interrupted = true; + } + return interrupted; + } +} diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/TimeoutProbManager.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/TimeoutProbManager.java new file mode 100644 index 0000000..3a3a2ac --- /dev/null +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/TimeoutProbManager.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.pack.omega.transaction.wrapper; + +import java.lang.invoke.MethodHandles; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TimeoutProbManager { + + private static TimeoutProbManager instance = new TimeoutProbManager(100); + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final transient Set<TimeoutProb> timeoutProbs = new ConcurrentSkipListSet<TimeoutProb>(); + private final transient ScheduledExecutorService interrupter = + Executors.newSingleThreadScheduledExecutor( + new TimeoutProbeThreadFactory() + ); + + public static TimeoutProbManager getInstance() { + return instance; + } + + public TimeoutProbManager(int delay) { + this.interrupter.scheduleWithFixedDelay( + new Runnable() { + @Override + public void run() { + try { + TimeoutProbManager.this.interrupt(); + } catch (Exception e) { + LOG.error("The overtime thread interrupt fail", e); + } + } + }, + 0, delay, TimeUnit.MICROSECONDS + ); + } + + public TimeoutProb addTimeoutProb(int timeout) { + final TimeoutProb timeoutProb = new TimeoutProb(timeout); + this.timeoutProbs.add(timeoutProb); + return timeoutProb; + } + + public void removeTimeoutProb(TimeoutProb timeoutProb) { + this.timeoutProbs.remove(timeoutProb); + } + + /** + * Loop detection of all thread timeout probes, remove probe if the thread has terminated + */ + private void interrupt() { + synchronized (this.interrupter) { + for (TimeoutProb timeoutProb : this.timeoutProbs) { + if (timeoutProb.getInterruptFailureException() == null) { + if (timeoutProb.expired()) { + if (timeoutProb.interrupted()) { + this.timeoutProbs.remove(timeoutProb); + } + } + } + } + } + } + + /** + * Configuration timeout probe thread + */ + public class TimeoutProbeThreadFactory implements ThreadFactory { + + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(new ThreadGroup("recovery-policy-timeout-wrapper"), runnable, + "probe"); + thread.setPriority(Thread.MAX_PRIORITY); + thread.setDaemon(true); + return thread; + } + } +} diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspectTest.java index c11a790..656eac1 100644 --- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspectTest.java +++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspectTest.java @@ -72,20 +72,20 @@ public class SagaStartAspectTest { private final IdGenerator<String> idGenerator = Mockito.mock(IdGenerator.class); private final SagaStart sagaStart = Mockito.mock(SagaStart.class); - private final OmegaContext omegaContext = new OmegaContext(idGenerator); - private final SagaStartAspect aspect = new SagaStartAspect(sender, omegaContext); + private OmegaContext omegaContext; + private SagaStartAspect aspect; @Before public void setUp() throws Exception { when(idGenerator.nextId()).thenReturn(globalTxId); when(joinPoint.getSignature()).thenReturn(methodSignature); - when(methodSignature.getMethod()).thenReturn(this.getClass().getDeclaredMethod("doNothing")); - omegaContext.clear(); } @Test public void newGlobalTxIdInSagaStart() throws Throwable { + omegaContext = new OmegaContext(idGenerator); + aspect = new SagaStartAspect(sender, omegaContext); aspect.advise(joinPoint, sagaStart); TxEvent startedEvent = messages.get(0); @@ -108,6 +108,8 @@ public class SagaStartAspectTest { @Test public void clearContextOnSagaStartError() throws Throwable { + omegaContext = new OmegaContext(idGenerator); + aspect = new SagaStartAspect(sender, omegaContext); RuntimeException oops = new RuntimeException("oops"); when(joinPoint.proceed()).thenThrow(oops); @@ -138,6 +140,40 @@ public class SagaStartAspectTest { assertThat(omegaContext.localTxId(), is(nullValue())); } + @Test + public void clearContextOnSagaStartErrorWithAkka() throws Throwable { + omegaContext = new OmegaContext(idGenerator,true); + aspect = new SagaStartAspect(sender, omegaContext); + RuntimeException oops = new RuntimeException("oops"); + + when(joinPoint.proceed()).thenThrow(oops); + + try { + aspect.advise(joinPoint, sagaStart); + expectFailing(RuntimeException.class); + } catch (RuntimeException e) { + assertThat(e, is(oops)); + } + + assertThat(messages.size(), is(2)); + TxEvent event = messages.get(0); + + assertThat(event.globalTxId(), is(globalTxId)); + assertThat(event.localTxId(), is(globalTxId)); + assertThat(event.parentTxId(), is(nullValue())); + assertThat(event.type(), is(EventType.SagaStartedEvent)); + + event = messages.get(1); + + assertThat(event.globalTxId(), is(globalTxId)); + assertThat(event.localTxId(), is(globalTxId)); + assertThat(event.parentTxId(), is(nullValue())); + assertThat(event.type(), is(EventType.SagaAbortedEvent)); + + assertThat(omegaContext.globalTxId(), is(nullValue())); + assertThat(omegaContext.localTxId(), is(nullValue())); + } + private String doNothing() { return "doNothing"; }
