This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch SCB-97_alpha_omega_bonding
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
The following commit(s) were added to refs/heads/SCB-97_alpha_omega_bonding by
this push:
new 12c437d SCB-97 closed thrift connections on shutdown
12c437d is described below
commit 12c437d844a29b78cc1dc1b1657350bfa56147e9
Author: seanyinx <[email protected]>
AuthorDate: Tue Dec 26 20:04:09 2017 +0800
SCB-97 closed thrift connections on shutdown
Signed-off-by: seanyinx <[email protected]>
---
.../alpha/server/SwiftTxEventEndpointImpl.java | 5 +++
.../connector/thrift/ThriftMessageSender.java | 27 ++++---------
.../connector/thrift/ThriftMessageSenderTest.java | 12 +++++-
.../saga/omega/spring/OmegaSpringConfig.java | 46 +++++++++++++++++++++-
.../contracts/thrift/SwiftTxEventEndpoint.java | 2 +-
5 files changed, 69 insertions(+), 23 deletions(-)
diff --git
a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
index 3ae39f6..78b93b4 100644
---
a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
+++
b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
@@ -43,4 +43,9 @@ class SwiftTxEventEndpointImpl implements
SwiftTxEventEndpoint {
message.payloads()
));
}
+
+ @Override
+ public void close() throws Exception {
+
+ }
}
diff --git
a/omega/omega-connector/omega-connector-thrift/src/main/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSender.java
b/omega/omega-connector/omega-connector-thrift/src/main/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSender.java
index bbf9bfa..abdbec5 100644
---
a/omega/omega-connector/omega-connector-thrift/src/main/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSender.java
+++
b/omega/omega-connector/omega-connector-thrift/src/main/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSender.java
@@ -17,35 +17,17 @@
package io.servicecomb.saga.omega.connector.thrift;
-import static com.google.common.net.HostAndPort.fromParts;
-
-import java.util.concurrent.ExecutionException;
-
-import com.facebook.nifty.client.FramedClientConnector;
-import com.facebook.swift.service.ThriftClientManager;
-
import io.servicecomb.saga.omega.transaction.MessageSender;
import io.servicecomb.saga.omega.transaction.MessageSerializer;
import io.servicecomb.saga.omega.transaction.TxEvent;
import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEvent;
import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEventEndpoint;
-public class ThriftMessageSender implements MessageSender {
- private static final ThriftClientManager clientManager = new
ThriftClientManager();
+public class ThriftMessageSender implements MessageSender, AutoCloseable {
private final SwiftTxEventEndpoint eventService;
private final MessageSerializer serializer;
- public static ThriftMessageSender create(String host, int port,
MessageSerializer serializer) {
- FramedClientConnector connector = new
FramedClientConnector(fromParts(host, port));
- try {
- SwiftTxEventEndpoint endpoint = clientManager.createClient(connector,
SwiftTxEventEndpoint.class).get();
- return new ThriftMessageSender(endpoint, serializer);
- } catch (InterruptedException | ExecutionException e) {
- throw new IllegalStateException("Failed to create transaction event
endpoint client to " + host + ":" + port, e);
- }
- }
-
- ThriftMessageSender(SwiftTxEventEndpoint eventService, MessageSerializer
serializer) {
+ public ThriftMessageSender(SwiftTxEventEndpoint eventService,
MessageSerializer serializer) {
this.eventService = eventService;
this.serializer = serializer;
}
@@ -61,4 +43,9 @@ public class ThriftMessageSender implements MessageSender {
serializer.serialize(event)
));
}
+
+ @Override
+ public void close() throws Exception {
+ eventService.close();
+ }
}
diff --git
a/omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java
b/omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java
index 3e1f833..9d0ea97 100644
---
a/omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java
+++
b/omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java
@@ -52,7 +52,17 @@ public class ThriftMessageSenderTest {
}
};
- private final SwiftTxEventEndpoint eventService = (event) -> swiftTxEvent =
event;
+ private final SwiftTxEventEndpoint eventService = new SwiftTxEventEndpoint()
{
+ @Override
+ public void handle(SwiftTxEvent message) {
+ swiftTxEvent = message;
+ }
+
+ @Override
+ public void close() throws Exception {
+ }
+ };
+
private final ThriftMessageSender messageSender = new
ThriftMessageSender(eventService, serializer);
@Test
diff --git
a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index 33a0cbc..5f62884 100644
---
a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++
b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -17,8 +17,15 @@
package io.servicecomb.saga.omega.spring;
+import static com.google.common.net.HostAndPort.fromParts;
+
import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,16 +33,23 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import com.facebook.nifty.client.FramedClientConnector;
+import com.facebook.swift.service.ThriftClientManager;
+
import io.servicecomb.saga.omega.connector.thrift.ThriftMessageSender;
import io.servicecomb.saga.omega.context.IdGenerator;
import io.servicecomb.saga.omega.context.OmegaContext;
import io.servicecomb.saga.omega.context.UniqueIdGenerator;
import io.servicecomb.saga.omega.format.NativeMessageFormat;
import io.servicecomb.saga.omega.transaction.MessageSender;
+import io.servicecomb.saga.omega.transaction.MessageSerializer;
+import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEventEndpoint;
@Configuration
class OmegaSpringConfig {
private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final ThriftClientManager clientManager = new ThriftClientManager();
+ private final List<AutoCloseable> closeables = new ArrayList<>();
@Bean
IdGenerator<String> idGenerator() {
@@ -53,7 +67,9 @@ class OmegaSpringConfig {
for (String address : addresses) {
try {
String[] pair = address.split(":");
- return ThriftMessageSender.create(pair[0], Integer.parseInt(pair[1]),
new NativeMessageFormat());
+ ThriftMessageSender sender = createMessageSender(clientManager,
pair[0], Integer.parseInt(pair[1]), new NativeMessageFormat());
+ closeables.add(sender);
+ return sender;
} catch (Exception e) {
log.error("Unable to connect to alpha at {}", address, e);
}
@@ -62,4 +78,32 @@ class OmegaSpringConfig {
throw new IllegalArgumentException(
"None of the alpha cluster is reachable: " +
Arrays.toString(addresses));
}
+
+ private ThriftMessageSender createMessageSender(ThriftClientManager
clientManager,
+ String host,
+ int port,
+ MessageSerializer serializer) {
+
+ FramedClientConnector connector = new
FramedClientConnector(fromParts(host, port));
+
+ try {
+ SwiftTxEventEndpoint endpoint = clientManager.createClient(connector,
SwiftTxEventEndpoint.class).get();
+ return new ThriftMessageSender(endpoint, serializer);
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IllegalStateException("Failed to create transaction event
endpoint client to " + host + ":" + port, e);
+ }
+ }
+
+ @PreDestroy
+ void close() {
+ for (AutoCloseable closeable : closeables) {
+ try {
+ closeable.close();
+ } catch (Exception e) {
+ log.warn("Failed to close message sender", e);
+ }
+ }
+
+ clientManager.close();
+ }
}
diff --git
a/pack-contracts/pack-contract-thrift/src/main/java/io/servicecomb/saga/pack/contracts/thrift/SwiftTxEventEndpoint.java
b/pack-contracts/pack-contract-thrift/src/main/java/io/servicecomb/saga/pack/contracts/thrift/SwiftTxEventEndpoint.java
index beff7bc..ae1fde9 100644
---
a/pack-contracts/pack-contract-thrift/src/main/java/io/servicecomb/saga/pack/contracts/thrift/SwiftTxEventEndpoint.java
+++
b/pack-contracts/pack-contract-thrift/src/main/java/io/servicecomb/saga/pack/contracts/thrift/SwiftTxEventEndpoint.java
@@ -21,7 +21,7 @@ import com.facebook.swift.service.ThriftMethod;
import com.facebook.swift.service.ThriftService;
@ThriftService("TxEventEndpoint")
-public interface SwiftTxEventEndpoint {
+public interface SwiftTxEventEndpoint extends AutoCloseable {
@ThriftMethod
void handle(SwiftTxEvent message);
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].