This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 49235b50798956ffdd1a65feb503cf9c1bfb7baa
Author: seanyinx <sean....@huawei.com>
AuthorDate: Tue Jan 2 16:29:43 2018 +0800

    SCB-149 pushed failed compensations to a scheduled task queue
    
    Signed-off-by: seanyinx <sean....@huawei.com>
---
 .../saga/alpha/core/PendingTaskRunner.java         | 45 ++++++++++++
 ...egaCallback.java => PushBackOmegaCallback.java} | 38 ++++-------
 .../saga/alpha/core/TxConsistentService.java       |  1 -
 .../saga/alpha/core/PendingTaskRunnerTest.java     | 60 ++++++++++++++++
 .../saga/alpha/core/PushBackOmegaCallbackTest.java | 65 ++++++++++++++++++
 .../saga/alpha/core/RetryOmegaCallbackTest.java    | 79 ----------------------
 .../servicecomb/saga/alpha/core/TxEventMaker.java  | 36 ++++++++++
 .../servicecomb/saga/alpha/server/AlphaConfig.java | 23 +++++--
 8 files changed, 235 insertions(+), 112 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/PendingTaskRunner.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/PendingTaskRunner.java
new file mode 100644
index 0000000..c9a06fa
--- /dev/null
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/PendingTaskRunner.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.servicecomb.saga.alpha.core;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+public class PendingTaskRunner {
+  private final BlockingQueue<Runnable> pendingTasks;
+  private final int delay;
+
+  public PendingTaskRunner(BlockingQueue<Runnable> pendingTasks, int delay) {
+    this.pendingTasks = pendingTasks;
+    this.delay = delay;
+  }
+
+  public Future<?> run() {
+    return Executors.newSingleThreadScheduledExecutor()
+        .scheduleWithFixedDelay(() -> {
+          try {
+            pendingTasks.take().run();
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+        }, 0, delay, MILLISECONDS);
+  }
+}
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/RetryOmegaCallback.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
similarity index 58%
rename from alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/RetryOmegaCallback.java
rename to alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
index 6f1a7dd..8403af0 100644
--- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/RetryOmegaCallback.java
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
@@ -18,49 +18,35 @@
 package io.servicecomb.saga.alpha.core;
 
 import java.lang.invoke.MethodHandles;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.BlockingQueue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class RetryOmegaCallback implements OmegaCallback {
+public class PushBackOmegaCallback implements OmegaCallback {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private static final String ERROR_MESSAGE = "Failed to compensate service [{}] instance [{}] with method [{}], global tx id [{}] and local tx id [{}]";
 
+  private final BlockingQueue<Runnable> pendingCompensations;
   private final OmegaCallback underlying;
-  private final int delay;
 
-  public RetryOmegaCallback(OmegaCallback underlying, int delay) {
+  public PushBackOmegaCallback(BlockingQueue<Runnable> pendingCompensations, OmegaCallback underlying) {
+    this.pendingCompensations = pendingCompensations;
     this.underlying = underlying;
-    this.delay = delay;
   }
 
   @Override
   public void compensate(TxEvent event) {
-    boolean success = false;
-    do {
-      try {
-        underlying.compensate(event);
-        success = true;
-      } catch (Exception e) {
-        logError(ERROR_MESSAGE, event, e);
-        sleep(event);
-      }
-    } while (!success && !Thread.currentThread().isInterrupted());
-  }
-
-  private void sleep(TxEvent event) {
     try {
-      TimeUnit.MILLISECONDS.sleep(delay);
-    } catch (InterruptedException e) {
-      logError(ERROR_MESSAGE + " due to interruption", event, e);
-
-      Thread.currentThread().interrupt();
+      underlying.compensate(event);
+    } catch (Exception e) {
+      logError(event, e);
+      pendingCompensations.offer(() -> compensate(event));
     }
   }
 
-  private void logError(String message, TxEvent event, Exception e) {
-    log.error(message,
+  private void logError(TxEvent event, Exception e) {
+    log.error(
+        "Failed to compensate service [{}] instance [{}] with method [{}], global tx id [{}] and local tx id [{}]",
         event.serviceName(),
         event.instanceId(),
         event.compensationMethod(),
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
index 5dcb7bc..6fd9193 100644
--- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -46,7 +46,6 @@ public class TxConsistentService {
     CompletableFuture.runAsync(() -> eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event));
   }
 
-  // TODO: 2017/12/27 we must define a way to find which service to compensate, to avoid sending to all
   private void compensate(TxEvent event) {
     List<TxEvent> events = eventRepository.findStartedTransactions(event.globalTxId(), TxStartedEvent.name());
     events.forEach(omegaCallback::compensate);
diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/PendingTaskRunnerTest.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/PendingTaskRunnerTest.java
new file mode 100644
index 0000000..d806eec
--- /dev/null
+++ b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/PendingTaskRunnerTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.servicecomb.saga.alpha.core;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.junit.Test;
+
+public class PendingTaskRunnerTest {
+  private final List<String> messages = new ArrayList<>();
+  private final BlockingQueue<Runnable> runnables = new LinkedBlockingQueue<>();
+  private final PendingTaskRunner taskRunner = new PendingTaskRunner(runnables, 10);
+
+  @Test
+  public void burnsAllTasksInQueue() throws Exception {
+    runnables.offer(() -> messages.add("hello"));
+    runnables.offer(() -> messages.add("world"));
+
+    taskRunner.run();
+
+    await().atMost(500, MILLISECONDS).until(runnables::isEmpty);
+
+    assertThat(messages, contains("hello", "world"));
+  }
+
+  @Test
+  public void exitOnInterruption() throws Exception {
+    taskRunner.run().cancel(true);
+
+    runnables.offer(() -> messages.add("hello"));
+    Thread.sleep(300);
+
+    assertThat(runnables.isEmpty(), is(false));
+    assertThat(messages.isEmpty(), is(true));
+  }
+}
diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/PushBackOmegaCallbackTest.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/PushBackOmegaCallbackTest.java
new file mode 100644
index 0000000..f53624c
--- /dev/null
+++ b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/PushBackOmegaCallbackTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.servicecomb.saga.alpha.core;
+
+import static io.servicecomb.saga.alpha.core.TxEventMaker.someEvent;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class PushBackOmegaCallbackTest {
+  private static final Runnable NO_OP_RUNNABLE = () -> {
+  };
+
+  private final OmegaCallback underlying = Mockito.mock(OmegaCallback.class);
+  private final BlockingQueue<Runnable> runnables = new LinkedBlockingQueue<>();
+  private final PushBackOmegaCallback pushBack = new PushBackOmegaCallback(runnables, underlying);
+
+  @Before
+  public void setUp() throws Exception {
+    runnables.offer(NO_OP_RUNNABLE);
+  }
+
+  @Test
+  public void pushFailedCallbackToEndOfQueue() throws Exception {
+    TxEvent event = someEvent();
+    doThrow(AlphaException.class).doThrow(AlphaException.class).doNothing().when(underlying).compensate(event);
+
+    pushBack.compensate(event);
+
+    assertThat(runnables.size(), is(2));
+    assertThat(runnables.poll(), is(NO_OP_RUNNABLE));
+
+    // failed again and pushed back itself to queue
+    runnables.poll().run();
+    assertThat(runnables.size(), is(1));
+
+    runnables.poll().run();
+
+    verify(underlying, times(3)).compensate(event);
+  }
+}
diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/RetryOmegaCallbackTest.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/RetryOmegaCallbackTest.java
deleted file mode 100644
index 27cc16f..0000000
--- a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/RetryOmegaCallbackTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.servicecomb.saga.alpha.core;
-
-import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
-import static org.mockito.Mockito.atMost;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.util.Date;
-import java.util.UUID;
-
-import org.junit.Test;
-import org.mockito.Mockito;
-
-public class RetryOmegaCallbackTest {
-  private final int delay = 100;
-  private final OmegaCallback underlying = Mockito.mock(OmegaCallback.class);
-  private final RetryOmegaCallback callback = new RetryOmegaCallback(underlying, delay);
-
-  @Test
-  public void retryOnFailure() throws Exception {
-    TxEvent event = someEvent();
-
-    doThrow(AlphaException.class)
-        .doThrow(AlphaException.class)
-        .doNothing()
-        .when(underlying)
-        .compensate(event);
-
-    callback.compensate(event);
-
-    verify(underlying, times(3)).compensate(event);
-  }
-
-  @Test
-  public void exitOnInterruption() throws Exception {
-    TxEvent event = someEvent();
-
-    doThrow(AlphaException.class).when(underlying).compensate(event);
-
-    Thread thread = new Thread(() -> callback.compensate(event));
-    thread.start();
-
-    Thread.sleep(300);
-    thread.interrupt();
-
-    verify(underlying, atMost(4)).compensate(event);
-  }
-
-  private TxEvent someEvent() {
-    return new TxEvent(
-        uniquify("serviceName"),
-        uniquify("instanceId"),
-        new Date(),
-        uniquify("globalTxId"),
-        uniquify("localTxId"),
-        UUID.randomUUID().toString(),
-        EventType.TxStartedEvent.name(),
-        getClass().getCanonicalName(),
-        uniquify("blah").getBytes());
-  }
-}
diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxEventMaker.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxEventMaker.java
new file mode 100644
index 0000000..77ef920
--- /dev/null
+++ b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxEventMaker.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.servicecomb.saga.alpha.core;import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+
+import java.util.Date;
+import java.util.UUID;
+
+class TxEventMaker {
+  static TxEvent someEvent() {
+    return new TxEvent(
+        uniquify("serviceName"),
+        uniquify("instanceId"),
+        new Date(),
+        uniquify("globalTxId"),
+        uniquify("localTxId"),
+        UUID.randomUUID().toString(),
+        EventType.TxStartedEvent.name(),
+        TxEventMaker.class.getCanonicalName(),
+        uniquify("blah").getBytes());
+  }
+}
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
index f44c951..3685894 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -18,7 +18,11 @@
 package io.servicecomb.saga.alpha.server;
 
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.annotation.PostConstruct;
 
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
@@ -26,12 +30,17 @@ import org.springframework.context.annotation.Configuration;
 
 import io.servicecomb.saga.alpha.core.CompositeOmegaCallback;
 import io.servicecomb.saga.alpha.core.OmegaCallback;
-import io.servicecomb.saga.alpha.core.RetryOmegaCallback;
+import io.servicecomb.saga.alpha.core.PendingTaskRunner;
+import io.servicecomb.saga.alpha.core.PushBackOmegaCallback;
 import io.servicecomb.saga.alpha.core.TxConsistentService;
 import io.servicecomb.saga.alpha.core.TxEventRepository;
 
 @Configuration
 class AlphaConfig {
+  private final BlockingQueue<Runnable> pendingCompensations = new LinkedBlockingQueue<>();
+
+  @Value("${alpha.compensation.retry.delay:3000}")
+  private int delay;
 
   // TODO: 2017/12/27 to be filled with actual callbacks on completion of SCB-138
   @Bean
@@ -40,11 +49,8 @@ class AlphaConfig {
   }
 
   @Bean
-  OmegaCallback omegaCallback(
-      Map<String, Map<String, OmegaCallback>> callbacks,
-      @Value("${alpha.compensation.retry.delay:3000}") int delay) {
-
-    return new RetryOmegaCallback(new CompositeOmegaCallback(callbacks), delay);
+  OmegaCallback omegaCallback(Map<String, Map<String, OmegaCallback>> callbacks) {
+    return new PushBackOmegaCallback(pendingCompensations, new CompositeOmegaCallback(callbacks));
   }
   
   @Bean
@@ -68,4 +74,9 @@ class AlphaConfig {
                 eventRepository,
                 omegaCallback)));
   }
+
+  @PostConstruct
+  void init() {
+    new PendingTaskRunner(pendingCompensations, delay).run();
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <commits@servicecomb.apache.org>.

Reply via email to