This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch replaceRegionId in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b8866dc8e8c5c5dfa368c3fefd50cd7044f563fe Author: shuwenwei <[email protected]> AuthorDate: Tue Dec 9 17:19:11 2025 +0800 replace region id for object binary --- .../compaction/execute/utils/CompactionUtils.java | 15 +++++-- .../execute/utils/MultiTsFileDeviceIterator.java | 3 +- .../fast/FastAlignedSeriesCompactionExecutor.java | 3 +- .../fast/reader/CompactionAlignedChunkReader.java | 8 ++-- .../read/reader/chunk/DiskAlignedChunkLoader.java | 16 +++++++- .../org/apache/iotdb/db/utils/ObjectTypeUtils.java | 46 ++++++++++++++++++++++ 6 files changed, 80 insertions(+), 11 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java index 0262eb36170..bac48729167 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java @@ -533,7 +533,8 @@ public class CompactionUtils { TsFileSequenceReader reader, List<AbstractAlignedChunkMetadata> alignedChunkMetadataList, List<ModEntry> timeMods, - List<List<ModEntry>> valueMods) + List<List<ModEntry>> valueMods, + int currentRegionId) throws IOException { if (alignedChunkMetadataList.isEmpty()) { return; @@ -578,7 +579,8 @@ public class CompactionUtils { objectColumnIndexList, timeDeletionIntervalList, objectDeletionIntervalList, - deletionCursors); + deletionCursors, + currentRegionId); } } @@ -589,7 +591,8 @@ public class CompactionUtils { List<Integer> objectColumnIndexList, List<ModEntry> timeDeletions, List<List<ModEntry>> objectDeletions, - int[] deletionCursors) + int[] deletionCursors, + int currentRegionId) throws IOException { Chunk timeChunk = reader.readMemChunk((ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata()); @@ -612,6 +615,12 @@ public class CompactionUtils { continue; } Chunk chunk = reader.readMemChunk(valueChunkMetadata); + if (chunk != null) { + chunk + .getHeader() + .setReplaceDecoder( + decoder -> ObjectTypeUtils.getReplaceDecoder(decoder, currentRegionId)); + } valueChunks.add(chunk); valuePages.add( chunk == null diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java index 75900689ca2..c5843d112bf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java @@ -488,7 +488,8 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { readerMap.get(tsFileResource), alignedChunkMetadataList, Collections.singletonList(ttlDeletion), - modificationForValueColumns.stream().map(v -> emptyList).collect(Collectors.toList())); + modificationForValueColumns.stream().map(v -> emptyList).collect(Collectors.toList()), + tsFileResource.getTsFileID().regionId); } ModificationUtils.modifyAlignedChunkMetaData( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java index 166417a97ea..7a8e884a491 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java @@ -284,7 +284,8 @@ public class FastAlignedSeriesCompactionExecutor extends SeriesCompactionExecuto readerCacheMap.get(resource), alignedChunkMetadataList, Collections.singletonList(ttlDeletion), - valueModifications.stream().map(v -> emptyList).collect(Collectors.toList())); + valueModifications.stream().map(v -> emptyList).collect(Collectors.toList()), + resource.getTsFileID().regionId); } // modify aligned chunk metadatas diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionAlignedChunkReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionAlignedChunkReader.java index a94150deca1..3cbc0c2fbae 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionAlignedChunkReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionAlignedChunkReader.java @@ -92,7 +92,7 @@ public class CompactionAlignedChunkReader { ByteBuffer compressedTimePageData, List<ByteBuffer> compressedValuePageDatas) throws IOException { - return getPontReader( + return getPointReader( timePageHeader, valuePageHeaders, compressedTimePageData, @@ -106,11 +106,11 @@ public class CompactionAlignedChunkReader { ByteBuffer compressedTimePageData, List<ByteBuffer> compressedValuePageDatas) throws IOException { - return getPontReader( + return getPointReader( timePageHeader, valuePageHeaders, compressedTimePageData, compressedValuePageDatas, false); } - private IPointReader getPontReader( + private IPointReader getPointReader( PageHeader timePageHeader, List<PageHeader> valuePageHeaders, ByteBuffer compressedTimePageData, @@ -146,7 +146,7 @@ public class CompactionAlignedChunkReader { valuePageHeaders.get(i), uncompressedPageData, valueType, - Decoder.getDecoderByType(valueChunkHeader.getEncodingType(), valueType)); + valueChunkHeader.calculateDecoderForNonTimeChunk()); valuePageReader.setDeleteIntervalList(valueDeleteIntervalList.get(i)); valuePageReaders.add(valuePageReader); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskAlignedChunkLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskAlignedChunkLoader.java index a7c6eb96d42..27883b34e9c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskAlignedChunkLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskAlignedChunkLoader.java @@ -24,7 +24,9 @@ import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet; import org.apache.iotdb.db.storageengine.buffer.ChunkCache; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.utils.ObjectTypeUtils; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata; import org.apache.tsfile.file.metadata.ChunkMetadata; import org.apache.tsfile.file.metadata.IChunkMetadata; @@ -92,7 +94,7 @@ public class DiskAlignedChunkLoader implements IChunkLoader { context); List<Chunk> valueChunkList = new ArrayList<>(); for (IChunkMetadata valueChunkMetadata : alignedChunkMetadata.getValueChunkMetadataList()) { - valueChunkList.add( + Chunk chunk = valueChunkMetadata == null ? null : ChunkCache.getInstance() @@ -104,7 +106,17 @@ public class DiskAlignedChunkLoader implements IChunkLoader { resource.isClosed()), valueChunkMetadata.getDeleteIntervalList(), valueChunkMetadata.getStatistics(), - context)); + context); + final TsFileID tsFileID = getTsFileID(); + if (chunk != null + && tsFileID.regionId > 0 + && chunkMetaData.getDataType() == TSDataType.OBJECT) { + chunk + .getHeader() + .setReplaceDecoder( + decoder -> ObjectTypeUtils.getReplaceDecoder(decoder, tsFileID.regionId)); + } + valueChunkList.add(chunk); } long t2 = System.nanoTime(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java index 44ba3235911..3bef2f4d04f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java @@ -35,7 +35,10 @@ import org.apache.iotdb.mpp.rpc.thrift.TReadObjectReq; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.encoding.decoder.Decoder; +import org.apache.tsfile.encoding.decoder.DecoderWrapper; import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BytesUtils; import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +47,9 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.util.Collections; @@ -147,6 +152,47 @@ public class ObjectTypeUtils { return buffer; } + public static Binary generateObjectBinary(long objectSize, String relativePath) { + byte[] filePathBytes = relativePath.getBytes(StandardCharsets.UTF_8); + byte[] valueBytes = new byte[filePathBytes.length + Long.BYTES]; + System.arraycopy(BytesUtils.longToBytes(objectSize), 0, valueBytes, 0, Long.BYTES); + System.arraycopy(filePathBytes, 0, valueBytes, Long.BYTES, filePathBytes.length); + return new Binary(valueBytes); + } + + public static DecoderWrapper getReplaceDecoder(final Decoder decoder, final int newRegionId) { + return new ObjectRegionIdReplaceDecoder(decoder, newRegionId); + } + + private static class ObjectRegionIdReplaceDecoder extends DecoderWrapper { + + private final int newRegionId; + + public ObjectRegionIdReplaceDecoder(Decoder decoder, int newRegionId) { + super(decoder); + this.newRegionId = newRegionId; + } + + @Override + public Binary readBinary(ByteBuffer buffer) { + Binary originValue = originDecoder.readBinary(buffer); + Pair<Long, String> pair = ObjectTypeUtils.parseObjectBinary(originValue); + try { + Path path = Paths.get(pair.getRight()); + int regionId = Integer.parseInt(path.getName(0).toString()); + if (regionId == newRegionId) { + return originValue; + } + String newPath = pair.getRight().replaceFirst(regionId + "", newRegionId + ""); + return ObjectTypeUtils.generateObjectBinary(pair.getLeft(), newPath); + } catch (NumberFormatException e) { + throw new IoTDBRuntimeException( + "wrong object file path: " + pair.getRight(), + TSStatusCode.OBJECT_READ_ERROR.getStatusCode()); + } + } + } + public static int getActualReadSize(String filePath, long fileSize, long offset, long length) { if (offset >= fileSize) { throw new SemanticException(
