This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 335bf7a8b3fb1104fa0a5db79a474c7bc129f8ae
Author: Eric Lee <[email protected]>
AuthorDate: Tue Jan 16 15:42:30 2018 +0800

    SCB-234 fail fast SagaStartedEvent when all alpha servers are down
    
    Signed-off-by: Eric Lee <[email protected]>
---
 .../grpc/LoadBalancedClusterMessageSender.java     | 20 ++---
 .../connector/grpc/RetryableMessageSender.java     | 47 ++++++++++++
 .../grpc/LoadBalancedClusterMessageSenderTest.java | 10 ++-
 .../connector/grpc/RetryableMessageSenderTest.java | 85 ++++++++++++++++++++++
 .../saga/omega/spring/OmegaSpringConfig.java       | 10 ++-
 5 files changed, 160 insertions(+), 12 deletions(-)

diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index 700864a..b518524 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -53,7 +53,8 @@ public class LoadBalancedClusterMessageSender implements 
MessageSender {
   private final Collection<ManagedChannel> channels;
 
   private final BlockingQueue<Runnable> pendingTasks = new 
LinkedBlockingQueue<>();
-  private final BlockingQueue<MessageSender> availableMessageSenders = new 
LinkedBlockingQueue<>();
+  private final BlockingQueue<MessageSender> availableMessageSenders;
+  private final MessageSender retryableMessageSender;
   private final ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor();
 
   public LoadBalancedClusterMessageSender(String[] addresses,
@@ -61,12 +62,17 @@ public class LoadBalancedClusterMessageSender implements 
MessageSender {
       MessageDeserializer deserializer,
       ServiceConfig serviceConfig,
       MessageHandler handler,
-      int reconnectDelay) {
+      int reconnectDelay,
+      BlockingQueue<MessageSender> availableMessageSenders,
+      MessageSender retryableMessageSender) {
 
     if (addresses.length == 0) {
       throw new IllegalArgumentException("No reachable cluster address 
provided");
     }
 
+    this.availableMessageSenders = availableMessageSenders;
+    this.retryableMessageSender = retryableMessageSender;
+
     channels = new ArrayList<>(addresses.length);
     for (String address : addresses) {
       ManagedChannel channel = ManagedChannelBuilder.forTarget(address)
@@ -95,6 +101,8 @@ public class LoadBalancedClusterMessageSender implements 
MessageSender {
       senders.put(sender, 0L);
     }
     channels = emptyList();
+    availableMessageSenders = new LinkedBlockingQueue<>();
+    retryableMessageSender = new 
RetryableMessageSender(availableMessageSenders);
   }
 
   @Override
@@ -155,13 +163,7 @@ public class LoadBalancedClusterMessageSender implements 
MessageSender {
         .filter(entry -> entry.getValue() < Long.MAX_VALUE)
         .min(Comparator.comparingLong(Entry::getValue))
         .map(Entry::getKey)
-        .orElse(event -> {
-          try {
-            return availableMessageSenders.take().send(event);
-          } catch (InterruptedException e) {
-            throw new OmegaException("Failed to send event " + event + " due 
to interruption", e);
-          }
-        });
+        .orElse(retryableMessageSender);
   }
 
   private void scheduleReconnectTask(int reconnectDelay) {
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSender.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSender.java
new file mode 100644
index 0000000..abce82b
--- /dev/null
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSender.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc;
+
+import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.OmegaException;
+import org.apache.servicecomb.saga.omega.transaction.TxEvent;
+
+public class RetryableMessageSender implements MessageSender {
+  private final BlockingQueue<MessageSender> availableMessageSenders;
+
+  public RetryableMessageSender(BlockingQueue<MessageSender> 
availableMessageSenders) {
+    this.availableMessageSenders = availableMessageSenders;
+  }
+
+  @Override
+  public AlphaResponse send(TxEvent event) {
+    if (event.type() == SagaStartedEvent) {
+      throw new OmegaException("Failed to process subsequent requests because 
no alpha server is available");
+    }
+    try {
+      return availableMessageSenders.take().send(event);
+    } catch (InterruptedException e) {
+      throw new OmegaException("Failed to send event " + event + " due to 
interruption", e);
+    }
+  }
+}
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 93cb854..315c5ae 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -37,7 +37,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.servicecomb.saga.common.EventType;
 import org.apache.servicecomb.saga.omega.context.ServiceConfig;
@@ -101,6 +103,8 @@ public class LoadBalancedClusterMessageSenderTest {
 
   private final String serviceName = uniquify("serviceName");
   private final String[] addresses = {"localhost:8080", "localhost:8090"};
+  private final BlockingQueue<MessageSender> availableMessageSenders = new 
LinkedBlockingQueue<>();
+  private final MessageSender retryableMessageSender = new 
RetryableMessageSender(availableMessageSenders);
   private final MessageSender messageSender = newMessageSender(addresses);
 
   private MessageSender newMessageSender(String[] addresses) {
@@ -110,7 +114,9 @@ public class LoadBalancedClusterMessageSenderTest {
         deserializer,
         new ServiceConfig(serviceName),
         handler,
-        100);
+        100,
+        availableMessageSenders,
+        retryableMessageSender);
   }
 
   @BeforeClass
@@ -156,7 +162,7 @@ public class LoadBalancedClusterMessageSenderTest {
     assertThat(eventsMap.get(deadPort).size(), is(1));
     assertThat(eventsMap.get(deadPort).peek().toString(), 
is(event.toString()));
 
-    int livePort = deadPort == 8080? 8090 : 8080;
+    int livePort = deadPort == 8080 ? 8090 : 8080;
     assertThat(eventsMap.get(livePort).size(), is(2));
     assertThat(eventsMap.get(livePort).peek().toString(), 
is(event.toString()));
 
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
new file mode 100644
index 0000000..7ffbf9a
--- /dev/null
+++ 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc;
+
+import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.OmegaException;
+import org.apache.servicecomb.saga.omega.transaction.SagaStartedEvent;
+import org.apache.servicecomb.saga.omega.transaction.TxEvent;
+import org.apache.servicecomb.saga.omega.transaction.TxStartedEvent;
+import org.junit.Test;
+
+public class RetryableMessageSenderTest {
+  @SuppressWarnings("unchecked")
+  private final BlockingQueue<MessageSender> availableMessageSenders = 
mock(BlockingQueue.class);
+  private final MessageSender messageSender = new 
RetryableMessageSender(availableMessageSenders);
+
+  private final String globalTxId = uniquify("globalTxId");
+  private final String localTxId = uniquify("localTxId");
+  private final TxStartedEvent event = new TxStartedEvent(globalTxId, 
localTxId, null, "method x");
+
+  @Test
+  public void sendEventWhenSenderIsAvailable() throws InterruptedException {
+    MessageSender sender = mock(MessageSender.class);
+    when(availableMessageSenders.take()).thenReturn(sender);
+
+    messageSender.send(event);
+
+    verify(sender, times(1)).send(event);
+  }
+
+  @Test
+  public void blowsUpWhenEventIsSagaStarted() {
+    TxEvent event = new SagaStartedEvent(globalTxId, localTxId);
+
+    try {
+      messageSender.send(event);
+      expectFailing(OmegaException.class);
+    } catch (OmegaException e) {
+      assertThat(e.getMessage(),
+          is("Failed to process subsequent requests because no alpha server is 
available"));
+    }
+  }
+
+  @Test
+  public void blowsUpWhenInterrupted() throws InterruptedException {
+    Thread thread = new Thread(() -> {
+      try {
+        messageSender.send(event);
+        expectFailing(OmegaException.class);
+      } catch (OmegaException e) {
+        assertThat(e.getMessage().endsWith("interruption"), is(true));
+      }
+    });
+
+    thread.start();
+    thread.interrupt();
+    thread.join();
+  }
+}
\ No newline at end of file
diff --git 
a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
 
b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index fa4027b..78321a4 100644
--- 
a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ 
b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -17,7 +17,11 @@
 
 package org.apache.servicecomb.saga.omega.spring;
 
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
 import 
org.apache.servicecomb.saga.omega.connector.grpc.LoadBalancedClusterMessageSender;
+import org.apache.servicecomb.saga.omega.connector.grpc.RetryableMessageSender;
 import org.apache.servicecomb.saga.omega.context.CompensationContext;
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
@@ -63,13 +67,17 @@ class OmegaSpringConfig {
       @Lazy MessageHandler handler) {
 
     MessageFormat messageFormat = new KryoMessageFormat();
+    BlockingQueue<MessageSender> availableMessageSenders = new 
LinkedBlockingQueue<>();
+    MessageSender retryableMessageSender = new 
RetryableMessageSender(availableMessageSenders);
     MessageSender sender = new LoadBalancedClusterMessageSender(
         addresses,
         messageFormat,
         messageFormat,
         serviceConfig,
         handler,
-        reconnectDelay);
+        reconnectDelay,
+        availableMessageSenders,
+        retryableMessageSender);
 
     sender.onConnected();
     Runtime.getRuntime().addShutdownHook(new Thread(() -> {

-- 
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.

Reply via email to