This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 144320a [IOTDB-689]add thriftEventHandler to cleanup when a client
quit (#1231)
144320a is described below
commit 144320a0a6797e6704c959dd332b2cffbfd82fb8
Author: Xiangdong Huang <[email protected]>
AuthorDate: Thu May 21 13:05:51 2020 +0800
[IOTDB-689]add thriftEventHandler to cleanup when a client quit (#1231)
* add thriftEventHandler to cleanup when a client quit
Co-authored-by: xiangdong huang <[email protected]>
---
.travis.yml | 2 +-
.../org/apache/iotdb/db/service/RPCService.java | 1 +
.../iotdb/db/service/RPCServiceThriftHandler.java | 55 +++++++++++++++++++++
.../iotdb/db/sync/receiver/SyncServerManager.java | 6 ++-
.../db/sync/receiver/SyncServerThriftHandler.java | 57 ++++++++++++++++++++++
.../db/sync/receiver/transfer/SyncServiceImpl.java | 6 +++
6 files changed, 124 insertions(+), 3 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index 35616ee..360409b 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -117,7 +117,7 @@ matrix:
token:
secure:
"a2n2+MFfuOb3hsrw5vjiIUZzjt6S0j4YmRVrms6NVKF+EpEAWU/zOjzfccLGRr3VFUAlkoM2p578c/0DMMGew2oj3Yl1iJe2n35BWIV6r/psUKRUMdgMAogdshQzZGMYmY1XL2xA3ATu8cf4F8WoRAafd/S58JGQdTfsQ5svh31BxK0iuh+nMIZb4dYxO717dSVD45D9hoF5ROxdtTRbyAaXFPs5djxWrdzKw0J3e6a8m82K9FGcpy4pY9ct3ttbyEzGqMnzE4fhd3KgopFJj+3QdAi/3Cmkf1voxorQdCSbktl4PtlzMY/nxF6XETI2E4T+GLUVMw4iHiXCabqwOiudGnEj5DjNz6X5MVCOyj7lrFBTlDirGzv9fNpvQ2I47WOL8DzBJPqFwQ0nRAlbvcWVtE4aAqj8YF1QwwLxEIi4abDfOwaxGG2TOixQf627pvhh1o0RdEf9R6aRyur7vbM
[...]
script:
- - mvn verify sonar:sonar -Dsonar.organization=apache
-Dsonar.projectKey=apache_incubator-iotdb -DskipTests -pl '!distribution' -am
+ - mvn verify sonar:sonar -Dsonar.organization=apache
-Dsonar.projectKey=apache_incubator-iotdb -DskipTests -pl
'!site','!distribution' -am
- os: linux
if: fork = false #only fork=true (i.e., the committer has permission to
write the repo)
name: code-coverage
diff --git a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
index 52b4b79..3e1cc87 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
@@ -217,6 +217,7 @@ public class RPCService implements RPCServiceMBean,
IService {
poolArgs.processor(processor);
poolArgs.protocolFactory(protocolFactory);
poolServer = new TThreadPoolServer(poolArgs);
+ poolServer.setServerEventHandler(new RPCServiceThriftHandler(impl));
poolServer.serve();
} catch (TTransportException e) {
throw new RPCServiceException(String.format("%s: failed to start %s,
because ", IoTDBConstant.GLOBAL_DB_NAME,
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/RPCServiceThriftHandler.java
b/server/src/main/java/org/apache/iotdb/db/service/RPCServiceThriftHandler.java
new file mode 100644
index 0000000..0c7867f
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/service/RPCServiceThriftHandler.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service;
+
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.transport.TTransport;
+
+public class RPCServiceThriftHandler implements TServerEventHandler {
+ private TSServiceImpl serviceImpl;
+
+ RPCServiceThriftHandler(TSServiceImpl serviceImpl) {
+ this.serviceImpl = serviceImpl;
+ }
+
+ @Override
+ public ServerContext createContext(TProtocol arg0, TProtocol arg1) {
+ // nothing
+ return null;
+ }
+
+ @Override
+ public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol
arg2) {
+ //release query resources.
+ serviceImpl.handleClientExit();
+ }
+
+ @Override
+ public void preServe() {
+ //nothing
+ }
+
+ @Override
+ public void processContext(ServerContext arg0, TTransport arg1, TTransport
arg2) {
+ // nothing
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
index f1fd2f1..fa432fa 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
@@ -136,9 +136,11 @@ public class SyncServerManager implements IService {
private Processor<SyncService.Iface> processor;
private TThreadPoolServer.Args poolArgs;
private CountDownLatch threadStopLatch;
+ private SyncServiceImpl serviceImpl;
public SyncServiceThread(CountDownLatch stopLatch) {
- processor = new SyncService.Processor<>(new SyncServiceImpl());
+ serviceImpl = new SyncServiceImpl();
+ processor = new SyncService.Processor<>(serviceImpl);
this.threadStopLatch = stopLatch;
}
@@ -152,7 +154,6 @@ public class SyncServerManager implements IService {
} else {
protocolFactory = new TBinaryProtocol.Factory();
}
- processor = new SyncService.Processor<>(new SyncServiceImpl());
poolArgs = new TThreadPoolServer.Args(serverTransport).stopTimeoutVal(
IoTDBDescriptor.getInstance().getConfig().getThriftServerAwaitTimeForStopService());
poolArgs.executorService =
IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs,
@@ -160,6 +161,7 @@ public class SyncServerManager implements IService {
poolArgs.protocolFactory(protocolFactory);
poolArgs.processor(processor);
poolServer = new TThreadPoolServer(poolArgs);
+ poolServer.setServerEventHandler(new
SyncServerThriftHandler(serviceImpl));
poolServer.serve();
} catch (TTransportException e) {
logger.error("{}: failed to start {}, because ",
IoTDBConstant.GLOBAL_DB_NAME,
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerThriftHandler.java
b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerThriftHandler.java
new file mode 100644
index 0000000..38f036d
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerThriftHandler.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.sync.receiver;
+
+import org.apache.iotdb.db.sync.receiver.transfer.SyncServiceImpl;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.transport.TTransport;
+
+public class SyncServerThriftHandler implements TServerEventHandler {
+ private SyncServiceImpl serviceImpl;
+
+ SyncServerThriftHandler(SyncServiceImpl serviceImpl) {
+ this.serviceImpl = serviceImpl;
+ }
+
+ @Override
+ public ServerContext createContext(TProtocol arg0, TProtocol arg1) {
+ // nothing
+ return null;
+ }
+
+ @Override
+ public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol
arg2) {
+ //release query resources.
+ serviceImpl.handleClientExit();
+ }
+
+ @Override
+ public void preServe() {
+ //nothing
+ }
+
+ @Override
+ public void processContext(ServerContext arg0, TTransport arg1, TTransport
arg2) {
+ // nothing
+ }
+}
+
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index 169a689..9110bd1 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -315,4 +315,10 @@ public class SyncServiceImpl implements SyncService.Iface {
return new SyncStatus(SyncConstant.ERROR_CODE, errorMsg);
}
+ /**
+ * release resources or cleanup when a client (a sender) is disconnected
(normally or abnormally).
+ */
+ public void handleClientExit() {
+ //do nothing now
+ }
}
\ No newline at end of file