This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new dc4c694 Fix remove partition CI Bug (#3103)
dc4c694 is described below
commit dc4c6942891090b1a804776d3f50a29ac57d6b7f
Author: zhanglingzhe0820 <[email protected]>
AuthorDate: Thu Apr 29 15:25:47 2021 +0800
Fix remove partition CI Bug (#3103)
---
.../org/apache/iotdb/db/engine/StorageEngine.java | 4 +-
.../compaction/CompactionMergeTaskPoolManager.java | 38 ++-
.../db/engine/compaction/TsFileManagement.java | 19 +-
.../level/LevelCompactionTsFileManagement.java | 274 ++++++++++++---------
.../no/NoCompactionTsFileManagement.java | 132 ++++++----
.../iotdb/db/engine/merge/task/MergeFileTask.java | 14 +-
.../engine/storagegroup/StorageGroupProcessor.java | 44 ++--
.../compaction/LevelCompactionCacheTest.java | 2 +-
.../engine/compaction/LevelCompactionLogTest.java | 2 +-
.../compaction/LevelCompactionMergeTest.java | 6 +-
.../compaction/LevelCompactionMoreDataTest.java | 2 +-
.../LevelCompactionTsFileManagementTest.java | 69 ++++++
.../NoCompactionTsFileManagementTest.java | 71 +++++-
13 files changed, 457 insertions(+), 220 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 0f2eed3..e7ed671 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -934,13 +934,13 @@ public class StorageEngine implements IService {
set.stream()
.sorted(Comparator.comparing(StorageGroupProcessor::getVirtualStorageGroupId))
.collect(Collectors.toList());
- list.forEach(storageGroupProcessor ->
storageGroupProcessor.getTsFileManagement().readLock());
+ list.forEach(StorageGroupProcessor::readLock);
return list;
}
/** unlock all merge lock of the storage group processor related to the
query */
public void mergeUnLock(List<StorageGroupProcessor> list) {
- list.forEach(storageGroupProcessor ->
storageGroupProcessor.getTsFileManagement().readUnLock());
+ list.forEach(StorageGroupProcessor::readUnlock);
}
static class InstanceHolder {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index 5b67d9e..cdf6f4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -32,7 +32,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
@@ -46,6 +54,7 @@ public class CompactionMergeTaskPoolManager implements
IService {
private static final CompactionMergeTaskPoolManager INSTANCE =
new CompactionMergeTaskPoolManager();
private ExecutorService pool;
+ private Map<String, Set<Future<Void>>> storageGroupTasks = new
ConcurrentHashMap<>();
public static CompactionMergeTaskPoolManager getInstance() {
return INSTANCE;
@@ -68,6 +77,7 @@ public class CompactionMergeTaskPoolManager implements
IService {
pool.shutdownNow();
logger.info("Waiting for task pool to shut down");
waitTermination();
+ storageGroupTasks.clear();
}
}
@@ -77,6 +87,7 @@ public class CompactionMergeTaskPoolManager implements
IService {
awaitTermination(pool, milliseconds);
logger.info("Waiting for task pool to shut down");
waitTermination();
+ storageGroupTasks.clear();
}
}
@@ -103,6 +114,7 @@ public class CompactionMergeTaskPoolManager implements
IService {
}
}
}
+ storageGroupTasks.clear();
logger.info("All compaction task finish");
}
}
@@ -127,6 +139,7 @@ public class CompactionMergeTaskPoolManager implements
IService {
}
}
pool = null;
+ storageGroupTasks.clear();
logger.info("CompactionManager stopped");
}
@@ -146,9 +159,30 @@ public class CompactionMergeTaskPoolManager implements
IService {
return ServiceType.COMPACTION_SERVICE;
}
- public void submitTask(Runnable compactionMergeTask) throws
RejectedExecutionException {
+ public void submitTask(String storageGroupName, Callable<Void>
compactionMergeTask)
+ throws RejectedExecutionException {
if (pool != null && !pool.isTerminated()) {
- pool.submit(compactionMergeTask);
+ Future<Void> future = pool.submit(compactionMergeTask);
+ storageGroupTasks
+ .computeIfAbsent(storageGroupName, k -> new
ConcurrentSkipListSet<>())
+ .add(future);
+ }
+ }
+
+ /**
+ * Abort all compactions of a storage group. The caller must acquire the
write lock of the
+ * corresponding storage group.
+ */
+ public void abortCompaction(String storageGroup) {
+ Set<Future<Void>> subTasks =
+ storageGroupTasks.getOrDefault(storageGroup, Collections.emptySet());
+ Iterator<Future<Void>> subIterator = subTasks.iterator();
+ while (subIterator.hasNext()) {
+ Future<Void> next = subIterator.next();
+ if (!next.isDone() && !next.isCancelled()) {
+ next.cancel(true);
+ }
+ subIterator.remove();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index a483f73..3b68e37 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -44,6 +44,7 @@ import java.io.IOException;
import java.nio.file.Files;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -129,11 +130,11 @@ public abstract class TsFileManagement {
/** fork current TsFile list (call this before merge) */
public abstract void forkCurrentFileList(long timePartition) throws
IOException;
- public void readLock() {
+ protected void readLock() {
compactionMergeLock.readLock().lock();
}
- public void readUnLock() {
+ protected void readUnLock() {
compactionMergeLock.readLock().unlock();
}
@@ -151,7 +152,7 @@ public abstract class TsFileManagement {
protected abstract void merge(long timePartition);
- public class CompactionMergeTask implements Runnable {
+ public class CompactionMergeTask implements Callable<Void> {
private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
private long timePartitionId;
@@ -163,13 +164,14 @@ public abstract class TsFileManagement {
}
@Override
- public void run() {
+ public Void call() {
merge(timePartitionId);
closeCompactionMergeCallBack.call();
+ return null;
}
}
- public class CompactionRecoverTask implements Runnable {
+ public class CompactionRecoverTask implements Callable<Void> {
private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
@@ -178,9 +180,10 @@ public abstract class TsFileManagement {
}
@Override
- public void run() {
+ public Void call() {
recover();
closeCompactionMergeCallBack.call();
+ return null;
}
}
@@ -382,8 +385,8 @@ public abstract class TsFileManagement {
List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles, File
mergeLog) {
logger.info("{} a merge task is ending...", storageGroupName);
- if (unseqFiles.isEmpty()) {
- // merge runtime exception arose, just end this merge
+ if (Thread.currentThread().isInterrupted() || unseqFiles.isEmpty()) {
+ // merge task abort, or merge runtime exception arose, just end this
merge
isUnseqMerging = false;
logger.info("{} a merge task abnormally ends", storageGroupName);
return;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 5bd7cbc..61f928d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -43,7 +43,7 @@ import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -51,8 +51,6 @@ import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CopyOnWriteArrayList;
import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
import static
org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.COMPACTION_LOG_NAME;
@@ -80,14 +78,12 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
// First map is partition list; Second list is level list; Third list is
file list in level;
private final Map<Long, List<SortedSet<TsFileResource>>>
sequenceTsFileResources =
- new ConcurrentSkipListMap<>();
- private final Map<Long, List<List<TsFileResource>>>
unSequenceTsFileResources =
- new ConcurrentSkipListMap<>();
+ new HashMap<>();
+ private final Map<Long, List<List<TsFileResource>>>
unSequenceTsFileResources = new HashMap<>();
private final List<List<TsFileResource>> forkedSequenceTsFileResources = new
ArrayList<>();
private final List<List<TsFileResource>> forkedUnSequenceTsFileResources =
new ArrayList<>();
- private final List<TsFileResource> sequenceRecoverTsFileResources = new
CopyOnWriteArrayList<>();
- private final List<TsFileResource> unSequenceRecoverTsFileResources =
- new CopyOnWriteArrayList<>();
+ private final List<TsFileResource> sequenceRecoverTsFileResources = new
ArrayList<>();
+ private final List<TsFileResource> unSequenceRecoverTsFileResources = new
ArrayList<>();
public LevelCompactionTsFileManagement(String storageGroupName, String
storageGroupDir) {
super(storageGroupName, storageGroupDir);
@@ -139,17 +135,13 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
if (sequence) {
if (sequenceTsFileResources.containsKey(timePartitionId)) {
if (sequenceTsFileResources.get(timePartitionId).size() > level) {
- synchronized (sequenceTsFileResources) {
-
sequenceTsFileResources.get(timePartitionId).get(level).removeAll(mergeTsFiles);
- }
+
sequenceTsFileResources.get(timePartitionId).get(level).removeAll(mergeTsFiles);
}
}
} else {
if (unSequenceTsFileResources.containsKey(timePartitionId)) {
if (unSequenceTsFileResources.get(timePartitionId).size() > level) {
- synchronized (unSequenceTsFileResources) {
-
unSequenceTsFileResources.get(timePartitionId).get(level).removeAll(mergeTsFiles);
- }
+
unSequenceTsFileResources.get(timePartitionId).get(level).removeAll(mergeTsFiles);
}
}
}
@@ -173,82 +165,89 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
@Deprecated
@Override
public List<TsFileResource> getTsFileList(boolean sequence) {
- List<TsFileResource> result = new ArrayList<>();
- if (sequence) {
- synchronized (sequenceTsFileResources) {
+ readLock();
+ try {
+ List<TsFileResource> result = new ArrayList<>();
+ if (sequence) {
for (long timePartition : sequenceTsFileResources.keySet()) {
result.addAll(getTsFileListByTimePartition(true, timePartition));
}
- }
- } else {
- synchronized (unSequenceTsFileResources) {
+ } else {
for (long timePartition : unSequenceTsFileResources.keySet()) {
result.addAll(getTsFileListByTimePartition(false, timePartition));
}
}
+ return result;
+ } finally {
+ readUnLock();
}
- return result;
}
public List<TsFileResource> getTsFileListByTimePartition(boolean sequence,
long timePartition) {
- List<TsFileResource> result = new ArrayList<>();
- if (sequence) {
- synchronized (sequenceTsFileResources) {
+ readLock();
+ try {
+ List<TsFileResource> result = new ArrayList<>();
+ if (sequence) {
List<SortedSet<TsFileResource>> sequenceTsFileList =
sequenceTsFileResources.get(timePartition);
for (int i = sequenceTsFileList.size() - 1; i >= 0; i--) {
result.addAll(sequenceTsFileList.get(i));
}
- }
- } else {
- synchronized (unSequenceTsFileResources) {
+ } else {
List<List<TsFileResource>> unSequenceTsFileList =
unSequenceTsFileResources.get(timePartition);
for (int i = unSequenceTsFileList.size() - 1; i >= 0; i--) {
result.addAll(unSequenceTsFileList.get(i));
}
}
+ return result;
+ } finally {
+ readUnLock();
}
- return result;
}
@Override
public Iterator<TsFileResource> getIterator(boolean sequence) {
- return getTsFileList(sequence).iterator();
+ readLock();
+ try {
+ return getTsFileList(sequence).iterator();
+ } finally {
+ readUnLock();
+ }
}
@Override
public void remove(TsFileResource tsFileResource, boolean sequence) {
- if (sequence) {
- synchronized (sequenceTsFileResources) {
+ writeLock();
+ try {
+ if (sequence) {
for (SortedSet<TsFileResource> sequenceTsFileResource :
sequenceTsFileResources.get(tsFileResource.getTimePartition())) {
sequenceTsFileResource.remove(tsFileResource);
}
- }
- } else {
- synchronized (unSequenceTsFileResources) {
+ } else {
for (List<TsFileResource> unSequenceTsFileResource :
unSequenceTsFileResources.get(tsFileResource.getTimePartition())) {
unSequenceTsFileResource.remove(tsFileResource);
}
}
+ } finally {
+ writeUnlock();
}
}
@Override
public void removeAll(List<TsFileResource> tsFileResourceList, boolean
sequence) {
- if (sequence) {
- synchronized (sequenceTsFileResources) {
+ writeLock();
+ try {
+ if (sequence) {
for (List<SortedSet<TsFileResource>> partitionSequenceTsFileResource :
sequenceTsFileResources.values()) {
for (SortedSet<TsFileResource> levelTsFileResource :
partitionSequenceTsFileResource) {
levelTsFileResource.removeAll(tsFileResourceList);
}
}
- }
- } else {
- synchronized (unSequenceTsFileResources) {
+ } else {
for (List<List<TsFileResource>> partitionUnSequenceTsFileResource :
unSequenceTsFileResources.values()) {
for (List<TsFileResource> levelTsFileResource :
partitionUnSequenceTsFileResource) {
@@ -256,15 +255,18 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
}
}
}
+ } finally {
+ writeUnlock();
}
}
@Override
public void add(TsFileResource tsFileResource, boolean sequence) {
- long timePartitionId = tsFileResource.getTimePartition();
- int level = getMergeLevel(tsFileResource.getTsFile());
- if (sequence) {
- synchronized (sequenceTsFileResources) {
+ writeLock();
+ try {
+ long timePartitionId = tsFileResource.getTimePartition();
+ int level = getMergeLevel(tsFileResource.getTsFile());
+ if (sequence) {
if (level <= seqLevelNum - 1) {
// current file has normal level
sequenceTsFileResources
@@ -278,9 +280,7 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
.get(seqLevelNum - 1)
.add(tsFileResource);
}
- }
- } else {
- synchronized (unSequenceTsFileResources) {
+ } else {
if (level <= unseqLevelNum - 1) {
// current file has normal level
unSequenceTsFileResources
@@ -295,101 +295,124 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
.add(tsFileResource);
}
}
+ } finally {
+ writeUnlock();
}
}
@Override
public void addRecover(TsFileResource tsFileResource, boolean sequence) {
if (sequence) {
- synchronized (sequenceRecoverTsFileResources) {
- sequenceRecoverTsFileResources.add(tsFileResource);
- }
+ sequenceRecoverTsFileResources.add(tsFileResource);
} else {
- synchronized (unSequenceTsFileResources) {
- unSequenceRecoverTsFileResources.add(tsFileResource);
- }
+ unSequenceRecoverTsFileResources.add(tsFileResource);
}
}
@Override
public void addAll(List<TsFileResource> tsFileResourceList, boolean
sequence) {
- for (TsFileResource tsFileResource : tsFileResourceList) {
- add(tsFileResource, sequence);
+ writeLock();
+ try {
+ for (TsFileResource tsFileResource : tsFileResourceList) {
+ add(tsFileResource, sequence);
+ }
+ } finally {
+ writeUnlock();
}
}
@Override
public boolean contains(TsFileResource tsFileResource, boolean sequence) {
- if (sequence) {
- for (SortedSet<TsFileResource> sequenceTsFileResource :
- sequenceTsFileResources.computeIfAbsent(
- tsFileResource.getTimePartition(),
this::newSequenceTsFileResources)) {
- if (sequenceTsFileResource.contains(tsFileResource)) {
- return true;
+ readLock();
+ try {
+ if (sequence) {
+ for (SortedSet<TsFileResource> sequenceTsFileResource :
+ sequenceTsFileResources.computeIfAbsent(
+ tsFileResource.getTimePartition(),
this::newSequenceTsFileResources)) {
+ if (sequenceTsFileResource.contains(tsFileResource)) {
+ return true;
+ }
}
- }
- } else {
- for (List<TsFileResource> unSequenceTsFileResource :
- unSequenceTsFileResources.computeIfAbsent(
- tsFileResource.getTimePartition(),
this::newUnSequenceTsFileResources)) {
- if (unSequenceTsFileResource.contains(tsFileResource)) {
- return true;
+ } else {
+ for (List<TsFileResource> unSequenceTsFileResource :
+ unSequenceTsFileResources.computeIfAbsent(
+ tsFileResource.getTimePartition(),
this::newUnSequenceTsFileResources)) {
+ if (unSequenceTsFileResource.contains(tsFileResource)) {
+ return true;
+ }
}
}
+ return false;
+ } finally {
+ readUnLock();
}
- return false;
}
@Override
public void clear() {
- sequenceTsFileResources.clear();
- unSequenceTsFileResources.clear();
+ writeLock();
+ try {
+ sequenceTsFileResources.clear();
+ unSequenceTsFileResources.clear();
+ } finally {
+ writeUnlock();
+ }
}
@Override
@SuppressWarnings("squid:S3776")
public boolean isEmpty(boolean sequence) {
- if (sequence) {
- for (List<SortedSet<TsFileResource>> partitionSequenceTsFileResource :
- sequenceTsFileResources.values()) {
- for (SortedSet<TsFileResource> sequenceTsFileResource :
partitionSequenceTsFileResource) {
- if (!sequenceTsFileResource.isEmpty()) {
- return false;
+ readLock();
+ try {
+ if (sequence) {
+ for (List<SortedSet<TsFileResource>> partitionSequenceTsFileResource :
+ sequenceTsFileResources.values()) {
+ for (SortedSet<TsFileResource> sequenceTsFileResource :
partitionSequenceTsFileResource) {
+ if (!sequenceTsFileResource.isEmpty()) {
+ return false;
+ }
}
}
- }
- } else {
- for (List<List<TsFileResource>> partitionUnSequenceTsFileResource :
- unSequenceTsFileResources.values()) {
- for (List<TsFileResource> unSequenceTsFileResource :
partitionUnSequenceTsFileResource) {
- if (!unSequenceTsFileResource.isEmpty()) {
- return false;
+ } else {
+ for (List<List<TsFileResource>> partitionUnSequenceTsFileResource :
+ unSequenceTsFileResources.values()) {
+ for (List<TsFileResource> unSequenceTsFileResource :
partitionUnSequenceTsFileResource) {
+ if (!unSequenceTsFileResource.isEmpty()) {
+ return false;
+ }
}
}
}
+ return true;
+ } finally {
+ readUnLock();
}
- return true;
}
@Override
public int size(boolean sequence) {
- int result = 0;
- if (sequence) {
- for (List<SortedSet<TsFileResource>> partitionSequenceTsFileResource :
- sequenceTsFileResources.values()) {
- for (int i = seqLevelNum - 1; i >= 0; i--) {
- result += partitionSequenceTsFileResource.get(i).size();
+ readLock();
+ try {
+ int result = 0;
+ if (sequence) {
+ for (List<SortedSet<TsFileResource>> partitionSequenceTsFileResource :
+ sequenceTsFileResources.values()) {
+ for (int i = seqLevelNum - 1; i >= 0; i--) {
+ result += partitionSequenceTsFileResource.get(i).size();
+ }
}
- }
- } else {
- for (List<List<TsFileResource>> partitionUnSequenceTsFileResource :
- unSequenceTsFileResources.values()) {
- for (int i = unseqLevelNum - 1; i >= 0; i--) {
- result += partitionUnSequenceTsFileResource.get(i).size();
+ } else {
+ for (List<List<TsFileResource>> partitionUnSequenceTsFileResource :
+ unSequenceTsFileResources.values()) {
+ for (int i = unseqLevelNum - 1; i >= 0; i--) {
+ result += partitionUnSequenceTsFileResource.get(i).size();
+ }
}
}
+ return result;
+ } finally {
+ readUnLock();
}
- return result;
}
/** recover files */
@@ -479,6 +502,10 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
// complete compaction and delete source file
writeLock();
try {
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException(
+ String.format("%s [Compaction] abort", storageGroupName));
+ }
int targetLevel = getMergeLevel(targetResource.getTsFile());
if (isSeq) {
sequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
@@ -499,7 +526,7 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
}
}
}
- } catch (IOException | IllegalPathException e) {
+ } catch (IOException | IllegalPathException | InterruptedException e) {
logger.error("recover level tsfile management error ", e);
} finally {
if (logFile.exists()) {
@@ -532,21 +559,22 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
@Override
public void forkCurrentFileList(long timePartition) {
- synchronized (sequenceTsFileResources) {
+ readLock();
+ try {
forkTsFileList(
forkedSequenceTsFileResources,
sequenceTsFileResources.computeIfAbsent(timePartition,
this::newSequenceTsFileResources),
seqLevelNum,
seqFileNumInEachLevel);
- }
- // we have to copy all unseq file
- synchronized (unSequenceTsFileResources) {
+ // we have to copy all unseq file
forkTsFileList(
forkedUnSequenceTsFileResources,
unSequenceTsFileResources.computeIfAbsent(
timePartition, this::newUnSequenceTsFileResources),
unseqLevelNum + 1,
unseqFileNumInEachLevel);
+ } finally {
+ readUnLock();
}
}
@@ -664,6 +692,11 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
toMergeTsFiles.size());
writeLock();
try {
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException(
+ String.format("%s [Compaction] abort", storageGroupName));
+ }
+
if (sequence) {
sequenceTsFileResources.get(timePartition).get(i +
1).add(newResource);
} else {
@@ -710,32 +743,31 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
}
private List<SortedSet<TsFileResource>> newSequenceTsFileResources(Long k) {
- List<SortedSet<TsFileResource>> newSequenceTsFileResources = new
CopyOnWriteArrayList<>();
+ List<SortedSet<TsFileResource>> newSequenceTsFileResources = new
ArrayList<>();
for (int i = 0; i < seqLevelNum; i++) {
newSequenceTsFileResources.add(
- Collections.synchronizedSortedSet(
- new TreeSet<>(
- (o1, o2) -> {
- try {
- int rangeCompare =
- Long.compare(
-
Long.parseLong(o1.getTsFile().getParentFile().getName()),
-
Long.parseLong(o2.getTsFile().getParentFile().getName()));
- return rangeCompare == 0
- ? compareFileName(o1.getTsFile(), o2.getTsFile())
- : rangeCompare;
- } catch (NumberFormatException e) {
- return compareFileName(o1.getTsFile(), o2.getTsFile());
- }
- })));
+ new TreeSet<>(
+ (o1, o2) -> {
+ try {
+ int rangeCompare =
+ Long.compare(
+
Long.parseLong(o1.getTsFile().getParentFile().getName()),
+
Long.parseLong(o2.getTsFile().getParentFile().getName()));
+ return rangeCompare == 0
+ ? compareFileName(o1.getTsFile(), o2.getTsFile())
+ : rangeCompare;
+ } catch (NumberFormatException e) {
+ return compareFileName(o1.getTsFile(), o2.getTsFile());
+ }
+ }));
}
return newSequenceTsFileResources;
}
private List<List<TsFileResource>> newUnSequenceTsFileResources(Long k) {
- List<List<TsFileResource>> newUnSequenceTsFileResources = new
CopyOnWriteArrayList<>();
+ List<List<TsFileResource>> newUnSequenceTsFileResources = new
ArrayList<>();
for (int i = 0; i < unseqLevelNum; i++) {
- newUnSequenceTsFileResources.add(new CopyOnWriteArrayList<>());
+ newUnSequenceTsFileResources.add(new ArrayList<>());
}
return newUnSequenceTsFileResources;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
index 6d9864d..5c3d8b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
@@ -50,65 +50,75 @@ public class NoCompactionTsFileManagement extends
TsFileManagement {
@Deprecated
@Override
public List<TsFileResource> getTsFileList(boolean sequence) {
- List<TsFileResource> result = new ArrayList<>();
- if (sequence) {
- synchronized (sequenceFileTreeSetMap) {
+ readLock();
+ try {
+ List<TsFileResource> result = new ArrayList<>();
+ if (sequence) {
for (TreeSet<TsFileResource> tsFileResourceTreeSet :
sequenceFileTreeSetMap.values()) {
result.addAll(tsFileResourceTreeSet);
}
- }
- } else {
- synchronized (unSequenceFileListMap) {
+ } else {
for (List<TsFileResource> tsFileResourceList :
unSequenceFileListMap.values()) {
result.addAll(tsFileResourceList);
}
}
+ return result;
+ } finally {
+ readUnLock();
}
- return result;
}
@Override
public List<TsFileResource> getTsFileListByTimePartition(boolean sequence,
long timePartition) {
- if (sequence) {
- synchronized (sequenceFileTreeSetMap) {
- return new
ArrayList<>(sequenceFileTreeSetMap.getOrDefault(timePartition, new
TreeSet<>()));
- }
- } else {
- synchronized (unSequenceFileListMap) {
+ readLock();
+ try {
+ if (sequence) {
+ return new ArrayList<>(
+ sequenceFileTreeSetMap.getOrDefault(timePartition,
newSequenceTsFileResources(0L)));
+ } else {
return new ArrayList<>(
unSequenceFileListMap.getOrDefault(timePartition,
Collections.emptyList()));
}
+ } finally {
+ readUnLock();
}
}
@Override
public Iterator<TsFileResource> getIterator(boolean sequence) {
- return getTsFileList(sequence).iterator();
+ readLock();
+ try {
+ return getTsFileList(sequence).iterator();
+ } finally {
+ readUnLock();
+ }
}
@Override
public void remove(TsFileResource tsFileResource, boolean sequence) {
- if (sequence) {
- synchronized (sequenceFileTreeSetMap) {
+ writeLock();
+ try {
+ if (sequence) {
TreeSet<TsFileResource> sequenceFileTreeSet =
sequenceFileTreeSetMap.get(tsFileResource.getTimePartition());
sequenceFileTreeSet.remove(tsFileResource);
- }
- } else {
- synchronized (unSequenceFileListMap) {
+ } else {
List<TsFileResource> unSequenceFileList =
unSequenceFileListMap.get(tsFileResource.getTimePartition());
unSequenceFileList.remove(tsFileResource);
}
+ } finally {
+ writeUnlock();
}
}
@Override
public void removeAll(List<TsFileResource> tsFileResourceList, boolean
sequence) {
- if (tsFileResourceList.size() > 0) {
- tsFileResourceList.sort((o1, o2) -> (int) (o1.getTimePartition() -
o2.getTimePartition()));
- if (sequence) {
- synchronized (sequenceFileTreeSetMap) {
+ writeLock();
+ try {
+ if (tsFileResourceList.size() > 0) {
+ tsFileResourceList.sort((o1, o2) -> (int) (o1.getTimePartition() -
o2.getTimePartition()));
+ if (sequence) {
long currTimePartition =
tsFileResourceList.get(0).getTimePartition();
int startIndex = 0;
for (int i = 1; i < tsFileResourceList.size(); i++) {
@@ -124,9 +134,7 @@ public class NoCompactionTsFileManagement extends
TsFileManagement {
sequenceFileTreeSetMap
.get(currTimePartition)
.removeAll(tsFileResourceList.subList(startIndex,
tsFileResourceList.size()));
- }
- } else {
- synchronized (unSequenceFileListMap) {
+ } else {
long currTimePartition =
tsFileResourceList.get(0).getTimePartition();
int startIndex = 0;
for (int i = 1; i < tsFileResourceList.size(); i++) {
@@ -144,24 +152,27 @@ public class NoCompactionTsFileManagement extends
TsFileManagement {
.removeAll(tsFileResourceList.subList(startIndex,
tsFileResourceList.size()));
}
}
+ } finally {
+ writeUnlock();
}
}
@Override
public void add(TsFileResource tsFileResource, boolean sequence) {
- long timePartitionId = tsFileResource.getTimePartition();
- if (sequence) {
- synchronized (sequenceFileTreeSetMap) {
+ writeLock();
+ try {
+ long timePartitionId = tsFileResource.getTimePartition();
+ if (sequence) {
sequenceFileTreeSetMap
.computeIfAbsent(timePartitionId, this::newSequenceTsFileResources)
.add(tsFileResource);
- }
- } else {
- synchronized (unSequenceFileListMap) {
+ } else {
unSequenceFileListMap
.computeIfAbsent(timePartitionId,
this::newUnSequenceTsFileResources)
.add(tsFileResource);
}
+ } finally {
+ writeUnlock();
}
}
@@ -172,73 +183,86 @@ public class NoCompactionTsFileManagement extends
TsFileManagement {
@Override
public void addAll(List<TsFileResource> tsFileResourceList, boolean
sequence) {
- for (TsFileResource tsFileResource : tsFileResourceList) {
- add(tsFileResource, sequence);
+ writeLock();
+ try {
+ for (TsFileResource tsFileResource : tsFileResourceList) {
+ add(tsFileResource, sequence);
+ }
+ } finally {
+ writeUnlock();
}
}
@Override
public boolean contains(TsFileResource tsFileResource, boolean sequence) {
- if (sequence) {
- synchronized (sequenceFileTreeSetMap) {
+ readLock();
+ try {
+ if (sequence) {
return sequenceFileTreeSetMap
.getOrDefault(tsFileResource.getTimePartition(),
newSequenceTsFileResources(0L))
.contains(tsFileResource);
- }
- } else {
- synchronized (unSequenceFileListMap) {
+ } else {
return unSequenceFileListMap
.getOrDefault(tsFileResource.getTimePartition(), new ArrayList<>())
.contains(tsFileResource);
}
+ } finally {
+ readUnLock();
}
}
@Override
public void clear() {
- sequenceFileTreeSetMap.clear();
- unSequenceFileListMap.clear();
+ writeLock();
+ try {
+ sequenceFileTreeSetMap.clear();
+ unSequenceFileListMap.clear();
+ } finally {
+ writeUnlock();
+ }
}
@Override
public boolean isEmpty(boolean sequence) {
- if (sequence) {
- synchronized (sequenceFileTreeSetMap) {
+ readLock();
+ try {
+ if (sequence) {
for (Set<TsFileResource> sequenceFileTreeSet :
sequenceFileTreeSetMap.values()) {
if (!sequenceFileTreeSet.isEmpty()) {
return false;
}
}
- }
- } else {
- synchronized (unSequenceFileListMap) {
+ } else {
for (List<TsFileResource> unSequenceFileList :
unSequenceFileListMap.values()) {
if (!unSequenceFileList.isEmpty()) {
return false;
}
}
}
+ return true;
+ } finally {
+ readUnLock();
}
- return true;
}
@Override
public int size(boolean sequence) {
- int result = 0;
- if (sequence) {
- synchronized (sequenceFileTreeSetMap) {
+ readLock();
+ try {
+ int result = 0;
+ if (sequence) {
for (Set<TsFileResource> sequenceFileTreeSet :
sequenceFileTreeSetMap.values()) {
result += sequenceFileTreeSet.size();
}
- }
- } else {
- synchronized (unSequenceFileListMap) {
+ } else {
for (List<TsFileResource> unSequenceFileList :
unSequenceFileListMap.values()) {
result += unSequenceFileList.size();
}
}
+ return result;
+ } finally {
+ readUnLock();
}
- return result;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index 15b5176..0c4241b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -164,6 +164,10 @@ public class MergeFileTask {
seqFile.writeLock();
try {
+ if (Thread.currentThread().isInterrupted()) {
+ return;
+ }
+
FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getTsFilePath());
resource.removeFileReader(seqFile);
@@ -338,16 +342,20 @@ public class MergeFileTask {
}
updateStartTimeAndEndTime(seqFile, fileWriter);
resource.removeFileReader(seqFile);
-
FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getTsFilePath());
fileWriter.endFile();
updatePlanIndexes(seqFile);
- mergeLogger.logFileMergeEnd();
- logger.debug("{} moved unmerged chunks of {} to the new file", taskName,
seqFile);
seqFile.writeLock();
try {
+ if (Thread.currentThread().isInterrupted()) {
+ return;
+ }
+
seqFile.serialize();
+ mergeLogger.logFileMergeEnd();
+ logger.debug("{} moved unmerged chunks of {} to the new file", taskName,
seqFile);
+
FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getTsFilePath());
// change tsFile name
seqFile.getTsFile().delete();
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 2282543..e0e7589 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -515,6 +515,7 @@ public class StorageGroupProcessor {
try {
CompactionMergeTaskPoolManager.getInstance()
.submitTask(
+ logicalStorageGroupName,
tsFileManagement.new
CompactionRecoverTask(this::closeCompactionMergeCallBack));
} catch (RejectedExecutionException e) {
this.closeCompactionMergeCallBack();
@@ -1524,7 +1525,7 @@ public class StorageGroupProcessor {
QueryFileManager filePathsManager,
Filter timeFilter)
throws QueryProcessException {
- insertLock.readLock().lock();
+ readLock();
try {
List<TsFileResource> seqResources =
getFileResourceListForQuery(
@@ -1556,10 +1557,18 @@ public class StorageGroupProcessor {
} catch (MetadataException e) {
throw new QueryProcessException(e);
} finally {
- insertLock.readLock().unlock();
+ readUnlock();
}
}
+ public void readLock() {
+ insertLock.readLock().lock();
+ }
+
+ public void readUnlock() {
+ insertLock.readLock().unlock();
+ }
+
public void writeLock() {
insertLock.writeLock().lock();
}
@@ -1661,7 +1670,6 @@ public class StorageGroupProcessor {
// TODO: how to avoid partial deletion?
// FIXME: notice that if we may remove a SGProcessor out of memory, we
need to close all opened
// mod files in mergingModification, sequenceFileList, and
unsequenceFileList
- tsFileManagement.readLock();
writeLock();
// record files which are updated so that we can roll back them in case of
exception
@@ -1704,7 +1712,6 @@ public class StorageGroupProcessor {
throw new IOException(e);
} finally {
writeUnlock();
- tsFileManagement.readUnLock();
}
}
@@ -1944,6 +1951,7 @@ public class StorageGroupProcessor {
tsFileManagement.setForceFullMerge(fullMerge);
CompactionMergeTaskPoolManager.getInstance()
.submitTask(
+ logicalStorageGroupName,
tsFileManagement
.new CompactionMergeTask(this::closeCompactionMergeCallBack,
timePartition));
} catch (IOException | RejectedExecutionException e) {
@@ -2001,14 +2009,12 @@ public class StorageGroupProcessor {
upgradeFileCount.getAndAdd(-1);
// load all upgraded resources in this sg to tsFileManagement
if (upgradeFileCount.get() == 0) {
- tsFileManagement.writeLock();
writeLock();
try {
loadUpgradedResources(upgradeSeqFileList, true);
loadUpgradedResources(upgradeUnseqFileList, false);
} finally {
writeUnlock();
- tsFileManagement.writeUnlock();
}
// after upgrade complete, update partitionLatestFlushedTimeForEachDevice
for (Entry<Long, Map<String, Long>> entry :
@@ -2089,7 +2095,6 @@ public class StorageGroupProcessor {
public void loadNewTsFileForSync(TsFileResource newTsFileResource) throws
LoadFileException {
File tsfileToBeInserted = newTsFileResource.getTsFile();
long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
- tsFileManagement.writeLock();
writeLock();
try {
if (loadTsFileByType(
@@ -2113,7 +2118,6 @@ public class StorageGroupProcessor {
throw new LoadFileException(e);
} finally {
writeUnlock();
- tsFileManagement.writeUnlock();
}
}
@@ -2161,7 +2165,6 @@ public class StorageGroupProcessor {
public void loadNewTsFile(TsFileResource newTsFileResource) throws
LoadFileException {
File tsfileToBeInserted = newTsFileResource.getTsFile();
long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
- tsFileManagement.writeLock();
writeLock();
try {
List<TsFileResource> sequenceList = tsFileManagement.getTsFileList(true);
@@ -2222,7 +2225,6 @@ public class StorageGroupProcessor {
throw new LoadFileException(e);
} finally {
writeUnlock();
- tsFileManagement.writeUnlock();
}
}
@@ -2612,7 +2614,6 @@ public class StorageGroupProcessor {
* module.
*/
public boolean deleteTsfile(File tsfieToBeDeleted) {
- tsFileManagement.writeLock();
writeLock();
TsFileResource tsFileResourceToBeDeleted = null;
try {
@@ -2638,7 +2639,6 @@ public class StorageGroupProcessor {
}
} finally {
writeUnlock();
- tsFileManagement.writeUnlock();
}
if (tsFileResourceToBeDeleted == null) {
return false;
@@ -2668,7 +2668,6 @@ public class StorageGroupProcessor {
* @return whether the file to be moved exists. @UsedBy load external tsfile
module.
*/
public boolean moveTsfile(File fileToBeMoved, File targetDir) {
- tsFileManagement.writeLock();
writeLock();
TsFileResource tsFileResourceToBeMoved = null;
try {
@@ -2694,7 +2693,6 @@ public class StorageGroupProcessor {
}
} finally {
writeUnlock();
- tsFileManagement.writeUnlock();
}
if (tsFileResourceToBeMoved == null) {
return false;
@@ -2801,22 +2799,21 @@ public class StorageGroupProcessor {
/** remove all partitions that satisfy a filter. */
public void removePartitions(TimePartitionFilter filter) {
// this requires blocking all other activities
- tsFileManagement.writeLock();
- insertLock.writeLock().lock();
+ writeLock();
try {
- // abort ongoing merges
+ // abort ongoing compactions and merges
+
CompactionMergeTaskPoolManager.getInstance().abortCompaction(logicalStorageGroupName);
MergeManager.getINSTANCE().abortMerge(logicalStorageGroupName);
// close all working files that should be removed
removePartitions(filter, workSequenceTsFileProcessors.entrySet());
removePartitions(filter, workUnsequenceTsFileProcessors.entrySet());
// remove data files
- removePartitions(filter, tsFileManagement.getIterator(true));
- removePartitions(filter, tsFileManagement.getIterator(false));
+ removePartitions(filter, tsFileManagement.getIterator(true), true);
+ removePartitions(filter, tsFileManagement.getIterator(false), false);
} finally {
- insertLock.writeLock().unlock();
- tsFileManagement.writeUnlock();
+ writeUnlock();
}
}
@@ -2840,12 +2837,13 @@ public class StorageGroupProcessor {
}
// may remove the iterator's data
- private void removePartitions(TimePartitionFilter filter,
Iterator<TsFileResource> iterator) {
+ private void removePartitions(
+ TimePartitionFilter filter, Iterator<TsFileResource> iterator, boolean
sequence) {
while (iterator.hasNext()) {
TsFileResource tsFileResource = iterator.next();
if (filter.satisfy(logicalStorageGroupName,
tsFileResource.getTimePartition())) {
tsFileResource.remove();
- iterator.remove();
+ tsFileManagement.remove(tsFileResource, sequence);
updateLatestFlushTimeToPartition(tsFileResource.getTimePartition(),
Long.MIN_VALUE);
logger.debug("{} is removed during deleting partitions",
tsFileResource.getTsFilePath());
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
index bd02b4f..46c2771 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
@@ -97,7 +97,7 @@ public class LevelCompactionCacheTest extends
LevelCompactionTest {
levelCompactionTsFileManagement
.new CompactionMergeTask(this::closeCompactionMergeCallBack, 0);
compactionMergeWorking = true;
- compactionMergeTask.run();
+ compactionMergeTask.call();
while (compactionMergeWorking) {
// wait
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionLogTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionLogTest.java
index c39ab3a..d6d9c99 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionLogTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionLogTest.java
@@ -69,7 +69,7 @@ public class LevelCompactionLogTest extends
LevelCompactionTest {
levelCompactionTsFileManagement
.new CompactionMergeTask(this::closeCompactionMergeCallBack, 0);
compactionMergeWorking = true;
- compactionMergeTask.run();
+ compactionMergeTask.call();
while (compactionMergeWorking) {
// wait
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
index 4e2d65c..3d9aecd 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
@@ -82,7 +82,7 @@ public class LevelCompactionMergeTest extends
LevelCompactionTest {
levelCompactionTsFileManagement
.new CompactionMergeTask(this::closeCompactionMergeCallBack, 0);
compactionMergeWorking = true;
- compactionMergeTask.run();
+ compactionMergeTask.call();
while (compactionMergeWorking) {
// wait
}
@@ -126,7 +126,7 @@ public class LevelCompactionMergeTest extends
LevelCompactionTest {
levelCompactionTsFileManagement
.new CompactionMergeTask(this::closeCompactionMergeCallBack, 0);
compactionMergeWorking = true;
- compactionMergeTask.run();
+ compactionMergeTask.call();
while (compactionMergeWorking) {
// wait
}
@@ -191,7 +191,7 @@ public class LevelCompactionMergeTest extends
LevelCompactionTest {
levelCompactionTsFileManagement
.new CompactionMergeTask(this::closeCompactionMergeCallBack, 0);
compactionMergeWorking = true;
- compactionMergeTask.run();
+ compactionMergeTask.call();
while (compactionMergeWorking) {
// wait
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMoreDataTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMoreDataTest.java
index 55df625..1411eac 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMoreDataTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMoreDataTest.java
@@ -203,7 +203,7 @@ public class LevelCompactionMoreDataTest extends
LevelCompactionTest {
levelCompactionTsFileManagement
.new CompactionMergeTask(this::closeCompactionMergeCallBack, 0);
compactionMergeWorking = true;
- compactionMergeTask.run();
+ compactionMergeTask.call();
while (compactionMergeWorking) {
// wait
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTsFileManagementTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTsFileManagementTest.java
index 4dc40b4..7375bad 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTsFileManagementTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTsFileManagementTest.java
@@ -156,4 +156,73 @@ public class LevelCompactionTsFileManagementTest extends
LevelCompactionTest {
assertEquals(0, levelCompactionTsFileManagement.size(true));
assertEquals(0, levelCompactionTsFileManagement.size(false));
}
+
+ @Test
+ public void testIteratorRemove() {
+ LevelCompactionTsFileManagement levelCompactionTsFileManagement =
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG,
tempSGDir.getPath());
+ for (TsFileResource tsFileResource : seqResources) {
+ levelCompactionTsFileManagement.add(tsFileResource, true);
+ }
+ levelCompactionTsFileManagement.addAll(seqResources, false);
+ assertEquals(6,
levelCompactionTsFileManagement.getTsFileList(true).size());
+
+ Iterator<TsFileResource> tsFileResourceIterator =
+ levelCompactionTsFileManagement.getIterator(true);
+ tsFileResourceIterator.next();
+ try {
+ tsFileResourceIterator.remove();
+ } catch (UnsupportedOperationException e) {
+ // pass
+ }
+ assertEquals(6,
levelCompactionTsFileManagement.getTsFileList(true).size());
+
+ TsFileResource tsFileResource1 =
+ new TsFileResource(
+ new File(
+ TestConstant.BASE_OUTPUT_PATH.concat(
+ 10
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 10
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 1
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile")));
+ TsFileResource tsFileResource2 =
+ new TsFileResource(
+ new File(
+ TestConstant.BASE_OUTPUT_PATH.concat(
+ 11
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 11
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 1
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile")));
+ levelCompactionTsFileManagement.add(tsFileResource1, true);
+ levelCompactionTsFileManagement.add(tsFileResource2, true);
+ TsFileResource tsFileResource3 =
+ new TsFileResource(
+ new File(
+ TestConstant.BASE_OUTPUT_PATH.concat(
+ 12
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 12
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 2
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile")));
+ levelCompactionTsFileManagement.add(tsFileResource3, true);
+ Iterator<TsFileResource> tsFileResourceIterator2 =
+ levelCompactionTsFileManagement.getIterator(true);
+ int count = 0;
+ while (tsFileResourceIterator2.hasNext()) {
+ count++;
+ tsFileResourceIterator2.next();
+ }
+ assertEquals(9, count);
+ }
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/NoCompactionTsFileManagementTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/NoCompactionTsFileManagementTest.java
index 2c14215..6407c9e 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/NoCompactionTsFileManagementTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/NoCompactionTsFileManagementTest.java
@@ -154,11 +154,80 @@ public class NoCompactionTsFileManagementTest extends
LevelCompactionTest {
noCompactionTsFileManagement.recover();
CompactionMergeTask compactionMergeTask =
noCompactionTsFileManagement.new CompactionMergeTask(() -> {}, 0);
- compactionMergeTask.run();
+ compactionMergeTask.call();
assertEquals(1, noCompactionTsFileManagement.size(true));
assertEquals(1, noCompactionTsFileManagement.size(false));
noCompactionTsFileManagement.clear();
assertEquals(0, noCompactionTsFileManagement.size(true));
assertEquals(0, noCompactionTsFileManagement.size(false));
}
+
+ @Test
+ public void testIteratorRemove() {
+ NoCompactionTsFileManagement noCompactionTsFileManagement =
+ new NoCompactionTsFileManagement(COMPACTION_TEST_SG,
tempSGDir.getPath());
+ for (TsFileResource tsFileResource : seqResources) {
+ noCompactionTsFileManagement.add(tsFileResource, true);
+ }
+ noCompactionTsFileManagement.addAll(seqResources, false);
+ assertEquals(6, noCompactionTsFileManagement.getTsFileList(true).size());
+
+ Iterator<TsFileResource> tsFileResourceIterator =
+ noCompactionTsFileManagement.getIterator(true);
+ tsFileResourceIterator.next();
+ try {
+ tsFileResourceIterator.remove();
+ } catch (UnsupportedOperationException e) {
+ // pass
+ }
+ assertEquals(6, noCompactionTsFileManagement.getTsFileList(true).size());
+
+ TsFileResource tsFileResource1 =
+ new TsFileResource(
+ new File(
+ TestConstant.BASE_OUTPUT_PATH.concat(
+ 10
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 10
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 1
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile")));
+ TsFileResource tsFileResource2 =
+ new TsFileResource(
+ new File(
+ TestConstant.BASE_OUTPUT_PATH.concat(
+ 11
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 11
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 1
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile")));
+ noCompactionTsFileManagement.add(tsFileResource1, true);
+ noCompactionTsFileManagement.add(tsFileResource2, true);
+ TsFileResource tsFileResource3 =
+ new TsFileResource(
+ new File(
+ TestConstant.BASE_OUTPUT_PATH.concat(
+ 12
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 12
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 2
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile")));
+ noCompactionTsFileManagement.add(tsFileResource3, true);
+ Iterator<TsFileResource> tsFileResourceIterator2 =
+ noCompactionTsFileManagement.getIterator(true);
+ int count = 0;
+ while (tsFileResourceIterator2.hasNext()) {
+ count++;
+ tsFileResourceIterator2.next();
+ }
+ assertEquals(9, count);
+ }
}