This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 38eb730 Execute compaction after sync and load is called (#2439)
38eb730 is described below
commit 38eb730d735a3e1f3b0beaa956a50e4ea37f9d50
Author: zhanglingzhe0820 <[email protected]>
AuthorDate: Fri Jan 8 14:19:06 2021 +0800
Execute compaction after sync and load is called (#2439)
---
.../engine/storagegroup/StorageGroupProcessor.java | 69 ++++++++++++----------
1 file changed, 37 insertions(+), 32 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index fdf623a..bfbd87a 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -819,11 +819,11 @@ public class StorageGroupProcessor {
* inserted are in the range [start, end)
*
* @param insertTabletPlan insert a tablet of a device
- * @param sequence whether is sequence
- * @param start start index of rows to be inserted in
insertTabletPlan
- * @param end end index of rows to be inserted in
insertTabletPlan
- * @param results result array
- * @param timePartitionId time partition id
+ * @param sequence whether is sequence
+ * @param start start index of rows to be inserted in insertTabletPlan
+ * @param end end index of rows to be inserted in insertTabletPlan
+ * @param results result array
+ * @param timePartitionId time partition id
* @return false if any failure occurs when inserting the tablet, true
otherwise
*/
private boolean insertTabletToTsFileProcessor(InsertTabletPlan
insertTabletPlan,
@@ -982,9 +982,9 @@ public class StorageGroupProcessor {
/**
* get processor from hashmap, flush oldest processor if necessary
*
- * @param timeRangeId time partition range
+ * @param timeRangeId time partition range
* @param tsFileProcessorTreeMap tsFileProcessorTreeMap
- * @param sequence whether is sequence or not
+ * @param sequence whether is sequence or not
*/
private TsFileProcessor getOrCreateTsFileProcessorIntern(long timeRangeId,
TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap,
@@ -1451,9 +1451,9 @@ public class StorageGroupProcessor {
* Delete data whose timestamp <= 'timestamp' and belongs to the time series
* deviceId.measurementId.
*
- * @param path the timeseries path of the to be deleted.
+ * @param path the timeseries path of the to be deleted.
* @param startTime the startTime of delete range.
- * @param endTime the endTime of delete range.
+ * @param endTime the endTime of delete range.
*/
public void delete(PartialPath path, long startTime, long endTime, long
planIndex)
throws IOException {
@@ -1679,25 +1679,7 @@ public class StorageGroupProcessor {
}
logger.info("signal closing storage group condition in {}",
storageGroupName);
- if (!compactionMergeWorking &&
!CompactionMergeTaskPoolManager.getInstance()
- .isTerminated()) {
- compactionMergeWorking = true;
- logger.info("{} submit a compaction merge task", storageGroupName);
- try {
- // fork and filter current tsfile, then commit then to compaction merge
- tsFileManagement.forkCurrentFileList(tsFileProcessor.getTimeRangeId());
- CompactionMergeTaskPoolManager.getInstance()
- .submitTask(
- tsFileManagement.new
CompactionMergeTask(this::closeCompactionMergeCallBack,
- tsFileProcessor.getTimeRangeId()));
- } catch (IOException | RejectedExecutionException e) {
- this.closeCompactionMergeCallBack();
- logger.error("{} compaction submit task failed", storageGroupName);
- }
- } else {
- logger.info("{} last compaction merge task is working, skip current
merge",
- storageGroupName);
- }
+ executeCompaction(tsFileProcessor.getTimeRangeId());
}
/**
@@ -1880,6 +1862,29 @@ public class StorageGroupProcessor {
tsFileManagement.writeUnlock();
writeUnlock();
}
+
+ executeCompaction(newFilePartitionId);
+ }
+
+ private void executeCompaction(long timePartition) {
+ if (!compactionMergeWorking &&
!CompactionMergeTaskPoolManager.getInstance()
+ .isTerminated()) {
+ compactionMergeWorking = true;
+ logger.info("{} submit a compaction merge task", storageGroupName);
+ try {
+ // fork and filter current tsfile, then commit then to compaction merge
+ tsFileManagement.forkCurrentFileList(timePartition);
+ CompactionMergeTaskPoolManager.getInstance().submitTask(
+ tsFileManagement.new
CompactionMergeTask(this::closeCompactionMergeCallBack,
+ timePartition));
+ } catch (IOException | RejectedExecutionException e) {
+ this.closeCompactionMergeCallBack();
+ logger.error("{} compaction submit task failed", storageGroupName);
+ }
+ } else {
+ logger.info("{} last compaction merge task is working, skip current
merge",
+ storageGroupName);
+ }
}
/**
@@ -2081,9 +2086,9 @@ public class StorageGroupProcessor {
* returns directly; otherwise, the time stamp is the mean of the timestamps
of the two files, the
* version number is the version number in the tsfile with a larger
timestamp.
*
- * @param tsfileName origin tsfile name
+ * @param tsfileName origin tsfile name
* @param insertIndex the new file will be inserted between the files
[insertIndex, insertIndex +
- * 1]
+ * 1]
* @return appropriate filename
*/
private String getFileNameForLoadingFile(String tsfileName, int insertIndex,
@@ -2149,8 +2154,8 @@ public class StorageGroupProcessor {
/**
* Execute the loading process by the type.
*
- * @param type load type
- * @param tsFileResource tsfile resource to be loaded
+ * @param type load type
+ * @param tsFileResource tsfile resource to be loaded
* @param filePartitionId the partition id of the new file
* @return load the file successfully
* @UsedBy sync module, load external tsfile module.