This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch batch_insert_tablets_metrics in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ada3b616f9a7596158775dda0041edf66c23bb48 Author: HTHou <[email protected]> AuthorDate: Wed Nov 20 11:03:03 2024 +0800 Batch update inserted points metric for insertMultiTablets --- .../db/storageengine/dataregion/DataRegion.java | 108 +++++++++++++-------- .../dataregion/memtable/AbstractMemTable.java | 35 ------- .../dataregion/memtable/IMemTable.java | 3 - .../dataregion/memtable/TsFileProcessor.java | 47 ++++----- .../wal/recover/file/TsFilePlanRedoer.java | 39 +++++++- .../dataregion/memtable/TsFileProcessorTest.java | 88 ++++++++--------- 6 files changed, 171 insertions(+), 149 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 37997c86591..85821b3b435 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -31,6 +31,8 @@ import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.schema.SchemaConstant; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics; +import org.apache.iotdb.commons.service.metric.enums.Metric; +import org.apache.iotdb.commons.service.metric.enums.Tag; import org.apache.iotdb.commons.utils.CommonDateTimeUtils; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.commons.utils.TimePartitionUtils; @@ -131,6 +133,7 @@ import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool; import org.apache.iotdb.db.utils.CommonUtils; import org.apache.iotdb.db.utils.DateTimeUtils; import org.apache.iotdb.db.utils.ModificationUtils; +import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -1158,7 +1161,7 @@ public class DataRegion implements IDataRegionForQuery { InsertTabletNode insertTabletNode, Map<Long, List<int[]>[]> splitMap, TSStatus[] results, - long[] costsForMetrics) { + long[] infoForMetrics) { boolean noFailure = true; for (Entry<Long, List<int[]>[]> entry : splitMap.entrySet()) { long timePartitionId = entry.getKey(); @@ -1173,7 +1176,7 @@ public class DataRegion implements IDataRegionForQuery { results, timePartitionId, noFailure, - costsForMetrics) + infoForMetrics) && noFailure; } List<int[]> unSequenceRangeList = rangeLists[0]; @@ -1186,7 +1189,7 @@ public class DataRegion implements IDataRegionForQuery { results, timePartitionId, noFailure, - costsForMetrics) + infoForMetrics) && noFailure; } } @@ -1213,13 +1216,9 @@ public class DataRegion implements IDataRegionForQuery { } TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()]; Arrays.fill(results, RpcUtils.SUCCESS_STATUS); - long[] costsForMetrics = new long[4]; - boolean noFailure = executeInsertTablet(insertTabletNode, results, costsForMetrics); - - PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]); - PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]); - PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]); - PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]); + long[] infoForMetrics = new long[5]; + boolean noFailure = executeInsertTablet(insertTabletNode, results, infoForMetrics); + updateTsFileProcessorMetric(insertTabletNode, infoForMetrics); if (!noFailure) { throw new BatchProcessException(results); @@ -1230,7 +1229,7 @@ public class DataRegion implements IDataRegionForQuery { } private boolean executeInsertTablet( - InsertTabletNode insertTabletNode, TSStatus[] results, long[] costsForMetrics) + InsertTabletNode insertTabletNode, TSStatus[] results, long[] infoForMetrics) throws OutOfTTLException { boolean noFailure; int loc = insertTabletNode.checkTTL(results, i -> getTTL(insertTabletNode)); @@ -1244,7 +1243,7 @@ public class DataRegion implements IDataRegionForQuery { split(insertTabletNode, start, end, splitInfo); start = end; } - noFailure = doInsert(insertTabletNode, splitInfo, results, costsForMetrics) && noFailure; + noFailure = doInsert(insertTabletNode, splitInfo, results, infoForMetrics) && noFailure; if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable() && !insertTabletNode.isGeneratedByRemoteConsensusLeader()) { @@ -1289,7 +1288,7 @@ public class DataRegion implements IDataRegionForQuery { TSStatus[] results, long timePartitionId, boolean noFailure, - long[] costsForMetrics) { + long[] infoForMetrics) { if (insertTabletNode.allMeasurementFailed()) { if (logger.isDebugEnabled()) { logger.debug( @@ -1319,8 +1318,7 @@ public class DataRegion implements IDataRegionForQuery { registerToTsFile(insertTabletNode, tsFileProcessor); try { - tsFileProcessor.insertTablet( - insertTabletNode, rangeList, results, noFailure, costsForMetrics); + tsFileProcessor.insertTablet(insertTabletNode, rangeList, results, noFailure, infoForMetrics); } catch (WriteProcessRejectException e) { logger.warn("insert to TsFileProcessor rejected, {}", e.getMessage()); return false; @@ -1358,12 +1356,9 @@ public class DataRegion implements IDataRegionForQuery { if (tsFileProcessor == null || insertRowNode.allMeasurementFailed()) { return null; } - long[] costsForMetrics = new long[4]; - tsFileProcessor.insert(insertRowNode, costsForMetrics); - PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]); - PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]); - PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]); - PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]); + long[] infoForMetrics = new long[5]; + tsFileProcessor.insert(insertRowNode, infoForMetrics); + updateTsFileProcessorMetric(insertRowNode, infoForMetrics); // register TableSchema (and maybe more) for table insertion registerToTsFile(insertRowNode, tsFileProcessor); return tsFileProcessor; @@ -1374,8 +1369,10 @@ public class DataRegion implements IDataRegionForQuery { } private List<InsertRowNode> insertToTsFileProcessors( - InsertRowsNode insertRowsNode, boolean[] areSequence, long[] timePartitionIds) { - long[] costsForMetrics = new long[4]; + InsertRowsNode insertRowsNode, + boolean[] areSequence, + long[] timePartitionIds, + long[] infoForMetrics) { Map<TsFileProcessor, InsertRowsNode> tsFileProcessorMap = new HashMap<>(); for (int i = 0; i < areSequence.length; i++) { InsertRowNode insertRowNode = insertRowsNode.getInsertRowNodeList().get(i); @@ -1416,7 +1413,7 @@ public class DataRegion implements IDataRegionForQuery { TsFileProcessor tsFileProcessor = entry.getKey(); InsertRowsNode subInsertRowsNode = entry.getValue(); try { - tsFileProcessor.insertRows(subInsertRowsNode, costsForMetrics); + tsFileProcessor.insertRows(subInsertRowsNode, infoForMetrics); } catch (WriteProcessException e) { insertRowsNode .getResults() @@ -1432,11 +1429,6 @@ public class DataRegion implements IDataRegionForQuery { fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); } } - - PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]); - PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]); - PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]); - PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]); return executedInsertRowNodeList; } @@ -3284,7 +3276,6 @@ public class DataRegion implements IDataRegionForQuery { return; } long ttl = getTTL(insertRowsOfOneDeviceNode); - long[] costsForMetrics = new long[4]; Map<TsFileProcessor, InsertRowsNode> tsFileProcessorMap = new HashMap<>(); for (int i = 0; i < insertRowsOfOneDeviceNode.getInsertRowNodeList().size(); i++) { InsertRowNode insertRowNode = insertRowsOfOneDeviceNode.getInsertRowNodeList().get(i); @@ -3348,11 +3339,12 @@ public class DataRegion implements IDataRegionForQuery { }); } List<InsertRowNode> executedInsertRowNodeList = new ArrayList<>(); + long[] infoForMetrics = new long[5]; for (Map.Entry<TsFileProcessor, InsertRowsNode> entry : tsFileProcessorMap.entrySet()) { TsFileProcessor tsFileProcessor = entry.getKey(); InsertRowsNode subInsertRowsNode = entry.getValue(); try { - tsFileProcessor.insertRows(subInsertRowsNode, costsForMetrics); + tsFileProcessor.insertRows(subInsertRowsNode, infoForMetrics); } catch (WriteProcessException e) { insertRowsOfOneDeviceNode .getResults() @@ -3368,10 +3360,7 @@ public class DataRegion implements IDataRegionForQuery { } } - PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]); - PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]); - PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]); - PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]); + updateTsFileProcessorMetric(insertRowsOfOneDeviceNode, infoForMetrics); if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable() && !insertRowsOfOneDeviceNode.isGeneratedByRemoteConsensusLeader()) { // disable updating last cache on follower @@ -3438,8 +3427,10 @@ public class DataRegion implements IDataRegionForQuery { > lastFlushTimeMap.getFlushedTime( timePartitionIds[i], insertRowNode.getDeviceID()); } + long[] infoForMetrics = new long[5]; List<InsertRowNode> executedInsertRowNodeList = - insertToTsFileProcessors(insertRowsNode, areSequence, timePartitionIds); + insertToTsFileProcessors(insertRowsNode, areSequence, timePartitionIds, infoForMetrics); + updateTsFileProcessorMetric(insertRowsNode, infoForMetrics); if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable() && !insertRowsNode.isGeneratedByRemoteConsensusLeader()) { @@ -3477,14 +3468,14 @@ public class DataRegion implements IDataRegionForQuery { insertMultiTabletsNode.getSearchIndex()); return; } - long[] costsForMetrics = new long[4]; + long[] infoForMetrics = new long[5]; for (int i = 0; i < insertMultiTabletsNode.getInsertTabletNodeList().size(); i++) { InsertTabletNode insertTabletNode = insertMultiTabletsNode.getInsertTabletNodeList().get(i); TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()]; Arrays.fill(results, RpcUtils.SUCCESS_STATUS); boolean noFailure = false; try { - noFailure = executeInsertTablet(insertTabletNode, results, costsForMetrics); + noFailure = executeInsertTablet(insertTabletNode, results, infoForMetrics); } catch (WriteProcessException e) { insertMultiTabletsNode .getResults() @@ -3506,11 +3497,7 @@ public class DataRegion implements IDataRegionForQuery { insertMultiTabletsNode.getResults().put(i, firstStatus); } } - - PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]); - PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]); - PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]); - PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]); + updateTsFileProcessorMetric(insertMultiTabletsNode, infoForMetrics); } finally { writeUnlock(); @@ -3521,6 +3508,41 @@ public class DataRegion implements IDataRegionForQuery { } } + private void updateTsFileProcessorMetric(InsertNode insertNode, long[] infoForMetrics) { + PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(infoForMetrics[0]); + PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(infoForMetrics[1]); + PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(infoForMetrics[2]); + PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(infoForMetrics[3]); + MetricService.getInstance() + .count( + infoForMetrics[4], + Metric.QUANTITY.toString(), + MetricLevel.CORE, + Tag.NAME.toString(), + Metric.POINTS_IN.toString(), + Tag.DATABASE.toString(), + databaseName, + Tag.REGION.toString(), + dataRegionId, + Tag.TYPE.toString(), + Metric.MEMTABLE_POINT_COUNT.toString()); + if (!insertNode.isGeneratedByRemoteConsensusLeader()) { + MetricService.getInstance() + .count( + infoForMetrics[4], + Metric.LEADER_QUANTITY.toString(), + MetricLevel.CORE, + Tag.NAME.toString(), + Metric.POINTS_IN.toString(), + Tag.DATABASE.toString(), + databaseName, + Tag.REGION.toString(), + dataRegionId, + Tag.TYPE.toString(), + Metric.MEMTABLE_POINT_COUNT.toString()); + } + } + /** * @return the disk space occupied by this data region, unit is MB */ diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java index bd1d796ac65..6b0bd5e9f93 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java @@ -27,15 +27,12 @@ import org.apache.iotdb.commons.path.IFullPath; import org.apache.iotdb.commons.path.NonAlignedFullPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; -import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.service.metric.enums.Metric; -import org.apache.iotdb.commons.service.metric.enums.Tag; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils; @@ -51,7 +48,6 @@ import org.apache.iotdb.db.utils.MemUtils; import org.apache.iotdb.db.utils.ModificationUtils; import org.apache.iotdb.db.utils.datastructure.AlignedTVList; import org.apache.iotdb.db.utils.datastructure.TVList; -import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.ChunkMetadata; @@ -306,37 +302,6 @@ public abstract class AbstractMemTable implements IMemTable { } } - public void updateMemtablePointCountMetric(InsertNode insertNode, int pointsInserted) { - MetricService.getInstance() - .count( - pointsInserted, - Metric.QUANTITY.toString(), - MetricLevel.CORE, - Tag.NAME.toString(), - METRIC_POINT_IN, - Tag.DATABASE.toString(), - database, - Tag.REGION.toString(), - dataRegionId, - Tag.TYPE.toString(), - Metric.MEMTABLE_POINT_COUNT.toString()); - if (!insertNode.isGeneratedByRemoteConsensusLeader()) { - MetricService.getInstance() - .count( - pointsInserted, - Metric.LEADER_QUANTITY.toString(), - MetricLevel.CORE, - Tag.NAME.toString(), - METRIC_POINT_IN, - Tag.DATABASE.toString(), - database, - Tag.REGION.toString(), - dataRegionId, - Tag.TYPE.toString(), - Metric.MEMTABLE_POINT_COUNT.toString()); - } - } - @Override public void write( IDeviceID deviceId, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java index f171c2d22a7..35d9ad0e95c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java @@ -24,7 +24,6 @@ import org.apache.iotdb.commons.path.IFullPath; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.storageengine.dataregion.flush.FlushStatus; @@ -203,6 +202,4 @@ public interface IMemTable extends WALEntryValue { void markAsNotGeneratedByPipe(); boolean isTotallyGeneratedByPipe(); - - void updateMemtablePointCountMetric(InsertNode insertNode, int pointsInserted); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index f478af7db50..7bd09293d76 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -249,12 +249,12 @@ public class TsFileProcessor { logger.info("reopen a tsfile processor {}", tsFileResource.getTsFile()); } - private void ensureMemTable(long[] costsForMetrics) { + private void ensureMemTable(long[] infoForMetrics) { if (workMemTable == null) { long startTime = System.nanoTime(); createNewWorkingMemTable(); // recordCreateMemtableBlockCost - costsForMetrics[0] += System.nanoTime() - startTime; + infoForMetrics[0] += System.nanoTime() - startTime; WritingMetrics.getInstance() .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1); } @@ -265,10 +265,10 @@ public class TsFileProcessor { * * @param insertRowNode physical plan of insertion */ - public void insert(InsertRowNode insertRowNode, long[] costsForMetrics) + public void insert(InsertRowNode insertRowNode, long[] infoForMetrics) throws WriteProcessException { - ensureMemTable(costsForMetrics); + ensureMemTable(infoForMetrics); long[] memIncrements; long memControlStartTime = System.nanoTime(); @@ -287,7 +287,7 @@ public class TsFileProcessor { insertRowNode.getDataTypes(), insertRowNode.getValues()); } // recordScheduleMemoryBlockCost - costsForMetrics[1] += System.nanoTime() - memControlStartTime; + infoForMetrics[1] += System.nanoTime() - memControlStartTime; long startTime = System.nanoTime(); WALFlushListener walFlushListener; @@ -306,7 +306,7 @@ public class TsFileProcessor { e); } finally { // recordScheduleWalCost - costsForMetrics[2] += System.nanoTime() - startTime; + infoForMetrics[2] += System.nanoTime() - startTime; } startTime = System.nanoTime(); @@ -337,17 +337,18 @@ public class TsFileProcessor { if (!sequence) { tsFileResource.updateEndTime(insertRowNode.getDeviceID(), insertRowNode.getTime()); } - workMemTable.updateMemtablePointCountMetric(insertRowNode, pointInserted); tsFileResource.updateProgressIndex(insertRowNode.getProgressIndex()); // RecordScheduleMemTableCost - costsForMetrics[3] += System.nanoTime() - startTime; + infoForMetrics[3] += System.nanoTime() - startTime; + // update memtable point inserted count + infoForMetrics[4] += pointInserted; } - public void insertRows(InsertRowsNode insertRowsNode, long[] costsForMetrics) + public void insertRows(InsertRowsNode insertRowsNode, long[] infoForMetrics) throws WriteProcessException { - ensureMemTable(costsForMetrics); + ensureMemTable(infoForMetrics); long[] memIncrements; @@ -377,7 +378,7 @@ public class TsFileProcessor { } } // recordScheduleMemoryBlockCost - costsForMetrics[1] += System.nanoTime() - memControlStartTime; + infoForMetrics[1] += System.nanoTime() - memControlStartTime; long startTime = System.nanoTime(); WALFlushListener walFlushListener; @@ -396,7 +397,7 @@ public class TsFileProcessor { e); } finally { // recordScheduleWalCost - costsForMetrics[2] += System.nanoTime() - startTime; + infoForMetrics[2] += System.nanoTime() - startTime; } startTime = System.nanoTime(); @@ -428,11 +429,12 @@ public class TsFileProcessor { tsFileResource.updateEndTime(insertRowNode.getDeviceID(), insertRowNode.getTime()); } } - workMemTable.updateMemtablePointCountMetric(insertRowsNode, pointInserted); tsFileResource.updateProgressIndex(insertRowsNode.getProgressIndex()); // recordScheduleMemTableCost - costsForMetrics[3] += System.nanoTime() - startTime; + infoForMetrics[3] += System.nanoTime() - startTime; + // update memtable point inserted count + infoForMetrics[4] += pointInserted; } private void createNewWorkingMemTable() { @@ -449,7 +451,7 @@ public class TsFileProcessor { List<int[]> rangeList, TSStatus[] results, boolean noFailure, - long[] costsForMetrics) + long[] infoForMetrics) throws WriteProcessException { long memControlStartTime = System.nanoTime(); long[] totalMemIncrements = new long[NUM_MEM_TO_ESTIMATE]; @@ -469,7 +471,7 @@ public class TsFileProcessor { } } // recordScheduleMemoryBlockCost - costsForMetrics[1] += System.nanoTime() - memControlStartTime; + infoForMetrics[1] += System.nanoTime() - memControlStartTime; return totalMemIncrements; } @@ -535,13 +537,13 @@ public class TsFileProcessor { List<int[]> rangeList, TSStatus[] results, boolean noFailure, - long[] costsForMetrics) + long[] infoForMetrics) throws WriteProcessException { - ensureMemTable(costsForMetrics); + ensureMemTable(infoForMetrics); long[] memIncrements = - scheduleMemoryBlock(insertTabletNode, rangeList, results, noFailure, costsForMetrics); + scheduleMemoryBlock(insertTabletNode, rangeList, results, noFailure, infoForMetrics); long startTime = System.nanoTime(); WALFlushListener walFlushListener; @@ -562,7 +564,7 @@ public class TsFileProcessor { throw new WriteProcessException(e); } finally { // recordScheduleWalCost - costsForMetrics[2] += System.nanoTime() - startTime; + infoForMetrics[2] += System.nanoTime() - startTime; } startTime = System.nanoTime(); @@ -624,11 +626,12 @@ public class TsFileProcessor { } } } - workMemTable.updateMemtablePointCountMetric(insertTabletNode, pointInserted); tsFileResource.updateProgressIndex(insertTabletNode.getProgressIndex()); // recordScheduleMemTableCost - costsForMetrics[3] += System.nanoTime() - startTime; + infoForMetrics[3] += System.nanoTime() - startTime; + // update memtable point inserted count + infoForMetrics[4] += pointInserted; } @SuppressWarnings("squid:S3776") // High Cognitive Complexity diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java index e1455e97b54..e3d560a0173 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java @@ -20,6 +20,9 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.recover.file; import org.apache.iotdb.commons.path.MeasurementPath; +import org.apache.iotdb.commons.service.metric.MetricService; +import org.apache.iotdb.commons.service.metric.enums.Metric; +import org.apache.iotdb.commons.service.metric.enums.Tag; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; @@ -34,6 +37,7 @@ import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.metrics.utils.MetricLevel; import java.io.IOException; import java.util.ArrayList; @@ -114,7 +118,7 @@ public class TsFilePlanRedoer { (InsertTabletNode) node, 0, ((InsertTabletNode) node).getRowCount()); } } - recoveryMemTable.updateMemtablePointCountMetric(node, pointsInserted); + updatePointsInsertedMetric(node, pointsInserted); } void redoInsertRows(InsertRowsNode insertRowsNode) { @@ -140,7 +144,38 @@ public class TsFilePlanRedoer { pointsInserted += recoveryMemTable.insert(node); } } - recoveryMemTable.updateMemtablePointCountMetric(insertRowsNode, pointsInserted); + updatePointsInsertedMetric(insertRowsNode, pointsInserted); + } + + private void updatePointsInsertedMetric(InsertNode insertNode, int pointsInserted) { + MetricService.getInstance() + .count( + pointsInserted, + Metric.QUANTITY.toString(), + MetricLevel.CORE, + Tag.NAME.toString(), + Metric.POINTS_IN.toString(), + Tag.DATABASE.toString(), + tsFileResource.getDatabaseName(), + Tag.REGION.toString(), + tsFileResource.getDataRegionId(), + Tag.TYPE.toString(), + Metric.MEMTABLE_POINT_COUNT.toString()); + if (!insertNode.isGeneratedByRemoteConsensusLeader()) { + MetricService.getInstance() + .count( + pointsInserted, + Metric.LEADER_QUANTITY.toString(), + MetricLevel.CORE, + Tag.NAME.toString(), + Metric.POINTS_IN.toString(), + Tag.DATABASE.toString(), + tsFileResource.getDatabaseName(), + Tag.REGION.toString(), + tsFileResource.getDataRegionId(), + Tag.TYPE.toString(), + Metric.MEMTABLE_POINT_COUNT.toString()); + } } void resetRecoveryMemTable(IMemTable memTable) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java index 7daa2ef837f..2e731ed66c9 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java @@ -132,7 +132,7 @@ public class TsFileProcessorTest { for (int i = 1; i <= 100; i++) { TSRecord record = new TSRecord(i, deviceId); record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i))); - processor.insert(buildInsertRowNodeByTSRecord(record), new long[4]); + processor.insert(buildInsertRowNodeByTSRecord(record), new long[5]); } // query data in memory @@ -190,7 +190,7 @@ public class TsFileProcessorTest { for (int i = 1; i <= 100; i++) { TSRecord record = new TSRecord(i, deviceId); record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i))); - processor.insert(buildInsertRowNodeByTSRecord(record), new long[4]); + processor.insert(buildInsertRowNodeByTSRecord(record), new long[5]); } // query data in memory @@ -274,7 +274,7 @@ public class TsFileProcessorTest { for (int i = 1; i <= 10; i++) { TSRecord record = new TSRecord(i, deviceId); record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i))); - processor.insert(buildInsertRowNodeByTSRecord(record), new long[4]); + processor.insert(buildInsertRowNodeByTSRecord(record), new long[5]); } processor.asyncFlush(); } @@ -308,7 +308,7 @@ public class TsFileProcessorTest { Collections.singletonList(new int[] {0, 10}), new TSStatus[10], true, - new long[4]); + new long[5]); IMemTable memTable = processor.getWorkMemTable(); Assert.assertEquals(1596808, memTable.getTVListsRamCost()); processor.insertTablet( @@ -316,14 +316,14 @@ public class TsFileProcessorTest { Collections.singletonList(new int[] {0, 10}), new TSStatus[10], true, - new long[4]); + new long[5]); Assert.assertEquals(1596808, memTable.getTVListsRamCost()); processor.insertTablet( genInsertTableNode(200, true), Collections.singletonList(new int[] {0, 10}), new TSStatus[10], true, - new long[4]); + new long[5]); Assert.assertEquals(1596808, memTable.getTVListsRamCost()); Assert.assertEquals(90000, memTable.getTotalPointsNum()); Assert.assertEquals(720360, memTable.memSize()); @@ -331,7 +331,7 @@ public class TsFileProcessorTest { for (int i = 1; i <= 100; i++) { TSRecord record = new TSRecord(i, deviceId); record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i))); - processor.insert(buildInsertRowNodeByTSRecord(record), new long[4]); + processor.insert(buildInsertRowNodeByTSRecord(record), new long[5]); } Assert.assertEquals(1598424, memTable.getTVListsRamCost()); Assert.assertEquals(90100, memTable.getTotalPointsNum()); @@ -359,7 +359,7 @@ public class TsFileProcessorTest { Collections.singletonList(new int[] {0, 10}), new TSStatus[10], true, - new long[4]); + new long[5]); IMemTable memTable = processor.getWorkMemTable(); Assert.assertEquals(1596808, memTable.getTVListsRamCost()); processor.insertTablet( @@ -367,49 +367,49 @@ public class TsFileProcessorTest { Collections.singletonList(new int[] {0, 10}), new TSStatus[10], true, - new long[4]); + new long[5]); Assert.assertEquals(3192808, memTable.getTVListsRamCost()); processor.insertTablet( genInsertTableNode(100, true), Collections.singletonList(new int[] {0, 10}), new TSStatus[10], true, - new long[4]); + new long[5]); Assert.assertEquals(3192808, memTable.getTVListsRamCost()); processor.insertTablet( genInsertTableNodeFors3000ToS6000(100, true), Collections.singletonList(new int[] {0, 10}), new TSStatus[10], true, - new long[4]); + new long[5]); Assert.assertEquals(3192808, memTable.getTVListsRamCost()); processor.insertTablet( genInsertTableNode(200, true), Collections.singletonList(new int[] {0, 10}), new TSStatus[10], true, - new long[4]); + new long[5]); Assert.assertEquals(3192808, memTable.getTVListsRamCost()); processor.insertTablet( genInsertTableNodeFors3000ToS6000(200, true), Collections.singletonList(new int[] {0, 10}), new TSStatus[10], true, - new long[4]); + new long[5]); Assert.assertEquals(3192808, memTable.getTVListsRamCost()); processor.insertTablet( genInsertTableNode(300, true), Collections.singletonList(new int[] {0, 10}), new TSStatus[10], true, - new long[4]); + new long[5]); Assert.assertEquals(6385616, memTable.getTVListsRamCost()); processor.insertTablet( genInsertTableNodeFors3000ToS6000(300, true), Collections.singletonList(new int[] {0, 10}), new TSStatus[10], true, - new long[4]); + new long[5]); Assert.assertEquals(6385616, memTable.getTVListsRamCost()); Assert.assertEquals(240000, memTable.getTotalPointsNum()); @@ -418,14 +418,14 @@ public class TsFileProcessorTest { for (int i = 1; i <= 100; i++) { TSRecord record = new TSRecord(i, deviceId); record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i))); - processor.insert(buildInsertRowNodeByTSRecord(record), new long[4]); + processor.insert(buildInsertRowNodeByTSRecord(record), new long[5]); } Assert.assertEquals(6387232, memTable.getTVListsRamCost()); // Test records for (int i = 1; i <= 100; i++) { TSRecord record = new TSRecord(i, deviceId); record.addTuple(DataPoint.getDataPoint(dataType, "s1", String.valueOf(i))); - processor.insert(buildInsertRowNodeByTSRecord(record), new long[4]); + processor.insert(buildInsertRowNodeByTSRecord(record), new long[5]); } Assert.assertEquals(6388848, memTable.getTVListsRamCost()); Assert.assertEquals(240200, memTable.getTotalPointsNum()); @@ -453,7 +453,7 @@ public class TsFileProcessorTest { Collections.singletonList(new int[] {0, 10}), new TSStatus[10], true, - new long[4]); + new long[5]); IMemTable memTable = processor.getWorkMemTable(); Assert.assertEquals(3192000, memTable.getTVListsRamCost()); processor.insertTablet( @@ -461,14 +461,14 @@ public class TsFileProcessorTest { Collections.singletonList(new int[] {0, 10}), new TSStatus[10], true, - new long[4]); + new long[5]); Assert.assertEquals(3192000, memTable.getTVListsRamCost()); processor.insertTablet( genInsertTableNode(200, false), Collections.singletonList(new int[] {0, 10}), new TSStatus[10], true, - new long[4]); + new long[5]); Assert.assertEquals(3192000, memTable.getTVListsRamCost()); Assert.assertEquals(90000, memTable.getTotalPointsNum()); Assert.assertEquals(1440000, memTable.memSize()); @@ -476,7 +476,7 @@ public class TsFileProcessorTest { for (int i = 1; i <= 100; i++) { TSRecord record = new TSRecord(i, deviceId); record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i))); - processor.insert(buildInsertRowNodeByTSRecord(record), new long[4]); + processor.insert(buildInsertRowNodeByTSRecord(record), new long[5]); } Assert.assertEquals(3193616, memTable.getTVListsRamCost()); Assert.assertEquals(90100, memTable.getTotalPointsNum()); @@ -502,7 +502,7 @@ public class TsFileProcessorTest { for (int i = 1; i <= 100; i++) { TSRecord record = new TSRecord(i, deviceId); record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i))); - processor1.insert(buildInsertRowNodeByTSRecord(record), new long[4]); + processor1.insert(buildInsertRowNodeByTSRecord(record), new long[5]); } IMemTable memTable1 = processor1.getWorkMemTable(); @@ -525,7 +525,7 @@ public class TsFileProcessorTest { record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i))); insertRowsNode.addOneInsertRowNode(buildInsertRowNodeByTSRecord(record), i - 1); } - processor2.insertRows(insertRowsNode, new long[4]); + processor2.insertRows(insertRowsNode, new long[5]); IMemTable memTable2 = processor2.getWorkMemTable(); Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost()); @@ -536,19 +536,19 @@ public class TsFileProcessorTest { TSRecord record = new TSRecord(101, deviceId); record.addTuple(DataPoint.getDataPoint(dataType, measurementId, "1")); InsertRowNode insertRowNode1 = buildInsertRowNodeByTSRecord(record); - processor1.insert(insertRowNode1, new long[4]); + processor1.insert(insertRowNode1, new long[5]); record = new TSRecord(101, deviceId); record.addTuple(DataPoint.getDataPoint(dataType, "s99", "1")); InsertRowNode insertRowNode2 = buildInsertRowNodeByTSRecord(record); - processor1.insert(insertRowNode2, new long[4]); + processor1.insert(insertRowNode2, new long[5]); record = new TSRecord(102, deviceId); record.addTuple(DataPoint.getDataPoint(dataType, "s99", "1")); InsertRowNode insertRowNode3 = buildInsertRowNodeByTSRecord(record); - processor1.insert(insertRowNode3, new long[4]); + processor1.insert(insertRowNode3, new long[5]); record = new TSRecord(102, "root.vehicle.d2"); record.addTuple(DataPoint.getDataPoint(dataType, measurementId, "1")); InsertRowNode insertRowNode4 = buildInsertRowNodeByTSRecord(record); - processor1.insert(insertRowNode4, new long[4]); + processor1.insert(insertRowNode4, new long[5]); // insert more rows by insertRows insertRowsNode = new InsertRowsNode(new PlanNodeId("")); @@ -556,7 +556,7 @@ public class TsFileProcessorTest { insertRowsNode.addOneInsertRowNode(insertRowNode2, 1); insertRowsNode.addOneInsertRowNode(insertRowNode3, 2); insertRowsNode.addOneInsertRowNode(insertRowNode4, 3); - processor2.insertRows(insertRowsNode, new long[4]); + processor2.insertRows(insertRowsNode, new long[5]); Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost()); Assert.assertEquals(memTable1.getTotalPointsNum(), memTable2.getTotalPointsNum()); @@ -568,9 +568,9 @@ public class TsFileProcessorTest { insertRowNode1.setMeasurements(new String[1]); insertRowNode1.setValues(new String[1]); insertRowsNode.addOneInsertRowNode(insertRowNode1, 0); - processor2.insertRows(insertRowsNode, new long[4]); + processor2.insertRows(insertRowsNode, new long[5]); - processor1.insert(insertRowNode1, new long[4]); + processor1.insert(insertRowNode1, new long[5]); Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost()); Assert.assertEquals(memTable1.getTotalPointsNum(), memTable2.getTotalPointsNum()); Assert.assertEquals(memTable1.memSize(), memTable2.memSize()); @@ -597,7 +597,7 @@ public class TsFileProcessorTest { record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i))); InsertRowNode node = buildInsertRowNodeByTSRecord(record); node.setAligned(true); - processor1.insert(node, new long[4]); + processor1.insert(node, new long[5]); } IMemTable memTable1 = processor1.getWorkMemTable(); @@ -623,7 +623,7 @@ public class TsFileProcessorTest { node.setAligned(true); insertRowsNode.addOneInsertRowNode(node, i - 1); } - processor2.insertRows(insertRowsNode, new long[4]); + processor2.insertRows(insertRowsNode, new long[5]); IMemTable memTable2 = processor2.getWorkMemTable(); Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost()); @@ -635,22 +635,22 @@ public class TsFileProcessorTest { record.addTuple(DataPoint.getDataPoint(dataType, measurementId, "1")); InsertRowNode insertRowNode1 = buildInsertRowNodeByTSRecord(record); insertRowNode1.setAligned(true); - processor1.insert(insertRowNode1, new long[4]); + processor1.insert(insertRowNode1, new long[5]); record = new TSRecord(101, deviceId); record.addTuple(DataPoint.getDataPoint(dataType, "s99", "1")); InsertRowNode insertRowNode2 = buildInsertRowNodeByTSRecord(record); insertRowNode2.setAligned(true); - processor1.insert(insertRowNode2, new long[4]); + processor1.insert(insertRowNode2, new long[5]); record = new TSRecord(102, deviceId); record.addTuple(DataPoint.getDataPoint(dataType, "s99", "1")); InsertRowNode insertRowNode3 = buildInsertRowNodeByTSRecord(record); insertRowNode3.setAligned(true); - processor1.insert(insertRowNode3, new long[4]); + processor1.insert(insertRowNode3, new long[5]); record = new TSRecord(102, "root.vehicle.d2"); record.addTuple(DataPoint.getDataPoint(dataType, measurementId, "1")); InsertRowNode insertRowNode4 = buildInsertRowNodeByTSRecord(record); insertRowNode4.setAligned(true); - processor1.insert(insertRowNode4, new long[4]); + processor1.insert(insertRowNode4, new long[5]); // insert more rows by insertRows insertRowsNode = new InsertRowsNode(new PlanNodeId("")); @@ -659,7 +659,7 @@ public class TsFileProcessorTest { insertRowsNode.addOneInsertRowNode(insertRowNode2, 1); insertRowsNode.addOneInsertRowNode(insertRowNode3, 2); insertRowsNode.addOneInsertRowNode(insertRowNode4, 3); - processor2.insertRows(insertRowsNode, new long[4]); + processor2.insertRows(insertRowsNode, new long[5]); Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost()); Assert.assertEquals(memTable1.getTotalPointsNum(), memTable2.getTotalPointsNum()); @@ -672,9 +672,9 @@ public class TsFileProcessorTest { insertRowNode1.setValues(new String[1]); insertRowsNode.addOneInsertRowNode(insertRowNode1, 0); insertRowsNode.setAligned(true); - processor2.insertRows(insertRowsNode, new long[4]); + processor2.insertRows(insertRowsNode, new long[5]); - processor1.insert(insertRowNode1, new long[4]); + processor1.insert(insertRowNode1, new long[5]); Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost()); Assert.assertEquals(memTable1.getTotalPointsNum(), memTable2.getTotalPointsNum()); Assert.assertEquals(memTable1.memSize(), memTable2.memSize()); @@ -703,7 +703,7 @@ public class TsFileProcessorTest { if (i <= 50) { node.setAligned(true); } - processor1.insert(node, new long[4]); + processor1.insert(node, new long[5]); } IMemTable memTable1 = processor1.getWorkMemTable(); @@ -732,7 +732,7 @@ public class TsFileProcessorTest { } insertRowsNode.addOneInsertRowNode(node, i - 1); } - processor2.insertRows(insertRowsNode, new long[4]); + processor2.insertRows(insertRowsNode, new long[5]); IMemTable memTable2 = processor2.getWorkMemTable(); Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost()); @@ -764,7 +764,7 @@ public class TsFileProcessorTest { if (i <= 50) { node.setAligned(true); } - processor1.insert(node, new long[4]); + processor1.insert(node, new long[5]); } IMemTable memTable1 = processor1.getWorkMemTable(); @@ -794,7 +794,7 @@ public class TsFileProcessorTest { } insertRowsNode.addOneInsertRowNode(node, i - 1); } - processor2.insertRows(insertRowsNode, new long[4]); + processor2.insertRows(insertRowsNode, new long[5]); IMemTable memTable2 = processor2.getWorkMemTable(); Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost()); @@ -831,7 +831,7 @@ public class TsFileProcessorTest { for (int i = 1; i <= 100; i++) { TSRecord record = new TSRecord(i, deviceId); record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i))); - processor.insert(buildInsertRowNodeByTSRecord(record), new long[4]); + processor.insert(buildInsertRowNodeByTSRecord(record), new long[5]); } // query data in memory
