This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_read
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster_read by this push:
     new fc5f424  fix a bug
     new 28217cf  Merge branch 'cluster_read' of github.com:apache/incubator-iotdb into cluster_read
fc5f424 is described below

commit fc5f4242dc104bc9e2f800fe9fe04eaa7d30090f
Author: lta <[email protected]>
AuthorDate: Wed Apr 24 17:27:50 2019 +0800

    fix a bug
---
 .../cluster/query/manager/querynode/ClusterLocalQueryManager.java | 1 +
 .../iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java      | 1 +
 .../org/apache/iotdb/cluster/service/TSServiceClusterImpl.java    | 8 ++++----
 .../org/apache/iotdb/cluster/query/ClusterQueryLargeDataTest.java | 5 +++++
 .../src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java  | 4 ++--
 5 files changed, 13 insertions(+), 6 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
index 979f60e..fcb8a14 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
@@ -111,4 +111,5 @@ public class ClusterLocalQueryManager implements IClusterLocalQueryManager {
   public static ConcurrentHashMap<Long, ClusterLocalSingleQueryManager> getSingleQueryManagerMap() {
     return SINGLE_QUERY_MANAGER_MAP;
   }
+
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
index 039bf75..b9f2b41 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
@@ -280,6 +280,7 @@ public class RaftNodeAsClientManager {
         LOGGER.error(e.getMessage());
         qpTask.setTaskState(TaskState.EXCEPTION);
         releaseClient(RaftNodeAsClient.this);
+        boltClientService.disconnect(leader.getEndpoint());
         qpTask.run(null);
         throw new RaftConnectionException(e);
       }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
index 075bd91..bfc74c1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
@@ -71,10 +71,10 @@ public class TSServiceClusterImpl extends TSServiceImpl {
   private QueryMetadataExecutor queryMetadataExecutor = new QueryMetadataExecutor();
 
   private IClusterRpcQueryManager queryManager = ClusterRpcQueryManager.getInstance();
-  private QueryProcessor queryProcessor = new QueryProcessor(queryDataExecutor);
 
   public TSServiceClusterImpl() throws IOException {
     super();
+    processor = new QueryProcessor(queryDataExecutor);
   }
 
 
@@ -129,7 +129,7 @@ public class TSServiceClusterImpl extends TSServiceImpl {
       /** find all valid physical plans **/
       for (int i = 0; i < statements.size(); i++) {
         try {
-          PhysicalPlan plan = queryProcessor
+          PhysicalPlan plan = processor
               .parseSQLToPhysicalPlan(statements.get(i), zoneIds.get());
           plan.setProposer(username.get());
 
@@ -298,7 +298,7 @@ public class TSServiceClusterImpl extends TSServiceImpl {
       throws PathErrorException, QueryFilterOptimizationException, FileNodeManagerException,
       ProcessorException, IOException {
     PhysicalPlan physicalPlan = queryStatus.get().get(statement);
-    queryProcessor.getExecutor().setFetchSize(fetchSize);
+    processor.getExecutor().setFetchSize(fetchSize);
 
     long jobId = QueryResourceManager.getInstance().assignJobId();
     QueryContext context = new QueryContext(jobId);
@@ -306,7 +306,7 @@ public class TSServiceClusterImpl extends TSServiceImpl {
     contextMapLocal.get().put(req.queryId, context);
 
     queryManager.addSingleQuery(jobId, (QueryPlan) physicalPlan);
-    QueryDataSet queryDataSet = queryProcessor.getExecutor().processQuery((QueryPlan) physicalPlan,
+    QueryDataSet queryDataSet = processor.getExecutor().processQuery((QueryPlan) physicalPlan,
         context);
     queryRet.get().put(statement, queryDataSet);
     return queryDataSet;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryLargeDataTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryLargeDataTest.java
new file mode 100644
index 0000000..6381f25
--- /dev/null
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryLargeDataTest.java
@@ -0,0 +1,5 @@
+package org.apache.iotdb.cluster.query;
+
+public class ClusterQueryLargeDataTest {
+
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 426b3e8..4e85fad 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -99,7 +99,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   protected static final String INFO_NOT_LOGIN = "{}: Not login.";
   protected static final String ERROR_NOT_LOGIN = "Not login";
 
-  private QueryProcessor processor = new QueryProcessor(new OverflowQPExecutor());
+  protected QueryProcessor processor;
   // Record the username for every rpc connection. Username.get() is null if
   // login is failed.
   protected ThreadLocal<String> username = new ThreadLocal<>();
@@ -110,7 +110,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   protected ThreadLocal<Map<Long, QueryContext>> contextMapLocal = new ThreadLocal<>();
 
   public TSServiceImpl() throws IOException {
-    // do nothing because there is no need
+     processor = new QueryProcessor(new OverflowQPExecutor());
   }
 
   @Override

Reply via email to