This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ca_performance
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ca_performance by this push:
new 89bbcb6e041 stop always fetching latest time partition from CN
new 720e670d3f7 Merge branch 'ca_performance' of
https://github.com/apache/iotdb into ca_performance
89bbcb6e041 is described below
commit 89bbcb6e0416ada1cf4e609a1f3bc6e4502241ca
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Aug 8 16:52:32 2023 +0800
stop always fetching latest time partition from CN
---
.../protocol/thrift/impl/ClientRPCServiceImpl.java | 7 +---
.../queryengine/plan/analyze/AnalyzeVisitor.java | 18 +++++-----
.../plan/analyze/ClusterPartitionFetcher.java | 6 ++++
.../plan/analyze/IPartitionFetcher.java | 3 ++
.../analyze/cache/partition/PartitionCache.java | 42 ++++++++++++++++++++++
.../planner/plan/node/write/InsertTabletNode.java | 20 +++++------
.../plan/statement/crud/InsertTabletStatement.java | 10 +++---
.../apache/iotdb/db/utils/TimePartitionUtils.java | 10 +++---
.../plan/analyze/FakePartitionFetcherImpl.java | 6 ++++
.../queryengine/plan/plan/distribution/Util.java | 8 +++++
.../plan/node/write/WritePlanNodeSplitTest.java | 8 ++---
11 files changed, 100 insertions(+), 38 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 06d2505dcac..0d7bc77a789 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -778,13 +778,8 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
deviceId = devicePath.getFullPath();
}
- DataPartitionQueryParam queryParam =
- new DataPartitionQueryParam(deviceId, Collections.emptyList(), true,
true);
- DataPartition dataPartition =
- partitionFetcher.getDataPartitionWithUnclosedTimeRange(
- Collections.singletonMap(db,
Collections.singletonList(queryParam)));
List<TRegionReplicaSet> regionReplicaSets =
- dataPartition.getDataRegionReplicaSet(deviceId,
Collections.emptyList());
+ partitionFetcher.getAllDataPartitionsForOneDevice(db, deviceId);
// no valid DataRegion
if (regionReplicaSets.isEmpty()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 2e345832fee..7ff7886737b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -1804,15 +1804,15 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
if (timeRangeList.get(0).getMin() == Long.MIN_VALUE) {
needLeftAll = true;
startTime =
- (timeRangeList.get(0).getMax() /
TimePartitionUtils.timePartitionInterval)
- * TimePartitionUtils.timePartitionInterval; // included
- endTime = startTime + TimePartitionUtils.timePartitionInterval; //
excluded
+ (timeRangeList.get(0).getMax() /
TimePartitionUtils.TIME_PARTITION_INTERVAL)
+ * TimePartitionUtils.TIME_PARTITION_INTERVAL; // included
+ endTime = startTime + TimePartitionUtils.TIME_PARTITION_INTERVAL; //
excluded
timePartitionSlot =
TimePartitionUtils.getTimePartition(timeRangeList.get(0).getMax());
} else {
startTime =
- (timeRangeList.get(0).getMin() /
TimePartitionUtils.timePartitionInterval)
- * TimePartitionUtils.timePartitionInterval; // included
- endTime = startTime + TimePartitionUtils.timePartitionInterval; //
excluded
+ (timeRangeList.get(0).getMin() /
TimePartitionUtils.TIME_PARTITION_INTERVAL)
+ * TimePartitionUtils.TIME_PARTITION_INTERVAL; // included
+ endTime = startTime + TimePartitionUtils.TIME_PARTITION_INTERVAL; //
excluded
timePartitionSlot =
TimePartitionUtils.getTimePartition(timeRangeList.get(0).getMin());
needLeftAll = false;
}
@@ -1832,14 +1832,14 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
result.add(timePartitionSlot);
// next init
endTime =
- (curLeft / TimePartitionUtils.timePartitionInterval + 1)
- * TimePartitionUtils.timePartitionInterval;
+ (curLeft / TimePartitionUtils.TIME_PARTITION_INTERVAL + 1)
+ * TimePartitionUtils.TIME_PARTITION_INTERVAL;
timePartitionSlot = TimePartitionUtils.getTimePartition(curLeft);
} else if (curRight >= endTime) {
result.add(timePartitionSlot);
// next init
timePartitionSlot = new TTimePartitionSlot(endTime);
- endTime = endTime + TimePartitionUtils.timePartitionInterval;
+ endTime = endTime + TimePartitionUtils.TIME_PARTITION_INTERVAL;
} else {
index++;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
index c13d30e780c..c0b099fbdcd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
@@ -224,6 +224,12 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
}
}
+ @Override
+ public List<TRegionReplicaSet> getAllDataPartitionsForOneDevice(
+ String database, String deviceId) {
+ return partitionCache.getAllDataPartitionsForOneDevice(database, deviceId);
+ }
+
@Override
public DataPartition getOrCreateDataPartition(
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IPartitionFetcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IPartitionFetcher.java
index 8930dccb21d..05dde1f8a00 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IPartitionFetcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IPartitionFetcher.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.queryengine.plan.analyze;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
@@ -56,6 +57,8 @@ public interface IPartitionFetcher {
DataPartition getDataPartitionWithUnclosedTimeRange(
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap);
+ List<TRegionReplicaSet> getAllDataPartitionsForOneDevice(String database,
String deviceId);
+
/**
* Get or create data partition, used in standalone write scenarios. if
enableAutoCreateSchema is
* true and database/series/time slots not exists, then automatically create.
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
index 478033b335e..0d1204d8178 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
@@ -49,10 +49,12 @@ import
org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.db.schemaengine.schemaregion.utils.MetaUtils;
import org.apache.iotdb.db.service.metrics.CacheMetrics;
+import org.apache.iotdb.db.utils.TimePartitionUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -623,6 +625,46 @@ public class PartitionCache {
}
}
+ public List<TRegionReplicaSet> getAllDataPartitionsForOneDevice(
+ String database, String deviceId) {
+ DataPartitionTable dataPartitionTable =
dataPartitionCache.getIfPresent(database);
+ if (dataPartitionTable != null) {
+ Map<TSeriesPartitionSlot, SeriesPartitionTable>
cachedDatabasePartitionMap =
+ dataPartitionTable.getDataPartitionMap();
+
+ TSeriesPartitionSlot seriesPartitionSlot =
partitionExecutor.getSeriesPartitionSlot(deviceId);
+ SeriesPartitionTable cachedSeriesPartitionTable =
+ cachedDatabasePartitionMap.get(seriesPartitionSlot);
+
+ if (cachedSeriesPartitionTable != null) {
+ List<TRegionReplicaSet> regionReplicaSets = new LinkedList<>();
+ TTimePartitionSlot current =
+ TimePartitionUtils.getTimePartition(System.currentTimeMillis());
+ MutableBoolean hasLatest = new MutableBoolean(false);
+ cachedSeriesPartitionTable
+ .getSeriesPartitionMap()
+ .forEach(
+ (slot, value) -> {
+ for (TConsensusGroupId consensusGroupId : value) {
+
regionReplicaSets.add(getRegionReplicaSet(consensusGroupId));
+ }
+ hasLatest.setValue(
+ hasLatest.booleanValue() || (slot.startTime ==
current.startTime));
+ });
+ if (hasLatest.booleanValue()) {
+ // cache hit
+ cacheMetrics.record(true, CacheMetrics.DATA_PARTITION_CACHE_NAME);
+ return regionReplicaSets;
+ }
+ }
+ }
+ DataPartitionQueryParam queryParam =
+ new DataPartitionQueryParam(deviceId, Collections.emptyList(), true,
true);
+ return getDataPartition(
+ Collections.singletonMap(database,
Collections.singletonList(queryParam)))
+ .getDataRegionReplicaSet(deviceId, Collections.emptyList());
+ }
+
/**
* get dataPartition from database
*
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index c19b4f037d0..c028da65e85 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -191,9 +191,9 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
return Collections.emptyList();
}
long startTime =
- (times[0] / TimePartitionUtils.timePartitionInterval)
- * TimePartitionUtils.timePartitionInterval; // included
- long endTime = startTime + TimePartitionUtils.timePartitionInterval; //
excluded
+ (times[0] / TimePartitionUtils.TIME_PARTITION_INTERVAL)
+ * TimePartitionUtils.TIME_PARTITION_INTERVAL; // included
+ long endTime = startTime + TimePartitionUtils.TIME_PARTITION_INTERVAL; //
excluded
TTimePartitionSlot timePartitionSlot =
TimePartitionUtils.getTimePartition(times[0]);
int startLoc = 0; // included
@@ -210,8 +210,8 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
startLoc = i;
startTime = endTime;
endTime =
- (times[i] / TimePartitionUtils.timePartitionInterval + 1)
- * TimePartitionUtils.timePartitionInterval;
+ (times[i] / TimePartitionUtils.TIME_PARTITION_INTERVAL + 1)
+ * TimePartitionUtils.TIME_PARTITION_INTERVAL;
timePartitionSlot = TimePartitionUtils.getTimePartition(times[i]);
}
}
@@ -293,17 +293,17 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
public List<TTimePartitionSlot> getTimePartitionSlots() {
List<TTimePartitionSlot> result = new ArrayList<>();
long startTime =
- (times[0] / TimePartitionUtils.timePartitionInterval)
- * TimePartitionUtils.timePartitionInterval; // included
- long endTime = startTime + TimePartitionUtils.timePartitionInterval; //
excluded
+ (times[0] / TimePartitionUtils.TIME_PARTITION_INTERVAL)
+ * TimePartitionUtils.TIME_PARTITION_INTERVAL; // included
+ long endTime = startTime + TimePartitionUtils.TIME_PARTITION_INTERVAL; //
excluded
TTimePartitionSlot timePartitionSlot =
TimePartitionUtils.getTimePartition(times[0]);
for (int i = 1; i < times.length; i++) { // times are sorted in session
API.
if (times[i] >= endTime) {
result.add(timePartitionSlot);
// next init
endTime =
- (times[i] / TimePartitionUtils.timePartitionInterval + 1)
- * TimePartitionUtils.timePartitionInterval;
+ (times[i] / TimePartitionUtils.TIME_PARTITION_INTERVAL + 1)
+ * TimePartitionUtils.TIME_PARTITION_INTERVAL;
timePartitionSlot = TimePartitionUtils.getTimePartition(times[i]);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index e7738efc740..7591a33fb66 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -119,17 +119,17 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
public List<TTimePartitionSlot> getTimePartitionSlots() {
List<TTimePartitionSlot> result = new ArrayList<>();
long startTime =
- (times[0] / TimePartitionUtils.timePartitionInterval)
- * TimePartitionUtils.timePartitionInterval; // included
- long endTime = startTime + TimePartitionUtils.timePartitionInterval; //
excluded
+ (times[0] / TimePartitionUtils.TIME_PARTITION_INTERVAL)
+ * TimePartitionUtils.TIME_PARTITION_INTERVAL; // included
+ long endTime = startTime + TimePartitionUtils.TIME_PARTITION_INTERVAL; //
excluded
TTimePartitionSlot timePartitionSlot =
TimePartitionUtils.getTimePartition(times[0]);
for (int i = 1; i < times.length; i++) { // times are sorted in session
API.
if (times[i] >= endTime) {
result.add(timePartitionSlot);
// next init
endTime =
- (times[i] / TimePartitionUtils.timePartitionInterval + 1)
- * TimePartitionUtils.timePartitionInterval;
+ (times[i] / TimePartitionUtils.TIME_PARTITION_INTERVAL + 1)
+ * TimePartitionUtils.TIME_PARTITION_INTERVAL;
timePartitionSlot = TimePartitionUtils.getTimePartition(times[i]);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TimePartitionUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TimePartitionUtils.java
index 3b5e0f8177e..c4b586c53bf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TimePartitionUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TimePartitionUtils.java
@@ -23,21 +23,23 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.utils.TestOnly;
public class TimePartitionUtils {
- public static long timePartitionInterval =
+ public static long TIME_PARTITION_INTERVAL =
CommonDescriptor.getInstance().getConfig().getTimePartitionInterval();
+ private TimePartitionUtils() {}
+
public static TTimePartitionSlot getTimePartition(long time) {
TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot();
- timePartitionSlot.setStartTime(time - time % timePartitionInterval);
+ timePartitionSlot.setStartTime(time - time % TIME_PARTITION_INTERVAL);
return timePartitionSlot;
}
public static long getTimePartitionInterval() {
- return timePartitionInterval;
+ return TIME_PARTITION_INTERVAL;
}
@TestOnly
public static void setTimePartitionInterval(long timePartitionInterval) {
- TimePartitionUtils.timePartitionInterval = timePartitionInterval;
+ TimePartitionUtils.TIME_PARTITION_INTERVAL = timePartitionInterval;
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/FakePartitionFetcherImpl.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/FakePartitionFetcherImpl.java
index 3bb4e323bf1..ffda76d95bb 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/FakePartitionFetcherImpl.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/FakePartitionFetcherImpl.java
@@ -215,6 +215,12 @@ public class FakePartitionFetcherImpl implements
IPartitionFetcher {
return getDataPartition(sgNameToQueryParamsMap);
}
+ @Override
+ public List<TRegionReplicaSet> getAllDataPartitionsForOneDevice(
+ String database, String deviceId) {
+ return null;
+ }
+
@Override
public DataPartition getOrCreateDataPartition(
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/Util.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/Util.java
index 28c8547001f..b585e641e09 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/Util.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/Util.java
@@ -378,6 +378,14 @@ public class Util {
return ANALYSIS.getDataPartitionInfo();
}
+ @Override
+ public List<TRegionReplicaSet> getAllDataPartitionsForOneDevice(
+ String database, String deviceId) {
+ return ANALYSIS
+ .getDataPartitionInfo()
+ .getDataRegionReplicaSet(deviceId, Collections.emptyList());
+ }
+
@Override
public DataPartition getOrCreateDataPartition(
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/write/WritePlanNodeSplitTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/write/WritePlanNodeSplitTest.java
index dfe79d71771..9ac3e77c61e 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/write/WritePlanNodeSplitTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/write/WritePlanNodeSplitTest.java
@@ -104,7 +104,7 @@ public class WritePlanNodeSplitTest {
for (int i = 0; i < seriesSlotPartitionNum; i++) {
Map<TTimePartitionSlot, List<TRegionReplicaSet>> timePartitionSlotMap =
new HashMap<>();
for (int t = 0; t < 5; t++) {
- long startTime = t * TimePartitionUtils.timePartitionInterval;
+ long startTime = t * TimePartitionUtils.TIME_PARTITION_INTERVAL;
timePartitionSlotMap.put(
new TTimePartitionSlot(startTime),
Collections.singletonList(
@@ -125,7 +125,7 @@ public class WritePlanNodeSplitTest {
Map<TTimePartitionSlot, List<TRegionReplicaSet>> timePartitionSlotMap =
new HashMap<>();
for (int t = 0; t < 5; t++) {
timePartitionSlotMap.put(
- new TTimePartitionSlot(t *
TimePartitionUtils.timePartitionInterval),
+ new TTimePartitionSlot(t *
TimePartitionUtils.TIME_PARTITION_INTERVAL),
Collections.singletonList(
new TRegionReplicaSet(
new TConsensusGroupId(TConsensusGroupType.DataRegion, 99),
locationList)));
@@ -150,7 +150,7 @@ public class WritePlanNodeSplitTest {
}
private int getRegionIdByTime(long startTime) {
- return (int) (4 - (startTime / TimePartitionUtils.timePartitionInterval));
+ return (int) (4 - (startTime /
TimePartitionUtils.TIME_PARTITION_INTERVAL));
}
protected DataPartition getDataPartition(
@@ -288,7 +288,7 @@ public class WritePlanNodeSplitTest {
for (int i = 0; i < 5; i++) {
InsertRowNode insertRowNode = new InsertRowNode(new PlanNodeId("plan
node 3"));
insertRowNode.setDevicePath(new
PartialPath(String.format("root.sg1.d%d", i)));
- insertRowNode.setTime(i * TimePartitionUtils.timePartitionInterval);
+ insertRowNode.setTime(i * TimePartitionUtils.TIME_PARTITION_INTERVAL);
insertRowsNode.addOneInsertRowNode(insertRowNode, 2 * i);
insertRowNode = new InsertRowNode(new PlanNodeId("plan node 3"));