This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 6a733d9270b06fa1d22f0ef8488562d09a739d4b
Author: lta <[email protected]>
AuthorDate: Mon May 20 22:33:10 2019 +0800

    fix a serve bug of set readmetadata level
---
 .../qp/executor/ClusterQueryProcessExecutor.java   | 11 ++++++-
 .../cluster/qp/executor/QueryMetadataExecutor.java |  1 -
 .../ClusterRpcSingleQueryManager.java              |  5 ++++
 .../cluster/query/utils/ClusterRpcReaderUtils.java |  3 +-
 .../querymetadata/QueryPathsAsyncProcessor.java    |  1 +
 .../cluster/service/TSServiceClusterImpl.java      |  5 ++--
 .../org/apache/iotdb/cluster/utils/RaftUtils.java  | 35 +++++-----------------
 .../iotdb/cluster/integration/IoTDBQueryIT.java    |  1 +
 .../cluster/query/utils/ExpressionUtilsTest.java   | 17 ++++-------
 .../query/utils/QueryPlanPartitionUtilsTest.java   |  4 ++-
 10 files changed, 39 insertions(+), 44 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/ClusterQueryProcessExecutor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/ClusterQueryProcessExecutor.java
index 3dc8c43..30659e5 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/ClusterQueryProcessExecutor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/ClusterQueryProcessExecutor.java
@@ -41,13 +41,21 @@ import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.QueryExpression;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ClusterQueryProcessExecutor extends AbstractQPExecutor implements 
IQueryProcessExecutor {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ClusterQueryProcessExecutor.class);
   private ThreadLocal<Integer> fetchSize = new ThreadLocal<>();
   private ClusterQueryRouter clusterQueryRouter = new ClusterQueryRouter();
 
-  private QueryMetadataExecutor queryMetadataExecutor = new 
QueryMetadataExecutor();
+  private QueryMetadataExecutor queryMetadataExecutor;
+
+  public ClusterQueryProcessExecutor(
+      QueryMetadataExecutor queryMetadataExecutor) {
+    this.queryMetadataExecutor = queryMetadataExecutor;
+  }
 
   @Override
   public QueryDataSet processQuery(QueryPlan queryPlan, QueryContext context)
