[ 
https://issues.apache.org/jira/browse/SCB-211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16325923#comment-16325923
 ] 

ASF GitHub Bot commented on SCB-211:
------------------------------------

seanyinx closed pull request #113: SCB-211 wait until connection recover 
instead of try sending
URL: https://github.com/apache/incubator-servicecomb-saga/pull/113
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index 6eae8a34..73940ed7 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -24,10 +24,10 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
@@ -47,10 +47,11 @@
 
 public class LoadBalancedClusterMessageSender implements MessageSender {
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private final Map<MessageSender, Long> senders = new HashMap<>();
+  private final Map<MessageSender, Long> senders = new ConcurrentHashMap<>();
   private final Collection<ManagedChannel> channels;
 
   private final BlockingQueue<Runnable> pendingTasks = new 
LinkedBlockingQueue<>();
+  private final BlockingQueue<MessageSender> availableMessageSenders = new 
LinkedBlockingQueue<>();
   private final ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor();
 
   public LoadBalancedClusterMessageSender(String[] addresses,
@@ -146,9 +147,16 @@ public void send(TxEvent event) {
   private MessageSender fastestSender() {
     return senders.entrySet()
         .stream()
+        .filter(entry -> entry.getValue() < Long.MAX_VALUE)
         .min(Comparator.comparingLong(Entry::getValue))
         .map(Entry::getKey)
-        .orElse(NO_OP_SENDER);
+        .orElse((event -> {
+          try {
+            availableMessageSenders.take().send(event);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+        }));
   }
 
   private void scheduleReconnectTask(int reconnectDelay) {
@@ -163,7 +171,7 @@ private void scheduleReconnectTask(int reconnectDelay) {
 
   private Function<MessageSender, Runnable> errorHandlerFactory() {
     return messageSender -> {
-      Runnable runnable = new PushBackReconnectRunnable(messageSender, 
senders, pendingTasks);
+      Runnable runnable = new PushBackReconnectRunnable(messageSender, 
senders, pendingTasks, availableMessageSenders);
       return () -> pendingTasks.offer(runnable);
     };
   }
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
index d21e5682..f019d107 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
@@ -31,13 +31,17 @@
   private final Map<MessageSender, Long> senders;
   private final BlockingQueue<Runnable> pendingTasks;
 
+  private final BlockingQueue<MessageSender> connectedSenders;
+
   PushBackReconnectRunnable(
       MessageSender messageSender,
       Map<MessageSender, Long> senders,
-      BlockingQueue<Runnable> pendingTasks) {
+      BlockingQueue<Runnable> pendingTasks,
+      BlockingQueue<MessageSender> connectedSenders) {
     this.messageSender = messageSender;
     this.senders = senders;
     this.pendingTasks = pendingTasks;
+    this.connectedSenders = connectedSenders;
   }
 
   @Override
@@ -47,6 +51,7 @@ public void run() {
       messageSender.onDisconnected();
       messageSender.onConnected();
       senders.put(messageSender, 0L);
+      connectedSenders.offer(messageSender);
       log.info("Retry connecting to alpha at {} is successful", 
messageSender.target());
     } catch (Exception e) {
       log.error("Failed to reconnect to alpha at {}", messageSender.target(), 
e);
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 8fe5247f..d5f432fb 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -19,6 +19,7 @@
 
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static java.lang.Thread.State.WAITING;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.Matchers.contains;
@@ -26,6 +27,7 @@
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
@@ -84,9 +86,9 @@
   private final MessageDeserializer deserializer = message -> new Object[] 
{new String(message)};
 
   private final List<String> compensated = new ArrayList<>();
-  private final MessageHandler handler = (globalTxId, localTxId, parentTxId, 
compensationMethod, payloads) -> {
-    compensated.add(globalTxId);
-  };
+
+  private final MessageHandler handler = (globalTxId, localTxId, parentTxId, 
compensationMethod, payloads) ->
+      compensated.add(globalTxId);
 
   private final String globalTxId = uniquify("globalTxId");
   private final String localTxId = uniquify("localTxId");
@@ -170,7 +172,7 @@ public void resetLatencyOnReconnection() throws Exception {
     messageSender.send(event);
 
     startServerOnPort(deadPort);
-    await().atMost(1, SECONDS).until(() -> connected.get(deadPort).size() == 
3);
+    await().atMost(2, SECONDS).until(() -> connected.get(deadPort).size() == 
3);
 
     TxEvent abortedEvent = new TxAbortedEvent(globalTxId, localTxId, 
parentTxId, compensationMethod, new RuntimeException("oops"));
     messageSender.send(abortedEvent);
@@ -195,6 +197,9 @@ public void stopSendingOnInterruption() throws Exception {
 
     Thread.sleep(300);
 
+    // stop trying to send message out on exception
+    verify(underlying, times(1)).send(event);
+
     thread.interrupt();
     thread.join();
   }
@@ -266,6 +271,27 @@ public void blowsUpWhenNoServerAddressProvided() throws 
Exception {
     }
   }
 
+  @Test
+  public void stopSendingWhenClusterIsDown() throws Exception {
+    servers.values().forEach(Server::shutdownNow);
+    messageSender.onConnected();
+
+    Thread thread = new Thread(() -> messageSender.send(event));
+    thread.start();
+
+    // we don't want to keep sending on cluster down
+    await().atMost(2, SECONDS).until(() -> thread.isAlive() && 
thread.getState().equals(WAITING));
+
+    assertThat(eventsMap.get(8080).isEmpty(), is(true));
+    assertThat(eventsMap.get(8090).isEmpty(), is(true));
+
+    startServerOnPort(8080);
+    startServerOnPort(8090);
+
+    await().atMost(2, SECONDS).until(() -> connected.get(8080).size() == 2 || 
connected.get(8090).size() == 2);
+    await().atMost(2, SECONDS).until(() -> eventsMap.get(8080).size() == 1 || 
eventsMap.get(8090).size() == 1);
+  }
+
   private int killServerReceivedMessage() {
     for (int port : eventsMap.keySet()) {
       if (!eventsMap.get(port).isEmpty()) {
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnableTest.java
 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnableTest.java
index a5b51b1c..e6589466 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnableTest.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnableTest.java
@@ -19,8 +19,9 @@
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
@@ -32,16 +33,17 @@
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 public class PushBackReconnectRunnableTest {
   private static final Runnable NO_OP_RUNNABLE = () -> {
   };
 
-  private final MessageSender sender = Mockito.mock(MessageSender.class);
+  private final MessageSender sender = mock(MessageSender.class);
   private final BlockingQueue<Runnable> runnables = new 
LinkedBlockingQueue<>();
+  private final BlockingQueue<MessageSender> connectedSenders = new 
LinkedBlockingQueue<>();
   private final Map<MessageSender, Long> senders = new HashMap<>();
-  private final PushBackReconnectRunnable pushBack = new 
PushBackReconnectRunnable(sender, senders, runnables);
+
+  private final PushBackReconnectRunnable pushBack = new 
PushBackReconnectRunnable(sender, senders, runnables, connectedSenders);
 
   @Before
   public void setUp() throws Exception {
@@ -68,6 +70,7 @@ public void pushFailedCallbackToEndOfQueue() throws Exception 
{
 
     assertThat(runnables.isEmpty(), is(true));
     assertThat(senders.get(sender), is(0L));
+    assertThat(connectedSenders, contains(sender));
 
     verify(sender, times(3)).onDisconnected();
     verify(sender, times(1)).onConnected();
diff --git 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
index e2a1c623..23703256 100644
--- 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
+++ 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
@@ -29,8 +29,9 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
@@ -56,6 +57,8 @@
   private final OmegaContext omegaContext = new OmegaContext(idGenerator);
   private final SagaStartAspect aspect = new SagaStartAspect(sender, 
omegaContext);
 
+  private final ExecutorService executor = Executors.newSingleThreadExecutor();
+
   @Before
   public void setUp() throws Exception {
     when(idGenerator.nextId()).thenReturn(globalTxId);
@@ -122,7 +125,7 @@ public void sendsAbortEventOnTimeout() throws Throwable {
       return null;
     });
 
-    CompletableFuture.runAsync(() -> {
+    executor.execute(() -> {
       try {
         aspect.advise(joinPoint, sagaStart);
       } catch (Throwable throwable) {
diff --git 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
index dabfbc72..2d92d0f0 100644
--- 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -28,8 +28,9 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
@@ -58,6 +59,8 @@
   private final OmegaContext omegaContext = new OmegaContext(idGenerator);
   private final TransactionAspect aspect = new TransactionAspect(sender, 
omegaContext);
 
+  private final ExecutorService executor = Executors.newSingleThreadExecutor();
+
   @Before
   public void setUp() throws Exception {
     when(idGenerator.nextId()).thenReturn(newLocalTxId);
@@ -127,7 +130,7 @@ public void sendsAbortEventOnTimeout() throws Throwable {
       return null;
     });
 
-    CompletableFuture.runAsync(() -> {
+    executor.execute(() -> {
       try {
         // need to setup the thread local for it
         omegaContext.setGlobalTxId(globalTxId);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> [pack] exponential backoff reconnect on cluster down or network down
> --------------------------------------------------------------------
>
>                 Key: SCB-211
>                 URL: https://issues.apache.org/jira/browse/SCB-211
>             Project: Apache ServiceComb
>          Issue Type: Improvement
>          Components: Saga
>            Reporter: Yin Xiang
>            Assignee: Eric Lee
>            Priority: Major
>
> When omega is disconnected from the alpha cluster, it keeps retrying every 
> time sending an event fails.
> An exponential backoff retry would be better, to save CPU resources and the 
> disk space consumed by logs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to