This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/refine_cross_selection in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 226c470de2893282db4553b70f6e5728af228d8b Author: Jinrui.Zhang <[email protected]> AuthorDate: Thu Dec 22 12:40:11 2022 +0800 complete deviceTimeIndex build --- .../rewrite/CrossSpaceCompactionCandidate.java | 59 +++++++++++++++++++--- .../db/engine/storagegroup/TsFileResource.java | 15 ++++++ .../storagegroup/timeindex/DeviceTimeIndex.java | 4 ++ 3 files changed, 70 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/CrossSpaceCompactionCandidate.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/CrossSpaceCompactionCandidate.java index f3ddb7c230..0965ead3f9 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/CrossSpaceCompactionCandidate.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/CrossSpaceCompactionCandidate.java @@ -21,9 +21,14 @@ package org.apache.iotdb.db.engine.compaction.cross.rewrite; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus; +import org.apache.iotdb.db.engine.storagegroup.timeindex.DeviceTimeIndex; +import org.apache.iotdb.db.engine.storagegroup.timeindex.ITimeIndex; +import java.io.IOException; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; /** @@ -56,7 +61,7 @@ public class CrossSpaceCompactionCandidate { this.nextUnseqFileIndex = 0; } - public boolean hasNextSplit() { + public boolean hasNextSplit() throws IOException { if (nextUnseqFileIndex >= unseqFiles.size()) { return false; } @@ -67,10 +72,15 @@ public class CrossSpaceCompactionCandidate { return nextSplit; } - private boolean prepareNextSplit() { + private boolean prepareNextSplit() throws IOException { TsFileResourceCandidate unseqFile = unseqFiles.get(nextUnseqFileIndex); List<TsFileResourceCandidate> ret = new ArrayList<>(); + // The startTime and endTime of each device are different in one TsFile. So we need to do the + // check + // one by one. And we cannot skip any device in the unseq file because it may lead to omission + // of + // target seq file for (DeviceInfo unseqDeviceInfo : unseqFile.getDevices()) { for (TsFileResourceCandidate seqFile : seqFiles) { if (!seqFile.containsDevice(unseqDeviceInfo.deviceId)) { @@ -167,6 +177,7 @@ public class CrossSpaceCompactionCandidate { protected TsFileResource resource; protected boolean selected; protected boolean isValidCandidate; + private Map<String, DeviceInfo> deviceInfoMap; protected TsFileResourceCandidate(TsFileResource tsFileResource) { this.resource = tsFileResource; @@ -176,20 +187,46 @@ public class CrossSpaceCompactionCandidate { this.isValidCandidate = tsFileResource.isClosed() && tsFileResource.getTsFile().exists(); } + private void prepareDeviceInfos() throws IOException { + if (deviceInfoMap != null) { + return; + } + deviceInfoMap = new LinkedHashMap<>(); + if (resource.getTimeIndexType() == ITimeIndex.FILE_TIME_INDEX_TYPE) { + DeviceTimeIndex timeIndex = resource.buildDeviceTimeIndex(); + for (String deviceId : timeIndex.getDevices()) { + deviceInfoMap.put( + deviceId, + new DeviceInfo( + deviceId, timeIndex.getStartTime(deviceId), timeIndex.getEndTime(deviceId))); + } + } else { + for (String deviceId : resource.getDevices()) { + deviceInfoMap.put( + deviceId, + new DeviceInfo( + deviceId, resource.getStartTime(deviceId), resource.getEndTime(deviceId))); + } + } + } + protected void markAsSelected() { this.selected = true; } - protected List<DeviceInfo> getDevices() { - return null; + protected List<DeviceInfo> getDevices() throws IOException { + prepareDeviceInfos(); + return new ArrayList<>(deviceInfoMap.values()); } - protected DeviceInfo getDeviceInfoById(String deviceId) { - return null; + protected DeviceInfo getDeviceInfoById(String deviceId) throws IOException { + prepareDeviceInfos(); + return deviceInfoMap.get(deviceId); } - protected boolean containsDevice(String deviceId) { - return false; + protected boolean containsDevice(String deviceId) throws IOException { + prepareDeviceInfos(); + return deviceInfoMap.containsKey(deviceId); } } @@ -197,5 +234,11 @@ public class CrossSpaceCompactionCandidate { protected String deviceId; protected long startTime; protected long endTime; + + public DeviceInfo(String deviceId, long startTime, long endTime) { + this.deviceId = deviceId; + this.startTime = startTime; + this.endTime = endTime; + } } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java index a73ada0e33..2f9e194130 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java @@ -411,6 +411,21 @@ public class TsFileResource { return timeIndex.getDevices(file.getPath(), this); } + public DeviceTimeIndex buildDeviceTimeIndex() throws IOException { + readLock(); + try (InputStream inputStream = + FSFactoryProducer.getFSFactory() + .getBufferedInputStream(file.getPath() + TsFileResource.RESOURCE_SUFFIX)) { + DeviceTimeIndex deviceTimeIndex = new DeviceTimeIndex(); + return deviceTimeIndex.deserialize(inputStream); + } catch (Exception e) { + throw new IOException( + "Can't read file " + file.getPath() + TsFileResource.RESOURCE_SUFFIX + " from disk", e); + } finally { + readUnlock(); + } + } + /** * Whether this TsFileResource contains this device, if false, it must not contain this device, if * true, it may or may not contain this device diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java index 3a745f532d..2d50171bcb 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java @@ -150,6 +150,10 @@ public class DeviceTimeIndex implements ITimeIndex { endTimes = Arrays.copyOfRange(endTimes, 0, deviceToIndex.size()); } + public Set<String> getDevices() { + return deviceToIndex.keySet(); + } + @Override public Set<String> getDevices(String tsFilePath, TsFileResource tsFileResource) { return deviceToIndex.keySet();
