This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 46096fe2f0f0d3d03fb256ed2fd5f221c0b9e851 Author: Eric Lee <dagang...@huawei.com> AuthorDate: Tue Jan 23 16:09:18 2018 +0800 SCB-239 remove timeout from omega Signed-off-by: Eric Lee <dagang...@huawei.com> --- ...eInterceptor.java => OnceAwareInterceptor.java} | 4 +- .../saga/omega/transaction/SagaStartAspect.java | 25 +-------- .../saga/omega/transaction/TransactionAspect.java | 29 +---------- ...ptorTest.java => OnceAwareInterceptorTest.java} | 59 ++++------------------ .../omega/transaction/SagaStartAspectTest.java | 44 ---------------- .../omega/transaction/TransactionAspectTest.java | 48 ------------------ 6 files changed, 17 insertions(+), 192 deletions(-) diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptor.java similarity index 94% rename from omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java rename to omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptor.java index 9de26d2..2ab7d12 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptor.java @@ -19,12 +19,12 @@ package org.apache.servicecomb.saga.omega.transaction; import java.util.concurrent.atomic.AtomicReference; -class TimeAwareInterceptor implements EventAwareInterceptor { +class OnceAwareInterceptor implements EventAwareInterceptor { private final EventAwareInterceptor interceptor; private final AtomicReference<EventAwareInterceptor> interceptorRef; private Throwable throwable = null; - TimeAwareInterceptor(EventAwareInterceptor interceptor) { + OnceAwareInterceptor(EventAwareInterceptor interceptor) { this.interceptor = interceptor; this.interceptorRef = new AtomicReference<>(interceptor); } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java index db8e3a0..7328fef 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java @@ -17,12 +17,8 @@ package org.apache.servicecomb.saga.omega.transaction; -import static java.util.concurrent.TimeUnit.MILLISECONDS; - import java.lang.invoke.MethodHandles; import java.lang.reflect.Method; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import org.apache.servicecomb.saga.omega.context.OmegaContext; import org.apache.servicecomb.saga.omega.context.annotations.SagaStart; @@ -38,7 +34,7 @@ public class SagaStartAspect { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor; - private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private final OmegaContext context; public SagaStartAspect(MessageSender sender, OmegaContext context) { @@ -51,11 +47,10 @@ public class SagaStartAspect { initializeOmegaContext(); Method method = ((MethodSignature) joinPoint.getSignature()).getMethod(); - TimeAwareInterceptor interceptor = new TimeAwareInterceptor(sagaStartAnnotationProcessor); + OnceAwareInterceptor interceptor = new OnceAwareInterceptor(sagaStartAnnotationProcessor); interceptor.preIntercept(context.globalTxId(), method.toString(), sagaStart.timeout()); LOG.debug("Initialized context {} before execution of method {}", context, method.toString()); - scheduleTimeoutTask(interceptor, method, sagaStart.timeout()); try { Object result = joinPoint.proceed(); @@ -74,20 +69,4 @@ public class SagaStartAspect { private void initializeOmegaContext() { context.setLocalTxId(context.newGlobalTxId()); } - - private void scheduleTimeoutTask( - TimeAwareInterceptor interceptor, - Method method, - int timeout) { - - if (timeout > 0) { - executor.schedule( - () -> interceptor.onTimeout( - context.globalTxId(), - method.toString(), - new OmegaTxTimeoutException("Saga " + method.toString() + " timed out")), - timeout, - MILLISECONDS); - } - } } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java index 718d1fd..090fe2e 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java @@ -17,12 +17,8 @@ package org.apache.servicecomb.saga.omega.transaction; -import static java.util.concurrent.TimeUnit.MILLISECONDS; - import java.lang.invoke.MethodHandles; import java.lang.reflect.Method; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import javax.transaction.InvalidTransactionException; @@ -40,7 +36,7 @@ public class TransactionAspect { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final OmegaContext context; - private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private final CompensableInterceptor interceptor; public TransactionAspect(MessageSender sender, OmegaContext context) { @@ -58,7 +54,7 @@ public class TransactionAspect { String localTxId = context.localTxId(); context.newLocalTxId(); - TimeAwareInterceptor interceptor = new TimeAwareInterceptor(this.interceptor); + OnceAwareInterceptor interceptor = new OnceAwareInterceptor(this.interceptor); AlphaResponse response = interceptor.preIntercept(localTxId, signature, compensable.timeout(), joinPoint.getArgs()); if (response.aborted()) { String abortedLocalTxId = context.localTxId(); @@ -68,9 +64,6 @@ public class TransactionAspect { } LOG.debug("Updated context {} for compensable method {} ", context, method.toString()); - // TODO: 2018/1/15 omega shall be stateless, all states shall be on alpha - scheduleTimeoutTask(interceptor, localTxId, signature, method, compensable.timeout()); - try { Object result = joinPoint.proceed(); interceptor.postIntercept(localTxId, signature); @@ -85,24 +78,6 @@ public class TransactionAspect { } } - private void scheduleTimeoutTask( - TimeAwareInterceptor interceptor, - String localTxId, - String signature, - Method method, - int timeout) { - - if (timeout > 0) { - executor.schedule( - () -> interceptor.onTimeout( - localTxId, - signature, - new OmegaTxTimeoutException("Transaction " + method.toString() + " timed out")), - timeout, - MILLISECONDS); - } - } - private String compensationMethodSignature(ProceedingJoinPoint joinPoint, Compensable compensable, Method method) throws NoSuchMethodException { diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptorTest.java similarity index 62% rename from omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java rename to omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptorTest.java index 0f2d2eb..0a87491 100644 --- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java +++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptorTest.java @@ -20,12 +20,9 @@ package org.apache.servicecomb.saga.omega.transaction; import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -34,7 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; import org.junit.rules.ExpectedException; -public class TimeAwareInterceptorTest { +public class OnceAwareInterceptorTest { private static final int runningCounts = 1000; private final String localTxId = uniquify("localTxId"); @@ -66,70 +63,36 @@ public class TimeAwareInterceptorTest { }; private final ExecutorService executorService = Executors.newFixedThreadPool(2); - private final RuntimeException timeoutException = new OmegaTxTimeoutException("timed out"); - - @Test(timeout = 5000) - public void invokeEitherPostInterceptOrOnTimeoutConcurrently() throws Exception { + @Test + public void invokePostIntercept() throws Exception { List<Future<?>> futures = new LinkedList<>(); for (int i = 0; i < runningCounts; i++) { - TimeAwareInterceptor interceptor = new TimeAwareInterceptor(underlying); - CyclicBarrier cyclicBarrier = new CyclicBarrier(2); - ExpectedException exception = ExpectedException.none(); - - futures.add(executorService.submit(() -> { - try { - waitForSignal(cyclicBarrier); - interceptor.postIntercept(localTxId, signature); - } catch (Throwable throwable) { - exception.expect(OmegaTxTimeoutException.class); - } - })); - - futures.add(executorService.submit(() -> { - waitForSignal(cyclicBarrier); - interceptor.onTimeout(localTxId, signature, timeoutException); - })); + OnceAwareInterceptor interceptor = new OnceAwareInterceptor(underlying); + + futures.add(executorService.submit(() -> interceptor.postIntercept(localTxId, signature))); } waitTillAllDone(futures); - assertThat(postInterceptInvoked.get() + onTimeoutInvoked.get(), is(runningCounts)); + assertThat(postInterceptInvoked.get(), is(runningCounts)); } - @Test(timeout = 5000) + @Test public void invokeEitherOnErrorOrOnTimeoutConcurrently() throws Exception { RuntimeException oops = new RuntimeException("oops"); List<Future<?>> futures = new LinkedList<>(); for (int i = 0; i < runningCounts; i++) { - TimeAwareInterceptor interceptor = new TimeAwareInterceptor(underlying); - CyclicBarrier cyclicBarrier = new CyclicBarrier(2); - - - futures.add(executorService.submit(() -> { - waitForSignal(cyclicBarrier); - interceptor.onError(localTxId, signature, oops); - })); + OnceAwareInterceptor interceptor = new OnceAwareInterceptor(underlying); - futures.add(executorService.submit(() -> { - waitForSignal(cyclicBarrier); - interceptor.onTimeout(localTxId, signature, timeoutException); - })); + futures.add(executorService.submit(() -> interceptor.onError(localTxId, signature, oops))); } waitTillAllDone(futures); - assertThat(onErrorInvoked.get() + onTimeoutInvoked.get(), is(runningCounts)); - } - - private void waitForSignal(CyclicBarrier cyclicBarrier) { - try { - cyclicBarrier.await(); - } catch (InterruptedException | BrokenBarrierException e) { - fail(e.getMessage()); - } + assertThat(onErrorInvoked.get(), is(runningCounts)); } private void waitTillAllDone(List<Future<?>> futures) diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java index 1bc2b28..77d40ef 100644 --- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java +++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java @@ -18,20 +18,14 @@ package org.apache.servicecomb.saga.omega.transaction; import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.awaitility.Awaitility.await; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.List; import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import org.apache.servicecomb.saga.common.EventType; import org.apache.servicecomb.saga.omega.context.IdGenerator; @@ -62,8 +56,6 @@ public class SagaStartAspectTest { private final OmegaContext omegaContext = new OmegaContext(idGenerator); private final SagaStartAspect aspect = new SagaStartAspect(sender, omegaContext); - private final ExecutorService executor = Executors.newSingleThreadExecutor(); - @Before public void setUp() throws Exception { when(idGenerator.nextId()).thenReturn(globalTxId); @@ -120,42 +112,6 @@ public class SagaStartAspectTest { assertThat(omegaContext.localTxId(), is(nullValue())); } - @Test - public void sendsAbortEventOnTimeout() throws Throwable { - CountDownLatch latch = new CountDownLatch(1); - when(sagaStart.timeout()).thenReturn(100); - when(joinPoint.proceed()).thenAnswer(invocationOnMock -> { - latch.await(); - assertThat(omegaContext.localTxId(), is(globalTxId)); - return null; - }); - - ExpectedException exception = ExpectedException.none(); - executor.execute(() -> { - try { - aspect.advise(joinPoint, sagaStart); - } catch (Throwable throwable) { - exception.expect(OmegaTxTimeoutException.class); - } - }); - - await().atMost(1, SECONDS).until(() -> messages.size() == 2); - - TxEvent event = messages.get(1); - - assertThat(event.globalTxId(), is(globalTxId)); - assertThat(event.localTxId(), is(globalTxId)); - assertThat(event.parentTxId(), is(nullValue())); - assertThat(event.type(), is(EventType.TxAbortedEvent)); - - latch.countDown(); - - await().atMost(1, SECONDS).until(() -> omegaContext.localTxId() == null); - - // no redundant ended message received - assertThat(messages.size(), is(2)); - } - private String doNothing() { return "doNothing"; } diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java index 8689a1e..31d148f 100644 --- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java +++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java @@ -18,11 +18,8 @@ package org.apache.servicecomb.saga.omega.transaction; import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.awaitility.Awaitility.await; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -32,9 +29,6 @@ import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.List; import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import javax.transaction.InvalidTransactionException; @@ -69,8 +63,6 @@ public class TransactionAspectTest { private final OmegaContext omegaContext = new OmegaContext(idGenerator); private final TransactionAspect aspect = new TransactionAspect(sender, omegaContext); - private final ExecutorService executor = Executors.newSingleThreadExecutor(); - @Before public void setUp() throws Exception { when(idGenerator.nextId()).thenReturn(newLocalTxId); @@ -131,46 +123,6 @@ public class TransactionAspectTest { } @Test - public void sendsAbortEventOnTimeout() throws Throwable { - CountDownLatch latch = new CountDownLatch(1); - when(compensable.timeout()).thenReturn(100); - when(joinPoint.proceed()).thenAnswer(invocationOnMock -> { - latch.await(); - assertThat(omegaContext.localTxId(), is(newLocalTxId)); - return null; - }); - - ExpectedException exception = ExpectedException.none(); - executor.execute(() -> { - try { - // need to setup the thread local for it - omegaContext.setGlobalTxId(globalTxId); - omegaContext.setLocalTxId(localTxId); - - aspect.advise(joinPoint, compensable); - } catch (Throwable throwable) { - exception.expect(OmegaTxTimeoutException.class); - } - }); - - await().atMost(1, SECONDS).until(() -> messages.size() == 2); - - TxEvent event = messages.get(1); - - assertThat(event.globalTxId(), is(globalTxId)); - assertThat(event.localTxId(), is(newLocalTxId)); - assertThat(event.parentTxId(), is(localTxId)); - assertThat(event.type(), is(EventType.TxAbortedEvent)); - - latch.countDown(); - - await().atMost(1, SECONDS).until(() -> localTxId.equals(omegaContext.localTxId())); - - // no redundant ended message received - assertThat(messages.size(), is(2)); - } - - @Test public void returnImmediatelyWhenReceivedRejectResponse() throws Throwable { MessageSender sender = mock(MessageSender.class); when(sender.send(any())).thenReturn(new AlphaResponse(true)); -- To stop receiving notification emails like this one, please contact ningji...@apache.org.