This is an automated email from the ASF dual-hosted git repository. hxd pushed a commit to branch iotdb-689 in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 4ab7856f8ffd18b539e0fc5d8773105dabf3c0bb Author: xiangdong huang <[email protected]> AuthorDate: Wed May 20 15:14:11 2020 +0800 add thriftEventHandler to cleanup when a client quit --- .../org/apache/iotdb/db/service/RPCService.java | 1 + .../iotdb/db/service/RPCServiceThriftHandler.java | 55 +++++++++++++++++++++ .../iotdb/db/sync/receiver/SyncServerManager.java | 6 ++- .../db/sync/receiver/SyncServerThriftHandler.java | 57 ++++++++++++++++++++++ .../db/sync/receiver/transfer/SyncServiceImpl.java | 6 +++ 5 files changed, 123 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java index 52b4b79..3e1cc87 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java +++ b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java @@ -217,6 +217,7 @@ public class RPCService implements RPCServiceMBean, IService { poolArgs.processor(processor); poolArgs.protocolFactory(protocolFactory); poolServer = new TThreadPoolServer(poolArgs); + poolServer.setServerEventHandler(new RPCServiceThriftHandler(impl)); poolServer.serve(); } catch (TTransportException e) { throw new RPCServiceException(String.format("%s: failed to start %s, because ", IoTDBConstant.GLOBAL_DB_NAME, diff --git a/server/src/main/java/org/apache/iotdb/db/service/RPCServiceThriftHandler.java b/server/src/main/java/org/apache/iotdb/db/service/RPCServiceThriftHandler.java new file mode 100644 index 0000000..0c7867f --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/service/RPCServiceThriftHandler.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.service; + +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.server.ServerContext; +import org.apache.thrift.server.TServerEventHandler; +import org.apache.thrift.transport.TTransport; + +public class RPCServiceThriftHandler implements TServerEventHandler { + private TSServiceImpl serviceImpl; + + RPCServiceThriftHandler(TSServiceImpl serviceImpl) { + this.serviceImpl = serviceImpl; + } + + @Override + public ServerContext createContext(TProtocol arg0, TProtocol arg1) { + // nothing + return null; + } + + @Override + public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) { + //release query resources. + serviceImpl.handleClientExit(); + } + + @Override + public void preServe() { + //nothing + } + + @Override + public void processContext(ServerContext arg0, TTransport arg1, TTransport arg2) { + // nothing + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java index f1fd2f1..fa432fa 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java @@ -136,9 +136,11 @@ public class SyncServerManager implements IService { private Processor<SyncService.Iface> processor; private TThreadPoolServer.Args poolArgs; private CountDownLatch threadStopLatch; + private SyncServiceImpl serviceImpl; public SyncServiceThread(CountDownLatch stopLatch) { - processor = new SyncService.Processor<>(new SyncServiceImpl()); + serviceImpl = new SyncServiceImpl(); + processor = new SyncService.Processor<>(serviceImpl); this.threadStopLatch = stopLatch; } @@ -152,7 +154,6 @@ public class SyncServerManager implements IService { } else { protocolFactory = new TBinaryProtocol.Factory(); } - processor = new SyncService.Processor<>(new SyncServiceImpl()); poolArgs = new TThreadPoolServer.Args(serverTransport).stopTimeoutVal( IoTDBDescriptor.getInstance().getConfig().getThriftServerAwaitTimeForStopService()); poolArgs.executorService = IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs, @@ -160,6 +161,7 @@ public class SyncServerManager implements IService { poolArgs.protocolFactory(protocolFactory); poolArgs.processor(processor); poolServer = new TThreadPoolServer(poolArgs); + poolServer.setServerEventHandler(new SyncServerThriftHandler(serviceImpl)); poolServer.serve(); } catch (TTransportException e) { logger.error("{}: failed to start {}, because ", IoTDBConstant.GLOBAL_DB_NAME, diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerThriftHandler.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerThriftHandler.java new file mode 100644 index 0000000..38f036d --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerThriftHandler.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.sync.receiver; + +import org.apache.iotdb.db.sync.receiver.transfer.SyncServiceImpl; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.server.ServerContext; +import org.apache.thrift.server.TServerEventHandler; +import org.apache.thrift.transport.TTransport; + +public class SyncServerThriftHandler implements TServerEventHandler { + private SyncServiceImpl serviceImpl; + + SyncServerThriftHandler(SyncServiceImpl serviceImpl) { + this.serviceImpl = serviceImpl; + } + + @Override + public ServerContext createContext(TProtocol arg0, TProtocol arg1) { + // nothing + return null; + } + + @Override + public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) { + //release query resources. + serviceImpl.handleClientExit(); + } + + @Override + public void preServe() { + //nothing + } + + @Override + public void processContext(ServerContext arg0, TTransport arg1, TTransport arg2) { + // nothing + } +} + diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java index 169a689..9110bd1 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java @@ -315,4 +315,10 @@ public class SyncServiceImpl implements SyncService.Iface { return new SyncStatus(SyncConstant.ERROR_CODE, errorMsg); } + /** + * release resources or cleanup when a client (a sender) is disconnected (normally or abnormally). + */ + public void handleClientExit() { + //do nothing now + } } \ No newline at end of file
