This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new af0dba3dcdd Pipe Consensus: Add load point metrics on receiver side (#12820)
af0dba3dcdd is described below
commit af0dba3dcddedb09978053025f8269a4ba03a2f0
Author: Zikun Ma <[email protected]>
AuthorDate: Fri Jun 28 19:00:27 2024 +0800
Pipe Consensus: Add load point metrics on receiver side (#12820)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../pipeconsensus/PipeConsensusSyncConnector.java | 2 +
.../PipeConsensusTsFileInsertionEventHandler.java | 2 +
.../request/PipeConsensusTsFileSealReq.java | 9 ++-
.../request/PipeConsensusTsFileSealWithModReq.java | 2 +
.../common/tsfile/PipeTsFileInsertionEvent.java | 26 ++++++
.../common/tsfile/TsFileInsertionPointCounter.java | 24 +++---
.../pipeconsensus/PipeConsensusReceiver.java | 50 +++++++++++-
.../execution/load/LoadTsFileManager.java | 93 +++++++++++-----------
.../dataregion/memtable/TsFileProcessor.java | 10 +++
.../request/PipeConsensusTransferFileSealReq.java | 11 +++
.../PipeConsensusTransferFileSealWithModReq.java | 19 +++++
11 files changed, 192 insertions(+), 56 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
index c10c5bf93bf..ad7ae47b302 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
@@ -320,6 +320,7 @@ public class PipeConsensusSyncConnector extends IoTDBConnector {
modFile.length(),
tsFile.getName(),
tsFile.length(),
+ pipeTsFileInsertionEvent.getFlushPointCount(),
tCommitId,
tConsensusGroupId,
pipeTsFileInsertionEvent.getProgressIndex(),
@@ -333,6 +334,7 @@ public class PipeConsensusSyncConnector extends
IoTDBConnector {
PipeConsensusTsFileSealReq.toTPipeConsensusTransferReq(
tsFile.getName(),
tsFile.length(),
+ pipeTsFileInsertionEvent.getFlushPointCount(),
tCommitId,
tConsensusGroupId,
pipeTsFileInsertionEvent.getProgressIndex(),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
index bc8879946aa..2989323bb07 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
@@ -143,6 +143,7 @@ public class PipeConsensusTsFileInsertionEventHandler
modFile.length(),
tsFile.getName(),
tsFile.length(),
+ event.getFlushPointCount(),
commitId,
consensusGroupId,
event.getProgressIndex(),
@@ -150,6 +151,7 @@ public class PipeConsensusTsFileInsertionEventHandler
: PipeConsensusTsFileSealReq.toTPipeConsensusTransferReq(
tsFile.getName(),
tsFile.length(),
+ event.getFlushPointCount(),
commitId,
consensusGroupId,
event.getProgressIndex(),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/request/PipeConsensusTsFileSealReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/request/PipeConsensusTsFileSealReq.java
index 0caf783d695..2f8de50a638 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/request/PipeConsensusTsFileSealReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/request/PipeConsensusTsFileSealReq.java
@@ -43,6 +43,7 @@ public class PipeConsensusTsFileSealReq extends PipeConsensusTransferFileSealReq
public static PipeConsensusTsFileSealReq toTPipeConsensusTransferReq(
String fileName,
long fileLength,
+ long pointCount,
TCommitId commitId,
TConsensusGroupId consensusGroupId,
ProgressIndex progressIndex,
@@ -51,7 +52,13 @@ public class PipeConsensusTsFileSealReq extends
PipeConsensusTransferFileSealReq
return (PipeConsensusTsFileSealReq)
new PipeConsensusTsFileSealReq()
.convertToTPipeConsensusTransferReq(
- fileName, fileLength, commitId, consensusGroupId,
progressIndex, thisDataNodeId);
+ fileName,
+ fileLength,
+ pointCount,
+ commitId,
+ consensusGroupId,
+ progressIndex,
+ thisDataNodeId);
}
public static PipeConsensusTsFileSealReq fromTPipeConsensusTransferReq(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/request/PipeConsensusTsFileSealWithModReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/request/PipeConsensusTsFileSealWithModReq.java
index 64b037174f7..e28333bf9f4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/request/PipeConsensusTsFileSealWithModReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/request/PipeConsensusTsFileSealWithModReq.java
@@ -48,6 +48,7 @@ public class PipeConsensusTsFileSealWithModReq extends PipeConsensusTransferFile
long modFileLength,
String tsFileName,
long tsFileLength,
+ long tsFilePointCount,
TCommitId commitId,
TConsensusGroupId consensusGroupId,
ProgressIndex progressIndex,
@@ -58,6 +59,7 @@ public class PipeConsensusTsFileSealWithModReq extends
PipeConsensusTransferFile
.convertToTPipeConsensusTransferReq(
Arrays.asList(modFileName, tsFileName),
Arrays.asList(modFileLength, tsFileLength),
+ Arrays.asList(0L, tsFilePointCount),
new HashMap<>(),
commitId,
consensusGroupId,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 9d4a013b069..ae53e72b8e1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -66,6 +66,10 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
private final AtomicBoolean isClosed;
private TsFileInsertionDataContainer dataContainer;
+ // The point count of the TsFile. Used for metrics on PipeConsensus' receiver side.
+ // May be updated after it is flushed. Should be negative if not set.
+ private long flushPointCount = TsFileProcessor.FLUSH_POINT_COUNT_NOT_SET;
+
public PipeTsFileInsertionEvent(
final TsFileResource resource, final boolean isLoaded, final boolean
isGeneratedByPipe) {
// The modFile must be copied before the event is assigned to the
listening pipes
@@ -115,6 +119,9 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
synchronized (isClosed) {
isClosed.set(true);
isClosed.notifyAll();
+
+ // Update flushPointCount after TsFile is closed
+ flushPointCount = processor.getMemTableFlushPointCount();
}
});
}
@@ -155,6 +162,13 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
if (isClosedNow) {
isClosed.set(true);
isClosed.notifyAll();
+
+ // Update flushPointCount after TsFile is closed
+ final TsFileProcessor processor = resource.getProcessor();
+ if (processor != null) {
+ flushPointCount = processor.getMemTableFlushPointCount();
+ }
+
break;
}
}
@@ -193,6 +207,18 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
return resource.getFileStartTime();
}
+ /**
+ * Only used for metrics on PipeConsensus' receiver side. If the event is recovered after data
+ * node's restart, the flushPointCount can be not set. It's totally fine for the PipeConsensus'
+ * receiver side. The receiver side will count the actual point count from the TsFile.
+ *
+ * <p>If you want to get the actual point count with no risk, you can call {@link
+ * #count(boolean)}.
+ */
+ public long getFlushPointCount() {
+ return flushPointCount;
+ }
+
/////////////////////////// EnrichedEvent ///////////////////////////
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java
index f76c220dd2b..219a8933a73 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java
@@ -45,7 +45,6 @@ public class TsFileInsertionPointCounter implements AutoCloseable {
private final TsFileSequenceReader tsFileSequenceReader;
- final Map<IDeviceID, Set<String>> filteredDeviceMeasurementMap;
final Map<IDeviceID, List<TimeseriesMetadata>>
allDeviceTimeseriesMetadataMap;
private boolean shouldParsePattern = false;
@@ -58,14 +57,18 @@ public class TsFileInsertionPointCounter implements
AutoCloseable {
try {
tsFileSequenceReader = new TsFileSequenceReader(tsFile.getPath(), true,
true);
-
- filteredDeviceMeasurementMap = filterDeviceMeasurementsMapByPattern();
allDeviceTimeseriesMetadataMap =
tsFileSequenceReader.getAllTimeseriesMetadata(false);
- if (shouldParsePattern) {
- countMatchedTimeseriesPoints();
- } else {
+ if (Objects.isNull(this.pattern) || pattern.isRoot()) {
countAllTimeseriesPoints();
+ } else {
+ final Map<IDeviceID, Set<String>> filteredDeviceMeasurementMap =
+ filterDeviceMeasurementsMapByPatternAndJudgeShouldParsePattern();
+ if (shouldParsePattern) {
+ countMatchedTimeseriesPoints(filteredDeviceMeasurementMap);
+ } else {
+ countAllTimeseriesPoints();
+ }
}
// No longer need this. Help GC.
@@ -76,7 +79,9 @@ public class TsFileInsertionPointCounter implements
AutoCloseable {
}
}
- private Map<IDeviceID, Set<String>> filterDeviceMeasurementsMapByPattern()
throws IOException {
+ private Map<IDeviceID, Set<String>>
+ filterDeviceMeasurementsMapByPatternAndJudgeShouldParsePattern() throws
IOException {
+ // pattern should be non-null here
final Map<IDeviceID, List<String>> originalDeviceMeasurementsMap =
tsFileSequenceReader.getDeviceMeasurementsMap();
final Map<IDeviceID, Set<String>> filteredDeviceMeasurementsMap = new
HashMap<>();
@@ -87,7 +92,7 @@ public class TsFileInsertionPointCounter implements
AutoCloseable {
// case 1: for example, pattern is root.a.b or pattern is null and
device is root.a.b.c
// in this case, all data can be matched without checking the
measurements
- if (Objects.isNull(pattern) || pattern.isRoot() ||
pattern.coversDevice(deviceId)) {
+ if (pattern.coversDevice(deviceId)) {
if (!entry.getValue().isEmpty()) {
filteredDeviceMeasurementsMap.put(
new PlainDeviceID(deviceId), new HashSet<>(entry.getValue()));
@@ -124,7 +129,8 @@ public class TsFileInsertionPointCounter implements
AutoCloseable {
return filteredDeviceMeasurementsMap;
}
- private void countMatchedTimeseriesPoints() {
+ private void countMatchedTimeseriesPoints(
+ final Map<IDeviceID, Set<String>> filteredDeviceMeasurementMap) {
for (final Map.Entry<IDeviceID, List<TimeseriesMetadata>>
deviceTimeseriesMetadataEntry :
allDeviceTimeseriesMetadataMap.entrySet()) {
final IDeviceID deviceId = deviceTimeseriesMetadataEntry.getKey();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 87f3da19285..1e43056261c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -48,8 +48,11 @@ import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealReq;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealWithModReq;
import org.apache.iotdb.db.pipe.consensus.PipeConsensusReceiverMetrics;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.TsFileInsertionPointCounter;
+import org.apache.iotdb.db.queryengine.execution.load.LoadTsFileManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
@@ -419,7 +422,9 @@ public class PipeConsensusReceiver {
loadFileToDataRegion(
fileAbsolutePath,
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex())));
+ updateWritePointCountMetrics(req.getPointCount(), fileAbsolutePath);
pipeConsensusReceiverMetrics.recordTsFileSealLoadTimer(System.nanoTime()
- endPreCheckNanos);
+
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// if transfer success, disk buffer will be released.
tsFileWriter.returnSelf();
@@ -532,11 +537,14 @@ public class PipeConsensusReceiver {
long endPreCheckNanos = System.nanoTime();
pipeConsensusReceiverMetrics.recordTsFileSealPreCheckTimer(
endPreCheckNanos - startPreCheckNanos);
+ final String tsFileAbsolutePath = fileAbsolutePaths.get(1);
final TSStatus status =
loadFileToDataRegion(
- fileAbsolutePaths.get(1),
+ tsFileAbsolutePath,
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex())));
+ updateWritePointCountMetrics(req.getPointCounts().get(1),
tsFileAbsolutePath);
pipeConsensusReceiverMetrics.recordTsFileSealLoadTimer(System.nanoTime()
- endPreCheckNanos);
+
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// if transfer success, disk buffer will be released.
tsFileWriter.returnSelf();
@@ -623,6 +631,46 @@ public class PipeConsensusReceiver {
return RpcUtils.SUCCESS_STATUS;
}
+ private void updateWritePointCountMetrics(
+ final long writePointCountGivenByReq, final String tsFileAbsolutePath) {
+ if (writePointCountGivenByReq >= 0) {
+ updateWritePointCountMetrics(writePointCountGivenByReq);
+ return;
+ }
+
+ // If the point count in the req is not given,
+ // we will read the actual point count from the TsFile.
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "PipeConsensus-PipeName-{}: The point count of TsFile {} is not given by sender, "
+ + "will read actual point count from TsFile.",
+ consensusPipeName,
+ tsFileAbsolutePath);
+ }
+
+ try (final TsFileInsertionPointCounter counter =
+ new TsFileInsertionPointCounter(new File(tsFileAbsolutePath), null)) {
+ updateWritePointCountMetrics(counter.count());
+ } catch (IOException e) {
+ LOGGER.warn(
+ "PipeConsensus-PipeName-{}: Failed to read TsFile when counting points: {}.",
+ consensusPipeName,
+ tsFileAbsolutePath,
+ e);
+ }
+ }
+
+ private void updateWritePointCountMetrics(long writePointCount) {
+ final DataRegion dataRegion =
+ StorageEngine.getInstance().getDataRegion(((DataRegionId)
consensusGroupId));
+ dataRegion
+ .getNonSystemDatabaseName()
+ .ifPresent(
+ databaseName ->
+ LoadTsFileManager.updateWritePointCountMetrics(
+ dataRegion, databaseName, writePointCount));
+ }
+
private TsFileResource generateTsFileResource(String filePath, ProgressIndex
progressIndex)
throws IOException {
final File tsFile = new File(filePath);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index c068e7696c1..0e0be8deae6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -314,6 +314,51 @@ public class LoadTsFileManager {
}
}
+ public static void updateWritePointCountMetrics(
+ final DataRegion dataRegion, final String databaseName, final long
writePointCount) {
+ MemTableFlushTask.recordFlushPointsMetricInternal(
+ writePointCount, databaseName, dataRegion.getDataRegionId());
+ MetricService.getInstance()
+ .count(
+ writePointCount,
+ Metric.QUANTITY.toString(),
+ MetricLevel.CORE,
+ Tag.NAME.toString(),
+ Metric.POINTS_IN.toString(),
+ Tag.DATABASE.toString(),
+ databaseName,
+ Tag.REGION.toString(),
+ dataRegion.getDataRegionId(),
+ Tag.TYPE.toString(),
+ Metric.LOAD_POINT_COUNT.toString());
+ // Because we cannot accurately judge who is the leader here,
+ // we directly divide the writePointCount by the replicationNum to ensure the
+ // correctness of this metric, which will be accurate in most cases
+ final int replicationNum =
+ DataRegionConsensusImpl.getInstance()
+ .getReplicationNum(
+ ConsensusGroupId.Factory.create(
+ TConsensusGroupType.DataRegion.getValue(),
+ Integer.parseInt(dataRegion.getDataRegionId())));
+ // It may happen that the replicationNum is 0 when load and db deletion occurs
+ // concurrently, so we can just not to count the number of points in this case
+ if (replicationNum != 0) {
+ MetricService.getInstance()
+ .count(
+ writePointCount / replicationNum,
+ Metric.LEADER_QUANTITY.toString(),
+ MetricLevel.CORE,
+ Tag.NAME.toString(),
+ Metric.POINTS_IN.toString(),
+ Tag.DATABASE.toString(),
+ databaseName,
+ Tag.REGION.toString(),
+ dataRegion.getDataRegionId(),
+ Tag.TYPE.toString(),
+ Metric.LOAD_POINT_COUNT.toString());
+ }
+ }
+
private static class TsFileWriterManager {
private final File taskDir;
@@ -420,51 +465,9 @@ public class LoadTsFileManager {
dataRegion
.getNonSystemDatabaseName()
.ifPresent(
- databaseName -> {
- long writePointCount = getTsFileWritePointCount(writer);
- // Report load tsFile points to IoTDB flush metrics
- MemTableFlushTask.recordFlushPointsMetricInternal(
- writePointCount, databaseName,
dataRegion.getDataRegionId());
- MetricService.getInstance()
- .count(
- writePointCount,
- Metric.QUANTITY.toString(),
- MetricLevel.CORE,
- Tag.NAME.toString(),
- Metric.POINTS_IN.toString(),
- Tag.DATABASE.toString(),
- databaseName,
- Tag.REGION.toString(),
- dataRegion.getDataRegionId(),
- Tag.TYPE.toString(),
- Metric.LOAD_POINT_COUNT.toString());
- // Because we cannot accurately judge who is the leader here,
- // we directly divide the writePointCount by the replicationNum to ensure the
- // correctness of this metric, which will be accurate in most cases
- int replicationNum =
- DataRegionConsensusImpl.getInstance()
- .getReplicationNum(
- ConsensusGroupId.Factory.create(
- TConsensusGroupType.DataRegion.getValue(),
-
Integer.parseInt(dataRegion.getDataRegionId())));
- // It may happen that the replicationNum is 0 when load and db deletion occurs
- // concurrently, so we can just not to count the number of points in this case
- if (replicationNum != 0) {
- MetricService.getInstance()
- .count(
- writePointCount / replicationNum,
- Metric.LEADER_QUANTITY.toString(),
- MetricLevel.CORE,
- Tag.NAME.toString(),
- Metric.POINTS_IN.toString(),
- Tag.DATABASE.toString(),
- databaseName,
- Tag.REGION.toString(),
- dataRegion.getDataRegionId(),
- Tag.TYPE.toString(),
- Metric.LOAD_POINT_COUNT.toString());
- }
- });
+ databaseName ->
+ updateWritePointCountMetrics(
+ dataRegion, databaseName,
getTsFileWritePointCount(writer)));
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 2d8f84574ae..02fdf59563c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -169,6 +169,11 @@ public class TsFileProcessor {
/** This callback is called before the workMemtable is added into the
flushingMemTables. */
private final DataRegion.UpdateEndTimeCallBack updateLatestFlushTimeCallback;
+ public static final long FLUSH_POINT_COUNT_NOT_SET = -1;
+
+ /** Point count when the memtable is flushed. Used for metrics on PipeConsensus' receiver side. */
+ private long memTableFlushPointCount = FLUSH_POINT_COUNT_NOT_SET;
+
/** Wal node. */
private final IWALNode walNode;
@@ -1360,6 +1365,7 @@ public class TsFileProcessor {
storageGroupName,
dataRegionInfo.getDataRegion().getDataRegionId());
flushTask.syncFlushMemTable();
+ memTableFlushPointCount = memTableToFlush.getTotalPointsNum();
} catch (Throwable e) {
if (writer == null) {
logger.info(
@@ -2099,6 +2105,10 @@ public class TsFileProcessor {
return workMemTable != null ? workMemTable.getUpdateTime() :
Long.MAX_VALUE;
}
+ public long getMemTableFlushPointCount() {
+ return memTableFlushPointCount;
+ }
+
public boolean isSequence() {
return sequence;
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/pipeconsensus/request/PipeConsensusTransferFileSealReq.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/pipeconsensus/request/PipeConsensusTransferFileSealReq.java
index ab7f65db2a7..dff42275de3 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/pipeconsensus/request/PipeConsensusTransferFileSealReq.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/pipeconsensus/request/PipeConsensusTransferFileSealReq.java
@@ -37,6 +37,7 @@ public abstract class PipeConsensusTransferFileSealReq
private transient String fileName;
private transient long fileLength;
+ private transient long pointCount;
public final String getFileName() {
return fileName;
@@ -46,6 +47,10 @@ public abstract class PipeConsensusTransferFileSealReq
return fileLength;
}
+ public final long getPointCount() {
+ return pointCount;
+ }
+
protected abstract PipeConsensusRequestType getPlanType();
/////////////////////////////// Thrift ///////////////////////////////
@@ -53,6 +58,7 @@ public abstract class PipeConsensusTransferFileSealReq
protected PipeConsensusTransferFileSealReq
convertToTPipeConsensusTransferReq(
String fileName,
long fileLength,
+ long pointCount,
TCommitId commitId,
TConsensusGroupId consensusGroupId,
ProgressIndex progressIndex,
@@ -61,6 +67,7 @@ public abstract class PipeConsensusTransferFileSealReq
this.fileName = fileName;
this.fileLength = fileLength;
+ this.pointCount = pointCount;
this.commitId = commitId;
this.consensusGroupId = consensusGroupId;
@@ -71,6 +78,7 @@ public abstract class PipeConsensusTransferFileSealReq
final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
ReadWriteIOUtils.write(fileName, outputStream);
ReadWriteIOUtils.write(fileLength, outputStream);
+ ReadWriteIOUtils.write(pointCount, outputStream);
this.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
}
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
@@ -88,6 +96,7 @@ public abstract class PipeConsensusTransferFileSealReq
fileName = ReadWriteIOUtils.readString(req.body);
fileLength = ReadWriteIOUtils.readLong(req.body);
+ pointCount = ReadWriteIOUtils.readLong(req.body);
version = req.version;
type = req.type;
@@ -113,6 +122,7 @@ public abstract class PipeConsensusTransferFileSealReq
PipeConsensusTransferFileSealReq that = (PipeConsensusTransferFileSealReq)
obj;
return fileName.equals(that.fileName)
&& fileLength == that.fileLength
+ && pointCount == that.pointCount
&& version == that.version
&& type == that.type
&& body.equals(that.body)
@@ -127,6 +137,7 @@ public abstract class PipeConsensusTransferFileSealReq
return Objects.hash(
fileName,
fileLength,
+ pointCount,
version,
type,
body,
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/pipeconsensus/request/PipeConsensusTransferFileSealWithModReq.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/pipeconsensus/request/PipeConsensusTransferFileSealWithModReq.java
index 0d7138a46f8..e1572ec9fb5 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/pipeconsensus/request/PipeConsensusTransferFileSealWithModReq.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/pipeconsensus/request/PipeConsensusTransferFileSealWithModReq.java
@@ -40,6 +40,7 @@ public abstract class PipeConsensusTransferFileSealWithModReq extends TPipeConse
private transient List<String> fileNames;
private transient List<Long> fileLengths;
+ private transient List<Long> pointCounts;
private transient Map<String, String> parameters;
public final List<String> getFileNames() {
@@ -50,6 +51,10 @@ public abstract class
PipeConsensusTransferFileSealWithModReq extends TPipeConse
return fileLengths;
}
+ public final List<Long> getPointCounts() {
+ return pointCounts;
+ }
+
public final Map<String, String> getParameters() {
return parameters;
}
@@ -61,6 +66,7 @@ public abstract class PipeConsensusTransferFileSealWithModReq
extends TPipeConse
protected PipeConsensusTransferFileSealWithModReq
convertToTPipeConsensusTransferReq(
List<String> fileNames,
List<Long> fileLengths,
+ List<Long> pointCounts,
Map<String, String> parameters,
TCommitId commitId,
TConsensusGroupId consensusGroupId,
@@ -70,6 +76,7 @@ public abstract class PipeConsensusTransferFileSealWithModReq
extends TPipeConse
this.fileNames = fileNames;
this.fileLengths = fileLengths;
+ this.pointCounts = pointCounts;
this.parameters = parameters;
this.commitId = commitId;
@@ -87,6 +94,10 @@ public abstract class
PipeConsensusTransferFileSealWithModReq extends TPipeConse
for (Long fileLength : fileLengths) {
ReadWriteIOUtils.write(fileLength, outputStream);
}
+ ReadWriteIOUtils.write(pointCounts.size(), outputStream);
+ for (Long pointCount : pointCounts) {
+ ReadWriteIOUtils.write(pointCount, outputStream);
+ }
ReadWriteIOUtils.write(parameters.size(), outputStream);
for (final Map.Entry<String, String> entry : parameters.entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
@@ -118,6 +129,12 @@ public abstract class
PipeConsensusTransferFileSealWithModReq extends TPipeConse
fileLengths.add(ReadWriteIOUtils.readLong(req.body));
}
+ pointCounts = new ArrayList<>();
+ size = ReadWriteIOUtils.readInt(req.body);
+ for (int i = 0; i < size; ++i) {
+ pointCounts.add(ReadWriteIOUtils.readLong(req.body));
+ }
+
parameters = new HashMap<>();
size = ReadWriteIOUtils.readInt(req.body);
for (int i = 0; i < size; ++i) {
@@ -150,6 +167,7 @@ public abstract class
PipeConsensusTransferFileSealWithModReq extends TPipeConse
PipeConsensusTransferFileSealWithModReq that =
(PipeConsensusTransferFileSealWithModReq) obj;
return Objects.equals(fileNames, that.fileNames)
&& Objects.equals(fileLengths, that.fileLengths)
+ && Objects.equals(pointCounts, that.pointCounts)
&& Objects.equals(parameters, that.parameters)
&& Objects.equals(version, that.version)
&& Objects.equals(type, that.type)
@@ -165,6 +183,7 @@ public abstract class
PipeConsensusTransferFileSealWithModReq extends TPipeConse
return Objects.hash(
fileNames,
fileLengths,
+ pointCounts,
parameters,
version,
type,