This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9f79ce6e18 [IOTDB-3716] Finish updateRegionCache interface. (#6564)
9f79ce6e18 is described below
commit 9f79ce6e186b9a68b3ae5ed6326600da9aa3f557
Author: ZhangHongYin <[email protected]>
AuthorDate: Mon Jul 4 16:02:50 2022 +0800
[IOTDB-3716] Finish updateRegionCache interface. (#6564)
---
.../mpp/plan/analyze/ClusterPartitionFetcher.java | 26 ++++++++++++++++++++++
.../mpp/plan/analyze/FakePartitionFetcherImpl.java | 6 +++++
.../db/mpp/plan/analyze/IPartitionFetcher.java | 3 +++
.../plan/analyze/StandalonePartitionFetcher.java | 6 +++++
.../impl/DataNodeInternalRPCServiceImpl.java | 11 +++++++++
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
thrift/src/main/thrift/datanode.thrift | 12 ++++++++++
7 files changed, 65 insertions(+)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
index 7e9bddec6d..3aaddaa818 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.plan.analyze;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
@@ -51,6 +52,7 @@ import
org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.metadata.utils.MetaUtils;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.service.metrics.recorder.CacheMetricsRecorder;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -71,6 +73,8 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
@@ -259,6 +263,11 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
}
}
+ /**
+  * Forwards a region route update to the partition cache.
+  *
+  * @param req the region route request to apply
+  * @return whatever {@code partitionCache.updateGroupIdToReplicaSetMap(req)} reports
+  */
+ @Override
+ public boolean updateRegionCache(TRegionRouteReq req) {
+   final boolean applied = partitionCache.updateGroupIdToReplicaSetMap(req);
+   return applied;
+ }
+
@Override
public void invalidAllCache() {
logger.debug("Invalidate partition cache");
@@ -463,6 +472,12 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
private final int seriesPartitionSlotNum;
+ /** the timestamp carried by the most recent update applied to groupIdToReplicaSetMap. */
+ private final AtomicLong latestUpdateTime = new AtomicLong(0);
+ /** TConsensusGroupId -> TRegionReplicaSet */
+ private final Map<TConsensusGroupId, TRegionReplicaSet>
groupIdToReplicaSetMap =
+ new ConcurrentHashMap<>();
+
public PartitionCache(String seriesSlotExecutorName, int
seriesPartitionSlotNum) {
this.seriesSlotExecutorName = seriesSlotExecutorName;
this.seriesPartitionSlotNum = seriesPartitionSlotNum;
@@ -737,6 +752,17 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
}
}
+ public boolean updateGroupIdToReplicaSetMap(TRegionRouteReq req) {
+ long timestamp = req.getTimestamp();
+ boolean result = (timestamp ==
latestUpdateTime.accumulateAndGet(timestamp, Math::max));
+ // if timestamp is greater than latestUpdateTime, then update
+ if (result) {
+ groupIdToReplicaSetMap.clear();
+ groupIdToReplicaSetMap.putAll(req.getRegionRouteMap());
+ }
+ return result;
+ }
+
@Override
public String toString() {
return "PartitionCache{"
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakePartitionFetcherImpl.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakePartitionFetcherImpl.java
index 5e15ce763d..e042e57fad 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakePartitionFetcherImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakePartitionFetcherImpl.java
@@ -32,6 +32,7 @@ import
org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import java.util.ArrayList;
import java.util.Arrays;
@@ -214,6 +215,11 @@ public class FakePartitionFetcherImpl implements
IPartitionFetcher {
return null;
}
+ /**
+  * Ignores the request and always reports success.
+  *
+  * @param req the region route request (unused)
+  * @return always {@code true}
+  */
+ @Override
+ public boolean updateRegionCache(TRegionRouteReq req) {
+ return true;
+ }
+
@Override
public void invalidAllCache() {}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/IPartitionFetcher.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/IPartitionFetcher.java
index 93f0b6a202..c187794e85 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/IPartitionFetcher.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/IPartitionFetcher.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import java.util.List;
import java.util.Map;
@@ -50,5 +51,7 @@ public interface IPartitionFetcher {
DataPartition getOrCreateDataPartition(List<DataPartitionQueryParam>
dataPartitionQueryParams);
+ /**
+  * Applies a region route update to this fetcher.
+  *
+  * @param req the timestamp and the region route map to apply
+  * @return {@code true} if the update was accepted, {@code false} if it was rejected
+  */
+ boolean updateRegionCache(TRegionRouteReq req);
+
void invalidAllCache();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandalonePartitionFetcher.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandalonePartitionFetcher.java
index d389e124f3..33f35d006c 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandalonePartitionFetcher.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandalonePartitionFetcher.java
@@ -34,6 +34,7 @@ import
org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.localconfignode.LocalConfigNode;
import org.apache.iotdb.db.metadata.utils.MetaUtils;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -129,6 +130,11 @@ public class StandalonePartitionFetcher implements
IPartitionFetcher {
}
}
+ /**
+  * Ignores the request and always reports success.
+  *
+  * @param req the region route request (unused)
+  * @return always {@code true}
+  */
+ @Override
+ public boolean updateRegionCache(TRegionRouteReq req) {
+ return true;
+ }
+
@Override
public void invalidAllCache() {}
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index b25d320b5b..4ef5e03fb2 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -81,6 +81,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TMigrateRegionResp;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchRequest;
import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchResponse;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
@@ -345,6 +346,16 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
return resp;
}
+ /**
+  * RPC entry point that hands a region route update to the cluster partition fetcher and
+  * translates the outcome into a status.
+  *
+  * @param req the region route request to apply
+  * @return a {@code SUCCESS_STATUS} status when the fetcher accepted the update, otherwise a
+  *     {@code CACHE_UPDATE_FAIL} status
+  */
+ @Override
+ public TSStatus updateRegionCache(TRegionRouteReq req) throws TException {
+   final TSStatusCode outcome =
+       ClusterPartitionFetcher.getInstance().updateRegionCache(req)
+           ? TSStatusCode.SUCCESS_STATUS
+           : TSStatusCode.CACHE_UPDATE_FAIL;
+   return RpcUtils.getStatus(outcome);
+ }
+
private Map<TConsensusGroupId, Boolean> getJudgedLeaders() {
Map<TConsensusGroupId, Boolean> result = new HashMap<>();
if (DataRegionConsensusImpl.getInstance() != null) {
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 0fcc1a8ada..146068ccd7 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -122,6 +122,7 @@ public enum TSStatusCode {
MIGRATE_REGION_ERROR(710),
CREATE_REGION_ERROR(711),
DELETE_REGION_ERROR(712),
+ CACHE_UPDATE_FAIL(713),
// configuration
CONFIG_ERROR(800),
diff --git a/thrift/src/main/thrift/datanode.thrift
b/thrift/src/main/thrift/datanode.thrift
index 8b3cf0c419..3b515f646b 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -172,6 +172,11 @@ struct THeartbeatResp {
4: optional i16 memory
}
+// Request asking a DataNode to update its region cache.
+struct TRegionRouteReq {
+ // Timestamp of this update; the DataNode rejects a request older than the latest one applied.
+ 1: required i64 timestamp
+ // Region routes to cache: consensus group id -> region replica set.
+ 2: required map<common.TConsensusGroupId, common.TRegionReplicaSet>
regionRouteMap
+}
+
service IDataNodeRPCService {
// -----------------------------------For Data
Node-----------------------------------------------
@@ -249,6 +254,13 @@ service IDataNodeRPCService {
**/
THeartbeatResp getDataNodeHeartBeat(THeartbeatReq req)
+ /**
+ * ConfigNode will ask DataNode to update region cache
+ *
+ * @param req the timestamp and the new regionRouteMap sent by the ConfigNode
+ **/
+ common.TSStatus updateRegionCache(TRegionRouteReq req)
+
/**
* Config node will create a function on a list of data nodes.
*