This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 2c395dab2f19125738d5132ac51ba27b45559ae8 Author: seanyinx <[email protected]> AuthorDate: Mon Jan 8 16:22:21 2018 +0800 SCB-168 exception free connection & disconnection Signed-off-by: seanyinx <[email protected]> --- .../connector/grpc/GrpcClientMessageSender.java | 11 +++++++- .../grpc/LoadBalancedClusterMessageSender.java | 21 ++++++++++++--- .../grpc/LoadBalancedClusterMessageSenderTest.java | 31 ++++++++++++++++++++++ .../saga/omega/transaction/MessageSender.java | 4 +++ 4 files changed, 63 insertions(+), 4 deletions(-) diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java index 59fbce1..4729db1 100644 --- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java +++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java @@ -39,6 +39,7 @@ import io.grpc.ManagedChannel; public class GrpcClientMessageSender implements MessageSender { + private final String target; private final TxEventServiceStub asyncEventService; private final MessageSerializer serializer; @@ -47,11 +48,14 @@ public class GrpcClientMessageSender implements MessageSender { private final GrpcCompensateStreamObserver compensateStreamObserver; private final GrpcServiceConfig serviceConfig; - public GrpcClientMessageSender(ManagedChannel channel, + public GrpcClientMessageSender( + String address, + ManagedChannel channel, MessageSerializer serializer, MessageDeserializer deserializer, ServiceConfig serviceConfig, MessageHandler handler) { + this.target = address; this.asyncEventService = TxEventServiceGrpc.newStub(channel); this.blockingEventService = TxEventServiceGrpc.newBlockingStub(channel); this.serializer = serializer; @@ -71,6 +75,11 @@ public class GrpcClientMessageSender implements MessageSender { } @Override + public String target() { + return target; + } + + @Override public void send(TxEvent event) { blockingEventService.onTxEvent(convertEvent(event)); } diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java index e5c9d19..14ce340 100644 --- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java +++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java @@ -63,7 +63,9 @@ public class LoadBalancedClusterMessageSender implements MessageSender { channels.add(channel); senders.put( - new GrpcClientMessageSender(channel, + new GrpcClientMessageSender( + address, + channel, serializer, deserializer, serviceConfig, @@ -81,12 +83,25 @@ public class LoadBalancedClusterMessageSender implements MessageSender { @Override public void onConnected() { - senders.keySet().forEach(MessageSender::onConnected); + senders.keySet().forEach(sender -> { + try { + sender.onConnected(); + } catch (Exception e) { + log.error("Failed connecting to alpha at {}", sender.target(), e); + } + }); } @Override public void onDisconnected() { - senders.keySet().forEach(MessageSender::onDisconnected); + senders.keySet().forEach(sender -> { + try { + sender.onDisconnected(); + } catch (Exception e) { + log.error("Failed disconnecting from alpha at {}", sender.target(), e); + } + }); + } @Override diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java index 6f2d90b..dc75663 100644 --- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java +++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java @@ -28,6 +28,7 @@ import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.verify; import java.io.IOException; import java.util.ArrayList; @@ -184,6 +185,36 @@ public class LoadBalancedClusterMessageSenderTest { } @Test + public void swallowException_UntilAllSendersConnected() throws Exception { + MessageSender underlying1 = Mockito.mock(MessageSender.class); + doThrow(RuntimeException.class).when(underlying1).onConnected(); + + MessageSender underlying2 = Mockito.mock(MessageSender.class); + + MessageSender sender = new LoadBalancedClusterMessageSender(underlying1, underlying2); + + sender.onConnected(); + + verify(underlying1).onConnected(); + verify(underlying2).onConnected(); + } + + @Test + public void swallowException_UntilAllSendersDisconnected() throws Exception { + MessageSender underlying1 = Mockito.mock(MessageSender.class); + doThrow(RuntimeException.class).when(underlying1).onDisconnected(); + + MessageSender underlying2 = Mockito.mock(MessageSender.class); + + MessageSender sender = new LoadBalancedClusterMessageSender(underlying1, underlying2); + + sender.onDisconnected(); + + verify(underlying1).onDisconnected(); + verify(underlying2).onDisconnected(); + } + + @Test public void considerFasterServerFirst() throws Exception { // we don't know which server is selected at first messageSender.send(event); diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java index 39b4d62..6a4ede6 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java @@ -27,5 +27,9 @@ public interface MessageSender { default void close() { } + default String target() { + return "UNKNOWN"; + } + void send(TxEvent event); } -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
