This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 166b32b16a55064143a9b741beef4ea2c2ca2f19
Author: seanyinx <[email protected]>
AuthorDate: Tue Dec 26 20:04:09 2017 +0800

    SCB-97 closed thrift connections on shutdown
    
    Signed-off-by: seanyinx <[email protected]>
---
 .../alpha/server/SwiftTxEventEndpointImpl.java     |  5 +++
 .../connector/thrift/ThriftMessageSender.java      | 27 ++++---------
 .../connector/thrift/ThriftMessageSenderTest.java  | 12 +++++-
 .../saga/omega/spring/OmegaSpringConfig.java       | 46 +++++++++++++++++++++-
 .../contracts/thrift/SwiftTxEventEndpoint.java     |  2 +-
 5 files changed, 69 insertions(+), 23 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
index 3ae39f6..78b93b4 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
@@ -43,4 +43,9 @@ class SwiftTxEventEndpointImpl implements SwiftTxEventEndpoint {
         message.payloads()
     ));
   }
+
+  @Override
+  public void close() throws Exception {
+    
+  }
 }
diff --git a/omega/omega-connector/omega-connector-thrift/src/main/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSender.java b/omega/omega-connector/omega-connector-thrift/src/main/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSender.java
index bbf9bfa..abdbec5 100644
--- a/omega/omega-connector/omega-connector-thrift/src/main/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSender.java
+++ b/omega/omega-connector/omega-connector-thrift/src/main/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSender.java
@@ -17,35 +17,17 @@
 
 package io.servicecomb.saga.omega.connector.thrift;
 
-import static com.google.common.net.HostAndPort.fromParts;
-
-import java.util.concurrent.ExecutionException;
-
-import com.facebook.nifty.client.FramedClientConnector;
-import com.facebook.swift.service.ThriftClientManager;
-
 import io.servicecomb.saga.omega.transaction.MessageSender;
 import io.servicecomb.saga.omega.transaction.MessageSerializer;
 import io.servicecomb.saga.omega.transaction.TxEvent;
 import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEvent;
 import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEventEndpoint;
 
-public class ThriftMessageSender implements MessageSender {
-  private static final ThriftClientManager clientManager = new 
ThriftClientManager();
+public class ThriftMessageSender implements MessageSender, AutoCloseable {
   private final SwiftTxEventEndpoint eventService;
   private final MessageSerializer serializer;
 
-  public static ThriftMessageSender create(String host, int port, 
MessageSerializer serializer) {
-    FramedClientConnector connector = new 
FramedClientConnector(fromParts(host, port));
-    try {
-      SwiftTxEventEndpoint endpoint = clientManager.createClient(connector, 
SwiftTxEventEndpoint.class).get();
-      return new ThriftMessageSender(endpoint, serializer);
-    } catch (InterruptedException | ExecutionException e) {
-      throw new IllegalStateException("Failed to create transaction event 
endpoint client to " + host + ":" + port, e);
-    }
-  }
-
-  ThriftMessageSender(SwiftTxEventEndpoint eventService, MessageSerializer 
serializer) {
+  public ThriftMessageSender(SwiftTxEventEndpoint eventService, 
MessageSerializer serializer) {
     this.eventService = eventService;
     this.serializer = serializer;
   }
@@ -61,4 +43,9 @@ public class ThriftMessageSender implements MessageSender {
         serializer.serialize(event)
     ));
   }
+
+  @Override
+  public void close() throws Exception {
+    eventService.close();
+  }
 }
diff --git a/omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java b/omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java
index 3e1f833..9d0ea97 100644
--- a/omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java
@@ -52,7 +52,17 @@ public class ThriftMessageSenderTest {
     }
   };
 
-  private final SwiftTxEventEndpoint eventService = (event) -> swiftTxEvent = 
event;
+  private final SwiftTxEventEndpoint eventService = new SwiftTxEventEndpoint() 
{
+    @Override
+    public void handle(SwiftTxEvent message) {
+      swiftTxEvent = message;
+    }
+
+    @Override
+    public void close() throws Exception {
+    }
+  };
+
   private final ThriftMessageSender messageSender = new 
