This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 01c4a1d415576ad9920fa84cd81ad37dc4f6c637
Author: seanyinx <[email protected]>
AuthorDate: Mon Jan 8 16:05:15 2018 +0800

    SCB-168 load balance based on server latency
    
    Signed-off-by: seanyinx <[email protected]>
---
 .../grpc/LoadBalancedClusterMessageSender.java     | 128 +++++++++------------
 .../grpc/LoadBalancedClusterMessageSenderTest.java |  84 +++++++++++---
 .../saga/omega/spring/OmegaSpringConfig.java       |  59 ++--------
 .../saga/omega/transaction/MessageSender.java      |   3 +
 4 files changed, 141 insertions(+), 133 deletions(-)

diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index 276f887..e5c9d19 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -17,13 +17,16 @@
 
 package org.apache.servicecomb.saga.omega.connector.grpc;
 
+import static java.util.Collections.emptyList;
+
 import java.lang.invoke.MethodHandles;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.function.Consumer;
 
 import org.apache.servicecomb.saga.omega.context.ServiceConfig;
 import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
@@ -34,51 +37,61 @@ import 
org.apache.servicecomb.saga.omega.transaction.TxEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.grpc.Attributes;
-import io.grpc.EquivalentAddressGroup;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
-import io.grpc.NameResolver;
-import io.grpc.NameResolver.Factory;
-import io.grpc.util.RoundRobinLoadBalancerFactory;
 
 public class LoadBalancedClusterMessageSender implements MessageSender {
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private final MessageSender messageSender;
+  private final Map<MessageSender, Long> senders = new HashMap<>();
+  private final Collection<ManagedChannel> channels;
 
-  public LoadBalancedClusterMessageSender(String addresses,
+  public LoadBalancedClusterMessageSender(String[] addresses,
       MessageSerializer serializer,
       MessageDeserializer deserializer,
       ServiceConfig serviceConfig,
       MessageHandler handler) {
 
-    this(new GrpcClientMessageSender(clusterDirectAddressChannel(addresses),
-        serializer,
-        deserializer,
-        serviceConfig,
-        handler));
-  }
+    if (addresses.length == 0) {
+      throw new IllegalArgumentException("No reachable cluster address 
provided");
+    }
 
-  LoadBalancedClusterMessageSender(MessageSender messageSender) {
-    this.messageSender = messageSender;
+    channels = new ArrayList<>(addresses.length);
+    for (String address : addresses) {
+      ManagedChannel channel = ManagedChannelBuilder.forTarget(address)
+          .usePlaintext(true)
+          .build();
+
+      channels.add(channel);
+      senders.put(
+          new GrpcClientMessageSender(channel,
+              serializer,
+              deserializer,
+              serviceConfig,
+              handler),
+          0L);
+    }
   }
 
-  private static ManagedChannel clusterDirectAddressChannel(String addresses) {
-    return ManagedChannelBuilder.forTarget(addresses)
-        .nameResolverFactory(new ClusterNameResolverFactory(addresses))
-        .loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance())
-        .usePlaintext(true)
-        .build();
+  LoadBalancedClusterMessageSender(MessageSender... messageSenders) {
+    for (MessageSender sender : messageSenders) {
+      senders.put(sender, 0L);
+    }
+    channels = emptyList();
   }
 
   @Override
   public void onConnected() {
-    messageSender.onConnected();
+    senders.keySet().forEach(MessageSender::onConnected);
   }
 
   @Override
   public void onDisconnected() {
-    messageSender.onDisconnected();
+    senders.keySet().forEach(MessageSender::onDisconnected);
+  }
+
+  @Override
+  public void close() {
+    channels.forEach(ManagedChannel::shutdownNow);
   }
 
   @Override
@@ -86,54 +99,27 @@ public class LoadBalancedClusterMessageSender implements 
MessageSender {
     boolean success = false;
     do {
       try {
-        messageSender.send(event);
+        withFastestSender(messageSender -> {
+          // very large latency on exception
+          senders.put(messageSender, Long.MAX_VALUE);
+
+          long startTime = System.nanoTime();
+          messageSender.send(event);
+          senders.put(messageSender, System.nanoTime() - startTime);
+        });
+
         success = true;
       } catch (Exception e) {
         log.error("Retry sending event {} due to failure", event, e);
       }
     } while (!success && !Thread.currentThread().isInterrupted());
-
   }
 
-  private static class ClusterNameResolverFactory extends Factory {
-    private final String addresses;
-
-    private ClusterNameResolverFactory(String addresses) {
-      this.addresses = addresses;
-    }
-
-    @Override
-    public NameResolver newNameResolver(URI targetUri, Attributes params) {
-      return new NameResolver() {
-        @Override
-        public String getServiceAuthority() {
-          return "localhost";
-        }
-
-        @Override
-        public void start(final Listener listener) {
-          List<SocketAddress> socketAddresses = 
Arrays.stream(addresses.split(","))
-              .map(address -> {
-                String[] split = address.split(":");
-                return new InetSocketAddress(split[0], 
Integer.parseInt(split[1]));
-              })
-              .collect(Collectors.toList());
-
-          listener.onAddresses(
-              Arrays.asList(new EquivalentAddressGroup(socketAddresses.get(0)),
-                  new EquivalentAddressGroup(socketAddresses.get(1))),
-              Attributes.EMPTY);
-        }
-
-        @Override
-        public void shutdown() {
-        }
-      };
-    }
-
-    @Override
-    public String getDefaultScheme() {
-      return "directaddress";
-    }
+  private void withFastestSender(Consumer<MessageSender> consumer) {
+    senders.entrySet()
+        .stream()
+        .min(Comparator.comparingLong(Entry::getValue))
+        .map(Entry::getKey)
+        .ifPresent(consumer);
   }
 }
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 4fee0e6..6f2d90b 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -17,10 +17,13 @@
 
 package org.apache.servicecomb.saga.omega.connector.grpc;
 
+import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
@@ -48,16 +51,19 @@ import 
org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
 import 
org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceImplBase;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.junit.FixMethodOrder;
 import org.junit.Test;
+import org.junit.runners.MethodSorters;
 import org.mockito.Mockito;
 
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
 import io.grpc.stub.StreamObserver;
 
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public class LoadBalancedClusterMessageSenderTest {
 
   private static final int[] ports = {8080, 8090};
@@ -66,6 +72,11 @@ public class LoadBalancedClusterMessageSenderTest {
   private static final Queue<TxEvent> eventsOn8080 = new 
ConcurrentLinkedQueue<>();
   private static final Queue<TxEvent> eventsOn8090 = new 
ConcurrentLinkedQueue<>();
 
+  private static final Map<Integer, Integer> delays = new HashMap<Integer, 
Integer>() {{
+    put(8080, 0);
+    put(8090, 100);
+  }};
+
   private static final Map<Integer, Set<String>> connected = new 
HashMap<Integer, Set<String>>() {{
     put(8080, new ConcurrentSkipListSet<>());
     put(8090, new ConcurrentSkipListSet<>());
@@ -76,8 +87,6 @@ public class LoadBalancedClusterMessageSenderTest {
     put(8090, eventsOn8090);
   }};
 
-  private final String addresses = "localhost:8080,localhost:8090";
-
   private final MessageSerializer serializer = objects -> 
objects[0].toString().getBytes();
 
   private final MessageDeserializer deserializer = message -> new Object[] 
{new String(message)};
@@ -91,18 +100,23 @@ public class LoadBalancedClusterMessageSenderTest {
   private final TxEvent event = new TxEvent(globalTxId, localTxId, parentTxId, 
compensationMethod, "blah");
 
   private final String serviceName = uniquify("serviceName");
-  private final MessageSender messageSender = new 
LoadBalancedClusterMessageSender(
-      addresses,
-      serializer,
-      deserializer,
-      new ServiceConfig(serviceName),
-      handler);
+  private final String[] addresses = {"localhost:8080", "localhost:8090"};
+  private final MessageSender messageSender = newMessageSender(addresses);
+
+  private MessageSender newMessageSender(String[] addresses) {
+    return new LoadBalancedClusterMessageSender(
+        addresses,
+        serializer,
+        deserializer,
+        new ServiceConfig(serviceName),
+        handler);
+  }
 
   @BeforeClass
   public static void beforeClass() throws Exception {
     Arrays.stream(ports).forEach(port -> {
       ServerBuilder<?> serverBuilder = ServerBuilder.forPort(port);
-      serverBuilder.addService(new MyTxEventService(connected.get(port), 
eventsMap.get(port)));
+      serverBuilder.addService(new MyTxEventService(connected.get(port), 
eventsMap.get(port), delays.get(port)));
       Server server = serverBuilder.build();
 
       try {
@@ -119,8 +133,14 @@ public class LoadBalancedClusterMessageSenderTest {
     servers.forEach(Server::shutdown);
   }
 
+  @After
+  public void after() throws Exception {
+    eventsOn8080.clear();
+    eventsOn8090.clear();
+  }
+
   @Test
-  public void reconnectOnConnectionLoss() throws Exception {
+  public void resendToAnotherServerOnFailure() throws Exception {
     messageSender.send(event);
 
     killServerReceivedMessage();
@@ -150,7 +170,6 @@ public class LoadBalancedClusterMessageSenderTest {
     thread.join();
   }
 
-  @Ignore
   @Test
   public void broadcastConnectionAndDisconnection() throws Exception {
     messageSender.onConnected();
@@ -164,6 +183,29 @@ public class LoadBalancedClusterMessageSenderTest {
     assertThat(connected.get(8090).isEmpty(), is(true));
   }
 
+  @Test
+  public void considerFasterServerFirst() throws Exception {
+    // we don't know which server is selected at first
+    messageSender.send(event);
+
+    // but now we only send to the one with lowest latency
+    messageSender.send(event);
+    messageSender.send(event);
+
+    assertThat(eventsOn8080.size(), greaterThanOrEqualTo(2));
+    assertThat(eventsOn8090.size(), lessThanOrEqualTo(1));
+  }
+
+  @Test
+  public void blowsUpWhenNoServerAddressProvided() throws Exception {
+    try {
+      newMessageSender(new String[0]);
+      expectFailing(IllegalArgumentException.class);
+    } catch (IllegalArgumentException e) {
+      assertThat(e.getMessage(), is("No reachable cluster address provided"));
+    }
+  }
+
   private void killServerReceivedMessage() {
     int index = 0;
     for (int port : eventsMap.keySet()) {
@@ -177,15 +219,17 @@ public class LoadBalancedClusterMessageSenderTest {
   private static class MyTxEventService extends TxEventServiceImplBase {
     private final Set<String> connected;
     private final Queue<TxEvent> events;
+    private final int delay;
 
-    private MyTxEventService(Set<String> connected, Queue<TxEvent> events) {
+    private MyTxEventService(Set<String> connected, Queue<TxEvent> events, int 
delay) {
       this.connected = connected;
       this.events = events;
+      this.delay = delay;
     }
 
     @Override
     public void onConnected(GrpcServiceConfig request, 
StreamObserver<GrpcCompensateCommand> responseObserver) {
-      connected.add(request.getInstanceId());
+      connected.add(request.getServiceName());
     }
 
     @Override
@@ -197,13 +241,23 @@ public class LoadBalancedClusterMessageSenderTest {
           request.getCompensationMethod(),
           new String(request.getPayloads().toByteArray())));
 
+      sleep();
+
       responseObserver.onNext(GrpcAck.newBuilder().build());
       responseObserver.onCompleted();
     }
 
+    private void sleep() {
+      try {
+        Thread.sleep(delay);
+      } catch (InterruptedException e) {
+        fail(e.getMessage());
+      }
+    }
+
     @Override
     public void onDisconnected(GrpcServiceConfig request, 
StreamObserver<GrpcAck> responseObserver) {
-      connected.remove(request.getInstanceId());
+      connected.remove(request.getServiceName());
       responseObserver.onNext(GrpcAck.newBuilder().build());
       responseObserver.onCompleted();
     }
diff --git 
a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
 
b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index 7ed1f84..00afc1a 100644
--- 
a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ 
b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -17,14 +17,7 @@
 
 package org.apache.servicecomb.saga.omega.spring;
 
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import javax.annotation.PreDestroy;
-
-import 
org.apache.servicecomb.saga.omega.connector.grpc.GrpcClientMessageSender;
+import 
org.apache.servicecomb.saga.omega.connector.grpc.LoadBalancedClusterMessageSender;
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.context.ServiceConfig;
@@ -32,22 +25,13 @@ import 
org.apache.servicecomb.saga.omega.context.UniqueIdGenerator;
 import org.apache.servicecomb.saga.omega.format.KryoMessageFormat;
 import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Lazy;
 
-import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
-
 @Configuration
 class OmegaSpringConfig {
-  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  private final List<ManagedChannel> channels = new ArrayList<>();
-  private final List<MessageSender> senders = new ArrayList<>();
 
   @Bean
   IdGenerator<String> idGenerator() {
@@ -64,40 +48,21 @@ class OmegaSpringConfig {
     return new ServiceConfig(serviceName);
   }
 
-  @PreDestroy
-  void close() {
-    senders.forEach(MessageSender::onDisconnected);
-    channels.forEach(ManagedChannel::shutdown);
-  }
-
   @Bean
-  MessageSender grpcMessageSender(@Value("${alpha.cluster.address}") String[] 
addresses, ServiceConfig serviceConfig,
+  MessageSender grpcMessageSender(
+      @Value("${alpha.cluster.address}") String[] addresses,
+      ServiceConfig serviceConfig,
       @Lazy MessageHandler handler) {
-    // TODO: 2017/12/26 connect to the one with lowest latency
-    for (String address : addresses) {
-      try {
-        MessageSender sender = new 
GrpcClientMessageSender(grpcChannel(address), new KryoMessageFormat(),
-            new KryoMessageFormat(), serviceConfig, handler);
-        sender.onConnected();
-        senders.add(sender);
-        return sender;
-      } catch (Exception e) {
-        log.error("Unable to connect to alpha at {}", address, e);
-      }
-    }
-
-    throw new IllegalArgumentException(
-        "None of the alpha cluster is reachable: " + 
Arrays.toString(addresses));
-  }
 
-  private ManagedChannel grpcChannel(String address) {
-    String[] pair = address.split(":");
+        MessageSender sender = new LoadBalancedClusterMessageSender(
+            addresses,
+            new KryoMessageFormat(),
+            new KryoMessageFormat(),
+            serviceConfig,
+            handler);
 
-    ManagedChannel channel = ManagedChannelBuilder.forAddress(pair[0], 
Integer.parseInt(pair[1]))
-        .usePlaintext(true)
-        .build();
+        Runtime.getRuntime().addShutdownHook(new Thread(sender::close));
 
-    channels.add(channel);
-    return channel;
+    return sender;
   }
 }
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
index d37c5b8..39b4d62 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
@@ -24,5 +24,8 @@ public interface MessageSender {
   default void onDisconnected() {
   }
 
+  default void close() {
+  }
+
   void send(TxEvent event);
 }

-- 
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.

Reply via email to