This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f79ce6e18 [IOTDB-3716] Finish updateRegionCache interface. (#6564)
9f79ce6e18 is described below

commit 9f79ce6e186b9a68b3ae5ed6326600da9aa3f557
Author: ZhangHongYin <[email protected]>
AuthorDate: Mon Jul 4 16:02:50 2022 +0800

    [IOTDB-3716] Finish updateRegionCache interface. (#6564)
---
 .../mpp/plan/analyze/ClusterPartitionFetcher.java  | 26 ++++++++++++++++++++++
 .../mpp/plan/analyze/FakePartitionFetcherImpl.java |  6 +++++
 .../db/mpp/plan/analyze/IPartitionFetcher.java     |  3 +++
 .../plan/analyze/StandalonePartitionFetcher.java   |  6 +++++
 .../impl/DataNodeInternalRPCServiceImpl.java       | 11 +++++++++
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |  1 +
 thrift/src/main/thrift/datanode.thrift             | 12 ++++++++++
 7 files changed, 65 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
index 7e9bddec6d..3aaddaa818 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.plan.analyze;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
@@ -51,6 +52,7 @@ import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.utils.MetaUtils;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.service.metrics.recorder.CacheMetricsRecorder;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
@@ -71,6 +73,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
@@ -259,6 +263,11 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
     }
   }
 
+  @Override
+  public boolean updateRegionCache(TRegionRouteReq req) {
+    return partitionCache.updateGroupIdToReplicaSetMap(req);
+  }
+
   @Override
   public void invalidAllCache() {
     logger.debug("Invalidate partition cache");
@@ -463,6 +472,12 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
 
     private final int seriesPartitionSlotNum;
 
+    /** the latest time when groupIdToReplicaSetMap updated. */
+    private final AtomicLong latestUpdateTime = new AtomicLong(0);
+    /** TConsensusGroupId -> TRegionReplicaSet */
+    private final Map<TConsensusGroupId, TRegionReplicaSet> groupIdToReplicaSetMap =
+        new ConcurrentHashMap<>();
+
     public PartitionCache(String seriesSlotExecutorName, int seriesPartitionSlotNum) {
       this.seriesSlotExecutorName = seriesSlotExecutorName;
       this.seriesPartitionSlotNum = seriesPartitionSlotNum;
@@ -737,6 +752,17 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
       }
     }
 
+    public boolean updateGroupIdToReplicaSetMap(TRegionRouteReq req) {
+      long timestamp = req.getTimestamp();
+      boolean result = (timestamp == latestUpdateTime.accumulateAndGet(timestamp, Math::max));
+      // if timestamp is greater than latestUpdateTime, then update
+      if (result) {
+        groupIdToReplicaSetMap.clear();
+        groupIdToReplicaSetMap.putAll(req.getRegionRouteMap());
+      }
+      return result;
+    }
+
     @Override
     public String toString() {
       return "PartitionCache{"
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakePartitionFetcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakePartitionFetcherImpl.java
index 5e15ce763d..e042e57fad 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakePartitionFetcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakePartitionFetcherImpl.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
 import org.apache.iotdb.commons.partition.SchemaPartition;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -214,6 +215,11 @@ public class FakePartitionFetcherImpl implements IPartitionFetcher {
     return null;
   }
 
+  @Override
+  public boolean updateRegionCache(TRegionRouteReq req) {
+    return true;
+  }
+
   @Override
   public void invalidAllCache() {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/IPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/IPartitionFetcher.java
index 93f0b6a202..c187794e85 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/IPartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/IPartitionFetcher.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
 import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
 import org.apache.iotdb.commons.partition.SchemaPartition;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
 
 import java.util.List;
 import java.util.Map;
@@ -50,5 +51,7 @@ public interface IPartitionFetcher {
 
   DataPartition getOrCreateDataPartition(List<DataPartitionQueryParam> dataPartitionQueryParams);
 
+  boolean updateRegionCache(TRegionRouteReq req);
+
   void invalidAllCache();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandalonePartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandalonePartitionFetcher.java
index d389e124f3..33f35d006c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandalonePartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandalonePartitionFetcher.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.localconfignode.LocalConfigNode;
 import org.apache.iotdb.db.metadata.utils.MetaUtils;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -129,6 +130,11 @@ public class StandalonePartitionFetcher implements IPartitionFetcher {
     }
   }
 
+  @Override
+  public boolean updateRegionCache(TRegionRouteReq req) {
+    return true;
+  }
+
   @Override
   public void invalidAllCache() {}
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index b25d320b5b..4ef5e03fb2 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -81,6 +81,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
 import org.apache.iotdb.mpp.rpc.thrift.TMigrateRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TMigrateRegionResp;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchRequest;
 import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchResponse;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
@@ -345,6 +346,16 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
     return resp;
   }
 
+  @Override
+  public TSStatus updateRegionCache(TRegionRouteReq req) throws TException {
+    boolean result = ClusterPartitionFetcher.getInstance().updateRegionCache(req);
+    if (result) {
+      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+    } else {
+      return RpcUtils.getStatus(TSStatusCode.CACHE_UPDATE_FAIL);
+    }
+  }
+
   private Map<TConsensusGroupId, Boolean> getJudgedLeaders() {
     Map<TConsensusGroupId, Boolean> result = new HashMap<>();
     if (DataRegionConsensusImpl.getInstance() != null) {
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 0fcc1a8ada..146068ccd7 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -122,6 +122,7 @@ public enum TSStatusCode {
   MIGRATE_REGION_ERROR(710),
   CREATE_REGION_ERROR(711),
   DELETE_REGION_ERROR(712),
+  CACHE_UPDATE_FAIL(713),
 
   // configuration
   CONFIG_ERROR(800),
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index 8b3cf0c419..3b515f646b 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -172,6 +172,11 @@ struct THeartbeatResp {
   4: optional i16 memory
 }
 
+struct TRegionRouteReq {
+  1: required i64 timestamp
+  2: required map<common.TConsensusGroupId, common.TRegionReplicaSet> regionRouteMap
+}
+
 service IDataNodeRPCService {
 
   // -----------------------------------For Data Node-----------------------------------------------
@@ -249,6 +254,13 @@ service IDataNodeRPCService {
   **/
   THeartbeatResp getDataNodeHeartBeat(THeartbeatReq req)
 
+  /**
+  * ConfigNode will ask DataNode to update region cache
+  *
+  * @param ConfigNode will send timestamp and new regionRouteMap in TRegionRouteReq
+  **/
+  common.TSStatus updateRegionCache(TRegionRouteReq req)
+
   /**
    * Config node will create a function on a list of data nodes.
    *

Reply via email to