This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch cluster in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 20c93d16e736c672c8c75ba62909afce5361171e Author: lta <[email protected]> AuthorDate: Tue May 21 01:08:28 2019 +0800 fix a serve bug --- .../manager/coordinatornode/ClusterRpcSingleQueryManager.java | 7 +++++++ .../query/reader/querynode/ClusterSelectSeriesBatchReader.java | 1 + .../raft/processor/querydata/InitSeriesReaderSyncProcessor.java | 5 +++++ .../raft/processor/querymetadata/QueryPathsAsyncProcessor.java | 1 - .../org/apache/iotdb/cluster/service/TSServiceClusterImpl.java | 9 ++++++++- .../java/org/apache/iotdb/cluster/integration/IoTDBQueryIT.java | 1 - 6 files changed, 21 insertions(+), 3 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java index 81ca292..8cc4ccd 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java @@ -187,6 +187,7 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag */ private void handleInitReaderResponse(String groupId, Map<PathType, QueryPlan> allQueryPlan, InitSeriesReaderResponse response) { + LOGGER.debug("Handle init reader response of group id {}", groupId); /** create cluster series reader **/ if (allQueryPlan.containsKey(PathType.SELECT_PATH)) { QueryPlan plan = allQueryPlan.get(PathType.SELECT_PATH); @@ -217,14 +218,18 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag @Override public void fetchBatchDataForSelectPaths(String groupId) throws RaftConnectionException { List<Integer> fetchDataSeriesIndexs = new ArrayList<>(); + List<Path> fetchDataSeries = new ArrayList<>(); List<Path> selectSeries = selectSeriesGroupEntityMap.get(groupId).getSelectPaths(); List<ClusterSelectSeriesReader> seriesReaders = selectSeriesGroupEntityMap.get(groupId) .getSelectSeriesReaders(); for (int i = 0; i < selectSeries.size(); i++) { if (seriesReaders.get(i).enableFetchData()) { fetchDataSeriesIndexs.add(i); + fetchDataSeries.add(selectSeries.get(i)); } } + LOGGER.debug("Fetch data for paths {} of group id {} from node {}", fetchDataSeries, groupId, + queryNodes.get(groupId)); BasicRequest request = QuerySeriesDataRequest .createFetchDataRequest(groupId, taskId, PathType.SELECT_PATH, fetchDataSeriesIndexs, queryRounds++); @@ -236,6 +241,8 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag @Override public void fetchBatchDataForAllFilterPaths(String groupId) throws RaftConnectionException { + LOGGER.debug("Fetch Data for filter paths {} of group id {} from node {}", + filterSeriesGroupEntityMap.get(groupId).getFilterPaths(), groupId, queryNodes.get(groupId)); BasicRequest request = QuerySeriesDataRequest .createFetchDataRequest(groupId, taskId, PathType.FILTER_PATH, null, queryRounds++); QuerySeriesDataResponse response = (QuerySeriesDataResponse) ClusterRpcReaderUtils diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java index cbbad2e..cfc43b8 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java @@ -62,6 +62,7 @@ public class ClusterSelectSeriesBatchReader extends for (int i = 0; i < CLUSTER_CONF.getBatchReadSize(); i++) { if (hasNext()) { TimeValuePair pair = reader.next(); + System.out.println("reader value:" + pair); batchData.putTime(pair.getTimestamp()); batchData.putAnObject(pair.getValue().getValue()); } else { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querydata/InitSeriesReaderSyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querydata/InitSeriesReaderSyncProcessor.java index 7474eec..a64d909 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querydata/InitSeriesReaderSyncProcessor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querydata/InitSeriesReaderSyncProcessor.java @@ -27,9 +27,13 @@ import org.apache.iotdb.cluster.rpc.raft.request.querydata.InitSeriesReaderReque import org.apache.iotdb.cluster.utils.QPExecutorUtils; import org.apache.iotdb.cluster.utils.RaftUtils; import org.apache.iotdb.db.exception.ProcessorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class InitSeriesReaderSyncProcessor extends BasicSyncUserProcessor<InitSeriesReaderRequest> { + private static final Logger LOGGER = LoggerFactory.getLogger(InitSeriesReaderSyncProcessor.class); + @Override public Object handleRequest(BizContext bizContext, InitSeriesReaderRequest request) throws Exception { @@ -46,6 +50,7 @@ public class InitSeriesReaderSyncProcessor extends BasicSyncUserProcessor<InitSe * @param groupId group id */ private void handleNullRead(int readConsistencyLevel, String groupId) throws ProcessorException { + LOGGER.debug("Read data level is {}", readConsistencyLevel); if (readConsistencyLevel == ClusterConstant.STRONG_CONSISTENCY_LEVEL && !QPExecutorUtils .checkDataGroupLeader(groupId)) { Status nullReadTaskStatus = Status.OK(); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryPathsAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryPathsAsyncProcessor.java index 8e96032..8e1e47b 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryPathsAsyncProcessor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryPathsAsyncProcessor.java @@ -90,7 +90,6 @@ public class QueryPathsAsyncProcessor extends BasicAsyncUserProcessor<QueryPaths for (String path : request.getPath()) { response.addPaths(mManager.getPaths(path)); } - System.out.println("Paths: " + response.getPaths()); } @Override diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java index 950d02c..4306d673 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java @@ -307,7 +307,14 @@ public class TSServiceClusterImpl extends TSServiceImpl { queryManager.addSingleQuery(jobId, (QueryPlan) physicalPlan); QueryDataSet queryDataSet = processor.getExecutor().processQuery((QueryPlan) physicalPlan, context); - queryRet.get().put(statement, queryDataSet); + System.out.println("Create new Data Set"); + try { + queryRet.get().put(statement, queryDataSet); + }catch (Exception e){ + e.printStackTrace(); + } + System.out.println("Create new Data Set complete"); + System.out.println(queryDataSet == null); return queryDataSet; } diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBQueryIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBQueryIT.java index de2bf64..5be3cbc 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBQueryIT.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBQueryIT.java @@ -291,7 +291,6 @@ public class IoTDBQueryIT { .getConnection(Config.IOTDB_URL_PREFIX + URL, "root", "root")) { insertData(connection, createSQLs, insertSQLs); Statement statement = connection.createStatement(); - statement.execute("set read metadata level to 2"); for(int i =0 ; i < queryStatementsWithoutFilter.length; i++) { String queryStatement = queryStatementsWithoutFilter[i];
