This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 01c4a1d415576ad9920fa84cd81ad37dc4f6c637 Author: seanyinx <[email protected]> AuthorDate: Mon Jan 8 16:05:15 2018 +0800 SCB-168 load balance based on server latency Signed-off-by: seanyinx <[email protected]> --- .../grpc/LoadBalancedClusterMessageSender.java | 128 +++++++++------------ .../grpc/LoadBalancedClusterMessageSenderTest.java | 84 +++++++++++--- .../saga/omega/spring/OmegaSpringConfig.java | 59 ++-------- .../saga/omega/transaction/MessageSender.java | 3 + 4 files changed, 141 insertions(+), 133 deletions(-) diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java index 276f887..e5c9d19 100644 --- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java +++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java @@ -17,13 +17,16 @@ package org.apache.servicecomb.saga.omega.connector.grpc; +import static java.util.Collections.emptyList; + import java.lang.invoke.MethodHandles; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.net.URI; -import java.util.Arrays; -import java.util.List; -import java.util.stream.Collectors; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.function.Consumer; import org.apache.servicecomb.saga.omega.context.ServiceConfig; import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer; @@ -34,51 +37,61 @@ import org.apache.servicecomb.saga.omega.transaction.TxEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.grpc.Attributes; -import io.grpc.EquivalentAddressGroup; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; -import io.grpc.NameResolver; -import io.grpc.NameResolver.Factory; -import io.grpc.util.RoundRobinLoadBalancerFactory; public class LoadBalancedClusterMessageSender implements MessageSender { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private final MessageSender messageSender; + private final Map<MessageSender, Long> senders = new HashMap<>(); + private final Collection<ManagedChannel> channels; - public LoadBalancedClusterMessageSender(String addresses, + public LoadBalancedClusterMessageSender(String[] addresses, MessageSerializer serializer, MessageDeserializer deserializer, ServiceConfig serviceConfig, MessageHandler handler) { - this(new GrpcClientMessageSender(clusterDirectAddressChannel(addresses), - serializer, - deserializer, - serviceConfig, - handler)); - } + if (addresses.length == 0) { + throw new IllegalArgumentException("No reachable cluster address provided"); + } - LoadBalancedClusterMessageSender(MessageSender messageSender) { - this.messageSender = messageSender; + channels = new ArrayList<>(addresses.length); + for (String address : addresses) { + ManagedChannel channel = ManagedChannelBuilder.forTarget(address) + .usePlaintext(true) + .build(); + + channels.add(channel); + senders.put( + new GrpcClientMessageSender(channel, + serializer, + deserializer, + serviceConfig, + handler), + 0L); + } } - private static ManagedChannel clusterDirectAddressChannel(String addresses) { - return ManagedChannelBuilder.forTarget(addresses) - .nameResolverFactory(new ClusterNameResolverFactory(addresses)) - .loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance()) - .usePlaintext(true) - .build(); + LoadBalancedClusterMessageSender(MessageSender... messageSenders) { + for (MessageSender sender : messageSenders) { + senders.put(sender, 0L); + } + channels = emptyList(); } @Override public void onConnected() { - messageSender.onConnected(); + senders.keySet().forEach(MessageSender::onConnected); } @Override public void onDisconnected() { - messageSender.onDisconnected(); + senders.keySet().forEach(MessageSender::onDisconnected); + } + + @Override + public void close() { + channels.forEach(ManagedChannel::shutdownNow); } @Override @@ -86,54 +99,27 @@ public class LoadBalancedClusterMessageSender implements MessageSender { boolean success = false; do { try { - messageSender.send(event); + withFastestSender(messageSender -> { + // very large latency on exception + senders.put(messageSender, Long.MAX_VALUE); + + long startTime = System.nanoTime(); + messageSender.send(event); + senders.put(messageSender, System.nanoTime() - startTime); + }); + success = true; } catch (Exception e) { log.error("Retry sending event {} due to failure", event, e); } } while (!success && !Thread.currentThread().isInterrupted()); - } - private static class ClusterNameResolverFactory extends Factory { - private final String addresses; - - private ClusterNameResolverFactory(String addresses) { - this.addresses = addresses; - } - - @Override - public NameResolver newNameResolver(URI targetUri, Attributes params) { - return new NameResolver() { - @Override - public String getServiceAuthority() { - return "localhost"; - } - - @Override - public void start(final Listener listener) { - List<SocketAddress> socketAddresses = Arrays.stream(addresses.split(",")) - .map(address -> { - String[] split = address.split(":"); - return new InetSocketAddress(split[0], Integer.parseInt(split[1])); - }) - .collect(Collectors.toList()); - - listener.onAddresses( - Arrays.asList(new EquivalentAddressGroup(socketAddresses.get(0)), - new EquivalentAddressGroup(socketAddresses.get(1))), - Attributes.EMPTY); - } - - @Override - public void shutdown() { - } - }; - } - - @Override - public String getDefaultScheme() { - return "directaddress"; - } + private void withFastestSender(Consumer<MessageSender> consumer) { + senders.entrySet() + .stream() + .min(Comparator.comparingLong(Entry::getValue)) + .map(Entry::getKey) + .ifPresent(consumer); } } diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java index 4fee0e6..6f2d90b 100644 --- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java +++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java @@ -17,10 +17,13 @@ package org.apache.servicecomb.saga.omega.connector.grpc; +import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing; import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; import static java.util.concurrent.TimeUnit.SECONDS; import static org.awaitility.Awaitility.await; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; @@ -48,16 +51,19 @@ import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand; import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig; import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent; import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceImplBase; +import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Ignore; +import org.junit.FixMethodOrder; import org.junit.Test; +import org.junit.runners.MethodSorters; import org.mockito.Mockito; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; +@FixMethodOrder(MethodSorters.NAME_ASCENDING) public class LoadBalancedClusterMessageSenderTest { private static final int[] ports = {8080, 8090}; @@ -66,6 +72,11 @@ public class LoadBalancedClusterMessageSenderTest { private static final Queue<TxEvent> eventsOn8080 = new ConcurrentLinkedQueue<>(); private static final Queue<TxEvent> eventsOn8090 = new ConcurrentLinkedQueue<>(); + private static final Map<Integer, Integer> delays = new HashMap<Integer, Integer>() {{ + put(8080, 0); + put(8090, 100); + }}; + private static final Map<Integer, Set<String>> connected = new HashMap<Integer, Set<String>>() {{ put(8080, new ConcurrentSkipListSet<>()); put(8090, new ConcurrentSkipListSet<>()); @@ -76,8 +87,6 @@ public class LoadBalancedClusterMessageSenderTest { put(8090, eventsOn8090); }}; - private final String addresses = "localhost:8080,localhost:8090"; - private final MessageSerializer serializer = objects -> objects[0].toString().getBytes(); private final MessageDeserializer deserializer = message -> new Object[] {new String(message)}; @@ -91,18 +100,23 @@ public class LoadBalancedClusterMessageSenderTest { private final TxEvent event = new TxEvent(globalTxId, localTxId, parentTxId, compensationMethod, "blah"); private final String serviceName = uniquify("serviceName"); - private final MessageSender messageSender = new LoadBalancedClusterMessageSender( - addresses, - serializer, - deserializer, - new ServiceConfig(serviceName), - handler); + private final String[] addresses = {"localhost:8080", "localhost:8090"}; + private final MessageSender messageSender = newMessageSender(addresses); + + private MessageSender newMessageSender(String[] addresses) { + return new LoadBalancedClusterMessageSender( + addresses, + serializer, + deserializer, + new ServiceConfig(serviceName), + handler); + } @BeforeClass public static void beforeClass() throws Exception { Arrays.stream(ports).forEach(port -> { ServerBuilder<?> serverBuilder = ServerBuilder.forPort(port); - serverBuilder.addService(new MyTxEventService(connected.get(port), eventsMap.get(port))); + serverBuilder.addService(new MyTxEventService(connected.get(port), eventsMap.get(port), delays.get(port))); Server server = serverBuilder.build(); try { @@ -119,8 +133,14 @@ public class LoadBalancedClusterMessageSenderTest { servers.forEach(Server::shutdown); } + @After + public void after() throws Exception { + eventsOn8080.clear(); + eventsOn8090.clear(); + } + @Test - public void reconnectOnConnectionLoss() throws Exception { + public void resendToAnotherServerOnFailure() throws Exception { messageSender.send(event); killServerReceivedMessage(); @@ -150,7 +170,6 @@ public class LoadBalancedClusterMessageSenderTest { thread.join(); } - @Ignore @Test public void broadcastConnectionAndDisconnection() throws Exception { messageSender.onConnected(); @@ -164,6 +183,29 @@ public class LoadBalancedClusterMessageSenderTest { assertThat(connected.get(8090).isEmpty(), is(true)); } + @Test + public void considerFasterServerFirst() throws Exception { + // we don't know which server is selected at first + messageSender.send(event); + + // but now we only send to the one with lowest latency + messageSender.send(event); + messageSender.send(event); + + assertThat(eventsOn8080.size(), greaterThanOrEqualTo(2)); + assertThat(eventsOn8090.size(), lessThanOrEqualTo(1)); + } + + @Test + public void blowsUpWhenNoServerAddressProvided() throws Exception { + try { + newMessageSender(new String[0]); + expectFailing(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), is("No reachable cluster address provided")); + } + } + private void killServerReceivedMessage() { int index = 0; for (int port : eventsMap.keySet()) { @@ -177,15 +219,17 @@ public class LoadBalancedClusterMessageSenderTest { private static class MyTxEventService extends TxEventServiceImplBase { private final Set<String> connected; private final Queue<TxEvent> events; + private final int delay; - private MyTxEventService(Set<String> connected, Queue<TxEvent> events) { + private MyTxEventService(Set<String> connected, Queue<TxEvent> events, int delay) { this.connected = connected; this.events = events; + this.delay = delay; } @Override public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcCompensateCommand> responseObserver) { - connected.add(request.getInstanceId()); + connected.add(request.getServiceName()); } @Override @@ -197,13 +241,23 @@ public class LoadBalancedClusterMessageSenderTest { request.getCompensationMethod(), new String(request.getPayloads().toByteArray()))); + sleep(); + responseObserver.onNext(GrpcAck.newBuilder().build()); responseObserver.onCompleted(); } + private void sleep() { + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + fail(e.getMessage()); + } + } + @Override public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> responseObserver) { - connected.remove(request.getInstanceId()); + connected.remove(request.getServiceName()); responseObserver.onNext(GrpcAck.newBuilder().build()); responseObserver.onCompleted(); } diff --git a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java index 7ed1f84..00afc1a 100644 --- a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java +++ b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java @@ -17,14 +17,7 @@ package org.apache.servicecomb.saga.omega.spring; -import java.lang.invoke.MethodHandles; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import javax.annotation.PreDestroy; - -import org.apache.servicecomb.saga.omega.connector.grpc.GrpcClientMessageSender; +import org.apache.servicecomb.saga.omega.connector.grpc.LoadBalancedClusterMessageSender; import org.apache.servicecomb.saga.omega.context.IdGenerator; import org.apache.servicecomb.saga.omega.context.OmegaContext; import org.apache.servicecomb.saga.omega.context.ServiceConfig; @@ -32,22 +25,13 @@ import org.apache.servicecomb.saga.omega.context.UniqueIdGenerator; import org.apache.servicecomb.saga.omega.format.KryoMessageFormat; import org.apache.servicecomb.saga.omega.transaction.MessageHandler; import org.apache.servicecomb.saga.omega.transaction.MessageSender; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Lazy; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; - @Configuration class OmegaSpringConfig { - private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - private final List<ManagedChannel> channels = new ArrayList<>(); - private final List<MessageSender> senders = new ArrayList<>(); @Bean IdGenerator<String> idGenerator() { @@ -64,40 +48,21 @@ class OmegaSpringConfig { return new ServiceConfig(serviceName); } - @PreDestroy - void close() { - senders.forEach(MessageSender::onDisconnected); - channels.forEach(ManagedChannel::shutdown); - } - @Bean - MessageSender grpcMessageSender(@Value("${alpha.cluster.address}") String[] addresses, ServiceConfig serviceConfig, + MessageSender grpcMessageSender( + @Value("${alpha.cluster.address}") String[] addresses, + ServiceConfig serviceConfig, @Lazy MessageHandler handler) { - // TODO: 2017/12/26 connect to the one with lowest latency - for (String address : addresses) { - try { - MessageSender sender = new GrpcClientMessageSender(grpcChannel(address), new KryoMessageFormat(), - new KryoMessageFormat(), serviceConfig, handler); - sender.onConnected(); - senders.add(sender); - return sender; - } catch (Exception e) { - log.error("Unable to connect to alpha at {}", address, e); - } - } - - throw new IllegalArgumentException( - "None of the alpha cluster is reachable: " + Arrays.toString(addresses)); - } - private ManagedChannel grpcChannel(String address) { - String[] pair = address.split(":"); + MessageSender sender = new LoadBalancedClusterMessageSender( + addresses, + new KryoMessageFormat(), + new KryoMessageFormat(), + serviceConfig, + handler); - ManagedChannel channel = ManagedChannelBuilder.forAddress(pair[0], Integer.parseInt(pair[1])) - .usePlaintext(true) - .build(); + Runtime.getRuntime().addShutdownHook(new Thread(sender::close)); - channels.add(channel); - return channel; + return sender; } } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java index d37c5b8..39b4d62 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java @@ -24,5 +24,8 @@ public interface MessageSender { default void onDisconnected() { } + default void close() { + } + void send(TxEvent event); } -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
