This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d90088336d3 Read of .meta file is not included in the compaction io
metrics (#14772)
d90088336d3 is described below
commit d90088336d33f0e4cff4bb964eef9974c1333b10
Author: shuwenwei <[email protected]>
AuthorDate: Sat Feb 8 10:35:09 2025 +0800
Read of .meta file is not included in the compaction io metrics (#14772)
* Read of .meta file is not included in the compaction io metrics
* spotless
---
.../compaction/io/CompactionDiskTSMIterator.java | 43 ++++++++
.../compaction/io/CompactionTsFileInput.java | 122 ++++++++++++++++++++-
.../compaction/io/CompactionTsFileReader.java | 60 +---------
.../compaction/io/CompactionTsFileWriter.java | 9 ++
4 files changed, 178 insertions(+), 56 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionDiskTSMIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionDiskTSMIterator.java
new file mode 100644
index 00000000000..b90769dba37
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionDiskTSMIterator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.compaction.io;
+
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
+
+import org.apache.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.tsfile.write.writer.tsmiterator.DiskTSMIterator;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * A {@link DiskTSMIterator} used by compaction. After the superclass has been constructed, its
+ * {@code input} is replaced by a {@link CompactionTsFileInput} wrapping the original one, so
+ * reads that go through that wrapper's {@code read} methods are rate limited and recorded in the
+ * compaction IO metrics.
+ *
+ * <p>No metadata offset is set on the wrapper here, so it keeps its default of {@code -1} and
+ * every read position compares as "at or after" it; reads are therefore recorded as metadata.
+ */
+public class CompactionDiskTSMIterator extends DiskTSMIterator {
+
+ /**
+ * Creates the iterator and wraps its input for compaction accounting.
+ *
+ * @param compactionType compaction type under which reads are recorded in the metrics
+ * @param cmtFile forwarded unchanged to the {@link DiskTSMIterator} constructor
+ * @param chunkGroupMetadataList forwarded unchanged to the {@link DiskTSMIterator} constructor
+ * @param endPosForEachDevice forwarded unchanged to the {@link DiskTSMIterator} constructor
+ * @throws IOException if the superclass constructor fails
+ */
+ public CompactionDiskTSMIterator(
+ CompactionType compactionType,
+ File cmtFile,
+ List<ChunkGroupMetadata> chunkGroupMetadataList,
+ LinkedList<Long> endPosForEachDevice)
+ throws IOException {
+ super(cmtFile, chunkGroupMetadataList, endPosForEachDevice);
+ // Swap the inherited input for the compaction-aware wrapper around it.
+ this.input = new CompactionTsFileInput(compactionType, this.input);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileInput.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileInput.java
index b13475e4fe6..e51f26da14c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileInput.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileInput.java
@@ -19,6 +19,11 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.io;
+import org.apache.iotdb.db.service.metrics.CompactionMetrics;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionIoDataType;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
+
import org.apache.tsfile.exception.StopReadTsFileByInterruptException;
import org.apache.tsfile.read.reader.TsFileInput;
@@ -29,10 +34,33 @@ import java.nio.ByteBuffer;
public class CompactionTsFileInput implements TsFileInput {
private final TsFileInput tsFileInput;
- public CompactionTsFileInput(TsFileInput tsFileInput) {
+ private long metadataOffset = -1;
+
+ /** The type of compaction running. */
+ private final CompactionType compactionType;
+
+ /** A flag that indicates if an aligned series is being read. */
+ private volatile boolean readingAlignedSeries = false;
+
+ public CompactionTsFileInput(CompactionType compactionType, TsFileInput
tsFileInput) {
+ this.compactionType = compactionType;
this.tsFileInput = tsFileInput;
}
+ /**
+ * Sets the file position used to classify reads for the metrics: reads at or after this
+ * position are recorded as metadata, reads before it as aligned or non-aligned data. Until
+ * this is called the offset is {@code -1}, so every read is recorded as metadata.
+ *
+ * @param metadataOffset position from which reads count as metadata
+ */
+ public void setMetadataOffset(long metadataOffset) {
+ this.metadataOffset = metadataOffset;
+ }
+
+ /**
+ * Marks the start of reading an aligned series. While the flag is set, reads below the metadata
+ * offset are recorded as {@code CompactionIoDataType.ALIGNED}.
+ */
+ public void markStartOfAlignedSeries() {
+ readingAlignedSeries = true;
+ }
+
+ /**
+ * Marks the end of reading an aligned series. Once the flag is cleared, reads below the metadata
+ * offset are recorded as {@code CompactionIoDataType.NOT_ALIGNED} again.
+ */
+ public void markEndOfAlignedSeries() {
+ readingAlignedSeries = false;
+ }
+
@Override
public long size() throws IOException {
try {
@@ -71,7 +99,9 @@ public class CompactionTsFileInput implements TsFileInput {
@Override
public int read(ByteBuffer dst) throws IOException {
+ acquireReadDataSizeWithCompactionReadRateLimiter(dst.remaining());
int readSize = tsFileInput.read(dst);
+ updateMetrics(position(), readSize);
if (Thread.currentThread().isInterrupted()) {
throw new StopReadTsFileByInterruptException();
}
@@ -80,7 +110,9 @@ public class CompactionTsFileInput implements TsFileInput {
@Override
public int read(ByteBuffer dst, long position) throws IOException {
+ acquireReadDataSizeWithCompactionReadRateLimiter(dst.remaining());
int readSize = tsFileInput.read(dst, position);
+ updateMetrics(position, readSize);
if (Thread.currentThread().isInterrupted()) {
throw new StopReadTsFileByInterruptException();
}
@@ -89,7 +121,7 @@ public class CompactionTsFileInput implements TsFileInput {
@Override
public InputStream wrapAsInputStream() throws IOException {
- return tsFileInput.wrapAsInputStream();
+ return new
CompactionTsFileInputStreamWrapper(tsFileInput.wrapAsInputStream());
}
@Override
@@ -101,4 +133,90 @@ public class CompactionTsFileInput implements TsFileInput {
public String getFilePath() {
return tsFileInput.getFilePath();
}
+
+ /**
+ * Acquires one permit from the compaction read-operation rate limiter and {@code readDataSize}
+ * permits from the compaction read rate limiter. Called before each read is issued.
+ *
+ * @param readDataSize number of bytes the caller is about to request
+ */
+ private void acquireReadDataSizeWithCompactionReadRateLimiter(int
readDataSize) {
+
CompactionTaskManager.getInstance().getCompactionReadOperationRateLimiter().acquire(1);
+
CompactionTaskManager.getInstance().getCompactionReadRateLimiter().acquire(readDataSize);
+ }
+
+  /**
+   * Records a finished read in the compaction IO metrics.
+   *
+   * <p>A read positioned at or after {@code metadataOffset} is counted as metadata. Any other
+   * read is counted as aligned or non-aligned data, depending on whether an aligned series is
+   * currently marked as being read.
+   *
+   * @param position file position the read is attributed to
+   * @param totalSize number of bytes actually read; non-positive values, such as the {@code -1}
+   *     end-of-stream result of a read call, are ignored
+   */
+  private void updateMetrics(long position, long totalSize) {
+    if (totalSize <= 0) {
+      // Callers pass the raw read result, which is -1 at end-of-stream. Recording it would
+      // subtract from the byte counters, so a read that returned no data is not accounted.
+      return;
+    }
+    final CompactionIoDataType dataType;
+    if (position >= metadataOffset) {
+      dataType = CompactionIoDataType.METADATA;
+    } else if (readingAlignedSeries) {
+      dataType = CompactionIoDataType.ALIGNED;
+    } else {
+      dataType = CompactionIoDataType.NOT_ALIGNED;
+    }
+    CompactionMetrics.getInstance().recordReadInfo(compactionType, dataType, totalSize);
+  }
+
+  /**
+   * Stream view of the underlying input that applies the compaction read rate limiters and the
+   * compaction IO metrics to every stream read. All other operations are delegated unchanged to
+   * the wrapped stream.
+   */
+  private class CompactionTsFileInputStreamWrapper extends InputStream {
+
+    private final InputStream inputStream;
+
+    /**
+     * @param inputStream the stream to delegate to
+     */
+    public CompactionTsFileInputStreamWrapper(InputStream inputStream) {
+      this.inputStream = inputStream;
+    }
+
+    /**
+     * Reads a single byte.
+     *
+     * @return the byte value in the range 0-255, or {@code -1} at end of stream
+     */
+    @Override
+    public int read() throws IOException {
+      acquireReadDataSizeWithCompactionReadRateLimiter(1);
+      long startPosition = position();
+      int value = inputStream.read();
+      // InputStream.read() returns the byte VALUE, not a length: a successful call always
+      // consumed exactly one byte, and -1 means nothing was consumed.
+      if (value >= 0) {
+        updateMetrics(startPosition, 1);
+      }
+      return value;
+    }
+
+    /**
+     * Reads up to {@code b.length} bytes into {@code b}.
+     *
+     * @return the number of bytes read, or {@code -1} at end of stream
+     */
+    @Override
+    public int read(byte[] b) throws IOException {
+      acquireReadDataSizeWithCompactionReadRateLimiter(b.length);
+      long startPosition = position();
+      int readSize = inputStream.read(b);
+      recordBytesRead(startPosition, readSize);
+      return readSize;
+    }
+
+    /**
+     * Reads up to {@code len} bytes into {@code b} starting at {@code off}.
+     *
+     * @return the number of bytes read, or {@code -1} at end of stream
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      acquireReadDataSizeWithCompactionReadRateLimiter(len);
+      long startPosition = position();
+      int readSize = inputStream.read(b, off, len);
+      recordBytesRead(startPosition, readSize);
+      return readSize;
+    }
+
+    /** Records a bulk read, skipping the end-of-stream marker and empty reads. */
+    private void recordBytesRead(long startPosition, int readSize) {
+      if (readSize > 0) {
+        updateMetrics(startPosition, readSize);
+      }
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+      return inputStream.skip(n);
+    }
+
+    @Override
+    public int available() throws IOException {
+      return inputStream.available();
+    }
+
+    @Override
+    public void close() throws IOException {
+      inputStream.close();
+    }
+
+    @Override
+    public synchronized void mark(int readlimit) {
+      inputStream.mark(readlimit);
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+      inputStream.reset();
+    }
+
+    @Override
+    public boolean markSupported() {
+      return inputStream.markSupported();
+    }
+  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
index bf98611db2c..7aabb085643 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
@@ -19,29 +19,22 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.io;
-import org.apache.iotdb.db.service.metrics.CompactionMetrics;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionIoDataType;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
import org.apache.tsfile.file.IMetadataIndexEntry;
import org.apache.tsfile.file.header.ChunkHeader;
-import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.MetadataIndexNode;
import org.apache.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.tsfile.file.metadata.enums.MetadataIndexNodeType;
import org.apache.tsfile.read.TsFileSequenceReader;
-import org.apache.tsfile.read.common.Chunk;
import org.apache.tsfile.utils.Pair;
import java.io.IOException;
-import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.function.LongConsumer;
/**
* This class extends the TsFileSequenceReader class to read and manage TsFile
with a focus on
@@ -50,14 +43,6 @@ import java.util.function.LongConsumer;
*/
public class CompactionTsFileReader extends TsFileSequenceReader {
- private long metadataOffset = 0;
-
- /** The type of compaction running. */
- CompactionType compactionType;
-
- /** A flag that indicates if an aligned series is being read. */
- private volatile boolean readingAlignedSeries = false;
-
/**
* Constructs a new instance of CompactionTsFileReader.
*
@@ -67,44 +52,20 @@ public class CompactionTsFileReader extends
TsFileSequenceReader {
*/
public CompactionTsFileReader(String file, CompactionType compactionType)
throws IOException {
super(file);
- this.tsFileInput = new CompactionTsFileInput(tsFileInput);
- this.compactionType = compactionType;
- this.metadataOffset = readFileMetadata().getMetaOffset();
- }
-
- @Override
- protected ByteBuffer readData(long position, int totalSize, LongConsumer
ioSizeRecorder)
- throws IOException {
- acquireReadDataSizeWithCompactionReadRateLimiter(totalSize);
- ByteBuffer buffer = super.readData(position, totalSize, ioSizeRecorder);
- if (position >= metadataOffset) {
- CompactionMetrics.getInstance()
- .recordReadInfo(compactionType, CompactionIoDataType.METADATA,
totalSize);
- } else {
- CompactionMetrics.getInstance()
- .recordReadInfo(
- compactionType,
- readingAlignedSeries
- ? CompactionIoDataType.ALIGNED
- : CompactionIoDataType.NOT_ALIGNED,
- totalSize);
- }
- return buffer;
+ CompactionTsFileInput compactionTsFileInput =
+ new CompactionTsFileInput(compactionType, tsFileInput);
+ this.tsFileInput = compactionTsFileInput;
+
compactionTsFileInput.setMetadataOffset(readFileMetadata().getMetaOffset());
}
/** Marks the start of reading an aligned series. */
public void markStartOfAlignedSeries() {
- readingAlignedSeries = true;
+ ((CompactionTsFileInput) tsFileInput).markStartOfAlignedSeries();
}
/** Marks the end of reading an aligned series. */
public void markEndOfAlignedSeries() {
- readingAlignedSeries = false;
- }
-
- @Override
- public Chunk readMemChunk(ChunkMetadata metaData) throws IOException {
- return super.readMemChunk(metaData);
+ ((CompactionTsFileInput) tsFileInput).markEndOfAlignedSeries();
}
@SuppressWarnings("java:S2177")
@@ -112,10 +73,6 @@ public class CompactionTsFileReader extends
TsFileSequenceReader {
return ChunkHeader.deserializeFrom(tsFileInput, position);
}
- public InputStream wrapAsInputStream() throws IOException {
- return this.tsFileInput.wrapAsInputStream();
- }
-
public Map<String, Pair<TimeseriesMetadata, Pair<Long, Long>>>
getTimeseriesMetadataAndOffsetByDevice(
MetadataIndexNode measurementNode,
@@ -186,11 +143,6 @@ public class CompactionTsFileReader extends
TsFileSequenceReader {
return timeseriesMetadataOffsetMap;
}
- private void acquireReadDataSizeWithCompactionReadRateLimiter(int
readDataSize) {
-
CompactionTaskManager.getInstance().getCompactionReadOperationRateLimiter().acquire(1);
-
CompactionTaskManager.getInstance().getCompactionReadRateLimiter().acquire(readDataSize);
- }
-
@Override
public boolean equals(Object o) {
return super.equals(o);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
index 7d89747d2f6..fcea93d6c2e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
@@ -36,6 +36,7 @@ import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
import org.apache.tsfile.write.chunk.IChunkWriter;
import org.apache.tsfile.write.record.Tablet.ColumnCategory;
import org.apache.tsfile.write.writer.TsFileIOWriter;
+import org.apache.tsfile.write.writer.tsmiterator.TSMIterator;
import java.io.File;
import java.io.IOException;
@@ -150,6 +151,14 @@ public class CompactionTsFileWriter extends TsFileIOWriter
{
.recordWriteInfo(type, CompactionIoDataType.METADATA, writtenDataSize);
}
+  /**
+   * Returns the iterator over the collected chunk metadata. When chunk metadata has been written
+   * to disk, a {@link CompactionDiskTSMIterator} is returned; otherwise the in-memory iterator
+   * is used.
+   */
+  @Override
+  protected TSMIterator getTSMIterator() throws IOException {
+    if (!hasChunkMetadataInDisk) {
+      return TSMIterator.getTSMIteratorInMemory(chunkGroupMetadataList);
+    }
+    return new CompactionDiskTSMIterator(
+        type, chunkMetadataTempFile, chunkGroupMetadataList, endPosInCMTForDevice);
+  }
+
public boolean isEmptyTargetFile() {
return isEmptyTargetFile;
}