This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch fix_sync_thread_bug in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit b566d47de5cb536bf64a9c5ab09fb1db7f041dab Author: lta <[email protected]> AuthorDate: Thu Mar 28 10:11:50 2019 +0800 Reorganized the start and close of sync thread --- .../db/concurrent/IoTDBThreadPoolFactory.java | 9 ++ .../java/org/apache/iotdb/db/service/IoTDB.java | 8 +- .../org/apache/iotdb/db/service/ServiceType.java | 4 +- .../iotdb/db/sync/receiver/ServerManager.java | 112 ++++++++++++++------- 4 files changed, 92 insertions(+), 41 deletions(-) diff --git a/iotdb/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java index f191607..4693d60 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java @@ -128,4 +128,13 @@ public class IoTDBThreadPoolFactory { args.stopTimeoutUnit, executorQueue, new IoTThreadFactory(poolName, handler)); } + /** + * function for creating sync client thread pool. + */ + public static ExecutorService createSyncClientThreadPool(Args args, String poolName) { + SynchronousQueue<Runnable> executorQueue = new SynchronousQueue<>(); + return new ThreadPoolExecutor(args.minWorkerThreads, args.maxWorkerThreads, args.stopTimeoutVal, + args.stopTimeoutUnit, executorQueue, new IoTThreadFactory(poolName)); + } + } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java index fead066..0f5c18d 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java @@ -33,8 +33,8 @@ import org.apache.iotdb.db.exception.StartupException; import org.apache.iotdb.db.exception.builder.ExceptionBuilder; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.monitor.StatMonitor; -import org.apache.iotdb.db.sync.receiver.ServerManager; import org.apache.iotdb.db.query.control.FileReaderManager; +import org.apache.iotdb.db.sync.receiver.ServerManager; import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager; import org.apache.iotdb.db.writelog.manager.WriteLogNodeManager; import org.slf4j.Logger; @@ -46,7 +46,6 @@ public class IoTDB implements IoTDBMBean { private final String mbeanName = String.format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE, "IoTDB"); private RegisterManager registerManager = new RegisterManager(); - private ServerManager serverManager = ServerManager.getInstance(); public static final IoTDB getInstance() { return IoTDBHolder.INSTANCE; @@ -107,18 +106,17 @@ public class IoTDB implements IoTDBMBean { registerManager.register(StatMonitor.getInstance()); registerManager.register(BasicMemController.getInstance()); registerManager.register(FileReaderManager.getInstance()); + registerManager.register(ServerManager.getInstance()); JMXService.registerMBean(getInstance(), mbeanName); initErrorInformation(); - serverManager.startServer(); LOGGER.info("IoTDB is set up."); } public void deactivate() { LOGGER.info("Deactivating IoTDB..."); - serverManager.closeServer(); registerManager.deregisterAll(); JMXService.deregisterMBean(mbeanName); LOGGER.info("IoTDB is deactivated."); @@ -141,7 +139,7 @@ public class IoTDB implements IoTDBMBean { * Recover data using system log. * * @throws RecoverException if FileNode(Manager)Exception is encountered during the recovery. - * @throws IOException if IOException is encountered during the recovery. + * @throws IOException if IOException is encountered during the recovery. */ private void systemDataRecovery() throws RecoverException { LOGGER.info("{}: start checking write log...", IoTDBConstant.GLOBAL_DB_NAME); diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/iotdb/src/main/java/org/apache/iotdb/db/service/ServiceType.java index e6c58ea..4b396f2 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/service/ServiceType.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/service/ServiceType.java @@ -29,7 +29,9 @@ public enum ServiceType { CLOSE_MERGE_SERVICE("Close&Merge ServerService", ""), JVM_MEM_CONTROL_SERVICE("Memory Controller", ""), AUTHORIZATION_SERVICE("Authorization ServerService", ""), - FILE_READER_MANAGER_SERVICE("File reader manager ServerService", ""); + FILE_READER_MANAGER_SERVICE("File reader manager ServerService", ""), + SYNC_SERVICE("SYNC ServerService", ""); + private String name; private String jmxName; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerManager.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerManager.java index 94ef1f7..00651e6 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerManager.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerManager.java @@ -18,12 +18,16 @@ */ package org.apache.iotdb.db.sync.receiver; +import java.net.InetSocketAddress; import org.apache.iotdb.db.concurrent.ThreadName; import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.StartupException; +import org.apache.iotdb.db.service.IService; +import org.apache.iotdb.db.service.ServiceType; import org.apache.iotdb.service.sync.thrift.SyncService; -import org.apache.thrift.TProcessor; +import org.apache.iotdb.service.sync.thrift.SyncService.Processor; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TBinaryProtocol.Factory; import org.apache.thrift.server.TServer; @@ -36,11 +40,10 @@ import org.slf4j.LoggerFactory; /** * sync receiver server. */ -public class ServerManager { +public class ServerManager implements IService { private static final Logger LOGGER = LoggerFactory.getLogger(ServerManager.class); - private TServerSocket serverTransport; - private TServer poolServer; + private Thread syncServerThread; private IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig(); private ServerManager() { @@ -51,51 +54,90 @@ public class ServerManager { } /** - * start sync receiver's server. + * Start sync receiver's server. */ - public void startServer() throws StartupException { - Factory protocolFactory; - TProcessor processor; - TThreadPoolServer.Args poolArgs; + @Override + public void start() throws StartupException { if (!conf.isSyncEnable()) { return; } - try { - if (conf.getIpWhiteList() == null) { - LOGGER.error( - "Sync server failed to start because IP white list is null, please set IP white list."); - return; - } - conf.setIpWhiteList(conf.getIpWhiteList().replaceAll(" ", "")); - serverTransport = new TServerSocket(conf.getSyncServerPort()); - protocolFactory = new TBinaryProtocol.Factory(); - processor = new SyncService.Processor<>(new ServerServiceImpl()); - poolArgs = new TThreadPoolServer.Args(serverTransport); - poolArgs.processor(processor); - poolArgs.protocolFactory(protocolFactory); - poolServer = new TThreadPoolServer(poolArgs); - LOGGER.info("Sync server has started."); - Runnable syncServerRunnable = () -> poolServer.serve(); - Thread syncServerThread = new Thread(syncServerRunnable, ThreadName.SYNC_SERVER.getName()); - syncServerThread.start(); - } catch (TTransportException e) { - throw new StartupException("Cannot start sync server.", e); + if (conf.getIpWhiteList() == null) { + LOGGER.error( + "Sync server failed to start because IP white list is null, please set IP white list."); + return; } + conf.setIpWhiteList(conf.getIpWhiteList().replaceAll(" ", "")); + syncServerThread = new SyncServiceThread(); + syncServerThread.setName(ThreadName.SYNC_SERVER.getName()); + syncServerThread.start(); + LOGGER.info("Sync server has started."); } /** - * close sync receiver's server. + * Close sync receiver's server. */ - public void closeServer() { - if (conf.isSyncEnable() && poolServer != null) { - poolServer.stop(); - serverTransport.close(); - LOGGER.info("Stop sync server."); + @Override + public void stop() { + if (conf.isSyncEnable()) { + ((SyncServiceThread) syncServerThread).close(); } } + @Override + public ServiceType getID() { + return ServiceType.SYNC_SERVICE; + } + private static class ServerManagerHolder { private static final ServerManager INSTANCE = new ServerManager(); } + + private class SyncServiceThread extends Thread { + + private TServerSocket serverTransport; + private TServer poolServer; + private Factory protocolFactory; + private Processor<SyncService.Iface> processor; + private TThreadPoolServer.Args poolArgs; + + public SyncServiceThread() { + processor = new SyncService.Processor<>(new ServerServiceImpl()); + } + + @Override + public void run() { + try { + serverTransport = new TServerSocket( + new InetSocketAddress(conf.getRpcAddress(), conf.getSyncServerPort())); + protocolFactory = new TBinaryProtocol.Factory(); + processor = new SyncService.Processor<>(new ServerServiceImpl()); + poolArgs = new TThreadPoolServer.Args(serverTransport); + poolArgs.processor(processor); + poolArgs.protocolFactory(protocolFactory); + poolServer = new TThreadPoolServer(poolArgs); + poolServer.serve(); + } catch (TTransportException e) { + LOGGER.error("{}: failed to start {}, because ", IoTDBConstant.GLOBAL_DB_NAME, + getID().getName(), e); + } catch (Exception e) { + LOGGER.error("{}: {} exit, because ", IoTDBConstant.GLOBAL_DB_NAME, getID().getName(), e); + } finally { + close(); + LOGGER.info("{}: close TThreadPoolServer and TServerSocket for {}", + IoTDBConstant.GLOBAL_DB_NAME, getID().getName()); + } + } + + private synchronized void close() { + if (poolServer != null) { + poolServer.stop(); + poolServer = null; + } + if (serverTransport != null) { + serverTransport.close(); + serverTransport = null; + } + } + } } \ No newline at end of file
