This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster_read
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_read by this push:
     new fc5f424  fix a bug
     new 28217cf  Merge branch 'cluster_read' of github.com:apache/incubator-iotdb into cluster_read
fc5f424 is described below
commit fc5f4242dc104bc9e2f800fe9fe04eaa7d30090f
Author: lta <[email protected]>
AuthorDate: Wed Apr 24 17:27:50 2019 +0800
fix a bug
---
.../cluster/query/manager/querynode/ClusterLocalQueryManager.java | 1 +
.../iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java | 1 +
.../org/apache/iotdb/cluster/service/TSServiceClusterImpl.java | 8 ++++----
.../org/apache/iotdb/cluster/query/ClusterQueryLargeDataTest.java | 5 +++++
.../src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java | 4 ++--
5 files changed, 13 insertions(+), 6 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
index 979f60e..fcb8a14 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
@@ -111,4 +111,5 @@ public class ClusterLocalQueryManager implements IClusterLocalQueryManager {
public static ConcurrentHashMap<Long, ClusterLocalSingleQueryManager>
getSingleQueryManagerMap() {
return SINGLE_QUERY_MANAGER_MAP;
}
+
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
index 039bf75..b9f2b41 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
@@ -280,6 +280,7 @@ public class RaftNodeAsClientManager {
LOGGER.error(e.getMessage());
qpTask.setTaskState(TaskState.EXCEPTION);
releaseClient(RaftNodeAsClient.this);
+ boltClientService.disconnect(leader.getEndpoint());
qpTask.run(null);
throw new RaftConnectionException(e);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
index 075bd91..bfc74c1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
@@ -71,10 +71,10 @@ public class TSServiceClusterImpl extends TSServiceImpl {
private QueryMetadataExecutor queryMetadataExecutor = new
QueryMetadataExecutor();
private IClusterRpcQueryManager queryManager =
ClusterRpcQueryManager.getInstance();
- private QueryProcessor queryProcessor = new QueryProcessor(queryDataExecutor);
public TSServiceClusterImpl() throws IOException {
super();
+ processor = new QueryProcessor(queryDataExecutor);
}
@@ -129,7 +129,7 @@ public class TSServiceClusterImpl extends TSServiceImpl {
/** find all valid physical plans **/
for (int i = 0; i < statements.size(); i++) {
try {
- PhysicalPlan plan = queryProcessor
+ PhysicalPlan plan = processor
.parseSQLToPhysicalPlan(statements.get(i), zoneIds.get());
plan.setProposer(username.get());
@@ -298,7 +298,7 @@ public class TSServiceClusterImpl extends TSServiceImpl {
throws PathErrorException, QueryFilterOptimizationException,
FileNodeManagerException,
ProcessorException, IOException {
PhysicalPlan physicalPlan = queryStatus.get().get(statement);
- queryProcessor.getExecutor().setFetchSize(fetchSize);
+ processor.getExecutor().setFetchSize(fetchSize);
long jobId = QueryResourceManager.getInstance().assignJobId();
QueryContext context = new QueryContext(jobId);
@@ -306,7 +306,7 @@ public class TSServiceClusterImpl extends TSServiceImpl {
contextMapLocal.get().put(req.queryId, context);
queryManager.addSingleQuery(jobId, (QueryPlan) physicalPlan);
- QueryDataSet queryDataSet = queryProcessor.getExecutor().processQuery((QueryPlan) physicalPlan,
+ QueryDataSet queryDataSet = processor.getExecutor().processQuery((QueryPlan) physicalPlan,
context);
queryRet.get().put(statement, queryDataSet);
return queryDataSet;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryLargeDataTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryLargeDataTest.java
new file mode 100644
index 0000000..6381f25
--- /dev/null
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryLargeDataTest.java
@@ -0,0 +1,5 @@
+package org.apache.iotdb.cluster.query;
+
+public class ClusterQueryLargeDataTest {
+
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 426b3e8..4e85fad 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -99,7 +99,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
protected static final String INFO_NOT_LOGIN = "{}: Not login.";
protected static final String ERROR_NOT_LOGIN = "Not login";
- private QueryProcessor processor = new QueryProcessor(new OverflowQPExecutor());
+ protected QueryProcessor processor;
// Record the username for every rpc connection. Username.get() is null if
// login is failed.
protected ThreadLocal<String> username = new ThreadLocal<>();
@@ -110,7 +110,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
protected ThreadLocal<Map<Long, QueryContext>> contextMapLocal = new
ThreadLocal<>();
public TSServiceImpl() throws IOException {
- // do nothing because there is no need
+ processor = new QueryProcessor(new OverflowQPExecutor());
}
@Override