This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch hot_compaction
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/hot_compaction by this push:
new 6483032 query adapter
new 8466408 resolve conflicts
6483032 is described below
commit 64830328d7895f70fded8e4192c6beb3ff8c0791
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Jun 23 18:02:54 2020 +0800
query adapter
---
.../engine/storagegroup/StorageGroupProcessor.java | 210 ++++++++++++---------
.../db/engine/storagegroup/TsFileProcessor.java | 27 ++-
2 files changed, 141 insertions(+), 96 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 009acac..95c3710 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -229,22 +229,20 @@ public class StorageGroupProcessor {
private TsFileFlushPolicy fileFlushPolicy;
/**
- * partitionDirectFileVersions records the versions of the direct TsFiles
(generated by close,
- * not including the files generated by merge) of each partition.
- * As data file close is managed by the leader in the distributed version,
the files with the
- * same version(s) have the same data, despite that the inner structure (the
size and
- * organization of chunks) may be different, so we can easily find what
remote files we do not
- * have locally.
- * partition number -> version number set
+ * partitionDirectFileVersions records the versions of the direct TsFiles
(generated by close, not
+ * including the files generated by merge) of each partition. As data file
close is managed by the
+ * leader in the distributed version, the files with the same version(s)
have the same data,
+ * despite that the inner structure (the size and organization of chunks)
may be different, so we
+ * can easily find what remote files we do not have locally. partition
number -> version number
+ * set
*/
private Map<Long, Set<Long>> partitionDirectFileVersions = new HashMap<>();
/**
- * The max file versions in each partition. By recording this, if several
IoTDB instances have
- * the same policy of closing file and their ingestion is identical, then
files of the same
- * version in different IoTDB instance will have identical data, providing
convenience for data
- * comparison across different instances.
- * partition number -> max version number
+ * The max file versions in each partition. By recording this, if several
IoTDB instances have the
+ * same policy of closing file and their ingestion is identical, then files
of the same version in
+ * different IoTDB instance will have identical data, providing convenience
for data comparison
+ * across different instances. partition number -> max version number
*/
private Map<Long, Long> partitionMaxFileVersions = new HashMap<>();
@@ -272,12 +270,12 @@ public class StorageGroupProcessor {
try {
// collect candidate TsFiles from sequential and unsequential data
directory
Pair<List<TsFileResource>, List<TsFileResource>> seqTsFilesPair =
getAllFiles(
- DirectoryManager.getInstance().getAllSequenceFileFolders());
+ DirectoryManager.getInstance().getAllSequenceFileFolders());
List<TsFileResource> tmpSeqTsFiles = seqTsFilesPair.left;
List<TsFileResource> oldSeqTsFiles = seqTsFilesPair.right;
upgradeSeqFileList.addAll(oldSeqTsFiles);
Pair<List<TsFileResource>, List<TsFileResource>> unseqTsFilesPair =
getAllFiles(
- DirectoryManager.getInstance().getAllUnSequenceFileFolders());
+ DirectoryManager.getInstance().getAllUnSequenceFileFolders());
List<TsFileResource> tmpUnseqTsFiles = unseqTsFilesPair.left;
List<TsFileResource> oldUnseqTsFiles = unseqTsFilesPair.right;
upgradeUnseqFileList.addAll(oldUnseqTsFiles);
@@ -287,12 +285,14 @@ public class StorageGroupProcessor {
for (TsFileResource resource : sequenceFileTreeSet) {
long partitionNum = resource.getTimePartition();
- partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new
HashSet<>()).addAll(resource.getHistoricalVersions());
+ partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new
HashSet<>())
+ .addAll(resource.getHistoricalVersions());
updatePartitionFileVersion(partitionNum,
Collections.max(resource.getHistoricalVersions()));
}
for (TsFileResource resource : unSequenceFileList) {
long partitionNum = resource.getTimePartition();
- partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new
HashSet<>()).addAll(resource.getHistoricalVersions());
+ partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new
HashSet<>())
+ .addAll(resource.getHistoricalVersions());
updatePartitionFileVersion(partitionNum,
Collections.max(resource.getHistoricalVersions()));
}
@@ -317,7 +317,6 @@ public class StorageGroupProcessor {
throw new StorageGroupProcessorException(e);
}
-
for (TsFileResource resource : sequenceFileTreeSet) {
long timePartitionId = resource.getTimePartition();
Map<String, Long> endTimeMap = new HashMap<>();
@@ -346,11 +345,11 @@ public class StorageGroupProcessor {
/**
* use old seq file to update latestTimeForEachDevice,
globalLatestFlushedTimeForEachDevice,
* partitionLatestFlushedTimeForEachDevice and
timePartitionIdVersionControllerMap
- *
*/
private void updateLastestFlushedTime() throws IOException {
- VersionController versionController = new
SimpleFileVersionController(storageGroupSysDir.getPath());
+ VersionController versionController = new SimpleFileVersionController(
+ storageGroupSysDir.getPath());
long currentVersion = versionController.currVersion();
for (TsFileResource resource : upgradeSeqFileList) {
for (Entry<String, Integer> entry :
resource.getDeviceToIndexMap().entrySet()) {
@@ -359,24 +358,27 @@ public class StorageGroupProcessor {
long endTime = resource.getEndTime(index);
long endTimePartitionId = StorageEngine.getTimePartition(endTime);
latestTimeForEachDevice.computeIfAbsent(endTimePartitionId, l -> new
HashMap<>())
- .put(deviceId, endTime);
+ .put(deviceId, endTime);
globalLatestFlushedTimeForEachDevice.put(deviceId, endTime);
// set all the covered partition's LatestFlushedTime to Long.MAX_VALUE
long partitionId =
StorageEngine.getTimePartition(resource.getStartTime(index));
while (partitionId <= endTimePartitionId) {
partitionLatestFlushedTimeForEachDevice.computeIfAbsent(partitionId,
l -> new HashMap<>())
- .put(deviceId, Long.MAX_VALUE);
+ .put(deviceId, Long.MAX_VALUE);
if (!timePartitionIdVersionControllerMap.containsKey(partitionId)) {
- File directory =
SystemFileFactory.INSTANCE.getFile(storageGroupSysDir,
String.valueOf(partitionId));
- if(!directory.exists()){
+ File directory = SystemFileFactory.INSTANCE
+ .getFile(storageGroupSysDir, String.valueOf(partitionId));
+ if (!directory.exists()) {
directory.mkdirs();
}
- File versionFile = SystemFileFactory.INSTANCE.getFile(directory,
SimpleFileVersionController.FILE_PREFIX + currentVersion);
+ File versionFile = SystemFileFactory.INSTANCE
+ .getFile(directory, SimpleFileVersionController.FILE_PREFIX +
currentVersion);
if (!versionFile.createNewFile()) {
logger.warn("Version file {} has already been created ",
versionFile);
}
- timePartitionIdVersionControllerMap.put(partitionId, new
SimpleFileVersionController(storageGroupSysDir.getPath(), partitionId));
+ timePartitionIdVersionControllerMap.put(partitionId,
+ new SimpleFileVersionController(storageGroupSysDir.getPath(),
partitionId));
}
partitionId++;
}
@@ -402,7 +404,8 @@ public class StorageGroupProcessor {
});
}
- private Pair<List<TsFileResource>, List<TsFileResource>>
getAllFiles(List<String> folders) throws IOException {
+ private Pair<List<TsFileResource>, List<TsFileResource>>
getAllFiles(List<String> folders)
+ throws IOException {
List<File> tsFiles = new ArrayList<>();
List<File> upgradeFiles = new ArrayList<>();
for (String baseDir : folders) {
@@ -420,9 +423,12 @@ public class StorageGroupProcessor {
// the process was interrupted before the merged files could be named
continueFailedRenames(fileFolder, MERGE_SUFFIX);
- File[] oldTsfileArray =
fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), TSFILE_SUFFIX);
- File[] oldResourceFileArray =
fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(),
TsFileResource.RESOURCE_SUFFIX);
- File[] oldModificationFileArray =
fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(),
ModificationFile.FILE_SUFFIX);
+ File[] oldTsfileArray = fsFactory
+ .listFilesBySuffix(fileFolder.getAbsolutePath(), TSFILE_SUFFIX);
+ File[] oldResourceFileArray = fsFactory
+ .listFilesBySuffix(fileFolder.getAbsolutePath(),
TsFileResource.RESOURCE_SUFFIX);
+ File[] oldModificationFileArray = fsFactory
+ .listFilesBySuffix(fileFolder.getAbsolutePath(),
ModificationFile.FILE_SUFFIX);
File upgradeFolder = fsFactory.getFile(fileFolder,
IoTDBConstant.UPGRADE_FOLDER_NAME);
// move the old files to upgrade folder if exists
if (oldTsfileArray.length != 0 || oldResourceFileArray.length != 0) {
@@ -611,7 +617,8 @@ public class StorageGroupProcessor {
long timePartitionId =
StorageEngine.getTimePartition(insertPlan.getTime());
latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new
HashMap<>());
- partitionLatestFlushedTimeForEachDevice.computeIfAbsent(timePartitionId,
id -> new HashMap<>());
+ partitionLatestFlushedTimeForEachDevice
+ .computeIfAbsent(timePartitionId, id -> new HashMap<>());
// insert to sequence or unSequence file
insertToTsFileProcessor(insertPlan,
@@ -625,8 +632,9 @@ public class StorageGroupProcessor {
/**
* Insert a tablet (rows belonging to the same devices) into this storage
group.
+ *
* @param insertTabletPlan
- * @throws WriteProcessException when update last cache failed
+ * @throws WriteProcessException when update last cache failed
* @throws BatchInsertionException if some of the rows failed to be inserted
*/
public void insertTablet(InsertTabletPlan insertTabletPlan) throws
WriteProcessException,
@@ -660,7 +668,8 @@ public class StorageGroupProcessor {
// before is first start point
int before = loc;
// before time partition
- long beforeTimePartition =
StorageEngine.getTimePartition(insertTabletPlan.getTimes()[before]);
+ long beforeTimePartition = StorageEngine
+ .getTimePartition(insertTabletPlan.getTimes()[before]);
// init map
long lastFlushTime = partitionLatestFlushedTimeForEachDevice.
computeIfAbsent(beforeTimePartition, id -> new HashMap<>()).
@@ -723,15 +732,15 @@ public class StorageGroupProcessor {
}
/**
- * insert batch to tsfile processor thread-safety that the caller need to
guarantee
- * The rows to be inserted are in the range [start, end)
+ * insert batch to tsfile processor thread-safety that the caller need to
guarantee The rows to be
+ * inserted are in the range [start, end)
*
* @param insertTabletPlan insert a tablet of a device
- * @param sequence whether is sequence
- * @param start start index of rows to be inserted in insertTabletPlan
- * @param end end index of rows to be inserted in insertTabletPlan
- * @param results result array
- * @param timePartitionId time partition id
+ * @param sequence whether is sequence
+ * @param start start index of rows to be inserted in
insertTabletPlan
+ * @param end end index of rows to be inserted in
insertTabletPlan
+ * @param results result array
+ * @param timePartitionId time partition id
* @return false if any failure occurs when inserting the tablet, true
otherwise
*/
private boolean insertTabletToTsFileProcessor(InsertTabletPlan
insertTabletPlan,
@@ -882,10 +891,10 @@ public class StorageGroupProcessor {
/**
* get processor from hashmap, flush oldest processor if necessary
*
- * @param timeRangeId time partition range
+ * @param timeRangeId time partition range
* @param tsFileProcessorTreeMap tsFileProcessorTreeMap
- * @param fileList file list to add new processor
- * @param sequence whether is sequence or not
+ * @param fileList file list to add new processor
+ * @param sequence whether is sequence or not
*/
private TsFileProcessor getOrCreateTsFileProcessorIntern(long timeRangeId,
TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap,
@@ -1279,7 +1288,7 @@ public class StorageGroupProcessor {
tsfileResourcesForQuery.add(tsFileResource);
} else {
// left: in-memory data, right: meta of disk data
- Pair<List<ReadOnlyMemChunk>, List<ChunkMetadata>> pair =
+ Pair<List<ReadOnlyMemChunk>, List<List<ChunkMetadata>>> pair =
tsFileResource.getUnsealedFileProcessor()
.query(deviceId, measurementId, schema.getType(),
schema.getEncodingType(),
schema.getProps(), context);
@@ -1287,7 +1296,17 @@ public class StorageGroupProcessor {
tsfileResourcesForQuery.add(new
TsFileResource(tsFileResource.getFile(),
tsFileResource.getDeviceToIndexMap(),
tsFileResource.getStartTimes(), tsFileResource.getEndTimes(),
pair.left,
- pair.right));
+ pair.right.get(0)));
+
+ List<TsFileResource> vmTsFileResourceList =
+ tsFileResource.getUnsealedFileProcessor().getVmTsFileResources();
+
+ for (int i = 1; i < pair.right.size(); i++) {
+ TsFileResource tmp = vmTsFileResourceList.get(i - 1);
+ tsfileResourcesForQuery.add(
+ new TsFileResource(tmp.getFile(), tmp.getDeviceToIndexMap(),
tmp.getStartTimes(),
+ tmp.getEndTimes(), pair.left, pair.right.get(i)));
+ }
}
} catch (IOException e) {
throw new MetadataException(e);
@@ -1321,7 +1340,8 @@ public class StorageGroupProcessor {
int deviceIndex = tsFileResource.getDeviceToIndexMap().get(deviceId);
long startTime = tsFileResource.getStartTime(deviceIndex);
- long endTime = tsFileResource.isClosed() || !isSeq ?
tsFileResource.getEndTime(deviceIndex) : Long.MAX_VALUE;
+ long endTime = tsFileResource.isClosed() || !isSeq ?
tsFileResource.getEndTime(deviceIndex)
+ : Long.MAX_VALUE;
if (!isAlive(endTime)) {
return false;
@@ -1338,9 +1358,9 @@ public class StorageGroupProcessor {
* Delete data whose timestamp <= 'timestamp' and belongs to the time series
* deviceId.measurementId.
*
- * @param deviceId the deviceId of the timeseries to be deleted.
+ * @param deviceId the deviceId of the timeseries to be deleted.
* @param measurementId the measurementId of the timeseries to be deleted.
- * @param timestamp the delete range is (0, timestamp].
+ * @param timestamp the delete range is (0, timestamp].
*/
public void delete(String deviceId, String measurementId, long timestamp)
throws IOException {
// TODO: how to avoid partial deletion?
@@ -1395,7 +1415,8 @@ public class StorageGroupProcessor {
}
}
- private void logDeletion(long timestamp, String deviceId, String
measurementId, long timePartitionId)
+ private void logDeletion(long timestamp, String deviceId, String
measurementId,
+ long timePartitionId)
throws IOException {
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
DeletePlan deletionPlan = new DeletePlan(timestamp, new Path(deviceId,
measurementId));
@@ -1477,7 +1498,8 @@ public class StorageGroupProcessor {
partitionLatestFlushedTimeForEachDevice
.computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
.put(entry.getKey(), entry.getValue());
-
updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(),
entry.getKey(), entry.getValue());
+
updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(),
+ entry.getKey(), entry.getValue());
if (globalLatestFlushedTimeForEachDevice
.getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) {
globalLatestFlushedTimeForEachDevice.put(entry.getKey(),
entry.getValue());
@@ -1490,10 +1512,11 @@ public class StorageGroupProcessor {
/**
* used for upgrading
*/
- public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(long
partitionId, String deviceId, long time) {
+ public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(long
partitionId,
+ String deviceId, long time) {
newlyFlushedPartitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(partitionId, id -> new HashMap<>())
- .compute(deviceId, (k, v) -> v == null ? time : Math.max(v, time));
+ .computeIfAbsent(partitionId, id -> new HashMap<>())
+ .compute(deviceId, (k, v) -> v == null ? time : Math.max(v, time));
}
/**
@@ -1547,8 +1570,8 @@ public class StorageGroupProcessor {
for (TsFileResource resource : upgradedResources) {
long partitionId = resource.getTimePartition();
resource.getDeviceToIndexMap().forEach((device, index) ->
- updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(partitionId,
device,
- resource.getEndTime(index))
+
updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(partitionId, device,
+ resource.getEndTime(index))
);
}
insertLock.writeLock().lock();
@@ -1747,6 +1770,7 @@ public class StorageGroupProcessor {
/**
* acquire the write locks of the resource and the merge lock
+ *
* @param seqFile
*/
private void doubleWriteLock(TsFileResource seqFile) {
@@ -1763,7 +1787,7 @@ public class StorageGroupProcessor {
if (fileLockGot) {
seqFile.writeUnlock();
}
- if(mergeLockGot) {
+ if (mergeLockGot) {
mergeLock.writeLock().unlock();
}
}
@@ -1772,6 +1796,7 @@ public class StorageGroupProcessor {
/**
* release the write locks of the resource and the merge lock
+ *
* @param seqFile
*/
private void doubleWriteUnlock(TsFileResource seqFile) {
@@ -1798,7 +1823,7 @@ public class StorageGroupProcessor {
mergeLock.writeLock().lock();
try {
if (loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted,
newTsFileResource,
- newFilePartitionId)){
+ newFilePartitionId)) {
updateLatestTimeMap(newTsFileResource);
}
} catch (DiskSpaceInsufficientException e) {
@@ -1865,7 +1890,8 @@ public class StorageGroupProcessor {
long partitionNum = newTsFileResource.getTimePartition();
partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new
HashSet<>())
.addAll(newTsFileResource.getHistoricalVersions());
- updatePartitionFileVersion(partitionNum,
Collections.max(newTsFileResource.getHistoricalVersions()));
+ updatePartitionFileVersion(partitionNum,
+ Collections.max(newTsFileResource.getHistoricalVersions()));
} catch (DiskSpaceInsufficientException e) {
logger.error(
"Failed to append the tsfile {} to storage group processor {}
because the disk space is insufficient.",
@@ -1880,11 +1906,13 @@ public class StorageGroupProcessor {
/**
* Set the version in "partition" to "version" if "version" is larger than
the current version.
+ *
* @param partition
* @param version
*/
public void setPartitionFileVersionToMax(long partition, long version) {
- partitionMaxFileVersions.compute(partition, (prt, oldVer) ->
computeMaxVersion(oldVer, version));
+ partitionMaxFileVersions
+ .compute(partition, (prt, oldVer) -> computeMaxVersion(oldVer,
version));
}
private long computeMaxVersion(Long oldVersion, Long newVersion) {
@@ -1895,12 +1923,14 @@ public class StorageGroupProcessor {
}
/**
- * Find the position of "newTsFileResource" in the sequence files if it can
be inserted into them.
+ * Find the position of "newTsFileResource" in the sequence files if it can
be inserted into
+ * them.
+ *
* @param newTsFileResource
* @param newFilePartitionId
- * @return POS_ALREADY_EXIST(-2) if some file has the same name as the one
to be inserted
- * POS_OVERLAP(-3) if some file overlaps the new file
- * an insertion position i >= -1 if the new file can be inserted
between [i, i+1]
+ * @return POS_ALREADY_EXIST(- 2) if some file has the same name as the one
to be inserted
+ * POS_OVERLAP(-3) if some file overlaps the new file an insertion position
i >= -1 if the new
+ * file can be inserted between [i, i+1]
*/
private int findInsertionPosition(TsFileResource newTsFileResource, long
newFilePartitionId,
List<TsFileResource> sequenceList) {
@@ -1941,11 +1971,11 @@ public class StorageGroupProcessor {
/**
* Compare each device in the two files to find the time relation of them.
+ *
* @param fileA
* @param fileB
- * @return -1 if fileA is totally older than fileB (A < B)
- * 0 if fileA is partially older than fileB and partially newer
than fileB (A X B)
- * 1 if fileA is totally newer than fileB (B < A)
+ * @return -1 if fileA is totally older than fileB (A < B) 0 if fileA is
partially older than
+ * fileB and partially newer than fileB (A X B) 1 if fileA is totally newer
than fileB (B < A)
*/
private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB)
{
boolean hasPre = false, hasSubsequence = false;
@@ -1981,10 +2011,10 @@ public class StorageGroupProcessor {
}
/**
- * If the historical versions of a file is a sub-set of the given file's,
(close and) remove it to reduce
- * unnecessary merge. Only used when the file sender and the receiver share
the same file
- * close policy.
- * Warning: DO NOT REMOVE
+ * If the historical versions of a file is a sub-set of the given file's,
(close and) remove it to
+ * reduce unnecessary merge. Only used when the file sender and the receiver
share the same file
+ * close policy. Warning: DO NOT REMOVE
+ *
* @param resource
*/
@SuppressWarnings("unused")
@@ -2026,13 +2056,14 @@ public class StorageGroupProcessor {
/**
* remove the given tsFileResource. If the corresponding tsFileProcessor is
in the working status,
- * close it before remove the related resource files.
- * maybe time-consuming for closing a tsfile.
+ * close it before remove the related resource files. maybe time-consuming
for closing a tsfile.
+ *
* @param tsFileResource
* @param iterator
* @param isSeq
*/
- private void removeFullyOverlapFile(TsFileResource tsFileResource,
Iterator<TsFileResource> iterator
+ private void removeFullyOverlapFile(TsFileResource tsFileResource,
+ Iterator<TsFileResource> iterator
, boolean isSeq) {
if (!tsFileResource.isClosed()) {
// also remove the TsFileProcessor if the overlapped file is not closed
@@ -2073,9 +2104,9 @@ public class StorageGroupProcessor {
* returns directly; otherwise, the time stamp is the mean of the timestamps
of the two files, the
* version number is the version number in the tsfile with a larger
timestamp.
*
- * @param tsfileName origin tsfile name
- * @param insertIndex the new file will be inserted between the files
[insertIndex, insertIndex
- * + 1]
+ * @param tsfileName origin tsfile name
+ * @param insertIndex the new file will be inserted between the files
[insertIndex, insertIndex +
+ * 1]
* @return appropriate filename
*/
private String getFileNameForLoadingFile(String tsfileName, int insertIndex,
@@ -2139,12 +2170,12 @@ public class StorageGroupProcessor {
/**
* Execute the loading process by the type.
*
- * @param type load type
- * @param tsFileResource tsfile resource to be loaded
+ * @param type load type
+ * @param tsFileResource tsfile resource to be loaded
* @param filePartitionId the partition id of the new file
- * @UsedBy sync module, load external tsfile module.
* @return load the file successfully
* @UsedBy sync module, load external tsfile module.
+ * @UsedBy sync module, load external tsfile module.
*/
private boolean loadTsFileByType(LoadTsFileType type, File syncedTsFile,
TsFileResource tsFileResource, long filePartitionId)
@@ -2153,7 +2184,8 @@ public class StorageGroupProcessor {
switch (type) {
case LOAD_UNSEQUENCE:
targetFile = new
File(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
- storageGroupName + File.separatorChar + filePartitionId +
File.separator + tsFileResource
+ storageGroupName + File.separatorChar + filePartitionId +
File.separator
+ + tsFileResource
.getFile().getName());
tsFileResource.setFile(targetFile);
if (unSequenceFileList.contains(tsFileResource)) {
@@ -2213,7 +2245,8 @@ public class StorageGroupProcessor {
}
partitionDirectFileVersions.computeIfAbsent(filePartitionId,
p -> new HashSet<>()).addAll(tsFileResource.getHistoricalVersions());
- updatePartitionFileVersion(filePartitionId,
Collections.max(tsFileResource.getHistoricalVersions()));
+ updatePartitionFileVersion(filePartitionId,
+ Collections.max(tsFileResource.getHistoricalVersions()));
return true;
}
@@ -2354,6 +2387,7 @@ public class StorageGroupProcessor {
@FunctionalInterface
public interface CloseTsFileCallBack {
+
void call(TsFileProcessor caller) throws TsFileProcessorException,
IOException;
}
@@ -2362,17 +2396,17 @@ public class StorageGroupProcessor {
}
/**
- * Check if the data of "tsFileResource" all exist locally by comparing the
historical versions
- * in the partition of "partitionNumber". This is available only when the
IoTDB instances which generated
- * "tsFileResource" have the same close file policy as the local one.
- * If one of the version in "tsFileResource" equals to a version of a
working file, false is
- * returned because "tsFileResource" may have unwritten data of that file.
+ * Check if the data of "tsFileResource" all exist locally by comparing the
historical versions in
+ * the partition of "partitionNumber". This is available only when the IoTDB
instances which
+ * generated "tsFileResource" have the same close file policy as the local
one. If one of the
+ * version in "tsFileResource" equals to a version of a working file, false
is returned because
+ * "tsFileResource" may have unwritten data of that file.
+ *
* @param tsFileResource
* @param partitionNum
* @return true if the historicalVersions of "tsFileResource" is a subset of
- * partitionDirectFileVersions, or false if it is not a subset and it
contains any
- * version of a working file
- * USED by cluster module
+ * partitionDirectFileVersions, or false if it is not a subset and it
contains any version of a
+ * working file USED by cluster module
*/
public boolean isFileAlreadyExist(TsFileResource tsFileResource, long
partitionNum) {
// consider the case: The local node crashes when it is writing TsFile
no.5.
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index bf38e63..af31bf4 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -67,7 +67,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
@@ -851,7 +850,7 @@ public class TsFileProcessor {
* @param encoding encoding
* @return left: the chunk data in memory; right: the chunkMetadatas of data
on disk
*/
- public Pair<List<ReadOnlyMemChunk>, List<ChunkMetadata>> query(String
deviceId,
+ public Pair<List<ReadOnlyMemChunk>, List<List<ChunkMetadata>>> query(String
deviceId,
String measurementId, TSDataType dataType, TSEncoding encoding,
Map<String, String> props,
QueryContext context) {
if (logger.isDebugEnabled()) {
@@ -883,18 +882,26 @@ public class TsFileProcessor {
List<Modification> modifications =
context.getPathModifications(modificationFile,
deviceId + IoTDBConstant.PATH_SEPARATOR + measurementId);
+ List<List<ChunkMetadata>> rightResult = new ArrayList<>();
+
+ // get unseal tsfile data
List<ChunkMetadata> chunkMetadataList = writer
.getVisibleMetadataList(deviceId, measurementId, dataType);
- for (RestorableTsFileIOWriter vmWriter : vmWriters) {
- chunkMetadataList
- .addAll(vmWriter.getVisibleMetadataList(deviceId, measurementId,
dataType));
- }
QueryUtils.modifyChunkMetaData(chunkMetadataList,
modifications);
-
chunkMetadataList.removeIf(context::chunkNotSatisfy);
+ rightResult.add(chunkMetadataList);
- return new Pair<>(readOnlyMemChunks, chunkMetadataList);
+ // get vm tsfile data
+ for (RestorableTsFileIOWriter vmWriter : vmWriters) {
+ chunkMetadataList = vmWriter.getVisibleMetadataList(deviceId,
measurementId, dataType);
+ QueryUtils.modifyChunkMetaData(chunkMetadataList,
+ modifications);
+ chunkMetadataList.removeIf(context::chunkNotSatisfy);
+ rightResult.add(chunkMetadataList);
+ }
+
+ return new Pair<>(readOnlyMemChunks, rightResult);
} catch (Exception e) {
logger.error("{}: {} get ReadOnlyMemChunk has error", storageGroupName,
tsFileResource.getFile().getName(), e);
@@ -927,4 +934,8 @@ public class TsFileProcessor {
throw new TsFileProcessorException(e);
}
}
+
+ public List<TsFileResource> getVmTsFileResources() {
+ return vmTsFileResources;
+ }
}