This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f9ffee7015e Fix last cache metric updating logic (#12876)
f9ffee7015e is described below
commit f9ffee7015edf4443b75f9fec93d1d3682da9691
Author: Haonan <[email protected]>
AuthorDate: Mon Jul 8 19:48:18 2024 +0800
Fix last cache metric updating logic (#12876)
---
.../db/storageengine/dataregion/DataRegion.java | 77 +++++++++++-----------
1 file changed, 40 insertions(+), 37 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index d5dd8745e3d..7f49bf24818 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -916,6 +916,15 @@ public class DataRegion implements IDataRegionForQuery {
fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
WritingMetrics.getInstance().recordMemControlFlushMemTableCount(1);
}
+ if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
+ if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) {
+ // disable updating last cache on follower
+ startTime = System.nanoTime();
+ tryToUpdateInsertRowLastCache(insertRowNode);
+ PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(
+ System.nanoTime() - startTime);
+ }
+ }
} finally {
writeUnlock();
}
@@ -1020,9 +1029,16 @@ public class DataRegion implements IDataRegionForQuery {
insertTabletNode, before, loc, isSequence, results,
beforeTimePartition)
&& noFailure;
}
- startTime = System.nanoTime();
- tryToUpdateInsertTabletLastCache(insertTabletNode);
- PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime() - startTime);
+
+ if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
+ if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) {
+ // disable updating last cache on follower
+ startTime = System.nanoTime();
+ tryToUpdateInsertTabletLastCache(insertTabletNode);
+ PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(
+ System.nanoTime() - startTime);
+ }
+ }
if (!noFailure) {
throw new BatchProcessException(results);
@@ -1096,11 +1112,6 @@ public class DataRegion implements IDataRegionForQuery {
}
private void tryToUpdateInsertTabletLastCache(InsertTabletNode node) {
- if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()
- || node.isGeneratedByRemoteConsensusLeader()) {
- // disable updating last cache on follower
- return;
- }
long latestFlushedTime =
lastFlushTimeMap.getGlobalFlushedTime(node.getDeviceID());
String[] measurements = node.getMeasurements();
MeasurementSchema[] measurementSchemas = node.getMeasurementSchemas();
@@ -1139,16 +1150,6 @@ public class DataRegion implements IDataRegionForQuery {
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]);
PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]);
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]);
-
- if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
- if (insertRowNode.isGeneratedByRemoteConsensusLeader()) {
- return tsFileProcessor;
- }
- // disable updating last cache on follower
- long startTime = System.nanoTime();
- tryToUpdateInsertRowLastCache(insertRowNode);
- PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime() - startTime);
- }
return tsFileProcessor;
}
@@ -1178,7 +1179,7 @@ public class DataRegion implements IDataRegionForQuery {
latestFlushedTime);
}
- private void insertToTsFileProcessors(
+ private List<InsertRowNode> insertToTsFileProcessors(
InsertRowsNode insertRowsNode, boolean[] areSequence, long[] timePartitionIds) {
long[] costsForMetrics = new long[4];
Map<TsFileProcessor, InsertRowsNode> tsFileProcessorMap = new HashMap<>();
@@ -1241,16 +1242,7 @@ public class DataRegion implements IDataRegionForQuery {
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]);
PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]);
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]);
-
- if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
- if (insertRowsNode.isGeneratedByRemoteConsensusLeader()) {
- return;
- }
- // disable updating last cache on follower
- long startTime = System.nanoTime();
- tryToUpdateInsertRowsLastCache(executedInsertRowNodeList);
- PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime() - startTime);
- }
+ return executedInsertRowNodeList;
}
private void tryToUpdateInsertRowsLastCache(List<InsertRowNode> nodeList) {
@@ -3343,14 +3335,13 @@ public class DataRegion implements IDataRegionForQuery {
PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]);
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]);
if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
- if (insertRowsOfOneDeviceNode.isGeneratedByRemoteConsensusLeader()) {
- return;
+ if (!insertRowsOfOneDeviceNode.isGeneratedByRemoteConsensusLeader()) {
+ // disable updating last cache on follower
+ startTime = System.nanoTime();
+ tryToUpdateInsertRowsLastCache(executedInsertRowNodeList);
+ PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(
+ System.nanoTime() - startTime);
}
- // disable updating last cache on follower
- startTime = System.nanoTime();
- tryToUpdateInsertRowsLastCache(executedInsertRowNodeList);
- PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(
- System.nanoTime() - startTime);
}
} finally {
writeUnlock();
@@ -3410,7 +3401,19 @@ public class DataRegion implements IDataRegionForQuery {
> lastFlushTimeMap.getFlushedTime(
timePartitionIds[i], insertRowNode.getDeviceID());
}
- insertToTsFileProcessors(insertRowsNode, areSequence, timePartitionIds);
+ List<InsertRowNode> executedInsertRowNodeList =
+ insertToTsFileProcessors(insertRowsNode, areSequence, timePartitionIds);
+
+ if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
+ if (!insertRowsNode.isGeneratedByRemoteConsensusLeader()) {
+ // disable updating last cache on follower
+ startTime = System.nanoTime();
+ tryToUpdateInsertRowsLastCache(executedInsertRowNodeList);
+ PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(
+ System.nanoTime() - startTime);
+ }
+ }
+
if (!insertRowsNode.getResults().isEmpty()) {
throw new BatchProcessException("Partial failed inserting rows");
}