This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch cluster in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 3020603ac6130b06d499b3da10120be5dbadc420 Author: lta <[email protected]> AuthorDate: Mon May 20 11:12:12 2019 +0800 Increase the function of query polling --- .../ClusterRpcSingleQueryManager.java | 84 +++++++------------ .../cluster/query/utils/ClusterRpcReaderUtils.java | 95 ++++++---------------- .../iotdb/cluster/rpc/raft/NodeAsClient.java | 3 +- .../org/apache/iotdb/cluster/utils/RaftUtils.java | 52 ++++++++---- .../integration/IoTDBAggregationLargeDataIT.java | 1 - 5 files changed, 93 insertions(+), 142 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java index 6c4f2ad..af4db31 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java @@ -37,12 +37,16 @@ import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterFilterSeries import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader; import org.apache.iotdb.cluster.query.utils.ClusterRpcReaderUtils; import org.apache.iotdb.cluster.query.utils.QueryPlanPartitionUtils; +import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest; +import org.apache.iotdb.cluster.rpc.raft.request.querydata.CloseSeriesReaderRequest; +import org.apache.iotdb.cluster.rpc.raft.request.querydata.InitSeriesReaderRequest; +import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataByTimestampRequest; +import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataRequest; import org.apache.iotdb.cluster.rpc.raft.response.BasicQueryDataResponse; import org.apache.iotdb.cluster.rpc.raft.response.querydata.InitSeriesReaderResponse; import org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataByTimestampResponse; import org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataResponse; import org.apache.iotdb.cluster.utils.QPExecutorUtils; -import org.apache.iotdb.cluster.utils.RaftUtils; import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.qp.physical.crud.QueryPlan; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -140,8 +144,6 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag String groupId = entry.getKey(); QueryPlan queryPlan = entry.getValue(); if (!QPExecutorUtils.canHandleQueryByGroupId(groupId)) { - PeerId randomPeer = RaftUtils.getRandomPeerID(groupId); - queryNodes.put(groupId, randomPeer); Map<PathType, QueryPlan> allQueryPlan = new EnumMap<>(PathType.class); allQueryPlan.put(PathType.SELECT_PATH, queryPlan); List<Filter> filterList = new ArrayList<>(); @@ -150,9 +152,12 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag allQueryPlan.put(PathType.FILTER_PATH, filterGroupEntity.getQueryPlan()); filterList = filterGroupEntity.getFilters(); } + /** create request **/ + BasicRequest request = InitSeriesReaderRequest + .createInitialQueryRequest(groupId, taskId, readDataConsistencyLevel, + allQueryPlan, filterList); InitSeriesReaderResponse response = (InitSeriesReaderResponse) ClusterRpcReaderUtils - .createClusterSeriesReader(groupId, randomPeer, readDataConsistencyLevel, - allQueryPlan, taskId, filterList); + .createClusterSeriesReader(groupId, request, this); handleInitReaderResponse(groupId, allQueryPlan, response); } else { dataGroupUsage.add(groupId); @@ -167,14 +172,15 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag for (Entry<String, FilterGroupEntity> entry : filterGroupEntityMap.entrySet()) { String groupId = entry.getKey(); if (!selectPathPlans.containsKey(groupId)) { - PeerId randomPeer = RaftUtils.getRandomPeerID(groupId); Map<PathType, QueryPlan> allQueryPlan = new EnumMap<>(PathType.class); FilterGroupEntity filterGroupEntity = filterGroupEntityMap.get(groupId); allQueryPlan.put(PathType.FILTER_PATH, filterGroupEntity.getQueryPlan()); List<Filter> filterList = filterGroupEntity.getFilters(); + BasicRequest request = InitSeriesReaderRequest + .createInitialQueryRequest(groupId, taskId, readDataConsistencyLevel, + allQueryPlan, filterList); InitSeriesReaderResponse response = (InitSeriesReaderResponse) ClusterRpcReaderUtils - .createClusterSeriesReader(groupId, randomPeer, readDataConsistencyLevel, - allQueryPlan, taskId, filterList); + .createClusterSeriesReader(groupId, request, this); handleInitReaderResponse(groupId, allQueryPlan, response); } } @@ -230,18 +236,21 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag } } } - QuerySeriesDataResponse response = ClusterRpcReaderUtils - .fetchBatchData(groupId, queryNodes.get(groupId), taskId, PathType.SELECT_PATH, - fetchDataSeries, + BasicRequest request = QuerySeriesDataRequest + .createFetchDataRequest(groupId, taskId, PathType.SELECT_PATH, fetchDataSeries, queryRounds++); + QuerySeriesDataResponse response = (QuerySeriesDataResponse) ClusterRpcReaderUtils + .handleQueryRequest(request, queryNodes.get(groupId), 0); handleFetchDataResponseForSelectPaths(fetchDataSeries, response); } @Override public void fetchBatchDataForFilterPaths(String groupId) throws RaftConnectionException { - QuerySeriesDataResponse response = ClusterRpcReaderUtils - .fetchBatchData(groupId, queryNodes.get(groupId), taskId, PathType.FILTER_PATH, null, - queryRounds++); + BasicRequest request = QuerySeriesDataRequest + .createFetchDataRequest(groupId, taskId, PathType.FILTER_PATH, null, queryRounds++); + QuerySeriesDataResponse response = (QuerySeriesDataResponse) ClusterRpcReaderUtils + .handleQueryRequest(request, queryNodes.get(groupId), 0); + handleFetchDataResponseForFilterPaths(groupId, response); } @@ -253,9 +262,10 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag String groupId = entry.getKey(); List<String> fetchDataFilterSeries = new ArrayList<>(); entry.getValue().forEach(path -> fetchDataFilterSeries.add(path.getFullPath())); - QuerySeriesDataByTimestampResponse response = ClusterRpcReaderUtils - .fetchBatchDataByTimestamp(groupId, queryNodes.get(groupId), taskId, queryRounds++, - batchTimestamp, fetchDataFilterSeries); + BasicRequest request = QuerySeriesDataByTimestampRequest + .createRequest(groupId, queryRounds++, taskId, batchTimestamp, fetchDataFilterSeries); + QuerySeriesDataByTimestampResponse response = (QuerySeriesDataByTimestampResponse) ClusterRpcReaderUtils + .handleQueryRequest(request, queryNodes.get(groupId), 0); handleFetchDataByTimestampResponseForSelectPaths(fetchDataFilterSeries, response); } } @@ -332,7 +342,8 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag for (Entry<String, PeerId> entry : queryNodes.entrySet()) { String groupId = entry.getKey(); PeerId queryNode = entry.getValue(); - ClusterRpcReaderUtils.releaseRemoteQueryResource(groupId, queryNode, taskId); + BasicRequest request = CloseSeriesReaderRequest.createReleaseResourceRequest(groupId, taskId); + ClusterRpcReaderUtils.handleQueryRequest(request, queryNode, 0); } } @@ -356,60 +367,27 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag return queryRounds; } - public void setQueryRounds(long queryRounds) { - this.queryRounds = queryRounds; - } - public QueryPlan getOriginQueryPlan() { return originQueryPlan; } - public void setOriginQueryPlan(QueryPlan queryPlan) { - this.originQueryPlan = queryPlan; - } - - public Map<String, PeerId> getQueryNodes() { - return queryNodes; - } - - public void setQueryNodes( - Map<String, PeerId> queryNodes) { - this.queryNodes = queryNodes; + public void setQueryNode(String groupID, PeerId peerId) { + this.queryNodes.put(groupID, peerId); } public Map<String, QueryPlan> getSelectPathPlans() { return selectPathPlans; } - public void setSelectPathPlans( - Map<String, QueryPlan> selectPathPlans) { - this.selectPathPlans = selectPathPlans; - } - public Map<String, List<Path>> getSelectSeriesByGroupId() { return selectSeriesByGroupId; } - public void setSelectSeriesByGroupId( - Map<String, List<Path>> selectSeriesByGroupId) { - this.selectSeriesByGroupId = selectSeriesByGroupId; - } - public Map<Path, ClusterSelectSeriesReader> getSelectSeriesReaders() { return selectSeriesReaders; } - public void setSelectSeriesReaders( - Map<Path, ClusterSelectSeriesReader> selectSeriesReaders) { - this.selectSeriesReaders = selectSeriesReaders; - } - public Map<String, FilterGroupEntity> getFilterGroupEntityMap() { return filterGroupEntityMap; } - - public void setFilterGroupEntityMap( - Map<String, FilterGroupEntity> filterGroupEntityMap) { - this.filterGroupEntityMap = filterGroupEntityMap; - } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java index dca2d30..0247bbe 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java @@ -19,32 +19,34 @@ package org.apache.iotdb.cluster.query.utils; import com.alipay.sofa.jraft.entity.PeerId; -import java.io.IOException; import java.util.List; -import java.util.Map; import org.apache.iotdb.cluster.config.ClusterDescriptor; +import org.apache.iotdb.cluster.entity.Server; import org.apache.iotdb.cluster.exception.RaftConnectionException; import org.apache.iotdb.cluster.qp.task.QPTask.TaskState; import org.apache.iotdb.cluster.qp.task.QueryTask; import org.apache.iotdb.cluster.query.PathType; +import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager; import org.apache.iotdb.cluster.rpc.raft.NodeAsClient; import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest; import org.apache.iotdb.cluster.rpc.raft.request.querydata.CloseSeriesReaderRequest; -import org.apache.iotdb.cluster.rpc.raft.request.querydata.InitSeriesReaderRequest; import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataByTimestampRequest; import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataRequest; import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse; import org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataByTimestampResponse; import org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataResponse; import org.apache.iotdb.cluster.utils.RaftUtils; -import org.apache.iotdb.db.qp.physical.crud.QueryPlan; -import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.cluster.utils.hash.Router; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Utils for cluster reader which needs to acquire data from remote query node. */ public class ClusterRpcReaderUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(ClusterRpcReaderUtils.class); + /** * Count limit to redo a task */ @@ -56,72 +58,27 @@ public class ClusterRpcReaderUtils { /** * Create cluster series reader - * - * @param peerId query node to fetch data - * @param readDataConsistencyLevel consistency level of read data - * @param taskId task id assigned by coordinator node */ - public static BasicResponse createClusterSeriesReader(String groupId, PeerId peerId, - int readDataConsistencyLevel, Map<PathType, QueryPlan> allQueryPlan, String taskId, - List<Filter> filterList) throws RaftConnectionException, IOException { - - /** handle request **/ - BasicRequest request = InitSeriesReaderRequest - .createInitialQueryRequest(groupId, taskId, readDataConsistencyLevel, - allQueryPlan, filterList); - return handleQueryRequest(request, peerId, 0); - } - - /** - * Fetch batch data for select series in a query without value filter or filter series. - * - * @param groupId data group id - * @param peerId query node id - * @param taskId task id of query task - * @param pathType type of path - * @param fetchDataSeries series list which need to fetch data - * @param queryRounds query rounds - */ - public static QuerySeriesDataResponse fetchBatchData(String groupId, PeerId peerId, String taskId, - PathType pathType, List<String> fetchDataSeries, long queryRounds) + public static BasicResponse createClusterSeriesReader(String groupId, BasicRequest request, + ClusterRpcSingleQueryManager manager) throws RaftConnectionException { - BasicRequest request = QuerySeriesDataRequest - .createFetchDataRequest(groupId, taskId, pathType, fetchDataSeries, queryRounds); - return (QuerySeriesDataResponse) handleQueryRequest(request, peerId, 0); - } - /** - * Fetch batch data corresponding to a given list of timestamp for select series in a query with - * value filter. - * - * @param groupId data group id - * @param peerId query node id - * @param taskId task id of query task - * @param queryRounds query rounds - * @param batchTimestamp list of valid timestamp - * @param fetchDataSeries series list which need to fetch data - */ - public static QuerySeriesDataByTimestampResponse fetchBatchDataByTimestamp(String groupId, - PeerId peerId, String taskId, long queryRounds, List<Long> batchTimestamp, - List<String> fetchDataSeries) - throws RaftConnectionException { - BasicRequest request = QuerySeriesDataByTimestampRequest - .createRequest(groupId, queryRounds, taskId, batchTimestamp, fetchDataSeries); - return (QuerySeriesDataByTimestampResponse) handleQueryRequest(request, peerId, 0); - } - - /** - * Release remote query resources - * - * @param groupId data group id - * @param peerId target query node - * @param taskId unique task id - */ - public static void releaseRemoteQueryResource(String groupId, PeerId peerId, String taskId) - throws RaftConnectionException { - - BasicRequest request = CloseSeriesReaderRequest.createReleaseResourceRequest(groupId, taskId); - handleQueryRequest(request, peerId, 0); + List<PeerId> peerIdList = RaftUtils + .getPeerIDList(groupId, Server.getInstance(), Router.getInstance()); + int randomPeerIndex = RaftUtils.getRandomInt(peerIdList.size()); + BasicResponse response; + for (int i = 0; i < peerIdList.size(); i++) { + PeerId peerId = peerIdList.get((i + randomPeerIndex) % peerIdList.size()); + try { + response = handleQueryRequest(request, peerId, 0); + manager.setQueryNode(groupId, peerId); + return response; + } catch (RaftConnectionException e) { + LOGGER.error("Can not init series reader in Node<{}> of group<{}>", peerId, groupId, e); + } + } + throw new RaftConnectionException( + String.format("Can not init series reader in all nodes of group<%s>.", groupId)); } /** @@ -132,7 +89,7 @@ public class ClusterRpcReaderUtils { * @param taskRetryNum retry num of the request * @return Response from remote query node */ - private static BasicResponse handleQueryRequest(BasicRequest request, PeerId peerId, + public static BasicResponse handleQueryRequest(BasicRequest request, PeerId peerId, int taskRetryNum) throws RaftConnectionException { if (taskRetryNum > TASK_MAX_RETRY) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java index bab1536..197c7eb 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java @@ -42,8 +42,7 @@ public interface NodeAsClient { * @param peerId leader node of the target group * */ - QueryTask syncHandleRequest(BasicRequest request, PeerId peerId) - throws RaftConnectionException; + QueryTask syncHandleRequest(BasicRequest request, PeerId peerId); /** * Shut down client diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java index d5486fb..61fbdca 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java @@ -118,18 +118,29 @@ public class RaftUtils { return getRandomPeerID(groupId, server, router); } + /** + * Get random peer id + */ public static PeerId getRandomPeerID(String groupId, Server server, Router router) { - PeerId randomPeerId; + List<PeerId> peerIdList = getPeerIDList(groupId, server, router); + return peerIdList.get(getRandomInt(peerIdList.size())); + } + + /** + * Get peer id list by groupid + */ + public static List<PeerId> getPeerIDList(String groupId, Server server, Router router) { + List<PeerId> peerIdList = new ArrayList<>(); if (groupId.equals(ClusterConfig.METADATA_GROUP_ID)) { RaftService service = (RaftService) server.getMetadataHolder().getService(); - List<PeerId> peerIdList = service.getPeerIdList(); - randomPeerId = peerIdList.get(getRandomInt(peerIdList.size())); + peerIdList.addAll(service.getPeerIdList()); } else { PhysicalNode[] physicalNodes = router.getNodesByGroupId(groupId); - PhysicalNode node = physicalNodes[getRandomInt(physicalNodes.length)]; - randomPeerId = getPeerIDFrom(node); + for (PhysicalNode node : physicalNodes) { + peerIdList.add(getPeerIDFrom(node)); + } } - return randomPeerId; + return peerIdList; } /** @@ -196,7 +207,7 @@ public class RaftUtils { @OnlyForTest public static void clearRaftGroupLeader() { - groupLeaderCache.clear(); + groupLeaderCache.clear(); } /** @@ -339,7 +350,8 @@ public class RaftUtils { try { LOGGER.debug("Handle null-read in data group for reading."); final byte[] reqContext = RaftUtils.createRaftRequestContext(); - DataPartitionRaftHolder dataPartitionRaftHolder = (DataPartitionRaftHolder) server.getDataPartitionHolder(groupId); + DataPartitionRaftHolder dataPartitionRaftHolder = (DataPartitionRaftHolder) server + .getDataPartitionHolder(groupId); ((RaftService) dataPartitionRaftHolder.getService()).getNode() .readIndex(reqContext, new ReadIndexClosure() { @Override @@ -360,7 +372,7 @@ public class RaftUtils { } } - public static Status createErrorStatus(String errorMsg){ + public static Status createErrorStatus(String errorMsg) { Status status = new Status(); status.setErrorMsg(errorMsg); status.setCode(-1); @@ -386,8 +398,8 @@ public class RaftUtils { } /** - * Get all node information of the data group of input storage group. - * The first node is the current leader + * Get all node information of the data group of input storage group. The first node is the + * current leader * * @param sg storage group ID. If null, return metadata group info */ @@ -400,7 +412,8 @@ public class RaftUtils { PeerId[] nodes; if (sg == null) { groupId = ClusterConfig.METADATA_GROUP_ID; - List<PeerId> peerIdList = ((RaftService) server.getMetadataHolder().getService()).getPeerIdList(); + List<PeerId> peerIdList = ((RaftService) server.getMetadataHolder().getService()) + .getPeerIdList(); nodes = peerIdList.toArray(new PeerId[peerIdList.size()]); } else { PhysicalNode[] group = router.routeGroup(sg); @@ -434,7 +447,8 @@ public class RaftUtils { return getDataPartitionOfNode(ip, port, server, router); } - public static Map<String[], String[]> getDataPartitionOfNode(String ip, int port, Server server, Router router) { + public static Map<String[], String[]> getDataPartitionOfNode(String ip, int port, Server server, + Router router) { PhysicalNode[][] groups = router.getGroupsNodes(ip, port); if (groups == null) { return null; @@ -444,7 +458,8 @@ public class RaftUtils { for (int i = 0; i < groups.length; i++) { groupSGMap.put(generateStringKey(groups[i]), new ArrayList<>()); } - Set<String> allSGList = ((MetadataStateManchine)((RaftService)server.getMetadataHolder().getService()).getFsm()).getAllStorageGroups(); + Set<String> allSGList = ((MetadataStateManchine) ((RaftService) server.getMetadataHolder() + .getService()).getFsm()).getAllStorageGroups(); for (String sg : allSGList) { String key = generateStringKey(router.routeGroup(sg)); if (groupSGMap.containsKey(key)) { @@ -496,7 +511,8 @@ public class RaftUtils { RaftService raftService = (RaftService) server.getMetadataHolder().getService(); metricMap.put(raftService.getGroupId(), getReplicaMetricFromRaftService(raftService, metric)); - router.getAllGroupId().forEach(groupId -> metricMap.put(groupId, getReplicaMetric(groupId, metric))); + router.getAllGroupId() + .forEach(groupId -> metricMap.put(groupId, getReplicaMetric(groupId, metric))); return metricMap; } @@ -505,12 +521,14 @@ public class RaftUtils { RaftService service = (RaftService) server.getDataPartitionHolder(groupId).getService(); return getReplicaMetricFromRaftService(service, metric); } else { - LOGGER.debug("Current host does not contain group {}, all groups are {}.", groupId, server.getDataPartitionHolderMap().keySet()); + LOGGER.debug("Current host does not contain group {}, all groups are {}.", groupId, + server.getDataPartitionHolderMap().keySet()); return getReplicaMetricFromRemoteNode(groupId, metric); } } - private static Map<String, Long> getReplicaMetricFromRaftService(RaftService service, String metric) { + private static Map<String, Long> getReplicaMetricFromRaftService(RaftService service, + String metric) { String groupId = service.getGroupId(); LOGGER.debug("Get replica metric {} for group {}.", metric, service.getGroupId()); NodeImpl node = (NodeImpl) service.getNode(); diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationLargeDataIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationLargeDataIT.java index 973d3e7..494029c 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationLargeDataIT.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationLargeDataIT.java @@ -35,7 +35,6 @@ import java.sql.Statement; import org.apache.iotdb.cluster.config.ClusterConfig; import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.entity.Server; -import org.apache.iotdb.cluster.integration.Constant; import org.apache.iotdb.cluster.utils.EnvironmentUtils; import org.apache.iotdb.cluster.utils.QPExecutorUtils; import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
