This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 46096fe2f0f0d3d03fb256ed2fd5f221c0b9e851
Author: Eric Lee <dagang...@huawei.com>
AuthorDate: Tue Jan 23 16:09:18 2018 +0800

    SCB-239 remove timeout from omega
    
    Signed-off-by: Eric Lee <dagang...@huawei.com>
---
 ...eInterceptor.java => OnceAwareInterceptor.java} |  4 +-
 .../saga/omega/transaction/SagaStartAspect.java    | 25 +--------
 .../saga/omega/transaction/TransactionAspect.java  | 29 +----------
 ...ptorTest.java => OnceAwareInterceptorTest.java} | 59 ++++------------------
 .../omega/transaction/SagaStartAspectTest.java     | 44 ----------------
 .../omega/transaction/TransactionAspectTest.java   | 48 ------------------
 6 files changed, 17 insertions(+), 192 deletions(-)

diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptor.java
similarity index 94%
rename from omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
rename to omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptor.java
index 9de26d2..2ab7d12 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptor.java
@@ -19,12 +19,12 @@ package org.apache.servicecomb.saga.omega.transaction;
 
 import java.util.concurrent.atomic.AtomicReference;
 
-class TimeAwareInterceptor implements EventAwareInterceptor {
+class OnceAwareInterceptor implements EventAwareInterceptor {
   private final EventAwareInterceptor interceptor;
   private final AtomicReference<EventAwareInterceptor> interceptorRef;
   private Throwable throwable = null;
 
-  TimeAwareInterceptor(EventAwareInterceptor interceptor) {
+  OnceAwareInterceptor(EventAwareInterceptor interceptor) {
     this.interceptor = interceptor;
     this.interceptorRef = new AtomicReference<>(interceptor);
   }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
index db8e3a0..7328fef 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
@@ -17,12 +17,8 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
 import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Method;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.context.annotations.SagaStart;
@@ -38,7 +34,7 @@ public class SagaStartAspect {
   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor;
-  private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+
   private final OmegaContext context;
 
   public SagaStartAspect(MessageSender sender, OmegaContext context) {
@@ -51,11 +47,10 @@ public class SagaStartAspect {
     initializeOmegaContext();
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
 
-    TimeAwareInterceptor interceptor = new 
TimeAwareInterceptor(sagaStartAnnotationProcessor);
+    OnceAwareInterceptor interceptor = new 
OnceAwareInterceptor(sagaStartAnnotationProcessor);
     interceptor.preIntercept(context.globalTxId(), method.toString(), 
sagaStart.timeout());
     LOG.debug("Initialized context {} before execution of method {}", context, 
method.toString());
 
-    scheduleTimeoutTask(interceptor, method, sagaStart.timeout());
     try {
       Object result = joinPoint.proceed();
 
@@ -74,20 +69,4 @@ public class SagaStartAspect {
   private void initializeOmegaContext() {
     context.setLocalTxId(context.newGlobalTxId());
   }
-
-  private void scheduleTimeoutTask(
-      TimeAwareInterceptor interceptor,
-      Method method,
-      int timeout) {
-
-    if (timeout > 0) {
-      executor.schedule(
-          () -> interceptor.onTimeout(
-              context.globalTxId(),
-              method.toString(),
-              new OmegaTxTimeoutException("Saga " + method.toString() + " timed out")),
-          timeout,
-          MILLISECONDS);
-    }
-  }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index 718d1fd..090fe2e 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -17,12 +17,8 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
 import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Method;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 
 import javax.transaction.InvalidTransactionException;
 
@@ -40,7 +36,7 @@ public class TransactionAspect {
   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final OmegaContext context;
-  private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+
   private final CompensableInterceptor interceptor;
 
   public TransactionAspect(MessageSender sender, OmegaContext context) {
@@ -58,7 +54,7 @@ public class TransactionAspect {
     String localTxId = context.localTxId();
     context.newLocalTxId();
 
-    TimeAwareInterceptor interceptor = new 
TimeAwareInterceptor(this.interceptor);
+    OnceAwareInterceptor interceptor = new 
OnceAwareInterceptor(this.interceptor);
     AlphaResponse response = interceptor.preIntercept(localTxId, signature, 
compensable.timeout(), joinPoint.getArgs());
     if (response.aborted()) {
       String abortedLocalTxId = context.localTxId();
@@ -68,9 +64,6 @@ public class TransactionAspect {
     }
     LOG.debug("Updated context {} for compensable method {} ", context, 
method.toString());
 
-    // TODO: 2018/1/15 omega shall be stateless, all states shall be on alpha
-    scheduleTimeoutTask(interceptor, localTxId, signature, method, 
compensable.timeout());
-
     try {
       Object result = joinPoint.proceed();
       interceptor.postIntercept(localTxId, signature);
@@ -85,24 +78,6 @@ public class TransactionAspect {
     }
   }
 
-  private void scheduleTimeoutTask(
-      TimeAwareInterceptor interceptor,
-      String localTxId,
-      String signature,
-      Method method,
-      int timeout) {
-
-    if (timeout > 0) {
-      executor.schedule(
-          () -> interceptor.onTimeout(
-              localTxId,
-              signature,
-              new OmegaTxTimeoutException("Transaction " + method.toString() + " timed out")),
-          timeout,
-          MILLISECONDS);
-    }
-  }
-
   private String compensationMethodSignature(ProceedingJoinPoint joinPoint, 
Compensable compensable, Method method)
       throws NoSuchMethodException {
 
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptorTest.java
similarity index 62%
rename from omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
rename to omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptorTest.java
index 0f2d2eb..0a87491 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptorTest.java
@@ -20,12 +20,9 @@ package org.apache.servicecomb.saga.omega.transaction;
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
 
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -34,7 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-public class TimeAwareInterceptorTest {
+public class OnceAwareInterceptorTest {
   private static final int runningCounts = 1000;
 
   private final String localTxId = uniquify("localTxId");
@@ -66,70 +63,36 @@ public class TimeAwareInterceptorTest {
   };
 
   private final ExecutorService executorService = 
Executors.newFixedThreadPool(2);
-  private final RuntimeException timeoutException = new 
OmegaTxTimeoutException("timed out");
 
-
-  @Test(timeout = 5000)
-  public void invokeEitherPostInterceptOrOnTimeoutConcurrently() throws 
Exception {
+  @Test
+  public void invokePostIntercept() throws Exception {
     List<Future<?>> futures = new LinkedList<>();
 
     for (int i = 0; i < runningCounts; i++) {
-      TimeAwareInterceptor interceptor = new TimeAwareInterceptor(underlying);
-      CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
-      ExpectedException exception = ExpectedException.none();
-
-      futures.add(executorService.submit(() -> {
-        try {
-          waitForSignal(cyclicBarrier);
-          interceptor.postIntercept(localTxId, signature);
-        } catch (Throwable throwable) {
-          exception.expect(OmegaTxTimeoutException.class);
-        }
-      }));
-
-      futures.add(executorService.submit(() -> {
-        waitForSignal(cyclicBarrier);
-        interceptor.onTimeout(localTxId, signature, timeoutException);
-      }));
+      OnceAwareInterceptor interceptor = new OnceAwareInterceptor(underlying);
+
+      futures.add(executorService.submit(() -> 
interceptor.postIntercept(localTxId, signature)));
     }
 
     waitTillAllDone(futures);
 
-    assertThat(postInterceptInvoked.get() + onTimeoutInvoked.get(), 
is(runningCounts));
+    assertThat(postInterceptInvoked.get(), is(runningCounts));
   }
 
-  @Test(timeout = 5000)
+  @Test
   public void invokeEitherOnErrorOrOnTimeoutConcurrently() throws Exception {
     RuntimeException oops = new RuntimeException("oops");
     List<Future<?>> futures = new LinkedList<>();
 
     for (int i = 0; i < runningCounts; i++) {
-      TimeAwareInterceptor interceptor = new TimeAwareInterceptor(underlying);
-      CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
-
-
-      futures.add(executorService.submit(() -> {
-        waitForSignal(cyclicBarrier);
-        interceptor.onError(localTxId, signature, oops);
-      }));
+      OnceAwareInterceptor interceptor = new OnceAwareInterceptor(underlying);
 
-      futures.add(executorService.submit(() -> {
-        waitForSignal(cyclicBarrier);
-        interceptor.onTimeout(localTxId, signature, timeoutException);
-      }));
+      futures.add(executorService.submit(() -> interceptor.onError(localTxId, 
signature, oops)));
     }
 
     waitTillAllDone(futures);
 
-    assertThat(onErrorInvoked.get() + onTimeoutInvoked.get(), 
is(runningCounts));
-  }
-
-  private void waitForSignal(CyclicBarrier cyclicBarrier) {
-    try {
-      cyclicBarrier.await();
-    } catch (InterruptedException | BrokenBarrierException e) {
-      fail(e.getMessage());
-    }
+    assertThat(onErrorInvoked.get(), is(runningCounts));
   }
 
   private void waitTillAllDone(List<Future<?>> futures)
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
index 1bc2b28..77d40ef 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
@@ -18,20 +18,14 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.awaitility.Awaitility.await;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 import org.apache.servicecomb.saga.common.EventType;
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
@@ -62,8 +56,6 @@ public class SagaStartAspectTest {
   private final OmegaContext omegaContext = new OmegaContext(idGenerator);
   private final SagaStartAspect aspect = new SagaStartAspect(sender, 
omegaContext);
 
-  private final ExecutorService executor = Executors.newSingleThreadExecutor();
-
   @Before
   public void setUp() throws Exception {
     when(idGenerator.nextId()).thenReturn(globalTxId);
@@ -120,42 +112,6 @@ public class SagaStartAspectTest {
     assertThat(omegaContext.localTxId(), is(nullValue()));
   }
 
-  @Test
-  public void sendsAbortEventOnTimeout() throws Throwable {
-    CountDownLatch latch = new CountDownLatch(1);
-    when(sagaStart.timeout()).thenReturn(100);
-    when(joinPoint.proceed()).thenAnswer(invocationOnMock -> {
-      latch.await();
-      assertThat(omegaContext.localTxId(), is(globalTxId));
-      return null;
-    });
-
-    ExpectedException exception = ExpectedException.none();
-    executor.execute(() -> {
-      try {
-        aspect.advise(joinPoint, sagaStart);
-      } catch (Throwable throwable) {
-        exception.expect(OmegaTxTimeoutException.class);
-      }
-    });
-
-    await().atMost(1, SECONDS).until(() -> messages.size() == 2);
-
-    TxEvent event = messages.get(1);
-
-    assertThat(event.globalTxId(), is(globalTxId));
-    assertThat(event.localTxId(), is(globalTxId));
-    assertThat(event.parentTxId(), is(nullValue()));
-    assertThat(event.type(), is(EventType.TxAbortedEvent));
-
-    latch.countDown();
-
-    await().atMost(1, SECONDS).until(() -> omegaContext.localTxId() == null);
-
-    // no redundant ended message received
-    assertThat(messages.size(), is(2));
-  }
-
   private String doNothing() {
     return "doNothing";
   }
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
index 8689a1e..31d148f 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -18,11 +18,8 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.awaitility.Awaitility.await;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -32,9 +29,6 @@ import static org.mockito.Mockito.when;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 import javax.transaction.InvalidTransactionException;
 
@@ -69,8 +63,6 @@ public class TransactionAspectTest {
   private final OmegaContext omegaContext = new OmegaContext(idGenerator);
   private final TransactionAspect aspect = new TransactionAspect(sender, 
omegaContext);
 
-  private final ExecutorService executor = Executors.newSingleThreadExecutor();
-
   @Before
   public void setUp() throws Exception {
     when(idGenerator.nextId()).thenReturn(newLocalTxId);
@@ -131,46 +123,6 @@ public class TransactionAspectTest {
   }
 
   @Test
-  public void sendsAbortEventOnTimeout() throws Throwable {
-    CountDownLatch latch = new CountDownLatch(1);
-    when(compensable.timeout()).thenReturn(100);
-    when(joinPoint.proceed()).thenAnswer(invocationOnMock -> {
-      latch.await();
-      assertThat(omegaContext.localTxId(), is(newLocalTxId));
-      return null;
-    });
-
-    ExpectedException exception = ExpectedException.none();
-    executor.execute(() -> {
-      try {
-        // need to setup the thread local for it
-        omegaContext.setGlobalTxId(globalTxId);
-        omegaContext.setLocalTxId(localTxId);
-
-        aspect.advise(joinPoint, compensable);
-      } catch (Throwable throwable) {
-        exception.expect(OmegaTxTimeoutException.class);
-      }
-    });
-
-    await().atMost(1, SECONDS).until(() -> messages.size() == 2);
-
-    TxEvent event = messages.get(1);
-
-    assertThat(event.globalTxId(), is(globalTxId));
-    assertThat(event.localTxId(), is(newLocalTxId));
-    assertThat(event.parentTxId(), is(localTxId));
-    assertThat(event.type(), is(EventType.TxAbortedEvent));
-
-    latch.countDown();
-
-    await().atMost(1, SECONDS).until(() -> 
localTxId.equals(omegaContext.localTxId()));
-
-    // no redundant ended message received
-    assertThat(messages.size(), is(2));
-  }
-
-  @Test
   public void returnImmediatelyWhenReceivedRejectResponse() throws Throwable {
     MessageSender sender = mock(MessageSender.class);
     when(sender.send(any())).thenReturn(new AlphaResponse(true));

-- 
To stop receiving notification emails like this one, please contact
ningji...@apache.org.

Reply via email to