This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch cluster in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 6a733d9270b06fa1d22f0ef8488562d09a739d4b Author: lta <[email protected]> AuthorDate: Mon May 20 22:33:10 2019 +0800 fix a serve bug of set readmetadata level --- .../qp/executor/ClusterQueryProcessExecutor.java | 11 ++++++- .../cluster/qp/executor/QueryMetadataExecutor.java | 1 - .../ClusterRpcSingleQueryManager.java | 5 ++++ .../cluster/query/utils/ClusterRpcReaderUtils.java | 3 +- .../querymetadata/QueryPathsAsyncProcessor.java | 1 + .../cluster/service/TSServiceClusterImpl.java | 5 ++-- .../org/apache/iotdb/cluster/utils/RaftUtils.java | 35 +++++----------------- .../iotdb/cluster/integration/IoTDBQueryIT.java | 1 + .../cluster/query/utils/ExpressionUtilsTest.java | 17 ++++------- .../query/utils/QueryPlanPartitionUtilsTest.java | 4 ++- 10 files changed, 39 insertions(+), 44 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/ClusterQueryProcessExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/ClusterQueryProcessExecutor.java index 3dc8c43..30659e5 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/ClusterQueryProcessExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/ClusterQueryProcessExecutor.java @@ -41,13 +41,21 @@ import org.apache.iotdb.tsfile.read.expression.IExpression; import org.apache.iotdb.tsfile.read.expression.QueryExpression; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.apache.iotdb.tsfile.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ClusterQueryProcessExecutor extends AbstractQPExecutor implements IQueryProcessExecutor { + private static final Logger LOGGER = LoggerFactory.getLogger(ClusterQueryProcessExecutor.class); private ThreadLocal<Integer> fetchSize = new ThreadLocal<>(); private ClusterQueryRouter clusterQueryRouter = new ClusterQueryRouter(); - private QueryMetadataExecutor queryMetadataExecutor = new QueryMetadataExecutor(); + private QueryMetadataExecutor queryMetadataExecutor; + + public ClusterQueryProcessExecutor( + QueryMetadataExecutor queryMetadataExecutor) { + this.queryMetadataExecutor = queryMetadataExecutor; + } @Override public QueryDataSet processQuery(QueryPlan queryPlan, QueryContext context) @@ -117,6 +125,7 @@ public class ClusterQueryProcessExecutor extends AbstractQPExecutor implements I public List<String> getAllPaths(String originPath) throws PathErrorException { try { + LOGGER.debug("read metadata level :" + getReadMetadataConsistencyLevel()); return queryMetadataExecutor.processPathsQuery(originPath); } catch (InterruptedException | ProcessorException e) { throw new PathErrorException(e.getMessage()); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java index 9a3f61d..229009e 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java @@ -414,7 +414,6 @@ public class QueryMetadataExecutor extends AbstractQPExecutor { success = true; } catch (RaftConnectionException e1) { LOGGER.debug("Get paths for {} task for group {} to node {} fail.", pathList, groupId, nextNode); - continue; } } LOGGER.debug("The final result for get paths for {} task is {}", pathList, success); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java index 05bf9df..81ca292 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java @@ -53,12 +53,15 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Manage all remote series reader resource in a query resource in coordinator node. */ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManager { + private static final Logger LOGGER = LoggerFactory.getLogger(ClusterRpcSingleQueryManager.class); /** * Statistic all usage of local data group. */ @@ -133,6 +136,7 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag SelectSeriesGroupEntity selectEntity = entry.getValue(); QueryPlan queryPlan = selectEntity.getQueryPlan(); if (!QPExecutorUtils.canHandleQueryByGroupId(groupId)) { + LOGGER.debug("Init series reader for group id {} from remote node." , groupId); Map<PathType, QueryPlan> allQueryPlan = new EnumMap<>(PathType.class); allQueryPlan.put(PathType.SELECT_PATH, queryPlan); List<Filter> filterList = new ArrayList<>(); @@ -149,6 +153,7 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag .createClusterSeriesReader(groupId, request, this); handleInitReaderResponse(groupId, allQueryPlan, response); } else { + LOGGER.debug("Init series reader for group id {} locally." , groupId); dataGroupUsage.add(groupId); selectSeriesGroupEntityMap.remove(groupId); filterSeriesGroupEntityMap.remove(groupId); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java index 0247bbe..d38ca83 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java @@ -72,9 +72,10 @@ public class ClusterRpcReaderUtils { try { response = handleQueryRequest(request, peerId, 0); manager.setQueryNode(groupId, peerId); + LOGGER.debug("Init series reader in Node<{}> of group<{}> success.", peerId, groupId); return response; } catch (RaftConnectionException e) { - LOGGER.error("Can not init series reader in Node<{}> of group<{}>", peerId, groupId, e); + LOGGER.debug("Can not init series reader in Node<{}> of group<{}>", peerId, groupId, e); } } throw new RaftConnectionException( diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryPathsAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryPathsAsyncProcessor.java index 8e1e47b..8e96032 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryPathsAsyncProcessor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryPathsAsyncProcessor.java @@ -90,6 +90,7 @@ public class QueryPathsAsyncProcessor extends BasicAsyncUserProcessor<QueryPaths for (String path : request.getPath()) { response.addPaths(mManager.getPaths(path)); } + System.out.println("Paths: " + response.getPaths()); } @Override diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java index ff716a3..950d02c 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java @@ -66,9 +66,10 @@ public class TSServiceClusterImpl extends TSServiceImpl { private static final Logger LOGGER = LoggerFactory.getLogger(TSServiceClusterImpl.class); - private ClusterQueryProcessExecutor queryDataExecutor = new ClusterQueryProcessExecutor(); - private NonQueryExecutor nonQueryExecutor = new NonQueryExecutor(); private QueryMetadataExecutor queryMetadataExecutor = new QueryMetadataExecutor(); + private ClusterQueryProcessExecutor queryDataExecutor = new ClusterQueryProcessExecutor( + queryMetadataExecutor); + private NonQueryExecutor nonQueryExecutor = new NonQueryExecutor(); private IClusterRpcQueryManager queryManager = ClusterRpcQueryManager.getInstance(); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java index 9dd3867..c2303d3 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java @@ -98,38 +98,19 @@ public class RaftUtils { */ private static final ConcurrentHashMap<String, PeerId> groupLeaderCache = new ConcurrentHashMap<>(); - private static ThreadLocal<Map<String, Integer>> nodeIndexMap = new ThreadLocal<Map<String, Integer>>() { - @Override - protected Map<String, Integer> initialValue() { - Map<String, Integer> map = new HashMap<>(); - router.getAllGroupId().forEach(groupId -> { - PhysicalNode[] physicalNodes = router.getNodesByGroupId(groupId); - map.put(groupId, getRandomInt(physicalNodes.length)); - }); - return map; - } - }; + private static ThreadLocal<Map<String, Integer>> nodeIndexMap = ThreadLocal.withInitial(() -> { + Map<String, Integer> map = new HashMap<>(); + router.getAllGroupId().forEach(groupId -> { + PhysicalNode[] physicalNodes = router.getNodesByGroupId(groupId); + map.put(groupId, getRandomInt(physicalNodes.length)); + }); + return map; + }); private RaftUtils() { } /** - * Get peer id by input ip - * - * @return null if not found - */ - public static PeerId getPeerIDByIP(String ip) { - RaftService service = (RaftService) server.getMetadataHolder().getService(); - List<PeerId> peerIdList = service.getPeerIdList(); - for (int i = 0; i < peerIdList.size(); i++) { - if (peerIdList.get(i).getIp().equals(ip)) { - return peerIdList.get(i); - } - } - return null; - } - - /** * Get peer ID in order * * @return node id diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBQueryIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBQueryIT.java index 5be3cbc..de2bf64 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBQueryIT.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBQueryIT.java @@ -291,6 +291,7 @@ public class IoTDBQueryIT { .getConnection(Config.IOTDB_URL_PREFIX + URL, "root", "root")) { insertData(connection, createSQLs, insertSQLs); Statement statement = connection.createStatement(); + statement.execute("set read metadata level to 2"); for(int i =0 ; i < queryStatementsWithoutFilter.length; i++) { String queryStatement = queryStatementsWithoutFilter[i]; diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/utils/ExpressionUtilsTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/utils/ExpressionUtilsTest.java index 84f8f5f..fb67377 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/query/utils/ExpressionUtilsTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/utils/ExpressionUtilsTest.java @@ -19,26 +19,20 @@ package org.apache.iotdb.cluster.query.utils; import static org.apache.iotdb.cluster.utils.Utils.insertData; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; import java.sql.Connection; import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; import org.apache.iotdb.cluster.config.ClusterConfig; import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.entity.Server; import org.apache.iotdb.cluster.qp.executor.ClusterQueryProcessExecutor; +import org.apache.iotdb.cluster.qp.executor.QueryMetadataExecutor; import org.apache.iotdb.cluster.query.expression.TrueExpression; -import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcQueryManager; -import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager; import org.apache.iotdb.cluster.utils.EnvironmentUtils; import org.apache.iotdb.db.qp.QueryProcessor; import org.apache.iotdb.db.qp.physical.crud.QueryPlan; @@ -54,7 +48,8 @@ import org.junit.Test; public class ExpressionUtilsTest { private Server server; private static final ClusterConfig CLUSTER_CONFIG = ClusterDescriptor.getInstance().getConfig(); - private ClusterQueryProcessExecutor queryDataExecutor = new ClusterQueryProcessExecutor(); + private ClusterQueryProcessExecutor queryDataExecutor = new ClusterQueryProcessExecutor( + new QueryMetadataExecutor()); private QueryProcessor queryProcessor = new QueryProcessor(queryDataExecutor); private static final String URL = "127.0.0.1:6667/"; diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtilsTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtilsTest.java index 363ef98..9b44ade 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtilsTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtilsTest.java @@ -37,6 +37,7 @@ import org.apache.iotdb.cluster.config.ClusterConfig; import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.entity.Server; import org.apache.iotdb.cluster.qp.executor.ClusterQueryProcessExecutor; +import org.apache.iotdb.cluster.qp.executor.QueryMetadataExecutor; import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcQueryManager; import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager; import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity; @@ -61,7 +62,8 @@ public class QueryPlanPartitionUtilsTest { private static final String LOCAL_ADDR = String .format("%s:%d", CLUSTER_CONFIG.getIp(), CLUSTER_CONFIG.getPort()); private static ClusterRpcQueryManager manager = ClusterRpcQueryManager.getInstance(); - private ClusterQueryProcessExecutor queryDataExecutor = new ClusterQueryProcessExecutor(); + private ClusterQueryProcessExecutor queryDataExecutor = new ClusterQueryProcessExecutor( + new QueryMetadataExecutor()); private QueryProcessor queryProcessor = new QueryProcessor(queryDataExecutor); private static final PhysicalNode localNode = new PhysicalNode(CLUSTER_CONFIG.getIp(), CLUSTER_CONFIG.getPort());
