This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ca_performance
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ca_performance by this push:
new 8b56430485e stop always fetching latest time partition from CN
8b56430485e is described below
commit 8b56430485ea355326d3afc5e938958050347c90
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Aug 8 17:27:02 2023 +0800
stop always fetching latest time partition from CN
---
.../main/java/org/apache/iotdb/SessionExample.java | 81 +++++++++++-----------
.../plan/analyze/ClusterPartitionFetcher.java | 14 +++-
.../analyze/cache/partition/PartitionCache.java | 72 ++++++++++---------
3 files changed, 90 insertions(+), 77 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 60bf282ed8f..7947936d5cd 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -23,10 +23,8 @@ import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.isession.SessionDataSet.DataIterator;
import org.apache.iotdb.isession.template.Template;
-import org.apache.iotdb.isession.util.Version;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.template.MeasurementNode;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -56,8 +54,8 @@ public class SessionExample {
private static final String ROOT_SG1_D1_S3 = "root.sg1.d1.s3";
private static final String ROOT_SG1_D1_S4 = "root.sg1.d1.s4";
private static final String ROOT_SG1_D1_S5 = "root.sg1.d1.s5";
- private static final String ROOT_SG1_D1 = "root.sg1.d1";
- private static final String ROOT_SG1 = "root.sg1";
+ private static final String ROOT_SG1_D1 = "root.db.d1";
+ private static final String ROOT_SG1 = "root.db";
private static final String LOCAL_HOST = "127.0.0.1";
public static final String SELECT_D1 = "select * from root.sg1.d1";
@@ -65,45 +63,45 @@ public class SessionExample {
public static void main(String[] args)
throws IoTDBConnectionException, StatementExecutionException {
- session =
- new Session.Builder()
- .host(LOCAL_HOST)
- .port(6667)
- .username("root")
- .password("root")
- .version(Version.V_1_0)
- .build();
- session.open(false);
-
- // set session fetchSize
- session.setFetchSize(10000);
-
- try {
- session.createDatabase("root.sg1");
- } catch (StatementExecutionException e) {
- if (e.getStatusCode() != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
- throw e;
- }
- }
-
- // createTemplate();
- createTimeseries();
- createMultiTimeseries();
- insertRecord();
- insertTablet();
- // insertTabletWithNullValues();
- // insertTablets();
- // insertRecords();
+ // session =
+ // new Session.Builder()
+ // .host(LOCAL_HOST)
+ // .port(6667)
+ // .username("root")
+ // .password("root")
+ // .version(Version.V_1_0)
+ // .build();
+ // session.open(false);
+ //
+ // set session fetchSize
+ // session.setFetchSize(10000);
+ //
+ // try {
+ // session.createDatabase("root.sg1");
+ // } catch (StatementExecutionException e) {
+ // if (e.getStatusCode() != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
+ // throw e;
+ // }
+ // }
+ //
+ // createTemplate();
+ // createTimeseries();
+ // createMultiTimeseries();
+ // insertRecord();
+ // insertTablet();
+ // insertTabletWithNullValues();
+ // insertTablets();
+ // insertRecords();
// insertText();
// selectInto();
// createAndDropContinuousQueries();
// nonQuery();
- query();
+ // query();
// queryWithTimeout();
- rawDataQuery();
- lastDataQuery();
- aggregationQuery();
- groupByQuery();
+ // rawDataQuery();
+ // lastDataQuery();
+ // aggregationQuery();
+ // groupByQuery();
// queryByIterator();
// deleteData();
// deleteTimeseries();
@@ -117,10 +115,10 @@ public class SessionExample {
sessionEnableRedirect.setFetchSize(10000);
fastLastDataQueryForOneDevice();
- insertRecord4Redirect();
- query4Redirect();
+ // insertRecord4Redirect();
+ // query4Redirect();
sessionEnableRedirect.close();
- session.close();
+ // session.close();
}
private static void createAndDropContinuousQueries()
@@ -803,7 +801,6 @@ public class SessionExample {
List<String> paths = new ArrayList<>();
paths.add("s1");
paths.add("s2");
- paths.add("s3");
try (SessionDataSet sessionDataSet =
sessionEnableRedirect.executeLastDataQueryForOneDevice(
ROOT_SG1, ROOT_SG1_D1, paths, true)) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
index c0b099fbdcd..2e60b445426 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
@@ -58,11 +58,13 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
public class ClusterPartitionFetcher implements IPartitionFetcher {
@@ -227,7 +229,17 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
@Override
public List<TRegionReplicaSet> getAllDataPartitionsForOneDevice(
String database, String deviceId) {
- return partitionCache.getAllDataPartitionsForOneDevice(database, deviceId);
+ Optional<List<TRegionReplicaSet>> res =
+ partitionCache.getAllDataPartitionsForOneDevice(database, deviceId);
+ if (res.isPresent()) {
+ return res.get();
+ } else {
+ DataPartitionQueryParam queryParam =
+ new DataPartitionQueryParam(deviceId, Collections.emptyList(), true, true);
+ return getDataPartitionWithUnclosedTimeRange(
+ Collections.singletonMap(database, Collections.singletonList(queryParam)))
+ .getDataRegionReplicaSet(deviceId, Collections.emptyList());
+ }
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
index 0d1204d8178..a0a0268e072 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
@@ -67,6 +67,7 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -625,44 +626,47 @@ public class PartitionCache {
}
}
- public List<TRegionReplicaSet> getAllDataPartitionsForOneDevice(
+ public Optional<List<TRegionReplicaSet>> getAllDataPartitionsForOneDevice(
String database, String deviceId) {
- DataPartitionTable dataPartitionTable = dataPartitionCache.getIfPresent(database);
- if (dataPartitionTable != null) {
- Map<TSeriesPartitionSlot, SeriesPartitionTable> cachedDatabasePartitionMap =
- dataPartitionTable.getDataPartitionMap();
-
- TSeriesPartitionSlot seriesPartitionSlot = partitionExecutor.getSeriesPartitionSlot(deviceId);
- SeriesPartitionTable cachedSeriesPartitionTable =
- cachedDatabasePartitionMap.get(seriesPartitionSlot);
-
- if (cachedSeriesPartitionTable != null) {
- List<TRegionReplicaSet> regionReplicaSets = new LinkedList<>();
- TTimePartitionSlot current =
- TimePartitionUtils.getTimePartition(System.currentTimeMillis());
- MutableBoolean hasLatest = new MutableBoolean(false);
- cachedSeriesPartitionTable
- .getSeriesPartitionMap()
- .forEach(
- (slot, value) -> {
- for (TConsensusGroupId consensusGroupId : value) {
- regionReplicaSets.add(getRegionReplicaSet(consensusGroupId));
- }
- hasLatest.setValue(
- hasLatest.booleanValue() || (slot.startTime == current.startTime));
- });
- if (hasLatest.booleanValue()) {
- // cache hit
- cacheMetrics.record(true, CacheMetrics.DATA_PARTITION_CACHE_NAME);
- return regionReplicaSets;
+ dataPartitionCacheLock.readLock().lock();
+ try {
+ DataPartitionTable dataPartitionTable = dataPartitionCache.getIfPresent(database);
+ if (dataPartitionTable != null) {
+ Map<TSeriesPartitionSlot, SeriesPartitionTable> cachedDatabasePartitionMap =
+ dataPartitionTable.getDataPartitionMap();
+
+ TSeriesPartitionSlot seriesPartitionSlot =
+ partitionExecutor.getSeriesPartitionSlot(deviceId);
+ SeriesPartitionTable cachedSeriesPartitionTable =
+ cachedDatabasePartitionMap.get(seriesPartitionSlot);
+
+ if (cachedSeriesPartitionTable != null) {
+ List<TRegionReplicaSet> regionReplicaSets = new LinkedList<>();
+ TTimePartitionSlot current =
+ TimePartitionUtils.getTimePartition(System.currentTimeMillis());
+ MutableBoolean hasLatest = new MutableBoolean(false);
+ cachedSeriesPartitionTable
+ .getSeriesPartitionMap()
+ .forEach(
+ (slot, value) -> {
+ for (TConsensusGroupId consensusGroupId : value) {
+ regionReplicaSets.add(getRegionReplicaSet(consensusGroupId));
+ }
+ hasLatest.setValue(
+ hasLatest.booleanValue() || (slot.startTime == current.startTime));
+ });
+ if (hasLatest.booleanValue()) {
+ // cache hit
+ cacheMetrics.record(true, CacheMetrics.DATA_PARTITION_CACHE_NAME);
+ return Optional.of(regionReplicaSets);
+ }
}
}
+ } finally {
+ dataPartitionCacheLock.readLock().unlock();
}
- DataPartitionQueryParam queryParam =
- new DataPartitionQueryParam(deviceId, Collections.emptyList(), true, true);
- return getDataPartition(
- Collections.singletonMap(database, Collections.singletonList(queryParam)))
- .getDataRegionReplicaSet(deviceId, Collections.emptyList());
+
+ return Optional.empty();
}
/**