This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 172ab45 [IoTDB-786]refactor the thrift rpc service to reduce
duplicate codes of RPC and Sync module (#1453)
172ab45 is described below
commit 172ab45c2245ccbbd0dfb670809b194b59c57a8b
Author: Xiangdong Huang <[email protected]>
AuthorDate: Sun Jul 5 10:05:42 2020 +0800
[IoTDB-786]refactor the thrift rpc service to reduce duplicate codes of RPC
and Sync module (#1453)
* refactor the thrift rpc service to reduce duplicate codes of RPC and Sync
module
* fix bug that SyncClient is inconsistent with the server after refactor
Co-authored-by: xiangdong huang <[email protected]>
---
.../org/apache/iotdb/db/service/RPCService.java | 218 +++------------------
.../iotdb/db/service/thrift/ThriftService.java | 150 ++++++++++++++
.../db/service/thrift/ThriftServiceThread.java | 143 ++++++++++++++
.../iotdb/db/sync/receiver/SyncServerManager.java | 189 ++++++------------
.../db/sync/receiver/SyncServerManagerMBean.java | 34 ++++
.../iotdb/db/sync/sender/transfer/SyncClient.java | 3 +-
6 files changed, 409 insertions(+), 328 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
index 2ae9928..977c64b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
@@ -18,47 +18,20 @@
*/
package org.apache.iotdb.db.service;
-import java.net.InetSocketAddress;
-import java.util.concurrent.CountDownLatch;
-import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.StartupException;
-import org.apache.iotdb.db.exception.runtime.RPCServiceException;
-import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.db.service.thrift.ThriftService;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
import org.apache.iotdb.service.rpc.thrift.TSIService.Processor;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TFastFramedTransport;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* A service to handle jdbc request from client.
*/
-public class RPCService implements RPCServiceMBean, IService {
+public class RPCService extends ThriftService implements RPCServiceMBean {
- private static final Logger logger =
LoggerFactory.getLogger(RPCService.class);
- private static final String STATUS_UP = "UP";
- private static final String STATUS_DOWN = "DOWN";
- private final String mbeanName = String
- .format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE,
- getID().getJmxName());
- private RPCServiceThread rpcServiceThread;
- private TProtocolFactory protocolFactory;
- private Processor<TSIService.Iface> processor;
- private TThreadPoolServer.Args poolArgs;
private TSServiceImpl impl;
- private CountDownLatch stopLatch;
-
private RPCService() {
}
@@ -67,106 +40,49 @@ public class RPCService implements RPCServiceMBean,
IService {
}
@Override
- public String getRPCServiceStatus() {
- if (rpcServiceThread == null) {
- logger.debug("Start latch is null when getting status");
- } else {
- logger.debug("Start status is {} when getting status",
rpcServiceThread.isServing());
- }
- if(stopLatch == null) {
- logger.debug("Stop latch is null when getting status");
- } else {
- logger.debug("Stop latch is {} when getting status",
stopLatch.getCount());
- }
-
- if(rpcServiceThread != null && rpcServiceThread.isServing()) {
- return STATUS_UP;
- } else {
- return STATUS_DOWN;
- }
- }
-
- @Override
public int getRPCPort() {
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
return config.getRpcPort();
}
@Override
- public void start() throws StartupException {
- JMXService.registerMBean(getInstance(), mbeanName);
- startService();
+ public ThriftService getImplementation() {
+ return getInstance();
}
@Override
- public void stop() {
- stopService();
- JMXService.deregisterMBean(mbeanName);
+ public void initTProcessor() throws
ClassNotFoundException,IllegalAccessException,InstantiationException{
+ impl = (TSServiceImpl)
Class.forName(IoTDBDescriptor.getInstance().getConfig()
+ .getRpcImplClassName()).newInstance();
+ processor = new Processor<>(impl);
}
@Override
- public ServiceType getID() {
- return ServiceType.RPC_SERVICE;
+ public void initThriftServiceThread()
+ throws IllegalAccessException, InstantiationException,
ClassNotFoundException {
+ IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ thriftServiceThread = new ThriftServiceThread(processor,
+ getID().getName(), ThreadName.RPC_CLIENT.getName(),
+ config.getRpcAddress(), config.getRpcPort(),
config.getRpcMaxConcurrentClientNum(),
+ config.getThriftServerAwaitTimeForStopService(),
+ new RPCServiceThriftHandler(impl),
+
IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable());
+ thriftServiceThread.setName(ThreadName.RPC_SERVICE.getName());
}
@Override
- @SuppressWarnings("squid:S2276")
- public synchronized void startService() throws StartupException {
- if (STATUS_UP.equals(getRPCServiceStatus())) {
- logger.info("{}: {} has been already running now",
IoTDBConstant.GLOBAL_DB_NAME,
- this.getID().getName());
- return;
- }
- logger.info("{}: start {}...", IoTDBConstant.GLOBAL_DB_NAME,
this.getID().getName());
- try {
- reset();
- rpcServiceThread = new RPCServiceThread(stopLatch);
- rpcServiceThread.setName(ThreadName.RPC_SERVICE.getName());
- rpcServiceThread.start();
- while (!rpcServiceThread.isServing()) {
- //sleep 100ms for waiting the rpc server start.
- Thread.sleep(100);
- }
- } catch (InterruptedException | ClassNotFoundException |
- IllegalAccessException | InstantiationException e) {
- Thread.currentThread().interrupt();
- throw new StartupException(this.getID().getName(), e.getMessage());
- }
-
- logger.info("{}: start {} successfully, listening on ip {} port {}",
IoTDBConstant.GLOBAL_DB_NAME,
- this.getID().getName(),
IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
- IoTDBDescriptor.getInstance().getConfig().getRpcPort());
- }
-
- private void reset() {
- rpcServiceThread = null;
- stopLatch = new CountDownLatch(1);
+ public String getBindIP() {
+ return IoTDBDescriptor.getInstance().getConfig().getRpcAddress();
}
@Override
- public synchronized void restartService() throws StartupException {
- stopService();
- startService();
+ public int getBindPort() {
+ return IoTDBDescriptor.getInstance().getConfig().getRpcPort();
}
@Override
- public synchronized void stopService() {
- if (STATUS_DOWN.equals(getRPCServiceStatus())) {
- logger.info("{}: {} isn't running now", IoTDBConstant.GLOBAL_DB_NAME,
this.getID().getName());
- return;
- }
- logger.info("{}: closing {}...", IoTDBConstant.GLOBAL_DB_NAME,
this.getID().getName());
- if (rpcServiceThread != null) {
- rpcServiceThread.close();
- }
- try {
- stopLatch.await();
- reset();
- logger.info("{}: close {} successfully", IoTDBConstant.GLOBAL_DB_NAME,
this.getID().getName());
- } catch (InterruptedException e) {
- logger.error("{}: close {} failed because: ",
IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName(), e);
- Thread.currentThread().interrupt();
- }
+ public ServiceType getID() {
+ return ServiceType.RPC_SERVICE;
}
private static class RPCServiceHolder {
@@ -177,88 +93,4 @@ public class RPCService implements RPCServiceMBean,
IService {
}
}
- private class RPCServiceThread extends Thread {
-
- private TServerSocket serverTransport;
- private TServer poolServer;
- private CountDownLatch threadStopLatch;
-
- public RPCServiceThread(CountDownLatch threadStopLatch)
- throws ClassNotFoundException, IllegalAccessException,
InstantiationException {
-
if(IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()) {
- protocolFactory = new TCompactProtocol.Factory();
- }
- else {
- protocolFactory = new TBinaryProtocol.Factory();
- }
- IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- impl = (TSServiceImpl)
Class.forName(config.getRpcImplClassName()).newInstance();
- processor = new TSIService.Processor<>(impl);
- this.threadStopLatch = threadStopLatch;
- }
-
- @SuppressWarnings("squid:S2093") // socket will be used later
- @Override
- public void run() {
- logger.info("The RPC service thread begin to run...");
- try {
- IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- serverTransport = new TServerSocket(new
InetSocketAddress(config.getRpcAddress(),
- config.getRpcPort()));
- //this is for testing.
- if (!serverTransport.getServerSocket().isBound()) {
- logger.error("The RPC service port is not bound.");
- }
- poolArgs = new
TThreadPoolServer.Args(serverTransport).maxWorkerThreads(IoTDBDescriptor.
-
getInstance().getConfig().getRpcMaxConcurrentClientNum()).minWorkerThreads(1)
- .stopTimeoutVal(
-
IoTDBDescriptor.getInstance().getConfig().getThriftServerAwaitTimeForStopService());
- poolArgs.executorService =
IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs,
- ThreadName.RPC_CLIENT.getName());
- poolArgs.processor(processor);
- poolArgs.protocolFactory(protocolFactory);
- poolArgs.transportFactory(new TFastFramedTransport.Factory());
- poolServer = new TThreadPoolServer(poolArgs);
- poolServer.setServerEventHandler(new RPCServiceThriftHandler(impl));
- poolServer.serve();
- } catch (TTransportException e) {
- throw new RPCServiceException(String.format("%s: failed to start %s,
because ", IoTDBConstant.GLOBAL_DB_NAME,
- getID().getName()), e);
- } catch (Exception e) {
- throw new RPCServiceException(String.format("%s: %s exit, because ",
IoTDBConstant.GLOBAL_DB_NAME, getID().getName()), e);
- } finally {
- close();
- if (threadStopLatch == null) {
- logger.debug("Stop Count Down latch is null");
- } else {
- logger.debug("Stop Count Down latch is {}",
threadStopLatch.getCount());
- }
-
- if (threadStopLatch != null && threadStopLatch.getCount() == 1) {
- threadStopLatch.countDown();
- }
- logger.debug("{}: close TThreadPoolServer and TServerSocket for {}",
- IoTDBConstant.GLOBAL_DB_NAME, getID().getName());
- }
- }
-
- private synchronized void close() {
- if (poolServer != null) {
- poolServer.setShouldStop(true);
- poolServer.stop();
- poolServer = null;
- }
- if (serverTransport != null) {
- serverTransport.close();
- serverTransport = null;
- }
- }
-
- boolean isServing() {
- if (poolServer != null) {
- return poolServer.isServing();
- }
- return false;
- }
- }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftService.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftService.java
new file mode 100644
index 0000000..c4c3d56
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftService.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.thrift;
+
+import java.util.concurrent.CountDownLatch;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.JMXService;
+import org.apache.thrift.TProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ThriftService implements IService {
+
+ private static final Logger logger =
LoggerFactory.getLogger(ThriftService.class);
+ private static final String STATUS_UP = "UP";
+ private static final String STATUS_DOWN = "DOWN";
+ protected final String mbeanName = String
+ .format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE,
+ getID().getJmxName());
+ protected ThriftServiceThread thriftServiceThread;
+ protected TProcessor processor;
+
+ private CountDownLatch stopLatch;
+
+ public String getRPCServiceStatus() {
+ if (thriftServiceThread == null) {
+ logger.debug("Start latch is null when getting status");
+ } else {
+ logger.debug("Start status is {} when getting status",
thriftServiceThread.isServing());
+ }
+ if(stopLatch == null) {
+ logger.debug("Stop latch is null when getting status");
+ } else {
+ logger.debug("Stop latch is {} when getting status",
stopLatch.getCount());
+ }
+
+ if(thriftServiceThread != null && thriftServiceThread.isServing()) {
+ return STATUS_UP;
+ } else {
+ return STATUS_DOWN;
+ }
+ }
+
+ public int getRPCPort() {
+ IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ return config.getRpcPort();
+ }
+
+ public abstract ThriftService getImplementation();
+
+ @Override
+ public void start() throws StartupException {
+ JMXService.registerMBean(getImplementation(), mbeanName);
+ startService();
+ }
+
+ @Override
+ public void stop() {
+ stopService();
+ JMXService.deregisterMBean(mbeanName);
+ }
+
+ public abstract void initTProcessor() throws
ClassNotFoundException,IllegalAccessException,InstantiationException;
+ public abstract void initThriftServiceThread()
+ throws IllegalAccessException, InstantiationException,
ClassNotFoundException;
+ public abstract String getBindIP();
+ public abstract int getBindPort();
+
+
+ @SuppressWarnings("squid:S2276")
+ public synchronized void startService() throws StartupException {
+ if (STATUS_UP.equals(getRPCServiceStatus())) {
+ logger.info("{}: {} has been already running now",
IoTDBConstant.GLOBAL_DB_NAME,
+ this.getID().getName());
+ return;
+ }
+ logger.info("{}: start {}...", IoTDBConstant.GLOBAL_DB_NAME,
this.getID().getName());
+ try {
+ reset();
+ initTProcessor();
+ initThriftServiceThread();
+ thriftServiceThread.setThreadStopLatch(stopLatch);
+ thriftServiceThread.start();
+
+ while (!thriftServiceThread.isServing()) {
+ //sleep 100ms for waiting the rpc server start.
+ Thread.sleep(100);
+ }
+ } catch (InterruptedException | ClassNotFoundException |
+ IllegalAccessException | InstantiationException e) {
+ Thread.currentThread().interrupt();
+ throw new StartupException(this.getID().getName(), e.getMessage());
+ }
+
+ logger.info("{}: start {} successfully, listening on ip {} port {}",
IoTDBConstant.GLOBAL_DB_NAME,
+ this.getID().getName(), getBindIP(), getBindPort());
+ }
+
+ private void reset() {
+ thriftServiceThread = null;
+ stopLatch = new CountDownLatch(1);
+ }
+
+
+ public synchronized void restartService() throws StartupException {
+ stopService();
+ startService();
+ }
+
+ public synchronized void stopService() {
+ if (STATUS_DOWN.equals(getRPCServiceStatus())) {
+ logger.info("{}: {} isn't running now", IoTDBConstant.GLOBAL_DB_NAME,
this.getID().getName());
+ return;
+ }
+ logger.info("{}: closing {}...", IoTDBConstant.GLOBAL_DB_NAME,
this.getID().getName());
+ if (thriftServiceThread != null) {
+ thriftServiceThread.close();
+ }
+ try {
+ stopLatch.await();
+ reset();
+ logger.info("{}: close {} successfully", IoTDBConstant.GLOBAL_DB_NAME,
this.getID().getName());
+ } catch (InterruptedException e) {
+ logger.error("{}: close {} failed because: ",
IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName(), e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
new file mode 100644
index 0000000..5bb8ae5
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.thrift;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.exception.runtime.RPCServiceException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TFastFramedTransport;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ThriftServiceThread extends Thread{
+
+ private static final Logger logger =
LoggerFactory.getLogger(ThriftServiceThread.class);
+ private TServerSocket serverTransport;
+ private TServer poolServer;
+ private CountDownLatch threadStopLatch;
+
+ private String serviceName;
+
+ private TProtocolFactory protocolFactory;
+ private TThreadPoolServer.Args poolArgs;
+
+ @SuppressWarnings("squid:S107")
+ public ThriftServiceThread(TProcessor processor, String serviceName,
+ String threadsName,
+ String bindAddress, int port, int maxWorkerThreads, int timeoutMs,
+ TServerEventHandler serverEventHandler, boolean compress) {
+ if(compress) {
+ protocolFactory = new TCompactProtocol.Factory();
+ }
+ else {
+ protocolFactory = new TBinaryProtocol.Factory();
+ }
+ this.serviceName = serviceName;
+
+ try {
+ serverTransport = new TServerSocket(new InetSocketAddress(bindAddress,
port));
+ poolArgs = new TThreadPoolServer.Args(serverTransport)
+ .maxWorkerThreads(maxWorkerThreads)
+ .minWorkerThreads(1)
+ .stopTimeoutVal(timeoutMs);
+ poolArgs.executorService =
IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs,
+ threadsName);
+ poolArgs.processor(processor);
+ poolArgs.protocolFactory(protocolFactory);
+ poolArgs.transportFactory(new TFastFramedTransport.Factory());
+ poolServer = new TThreadPoolServer(poolArgs);
+ poolServer.setServerEventHandler(serverEventHandler);
+ } catch (TTransportException e) {
+ close();
+ if (threadStopLatch == null) {
+ logger.debug("Stop Count Down latch is null");
+ } else {
+ logger.debug("Stop Count Down latch is {}",
threadStopLatch.getCount());
+ }
+ if (threadStopLatch != null && threadStopLatch.getCount() == 1) {
+ threadStopLatch.countDown();
+ }
+ logger.debug("{}: close TThreadPoolServer and TServerSocket for {}",
+ IoTDBConstant.GLOBAL_DB_NAME, serviceName);
+ throw new RPCServiceException(String.format("%s: failed to start %s,
because ",
+ IoTDBConstant.GLOBAL_DB_NAME, serviceName), e);
+ }
+ }
+
+ public void setThreadStopLatch(CountDownLatch threadStopLatch) {
+ this.threadStopLatch = threadStopLatch;
+ }
+
+ @SuppressWarnings("squid:S2093") // socket will be used later
+ @Override
+ public void run() {
+ logger.info("The {} service thread begin to run...", serviceName);
+ try {
+ poolServer.serve();
+ } catch (Exception e) {
+ throw new RPCServiceException(String.format("%s: %s exit, because ",
+ IoTDBConstant.GLOBAL_DB_NAME, serviceName), e);
+ } finally {
+ close();
+ if (threadStopLatch == null) {
+ logger.debug("Stop Count Down latch is null");
+ } else {
+ logger.debug("Stop Count Down latch is {}",
threadStopLatch.getCount());
+ }
+
+ if (threadStopLatch != null && threadStopLatch.getCount() == 1) {
+ threadStopLatch.countDown();
+ }
+ logger.debug("{}: close TThreadPoolServer and TServerSocket for {}",
+ IoTDBConstant.GLOBAL_DB_NAME, serviceName);
+ }
+ }
+
+ public synchronized void close() {
+ if (poolServer != null) {
+ poolServer.setShouldStop(true);
+ poolServer.stop();
+
+ poolServer = null;
+ }
+ if (serverTransport != null) {
+ serverTransport.close();
+ serverTransport = null;
+ }
+ }
+
+ public boolean isServing() {
+ if (poolServer != null) {
+ return poolServer.isServing();
+ }
+ return false;
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
index fa432fa..88d3046 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
@@ -19,58 +19,80 @@
package org.apache.iotdb.db.sync.receiver;
import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.CountDownLatch;
-import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StartupException;
-import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.db.service.thrift.ThriftService;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
import org.apache.iotdb.db.sync.receiver.load.FileLoaderManager;
import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogAnalyzer;
import org.apache.iotdb.db.sync.receiver.transfer.SyncServiceImpl;
import org.apache.iotdb.service.sync.thrift.SyncService;
-import org.apache.iotdb.service.sync.thrift.SyncService.Processor;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* sync receiver server.
*/
-public class SyncServerManager implements IService {
+public class SyncServerManager extends ThriftService implements
SyncServerManagerMBean {
+ private static Logger logger =
LoggerFactory.getLogger(SyncServerManager.class);
+ private SyncServiceImpl serviceImpl;
- private static final Logger logger =
LoggerFactory.getLogger(SyncServerManager.class);
+ private static class ServerManagerHolder {
- private IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
+ private static final SyncServerManager INSTANCE = new SyncServerManager();
+ }
- private SyncServiceThread syncServerThread;
+ public static SyncServerManager getInstance() {
+ return SyncServerManager.ServerManagerHolder.INSTANCE;
+ }
- //stopLatch is also for letting the IT know whether the socket is closed.
- private CountDownLatch stopLatch;
+ @Override
+ public ServiceType getID() {
+ return ServiceType.SYNC_SERVICE;
+ }
- private SyncServerManager() {
+ @Override
+ public ThriftService getImplementation() {
+ return getInstance();
}
- public static SyncServerManager getInstance() {
- return ServerManagerHolder.INSTANCE;
+ @Override
+ public void initTProcessor() {
+ serviceImpl = new SyncServiceImpl();
+ processor = new SyncService.Processor<>(serviceImpl);
+ }
+
+ @Override
+ public void initThriftServiceThread()
+ throws IllegalAccessException, InstantiationException,
ClassNotFoundException {
+ IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ thriftServiceThread = new ThriftServiceThread(processor,
+ getID().getName(), ThreadName.SYNC_CLIENT.getName(),
+ config.getRpcAddress(), config.getSyncServerPort(),
+ Integer.MAX_VALUE, config.getThriftServerAwaitTimeForStopService(),
+ new SyncServerThriftHandler(serviceImpl),
+ config.isRpcThriftCompressionEnable()
+ );
+ thriftServiceThread.setName(ThreadName.SYNC_SERVER.getName());
}
- /**
- * Start sync receiver's server.
- */
@Override
- public void start() throws StartupException {
- if (!conf.isSyncEnable()) {
+ public String getBindIP() {
+ return IoTDBDescriptor.getInstance().getConfig().getRpcAddress();
+ }
+
+ @Override
+ public int getBindPort() {
+ return IoTDBDescriptor.getInstance().getConfig().getSyncServerPort();
+ }
+
+ @Override
+ public void startService() throws StartupException {
+ IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ if (!config.isSyncEnable()) {
return;
}
FileLoaderManager.getInstance().start();
@@ -79,122 +101,21 @@ public class SyncServerManager implements IService {
} catch (IOException e) {
logger.error("Can not recover receiver sync state", e);
}
- if (conf.getIpWhiteList() == null) {
+ if (config.getIpWhiteList() == null) {
logger.error(
"Sync server failed to start because IP white list is null, please
set IP white list.");
return;
}
- stopLatch = new CountDownLatch(1);
- conf.setIpWhiteList(conf.getIpWhiteList().replaceAll(" ", ""));
- syncServerThread = new SyncServiceThread(stopLatch);
- syncServerThread.setName(ThreadName.SYNC_SERVER.getName());
- syncServerThread.start();
- try {
- while (!syncServerThread.isServing()) {
- //sleep 100ms for waiting the sync server start.
- Thread.sleep(100);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new StartupException(this.getID().getName(), e.getMessage());
- }
- logger.info("Sync server has started.");
+ config.setIpWhiteList(config.getIpWhiteList().replace(" ", ""));
+ super.startService();
}
- /**
- * Close sync receiver's server.
- */
@Override
- public void stop() {
- if (conf.isSyncEnable()) {
+ public void stopService() {
+ if (IoTDBDescriptor.getInstance().getConfig().isSyncEnable()) {
FileLoaderManager.getInstance().stop();
- syncServerThread.close();
- try {
- stopLatch.await();
- } catch (InterruptedException e) {
- logger.error(e.getMessage(), e);
- Thread.currentThread().interrupt();
- }
- }
- }
-
- @Override
- public ServiceType getID() {
- return ServiceType.SYNC_SERVICE;
- }
-
- private static class ServerManagerHolder {
-
- private static final SyncServerManager INSTANCE = new SyncServerManager();
- }
-
- private class SyncServiceThread extends Thread {
-
- private TServerSocket serverTransport;
- private TServer poolServer;
- private TProtocolFactory protocolFactory;
- private Processor<SyncService.Iface> processor;
- private TThreadPoolServer.Args poolArgs;
- private CountDownLatch threadStopLatch;
- private SyncServiceImpl serviceImpl;
-
- public SyncServiceThread(CountDownLatch stopLatch) {
- serviceImpl = new SyncServiceImpl();
- processor = new SyncService.Processor<>(serviceImpl);
- this.threadStopLatch = stopLatch;
+ super.stopService();
}
- @Override
- public void run() {
- try {
- serverTransport = new TServerSocket(
- new InetSocketAddress(conf.getRpcAddress(),
conf.getSyncServerPort()));
- if (conf.isRpcThriftCompressionEnable()) {
- protocolFactory = new TCompactProtocol.Factory();
- } else {
- protocolFactory = new TBinaryProtocol.Factory();
- }
- poolArgs = new TThreadPoolServer.Args(serverTransport).stopTimeoutVal(
-
IoTDBDescriptor.getInstance().getConfig().getThriftServerAwaitTimeForStopService());
- poolArgs.executorService =
IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs,
- ThreadName.SYNC_CLIENT.getName());
- poolArgs.protocolFactory(protocolFactory);
- poolArgs.processor(processor);
- poolServer = new TThreadPoolServer(poolArgs);
- poolServer.setServerEventHandler(new
SyncServerThriftHandler(serviceImpl));
- poolServer.serve();
- } catch (TTransportException e) {
- logger.error("{}: failed to start {}, because ",
IoTDBConstant.GLOBAL_DB_NAME,
- getID().getName(), e);
- } catch (Exception e) {
- logger.error("{}: {} exit, because ", IoTDBConstant.GLOBAL_DB_NAME,
getID().getName(), e);
- } finally {
- close();
- if (threadStopLatch != null && threadStopLatch.getCount() == 1) {
- threadStopLatch.countDown();
- }
- logger.info("{}: close TThreadPoolServer and TServerSocket for {}",
- IoTDBConstant.GLOBAL_DB_NAME, getID().getName());
-
- }
- }
-
- private synchronized void close() {
- if (poolServer != null) {
- poolServer.stop();
- poolServer = null;
- }
- if (serverTransport != null) {
- serverTransport.close();
- serverTransport = null;
- }
- }
-
- boolean isServing() {
- if (poolServer != null) {
- return poolServer.isServing();
- }
- return false;
- }
}
}
\ No newline at end of file
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManagerMBean.java
b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManagerMBean.java
new file mode 100644
index 0000000..3992693
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManagerMBean.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.receiver;
+
+import org.apache.iotdb.db.exception.StartupException;
+
+public interface SyncServerManagerMBean {
+
+ String getRPCServiceStatus();
+
+ int getRPCPort();
+
+ void startService() throws StartupException;
+
+ void restartService() throws StartupException;
+
+ void stopService();
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
index e627e43..d599ca8 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
@@ -75,6 +75,7 @@ import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
@@ -273,7 +274,7 @@ public class SyncClient implements ISyncClient {
@Override
public void establishConnection(String serverIp, int serverPort) throws
SyncConnectionException {
- transport = new TSocket(serverIp, serverPort, TIMEOUT_MS);
+ transport = new TFastFramedTransport(new TSocket(serverIp, serverPort,
TIMEOUT_MS));
TProtocol protocol = new TBinaryProtocol(transport);
serviceClient = new SyncService.Client(protocol);
try {