@@ -117,6 +125,7 @@ public class ClusterQueryProcessExecutor extends 
AbstractQPExecutor implements I
   public List<String> getAllPaths(String originPath)
       throws PathErrorException {
     try {
+      LOGGER.debug("read metadata level :" + 
getReadMetadataConsistencyLevel());
       return queryMetadataExecutor.processPathsQuery(originPath);
     } catch (InterruptedException | ProcessorException e) {
       throw new PathErrorException(e.getMessage());
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 9a3f61d..229009e 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -414,7 +414,6 @@ public class QueryMetadataExecutor extends 
AbstractQPExecutor {
           success = true;
         } catch (RaftConnectionException e1) {
           LOGGER.debug("Get paths for {} task for group {} to node {} fail.", 
pathList, groupId, nextNode);
-          continue;
         }
       }
       LOGGER.debug("The final result for get paths for {} task is {}", 
pathList, success);
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
index 05bf9df..81ca292 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
@@ -53,12 +53,15 @@ import 
org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Manage all remote series reader resource in a query resource in coordinator 
node.
  */
 public class ClusterRpcSingleQueryManager implements 
IClusterRpcSingleQueryManager {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ClusterRpcSingleQueryManager.class);
   /**
    * Statistic all usage of local data group.
    */
@@ -133,6 +136,7 @@ public class ClusterRpcSingleQueryManager implements 
IClusterRpcSingleQueryManag
       SelectSeriesGroupEntity selectEntity = entry.getValue();
       QueryPlan queryPlan = selectEntity.getQueryPlan();
       if (!QPExecutorUtils.canHandleQueryByGroupId(groupId)) {
+        LOGGER.debug("Init series reader for group id {} from remote node." , 
groupId);
         Map<PathType, QueryPlan> allQueryPlan = new EnumMap<>(PathType.class);
         allQueryPlan.put(PathType.SELECT_PATH, queryPlan);
         List<Filter> filterList = new ArrayList<>();
@@ -149,6 +153,7 @@ public class ClusterRpcSingleQueryManager implements 
IClusterRpcSingleQueryManag
             .createClusterSeriesReader(groupId, request, this);
         handleInitReaderResponse(groupId, allQueryPlan, response);
       } else {
+        LOGGER.debug("Init series reader for group id {} locally." , groupId);
         dataGroupUsage.add(groupId);
         selectSeriesGroupEntityMap.remove(groupId);
         filterSeriesGroupEntityMap.remove(groupId);
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
index 0247bbe..d38ca83 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
@@ -72,9 +72,10 @@ public class ClusterRpcReaderUtils {
       try {
         response = handleQueryRequest(request, peerId, 0);
         manager.setQueryNode(groupId, peerId);
+        LOGGER.debug("Init series reader in Node<{}> of group<{}> success.", 
peerId, groupId);
         return response;
       } catch (RaftConnectionException e) {
-        LOGGER.error("Can not init series reader in Node<{}> of group<{}>", 
peerId, groupId, e);
+        LOGGER.debug("Can not init series reader in Node<{}> of group<{}>", 
peerId, groupId, e);
       }
     }
     throw new RaftConnectionException(
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryPathsAsyncProcessor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryPathsAsyncProcessor.java
index 8e1e47b..8e96032 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryPathsAsyncProcessor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryPathsAsyncProcessor.java
@@ -90,6 +90,7 @@ public class QueryPathsAsyncProcessor extends 
BasicAsyncUserProcessor<QueryPaths
     for (String path : request.getPath()) {
       response.addPaths(mManager.getPaths(path));
     }
+    System.out.println("Paths: " + response.getPaths());
   }
 
   @Override
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
index ff716a3..950d02c 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
@@ -66,9 +66,10 @@ public class TSServiceClusterImpl extends TSServiceImpl {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TSServiceClusterImpl.class);
 
-  private ClusterQueryProcessExecutor queryDataExecutor = new 
ClusterQueryProcessExecutor();
-  private NonQueryExecutor nonQueryExecutor = new NonQueryExecutor();
   private QueryMetadataExecutor queryMetadataExecutor = new 
QueryMetadataExecutor();
+  private ClusterQueryProcessExecutor queryDataExecutor = new 
ClusterQueryProcessExecutor(
+      queryMetadataExecutor);
+  private NonQueryExecutor nonQueryExecutor = new NonQueryExecutor();
 
   private IClusterRpcQueryManager queryManager = 
ClusterRpcQueryManager.getInstance();
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index 9dd3867..c2303d3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -98,38 +98,19 @@ public class RaftUtils {
    */
   private static final ConcurrentHashMap<String, PeerId> groupLeaderCache = 
new ConcurrentHashMap<>();
 
-  private static ThreadLocal<Map<String, Integer>> nodeIndexMap = new 
ThreadLocal<Map<String, Integer>>() {
-    @Override
-    protected Map<String, Integer> initialValue() {
-      Map<String, Integer> map = new HashMap<>();
-      router.getAllGroupId().forEach(groupId -> {
-        PhysicalNode[] physicalNodes = router.getNodesByGroupId(groupId);
-        map.put(groupId, getRandomInt(physicalNodes.length));
-      });
-      return map;
-    }
-  };
+  private static ThreadLocal<Map<String, Integer>> nodeIndexMap = 
ThreadLocal.withInitial(() -> {
+    Map<String, Integer> map = new HashMap<>();
+    router.getAllGroupId().forEach(groupId -> {
+      PhysicalNode[] physicalNodes = router.getNodesByGroupId(groupId);
+      map.put(groupId, getRandomInt(physicalNodes.length));
+    });
+    return map;
+  });
 
   private RaftUtils() {
   }
 
   /**
-   * Get peer id by input ip
-   *
-   * @return null if not found
-   */
-  public static PeerId getPeerIDByIP(String ip) {
-    RaftService service = (RaftService) 
server.getMetadataHolder().getService();
-    List<PeerId> peerIdList = service.getPeerIdList();
-    for (int i = 0; i < peerIdList.size(); i++) {
-      if (peerIdList.get(i).getIp().equals(ip)) {
-        return peerIdList.get(i);
-      }
-    }
-    return null;
-  }
-
-  /**
    * Get peer ID in order
    *
    * @return node id
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBQueryIT.java 
b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBQueryIT.java
index 5be3cbc..de2bf64 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBQueryIT.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBQueryIT.java
@@ -291,6 +291,7 @@ public class IoTDBQueryIT {
         .getConnection(Config.IOTDB_URL_PREFIX + URL, "root", "root")) {
       insertData(connection, createSQLs, insertSQLs);
       Statement statement = connection.createStatement();
+      statement.execute("set read metadata level to 2");
 
       for(int i =0 ; i < queryStatementsWithoutFilter.length; i++) {
         String queryStatement = queryStatementsWithoutFilter[i];
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/query/utils/ExpressionUtilsTest.java
 
b/cluster/src/test/java/org/apache/iotdb/cluster/query/utils/ExpressionUtilsTest.java
index 84f8f5f..fb67377 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/query/utils/ExpressionUtilsTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/query/utils/ExpressionUtilsTest.java
@@ -19,26 +19,20 @@
 package org.apache.iotdb.cluster.query.utils;
 
 import static org.apache.iotdb.cluster.utils.Utils.insertData;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
 import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.entity.Server;
 import org.apache.iotdb.cluster.qp.executor.ClusterQueryProcessExecutor;
+import org.apache.iotdb.cluster.qp.executor.QueryMetadataExecutor;
 import org.apache.iotdb.cluster.query.expression.TrueExpression;
-import 
org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcQueryManager;
-import 
org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
 import org.apache.iotdb.cluster.utils.EnvironmentUtils;
 import org.apache.iotdb.db.qp.QueryProcessor;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
@@ -54,7 +48,8 @@ import org.junit.Test;
 public class ExpressionUtilsTest {
   private Server server;
   private static final ClusterConfig CLUSTER_CONFIG = 
ClusterDescriptor.getInstance().getConfig();
-  private ClusterQueryProcessExecutor queryDataExecutor = new 
ClusterQueryProcessExecutor();
+  private ClusterQueryProcessExecutor queryDataExecutor = new 
ClusterQueryProcessExecutor(
+      new QueryMetadataExecutor());
   private QueryProcessor queryProcessor = new 
QueryProcessor(queryDataExecutor);
 
   private static final String URL = "127.0.0.1:6667/";
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtilsTest.java
 
b/cluster/src/test/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtilsTest.java
index 363ef98..9b44ade 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtilsTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtilsTest.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.entity.Server;
 import org.apache.iotdb.cluster.qp.executor.ClusterQueryProcessExecutor;
+import org.apache.iotdb.cluster.qp.executor.QueryMetadataExecutor;
 import 
org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcQueryManager;
 import 
org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
 import 
org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity;
@@ -61,7 +62,8 @@ public class QueryPlanPartitionUtilsTest {
   private static final String LOCAL_ADDR = String
       .format("%s:%d", CLUSTER_CONFIG.getIp(), CLUSTER_CONFIG.getPort());
   private static ClusterRpcQueryManager manager = 
ClusterRpcQueryManager.getInstance();
-  private ClusterQueryProcessExecutor queryDataExecutor = new 
ClusterQueryProcessExecutor();
+  private ClusterQueryProcessExecutor queryDataExecutor = new 
ClusterQueryProcessExecutor(
+      new QueryMetadataExecutor());
   private QueryProcessor queryProcessor = new 
QueryProcessor(queryDataExecutor);
   private static final PhysicalNode localNode = new 
PhysicalNode(CLUSTER_CONFIG.getIp(),
       CLUSTER_CONFIG.getPort());

Reply via email to