This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 166b32b16a55064143a9b741beef4ea2c2ca2f19 Author: seanyinx <[email protected]> AuthorDate: Tue Dec 26 20:04:09 2017 +0800 SCB-97 closed thrift connections on shutdown Signed-off-by: seanyinx <[email protected]> --- .../alpha/server/SwiftTxEventEndpointImpl.java | 5 +++ .../connector/thrift/ThriftMessageSender.java | 27 ++++--------- .../connector/thrift/ThriftMessageSenderTest.java | 12 +++++- .../saga/omega/spring/OmegaSpringConfig.java | 46 +++++++++++++++++++++- .../contracts/thrift/SwiftTxEventEndpoint.java | 2 +- 5 files changed, 69 insertions(+), 23 deletions(-) diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java index 3ae39f6..78b93b4 100644 --- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java +++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java @@ -43,4 +43,9 @@ class SwiftTxEventEndpointImpl implements SwiftTxEventEndpoint { message.payloads() )); } + + @Override + public void close() throws Exception { + + } } diff --git a/omega/omega-connector/omega-connector-thrift/src/main/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSender.java b/omega/omega-connector/omega-connector-thrift/src/main/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSender.java index bbf9bfa..abdbec5 100644 --- a/omega/omega-connector/omega-connector-thrift/src/main/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSender.java +++ b/omega/omega-connector/omega-connector-thrift/src/main/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSender.java @@ -17,35 +17,17 @@ package io.servicecomb.saga.omega.connector.thrift; -import static com.google.common.net.HostAndPort.fromParts; - -import java.util.concurrent.ExecutionException; - -import com.facebook.nifty.client.FramedClientConnector; -import com.facebook.swift.service.ThriftClientManager; - import io.servicecomb.saga.omega.transaction.MessageSender; import io.servicecomb.saga.omega.transaction.MessageSerializer; import io.servicecomb.saga.omega.transaction.TxEvent; import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEvent; import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEventEndpoint; -public class ThriftMessageSender implements MessageSender { - private static final ThriftClientManager clientManager = new ThriftClientManager(); +public class ThriftMessageSender implements MessageSender, AutoCloseable { private final SwiftTxEventEndpoint eventService; private final MessageSerializer serializer; - public static ThriftMessageSender create(String host, int port, MessageSerializer serializer) { - FramedClientConnector connector = new FramedClientConnector(fromParts(host, port)); - try { - SwiftTxEventEndpoint endpoint = clientManager.createClient(connector, SwiftTxEventEndpoint.class).get(); - return new ThriftMessageSender(endpoint, serializer); - } catch (InterruptedException | ExecutionException e) { - throw new IllegalStateException("Failed to create transaction event endpoint client to " + host + ":" + port, e); - } - } - - ThriftMessageSender(SwiftTxEventEndpoint eventService, MessageSerializer serializer) { + public ThriftMessageSender(SwiftTxEventEndpoint eventService, MessageSerializer serializer) { this.eventService = eventService; this.serializer = serializer; } @@ -61,4 +43,9 @@ public class ThriftMessageSender implements MessageSender { serializer.serialize(event) )); } + + @Override + public void close() throws Exception { + eventService.close(); + } } diff --git a/omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java b/omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java index 3e1f833..9d0ea97 100644 --- a/omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java +++ b/omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java @@ -52,7 +52,17 @@ public class ThriftMessageSenderTest { } }; - private final SwiftTxEventEndpoint eventService = (event) -> swiftTxEvent = event; + private final SwiftTxEventEndpoint eventService = new SwiftTxEventEndpoint() { + @Override + public void handle(SwiftTxEvent message) { + swiftTxEvent = message; + } + + @Override + public void close() throws Exception { + } + }; + private final ThriftMessageSender messageSender = new ThriftMessageSender(eventService, serializer); @Test diff --git a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java index 33a0cbc..5f62884 100644 --- a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java +++ b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java @@ -17,8 +17,15 @@ package io.servicecomb.saga.omega.spring; +import static com.google.common.net.HostAndPort.fromParts; + import java.lang.invoke.MethodHandles; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import javax.annotation.PreDestroy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,16 +33,23 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import com.facebook.nifty.client.FramedClientConnector; +import com.facebook.swift.service.ThriftClientManager; + import io.servicecomb.saga.omega.connector.thrift.ThriftMessageSender; import io.servicecomb.saga.omega.context.IdGenerator; import io.servicecomb.saga.omega.context.OmegaContext; import io.servicecomb.saga.omega.context.UniqueIdGenerator; import io.servicecomb.saga.omega.format.NativeMessageFormat; import io.servicecomb.saga.omega.transaction.MessageSender; +import io.servicecomb.saga.omega.transaction.MessageSerializer; +import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEventEndpoint; @Configuration class OmegaSpringConfig { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final ThriftClientManager clientManager = new ThriftClientManager(); + private final List<AutoCloseable> closeables = new ArrayList<>(); @Bean IdGenerator<String> idGenerator() { @@ -53,7 +67,9 @@ class OmegaSpringConfig { for (String address : addresses) { try { String[] pair = address.split(":"); - return ThriftMessageSender.create(pair[0], Integer.parseInt(pair[1]), new NativeMessageFormat()); + ThriftMessageSender sender = createMessageSender(clientManager, pair[0], Integer.parseInt(pair[1]), new NativeMessageFormat()); + closeables.add(sender); + return sender; } catch (Exception e) { log.error("Unable to connect to alpha at {}", address, e); } @@ -62,4 +78,32 @@ class OmegaSpringConfig { throw new IllegalArgumentException( "None of the alpha cluster is reachable: " + Arrays.toString(addresses)); } + + private ThriftMessageSender createMessageSender(ThriftClientManager clientManager, + String host, + int port, + MessageSerializer serializer) { + + FramedClientConnector connector = new FramedClientConnector(fromParts(host, port)); + + try { + SwiftTxEventEndpoint endpoint = clientManager.createClient(connector, SwiftTxEventEndpoint.class).get(); + return new ThriftMessageSender(endpoint, serializer); + } catch (InterruptedException | ExecutionException e) { + throw new IllegalStateException("Failed to create transaction event endpoint client to " + host + ":" + port, e); + } + } + + @PreDestroy + void close() { + for (AutoCloseable closeable : closeables) { + try { + closeable.close(); + } catch (Exception e) { + log.warn("Failed to close message sender", e); + } + } + + clientManager.close(); + } } diff --git a/pack-contracts/pack-contract-thrift/src/main/java/io/servicecomb/saga/pack/contracts/thrift/SwiftTxEventEndpoint.java b/pack-contracts/pack-contract-thrift/src/main/java/io/servicecomb/saga/pack/contracts/thrift/SwiftTxEventEndpoint.java index beff7bc..ae1fde9 100644 --- a/pack-contracts/pack-contract-thrift/src/main/java/io/servicecomb/saga/pack/contracts/thrift/SwiftTxEventEndpoint.java +++ b/pack-contracts/pack-contract-thrift/src/main/java/io/servicecomb/saga/pack/contracts/thrift/SwiftTxEventEndpoint.java @@ -21,7 +21,7 @@ import com.facebook.swift.service.ThriftMethod; import com.facebook.swift.service.ThriftService; @ThriftService("TxEventEndpoint") -public interface SwiftTxEventEndpoint { +public interface SwiftTxEventEndpoint extends AutoCloseable { @ThriftMethod void handle(SwiftTxEvent message); -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
