This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch fix_sync_thread_bug
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit b566d47de5cb536bf64a9c5ab09fb1db7f041dab
Author: lta <[email protected]>
AuthorDate: Thu Mar 28 10:11:50 2019 +0800

    Reorganized the start and close of sync thread
---
 .../db/concurrent/IoTDBThreadPoolFactory.java      |   9 ++
 .../java/org/apache/iotdb/db/service/IoTDB.java    |   8 +-
 .../org/apache/iotdb/db/service/ServiceType.java   |   4 +-
 .../iotdb/db/sync/receiver/ServerManager.java      | 112 ++++++++++++++-------
 4 files changed, 92 insertions(+), 41 deletions(-)

diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
index f191607..4693d60 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
@@ -128,4 +128,13 @@ public class IoTDBThreadPoolFactory {
         args.stopTimeoutUnit, executorQueue, new IoTThreadFactory(poolName, 
handler));
   }
 
+  /**
+   * function for creating sync client thread pool.
+   */
+  public static ExecutorService createSyncClientThreadPool(Args args, String 
poolName) {
+    SynchronousQueue<Runnable> executorQueue = new SynchronousQueue<>();
+    return new ThreadPoolExecutor(args.minWorkerThreads, 
args.maxWorkerThreads, args.stopTimeoutVal,
+        args.stopTimeoutUnit, executorQueue, new IoTThreadFactory(poolName));
+  }
+
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java 
b/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index fead066..0f5c18d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -33,8 +33,8 @@ import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.exception.builder.ExceptionBuilder;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.monitor.StatMonitor;
-import org.apache.iotdb.db.sync.receiver.ServerManager;
 import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.sync.receiver.ServerManager;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
 import org.apache.iotdb.db.writelog.manager.WriteLogNodeManager;
 import org.slf4j.Logger;
