This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch cluster_test in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0d374231ede9bc3953ecb5d18c045ce4c71e2858 Author: jt <[email protected]> AuthorDate: Tue Jan 12 11:23:38 2021 +0800 refinements --- .../iotdb/cluster/ClusterFileFlushPolicy.java | 67 ---------------------- .../iotdb/cluster/coordinator/Coordinator.java | 2 +- .../apache/iotdb/cluster/server/ClientServer.java | 46 ++++----------- .../org/apache/iotdb/db/service/TSServiceImpl.java | 4 +- 4 files changed, 16 insertions(+), 103 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterFileFlushPolicy.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterFileFlushPolicy.java deleted file mode 100644 index cca5c2a..0000000 --- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterFileFlushPolicy.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.cluster; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import org.apache.iotdb.cluster.server.member.MetaGroupMember; -import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy; -import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor; -import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ClusterFileFlushPolicy implements TsFileFlushPolicy { - - private static final Logger logger = LoggerFactory.getLogger(ClusterFileFlushPolicy.class); - - private ExecutorService closePartitionExecutor; - private MetaGroupMember metaGroupMember; - - public ClusterFileFlushPolicy( - MetaGroupMember metaGroupMember) { - this.metaGroupMember = metaGroupMember; - this.closePartitionExecutor = new ThreadPoolExecutor(16, 1024, 0, TimeUnit.SECONDS, - new LinkedBlockingDeque<>(), r -> { - Thread thread = new Thread(r); - thread.setName("ClusterFileFlushPolicy-" + thread.getId()); - return thread; - }); - } - - @Override - public void apply(StorageGroupProcessor storageGroupProcessor, TsFileProcessor processor, - boolean isSeq) { - logger.info("The memtable size reaches the threshold, async flush it to tsfile: {}", - processor.getTsFileResource().getTsFile().getAbsolutePath()); - - if (processor.shouldClose()) { - // find the related DataGroupMember and close the processor through it - // we execute it in another thread to avoid deadlocks - closePartitionExecutor - .submit(() -> metaGroupMember.closePartition(storageGroupProcessor.getStorageGroupName(), - processor.getTimeRangeId(), isSeq)); - } - // flush the memtable anyway to avoid the insertion trigger the policy again - processor.asyncFlush(); - } -} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java index 4e15c6f..25b4054 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java @@ -562,7 +562,7 @@ public class Coordinator { private TSStatus forwardDataPlanSync(PhysicalPlan plan, Node receiver, Node header) throws IOException { - RaftService.Client client = null; + RaftService.Client client; try { client = metaGroupMember.getClientProvider().getSyncDataClient(receiver, RaftServer.getWriteOperationTimeoutMS()); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java index dd63e2f..8472358 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java @@ -19,6 +19,7 @@ package org.apache.iotdb.cluster.server; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; import java.net.InetSocketAddress; import java.util.List; @@ -29,9 +30,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.apache.iotdb.cluster.client.async.AsyncDataClient; import org.apache.iotdb.cluster.client.sync.SyncDataClient; @@ -80,7 +79,7 @@ import org.slf4j.LoggerFactory; /** * ClientServer is the cluster version of TSServiceImpl, which is responsible for the processing of * the user requests (sqls and session api). It inherits the basic procedures from TSServiceImpl, - * but redirect the queries of data and metadata to a MetaGroupMember of the local node. + * but redirects the queries of data and metadata to the coordinator of the local node. */ public class ClientServer extends TSServiceImpl { @@ -107,7 +106,7 @@ public class ClientServer extends TSServiceImpl { private TServer poolServer; /** - * The socket poolServer will listen to. Async service requires nonblocking socket + * The socket poolServer will listen to. */ private TServerTransport serverTransport; @@ -124,8 +123,8 @@ public class ClientServer extends TSServiceImpl { } /** - * Create a thrift server to listen to the client port and accept requests from clients. This - * server is run in a separate thread. Calling the method twice does not induce side effects. + * Create a thrift server to listen to the client port and accept requests from external clients. + * This server is run in a separate thread. Calling the method twice does not induce side effects. * * @throws TTransportException */ @@ -147,27 +146,20 @@ public class ClientServer extends TSServiceImpl { } serverTransport = new TServerSocket(new InetSocketAddress(config.getClusterRpcIp(), config.getClusterRpcPort())); - // async service also requires nonblocking server, and HsHaServer is basically more efficient a - // nonblocking server - int maxConcurrentClientNum = Math.max(CommonUtils.getCpuCores(), + + int minConcurrentClientNum = CommonUtils.getCpuCores(); + int maxConcurrentClientNum = Math.max(minConcurrentClientNum, config.getMaxConcurrentClientNum()); TThreadPoolServer.Args poolArgs = new TThreadPoolServer.Args(serverTransport).maxWorkerThreads(maxConcurrentClientNum) - .minWorkerThreads(CommonUtils.getCpuCores()); + .minWorkerThreads(minConcurrentClientNum); poolArgs.executorService(new ThreadPoolExecutor(poolArgs.minWorkerThreads, poolArgs.maxWorkerThreads, poolArgs.stopTimeoutVal, poolArgs.stopTimeoutUnit, - new SynchronousQueue<>(), new ThreadFactory() { - private AtomicLong threadIndex = new AtomicLong(0); - - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "ClusterClient" + threadIndex.incrementAndGet()); - } - })); - // ClientServer will do the following processing when the HsHaServer has parsed a request + new SynchronousQueue<>(), + new ThreadFactoryBuilder().setNameFormat("ClusterClient-%d").setDaemon(true).build())); + // ClientServer will do the following processing when the server has parsed a request poolArgs.processor(new Processor<>(this)); poolArgs.protocolFactory(protocolFactory); - // nonblocking server requests FramedTransport poolArgs.transportFactory(RpcTransportFactory.INSTANCE); poolServer = new TThreadPoolServer(poolArgs); @@ -257,20 +249,6 @@ public class ClientServer extends TSServiceImpl { } /** - * Get the data types of each path in “paths”. If "aggregation" is not null, all "paths" will use - * this aggregation. - * - * @param paths full timeseries paths - * @param aggregation if not null, it means "paths" all use this aggregation - * @return the data types of "paths" (using the aggregation) - * @throws MetadataException - */ - protected List<TSDataType> getSeriesTypesByString(List<PartialPath> paths, String aggregation) - throws MetadataException { - return ((CMManager) IoTDB.metaManager).getSeriesTypesByPaths(paths, aggregation).left; - } - - /** * Generate and cache a QueryContext using "queryId". In the distributed version, the QueryContext * is a RemoteQueryContext. * diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 84d033c..9543257 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -993,7 +993,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { */ private boolean checkLogin(long sessionId) { boolean isLoggedIn = sessionIdUsernameMap.get(sessionId) != null; - LOGGER.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME); + if (!isLoggedIn) { + LOGGER.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME); + } return isLoggedIn; }
