This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit e5efd4164e5d8b35b1f336c1c6657d37fb3fe257 Author: seanyinx <[email protected]> AuthorDate: Mon Jan 8 14:22:51 2018 +0800 SCB-168 load balanced alpha cluster client Signed-off-by: seanyinx <[email protected]> --- omega/omega-connector/omega-connector-grpc/pom.xml | 16 ++ .../grpc/LoadBalancedClusterMessageSender.java | 139 ++++++++++++++ .../grpc/LoadBalancedClusterMessageSenderTest.java | 211 +++++++++++++++++++++ .../src/test/resources/log4j2-test.xml | 30 +++ .../saga/omega/format/MessageFormatTestBase.java | 2 +- .../saga/omega/transaction/MessageSerializer.java | 2 - 6 files changed, 397 insertions(+), 3 deletions(-) diff --git a/omega/omega-connector/omega-connector-grpc/pom.xml b/omega/omega-connector/omega-connector-grpc/pom.xml index 375fac7..258eb4b 100644 --- a/omega/omega-connector/omega-connector-grpc/pom.xml +++ b/omega/omega-connector/omega-connector-grpc/pom.xml @@ -53,6 +53,18 @@ <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + </dependency> <dependency> <groupId>junit</groupId> @@ -70,5 +82,9 @@ <groupId>com.github.seanyinx</groupId> <artifactId>unit-scaffolding</artifactId> </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + </dependency> </dependencies> </project> diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java new file mode 100644 index 0000000..276f887 --- /dev/null +++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.omega.connector.grpc; + +import java.lang.invoke.MethodHandles; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.URI; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.servicecomb.saga.omega.context.ServiceConfig; +import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer; +import org.apache.servicecomb.saga.omega.transaction.MessageHandler; +import org.apache.servicecomb.saga.omega.transaction.MessageSender; +import org.apache.servicecomb.saga.omega.transaction.MessageSerializer; +import org.apache.servicecomb.saga.omega.transaction.TxEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.grpc.Attributes; +import io.grpc.EquivalentAddressGroup; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.NameResolver; +import io.grpc.NameResolver.Factory; +import io.grpc.util.RoundRobinLoadBalancerFactory; + +public class LoadBalancedClusterMessageSender implements MessageSender { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final MessageSender messageSender; + + public LoadBalancedClusterMessageSender(String addresses, + MessageSerializer serializer, + MessageDeserializer deserializer, + ServiceConfig serviceConfig, + MessageHandler handler) { + + this(new GrpcClientMessageSender(clusterDirectAddressChannel(addresses), + serializer, + deserializer, + serviceConfig, + handler)); + } + + LoadBalancedClusterMessageSender(MessageSender messageSender) { + this.messageSender = messageSender; + } + + private static ManagedChannel clusterDirectAddressChannel(String addresses) { + return ManagedChannelBuilder.forTarget(addresses) + .nameResolverFactory(new ClusterNameResolverFactory(addresses)) + .loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance()) + .usePlaintext(true) + .build(); + } + + @Override + public void onConnected() { + messageSender.onConnected(); + } + + @Override + public void onDisconnected() { + messageSender.onDisconnected(); + } + + @Override + public void send(TxEvent event) { + boolean success = false; + do { + try { + messageSender.send(event); + success = true; + } catch (Exception e) { + log.error("Retry sending event {} due to failure", event, e); + } + } while (!success && !Thread.currentThread().isInterrupted()); + + } + + private static class ClusterNameResolverFactory extends Factory { + private final String addresses; + + private ClusterNameResolverFactory(String addresses) { + this.addresses = addresses; + } + + @Override + public NameResolver newNameResolver(URI targetUri, Attributes params) { + return new NameResolver() { + @Override + public String getServiceAuthority() { + return "localhost"; + } + + @Override + public void start(final Listener listener) { + List<SocketAddress> socketAddresses = Arrays.stream(addresses.split(",")) + .map(address -> { + String[] split = address.split(":"); + return new InetSocketAddress(split[0], Integer.parseInt(split[1])); + }) + .collect(Collectors.toList()); + + listener.onAddresses( + Arrays.asList(new EquivalentAddressGroup(socketAddresses.get(0)), + new EquivalentAddressGroup(socketAddresses.get(1))), + Attributes.EMPTY); + } + + @Override + public void shutdown() { + } + }; + } + + @Override + public String getDefaultScheme() { + return "directaddress"; + } + } +} diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java new file mode 100644 index 0000000..4fee0e6 --- /dev/null +++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.omega.connector.grpc; + +import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.doThrow; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentSkipListSet; + +import org.apache.servicecomb.saga.omega.context.ServiceConfig; +import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer; +import org.apache.servicecomb.saga.omega.transaction.MessageHandler; +import org.apache.servicecomb.saga.omega.transaction.MessageSender; +import org.apache.servicecomb.saga.omega.transaction.MessageSerializer; +import org.apache.servicecomb.saga.omega.transaction.TxEvent; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent; +import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceImplBase; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.Mockito; + +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.stub.StreamObserver; + +public class LoadBalancedClusterMessageSenderTest { + + private static final int[] ports = {8080, 8090}; + private static final List<Server> servers = new ArrayList<>(); + + private static final Queue<TxEvent> eventsOn8080 = new ConcurrentLinkedQueue<>(); + private static final Queue<TxEvent> eventsOn8090 = new ConcurrentLinkedQueue<>(); + + private static final Map<Integer, Set<String>> connected = new HashMap<Integer, Set<String>>() {{ + put(8080, new ConcurrentSkipListSet<>()); + put(8090, new ConcurrentSkipListSet<>()); + }}; + + private static final Map<Integer, Queue<TxEvent>> eventsMap = new HashMap<Integer, Queue<TxEvent>>() {{ + put(8080, eventsOn8080); + put(8090, eventsOn8090); + }}; + + private final String addresses = "localhost:8080,localhost:8090"; + + private final MessageSerializer serializer = objects -> objects[0].toString().getBytes(); + + private final MessageDeserializer deserializer = message -> new Object[] {new String(message)}; + private final MessageHandler handler = (globalTxId, localTxId, parentTxId, compensationMethod, payloads) -> { + }; + + private final String globalTxId = uniquify("globalTxId"); + private final String localTxId = uniquify("localTxId"); + private final String parentTxId = uniquify("parentTxId"); + private final String compensationMethod = getClass().getCanonicalName(); + private final TxEvent event = new TxEvent(globalTxId, localTxId, parentTxId, compensationMethod, "blah"); + + private final String serviceName = uniquify("serviceName"); + private final MessageSender messageSender = new LoadBalancedClusterMessageSender( + addresses, + serializer, + deserializer, + new ServiceConfig(serviceName), + handler); + + @BeforeClass + public static void beforeClass() throws Exception { + Arrays.stream(ports).forEach(port -> { + ServerBuilder<?> serverBuilder = ServerBuilder.forPort(port); + serverBuilder.addService(new MyTxEventService(connected.get(port), eventsMap.get(port))); + Server server = serverBuilder.build(); + + try { + server.start(); + servers.add(server); + } catch (IOException e) { + fail(e.getMessage()); + } + }); + } + + @AfterClass + public static void tearDown() throws Exception { + servers.forEach(Server::shutdown); + } + + @Test + public void reconnectOnConnectionLoss() throws Exception { + messageSender.send(event); + + killServerReceivedMessage(); + + messageSender.send(event); + + assertThat(eventsOn8080.size(), is(1)); + assertThat(eventsOn8080.peek().toString(), is(event.toString())); + + assertThat(eventsOn8090.size(), is(1)); + assertThat(eventsOn8090.peek().toString(), is(event.toString())); + } + + @Test (timeout = 1000) + public void stopSendingOnInterruption() throws Exception { + MessageSender underlying = Mockito.mock(MessageSender.class); + doThrow(RuntimeException.class).when(underlying).send(event); + + MessageSender messageSender = new LoadBalancedClusterMessageSender(underlying); + + Thread thread = new Thread(() -> messageSender.send(event)); + thread.start(); + + Thread.sleep(300); + + thread.interrupt(); + thread.join(); + } + + @Ignore + @Test + public void broadcastConnectionAndDisconnection() throws Exception { + messageSender.onConnected(); + await().atMost(1, SECONDS).until(() -> !connected.get(8080).isEmpty() && !connected.get(8090).isEmpty()); + + assertThat(connected.get(8080), contains(serviceName)); + assertThat(connected.get(8090), contains(serviceName)); + + messageSender.onDisconnected(); + assertThat(connected.get(8080).isEmpty(), is(true)); + assertThat(connected.get(8090).isEmpty(), is(true)); + } + + private void killServerReceivedMessage() { + int index = 0; + for (int port : eventsMap.keySet()) { + if (!eventsMap.get(port).isEmpty()) { + servers.get(index).shutdownNow(); + } + index++; + } + } + + private static class MyTxEventService extends TxEventServiceImplBase { + private final Set<String> connected; + private final Queue<TxEvent> events; + + private MyTxEventService(Set<String> connected, Queue<TxEvent> events) { + this.connected = connected; + this.events = events; + } + + @Override + public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcCompensateCommand> responseObserver) { + connected.add(request.getInstanceId()); + } + + @Override + public void onTxEvent(GrpcTxEvent request, StreamObserver<GrpcAck> responseObserver) { + events.offer(new TxEvent( + request.getGlobalTxId(), + request.getLocalTxId(), + request.getParentTxId(), + request.getCompensationMethod(), + new String(request.getPayloads().toByteArray()))); + + responseObserver.onNext(GrpcAck.newBuilder().build()); + responseObserver.onCompleted(); + } + + @Override + public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> responseObserver) { + connected.remove(request.getInstanceId()); + responseObserver.onNext(GrpcAck.newBuilder().build()); + responseObserver.onCompleted(); + } + } +} diff --git a/omega/omega-connector/omega-connector-grpc/src/test/resources/log4j2-test.xml b/omega/omega-connector/omega-connector-grpc/src/test/resources/log4j2-test.xml new file mode 100644 index 0000000..58924c6 --- /dev/null +++ b/omega/omega-connector/omega-connector-grpc/src/test/resources/log4j2-test.xml @@ -0,0 +1,30 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<Configuration status="WARN"> + <Appenders> + <Console name="Console" target="SYSTEM_OUT"> + <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/> + </Console> + </Appenders> + <Loggers> + <Root level="info"> + <AppenderRef ref="Console"/> + </Root> + </Loggers> +</Configuration> diff --git a/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/MessageFormatTestBase.java b/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/MessageFormatTestBase.java index 17674b7..d3c591e 100644 --- a/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/MessageFormatTestBase.java +++ b/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/MessageFormatTestBase.java @@ -37,7 +37,7 @@ public class MessageFormatTestBase { @Test public void serializeObjectIntoBytes() throws Exception { - byte[] bytes = format.serialize(eventOf("hello", "world")); + byte[] bytes = format.serialize(new String[]{"hello", "world"}); Object[] message = format.deserialize(bytes); diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSerializer.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSerializer.java index 7dfe0ca..0bc1e46 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSerializer.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSerializer.java @@ -18,7 +18,5 @@ package org.apache.servicecomb.saga.omega.transaction; public interface MessageSerializer { - byte[] serialize(TxEvent event); - byte[] serialize(Object[] objects); } -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
