This is an automated email from the ASF dual-hosted git repository. hxd pushed a commit to branch for_travis_stable in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit b224ae440a0ae1a6f1b94e17560bef3509026ee4 Author: xiangdong huang <[email protected]> AuthorDate: Tue Mar 24 18:53:20 2020 +0800 guarantee the sync server is started before a UT/IT finished; --- .../iotdb/db/sync/receiver/SyncServerManager.java | 36 +++++++++++++- .../db/sync/thrift/SyncServiceEventHandler.java | 56 ++++++++++++++++++++++ 2 files changed, 90 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java index a80bf62..056a9a0 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.sync.receiver; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.db.concurrent.ThreadName; import org.apache.iotdb.db.conf.IoTDBConfig; @@ -27,10 +28,12 @@ import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.StartupException; import org.apache.iotdb.db.service.IService; +import org.apache.iotdb.db.service.JDBCServiceEventHandler; import org.apache.iotdb.db.service.ServiceType; import org.apache.iotdb.db.sync.receiver.load.FileLoaderManager; import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogAnalyzer; import org.apache.iotdb.db.sync.receiver.transfer.SyncServiceImpl; +import org.apache.iotdb.db.sync.thrift.SyncServiceEventHandler; import org.apache.iotdb.service.sync.thrift.SyncService; import org.apache.iotdb.service.sync.thrift.SyncService.Processor; import org.apache.thrift.protocol.TBinaryProtocol; @@ -54,6 +57,11 @@ public class SyncServerManager implements IService { private Thread syncServerThread; + //we add this latch for 
avoiding in some ITs, the syncService is not startup but the IT has finished. + private CountDownLatch startLatch; + //stopLatch is also for letting the IT know whether the socket is closed. + private CountDownLatch stopLatch; + private SyncServerManager() { } @@ -80,10 +88,18 @@ public class SyncServerManager implements IService { "Sync server failed to start because IP white list is null, please set IP white list."); return; } + startLatch = new CountDownLatch(1); + stopLatch = new CountDownLatch(1); conf.setIpWhiteList(conf.getIpWhiteList().replaceAll(" ", "")); - syncServerThread = new SyncServiceThread(); + syncServerThread = new SyncServiceThread(startLatch, stopLatch); syncServerThread.setName(ThreadName.SYNC_SERVER.getName()); syncServerThread.start(); + try { + startLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new StartupException(this.getID().getName(), e.getMessage()); + } logger.info("Sync server has started."); } @@ -95,6 +111,12 @@ public class SyncServerManager implements IService { if (conf.isSyncEnable()) { FileLoaderManager.getInstance().stop(); ((SyncServiceThread) syncServerThread).close(); + try { + stopLatch.await(); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + Thread.currentThread().interrupt(); + } } } @@ -115,9 +137,14 @@ public class SyncServerManager implements IService { private TProtocolFactory protocolFactory; private Processor<SyncService.Iface> processor; private TThreadPoolServer.Args poolArgs; + //we add this latch for avoiding in some ITs, the syncService is not startup but the IT has finished. 
+ private CountDownLatch threadStartLatch; + private CountDownLatch threadStopLatch; - public SyncServiceThread() { + public SyncServiceThread(CountDownLatch startLatch, CountDownLatch stopLatch) { processor = new SyncService.Processor<>(new SyncServiceImpl()); + this.threadStartLatch = startLatch; + this.threadStopLatch = stopLatch; } @Override @@ -138,6 +165,7 @@ public class SyncServerManager implements IService { poolArgs.protocolFactory(protocolFactory); poolArgs.processor(processor); poolServer = new TThreadPoolServer(poolArgs); + poolServer.setServerEventHandler(new SyncServiceEventHandler(threadStartLatch)); poolServer.serve(); } catch (TTransportException e) { logger.error("{}: failed to start {}, because ", IoTDBConstant.GLOBAL_DB_NAME, @@ -146,8 +174,12 @@ public class SyncServerManager implements IService { logger.error("{}: {} exit, because ", IoTDBConstant.GLOBAL_DB_NAME, getID().getName(), e); } finally { close(); + if (threadStopLatch != null && threadStopLatch.getCount() == 1) { + threadStopLatch.countDown(); + } logger.info("{}: close TThreadPoolServer and TServerSocket for {}", IoTDBConstant.GLOBAL_DB_NAME, getID().getName()); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/sync/thrift/SyncServiceEventHandler.java b/server/src/main/java/org/apache/iotdb/db/sync/thrift/SyncServiceEventHandler.java new file mode 100644 index 0000000..25469ab --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/sync/thrift/SyncServiceEventHandler.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.sync.thrift; + +import java.util.concurrent.CountDownLatch; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.server.ServerContext; +import org.apache.thrift.server.TServerEventHandler; +import org.apache.thrift.transport.TTransport; + +public class SyncServiceEventHandler implements TServerEventHandler { + + private CountDownLatch startLatch; + + public SyncServiceEventHandler(CountDownLatch startLatch) { + this.startLatch = startLatch; + } + + @Override + public void preServe() { + startLatch.countDown(); + } + + @Override + public ServerContext createContext(TProtocol input, TProtocol output) { + return null; + } + + @Override + public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) { + + } + + @Override + public void processContext(ServerContext serverContext, TTransport inputTransport, + TTransport outputTransport) { + + } +}
