This is an automated email from the ASF dual-hosted git repository. hxd pushed a commit to branch revert-2439-execute_compaction_after_sync_and_load in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d8f5b360dd41dd23623e869b014a36dc6f747abd Author: Xiangdong Huang <[email protected]> AuthorDate: Fri Jan 8 19:45:41 2021 +0800 Revert "Execute compaction after sync and load is called (#2439)" This reverts commit 38eb730d735a3e1f3b0beaa956a50e4ea37f9d50. --- .../engine/storagegroup/StorageGroupProcessor.java | 69 ++++++++++------------ 1 file changed, 32 insertions(+), 37 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index bfbd87a..fdf623a 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -819,11 +819,11 @@ public class StorageGroupProcessor { * inserted are in the range [start, end) * * @param insertTabletPlan insert a tablet of a device - * @param sequence whether is sequence - * @param start start index of rows to be inserted in insertTabletPlan - * @param end end index of rows to be inserted in insertTabletPlan - * @param results result array - * @param timePartitionId time partition id + * @param sequence whether is sequence + * @param start start index of rows to be inserted in insertTabletPlan + * @param end end index of rows to be inserted in insertTabletPlan + * @param results result array + * @param timePartitionId time partition id * @return false if any failure occurs when inserting the tablet, true otherwise */ private boolean insertTabletToTsFileProcessor(InsertTabletPlan insertTabletPlan, @@ -982,9 +982,9 @@ public class StorageGroupProcessor { /** * get processor from hashmap, flush oldest processor if necessary * - * @param timeRangeId time partition range + * @param timeRangeId time partition range * @param tsFileProcessorTreeMap tsFileProcessorTreeMap - * @param sequence whether is sequence or not + * @param sequence whether is sequence or not */ private TsFileProcessor getOrCreateTsFileProcessorIntern(long timeRangeId, TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap, @@ -1451,9 +1451,9 @@ public class StorageGroupProcessor { * Delete data whose timestamp <= 'timestamp' and belongs to the time series * deviceId.measurementId. * - * @param path the timeseries path of the to be deleted. + * @param path the timeseries path of the to be deleted. * @param startTime the startTime of delete range. - * @param endTime the endTime of delete range. + * @param endTime the endTime of delete range. */ public void delete(PartialPath path, long startTime, long endTime, long planIndex) throws IOException { @@ -1679,7 +1679,25 @@ public class StorageGroupProcessor { } logger.info("signal closing storage group condition in {}", storageGroupName); - executeCompaction(tsFileProcessor.getTimeRangeId()); + if (!compactionMergeWorking && !CompactionMergeTaskPoolManager.getInstance() + .isTerminated()) { + compactionMergeWorking = true; + logger.info("{} submit a compaction merge task", storageGroupName); + try { + // fork and filter current tsfile, then commit then to compaction merge + tsFileManagement.forkCurrentFileList(tsFileProcessor.getTimeRangeId()); + CompactionMergeTaskPoolManager.getInstance() + .submitTask( + tsFileManagement.new CompactionMergeTask(this::closeCompactionMergeCallBack, + tsFileProcessor.getTimeRangeId())); + } catch (IOException | RejectedExecutionException e) { + this.closeCompactionMergeCallBack(); + logger.error("{} compaction submit task failed", storageGroupName); + } + } else { + logger.info("{} last compaction merge task is working, skip current merge", + storageGroupName); + } } /** @@ -1862,29 +1880,6 @@ public class StorageGroupProcessor { tsFileManagement.writeUnlock(); writeUnlock(); } - - executeCompaction(newFilePartitionId); - } - - private void executeCompaction(long timePartition) { - if (!compactionMergeWorking && !CompactionMergeTaskPoolManager.getInstance() - .isTerminated()) { - compactionMergeWorking = true; - logger.info("{} submit a compaction merge task", storageGroupName); - try { - // fork and filter current tsfile, then commit then to compaction merge - tsFileManagement.forkCurrentFileList(timePartition); - CompactionMergeTaskPoolManager.getInstance().submitTask( - tsFileManagement.new CompactionMergeTask(this::closeCompactionMergeCallBack, - timePartition)); - } catch (IOException | RejectedExecutionException e) { - this.closeCompactionMergeCallBack(); - logger.error("{} compaction submit task failed", storageGroupName); - } - } else { - logger.info("{} last compaction merge task is working, skip current merge", - storageGroupName); - } } /** @@ -2086,9 +2081,9 @@ public class StorageGroupProcessor { * returns directly; otherwise, the time stamp is the mean of the timestamps of the two files, the * version number is the version number in the tsfile with a larger timestamp. * - * @param tsfileName origin tsfile name + * @param tsfileName origin tsfile name * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex + - * 1] + * 1] * @return appropriate filename */ private String getFileNameForLoadingFile(String tsfileName, int insertIndex, @@ -2154,8 +2149,8 @@ public class StorageGroupProcessor { /** * Execute the loading process by the type. * - * @param type load type - * @param tsFileResource tsfile resource to be loaded + * @param type load type + * @param tsFileResource tsfile resource to be loaded * @param filePartitionId the partition id of the new file * @return load the file successfully * @UsedBy sync module, load external tsfile module.
