This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cluster_test
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0d374231ede9bc3953ecb5d18c045ce4c71e2858
Author: jt <[email protected]>
AuthorDate: Tue Jan 12 11:23:38 2021 +0800

    refinements
---
 .../iotdb/cluster/ClusterFileFlushPolicy.java      | 67 ----------------------
 .../iotdb/cluster/coordinator/Coordinator.java     |  2 +-
 .../apache/iotdb/cluster/server/ClientServer.java  | 46 ++++-----------
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  4 +-
 4 files changed, 16 insertions(+), 103 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterFileFlushPolicy.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterFileFlushPolicy.java
deleted file mode 100644
index cca5c2a..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterFileFlushPolicy.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import org.apache.iotdb.cluster.server.member.MetaGroupMember;
-import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
-import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ClusterFileFlushPolicy implements TsFileFlushPolicy {
-
-  private static final Logger logger = 
LoggerFactory.getLogger(ClusterFileFlushPolicy.class);
-
-  private ExecutorService closePartitionExecutor;
-  private MetaGroupMember metaGroupMember;
-
-  public ClusterFileFlushPolicy(
-      MetaGroupMember metaGroupMember) {
-    this.metaGroupMember = metaGroupMember;
-    this.closePartitionExecutor = new ThreadPoolExecutor(16, 1024, 0, 
TimeUnit.SECONDS,
-        new LinkedBlockingDeque<>(), r -> {
-      Thread thread = new Thread(r);
-      thread.setName("ClusterFileFlushPolicy-" + thread.getId());
-      return thread;
-    });
-  }
-
-  @Override
-  public void apply(StorageGroupProcessor storageGroupProcessor, 
TsFileProcessor processor,
-      boolean isSeq) {
-    logger.info("The memtable size reaches the threshold, async flush it to 
tsfile: {}",
-        processor.getTsFileResource().getTsFile().getAbsolutePath());
-
-    if (processor.shouldClose()) {
-      // find the related DataGroupMember and close the processor through it
-      // we execute it in another thread to avoid deadlocks
-      closePartitionExecutor
-          .submit(() -> 
metaGroupMember.closePartition(storageGroupProcessor.getStorageGroupName(),
-              processor.getTimeRangeId(), isSeq));
-    }
-    // flush the memtable anyway to avoid the insertion trigger the policy 
again
-    processor.asyncFlush();
-  }
-}
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
index 4e15c6f..25b4054 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
@@ -562,7 +562,7 @@ public class Coordinator {
 
   private TSStatus forwardDataPlanSync(PhysicalPlan plan, Node receiver, Node 
header)
     throws IOException {
-    RaftService.Client client = null;
+    RaftService.Client client;
     try {
       client = metaGroupMember.getClientProvider().getSyncDataClient(receiver,
         RaftServer.getWriteOperationTimeoutMS());
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
index dd63e2f..8472358 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.cluster.server;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.List;
@@ -29,9 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.iotdb.cluster.client.async.AsyncDataClient;
 import org.apache.iotdb.cluster.client.sync.SyncDataClient;
@@ -80,7 +79,7 @@ import org.slf4j.LoggerFactory;
 /**
  * ClientServer is the cluster version of TSServiceImpl, which is responsible 
for the processing of
  * the user requests (sqls and session api). It inherits the basic procedures 
from TSServiceImpl,
- * but redirect the queries of data and metadata to a MetaGroupMember of the 
local node.
+ * but redirects the queries of data and metadata to the coordinator of the 
local node.
  */
 public class ClientServer extends TSServiceImpl {
 
@@ -107,7 +106,7 @@ public class ClientServer extends TSServiceImpl {
   private TServer poolServer;
 
   /**
-   * The socket poolServer will listen to. Async service requires nonblocking 
socket
+   * The socket poolServer will listen to.
    */
   private TServerTransport serverTransport;
 
@@ -124,8 +123,8 @@ public class ClientServer extends TSServiceImpl {
   }
 
   /**
-   * Create a thrift server to listen to the client port and accept requests 
from clients. This
-   * server is run in a separate thread. Calling the method twice does not 
induce side effects.
+   * Create a thrift server to listen to the client port and accept requests 
from external clients.
+   * This server is run in a separate thread. Calling the method twice does 
not induce side effects.
    *
    * @throws TTransportException
    */
@@ -147,27 +146,20 @@ public class ClientServer extends TSServiceImpl {
     }
     serverTransport = new TServerSocket(new 
InetSocketAddress(config.getClusterRpcIp(),
         config.getClusterRpcPort()));
-    // async service also requires nonblocking server, and HsHaServer is 
basically more efficient a
-    // nonblocking server
-    int maxConcurrentClientNum = Math.max(CommonUtils.getCpuCores(),
+
+    int minConcurrentClientNum = CommonUtils.getCpuCores();
+    int maxConcurrentClientNum = Math.max(minConcurrentClientNum,
         config.getMaxConcurrentClientNum());
     TThreadPoolServer.Args poolArgs =
         new 
TThreadPoolServer.Args(serverTransport).maxWorkerThreads(maxConcurrentClientNum)
-            .minWorkerThreads(CommonUtils.getCpuCores());
+            .minWorkerThreads(minConcurrentClientNum);
     poolArgs.executorService(new ThreadPoolExecutor(poolArgs.minWorkerThreads,
         poolArgs.maxWorkerThreads, poolArgs.stopTimeoutVal, 
poolArgs.stopTimeoutUnit,
-        new SynchronousQueue<>(), new ThreadFactory() {
-      private AtomicLong threadIndex = new AtomicLong(0);
-
-      @Override
-      public Thread newThread(Runnable r) {
-        return new Thread(r, "ClusterClient" + threadIndex.incrementAndGet());
-      }
-    }));
-    // ClientServer will do the following processing when the HsHaServer has 
parsed a request
+        new SynchronousQueue<>(),
+        new 
ThreadFactoryBuilder().setNameFormat("ClusterClient-%d").setDaemon(true).build()));
+    // ClientServer will do the following processing when the server has 
parsed a request
     poolArgs.processor(new Processor<>(this));
     poolArgs.protocolFactory(protocolFactory);
-    // nonblocking server requests FramedTransport
     poolArgs.transportFactory(RpcTransportFactory.INSTANCE);
 
     poolServer = new TThreadPoolServer(poolArgs);
@@ -257,20 +249,6 @@ public class ClientServer extends TSServiceImpl {
   }
 
   /**
-   * Get the data types of each path in “paths”. If "aggregation" is not null, 
all "paths" will use
-   * this aggregation.
-   *
-   * @param paths       full timeseries paths
-   * @param aggregation if not null, it means "paths" all use this aggregation
-   * @return the data types of "paths" (using the aggregation)
-   * @throws MetadataException
-   */
-  protected List<TSDataType> getSeriesTypesByString(List<PartialPath> paths, 
String aggregation)
-      throws MetadataException {
-    return ((CMManager) IoTDB.metaManager).getSeriesTypesByPaths(paths, 
aggregation).left;
-  }
-
-  /**
    * Generate and cache a QueryContext using "queryId". In the distributed 
version, the QueryContext
    * is a RemoteQueryContext.
    *
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java 
b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 84d033c..9543257 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -993,7 +993,9 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
    */
   private boolean checkLogin(long sessionId) {
     boolean isLoggedIn = sessionIdUsernameMap.get(sessionId) != null;
-    LOGGER.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
+    if (!isLoggedIn) {
+      LOGGER.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
+    }
     return isLoggedIn;
   }
 

Reply via email to