This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch feature-client-session-0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/feature-client-session-0.13 by this push:
     new 7661544cb5 finish customized rpc thread event handler
7661544cb5 is described below

commit 7661544cb5568b22b0631d9f42a06ed340549a78
Author: xiangdong huang <[email protected]>
AuthorDate: Tue Oct 25 01:18:22 2022 +0800

    finish customized rpc thread event handler
---
 .../external/api/thrift/JudgableServerContext.java |   7 +-
 .../external/api/thrift/ServerContextFactory.java  |   6 +-
 .../thrift/TThreadPoolServerWithContext.java       | 155 ---------------------
 .../db/service/thrift/ThriftServiceThread.java     |   2 +-
 .../thrift/handler/BaseServerContextHandler.java   |  47 ++++---
 .../handler/InfluxDBServiceThriftHandler.java      |   1 -
 .../thrift/handler/RPCServiceThriftHandler.java    |   1 -
 7 files changed, 36 insertions(+), 183 deletions(-)

diff --git a/external-api/src/main/java/org/apache/iotdb/external/api/thrift/JudgableServerContext.java b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/JudgableServerContext.java
index 5948804a57..fea949f750 100644
--- a/external-api/src/main/java/org/apache/iotdb/external/api/thrift/JudgableServerContext.java
+++ b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/JudgableServerContext.java
@@ -25,9 +25,12 @@ public interface JudgableServerContext extends ServerContext {
   /**
    * this method will be called when a client connects to the IoTDB server.
    *
-   * @return true / false
+   * @return false if we do not allow this connection
    */
-  public boolean authorised();
+  public boolean whenConnecte();
+
+  /** @return false if we do not allow this connection */
+  public boolean whenDisconnect();
 
   @Override
   default <T> T unwrap(Class<T> iface) {
diff --git a/external-api/src/main/java/org/apache/iotdb/external/api/thrift/ServerContextFactory.java b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/ServerContextFactory.java
index b6788b376a..048a93ca8f 100644
--- a/external-api/src/main/java/org/apache/iotdb/external/api/thrift/ServerContextFactory.java
+++ b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/ServerContextFactory.java
@@ -18,6 +18,10 @@
  */
 package org.apache.iotdb.external.api.thrift;
 
+import org.apache.thrift.protocol.TProtocol;
+
+import java.net.Socket;
+
 public interface ServerContextFactory {
-  JudgableServerContext newServerContext();
+  JudgableServerContext newServerContext(TProtocol out, Socket socket);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/TThreadPoolServerWithContext.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/TThreadPoolServerWithContext.java
deleted file mode 100644
index e356159911..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/TThreadPoolServerWithContext.java
+++ /dev/null
@@ -1,155 +0,0 @@
-package org.apache.iotdb.db.service.thrift;
-
-import org.apache.iotdb.external.api.thrift.JudgableServerContext;
-
-import org.apache.thrift.TException;
-import org.apache.thrift.TProcessor;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.server.ServerContext;
-import org.apache.thrift.server.TServerEventHandler;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Optional;
-import java.util.concurrent.RejectedExecutionException;
-
-public class TThreadPoolServerWithContext extends TThreadPoolServer {
-  private static final Logger LOGGER = LoggerFactory.getLogger(TThreadPoolServerWithContext.class);
-
-  public TThreadPoolServerWithContext(Args args) {
-    super(args);
-  }
-
-  @Override
-  protected void execute() {
-    while (!stopped_) {
-      try {
-        TTransport client = serverTransport_.accept();
-        try {
-          getExecutorService().execute(new WorkerProcess(client));
-        } catch (RejectedExecutionException ree) {
-          if (!stopped_) {
-            LOGGER.warn(
-                "ThreadPool is saturated with incoming requests. Closing latest connection.");
-          }
-          client.close();
-        }
-      } catch (TTransportException ttx) {
-        if (!stopped_) {
-          LOGGER.warn("Transport error occurred during acceptance of message", ttx);
-        }
-      }
-    }
-  }
-
-  // The following codes are copied from TThreadPoolServer.WorkerProcess
-  // and we add additional processing for connectionContext
-
-  private class WorkerProcess implements Runnable {
-
-    /** Client that this services. */
-    private TTransport client_;
-
-    /**
-     * Default constructor.
-     *
-     * @param client Transport to process
-     */
-    private WorkerProcess(TTransport client) {
-      client_ = client;
-    }
-
-    /** Loops on processing a client forever */
-    public void run() {
-      TProcessor processor = null;
-      TTransport inputTransport = null;
-      TTransport outputTransport = null;
-      TProtocol inputProtocol = null;
-      TProtocol outputProtocol = null;
-
-      Optional<TServerEventHandler> eventHandler = Optional.empty();
-      ServerContext connectionContext = null;
-
-      try {
-        processor = processorFactory_.getProcessor(client_);
-        inputTransport = inputTransportFactory_.getTransport(client_);
-        outputTransport = outputTransportFactory_.getTransport(client_);
-        inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
-        outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
-
-        eventHandler = Optional.ofNullable(getEventHandler());
-
-        if (eventHandler.isPresent()) {
-          connectionContext = eventHandler.get().createContext(inputProtocol, outputProtocol);
-          if (connectionContext instanceof JudgableServerContext
-              && !((JudgableServerContext) connectionContext).authorised()) {
-            return;
-          }
-        }
-
-        while (true) {
-          if (Thread.currentThread().isInterrupted()) {
-            LOGGER.debug("WorkerProcess requested to shutdown");
-            break;
-          }
-          if (eventHandler.isPresent()) {
-            eventHandler.get().processContext(connectionContext, inputTransport, outputTransport);
-          }
-          // This process cannot be interrupted by Interrupting the Thread. This
-          // will return once a message has been processed or the socket timeout
-          // has elapsed, at which point it will return and check the interrupt
-          // state of the thread.
-          processor.process(inputProtocol, outputProtocol);
-        }
-      } catch (Exception x) {
-        LOGGER.debug("Error processing request", x);
-
-        // We'll usually receive RuntimeException types here
-        // Need to unwrap to ascertain real causing exception before we choose to ignore
-        // Ignore err-logging all transport-level/type exceptions
-        if (!isIgnorableException(x)) {
-          // Log the exception at error level and continue
-          LOGGER.error(
-              (x instanceof TException ? "Thrift " : "")
-                  + "Error occurred during processing of message.",
-              x);
-        }
-      } finally {
-        if (eventHandler.isPresent()) {
-          eventHandler.get().deleteContext(connectionContext, inputProtocol, outputProtocol);
-        }
-        if (inputTransport != null) {
-          inputTransport.close();
-        }
-        if (outputTransport != null) {
-          outputTransport.close();
-        }
-        if (client_.isOpen()) {
-          client_.close();
-        }
-      }
-    }
-
-    private boolean isIgnorableException(Exception x) {
-      TTransportException tTransportException = null;
-
-      if (x instanceof TTransportException) {
-        tTransportException = (TTransportException) x;
-      } else if (x.getCause() instanceof TTransportException) {
-        tTransportException = (TTransportException) x.getCause();
-      }
-
-      if (tTransportException != null) {
-        switch (tTransportException.getType()) {
-          case TTransportException.END_OF_FILE:
-          case TTransportException.TIMED_OUT:
-            return true;
-        }
-      }
-      return false;
-    }
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
index edf98d1774..7deb504be3 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
@@ -154,7 +154,7 @@ public class ThriftServiceThread extends Thread {
       serverTransport = openTransport(bindAddress, port);
       TThreadPoolServer.Args poolArgs =
           initSyncedPoolArgs(processor, threadsName, maxWorkerThreads, timeoutSecond);
-      poolServer = new TThreadPoolServerWithContext(poolArgs);
+      poolServer = new TThreadPoolServer(poolArgs);
       logger.warn("注册EventHandler");
       poolServer.setServerEventHandler(serverEventHandler);
     } catch (TTransportException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/BaseServerContextHandler.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/BaseServerContextHandler.java
index 68c22a3f89..c591c6ca57 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/BaseServerContextHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/BaseServerContextHandler.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.service.thrift.handler;
 
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.query.control.clientsession.ClientSession;
 import org.apache.iotdb.external.api.thrift.JudgableServerContext;
@@ -29,45 +28,49 @@ import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.server.ServerContext;
 import org.apache.thrift.transport.TSocket;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.net.Socket;
+import java.util.ServiceLoader;
 
 public class BaseServerContextHandler {
-  private ServerContextFactory factory = () -> (JudgableServerContext) () -> true;
+  private static ServerContextFactory factory = null;
+  private static final Logger logger = LoggerFactory.getLogger(BaseServerContextHandler.class);
 
-  public BaseServerContextHandler(Logger logger) {
-    String factoryClass =
-        IoTDBDescriptor.getInstance()
-            .getConfig()
-            .getCustomizedProperties()
-            .getProperty("rpc_service_thrift_handler_context_class");
-    if (factoryClass != null) {
-      try {
-        factory = (ServerContextFactory) Class.forName(factoryClass).newInstance();
-      } catch (Exception e) {
-        logger.warn(
-            "configuration announced ServerContextFactory {}, but it is not found in classpath",
-            factoryClass);
-        factory = null;
+  static {
+    ServiceLoader<ServerContextFactory> contextFactoryLoader =
+        ServiceLoader.load(ServerContextFactory.class);
+    for (ServerContextFactory loader : contextFactoryLoader) {
+      if (factory != null) {
+        // it means there is more than one implementation.
+        logger.warn("There are more than one ServerContextFactory implementation. pls check.");
       }
+      logger.info("Will set ServerContextFactory from {} ", loader.getClass().getName());
+      factory = loader;
     }
   }
 
+  public BaseServerContextHandler() {}
+
   public ServerContext createContext(TProtocol in, TProtocol out) {
     Socket socket =
         ((TSocket) ((TElasticFramedTransport) out.getTransport()).getSocket()).getSocket();
+    JudgableServerContext context = null;
     getSessionManager().registerSession(new ClientSession(socket));
     if (factory != null) {
-      JudgableServerContext context = factory.newServerContext();
-      // TODO
-      return context;
+      context = factory.newServerContext(out, socket);
+      if (!context.whenConnecte()) {
+        return context;
+      }
     }
-
-    return null;
+    return context;
   }
 
-  public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) {
+  public void deleteContext(ServerContext context, TProtocol in, TProtocol out) {
     getSessionManager().removeCurrSession();
+    if (context != null && factory != null) {
+      ((JudgableServerContext) context).whenDisconnect();
+    }
   }
 
   protected SessionManager getSessionManager() {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InfluxDBServiceThriftHandler.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InfluxDBServiceThriftHandler.java
index 8279873480..aa3f54b5c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InfluxDBServiceThriftHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InfluxDBServiceThriftHandler.java
@@ -34,7 +34,6 @@ public class InfluxDBServiceThriftHandler extends BaseServerContextHandler
   private static final Logger logger = LoggerFactory.getLogger(InfluxDBServiceThriftHandler.class);
 
   public InfluxDBServiceThriftHandler(InfluxDBServiceImpl influxDBServiceImpl) {
-    super(logger);
     this.influxDBServiceImpl = influxDBServiceImpl;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
index eae4e55508..e5a3016974 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
@@ -35,7 +35,6 @@ public class RPCServiceThriftHandler extends BaseServerContextHandler
   private AtomicLong thriftConnectionNumber = new AtomicLong(0);
 
   public RPCServiceThriftHandler(TSServiceImpl serviceImpl) {
-    super(logger);
     this.serviceImpl = serviceImpl;
     MetricService.getInstance()
         .addMetricSet(new RPCServiceThriftHandlerMetrics(thriftConnectionNumber));

Reply via email to