This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ca_performance
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ca_performance by this push:
     new 8b56430485e stop always fetching latest time partition from CN
8b56430485e is described below

commit 8b56430485ea355326d3afc5e938958050347c90
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Aug 8 17:27:02 2023 +0800

    stop always fetching latest time partition from CN
---
 .../main/java/org/apache/iotdb/SessionExample.java | 81 +++++++++++-----------
 .../plan/analyze/ClusterPartitionFetcher.java      | 14 +++-
 .../analyze/cache/partition/PartitionCache.java    | 72 ++++++++++---------
 3 files changed, 90 insertions(+), 77 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java 
b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 60bf282ed8f..7947936d5cd 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -23,10 +23,8 @@ import org.apache.iotdb.common.rpc.thrift.TAggregationType;
 import org.apache.iotdb.isession.SessionDataSet;
 import org.apache.iotdb.isession.SessionDataSet.DataIterator;
 import org.apache.iotdb.isession.template.Template;
-import org.apache.iotdb.isession.util.Version;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.session.Session;
 import org.apache.iotdb.session.template.MeasurementNode;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -56,8 +54,8 @@ public class SessionExample {
   private static final String ROOT_SG1_D1_S3 = "root.sg1.d1.s3";
   private static final String ROOT_SG1_D1_S4 = "root.sg1.d1.s4";
   private static final String ROOT_SG1_D1_S5 = "root.sg1.d1.s5";
-  private static final String ROOT_SG1_D1 = "root.sg1.d1";
-  private static final String ROOT_SG1 = "root.sg1";
+  private static final String ROOT_SG1_D1 = "root.db.d1";
+  private static final String ROOT_SG1 = "root.db";
   private static final String LOCAL_HOST = "127.0.0.1";
   public static final String SELECT_D1 = "select * from root.sg1.d1";
 
@@ -65,45 +63,45 @@ public class SessionExample {
 
   public static void main(String[] args)
       throws IoTDBConnectionException, StatementExecutionException {
-    session =
-        new Session.Builder()
-            .host(LOCAL_HOST)
-            .port(6667)
-            .username("root")
-            .password("root")
-            .version(Version.V_1_0)
-            .build();
-    session.open(false);
-
-    // set session fetchSize
-    session.setFetchSize(10000);
-
-    try {
-      session.createDatabase("root.sg1");
-    } catch (StatementExecutionException e) {
-      if (e.getStatusCode() != 
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
-        throw e;
-      }
-    }
-
-    //     createTemplate();
-    createTimeseries();
-    createMultiTimeseries();
-    insertRecord();
-    insertTablet();
-    //    insertTabletWithNullValues();
-    //    insertTablets();
-    //    insertRecords();
+    //    session =
+    //        new Session.Builder()
+    //            .host(LOCAL_HOST)
+    //            .port(6667)
+    //            .username("root")
+    //            .password("root")
+    //            .version(Version.V_1_0)
+    //            .build();
+    //    session.open(false);
+    //
+    //     set session fetchSize
+    //    session.setFetchSize(10000);
+    //
+    //    try {
+    //      session.createDatabase("root.sg1");
+    //    } catch (StatementExecutionException e) {
+    //      if (e.getStatusCode() != 
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
+    //        throw e;
+    //      }
+    //    }
+    //
+    //         createTemplate();
+    //    createTimeseries();
+    //    createMultiTimeseries();
+    //    insertRecord();
+    //    insertTablet();
+    //        insertTabletWithNullValues();
+    //        insertTablets();
+    //        insertRecords();
     //    insertText();
     //    selectInto();
     //    createAndDropContinuousQueries();
     //    nonQuery();
-    query();
+    //    query();
     //    queryWithTimeout();
-    rawDataQuery();
-    lastDataQuery();
-    aggregationQuery();
-    groupByQuery();
+    //    rawDataQuery();
+    //    lastDataQuery();
+    //    aggregationQuery();
+    //    groupByQuery();
     //    queryByIterator();
     //    deleteData();
     //    deleteTimeseries();
@@ -117,10 +115,10 @@ public class SessionExample {
     sessionEnableRedirect.setFetchSize(10000);
 
     fastLastDataQueryForOneDevice();
-    insertRecord4Redirect();
-    query4Redirect();
+    //    insertRecord4Redirect();
+    //    query4Redirect();
     sessionEnableRedirect.close();
-    session.close();
+    //    session.close();
   }
 
   private static void createAndDropContinuousQueries()
@@ -803,7 +801,6 @@ public class SessionExample {
     List<String> paths = new ArrayList<>();
     paths.add("s1");
     paths.add("s2");
-    paths.add("s3");
     try (SessionDataSet sessionDataSet =
         sessionEnableRedirect.executeLastDataQueryForOneDevice(
             ROOT_SG1, ROOT_SG1_D1, paths, true)) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
index c0b099fbdcd..2e60b445426 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
@@ -58,11 +58,13 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 public class ClusterPartitionFetcher implements IPartitionFetcher {
@@ -227,7 +229,17 @@ public class ClusterPartitionFetcher implements 
IPartitionFetcher {
   @Override
   public List<TRegionReplicaSet> getAllDataPartitionsForOneDevice(
       String database, String deviceId) {
-    return partitionCache.getAllDataPartitionsForOneDevice(database, deviceId);
+    Optional<List<TRegionReplicaSet>> res =
+        partitionCache.getAllDataPartitionsForOneDevice(database, deviceId);
+    if (res.isPresent()) {
+      return res.get();
+    } else {
+      DataPartitionQueryParam queryParam =
+          new DataPartitionQueryParam(deviceId, Collections.emptyList(), true, 
true);
+      return getDataPartitionWithUnclosedTimeRange(
+              Collections.singletonMap(database, 
Collections.singletonList(queryParam)))
+          .getDataRegionReplicaSet(deviceId, Collections.emptyList());
+    }
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
index 0d1204d8178..a0a0268e072 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
@@ -67,6 +67,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -625,44 +626,47 @@ public class PartitionCache {
     }
   }
 
-  public List<TRegionReplicaSet> getAllDataPartitionsForOneDevice(
+  public Optional<List<TRegionReplicaSet>> getAllDataPartitionsForOneDevice(
       String database, String deviceId) {
-    DataPartitionTable dataPartitionTable = 
dataPartitionCache.getIfPresent(database);
-    if (dataPartitionTable != null) {
-      Map<TSeriesPartitionSlot, SeriesPartitionTable> 
cachedDatabasePartitionMap =
-          dataPartitionTable.getDataPartitionMap();
-
-      TSeriesPartitionSlot seriesPartitionSlot = 
partitionExecutor.getSeriesPartitionSlot(deviceId);
-      SeriesPartitionTable cachedSeriesPartitionTable =
-          cachedDatabasePartitionMap.get(seriesPartitionSlot);
-
-      if (cachedSeriesPartitionTable != null) {
-        List<TRegionReplicaSet> regionReplicaSets = new LinkedList<>();
-        TTimePartitionSlot current =
-            TimePartitionUtils.getTimePartition(System.currentTimeMillis());
-        MutableBoolean hasLatest = new MutableBoolean(false);
-        cachedSeriesPartitionTable
-            .getSeriesPartitionMap()
-            .forEach(
-                (slot, value) -> {
-                  for (TConsensusGroupId consensusGroupId : value) {
-                    
regionReplicaSets.add(getRegionReplicaSet(consensusGroupId));
-                  }
-                  hasLatest.setValue(
-                      hasLatest.booleanValue() || (slot.startTime == 
current.startTime));
-                });
-        if (hasLatest.booleanValue()) {
-          // cache hit
-          cacheMetrics.record(true, CacheMetrics.DATA_PARTITION_CACHE_NAME);
-          return regionReplicaSets;
+    dataPartitionCacheLock.readLock().lock();
+    try {
+      DataPartitionTable dataPartitionTable = 
dataPartitionCache.getIfPresent(database);
+      if (dataPartitionTable != null) {
+        Map<TSeriesPartitionSlot, SeriesPartitionTable> 
cachedDatabasePartitionMap =
+            dataPartitionTable.getDataPartitionMap();
+
+        TSeriesPartitionSlot seriesPartitionSlot =
+            partitionExecutor.getSeriesPartitionSlot(deviceId);
+        SeriesPartitionTable cachedSeriesPartitionTable =
+            cachedDatabasePartitionMap.get(seriesPartitionSlot);
+
+        if (cachedSeriesPartitionTable != null) {
+          List<TRegionReplicaSet> regionReplicaSets = new LinkedList<>();
+          TTimePartitionSlot current =
+              TimePartitionUtils.getTimePartition(System.currentTimeMillis());
+          MutableBoolean hasLatest = new MutableBoolean(false);
+          cachedSeriesPartitionTable
+              .getSeriesPartitionMap()
+              .forEach(
+                  (slot, value) -> {
+                    for (TConsensusGroupId consensusGroupId : value) {
+                      
regionReplicaSets.add(getRegionReplicaSet(consensusGroupId));
+                    }
+                    hasLatest.setValue(
+                        hasLatest.booleanValue() || (slot.startTime == 
current.startTime));
+                  });
+          if (hasLatest.booleanValue()) {
+            // cache hit
+            cacheMetrics.record(true, CacheMetrics.DATA_PARTITION_CACHE_NAME);
+            return Optional.of(regionReplicaSets);
+          }
         }
       }
+    } finally {
+      dataPartitionCacheLock.readLock().unlock();
     }
-    DataPartitionQueryParam queryParam =
-        new DataPartitionQueryParam(deviceId, Collections.emptyList(), true, 
true);
-    return getDataPartition(
-            Collections.singletonMap(database, 
Collections.singletonList(queryParam)))
-        .getDataRegionReplicaSet(deviceId, Collections.emptyList());
+
+    return Optional.empty();
   }
 
   /**

Reply via email to