This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch hot_compaction
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/hot_compaction by this push:
     new 6483032  query adapter
     new 8466408  resolve conflicts
6483032 is described below

commit 64830328d7895f70fded8e4192c6beb3ff8c0791
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Jun 23 18:02:54 2020 +0800

    query adapter
---
 .../engine/storagegroup/StorageGroupProcessor.java | 210 ++++++++++++---------
 .../db/engine/storagegroup/TsFileProcessor.java    |  27 ++-
 2 files changed, 141 insertions(+), 96 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 009acac..95c3710 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -229,22 +229,20 @@ public class StorageGroupProcessor {
   private TsFileFlushPolicy fileFlushPolicy;
 
   /**
-   * partitionDirectFileVersions records the versions of the direct TsFiles 
(generated by close,
-   * not including the files generated by merge) of each partition.
-   * As data file close is managed by the leader in the distributed version, 
the files with the
-   * same version(s) have the same data, despite that the inner structure (the 
size and
-   * organization of chunks) may be different, so we can easily find what 
remote files we do not
-   * have locally.
-   * partition number -> version number set
+   * partitionDirectFileVersions records the versions of the direct TsFiles 
(generated by close, not
+   * including the files generated by merge) of each partition. As data file 
close is managed by the
+   * leader in the distributed version, the files with the same version(s) 
have the same data,
+   * despite that the inner structure (the size and organization of chunks) 
may be different, so we
+   * can easily find what remote files we do not have locally. partition 
number -> version number
+   * set
    */
   private Map<Long, Set<Long>> partitionDirectFileVersions = new HashMap<>();
 
   /**
-   * The max file versions in each partition. By recording this, if several 
IoTDB instances have
-   * the same policy of closing file and their ingestion is identical, then 
files of the same
-   * version in different IoTDB instance will have identical data, providing 
convenience for data
-   * comparison across different instances.
-   * partition number -> max version number
+   * The max file versions in each partition. By recording this, if several 
IoTDB instances have the
+   * same policy of closing file and their ingestion is identical, then files 
of the same version in
+   * different IoTDB instance will have identical data, providing convenience 
for data comparison
+   * across different instances. partition number -> max version number
    */
   private Map<Long, Long> partitionMaxFileVersions = new HashMap<>();
 
@@ -272,12 +270,12 @@ public class StorageGroupProcessor {
     try {
       // collect candidate TsFiles from sequential and unsequential data 
directory
       Pair<List<TsFileResource>, List<TsFileResource>> seqTsFilesPair = 
getAllFiles(
-              DirectoryManager.getInstance().getAllSequenceFileFolders());
+          DirectoryManager.getInstance().getAllSequenceFileFolders());
       List<TsFileResource> tmpSeqTsFiles = seqTsFilesPair.left;
       List<TsFileResource> oldSeqTsFiles = seqTsFilesPair.right;
       upgradeSeqFileList.addAll(oldSeqTsFiles);
       Pair<List<TsFileResource>, List<TsFileResource>> unseqTsFilesPair = 
getAllFiles(
-              DirectoryManager.getInstance().getAllUnSequenceFileFolders());
+          DirectoryManager.getInstance().getAllUnSequenceFileFolders());
       List<TsFileResource> tmpUnseqTsFiles = unseqTsFilesPair.left;
       List<TsFileResource> oldUnseqTsFiles = unseqTsFilesPair.right;
       upgradeUnseqFileList.addAll(oldUnseqTsFiles);
@@ -287,12 +285,14 @@ public class StorageGroupProcessor {
 
       for (TsFileResource resource : sequenceFileTreeSet) {
         long partitionNum = resource.getTimePartition();
-        partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new 
HashSet<>()).addAll(resource.getHistoricalVersions());
+        partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new 
HashSet<>())
+            .addAll(resource.getHistoricalVersions());
         updatePartitionFileVersion(partitionNum, 
Collections.max(resource.getHistoricalVersions()));
       }
       for (TsFileResource resource : unSequenceFileList) {
         long partitionNum = resource.getTimePartition();
-        partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new 
HashSet<>()).addAll(resource.getHistoricalVersions());
+        partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new 
HashSet<>())
+            .addAll(resource.getHistoricalVersions());
         updatePartitionFileVersion(partitionNum, 
Collections.max(resource.getHistoricalVersions()));
       }
 
