This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster by this push:
     new e4f07e2  fix init of raft node as manager
e4f07e2 is described below

commit e4f07e2a2e45244d59a1063772281439e5e84bed
Author: lta <[email protected]>
AuthorDate: Thu May 23 16:17:32 2019 +0800

    fix init of raft node as manager
---
 .../cluster/concurrent/pool/QPTaskManager.java     | 64 ----------------
 .../cluster/concurrent/pool/QueryTimerManager.java | 74 -------------------
 .../org/apache/iotdb/cluster/entity/Server.java    |  4 +-
 .../querynode/ClusterLocalSingleQueryManager.java  |  6 +-
 .../rpc/raft/impl/RaftNodeAsClientManager.java     | 13 ++++
 .../cluster/concurrent/pool/QPTaskManagerTest.java | 85 ----------------------
 .../integration/IoTDBAggregationSmallDataIT.java   |  9 ---
 .../apache/iotdb/cluster/utils/RaftUtilsTest.java  |  2 +
 8 files changed, 20 insertions(+), 237 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManager.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManager.java
deleted file mode 100644
index d831157..0000000
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManager.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.cluster.concurrent.pool;
-
-import org.apache.iotdb.cluster.concurrent.ThreadName;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
-
-/**
- * Manage all qp tasks in thread.
- */
-public class QPTaskManager extends ThreadPoolManager {
-
-  private static final String MANAGER_NAME = "qp task manager";
-
-  private QPTaskManager() {
-    init();
-  }
-
-  public static QPTaskManager getInstance() {
-    return QPTaskManager.InstanceHolder.instance;
-  }
-
-  /**
-   * Name of Pool Manager
-   */
-  @Override
-  public String getManagerName() {
-    return MANAGER_NAME;
-  }
-
-  @Override
-  public String getThreadName() {
-    return ThreadName.QP_TASK.getName();
-  }
-
-  @Override
-  public int getThreadPoolSize() {
-    return 
ClusterDescriptor.getInstance().getConfig().getConcurrentQPSubTaskThread();
-  }
-
-  private static class InstanceHolder {
-
-    private InstanceHolder() {
-    }
-
-    private static QPTaskManager instance = new QPTaskManager();
-  }
-}
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QueryTimerManager.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QueryTimerManager.java
deleted file mode 100644
index 779488c..0000000
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QueryTimerManager.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.cluster.concurrent.pool;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import org.apache.iotdb.cluster.concurrent.ThreadName;
-import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
-
-/**
- * Manage all query timer in query node, if timer is timeout, close all query 
resource for remote
- * coordinator node.
- */
-public class QueryTimerManager extends ThreadPoolManager {
-
-  private static final String MANAGER_NAME = "remote-query-timer-manager";
-
-  private static final int CORE_POOL_SIZE = 1;
-
-  @Override
-  public void init() {
-    pool = IoTDBThreadPoolFactory.newScheduledThreadPool(getThreadPoolSize(), 
getThreadName());
-  }
-
-  public static QueryTimerManager getInstance() {
-    return QueryTimerManager.QueryTimerManagerHolder.INSTANCE;
-  }
-
-  @Override
-  public String getManagerName() {
-    return MANAGER_NAME;
-  }
-
-  @Override
-  public String getThreadName() {
-    return ThreadName.REMOTE_QUERY_TIMER.getName();
-  }
-
-  @Override
-  public int getThreadPoolSize() {
-    return CORE_POOL_SIZE;
-  }
-
-  public ScheduledFuture<?> execute(Runnable task, long delayMs) {
-    checkInit();
-    return ((ScheduledExecutorService) pool).schedule(task, delayMs, 
TimeUnit.MICROSECONDS);
-  }
-
-  private static class QueryTimerManagerHolder {
-
-    private static final QueryTimerManager INSTANCE = new QueryTimerManager();
-
-    private QueryTimerManagerHolder() {
-
-    }
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index d12f78f..418b625 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -23,7 +23,7 @@ import com.alipay.sofa.jraft.entity.PeerId;
 import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.iotdb.cluster.concurrent.pool.QPTaskManager;
+import org.apache.iotdb.cluster.concurrent.pool.QPTaskThreadManager;
 import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
@@ -181,7 +181,7 @@ public class Server {
   }
 
   public void stop() throws ProcessorException, RaftConnectionException, 
FileNodeManagerException {
-    QPTaskManager.getInstance().close(true, 
ClusterConstant.CLOSE_THREAD_POOL_BLOCK_TIMEOUT);
+    QPTaskThreadManager.getInstance().close(true, 
ClusterConstant.CLOSE_THREAD_POOL_BLOCK_TIMEOUT);
     ClusterRpcQueryManager.getInstance().close();
     ClusterLocalQueryManager.getInstance().close();
     CLIENT_MANAGER.shutdown();
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
index 25adbf5..f2a9ca3 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
@@ -24,7 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ScheduledFuture;
-import org.apache.iotdb.cluster.concurrent.pool.QueryTimerManager;
+import org.apache.iotdb.cluster.concurrent.pool.QueryTimerThreadManager;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.query.PathType;
 import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory;
@@ -132,7 +132,7 @@ public class ClusterLocalSingleQueryManager implements 
IClusterLocalSingleQueryM
    */
   public ClusterLocalSingleQueryManager(long jobId) {
     this.jobId = jobId;
-    queryTimer = QueryTimerManager.getInstance()
+    queryTimer = QueryTimerThreadManager.getInstance()
         .execute(new QueryTimerRunnable(), 
ClusterConstant.QUERY_TIMEOUT_IN_QUERY_NODE);
   }
 
@@ -431,7 +431,7 @@ public class ClusterLocalSingleQueryManager implements 
IClusterLocalSingleQueryM
   @Override
   public void resetQueryTimer() {
     queryTimer.cancel(false);
-    queryTimer = QueryTimerManager.getInstance()
+    queryTimer = QueryTimerThreadManager.getInstance()
         .execute(new QueryTimerRunnable(), 
ClusterConstant.QUERY_TIMEOUT_IN_QUERY_NODE);
   }
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
index 351eece..f7de451 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
@@ -84,12 +84,18 @@ public class RaftNodeAsClientManager {
    */
   private volatile boolean isShuttingDown;
 
+  /**
+   * Mark whether manager init or not
+   */
+  private volatile boolean isInit;
+
   private RaftNodeAsClientManager() {
 
   }
 
   public void init() {
     isShuttingDown = false;
+    isInit = true;
     taskQueue.clear();
     for (int i = 0; i < CLUSTER_CONFIG.getConcurrentInnerRpcClientThread(); 
i++) {
       THREAD_POOL_MANAGER.execute(() -> {
@@ -109,8 +115,10 @@ public class RaftNodeAsClientManager {
    * Produce qp task to be executed.
    */
   public void produceQPTask(SingleQPTask qpTask) throws 
RaftConnectionException {
+    checkInit();
     resourceLock.lock();
     try {
+      checkInit();
       checkShuttingDown();
       if (taskQueue.size() >= MAX_QUEUE_TASK_NUM) {
         throw new RaftConnectionException(String
@@ -125,6 +133,11 @@ public class RaftNodeAsClientManager {
     }
   }
 
+  public void checkInit(){
+    if(!isInit){
+      init();
+    }
+  }
 
   /**
    * Consume qp task
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManagerTest.java
 
b/cluster/src/test/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManagerTest.java
deleted file mode 100644
index 148d25d..0000000
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManagerTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.cluster.concurrent.pool;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.iotdb.cluster.config.ClusterConfig;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.utils.EnvironmentUtils;
-import org.apache.iotdb.db.exception.ProcessorException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class QPTaskManagerTest {
-
-  private QPTaskManager qpTaskManager = QPTaskManager.getInstance();
-
-  private ClusterConfig clusterConfig = 
ClusterDescriptor.getInstance().getConfig();
-
-  private int blockTimeOut = 10;
-
-  private volatile boolean mark = true;
-
-  private Runnable testRunnable = () -> {
-    while(mark){}
-  };
-
-  private Runnable changeMark = () -> {
-    try {
-      Thread.sleep(blockTimeOut);
-      mark = false;
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-  };
-
-  @Before
-  public void setUp() throws Exception {
-    EnvironmentUtils.envSetUp();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    EnvironmentUtils.cleanEnv();
-  }
-
-  @Test
-  public void testSubmitAndClose() throws InterruptedException {
-
-    assertEquals(clusterConfig.getConcurrentQPSubTaskThread(), 
qpTaskManager.getThreadPoolSize());
-
-    int threadPoolSize = qpTaskManager.getThreadPoolSize();
-    // test thread num
-    for (int i = 1; i <= threadPoolSize + 2; i++) {
-      qpTaskManager.submit(testRunnable);
-      Thread.sleep(10);
-      assertEquals(Math.min(i, threadPoolSize), qpTaskManager.getActiveCnt());
-    }
-
-    // test close
-    try {
-      new Thread(changeMark).start();
-      qpTaskManager.close(true, blockTimeOut);
-    } catch (ProcessorException e) {
-      assertEquals("qp task manager thread pool doesn't exit after 10 ms", 
e.getMessage());
-    }
-  }
-}
\ No newline at end of file
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java
 
b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java
index 056a70f..d097f8f 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java
@@ -78,20 +78,11 @@ public class IoTDBAggregationSmallDataIT {
       "CREATE TIMESERIES root.vehicle.d0.s4 WITH DATATYPE=BOOLEAN, 
ENCODING=PLAIN"};
   private static String[] insertSqls = new String[]{
 
-      "insert into root.vehicle.d0(timestamp,s0) values(1,101)",
-      "insert into root.vehicle.d0(timestamp,s0) values(2,198)",
-      "insert into root.vehicle.d0(timestamp,s0) values(100,99)",
-      "insert into root.vehicle.d0(timestamp,s0) values(101,99)",
-      "insert into root.vehicle.d0(timestamp,s0) values(102,80)",
-      "insert into root.vehicle.d0(timestamp,s0) values(103,99)",
       "insert into root.vehicle.d0(timestamp,s0) values(104,90)",
       "insert into root.vehicle.d0(timestamp,s0) values(105,99)",
       "insert into root.vehicle.d0(timestamp,s0) values(106,99)",
-      "insert into root.vehicle.d0(timestamp,s0) values(2,10000)",
-      "insert into root.vehicle.d0(timestamp,s0) values(50,10000)",
       "insert into root.vehicle.d0(timestamp,s0) values(1000,22222)",
       "insert into root.vehicle.d0(timestamp,s0) values(106,199)",
-      "DELETE FROM root.vehicle.d0.s0 WHERE time < 104",
 
       "insert into root.vehicle.d0(timestamp,s1) values(1,1101)",
       "insert into root.vehicle.d0(timestamp,s1) values(2,198)",
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java 
b/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java
index b6866ad..d8eed3d 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java
@@ -44,6 +44,7 @@ import 
org.apache.iotdb.cluster.entity.raft.MetadataStateManchine;
 import org.apache.iotdb.cluster.entity.raft.RaftService;
 import org.apache.iotdb.cluster.qp.task.QPTask;
 import org.apache.iotdb.cluster.qp.task.SingleQPTask;
+import org.apache.iotdb.cluster.rpc.raft.impl.RaftNodeAsClientManager;
 import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
 import 
org.apache.iotdb.cluster.rpc.raft.request.nonquery.DataGroupNonQueryRequest;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
@@ -51,6 +52,7 @@ import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
 import org.apache.iotdb.cluster.utils.hash.Router;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.Mockito;

Reply via email to