ThriftMessageSender(eventService, serializer);
 
   @Test
diff --git a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index 33a0cbc..5f62884 100644
--- a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -17,8 +17,15 @@
 
 package io.servicecomb.saga.omega.spring;
 
+import static com.google.common.net.HostAndPort.fromParts;
+
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import javax.annotation.PreDestroy;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -26,16 +33,23 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
+import com.facebook.nifty.client.FramedClientConnector;
+import com.facebook.swift.service.ThriftClientManager;
+
 import io.servicecomb.saga.omega.connector.thrift.ThriftMessageSender;
 import io.servicecomb.saga.omega.context.IdGenerator;
 import io.servicecomb.saga.omega.context.OmegaContext;
 import io.servicecomb.saga.omega.context.UniqueIdGenerator;
 import io.servicecomb.saga.omega.format.NativeMessageFormat;
 import io.servicecomb.saga.omega.transaction.MessageSender;
+import io.servicecomb.saga.omega.transaction.MessageSerializer;
+import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEventEndpoint;
 
 @Configuration
 class OmegaSpringConfig {
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final ThriftClientManager clientManager = new ThriftClientManager();
+  private final List<AutoCloseable> closeables = new ArrayList<>();
 
   @Bean
   IdGenerator<String> idGenerator() {
@@ -53,7 +67,9 @@ class OmegaSpringConfig {
     for (String address : addresses) {
       try {
         String[] pair = address.split(":");
-        return ThriftMessageSender.create(pair[0], Integer.parseInt(pair[1]), 
new NativeMessageFormat());
+        ThriftMessageSender sender = createMessageSender(clientManager, 
pair[0], Integer.parseInt(pair[1]), new NativeMessageFormat());
+        closeables.add(sender);
+        return sender;
       } catch (Exception e) {
         log.error("Unable to connect to alpha at {}", address, e);
       }
@@ -62,4 +78,32 @@ class OmegaSpringConfig {
     throw new IllegalArgumentException(
         "None of the alpha cluster is reachable: " + 
Arrays.toString(addresses));
   }
+
+  private ThriftMessageSender createMessageSender(ThriftClientManager 
clientManager,
+      String host,
+      int port,
+      MessageSerializer serializer) {
+
+    FramedClientConnector connector = new 
FramedClientConnector(fromParts(host, port));
+
+    try {
+      SwiftTxEventEndpoint endpoint = clientManager.createClient(connector, 
SwiftTxEventEndpoint.class).get();
+      return new ThriftMessageSender(endpoint, serializer);
+    } catch (InterruptedException | ExecutionException e) {
+      throw new IllegalStateException("Failed to create transaction event 
endpoint client to " + host + ":" + port, e);
+    }
+  }
+
+  @PreDestroy
+  void close() {
+    for (AutoCloseable closeable : closeables) {
+      try {
+        closeable.close();
+      } catch (Exception e) {
+        log.warn("Failed to close message sender", e);
+      }
+    }
+
+    clientManager.close();
+  }
 }
diff --git a/pack-contracts/pack-contract-thrift/src/main/java/io/servicecomb/saga/pack/contracts/thrift/SwiftTxEventEndpoint.java b/pack-contracts/pack-contract-thrift/src/main/java/io/servicecomb/saga/pack/contracts/thrift/SwiftTxEventEndpoint.java
index beff7bc..ae1fde9 100644
--- a/pack-contracts/pack-contract-thrift/src/main/java/io/servicecomb/saga/pack/contracts/thrift/SwiftTxEventEndpoint.java
+++ b/pack-contracts/pack-contract-thrift/src/main/java/io/servicecomb/saga/pack/contracts/thrift/SwiftTxEventEndpoint.java
@@ -21,7 +21,7 @@ import com.facebook.swift.service.ThriftMethod;
 import com.facebook.swift.service.ThriftService;
 
 @ThriftService("TxEventEndpoint")
-public interface SwiftTxEventEndpoint {
+public interface SwiftTxEventEndpoint extends AutoCloseable {
 
   @ThriftMethod
   void handle(SwiftTxEvent message);

-- 
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.

Reply via email to