This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f9ffee7015e Fix last cache metric updating logic (#12876)
f9ffee7015e is described below

commit f9ffee7015edf4443b75f9fec93d1d3682da9691
Author: Haonan <[email protected]>
AuthorDate: Mon Jul 8 19:48:18 2024 +0800

    Fix last cache metric updating logic (#12876)
---
 .../db/storageengine/dataregion/DataRegion.java    | 77 +++++++++++-----------
 1 file changed, 40 insertions(+), 37 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index d5dd8745e3d..7f49bf24818 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -916,6 +916,15 @@ public class DataRegion implements IDataRegionForQuery {
         fileFlushPolicy.apply(this, tsFileProcessor, 
tsFileProcessor.isSequence());
         WritingMetrics.getInstance().recordMemControlFlushMemTableCount(1);
       }
+      if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
+        if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) {
+          // disable updating last cache on follower
+          startTime = System.nanoTime();
+          tryToUpdateInsertRowLastCache(insertRowNode);
+          PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(
+              System.nanoTime() - startTime);
+        }
+      }
     } finally {
       writeUnlock();
     }
@@ -1020,9 +1029,16 @@ public class DataRegion implements IDataRegionForQuery {
                     insertTabletNode, before, loc, isSequence, results, 
beforeTimePartition)
                 && noFailure;
       }
-      startTime = System.nanoTime();
-      tryToUpdateInsertTabletLastCache(insertTabletNode);
-      
PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime()
 - startTime);
+
+      if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
+        if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) {
+          // disable updating last cache on follower
+          startTime = System.nanoTime();
+          tryToUpdateInsertTabletLastCache(insertTabletNode);
+          PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(
+              System.nanoTime() - startTime);
+        }
+      }
 
       if (!noFailure) {
         throw new BatchProcessException(results);
@@ -1096,11 +1112,6 @@ public class DataRegion implements IDataRegionForQuery {
   }
 
   private void tryToUpdateInsertTabletLastCache(InsertTabletNode node) {
-    if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()
-        || node.isGeneratedByRemoteConsensusLeader()) {
-      // disable updating last cache on follower
-      return;
-    }
     long latestFlushedTime = 
lastFlushTimeMap.getGlobalFlushedTime(node.getDeviceID());
     String[] measurements = node.getMeasurements();
     MeasurementSchema[] measurementSchemas = node.getMeasurementSchemas();
@@ -1139,16 +1150,6 @@ public class DataRegion implements IDataRegionForQuery {
     
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]);
     PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]);
     
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]);
-
-    if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
-      if (insertRowNode.isGeneratedByRemoteConsensusLeader()) {
-        return tsFileProcessor;
-      }
-      // disable updating last cache on follower
-      long startTime = System.nanoTime();
-      tryToUpdateInsertRowLastCache(insertRowNode);
-      
PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime()
 - startTime);
-    }
     return tsFileProcessor;
   }
 
@@ -1178,7 +1179,7 @@ public class DataRegion implements IDataRegionForQuery {
             latestFlushedTime);
   }
 
-  private void insertToTsFileProcessors(
+  private List<InsertRowNode> insertToTsFileProcessors(
       InsertRowsNode insertRowsNode, boolean[] areSequence, long[] 
timePartitionIds) {
     long[] costsForMetrics = new long[4];
     Map<TsFileProcessor, InsertRowsNode> tsFileProcessorMap = new HashMap<>();
@@ -1241,16 +1242,7 @@ public class DataRegion implements IDataRegionForQuery {
     
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]);
     PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]);
     
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]);
-
-    if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
-      if (insertRowsNode.isGeneratedByRemoteConsensusLeader()) {
-        return;
-      }
-      // disable updating last cache on follower
-      long startTime = System.nanoTime();
-      tryToUpdateInsertRowsLastCache(executedInsertRowNodeList);
-      
PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime()
 - startTime);
-    }
+    return executedInsertRowNodeList;
   }
 
   private void tryToUpdateInsertRowsLastCache(List<InsertRowNode> nodeList) {
@@ -3343,14 +3335,13 @@ public class DataRegion implements IDataRegionForQuery {
       PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]);
       
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]);
       if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
-        if (insertRowsOfOneDeviceNode.isGeneratedByRemoteConsensusLeader()) {
-          return;
+        if (!insertRowsOfOneDeviceNode.isGeneratedByRemoteConsensusLeader()) {
+          // disable updating last cache on follower
+          startTime = System.nanoTime();
+          tryToUpdateInsertRowsLastCache(executedInsertRowNodeList);
+          PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(
+              System.nanoTime() - startTime);
         }
-        // disable updating last cache on follower
-        startTime = System.nanoTime();
-        tryToUpdateInsertRowsLastCache(executedInsertRowNodeList);
-        PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(
-            System.nanoTime() - startTime);
       }
     } finally {
       writeUnlock();
@@ -3410,7 +3401,19 @@ public class DataRegion implements IDataRegionForQuery {
                     > lastFlushTimeMap.getFlushedTime(
                         timePartitionIds[i], insertRowNode.getDeviceID());
       }
-      insertToTsFileProcessors(insertRowsNode, areSequence, timePartitionIds);
+      List<InsertRowNode> executedInsertRowNodeList =
+          insertToTsFileProcessors(insertRowsNode, areSequence, 
timePartitionIds);
+
+      if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
+        if (!insertRowsNode.isGeneratedByRemoteConsensusLeader()) {
+          // disable updating last cache on follower
+          startTime = System.nanoTime();
+          tryToUpdateInsertRowsLastCache(executedInsertRowNodeList);
+          PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(
+              System.nanoTime() - startTime);
+        }
+      }
+
       if (!insertRowsNode.getResults().isEmpty()) {
         throw new BatchProcessException("Partial failed inserting rows");
       }

Reply via email to