@@ -46,7 +46,6 @@ public class IoTDB implements IoTDBMBean {
   private final String mbeanName = String.format("%s:%s=%s", 
IoTDBConstant.IOTDB_PACKAGE,
       IoTDBConstant.JMX_TYPE, "IoTDB");
   private RegisterManager registerManager = new RegisterManager();
-  private ServerManager serverManager = ServerManager.getInstance();
 
   public static final IoTDB getInstance() {
     return IoTDBHolder.INSTANCE;
@@ -107,18 +106,17 @@ public class IoTDB implements IoTDBMBean {
     registerManager.register(StatMonitor.getInstance());
     registerManager.register(BasicMemController.getInstance());
     registerManager.register(FileReaderManager.getInstance());
+    registerManager.register(ServerManager.getInstance());
 
     JMXService.registerMBean(getInstance(), mbeanName);
 
     initErrorInformation();
 
-    serverManager.startServer();
     LOGGER.info("IoTDB is set up.");
   }
 
   public void deactivate() {
     LOGGER.info("Deactivating IoTDB...");
-    serverManager.closeServer();
     registerManager.deregisterAll();
     JMXService.deregisterMBean(mbeanName);
     LOGGER.info("IoTDB is deactivated.");
@@ -141,7 +139,7 @@ public class IoTDB implements IoTDBMBean {
    * Recover data using system log.
    *
    * @throws RecoverException if FileNode(Manager)Exception is encountered 
during the recovery.
-   * @throws IOException      if IOException is encountered during the 
recovery.
+   * @throws IOException if IOException is encountered during the recovery.
    */
   private void systemDataRecovery() throws RecoverException {
     LOGGER.info("{}: start checking write log...", 
IoTDBConstant.GLOBAL_DB_NAME);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/ServiceType.java 
b/iotdb/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index e6c58ea..4b396f2 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -29,7 +29,9 @@ public enum ServiceType {
   CLOSE_MERGE_SERVICE("Close&Merge ServerService", ""),
   JVM_MEM_CONTROL_SERVICE("Memory Controller", ""),
   AUTHORIZATION_SERVICE("Authorization ServerService", ""),
-  FILE_READER_MANAGER_SERVICE("File reader manager ServerService", "");
+  FILE_READER_MANAGER_SERVICE("File reader manager ServerService", ""),
+  SYNC_SERVICE("SYNC ServerService", "");
+
   private String name;
   private String jmxName;
 
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerManager.java 
b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerManager.java
index 94ef1f7..00651e6 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerManager.java
@@ -18,12 +18,16 @@
  */
 package org.apache.iotdb.db.sync.receiver;
 
+import java.net.InetSocketAddress;
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.ServiceType;
 import org.apache.iotdb.service.sync.thrift.SyncService;
-import org.apache.thrift.TProcessor;
+import org.apache.iotdb.service.sync.thrift.SyncService.Processor;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TBinaryProtocol.Factory;
 import org.apache.thrift.server.TServer;
@@ -36,11 +40,10 @@ import org.slf4j.LoggerFactory;
 /**
  * sync receiver server.
  */
-public class ServerManager {
+public class ServerManager implements IService {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ServerManager.class);
-  private TServerSocket serverTransport;
-  private TServer poolServer;
+  private Thread syncServerThread;
   private IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
 
   private ServerManager() {
@@ -51,51 +54,90 @@ public class ServerManager {
   }
 
   /**
-   * start sync receiver's server.
+   * Start sync receiver's server.
    */
-  public void startServer() throws StartupException {
-    Factory protocolFactory;
-    TProcessor processor;
-    TThreadPoolServer.Args poolArgs;
+  @Override
+  public void start() throws StartupException {
     if (!conf.isSyncEnable()) {
       return;
     }
-    try {
-      if (conf.getIpWhiteList() == null) {
-        LOGGER.error(
-            "Sync server failed to start because IP white list is null, please 
set IP white list.");
-        return;
-      }
-      conf.setIpWhiteList(conf.getIpWhiteList().replaceAll(" ", ""));
-      serverTransport = new TServerSocket(conf.getSyncServerPort());
-      protocolFactory = new TBinaryProtocol.Factory();
-      processor = new SyncService.Processor<>(new ServerServiceImpl());
-      poolArgs = new TThreadPoolServer.Args(serverTransport);
-      poolArgs.processor(processor);
-      poolArgs.protocolFactory(protocolFactory);
-      poolServer = new TThreadPoolServer(poolArgs);
-      LOGGER.info("Sync server has started.");
-      Runnable syncServerRunnable = () -> poolServer.serve();
-      Thread syncServerThread = new Thread(syncServerRunnable, 
ThreadName.SYNC_SERVER.getName());
-      syncServerThread.start();
-    } catch (TTransportException e) {
-      throw new StartupException("Cannot start sync server.", e);
+    if (conf.getIpWhiteList() == null) {
+      LOGGER.error(
+          "Sync server failed to start because IP white list is null, please 
set IP white list.");
+      return;
     }
+    conf.setIpWhiteList(conf.getIpWhiteList().replaceAll(" ", ""));
+    syncServerThread = new SyncServiceThread();
+    syncServerThread.setName(ThreadName.SYNC_SERVER.getName());
+    syncServerThread.start();
+    LOGGER.info("Sync server has started.");
   }
 
   /**
-   * close sync receiver's server.
+   * Close sync receiver's server.
    */
-  public void closeServer() {
-    if (conf.isSyncEnable() && poolServer != null) {
-      poolServer.stop();
-      serverTransport.close();
-      LOGGER.info("Stop sync server.");
+  @Override
+  public void stop() {
+    if (conf.isSyncEnable()) {
+      ((SyncServiceThread) syncServerThread).close();
     }
   }
 
+  @Override
+  public ServiceType getID() {
+    return ServiceType.SYNC_SERVICE;
+  }
+
   private static class ServerManagerHolder {
 
     private static final ServerManager INSTANCE = new ServerManager();
   }
+
+  private class SyncServiceThread extends Thread {
+
+    private TServerSocket serverTransport;
+    private TServer poolServer;
+    private Factory protocolFactory;
+    private Processor<SyncService.Iface> processor;
+    private TThreadPoolServer.Args poolArgs;
+
+    public SyncServiceThread() {
+      processor = new SyncService.Processor<>(new ServerServiceImpl());
+    }
+
+    @Override
+    public void run() {
+      try {
+        serverTransport = new TServerSocket(
+            new InetSocketAddress(conf.getRpcAddress(), 
conf.getSyncServerPort()));
+        protocolFactory = new TBinaryProtocol.Factory();
+        processor = new SyncService.Processor<>(new ServerServiceImpl());
+        poolArgs = new TThreadPoolServer.Args(serverTransport);
+        poolArgs.processor(processor);
+        poolArgs.protocolFactory(protocolFactory);
+        poolServer = new TThreadPoolServer(poolArgs);
+        poolServer.serve();
+      } catch (TTransportException e) {
+        LOGGER.error("{}: failed to start {}, because ", 
IoTDBConstant.GLOBAL_DB_NAME,
+            getID().getName(), e);
+      } catch (Exception e) {
+        LOGGER.error("{}: {} exit, because ", IoTDBConstant.GLOBAL_DB_NAME, 
getID().getName(), e);
+      } finally {
+        close();
+        LOGGER.info("{}: close TThreadPoolServer and TServerSocket for {}",
+            IoTDBConstant.GLOBAL_DB_NAME, getID().getName());
+      }
+    }
+
+    private synchronized void close() {
+      if (poolServer != null) {
+        poolServer.stop();
+        poolServer = null;
+      }
+      if (serverTransport != null) {
+        serverTransport.close();
+        serverTransport = null;
+      }
+    }
+  }
 }
\ No newline at end of file

Reply via email to