This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/fix_013_cross_selector in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3987c2d3d4d62cca9156c524878b1dab6435fed2 Author: Jinrui.Zhang <[email protected]> AuthorDate: Sat Dec 31 09:50:20 2022 +0800 fix the issue that the TimeIndex is not accurate when select crossing compaction task --- .../cross/rewrite/TsFileDeviceInfoStore.java | 103 +++++++++++++++++++++ .../selector/RewriteCompactionFileSelector.java | 27 ++++-- .../db/engine/storagegroup/TsFileResource.java | 19 ++++ .../storagegroup/timeindex/DeviceTimeIndex.java | 4 + .../engine/storagegroup/timeindex/ITimeIndex.java | 9 ++ 5 files changed, 155 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/TsFileDeviceInfoStore.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/TsFileDeviceInfoStore.java new file mode 100644 index 0000000000..e9f01a9295 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/TsFileDeviceInfoStore.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.compaction.cross.rewrite; + +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.engine.storagegroup.timeindex.DeviceTimeIndex; +import org.apache.iotdb.db.engine.storagegroup.timeindex.TimeIndexLevel; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public class TsFileDeviceInfoStore { + + private Map<TsFileResource, TsFileDeviceInfo> cache; + + public TsFileDeviceInfoStore() { + cache = new HashMap<>(); + } + + public TsFileDeviceInfo get(TsFileResource tsFileResource) { + return cache.computeIfAbsent(tsFileResource, TsFileDeviceInfo::new); + } + + public static class TsFileDeviceInfo { + public TsFileResource resource; + private Map<String, DeviceInfo> deviceInfoMap; + + public TsFileDeviceInfo(TsFileResource tsFileResource) { + this.resource = tsFileResource; + } + + private void prepareDeviceInfos() throws IOException { + if (deviceInfoMap != null) { + return; + } + deviceInfoMap = new LinkedHashMap<>(); + if (TimeIndexLevel.valueOf(resource.getTimeIndexType()) == TimeIndexLevel.FILE_TIME_INDEX) { + DeviceTimeIndex timeIndex = resource.buildDeviceTimeIndex(); + for (String deviceId : timeIndex.getDevices()) { + deviceInfoMap.put( + deviceId, + new DeviceInfo( + deviceId, timeIndex.getStartTime(deviceId), timeIndex.getEndTime(deviceId))); + } + } else { + for (String deviceId : resource.getDevices()) { + deviceInfoMap.put( + deviceId, + new DeviceInfo( + deviceId, resource.getStartTime(deviceId), resource.getEndTime(deviceId))); + } + } + } + + public List<DeviceInfo> getDevices() throws IOException { + prepareDeviceInfos(); + return new ArrayList<>(deviceInfoMap.values()); + } + + public DeviceInfo getDeviceInfoById(String deviceId) throws IOException { + prepareDeviceInfos(); + return deviceInfoMap.get(deviceId); + } + + public boolean containsDevice(String deviceId) throws IOException { + prepareDeviceInfos(); + return deviceInfoMap.containsKey(deviceId); + } + } + + public static class DeviceInfo { + public String deviceId; + public long startTime; + public long endTime; + + public DeviceInfo(String deviceId, long startTime, long endTime) { + this.deviceId = deviceId; + this.startTime = startTime; + this.endTime = endTime; + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java index caf5943a4a..f2dffbeecf 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java @@ -20,6 +20,9 @@ package org.apache.iotdb.db.engine.compaction.cross.rewrite.selector; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.compaction.cross.rewrite.TsFileDeviceInfoStore; +import org.apache.iotdb.db.engine.compaction.cross.rewrite.TsFileDeviceInfoStore.DeviceInfo; +import org.apache.iotdb.db.engine.compaction.cross.rewrite.TsFileDeviceInfoStore.TsFileDeviceInfo; import org.apache.iotdb.db.engine.compaction.cross.rewrite.manage.CrossSpaceCompactionResource; import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; @@ -75,6 +78,10 @@ public class RewriteCompactionFileSelector implements ICrossSpaceMergeFileSelect private AbstractCompactionEstimator compactionEstimator; + // Cache the DeviceInfos for used seqFiles to avoid loading DeviceTimeIndex more than 1 times from + // disk for each seqFile because each seqFile may be scanned more than 1 time in each selector + private final TsFileDeviceInfoStore deviceInfoStore; + public RewriteCompactionFileSelector(CrossSpaceCompactionResource resource, long memoryBudget) { this.resource = resource; this.memoryBudget = memoryBudget; @@ -83,6 +90,7 @@ public class RewriteCompactionFileSelector implements ICrossSpaceMergeFileSelect this.maxCrossCompactionFileSize = IoTDBDescriptor.getInstance().getConfig().getMaxCrossCompactionCandidateFileSize(); this.compactionEstimator = new RewriteCrossCompactionEstimator(); + this.deviceInfoStore = new TsFileDeviceInfoStore(); } /** @@ -274,16 +282,21 @@ public class RewriteCompactionFileSelector implements ICrossSpaceMergeFileSelect * * @param unseqFile the tsFileResource of unseqFile to be compacted */ - private void selectOverlappedSeqFiles(TsFileResource unseqFile) { + private void selectOverlappedSeqFiles(TsFileResource unseqFile) throws IOException { final int SELECT_WARN_THRESHOLD = 10; - for (String deviceId : unseqFile.getDevices()) { - long unseqStartTime = unseqFile.getStartTime(deviceId); - long unseqEndTime = unseqFile.getEndTime(deviceId); + // It is unnecessary to cache DeviceInfo for unseqFile into store because it is only be used + // once in every selector. + TsFileDeviceInfo unseqFileDeviceInfo = new TsFileDeviceInfo(unseqFile); + for (DeviceInfo deviceInfo : unseqFileDeviceInfo.getDevices()) { + String deviceId = deviceInfo.deviceId; + long unseqStartTime = deviceInfo.startTime; + long unseqEndTime = deviceInfo.endTime; boolean noMoreOverlap = false; for (int i = 0; i < resource.getSeqFiles().size() && !noMoreOverlap; i++) { TsFileResource seqFile = resource.getSeqFiles().get(i); - if (!seqFile.mayContainsDevice(deviceId)) { + TsFileDeviceInfo seqFileDeviceInfo = deviceInfoStore.get(seqFile); + if (!seqFileDeviceInfo.containsDevice(deviceId)) { continue; } int crossSpaceCompactionTimes = 0; @@ -295,8 +308,8 @@ public class RewriteCompactionFileSelector implements ICrossSpaceMergeFileSelect logger.warn("Meets IOException when selecting files for cross space compaction", e); } - long seqEndTime = seqFile.getEndTime(deviceId); - long seqStartTime = seqFile.getStartTime(deviceId); + long seqEndTime = seqFileDeviceInfo.getDeviceInfoById(deviceId).endTime; + long seqStartTime = seqFileDeviceInfo.getDeviceInfoById(deviceId).startTime; if (!seqFile.isClosed()) { // for unclosed file, only select those that overlap with the unseq file if (unseqEndTime >= seqStartTime) { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java index 7f6707f98b..4aff97685a 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java @@ -317,6 +317,25 @@ public class TsFileResource { } } + public DeviceTimeIndex buildDeviceTimeIndex() throws IOException { + readLock(); + try (InputStream inputStream = + FSFactoryProducer.getFSFactory() + .getBufferedInputStream(file.getPath() + TsFileResource.RESOURCE_SUFFIX)) { + ReadWriteIOUtils.readByte(inputStream); + ITimeIndex timeIndexFromResourceFile = ITimeIndex.createTimeIndex(inputStream); + if (!(timeIndexFromResourceFile instanceof DeviceTimeIndex)) { + throw new IOException("cannot build DeviceTimeIndex from resource " + file.getPath()); + } + return (DeviceTimeIndex) timeIndexFromResourceFile; + } catch (Exception e) { + throw new IOException( + "Can't read file " + file.getPath() + TsFileResource.RESOURCE_SUFFIX + " from disk", e); + } finally { + readUnlock(); + } + } + public void updateStartTime(String device, long time) { timeIndex.updateStartTime(device, time); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java index b843de0e8b..8ffd113ed1 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java @@ -150,6 +150,10 @@ public class DeviceTimeIndex implements ITimeIndex { return deviceToIndex.keySet(); } + public Set<String> getDevices() { + return deviceToIndex.keySet(); + } + @Override public boolean endTimeEmpty() { for (long endTime : endTimes) { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java index b4e127bd3b..c725a2f8e0 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine.storagegroup.timeindex; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.PartitionViolationException; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import java.io.IOException; import java.io.InputStream; @@ -186,4 +187,12 @@ public interface ITimeIndex { * it may or may not contain this device */ boolean mayContainsDevice(String device); + + static ITimeIndex createTimeIndex(InputStream inputStream) throws IOException { + byte timeIndexType = ReadWriteIOUtils.readByte(inputStream); + if (timeIndexType == -1) { + throw new IOException("The end of stream has been reached"); + } + return TimeIndexLevel.valueOf(timeIndexType).getTimeIndex().deserialize(inputStream); + } }
