This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch batch_memtable_metric13 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b16d27b10bc3009e6470215b862682c91187ae26 Author: HTHou <[email protected]> AuthorDate: Tue Oct 29 10:07:45 2024 +0800 [To 1.3] Batch update inserted point number metric --- .../dataregion/memtable/AbstractMemTable.java | 140 +++++---------------- .../dataregion/memtable/IMemTable.java | 11 +- .../dataregion/memtable/TsFileProcessor.java | 20 +-- .../wal/recover/file/TsFilePlanRedoer.java | 22 ++-- 4 files changed, 65 insertions(+), 128 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java index ce85508914f..bbabf29ac18 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils; @@ -198,7 +199,7 @@ public abstract class AbstractMemTable implements IMemTable { } @Override - public void insert(InsertRowNode insertRowNode) { + public int insert(InsertRowNode insertRowNode) { String[] measurements = insertRowNode.getMeasurements(); Object[] values = insertRowNode.getValues(); @@ -228,39 +229,11 @@ public abstract class AbstractMemTable implements IMemTable { - nullPointsNumber; totalPointsNum += pointsInserted; - - MetricService.getInstance() - .count( - pointsInserted, - Metric.QUANTITY.toString(), - MetricLevel.CORE, - Tag.NAME.toString(), - METRIC_POINT_IN, - Tag.DATABASE.toString(), - database, - Tag.REGION.toString(), - dataRegionId, - Tag.TYPE.toString(), - Metric.MEMTABLE_POINT_COUNT.toString()); - if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) { - MetricService.getInstance() - .count( - pointsInserted, - Metric.LEADER_QUANTITY.toString(), - MetricLevel.CORE, - Tag.NAME.toString(), - METRIC_POINT_IN, - Tag.DATABASE.toString(), - database, - Tag.REGION.toString(), - dataRegionId, - Tag.TYPE.toString(), - Metric.MEMTABLE_POINT_COUNT.toString()); - } + return pointsInserted; } @Override - public void insertAlignedRow(InsertRowNode insertRowNode) { + public int insertAlignedRow(InsertRowNode insertRowNode) { String[] measurements = insertRowNode.getMeasurements(); Object[] values = insertRowNode.getValues(); @@ -277,46 +250,18 @@ public abstract class AbstractMemTable implements IMemTable { dataTypes.add(schema.getType()); } if (schemaList.isEmpty()) { - return; + return 0; } memSize += MemUtils.getAlignedRowRecordSize(dataTypes, values); writeAlignedRow(insertRowNode.getDeviceID(), schemaList, insertRowNode.getTime(), values); int pointsInserted = insertRowNode.getMeasurements().length - insertRowNode.getFailedMeasurementNumber(); totalPointsNum += pointsInserted; - - MetricService.getInstance() - .count( - pointsInserted, - Metric.QUANTITY.toString(), - MetricLevel.CORE, - Tag.NAME.toString(), - METRIC_POINT_IN, - Tag.DATABASE.toString(), - database, - Tag.REGION.toString(), - dataRegionId, - Tag.TYPE.toString(), - Metric.MEMTABLE_POINT_COUNT.toString()); - if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) { - MetricService.getInstance() - .count( - pointsInserted, - Metric.LEADER_QUANTITY.toString(), - MetricLevel.CORE, - Tag.NAME.toString(), - METRIC_POINT_IN, - Tag.DATABASE.toString(), - database, - Tag.REGION.toString(), - dataRegionId, - Tag.TYPE.toString(), - Metric.MEMTABLE_POINT_COUNT.toString()); - } + return pointsInserted; } @Override - public void insertTablet(InsertTabletNode insertTabletNode, int start, int end) + public int insertTablet(InsertTabletNode insertTabletNode, int start, int end) throws WriteProcessException { try { writeTabletNode(insertTabletNode, start, end); @@ -325,41 +270,14 @@ public abstract class AbstractMemTable implements IMemTable { (insertTabletNode.getDataTypes().length - insertTabletNode.getFailedMeasurementNumber()) * (end - start); totalPointsNum += pointsInserted; - MetricService.getInstance() - .count( - pointsInserted, - Metric.QUANTITY.toString(), - MetricLevel.CORE, - Tag.NAME.toString(), - METRIC_POINT_IN, - Tag.DATABASE.toString(), - database, - Tag.REGION.toString(), - dataRegionId, - Tag.TYPE.toString(), - Metric.MEMTABLE_POINT_COUNT.toString()); - if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) { - MetricService.getInstance() - .count( - pointsInserted, - Metric.LEADER_QUANTITY.toString(), - MetricLevel.CORE, - Tag.NAME.toString(), - METRIC_POINT_IN, - Tag.DATABASE.toString(), - database, - Tag.REGION.toString(), - dataRegionId, - Tag.TYPE.toString(), - Metric.MEMTABLE_POINT_COUNT.toString()); - } + return pointsInserted; } catch (RuntimeException e) { throw new WriteProcessException(e); } } @Override - public void insertAlignedTablet(InsertTabletNode insertTabletNode, int start, int end) + public int insertAlignedTablet(InsertTabletNode insertTabletNode, int start, int end) throws WriteProcessException { try { writeAlignedTablet(insertTabletNode, start, end); @@ -368,10 +286,31 @@ public abstract class AbstractMemTable implements IMemTable { (insertTabletNode.getDataTypes().length - insertTabletNode.getFailedMeasurementNumber()) * (end - start); totalPointsNum += pointsInserted; + return pointsInserted; + } catch (RuntimeException e) { + throw new WriteProcessException(e); + } + } + + public void updateMemtablePointCountMetric(InsertNode insertNode, int pointsInserted) { + MetricService.getInstance() + .count( + pointsInserted, + Metric.QUANTITY.toString(), + MetricLevel.CORE, + Tag.NAME.toString(), + METRIC_POINT_IN, + Tag.DATABASE.toString(), + database, + Tag.REGION.toString(), + dataRegionId, + Tag.TYPE.toString(), + Metric.MEMTABLE_POINT_COUNT.toString()); + if (!insertNode.isGeneratedByRemoteConsensusLeader()) { MetricService.getInstance() .count( pointsInserted, - Metric.QUANTITY.toString(), + Metric.LEADER_QUANTITY.toString(), MetricLevel.CORE, Tag.NAME.toString(), METRIC_POINT_IN, @@ -381,23 +320,6 @@ public abstract class AbstractMemTable implements IMemTable { dataRegionId, Tag.TYPE.toString(), Metric.MEMTABLE_POINT_COUNT.toString()); - if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) { - MetricService.getInstance() - .count( - pointsInserted, - Metric.LEADER_QUANTITY.toString(), - MetricLevel.CORE, - Tag.NAME.toString(), - METRIC_POINT_IN, - Tag.DATABASE.toString(), - database, - Tag.REGION.toString(), - dataRegionId, - Tag.TYPE.toString(), - Metric.MEMTABLE_POINT_COUNT.toString()); - } - } catch (RuntimeException e) { - throw new WriteProcessException(e); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java index 7918969d2e1..dc46cc8f33f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.storageengine.dataregion.flush.FlushStatus; @@ -93,9 +94,9 @@ public interface IMemTable extends WALEntryValue { * * @param insertRowNode insertRowNode */ - void insert(InsertRowNode insertRowNode); + int insert(InsertRowNode insertRowNode); - void insertAlignedRow(InsertRowNode insertRowNode); + int insertAlignedRow(InsertRowNode insertRowNode); /** * insert tablet into this memtable. The rows to be inserted are in the range [start, end). Null @@ -106,10 +107,10 @@ public interface IMemTable extends WALEntryValue { * @param start included * @param end excluded */ - void insertTablet(InsertTabletNode insertTabletNode, int start, int end) + int insertTablet(InsertTabletNode insertTabletNode, int start, int end) throws WriteProcessException; - void insertAlignedTablet(InsertTabletNode insertTabletNode, int start, int end) + int insertAlignedTablet(InsertTabletNode insertTabletNode, int start, int end) throws WriteProcessException; ReadOnlyMemChunk query( @@ -205,4 +206,6 @@ public interface IMemTable extends WALEntryValue { void markAsNotGeneratedByPipe(); boolean isTotallyGeneratedByPipe(); + + void updateMemtablePointCountMetric(InsertNode insertNode, int pointsInserted); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index f39cb9f0bb0..5b025847fa4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -321,10 +321,11 @@ public class TsFileProcessor { insertRowNode, tsFileResource); + int pointInserted; if (insertRowNode.isAligned()) { - workMemTable.insertAlignedRow(insertRowNode); + pointInserted = workMemTable.insertAlignedRow(insertRowNode); } else { - workMemTable.insert(insertRowNode); + pointInserted = workMemTable.insert(insertRowNode); } // Update start time of this memtable @@ -334,6 +335,7 @@ public class TsFileProcessor { if (!sequence) { tsFileResource.updateEndTime(insertRowNode.getDeviceID(), insertRowNode.getTime()); } + workMemTable.updateMemtablePointCountMetric(insertRowNode, pointInserted); tsFileResource.updateProgressIndex(insertRowNode.getProgressIndex()); // RecordScheduleMemTableCost @@ -414,12 +416,13 @@ public class TsFileProcessor { walFlushListener.getWalEntryHandler(), insertRowsNode, tsFileResource); - for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) { + int pointInserted = 0; + for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) { if (insertRowNode.isAligned()) { - workMemTable.insertAlignedRow(insertRowNode); + pointInserted += workMemTable.insertAlignedRow(insertRowNode); } else { - workMemTable.insert(insertRowNode); + pointInserted += workMemTable.insert(insertRowNode); } // update start time of this memtable @@ -430,6 +433,7 @@ public class TsFileProcessor { tsFileResource.updateEndTime(insertRowNode.getDeviceID(), insertRowNode.getTime()); } } + workMemTable.updateMemtablePointCountMetric(insertRowsNode, pointInserted); tsFileResource.updateProgressIndex(insertRowsNode.getProgressIndex()); // recordScheduleMemTableCost costsForMetrics[3] += System.nanoTime() - startTime; @@ -526,11 +530,12 @@ public class TsFileProcessor { insertTabletNode, tsFileResource); + int pointInserted; try { if (insertTabletNode.isAligned()) { - workMemTable.insertAlignedTablet(insertTabletNode, start, end); + pointInserted = workMemTable.insertAlignedTablet(insertTabletNode, start, end); } else { - workMemTable.insertTablet(insertTabletNode, start, end); + pointInserted = workMemTable.insertTablet(insertTabletNode, start, end); } } catch (WriteProcessException e) { for (int i = start; i < end; i++) { @@ -550,6 +555,7 @@ public class TsFileProcessor { tsFileResource.updateEndTime( insertTabletNode.getDeviceID(), insertTabletNode.getTimes()[end - 1]); } + workMemTable.updateMemtablePointCountMetric(insertTabletNode, pointInserted); tsFileResource.updateProgressIndex(insertTabletNode.getProgressIndex()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java index 162b1ac02b2..55e385bf8a8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java @@ -91,24 +91,29 @@ public class TsFilePlanRedoer { } } + int pointsInserted; if (node instanceof InsertRowNode) { if (node.isAligned()) { - recoveryMemTable.insertAlignedRow((InsertRowNode) node); + pointsInserted = recoveryMemTable.insertAlignedRow((InsertRowNode) node); } else { - recoveryMemTable.insert((InsertRowNode) node); + pointsInserted = recoveryMemTable.insert((InsertRowNode) node); } } else { if (node.isAligned()) { - recoveryMemTable.insertAlignedTablet( - (InsertTabletNode) node, 0, ((InsertTabletNode) node).getRowCount()); + pointsInserted = + recoveryMemTable.insertAlignedTablet( + (InsertTabletNode) node, 0, ((InsertTabletNode) node).getRowCount()); } else { - recoveryMemTable.insertTablet( - (InsertTabletNode) node, 0, ((InsertTabletNode) node).getRowCount()); + pointsInserted = + recoveryMemTable.insertTablet( + (InsertTabletNode) node, 0, ((InsertTabletNode) node).getRowCount()); } } + recoveryMemTable.updateMemtablePointCountMetric(node, pointsInserted); } void redoInsertRows(InsertRowsNode insertRowsNode) { + int pointsInserted = 0; for (InsertRowNode node : insertRowsNode.getInsertRowNodeList()) { if (!node.hasValidMeasurements()) { continue; @@ -125,11 +130,12 @@ public class TsFilePlanRedoer { } } if (node.isAligned()) { - recoveryMemTable.insertAlignedRow(node); + pointsInserted += recoveryMemTable.insertAlignedRow(node); } else { - recoveryMemTable.insert(node); + pointsInserted += recoveryMemTable.insert(node); } } + recoveryMemTable.updateMemtablePointCountMetric(insertRowsNode, pointsInserted); } void resetRecoveryMemTable(IMemTable memTable) {
