This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.11 by this push:
new e34d129 [To rel/0.11] Fix compaction recover list bug to 0.11 (#2452)
e34d129 is described below
commit e34d1297e6515895a0e0b2ab52443b0cb5473873
Author: zhanglingzhe0820 <[email protected]>
AuthorDate: Fri Jan 8 21:56:33 2021 +0800
[To rel/0.11] Fix compaction recover list bug to 0.11 (#2452)
Fix compaction recover list bug
---
.../db/engine/compaction/TsFileManagement.java | 5 ++
.../level/LevelCompactionTsFileManagement.java | 57 ++++++++++++++++++++--
.../no/NoCompactionTsFileManagement.java | 5 ++
.../engine/storagegroup/StorageGroupProcessor.java | 10 ++--
.../compaction/LevelCompactionRecoverTest.java | 10 ++--
5 files changed, 74 insertions(+), 13 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 096abec..0c728c7 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -99,6 +99,11 @@ public abstract class TsFileManagement {
public abstract void add(TsFileResource tsFileResource, boolean sequence);
/**
+ * add one TsFile to list for recover
+ */
+ public abstract void addRecover(TsFileResource tsFileResource, boolean
sequence);
+
+ /**
* add some TsFiles to list
*/
public abstract void addAll(List<TsFileResource> tsFileResourceList, boolean
sequence);
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 5c4a168..66d7d71 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -81,6 +81,8 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
private final Map<Long, List<List<TsFileResource>>>
unSequenceTsFileResources = new ConcurrentSkipListMap<>();
private final List<List<TsFileResource>> forkedSequenceTsFileResources = new
ArrayList<>();
private final List<List<TsFileResource>> forkedUnSequenceTsFileResources =
new ArrayList<>();
+ private final List<TsFileResource> sequenceRecoverTsFileResources = new
CopyOnWriteArrayList<>();
+ private final List<TsFileResource> unSequenceRecoverTsFileResources = new
CopyOnWriteArrayList<>();
public LevelCompactionTsFileManagement(String storageGroupName, String
storageGroupDir) {
super(storageGroupName, storageGroupDir);
@@ -240,6 +242,19 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
}
@Override
+ public void addRecover(TsFileResource tsFileResource, boolean sequence) {
+ if (sequence) {
+ synchronized (sequenceRecoverTsFileResources) {
+ sequenceRecoverTsFileResources.add(tsFileResource);
+ }
+ } else {
+ synchronized (unSequenceTsFileResources) {
+ unSequenceRecoverTsFileResources.add(tsFileResource);
+ }
+ }
+ }
+
+ @Override
public void addAll(List<TsFileResource> tsFileResourceList, boolean
sequence) {
for (TsFileResource tsFileResource : tsFileResourceList) {
add(tsFileResource, sequence);
@@ -349,7 +364,7 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
}
if (fullMerge) {
// get tsfile resource from list, as they have been recovered in
StorageGroupProcessor
- TsFileResource targetTsFileResource = getTsFileResource(targetFile,
isSeq);
+ TsFileResource targetTsFileResource =
getRecoverTsFileResource(targetFile, isSeq);
long timePartition = targetTsFileResource.getTimePartition();
RestorableTsFileIOWriter writer = new
RestorableTsFileIOWriter(target);
// if not complete compaction, resume merge
@@ -371,7 +386,7 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
deleteAllSubLevelFiles(isSeq, timePartition);
} else {
// get tsfile resource from list, as they have been recovered in
StorageGroupProcessor
- TsFileResource targetResource = getTsFileResource(targetFile, isSeq);
+ TsFileResource targetResource = getRecoverTsFileResource(targetFile,
isSeq);
long timePartition = targetResource.getTimePartition();
List<TsFileResource> sourceTsFileResources = new ArrayList<>();
for (String file : sourceFileList) {
@@ -392,13 +407,26 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
.merge(targetResource, sourceTsFileResources, storageGroupName,
compactionLogger, deviceSet,
isSeq);
+ // complete compaction and delete source file
+ writeLock();
+ try {
+ int targetLevel = getMergeLevel(targetResource.getTsFile());
+ if (isSeq) {
+
sequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
+ sequenceRecoverTsFileResources.clear();
+ } else {
+
unSequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
+ unSequenceRecoverTsFileResources.clear();
+ }
+ deleteLevelFilesInList(timePartition, sourceTsFileResources,
level, isSeq);
+ } finally {
+ writeUnlock();
+ }
+ deleteLevelFilesInDisk(sourceTsFileResources);
compactionLogger.close();
} else {
writer.close();
}
- // complete compaction and delete source file
- deleteLevelFilesInDisk(sourceTsFileResources);
- deleteLevelFilesInList(timePartition, sourceTsFileResources, level,
isSeq);
}
}
} catch (IOException | IllegalPathException e) {
@@ -611,6 +639,25 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
return Integer.parseInt(mergeLevelStr);
}
+ private TsFileResource getRecoverTsFileResource(String filePath, boolean
isSeq)
+ throws IOException {
+ if (isSeq) {
+ for (TsFileResource tsFileResource : sequenceRecoverTsFileResources) {
+ if (Files.isSameFile(tsFileResource.getTsFile().toPath(), new
File(filePath).toPath())) {
+ return tsFileResource;
+ }
+ }
+ } else {
+ for (TsFileResource tsFileResource : unSequenceRecoverTsFileResources) {
+ if (Files.isSameFile(tsFileResource.getTsFile().toPath(), new
File(filePath).toPath())) {
+ return tsFileResource;
+ }
+ }
+ }
+ logger.error("cannot get tsfile resource path: {}", filePath);
+ throw new IOException();
+ }
+
private TsFileResource getTsFileResource(String filePath, boolean isSeq)
throws IOException {
if (isSeq) {
for (List<SortedSet<TsFileResource>> tsFileResourcesWithLevel :
sequenceTsFileResources
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
index 1e130a0..7d3638d 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
@@ -92,6 +92,11 @@ public class NoCompactionTsFileManagement extends
TsFileManagement {
}
@Override
+ public void addRecover(TsFileResource tsFileResource, boolean sequence) {
+ logger.info("{} do not need to recover", storageGroupName);
+ }
+
+ @Override
public void addAll(List<TsFileResource> tsFileResourceList, boolean
sequence) {
if (sequence) {
sequenceFileTreeSet.addAll(tsFileResourceList);
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index b558f2c..201b908 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -588,9 +588,13 @@ public class StorageGroupProcessor {
try {
// this tsfile is not zero level, no need to perform redo wal
if
(LevelCompactionTsFileManagement.getMergeLevel(tsFileResource.getTsFile()) > 0)
{
- recoverPerformer.recover(false);
- tsFileResource.setClosed(true);
- tsFileManagement.add(tsFileResource, isSeq);
+ writer = recoverPerformer.recover(false);
+ if (writer.hasCrashed()) {
+ tsFileManagement.addRecover(tsFileResource, isSeq);
+ } else {
+ tsFileResource.setClosed(true);
+ tsFileManagement.add(tsFileResource, isSeq);
+ }
continue;
} else {
writer = recoverPerformer.recover(true);
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
index 4ee7880..b401c03 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
@@ -113,7 +113,7 @@ public class LevelCompactionRecoverTest extends
LevelCompactionTest {
CompactionUtils.merge(targetTsFileResource, new
ArrayList<>(seqResources.subList(0, 3)),
COMPACTION_TEST_SG, compactionLogger, new HashSet<>(), true);
compactionLogger.close();
- levelCompactionTsFileManagement.add(targetTsFileResource, true);
+ levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
levelCompactionTsFileManagement.recover();
context = new QueryContext();
path = new PartialPath(
@@ -191,7 +191,7 @@ public class LevelCompactionRecoverTest extends
LevelCompactionTest {
}
logStream.close();
- levelCompactionTsFileManagement.add(targetTsFileResource, true);
+ levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
levelCompactionTsFileManagement.recover();
context = new QueryContext();
path = new PartialPath(
@@ -274,7 +274,7 @@ public class LevelCompactionRecoverTest extends
LevelCompactionTest {
out.truncate(Long.parseLong(logs.get(logs.size() - 1).split(" ")[1]) - 1);
out.close();
- levelCompactionTsFileManagement.add(targetTsFileResource, true);
+ levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
levelCompactionTsFileManagement.recover();
context = new QueryContext();
path = new PartialPath(
@@ -333,7 +333,7 @@ public class LevelCompactionRecoverTest extends
LevelCompactionTest {
CompactionUtils.merge(targetTsFileResource, new
ArrayList<>(seqResources.subList(0, 3)),
COMPACTION_TEST_SG, compactionLogger, new HashSet<>(), false);
compactionLogger.close();
- levelCompactionTsFileManagement.add(targetTsFileResource, false);
+ levelCompactionTsFileManagement.addRecover(targetTsFileResource, false);
levelCompactionTsFileManagement.recover();
context = new QueryContext();
path = new PartialPath(
@@ -484,7 +484,7 @@ public class LevelCompactionRecoverTest extends
LevelCompactionTest {
compactionLogger.logFile(TARGET_NAME, targetTsFileResource.getTsFile());
CompactionUtils.merge(targetTsFileResource, new
ArrayList<>(seqResources.subList(0, 3)),
COMPACTION_TEST_SG, compactionLogger, new HashSet<>(), true);
- levelCompactionTsFileManagement.add(targetTsFileResource, true);
+ levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
compactionLogger.close();
levelCompactionTsFileManagement.recover();
QueryContext context = new QueryContext();