This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch mods in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f93f4a5b0fc34f69950ce1b0a8688347491010ad Author: Caideyipi <[email protected]> AuthorDate: Sat May 9 15:32:01 2026 +0800 fix --- .../scan/TsFileInsertionEventScanParser.java | 150 +++++++++++++++------ 1 file changed, 110 insertions(+), 40 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index 32823459fcf..ee5cd750c25 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java @@ -83,6 +83,7 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { private final long startTime; private final long endTime; private final Filter filter; + private boolean hasModifications; private IChunkReader chunkReader; private BatchData data; @@ -94,11 +95,16 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { private boolean currentIsAligned; private final List<IMeasurementSchema> currentMeasurements = new ArrayList<>(); private final List<ModsOperationUtil.ModsInfo> modsInfos = new ArrayList<>(); + private final List<Boolean> currentMeasurementHasMods = new ArrayList<>(); + private boolean currentChunkHasMods; // Cached time chunk private final List<Chunk> timeChunkList = new ArrayList<>(); private final List<Boolean> isMultiPageList = new ArrayList<>(); private final Map<String, Integer> measurementIndexMap = new HashMap<>(); + private final Map<String, Boolean> measurementHasOverlappedModsCache = new HashMap<>(); + private final Map<String, Map<Long, Statistics>> alignedChunkStatisticsCache = new HashMap<>(); + private final Map<String, Map<Long, Statistics>> nonAlignedChunkStatisticsCache = new HashMap<>(); private int lastIndex = -1; private ChunkHeader firstChunkHeader4NextSequentialValueChunks; @@ -148,6 +154,7 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { isWithMod ? ModsOperationUtil.loadModificationsFromTsFile(tsFile) : PatternTreeMapFactory.getModsPatternTreeMap(); + hasModifications = !currentModifications.isEmpty(); allocatedMemoryBlockForModifications = PipeDataNodeResourceManager.memory() .forceAllocateForTabletWithRetry(currentModifications.ramBytesUsed()); @@ -383,18 +390,25 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { private boolean putValueToColumns(final BatchData data, final Tablet tablet, final int rowIndex) { boolean isNeedFillTime = false; + final long currentTime = data.currentTime(); if (data.getDataType() == TSDataType.VECTOR) { - for (int i = 0; i < tablet.getSchemas().size(); ++i) { - final TsPrimitiveType primitiveType = data.getVector()[i]; + final TsPrimitiveType[] vector = data.getVector(); + final List<IMeasurementSchema> schemas = tablet.getSchemas(); + final BitMap[] bitMaps = tablet.getBitMaps(); + for (int i = 0, size = schemas.size(); i < size; ++i) { + final TSDataType columnType = schemas.get(i).getType(); + final TsPrimitiveType primitiveType = vector[i]; if (Objects.isNull(primitiveType) - || ModsOperationUtil.isDelete(data.currentTime(), modsInfos.get(i))) { - switch (tablet.getSchemas().get(i).getType()) { + || (currentChunkHasMods + && currentMeasurementHasMods.get(i) + && ModsOperationUtil.isDelete(currentTime, modsInfos.get(i)))) { + switch (columnType) { case TEXT: case BLOB: case STRING: tablet.addValue(rowIndex, i, Binary.EMPTY_VALUE.getValues()); } - tablet.getBitMaps()[i].mark(rowIndex); + bitMaps[i].mark(rowIndex); continue; } @@ -429,8 +443,9 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { } } } else { - if (!modsInfos.isEmpty() - && ModsOperationUtil.isDelete(data.currentTime(), modsInfos.get(0))) { + if (currentChunkHasMods + && currentMeasurementHasMods.get(0) + && ModsOperationUtil.isDelete(currentTime, modsInfos.get(0))) { return false; } @@ -474,6 +489,8 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { final List<Chunk> valueChunkList = new ArrayList<>(); currentMeasurements.clear(); modsInfos.clear(); + currentMeasurementHasMods.clear(); + currentChunkHasMods = false; if (lastMarker == MetaMarker.SEPARATOR) { chunkReader = null; @@ -520,11 +537,7 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { chunkHeader.getDataType(), chunkHeader.getEncodingType(), chunkHeader.getCompressionType())); - modsInfos.addAll( - ModsOperationUtil.initializeMeasurementMods( - currentDevice, - Collections.singletonList(chunkHeader.getMeasurementID()), - currentModifications)); + initializeCurrentMeasurementMods(chunkHeader.getMeasurementID()); return; } case MetaMarker.VALUE_CHUNK_HEADER: @@ -591,11 +604,7 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { chunkHeader.getDataType(), chunkHeader.getEncodingType(), chunkHeader.getCompressionType())); - modsInfos.addAll( - ModsOperationUtil.initializeMeasurementMods( - currentDevice, - Collections.singletonList(chunkHeader.getMeasurementID()), - currentModifications)); + initializeCurrentMeasurementMods(chunkHeader.getMeasurementID()); break; } case MetaMarker.CHUNK_GROUP_HEADER: @@ -609,6 +618,7 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { timeChunkList.clear(); isMultiPageList.clear(); measurementIndexMap.clear(); + clearCurrentChunkGroupCaches(); final IDeviceID deviceID = tsFileSequenceReader.readChunkGroupHeader().getDeviceID(); currentDevice = treePattern.mayOverlapWithDevice(deviceID) ? deviceID : null; break; @@ -658,17 +668,11 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { } // Skip the chunk if it is fully deleted by mods - if (!currentModifications.isEmpty()) { + if (hasModifications + && measurementHasOverlappedMods(currentDevice, chunkHeader.getMeasurementID())) { final Statistics statistics = - isAlignedValueChunk - ? findAlignedChunkStatistics( - tsFileSequenceReader.getIChunkMetadataList( - currentDevice, chunkHeader.getMeasurementID()), - currentChunkHeaderOffset) - : findNonAlignedChunkStatistics( - tsFileSequenceReader.getIChunkMetadataList( - currentDevice, chunkHeader.getMeasurementID()), - currentChunkHeaderOffset); + getChunkStatistics( + chunkHeader.getMeasurementID(), isAlignedValueChunk, currentChunkHeaderOffset); if (statistics != null && ModsOperationUtil.isAllDeletedByMods( currentDevice, @@ -742,33 +746,99 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { if (allocatedMemoryBlockForChunk != null) { allocatedMemoryBlockForChunk.close(); } + + clearCurrentChunkGroupCaches(); + } + + private void initializeCurrentMeasurementMods(final String measurementID) { + if (!hasModifications) { + return; + } + + if (!measurementHasOverlappedMods(currentDevice, measurementID)) { + modsInfos.add(null); + currentMeasurementHasMods.add(false); + return; + } + + final ModsOperationUtil.ModsInfo modsInfo = + ModsOperationUtil.initializeMeasurementMods( + currentDevice, Collections.singletonList(measurementID), currentModifications) + .get(0); + final boolean measurementHasMods = !modsInfo.getMods().isEmpty(); + modsInfos.add(modsInfo); + currentMeasurementHasMods.add(measurementHasMods); + currentChunkHasMods |= measurementHasMods; + } + + private boolean measurementHasOverlappedMods( + final IDeviceID deviceID, final String measurementID) { + if (!hasModifications) { + return false; + } + + return measurementHasOverlappedModsCache.computeIfAbsent( + measurementID, + ignored -> { + final List<?> mods = currentModifications.getOverlapped(deviceID, measurementID); + return Objects.nonNull(mods) && !mods.isEmpty(); + }); + } + + private Statistics getChunkStatistics( + final String measurementID, + final boolean isAlignedValueChunk, + final long currentChunkHeaderOffset) + throws IOException { + final Map<String, Map<Long, Statistics>> statisticsCache = + isAlignedValueChunk ? alignedChunkStatisticsCache : nonAlignedChunkStatisticsCache; + Map<Long, Statistics> statisticsByOffset = statisticsCache.get(measurementID); + if (Objects.isNull(statisticsByOffset)) { + statisticsByOffset = + isAlignedValueChunk + ? buildAlignedChunkStatisticsCache(measurementID) + : buildNonAlignedChunkStatisticsCache(measurementID); + statisticsCache.put(measurementID, statisticsByOffset); + } + return statisticsByOffset.get(currentChunkHeaderOffset); } - private Statistics findAlignedChunkStatistics( - List<IChunkMetadata> metadataList, long currentChunkHeaderOffset) { + private Map<Long, Statistics> buildAlignedChunkStatisticsCache(final String measurementID) + throws IOException { + final Map<Long, Statistics> statisticsByOffset = new HashMap<>(); + final List<IChunkMetadata> metadataList = + tsFileSequenceReader.getIChunkMetadataList(currentDevice, measurementID); for (IChunkMetadata metadata : metadataList) { if (!(metadata instanceof AlignedChunkMetadata)) { continue; } - List<IChunkMetadata> list = ((AlignedChunkMetadata) metadata).getValueChunkMetadataList(); + final List<IChunkMetadata> list = + ((AlignedChunkMetadata) metadata).getValueChunkMetadataList(); for (IChunkMetadata m : list) { - if (m.getOffsetOfChunkHeader() == currentChunkHeaderOffset) { - return m.getStatistics(); + if (Objects.nonNull(m)) { + statisticsByOffset.put(m.getOffsetOfChunkHeader(), m.getStatistics()); } } - break; } - return null; + return statisticsByOffset; } - private Statistics findNonAlignedChunkStatistics( - List<IChunkMetadata> metadataList, long currentChunkHeaderOffset) { + private Map<Long, Statistics> buildNonAlignedChunkStatisticsCache(final String measurementID) + throws IOException { + final Map<Long, Statistics> statisticsByOffset = new HashMap<>(); + final List<IChunkMetadata> metadataList = + tsFileSequenceReader.getIChunkMetadataList(currentDevice, measurementID); for (IChunkMetadata metadata : metadataList) { - if (metadata.getOffsetOfChunkHeader() == currentChunkHeaderOffset) { - // found the corresponding chunk metadata - return metadata.getStatistics(); + if (Objects.nonNull(metadata)) { + statisticsByOffset.put(metadata.getOffsetOfChunkHeader(), metadata.getStatistics()); } } - return null; + return statisticsByOffset; + } + + private void clearCurrentChunkGroupCaches() { + measurementHasOverlappedModsCache.clear(); + alignedChunkStatisticsCache.clear(); + nonAlignedChunkStatisticsCache.clear(); } }