@@ -317,7 +317,6 @@ public class StorageGroupProcessor {
       throw new StorageGroupProcessorException(e);
     }
 
-
     for (TsFileResource resource : sequenceFileTreeSet) {
       long timePartitionId = resource.getTimePartition();
       Map<String, Long> endTimeMap = new HashMap<>();
@@ -346,11 +345,11 @@ public class StorageGroupProcessor {
   /**
    * use old seq file to update latestTimeForEachDevice, 
globalLatestFlushedTimeForEachDevice,
    * partitionLatestFlushedTimeForEachDevice and 
timePartitionIdVersionControllerMap
-   *
    */
   private void updateLastestFlushedTime() throws IOException {
 
-    VersionController versionController = new 
SimpleFileVersionController(storageGroupSysDir.getPath());
+    VersionController versionController = new SimpleFileVersionController(
+        storageGroupSysDir.getPath());
     long currentVersion = versionController.currVersion();
     for (TsFileResource resource : upgradeSeqFileList) {
       for (Entry<String, Integer> entry : 
resource.getDeviceToIndexMap().entrySet()) {
@@ -359,24 +358,27 @@ public class StorageGroupProcessor {
         long endTime = resource.getEndTime(index);
         long endTimePartitionId = StorageEngine.getTimePartition(endTime);
         latestTimeForEachDevice.computeIfAbsent(endTimePartitionId, l -> new 
HashMap<>())
-                .put(deviceId, endTime);
+            .put(deviceId, endTime);
         globalLatestFlushedTimeForEachDevice.put(deviceId, endTime);
 
         // set all the covered partition's LatestFlushedTime to Long.MAX_VALUE
         long partitionId = 
StorageEngine.getTimePartition(resource.getStartTime(index));
         while (partitionId <= endTimePartitionId) {
           partitionLatestFlushedTimeForEachDevice.computeIfAbsent(partitionId, 
l -> new HashMap<>())
-                  .put(deviceId, Long.MAX_VALUE);
+              .put(deviceId, Long.MAX_VALUE);
           if (!timePartitionIdVersionControllerMap.containsKey(partitionId)) {
-            File directory = 
SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, 
String.valueOf(partitionId));
-            if(!directory.exists()){
+            File directory = SystemFileFactory.INSTANCE
+                .getFile(storageGroupSysDir, String.valueOf(partitionId));
+            if (!directory.exists()) {
               directory.mkdirs();
             }
-            File versionFile = SystemFileFactory.INSTANCE.getFile(directory, 
SimpleFileVersionController.FILE_PREFIX + currentVersion);
+            File versionFile = SystemFileFactory.INSTANCE
+                .getFile(directory, SimpleFileVersionController.FILE_PREFIX + 
currentVersion);
             if (!versionFile.createNewFile()) {
               logger.warn("Version file {} has already been created ", 
versionFile);
             }
-            timePartitionIdVersionControllerMap.put(partitionId, new 
SimpleFileVersionController(storageGroupSysDir.getPath(), partitionId));
+            timePartitionIdVersionControllerMap.put(partitionId,
+                new SimpleFileVersionController(storageGroupSysDir.getPath(), 
partitionId));
           }
           partitionId++;
         }
@@ -402,7 +404,8 @@ public class StorageGroupProcessor {
         });
   }
 
-  private Pair<List<TsFileResource>, List<TsFileResource>> 
getAllFiles(List<String> folders) throws IOException {
+  private Pair<List<TsFileResource>, List<TsFileResource>> 
getAllFiles(List<String> folders)
+      throws IOException {
     List<File> tsFiles = new ArrayList<>();
     List<File> upgradeFiles = new ArrayList<>();
     for (String baseDir : folders) {
@@ -420,9 +423,12 @@ public class StorageGroupProcessor {
       // the process was interrupted before the merged files could be named
       continueFailedRenames(fileFolder, MERGE_SUFFIX);
 
-      File[] oldTsfileArray = 
fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), TSFILE_SUFFIX);
-      File[] oldResourceFileArray = 
fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), 
TsFileResource.RESOURCE_SUFFIX);
-      File[] oldModificationFileArray = 
fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), 
ModificationFile.FILE_SUFFIX);
+      File[] oldTsfileArray = fsFactory
+          .listFilesBySuffix(fileFolder.getAbsolutePath(), TSFILE_SUFFIX);
+      File[] oldResourceFileArray = fsFactory
+          .listFilesBySuffix(fileFolder.getAbsolutePath(), 
TsFileResource.RESOURCE_SUFFIX);
+      File[] oldModificationFileArray = fsFactory
+          .listFilesBySuffix(fileFolder.getAbsolutePath(), 
ModificationFile.FILE_SUFFIX);
       File upgradeFolder = fsFactory.getFile(fileFolder, 
IoTDBConstant.UPGRADE_FOLDER_NAME);
       // move the old files to upgrade folder if exists
       if (oldTsfileArray.length != 0 || oldResourceFileArray.length != 0) {
@@ -611,7 +617,8 @@ public class StorageGroupProcessor {
       long timePartitionId = 
StorageEngine.getTimePartition(insertPlan.getTime());
 
       latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new 
HashMap<>());
-      partitionLatestFlushedTimeForEachDevice.computeIfAbsent(timePartitionId, 
id -> new HashMap<>());
+      partitionLatestFlushedTimeForEachDevice
+          .computeIfAbsent(timePartitionId, id -> new HashMap<>());
 
       // insert to sequence or unSequence file
       insertToTsFileProcessor(insertPlan,
@@ -625,8 +632,9 @@ public class StorageGroupProcessor {
 
   /**
    * Insert a tablet (rows belonging to the same devices) into this storage 
group.
+   *
    * @param insertTabletPlan
-   * @throws WriteProcessException when update last cache failed
+   * @throws WriteProcessException   when updating the last cache fails
    * @throws BatchInsertionException if some of the rows failed to be inserted
    */
   public void insertTablet(InsertTabletPlan insertTabletPlan) throws 
WriteProcessException,
@@ -660,7 +668,8 @@ public class StorageGroupProcessor {
       // before is first start point
       int before = loc;
       // before time partition
-      long beforeTimePartition = 
StorageEngine.getTimePartition(insertTabletPlan.getTimes()[before]);
+      long beforeTimePartition = StorageEngine
+          .getTimePartition(insertTabletPlan.getTimes()[before]);
       // init map
       long lastFlushTime = partitionLatestFlushedTimeForEachDevice.
           computeIfAbsent(beforeTimePartition, id -> new HashMap<>()).
@@ -723,15 +732,15 @@ public class StorageGroupProcessor {
   }
 
   /**
-   * insert batch to tsfile processor thread-safety that the caller need to 
guarantee
-   * The rows to be inserted are in the range [start, end)
+   * insert batch to tsfile processor thread-safety that the caller need to 
guarantee The rows to be
+   * inserted are in the range [start, end)
    *
    * @param insertTabletPlan insert a tablet of a device
-   * @param sequence whether is sequence
-   * @param start start index of rows to be inserted in insertTabletPlan
-   * @param end end index of rows to be inserted in insertTabletPlan
-   * @param results result array
-   * @param timePartitionId time partition id
+   * @param sequence         whether is sequence
+   * @param start            start index of rows to be inserted in 
insertTabletPlan
+   * @param end              end index of rows to be inserted in 
insertTabletPlan
+   * @param results          result array
+   * @param timePartitionId  time partition id
    * @return false if any failure occurs when inserting the tablet, true 
otherwise
    */
   private boolean insertTabletToTsFileProcessor(InsertTabletPlan 
insertTabletPlan,
@@ -882,10 +891,10 @@ public class StorageGroupProcessor {
   /**
    * get processor from hashmap, flush oldest processor if necessary
    *
-   * @param timeRangeId time partition range
+   * @param timeRangeId            time partition range
    * @param tsFileProcessorTreeMap tsFileProcessorTreeMap
-   * @param fileList file list to add new processor
-   * @param sequence whether is sequence or not
+   * @param fileList               file list to add new processor
+   * @param sequence               whether is sequence or not
    */
   private TsFileProcessor getOrCreateTsFileProcessorIntern(long timeRangeId,
       TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap,
@@ -1279,7 +1288,7 @@ public class StorageGroupProcessor {
           tsfileResourcesForQuery.add(tsFileResource);
         } else {
           // left: in-memory data, right: meta of disk data
-          Pair<List<ReadOnlyMemChunk>, List<ChunkMetadata>> pair =
+          Pair<List<ReadOnlyMemChunk>, List<List<ChunkMetadata>>> pair =
               tsFileResource.getUnsealedFileProcessor()
                   .query(deviceId, measurementId, schema.getType(), 
schema.getEncodingType(),
                       schema.getProps(), context);
@@ -1287,7 +1296,17 @@ public class StorageGroupProcessor {
           tsfileResourcesForQuery.add(new 
TsFileResource(tsFileResource.getFile(),
               tsFileResource.getDeviceToIndexMap(),
               tsFileResource.getStartTimes(), tsFileResource.getEndTimes(), 
pair.left,
-              pair.right));
+              pair.right.get(0)));
+
+          List<TsFileResource> vmTsFileResourceList =
+              tsFileResource.getUnsealedFileProcessor().getVmTsFileResources();
+
+          for (int i = 1; i < pair.right.size(); i++) {
+            TsFileResource tmp = vmTsFileResourceList.get(i - 1);
+            tsfileResourcesForQuery.add(
+                new TsFileResource(tmp.getFile(), tmp.getDeviceToIndexMap(), 
tmp.getStartTimes(),
+                    tmp.getEndTimes(), pair.left, pair.right.get(i)));
+          }
         }
       } catch (IOException e) {
         throw new MetadataException(e);
@@ -1321,7 +1340,8 @@ public class StorageGroupProcessor {
 
     int deviceIndex = tsFileResource.getDeviceToIndexMap().get(deviceId);
     long startTime = tsFileResource.getStartTime(deviceIndex);
-    long endTime = tsFileResource.isClosed() || !isSeq ? 
tsFileResource.getEndTime(deviceIndex) : Long.MAX_VALUE;
+    long endTime = tsFileResource.isClosed() || !isSeq ? 
tsFileResource.getEndTime(deviceIndex)
+        : Long.MAX_VALUE;
 
     if (!isAlive(endTime)) {
       return false;
@@ -1338,9 +1358,9 @@ public class StorageGroupProcessor {
    * Delete data whose timestamp <= 'timestamp' and belongs to the time series
    * deviceId.measurementId.
    *
-   * @param deviceId the deviceId of the timeseries to be deleted.
+   * @param deviceId      the deviceId of the timeseries to be deleted.
    * @param measurementId the measurementId of the timeseries to be deleted.
-   * @param timestamp the delete range is (0, timestamp].
+   * @param timestamp     the delete range is (0, timestamp].
    */
   public void delete(String deviceId, String measurementId, long timestamp) 
throws IOException {
     // TODO: how to avoid partial deletion?
@@ -1395,7 +1415,8 @@ public class StorageGroupProcessor {
     }
   }
 
-  private void logDeletion(long timestamp, String deviceId, String 
measurementId, long timePartitionId)
+  private void logDeletion(long timestamp, String deviceId, String 
measurementId,
+      long timePartitionId)
       throws IOException {
     if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
       DeletePlan deletionPlan = new DeletePlan(timestamp, new Path(deviceId, 
measurementId));
@@ -1477,7 +1498,8 @@ public class StorageGroupProcessor {
       partitionLatestFlushedTimeForEachDevice
           .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
           .put(entry.getKey(), entry.getValue());
-      
updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(),
 entry.getKey(), entry.getValue());
+      
updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(),
+          entry.getKey(), entry.getValue());
       if (globalLatestFlushedTimeForEachDevice
           .getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) {
         globalLatestFlushedTimeForEachDevice.put(entry.getKey(), 
entry.getValue());
@@ -1490,10 +1512,11 @@ public class StorageGroupProcessor {
   /**
    * used for upgrading
    */
-  public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(long 
partitionId, String deviceId, long time) {
+  public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(long 
partitionId,
+      String deviceId, long time) {
     newlyFlushedPartitionLatestFlushedTimeForEachDevice
-            .computeIfAbsent(partitionId, id -> new HashMap<>())
-            .compute(deviceId, (k, v) -> v == null ? time : Math.max(v, time));
+        .computeIfAbsent(partitionId, id -> new HashMap<>())
+        .compute(deviceId, (k, v) -> v == null ? time : Math.max(v, time));
   }
 
   /**
@@ -1547,8 +1570,8 @@ public class StorageGroupProcessor {
     for (TsFileResource resource : upgradedResources) {
       long partitionId = resource.getTimePartition();
       resource.getDeviceToIndexMap().forEach((device, index) ->
-        updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(partitionId, 
device,
-            resource.getEndTime(index))
+          
updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(partitionId, device,
+              resource.getEndTime(index))
       );
     }
     insertLock.writeLock().lock();
@@ -1747,6 +1770,7 @@ public class StorageGroupProcessor {
 
   /**
    * acquire the write locks of the resource and the merge lock
+   *
    * @param seqFile
    */
   private void doubleWriteLock(TsFileResource seqFile) {
@@ -1763,7 +1787,7 @@ public class StorageGroupProcessor {
         if (fileLockGot) {
           seqFile.writeUnlock();
         }
-        if(mergeLockGot) {
+        if (mergeLockGot) {
           mergeLock.writeLock().unlock();
         }
       }
@@ -1772,6 +1796,7 @@ public class StorageGroupProcessor {
 
   /**
    * release the write locks of the resource and the merge lock
+   *
    * @param seqFile
    */
   private void doubleWriteUnlock(TsFileResource seqFile) {
@@ -1798,7 +1823,7 @@ public class StorageGroupProcessor {
     mergeLock.writeLock().lock();
     try {
       if (loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, 
newTsFileResource,
-          newFilePartitionId)){
+          newFilePartitionId)) {
         updateLatestTimeMap(newTsFileResource);
       }
     } catch (DiskSpaceInsufficientException e) {
@@ -1865,7 +1890,8 @@ public class StorageGroupProcessor {
       long partitionNum = newTsFileResource.getTimePartition();
       partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new 
HashSet<>())
           .addAll(newTsFileResource.getHistoricalVersions());
-      updatePartitionFileVersion(partitionNum, 
Collections.max(newTsFileResource.getHistoricalVersions()));
+      updatePartitionFileVersion(partitionNum,
+          Collections.max(newTsFileResource.getHistoricalVersions()));
     } catch (DiskSpaceInsufficientException e) {
       logger.error(
           "Failed to append the tsfile {} to storage group processor {} 
because the disk space is insufficient.",
@@ -1880,11 +1906,13 @@ public class StorageGroupProcessor {
 
   /**
    * Set the version in "partition" to "version" if "version" is larger than 
the current version.
+   *
    * @param partition
    * @param version
    */
   public void setPartitionFileVersionToMax(long partition, long version) {
-    partitionMaxFileVersions.compute(partition, (prt, oldVer) -> 
computeMaxVersion(oldVer, version));
+    partitionMaxFileVersions
+        .compute(partition, (prt, oldVer) -> computeMaxVersion(oldVer, 
version));
   }
 
   private long computeMaxVersion(Long oldVersion, Long newVersion) {
@@ -1895,12 +1923,14 @@ public class StorageGroupProcessor {
   }
 
   /**
-   * Find the position of "newTsFileResource" in the sequence files if it can 
be inserted into them.
+   * Find the position of "newTsFileResource" in the sequence files if it can 
be inserted into
+   * them.
+   *
    * @param newTsFileResource
    * @param newFilePartitionId
-   * @return POS_ALREADY_EXIST(-2) if some file has the same name as the one 
to be inserted
-   *         POS_OVERLAP(-3) if some file overlaps the new file
-   *         an insertion position i >= -1 if the new file can be inserted 
between [i, i+1]
+   * @return POS_ALREADY_EXIST(-2) if some file has the same name as the one 
to be inserted;
+   * POS_OVERLAP(-3) if some file overlaps the new file; an insertion position 
i >= -1 if the new
+   * file can be inserted between [i, i+1]
    */
   private int findInsertionPosition(TsFileResource newTsFileResource, long 
newFilePartitionId,
       List<TsFileResource> sequenceList) {
@@ -1941,11 +1971,11 @@ public class StorageGroupProcessor {
 
   /**
    * Compare each device in the two files to find the time relation of them.
+   *
    * @param fileA
    * @param fileB
-   * @return -1 if fileA is totally older than fileB (A < B)
-   *          0 if fileA is partially older than fileB and partially newer 
than fileB (A X B)
-   *          1 if fileA is totally newer than fileB (B < A)
+   * @return -1 if fileA is totally older than fileB (A < B); 0 if fileA is 
partially older than
+   * fileB and partially newer than fileB (A X B); 1 if fileA is totally newer 
than fileB (B < A)
    */
   private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB) 
{
     boolean hasPre = false, hasSubsequence = false;
@@ -1981,10 +2011,10 @@ public class StorageGroupProcessor {
   }
 
   /**
-   * If the historical versions of a file is a sub-set of the given file's, 
(close and) remove it to reduce
-   * unnecessary merge. Only used when the file sender and the receiver share 
the same file
-   * close policy.
-   * Warning: DO NOT REMOVE
+   * If the historical versions of a file is a sub-set of the given file's, 
(close and) remove it to
+   * reduce unnecessary merge. Only used when the file sender and the receiver 
share the same file
+   * close policy. Warning: DO NOT REMOVE
+   *
    * @param resource
    */
   @SuppressWarnings("unused")
@@ -2026,13 +2056,14 @@ public class StorageGroupProcessor {
 
   /**
    * remove the given tsFileResource. If the corresponding tsFileProcessor is 
in the working status,
-   * close it before remove the related resource files.
-   * maybe time-consuming for closing a tsfile.
+   * close it before remove the related resource files. maybe time-consuming 
for closing a tsfile.
+   *
    * @param tsFileResource
    * @param iterator
    * @param isSeq
    */
-  private void removeFullyOverlapFile(TsFileResource tsFileResource, 
Iterator<TsFileResource> iterator
+  private void removeFullyOverlapFile(TsFileResource tsFileResource,
+      Iterator<TsFileResource> iterator
       , boolean isSeq) {
     if (!tsFileResource.isClosed()) {
       // also remove the TsFileProcessor if the overlapped file is not closed
@@ -2073,9 +2104,9 @@ public class StorageGroupProcessor {
    * returns directly; otherwise, the time stamp is the mean of the timestamps 
of the two files, the
    * version number is the version number in the tsfile with a larger 
timestamp.
    *
-   * @param tsfileName origin tsfile name
-   * @param insertIndex the new file will be inserted between the files 
[insertIndex, insertIndex
-   *                   + 1]
+   * @param tsfileName  origin tsfile name
+   * @param insertIndex the new file will be inserted between the files 
[insertIndex, insertIndex +
+   *                    1]
    * @return appropriate filename
    */
   private String getFileNameForLoadingFile(String tsfileName, int insertIndex,
@@ -2139,12 +2170,12 @@ public class StorageGroupProcessor {
   /**
    * Execute the loading process by the type.
    *
-   * @param type load type
-   * @param tsFileResource tsfile resource to be loaded
+   * @param type            load type
+   * @param tsFileResource  tsfile resource to be loaded
    * @param filePartitionId the partition id of the new file
-   * @UsedBy sync module, load external tsfile module.
    * @return load the file successfully
   * @UsedBy sync module, load external tsfile module.
    */
   private boolean loadTsFileByType(LoadTsFileType type, File syncedTsFile,
       TsFileResource tsFileResource, long filePartitionId)
@@ -2153,7 +2184,8 @@ public class StorageGroupProcessor {
     switch (type) {
       case LOAD_UNSEQUENCE:
         targetFile = new 
File(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
-            storageGroupName + File.separatorChar + filePartitionId + 
File.separator + tsFileResource
+            storageGroupName + File.separatorChar + filePartitionId + 
File.separator
+                + tsFileResource
                 .getFile().getName());
         tsFileResource.setFile(targetFile);
         if (unSequenceFileList.contains(tsFileResource)) {
@@ -2213,7 +2245,8 @@ public class StorageGroupProcessor {
     }
     partitionDirectFileVersions.computeIfAbsent(filePartitionId,
         p -> new HashSet<>()).addAll(tsFileResource.getHistoricalVersions());
-    updatePartitionFileVersion(filePartitionId, 
Collections.max(tsFileResource.getHistoricalVersions()));
+    updatePartitionFileVersion(filePartitionId,
+        Collections.max(tsFileResource.getHistoricalVersions()));
     return true;
   }
 
@@ -2354,6 +2387,7 @@ public class StorageGroupProcessor {
 
   @FunctionalInterface
   public interface CloseTsFileCallBack {
+
     void call(TsFileProcessor caller) throws TsFileProcessorException, 
IOException;
   }
 
@@ -2362,17 +2396,17 @@ public class StorageGroupProcessor {
   }
 
   /**
-   * Check if the data of "tsFileResource" all exist locally by comparing the 
historical versions
-   * in the partition of "partitionNumber". This is available only when the 
IoTDB instances which generated
-   * "tsFileResource" have the same close file policy as the local one.
-   * If one of the version in "tsFileResource" equals to a version of a 
working file, false is
-   * returned because "tsFileResource" may have unwritten data of that file.
+   * Check if the data of "tsFileResource" all exist locally by comparing the 
historical versions in
+   * the partition of "partitionNumber". This is available only when the IoTDB 
instances which
+   * generated "tsFileResource" have the same close file policy as the local 
one. If one of the
+   * version in "tsFileResource" equals to a version of a working file, false 
is returned because
+   * "tsFileResource" may have unwritten data of that file.
+   *
    * @param tsFileResource
    * @param partitionNum
    * @return true if the historicalVersions of "tsFileResource" is a subset of
-   * partitionDirectFileVersions, or false if it is not a subset and it 
contains any
-   * version of a working file
-   * USED by cluster module
+   * partitionDirectFileVersions, or false if it is not a subset and it 
contains any version of a
+   * working file. USED by cluster module
    */
   public boolean isFileAlreadyExist(TsFileResource tsFileResource, long 
partitionNum) {
     // consider the case: The local node crashes when it is writing TsFile 
no.5.
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index bf38e63..af31bf4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -67,7 +67,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 import org.slf4j.Logger;
@@ -851,7 +850,7 @@ public class TsFileProcessor {
    * @param encoding encoding
    * @return left: the chunk data in memory; right: the chunkMetadatas of data 
on disk
    */
-  public Pair<List<ReadOnlyMemChunk>, List<ChunkMetadata>> query(String 
deviceId,
+  public Pair<List<ReadOnlyMemChunk>, List<List<ChunkMetadata>>> query(String 
deviceId,
       String measurementId, TSDataType dataType, TSEncoding encoding, 
Map<String, String> props,
       QueryContext context) {
     if (logger.isDebugEnabled()) {
@@ -883,18 +882,26 @@ public class TsFileProcessor {
       List<Modification> modifications = 
context.getPathModifications(modificationFile,
           deviceId + IoTDBConstant.PATH_SEPARATOR + measurementId);
 
+      List<List<ChunkMetadata>> rightResult = new ArrayList<>();
+
+      // get unseal tsfile data
       List<ChunkMetadata> chunkMetadataList = writer
           .getVisibleMetadataList(deviceId, measurementId, dataType);
-      for (RestorableTsFileIOWriter vmWriter : vmWriters) {
-        chunkMetadataList
-            .addAll(vmWriter.getVisibleMetadataList(deviceId, measurementId, 
dataType));
-      }
       QueryUtils.modifyChunkMetaData(chunkMetadataList,
           modifications);
-
       chunkMetadataList.removeIf(context::chunkNotSatisfy);
+      rightResult.add(chunkMetadataList);
 
-      return new Pair<>(readOnlyMemChunks, chunkMetadataList);
+      // get vm tsfile data
+      for (RestorableTsFileIOWriter vmWriter : vmWriters) {
+        chunkMetadataList = vmWriter.getVisibleMetadataList(deviceId, 
measurementId, dataType);
+        QueryUtils.modifyChunkMetaData(chunkMetadataList,
+            modifications);
+        chunkMetadataList.removeIf(context::chunkNotSatisfy);
+        rightResult.add(chunkMetadataList);
+      }
+
+      return new Pair<>(readOnlyMemChunks, rightResult);
     } catch (Exception e) {
       logger.error("{}: {} get ReadOnlyMemChunk has error", storageGroupName,
           tsFileResource.getFile().getName(), e);
@@ -927,4 +934,8 @@ public class TsFileProcessor {
       throw new TsFileProcessorException(e);
     }
   }
+
+  public List<TsFileResource> getVmTsFileResources() {
+    return vmTsFileResources;
+  }
 }

Reply via email to