This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster by this push:
new e4f07e2 fix init of raft node as manager
e4f07e2 is described below
commit e4f07e2a2e45244d59a1063772281439e5e84bed
Author: lta <[email protected]>
AuthorDate: Thu May 23 16:17:32 2019 +0800
fix init of raft node as manager
---
.../cluster/concurrent/pool/QPTaskManager.java | 64 ----------------
.../cluster/concurrent/pool/QueryTimerManager.java | 74 -------------------
.../org/apache/iotdb/cluster/entity/Server.java | 4 +-
.../querynode/ClusterLocalSingleQueryManager.java | 6 +-
.../rpc/raft/impl/RaftNodeAsClientManager.java | 13 ++++
.../cluster/concurrent/pool/QPTaskManagerTest.java | 85 ----------------------
.../integration/IoTDBAggregationSmallDataIT.java | 9 ---
.../apache/iotdb/cluster/utils/RaftUtilsTest.java | 2 +
8 files changed, 20 insertions(+), 237 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManager.java
b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManager.java
deleted file mode 100644
index d831157..0000000
---
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManager.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.cluster.concurrent.pool;
-
-import org.apache.iotdb.cluster.concurrent.ThreadName;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
-
-/**
- * Manage all qp tasks in thread.
- */
-public class QPTaskManager extends ThreadPoolManager {
-
- private static final String MANAGER_NAME = "qp task manager";
-
- private QPTaskManager() {
- init();
- }
-
- public static QPTaskManager getInstance() {
- return QPTaskManager.InstanceHolder.instance;
- }
-
- /**
- * Name of Pool Manager
- */
- @Override
- public String getManagerName() {
- return MANAGER_NAME;
- }
-
- @Override
- public String getThreadName() {
- return ThreadName.QP_TASK.getName();
- }
-
- @Override
- public int getThreadPoolSize() {
- return ClusterDescriptor.getInstance().getConfig().getConcurrentQPSubTaskThread();
- }
-
- private static class InstanceHolder {
-
- private InstanceHolder() {
- }
-
- private static QPTaskManager instance = new QPTaskManager();
- }
-}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QueryTimerManager.java
b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QueryTimerManager.java
deleted file mode 100644
index 779488c..0000000
---
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QueryTimerManager.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.cluster.concurrent.pool;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import org.apache.iotdb.cluster.concurrent.ThreadName;
-import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
-
-/**
- * Manage all query timer in query node, if timer is timeout, close all query resource for remote
- * coordinator node.
- */
-public class QueryTimerManager extends ThreadPoolManager {
-
- private static final String MANAGER_NAME = "remote-query-timer-manager";
-
- private static final int CORE_POOL_SIZE = 1;
-
- @Override
- public void init() {
- pool = IoTDBThreadPoolFactory.newScheduledThreadPool(getThreadPoolSize(), getThreadName());
- }
-
- public static QueryTimerManager getInstance() {
- return QueryTimerManager.QueryTimerManagerHolder.INSTANCE;
- }
-
- @Override
- public String getManagerName() {
- return MANAGER_NAME;
- }
-
- @Override
- public String getThreadName() {
- return ThreadName.REMOTE_QUERY_TIMER.getName();
- }
-
- @Override
- public int getThreadPoolSize() {
- return CORE_POOL_SIZE;
- }
-
- public ScheduledFuture<?> execute(Runnable task, long delayMs) {
- checkInit();
- return ((ScheduledExecutorService) pool).schedule(task, delayMs, TimeUnit.MICROSECONDS);
- }
-
- private static class QueryTimerManagerHolder {
-
- private static final QueryTimerManager INSTANCE = new QueryTimerManager();
-
- private QueryTimerManagerHolder() {
-
- }
- }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index d12f78f..418b625 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -23,7 +23,7 @@ import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
import java.util.HashMap;
import java.util.Map;
-import org.apache.iotdb.cluster.concurrent.pool.QPTaskManager;
+import org.apache.iotdb.cluster.concurrent.pool.QPTaskThreadManager;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
@@ -181,7 +181,7 @@ public class Server {
}
public void stop() throws ProcessorException, RaftConnectionException,
FileNodeManagerException {
- QPTaskManager.getInstance().close(true, ClusterConstant.CLOSE_THREAD_POOL_BLOCK_TIMEOUT);
+ QPTaskThreadManager.getInstance().close(true, ClusterConstant.CLOSE_THREAD_POOL_BLOCK_TIMEOUT);
ClusterRpcQueryManager.getInstance().close();
ClusterLocalQueryManager.getInstance().close();
CLIENT_MANAGER.shutdown();
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
index 25adbf5..f2a9ca3 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
@@ -24,7 +24,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
-import org.apache.iotdb.cluster.concurrent.pool.QueryTimerManager;
+import org.apache.iotdb.cluster.concurrent.pool.QueryTimerThreadManager;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.query.PathType;
import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory;
@@ -132,7 +132,7 @@ public class ClusterLocalSingleQueryManager implements
IClusterLocalSingleQueryM
*/
public ClusterLocalSingleQueryManager(long jobId) {
this.jobId = jobId;
- queryTimer = QueryTimerManager.getInstance()
+ queryTimer = QueryTimerThreadManager.getInstance()
.execute(new QueryTimerRunnable(),
ClusterConstant.QUERY_TIMEOUT_IN_QUERY_NODE);
}
@@ -431,7 +431,7 @@ public class ClusterLocalSingleQueryManager implements
IClusterLocalSingleQueryM
@Override
public void resetQueryTimer() {
queryTimer.cancel(false);
- queryTimer = QueryTimerManager.getInstance()
+ queryTimer = QueryTimerThreadManager.getInstance()
.execute(new QueryTimerRunnable(),
ClusterConstant.QUERY_TIMEOUT_IN_QUERY_NODE);
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
index 351eece..f7de451 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
@@ -84,12 +84,18 @@ public class RaftNodeAsClientManager {
*/
private volatile boolean isShuttingDown;
+ /**
+ * Mark whether manager init or not
+ */
+ private volatile boolean isInit;
+
private RaftNodeAsClientManager() {
}
public void init() {
isShuttingDown = false;
+ isInit = true;
taskQueue.clear();
for (int i = 0; i < CLUSTER_CONFIG.getConcurrentInnerRpcClientThread();
i++) {
THREAD_POOL_MANAGER.execute(() -> {
@@ -109,8 +115,10 @@ public class RaftNodeAsClientManager {
* Produce qp task to be executed.
*/
public void produceQPTask(SingleQPTask qpTask) throws
RaftConnectionException {
+ checkInit();
resourceLock.lock();
try {
+ checkInit();
checkShuttingDown();
if (taskQueue.size() >= MAX_QUEUE_TASK_NUM) {
throw new RaftConnectionException(String
@@ -125,6 +133,11 @@ public class RaftNodeAsClientManager {
}
}
+ public void checkInit(){
+ if(!isInit){
+ init();
+ }
+ }
/**
* Consume qp task
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManagerTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManagerTest.java
deleted file mode 100644
index 148d25d..0000000
---
a/cluster/src/test/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManagerTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.cluster.concurrent.pool;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.iotdb.cluster.config.ClusterConfig;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.utils.EnvironmentUtils;
-import org.apache.iotdb.db.exception.ProcessorException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class QPTaskManagerTest {
-
- private QPTaskManager qpTaskManager = QPTaskManager.getInstance();
-
- private ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig();
-
- private int blockTimeOut = 10;
-
- private volatile boolean mark = true;
-
- private Runnable testRunnable = () -> {
- while(mark){}
- };
-
- private Runnable changeMark = () -> {
- try {
- Thread.sleep(blockTimeOut);
- mark = false;
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- };
-
- @Before
- public void setUp() throws Exception {
- EnvironmentUtils.envSetUp();
- }
-
- @After
- public void tearDown() throws Exception {
- EnvironmentUtils.cleanEnv();
- }
-
- @Test
- public void testSubmitAndClose() throws InterruptedException {
-
- assertEquals(clusterConfig.getConcurrentQPSubTaskThread(), qpTaskManager.getThreadPoolSize());
-
- int threadPoolSize = qpTaskManager.getThreadPoolSize();
- // test thread num
- for (int i = 1; i <= threadPoolSize + 2; i++) {
- qpTaskManager.submit(testRunnable);
- Thread.sleep(10);
- assertEquals(Math.min(i, threadPoolSize), qpTaskManager.getActiveCnt());
- }
-
- // test close
- try {
- new Thread(changeMark).start();
- qpTaskManager.close(true, blockTimeOut);
- } catch (ProcessorException e) {
- assertEquals("qp task manager thread pool doesn't exit after 10 ms", e.getMessage());
- }
- }
-}
\ No newline at end of file
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java
b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java
index 056a70f..d097f8f 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java
@@ -78,20 +78,11 @@ public class IoTDBAggregationSmallDataIT {
"CREATE TIMESERIES root.vehicle.d0.s4 WITH DATATYPE=BOOLEAN,
ENCODING=PLAIN"};
private static String[] insertSqls = new String[]{
- "insert into root.vehicle.d0(timestamp,s0) values(1,101)",
- "insert into root.vehicle.d0(timestamp,s0) values(2,198)",
- "insert into root.vehicle.d0(timestamp,s0) values(100,99)",
- "insert into root.vehicle.d0(timestamp,s0) values(101,99)",
- "insert into root.vehicle.d0(timestamp,s0) values(102,80)",
- "insert into root.vehicle.d0(timestamp,s0) values(103,99)",
"insert into root.vehicle.d0(timestamp,s0) values(104,90)",
"insert into root.vehicle.d0(timestamp,s0) values(105,99)",
"insert into root.vehicle.d0(timestamp,s0) values(106,99)",
- "insert into root.vehicle.d0(timestamp,s0) values(2,10000)",
- "insert into root.vehicle.d0(timestamp,s0) values(50,10000)",
"insert into root.vehicle.d0(timestamp,s0) values(1000,22222)",
"insert into root.vehicle.d0(timestamp,s0) values(106,199)",
- "DELETE FROM root.vehicle.d0.s0 WHERE time < 104",
"insert into root.vehicle.d0(timestamp,s1) values(1,1101)",
"insert into root.vehicle.d0(timestamp,s1) values(2,198)",
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java
index b6866ad..d8eed3d 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java
@@ -44,6 +44,7 @@ import
org.apache.iotdb.cluster.entity.raft.MetadataStateManchine;
import org.apache.iotdb.cluster.entity.raft.RaftService;
import org.apache.iotdb.cluster.qp.task.QPTask;
import org.apache.iotdb.cluster.qp.task.SingleQPTask;
+import org.apache.iotdb.cluster.rpc.raft.impl.RaftNodeAsClientManager;
import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
import
org.apache.iotdb.cluster.rpc.raft.request.nonquery.DataGroupNonQueryRequest;
import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
@@ -51,6 +52,7 @@ import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
import org.apache.iotdb.cluster.utils.hash.Router;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;