This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch feature-client-session-0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/feature-client-session-0.13 by this push:
new 7661544cb5 finish customized rpc thread event handler
7661544cb5 is described below
commit 7661544cb5568b22b0631d9f42a06ed340549a78
Author: xiangdong huang <[email protected]>
AuthorDate: Tue Oct 25 01:18:22 2022 +0800
finish customized rpc thread event handler
---
.../external/api/thrift/JudgableServerContext.java | 7 +-
.../external/api/thrift/ServerContextFactory.java | 6 +-
.../thrift/TThreadPoolServerWithContext.java | 155 ---------------------
.../db/service/thrift/ThriftServiceThread.java | 2 +-
.../thrift/handler/BaseServerContextHandler.java | 47 ++++---
.../handler/InfluxDBServiceThriftHandler.java | 1 -
.../thrift/handler/RPCServiceThriftHandler.java | 1 -
7 files changed, 36 insertions(+), 183 deletions(-)
diff --git a/external-api/src/main/java/org/apache/iotdb/external/api/thrift/JudgableServerContext.java b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/JudgableServerContext.java
index 5948804a57..fea949f750 100644
--- a/external-api/src/main/java/org/apache/iotdb/external/api/thrift/JudgableServerContext.java
+++ b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/JudgableServerContext.java
@@ -25,9 +25,12 @@ public interface JudgableServerContext extends ServerContext
{
/**
* this method will be called when a client connects to the IoTDB server.
*
- * @return true / false
+ * @return false if we do not allow this connection
*/
- public boolean authorised();
+ public boolean whenConnecte();
+
+ /** @return false if we do not allow this connection */
+ public boolean whenDisconnect();
@Override
default <T> T unwrap(Class<T> iface) {
diff --git a/external-api/src/main/java/org/apache/iotdb/external/api/thrift/ServerContextFactory.java b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/ServerContextFactory.java
index b6788b376a..048a93ca8f 100644
--- a/external-api/src/main/java/org/apache/iotdb/external/api/thrift/ServerContextFactory.java
+++ b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/ServerContextFactory.java
@@ -18,6 +18,10 @@
*/
package org.apache.iotdb.external.api.thrift;
+import org.apache.thrift.protocol.TProtocol;
+
+import java.net.Socket;
+
public interface ServerContextFactory {
- JudgableServerContext newServerContext();
+ JudgableServerContext newServerContext(TProtocol out, Socket socket);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/TThreadPoolServerWithContext.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/TThreadPoolServerWithContext.java
deleted file mode 100644
index e356159911..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/TThreadPoolServerWithContext.java
+++ /dev/null
@@ -1,155 +0,0 @@
-package org.apache.iotdb.db.service.thrift;
-
-import org.apache.iotdb.external.api.thrift.JudgableServerContext;
-
-import org.apache.thrift.TException;
-import org.apache.thrift.TProcessor;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.server.ServerContext;
-import org.apache.thrift.server.TServerEventHandler;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Optional;
-import java.util.concurrent.RejectedExecutionException;
-
-public class TThreadPoolServerWithContext extends TThreadPoolServer {
- private static final Logger LOGGER =
LoggerFactory.getLogger(TThreadPoolServerWithContext.class);
-
- public TThreadPoolServerWithContext(Args args) {
- super(args);
- }
-
- @Override
- protected void execute() {
- while (!stopped_) {
- try {
- TTransport client = serverTransport_.accept();
- try {
- getExecutorService().execute(new WorkerProcess(client));
- } catch (RejectedExecutionException ree) {
- if (!stopped_) {
- LOGGER.warn(
- "ThreadPool is saturated with incoming requests. Closing
latest connection.");
- }
- client.close();
- }
- } catch (TTransportException ttx) {
- if (!stopped_) {
- LOGGER.warn("Transport error occurred during acceptance of message",
ttx);
- }
- }
- }
- }
-
- // The following codes are copied from TThreadPoolServer.WorkerProcess
- // and we add additional processing for connectionContext
-
- private class WorkerProcess implements Runnable {
-
- /** Client that this services. */
- private TTransport client_;
-
- /**
- * Default constructor.
- *
- * @param client Transport to process
- */
- private WorkerProcess(TTransport client) {
- client_ = client;
- }
-
- /** Loops on processing a client forever */
- public void run() {
- TProcessor processor = null;
- TTransport inputTransport = null;
- TTransport outputTransport = null;
- TProtocol inputProtocol = null;
- TProtocol outputProtocol = null;
-
- Optional<TServerEventHandler> eventHandler = Optional.empty();
- ServerContext connectionContext = null;
-
- try {
- processor = processorFactory_.getProcessor(client_);
- inputTransport = inputTransportFactory_.getTransport(client_);
- outputTransport = outputTransportFactory_.getTransport(client_);
- inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
- outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
-
- eventHandler = Optional.ofNullable(getEventHandler());
-
- if (eventHandler.isPresent()) {
- connectionContext = eventHandler.get().createContext(inputProtocol,
outputProtocol);
- if (connectionContext instanceof JudgableServerContext
- && !((JudgableServerContext) connectionContext).authorised()) {
- return;
- }
- }
-
- while (true) {
- if (Thread.currentThread().isInterrupted()) {
- LOGGER.debug("WorkerProcess requested to shutdown");
- break;
- }
- if (eventHandler.isPresent()) {
- eventHandler.get().processContext(connectionContext,
inputTransport, outputTransport);
- }
- // This process cannot be interrupted by Interrupting the Thread.
This
- // will return once a message has been processed or the socket
timeout
- // has elapsed, at which point it will return and check the interrupt
- // state of the thread.
- processor.process(inputProtocol, outputProtocol);
- }
- } catch (Exception x) {
- LOGGER.debug("Error processing request", x);
-
- // We'll usually receive RuntimeException types here
- // Need to unwrap to ascertain real causing exception before we choose
to ignore
- // Ignore err-logging all transport-level/type exceptions
- if (!isIgnorableException(x)) {
- // Log the exception at error level and continue
- LOGGER.error(
- (x instanceof TException ? "Thrift " : "")
- + "Error occurred during processing of message.",
- x);
- }
- } finally {
- if (eventHandler.isPresent()) {
- eventHandler.get().deleteContext(connectionContext, inputProtocol,
outputProtocol);
- }
- if (inputTransport != null) {
- inputTransport.close();
- }
- if (outputTransport != null) {
- outputTransport.close();
- }
- if (client_.isOpen()) {
- client_.close();
- }
- }
- }
-
- private boolean isIgnorableException(Exception x) {
- TTransportException tTransportException = null;
-
- if (x instanceof TTransportException) {
- tTransportException = (TTransportException) x;
- } else if (x.getCause() instanceof TTransportException) {
- tTransportException = (TTransportException) x.getCause();
- }
-
- if (tTransportException != null) {
- switch (tTransportException.getType()) {
- case TTransportException.END_OF_FILE:
- case TTransportException.TIMED_OUT:
- return true;
- }
- }
- return false;
- }
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
index edf98d1774..7deb504be3 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
@@ -154,7 +154,7 @@ public class ThriftServiceThread extends Thread {
serverTransport = openTransport(bindAddress, port);
TThreadPoolServer.Args poolArgs =
initSyncedPoolArgs(processor, threadsName, maxWorkerThreads,
timeoutSecond);
- poolServer = new TThreadPoolServerWithContext(poolArgs);
+ poolServer = new TThreadPoolServer(poolArgs);
logger.warn("注册EventHandler");
poolServer.setServerEventHandler(serverEventHandler);
} catch (TTransportException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/BaseServerContextHandler.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/BaseServerContextHandler.java
index 68c22a3f89..c591c6ca57 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/BaseServerContextHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/BaseServerContextHandler.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.service.thrift.handler;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.query.control.clientsession.ClientSession;
import org.apache.iotdb.external.api.thrift.JudgableServerContext;
@@ -29,45 +28,49 @@ import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.ServerContext;
import org.apache.thrift.transport.TSocket;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.net.Socket;
+import java.util.ServiceLoader;
public class BaseServerContextHandler {
- private ServerContextFactory factory = () -> (JudgableServerContext) () ->
true;
+ private static ServerContextFactory factory = null;
+ private static final Logger logger =
LoggerFactory.getLogger(BaseServerContextHandler.class);
- public BaseServerContextHandler(Logger logger) {
- String factoryClass =
- IoTDBDescriptor.getInstance()
- .getConfig()
- .getCustomizedProperties()
- .getProperty("rpc_service_thrift_handler_context_class");
- if (factoryClass != null) {
- try {
- factory = (ServerContextFactory)
Class.forName(factoryClass).newInstance();
- } catch (Exception e) {
- logger.warn(
- "configuration announced ServerContextFactory {}, but it is not
found in classpath",
- factoryClass);
- factory = null;
+ static {
+ ServiceLoader<ServerContextFactory> contextFactoryLoader =
+ ServiceLoader.load(ServerContextFactory.class);
+ for (ServerContextFactory loader : contextFactoryLoader) {
+ if (factory != null) {
+ // it means there is more than one implementation.
+ logger.warn("There are more than one ServerContextFactory
implementation. pls check.");
}
+ logger.info("Will set ServerContextFactory from {} ",
loader.getClass().getName());
+ factory = loader;
}
}
+ public BaseServerContextHandler() {}
+
public ServerContext createContext(TProtocol in, TProtocol out) {
Socket socket =
((TSocket) ((TElasticFramedTransport)
out.getTransport()).getSocket()).getSocket();
+ JudgableServerContext context = null;
getSessionManager().registerSession(new ClientSession(socket));
if (factory != null) {
- JudgableServerContext context = factory.newServerContext();
- // TODO
- return context;
+ context = factory.newServerContext(out, socket);
+ if (!context.whenConnecte()) {
+ return context;
+ }
}
-
- return null;
+ return context;
}
- public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol
arg2) {
+ public void deleteContext(ServerContext context, TProtocol in, TProtocol
out) {
getSessionManager().removeCurrSession();
+ if (context != null && factory != null) {
+ ((JudgableServerContext) context).whenDisconnect();
+ }
}
protected SessionManager getSessionManager() {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InfluxDBServiceThriftHandler.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InfluxDBServiceThriftHandler.java
index 8279873480..aa3f54b5c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InfluxDBServiceThriftHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InfluxDBServiceThriftHandler.java
@@ -34,7 +34,6 @@ public class InfluxDBServiceThriftHandler extends
BaseServerContextHandler
private static final Logger logger =
LoggerFactory.getLogger(InfluxDBServiceThriftHandler.class);
public InfluxDBServiceThriftHandler(InfluxDBServiceImpl influxDBServiceImpl)
{
- super(logger);
this.influxDBServiceImpl = influxDBServiceImpl;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
index eae4e55508..e5a3016974 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
@@ -35,7 +35,6 @@ public class RPCServiceThriftHandler extends
BaseServerContextHandler
private AtomicLong thriftConnectionNumber = new AtomicLong(0);
public RPCServiceThriftHandler(TSServiceImpl serviceImpl) {
- super(logger);
this.serviceImpl = serviceImpl;
MetricService.getInstance()
.addMetricSet(new
RPCServiceThriftHandlerMetrics(thriftConnectionNumber));