This is an automated email from the ASF dual-hosted git repository. seanyinx pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 335bf7a8b3fb1104fa0a5db79a474c7bc129f8ae Author: Eric Lee <[email protected]> AuthorDate: Tue Jan 16 15:42:30 2018 +0800 SCB-234 fail fast SagaStartedEvent when all alpha servers are down Signed-off-by: Eric Lee <[email protected]> --- .../grpc/LoadBalancedClusterMessageSender.java | 20 ++--- .../connector/grpc/RetryableMessageSender.java | 47 ++++++++++++ .../grpc/LoadBalancedClusterMessageSenderTest.java | 10 ++- .../connector/grpc/RetryableMessageSenderTest.java | 85 ++++++++++++++++++++++ .../saga/omega/spring/OmegaSpringConfig.java | 10 ++- 5 files changed, 160 insertions(+), 12 deletions(-) diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java index 700864a..b518524 100644 --- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java +++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java @@ -53,7 +53,8 @@ public class LoadBalancedClusterMessageSender implements MessageSender { private final Collection<ManagedChannel> channels; private final BlockingQueue<Runnable> pendingTasks = new LinkedBlockingQueue<>(); - private final BlockingQueue<MessageSender> availableMessageSenders = new LinkedBlockingQueue<>(); + private final BlockingQueue<MessageSender> availableMessageSenders; + private final MessageSender retryableMessageSender; private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); public LoadBalancedClusterMessageSender(String[] addresses, @@ -61,12 +62,17 @@ public class LoadBalancedClusterMessageSender implements MessageSender { MessageDeserializer deserializer, ServiceConfig serviceConfig, MessageHandler handler, - int reconnectDelay) { + int reconnectDelay, + BlockingQueue<MessageSender> availableMessageSenders, + MessageSender retryableMessageSender) { if (addresses.length == 0) { throw new IllegalArgumentException("No reachable cluster address provided"); } + this.availableMessageSenders = availableMessageSenders; + this.retryableMessageSender = retryableMessageSender; + channels = new ArrayList<>(addresses.length); for (String address : addresses) { ManagedChannel channel = ManagedChannelBuilder.forTarget(address) @@ -95,6 +101,8 @@ public class LoadBalancedClusterMessageSender implements MessageSender { senders.put(sender, 0L); } channels = emptyList(); + availableMessageSenders = new LinkedBlockingQueue<>(); + retryableMessageSender = new RetryableMessageSender(availableMessageSenders); } @Override @@ -155,13 +163,7 @@ public class LoadBalancedClusterMessageSender implements MessageSender { .filter(entry -> entry.getValue() < Long.MAX_VALUE) .min(Comparator.comparingLong(Entry::getValue)) .map(Entry::getKey) - .orElse(event -> { - try { - return availableMessageSenders.take().send(event); - } catch (InterruptedException e) { - throw new OmegaException("Failed to send event " + event + " due to interruption", e); - } - }); + .orElse(retryableMessageSender); } private void scheduleReconnectTask(int reconnectDelay) { diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSender.java new file mode 100644 index 0000000..abce82b --- /dev/null +++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSender.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.omega.connector.grpc; + +import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent; + +import java.util.concurrent.BlockingQueue; + +import org.apache.servicecomb.saga.omega.transaction.AlphaResponse; +import org.apache.servicecomb.saga.omega.transaction.MessageSender; +import org.apache.servicecomb.saga.omega.transaction.OmegaException; +import org.apache.servicecomb.saga.omega.transaction.TxEvent; + +public class RetryableMessageSender implements MessageSender { + private final BlockingQueue<MessageSender> availableMessageSenders; + + public RetryableMessageSender(BlockingQueue<MessageSender> availableMessageSenders) { + this.availableMessageSenders = availableMessageSenders; + } + + @Override + public AlphaResponse send(TxEvent event) { + if (event.type() == SagaStartedEvent) { + throw new OmegaException("Failed to process subsequent requests because no alpha server is available"); + } + try { + return availableMessageSenders.take().send(event); + } catch (InterruptedException e) { + throw new OmegaException("Failed to send event " + event + " due to interruption", e); + } + } +} diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java index 93cb854..315c5ae 100644 --- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java +++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java @@ -37,7 +37,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; import org.apache.servicecomb.saga.common.EventType; import org.apache.servicecomb.saga.omega.context.ServiceConfig; @@ -101,6 +103,8 @@ public class LoadBalancedClusterMessageSenderTest { private final String serviceName = uniquify("serviceName"); private final String[] addresses = {"localhost:8080", "localhost:8090"}; + private final BlockingQueue<MessageSender> availableMessageSenders = new LinkedBlockingQueue<>(); + private final MessageSender retryableMessageSender = new RetryableMessageSender(availableMessageSenders); private final MessageSender messageSender = newMessageSender(addresses); private MessageSender newMessageSender(String[] addresses) { @@ -110,7 +114,9 @@ public class LoadBalancedClusterMessageSenderTest { deserializer, new ServiceConfig(serviceName), handler, - 100); + 100, + availableMessageSenders, + retryableMessageSender); } @BeforeClass @@ -156,7 +162,7 @@ public class LoadBalancedClusterMessageSenderTest { assertThat(eventsMap.get(deadPort).size(), is(1)); assertThat(eventsMap.get(deadPort).peek().toString(), is(event.toString())); - int livePort = deadPort == 8080? 8090 : 8080; + int livePort = deadPort == 8080 ? 8090 : 8080; assertThat(eventsMap.get(livePort).size(), is(2)); assertThat(eventsMap.get(livePort).peek().toString(), is(event.toString())); diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java new file mode 100644 index 0000000..7ffbf9a --- /dev/null +++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.omega.connector.grpc; + +import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing; +import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.concurrent.BlockingQueue; + +import org.apache.servicecomb.saga.omega.transaction.MessageSender; +import org.apache.servicecomb.saga.omega.transaction.OmegaException; +import org.apache.servicecomb.saga.omega.transaction.SagaStartedEvent; +import org.apache.servicecomb.saga.omega.transaction.TxEvent; +import org.apache.servicecomb.saga.omega.transaction.TxStartedEvent; +import org.junit.Test; + +public class RetryableMessageSenderTest { + @SuppressWarnings("unchecked") + private final BlockingQueue<MessageSender> availableMessageSenders = mock(BlockingQueue.class); + private final MessageSender messageSender = new RetryableMessageSender(availableMessageSenders); + + private final String globalTxId = uniquify("globalTxId"); + private final String localTxId = uniquify("localTxId"); + private final TxStartedEvent event = new TxStartedEvent(globalTxId, localTxId, null, "method x"); + + @Test + public void sendEventWhenSenderIsAvailable() throws InterruptedException { + MessageSender sender = mock(MessageSender.class); + when(availableMessageSenders.take()).thenReturn(sender); + + messageSender.send(event); + + verify(sender, times(1)).send(event); + } + + @Test + public void blowsUpWhenEventIsSagaStarted() { + TxEvent event = new SagaStartedEvent(globalTxId, localTxId); + + try { + messageSender.send(event); + expectFailing(OmegaException.class); + } catch (OmegaException e) { + assertThat(e.getMessage(), + is("Failed to process subsequent requests because no alpha server is available")); + } + } + + @Test + public void blowsUpWhenInterrupted() throws InterruptedException { + Thread thread = new Thread(() -> { + try { + messageSender.send(event); + expectFailing(OmegaException.class); + } catch (OmegaException e) { + assertThat(e.getMessage().endsWith("interruption"), is(true)); + } + }); + + thread.start(); + thread.interrupt(); + thread.join(); + } +} \ No newline at end of file diff --git a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java index fa4027b..78321a4 100644 --- a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java +++ b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java @@ -17,7 +17,11 @@ package org.apache.servicecomb.saga.omega.spring; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + import org.apache.servicecomb.saga.omega.connector.grpc.LoadBalancedClusterMessageSender; +import org.apache.servicecomb.saga.omega.connector.grpc.RetryableMessageSender; import org.apache.servicecomb.saga.omega.context.CompensationContext; import org.apache.servicecomb.saga.omega.context.IdGenerator; import org.apache.servicecomb.saga.omega.context.OmegaContext; @@ -63,13 +67,17 @@ class OmegaSpringConfig { @Lazy MessageHandler handler) { MessageFormat messageFormat = new KryoMessageFormat(); + BlockingQueue<MessageSender> availableMessageSenders = new LinkedBlockingQueue<>(); + MessageSender retryableMessageSender = new RetryableMessageSender(availableMessageSenders); MessageSender sender = new LoadBalancedClusterMessageSender( addresses, messageFormat, messageFormat, serviceConfig, handler, - reconnectDelay); + reconnectDelay, + availableMessageSenders, + retryableMessageSender); sender.onConnected(); Runtime.getRuntime().addShutdownHook(new Thread(() -> { -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
