This is an automated email from the ASF dual-hosted git repository.
ericpai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 76a4e43955 [IOTDB-3018] Fix compaction bugs on handling deleted target
file and service shutdown deadlock (#5693)
76a4e43955 is described below
commit 76a4e43955658be78bfa2b8807c527820a6b545d
Author: BaiJian <[email protected]>
AuthorDate: Fri Apr 29 19:33:40 2022 +0800
[IOTDB-3018] Fix compaction bugs on handling deleted target file and service
shutdown deadlock (#5693)
---
.../engine/compaction/CompactionTaskManager.java | 45 ++++++-----
.../compaction/cross/CrossSpaceCompactionTask.java | 2 +-
.../compaction/inner/InnerSpaceCompactionTask.java | 34 +++++----
.../impl/ReadPointCompactionPerformer.java | 5 +-
.../compaction/task/AbstractCompactionTask.java | 19 ++---
.../compaction/task/CompactionTaskSummary.java | 32 ++++++++
.../compaction/CompactionTaskManagerTest.java | 2 +-
.../inner/InnerCompactionEmptyTsFileTest.java | 89 ++++++++++++++++++++++
.../compaction/inner/InnerCompactionTest.java | 63 +--------------
9 files changed, 178 insertions(+), 113 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index 2c159d8331..df314abffa 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.engine.compaction.comparator.DefaultCompactionTaskComparatorImpl;
import org.apache.iotdb.db.engine.compaction.constant.CompactionTaskStatus;
import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask;
+import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary;
import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue;
import com.google.common.util.concurrent.RateLimiter;
@@ -69,8 +70,8 @@ public class CompactionTaskManager implements IService {
new FixedPriorityBlockingQueue<>(1024, new
DefaultCompactionTaskComparatorImpl());
// <fullStorageGroupName,futureSet>, it is used to store all compaction
tasks under each
// virtualStorageGroup
- private Map<String, Map<AbstractCompactionTask, Future<Void>>>
storageGroupTasks =
- new HashMap<>();
+ private Map<String, Map<AbstractCompactionTask,
Future<CompactionTaskSummary>>>
+ storageGroupTasks = new HashMap<>();
// The thread pool that periodically fetches and executes the compaction
task from
// candidateCompactionTaskQueue to taskExecutionPool. The default number of
threads for this pool
@@ -124,7 +125,7 @@ public class CompactionTaskManager implements IService {
}
@Override
- public synchronized void stop() {
+ public void stop() {
if (taskExecutionPool != null) {
taskExecutionPool.shutdownNow();
compactionTaskSubmissionThreadPool.shutdownNow();
@@ -136,24 +137,24 @@ public class CompactionTaskManager implements IService {
}
@Override
- public synchronized void waitAndStop(long milliseconds) {
+ public void waitAndStop(long milliseconds) {
if (taskExecutionPool != null) {
awaitTermination(taskExecutionPool, milliseconds);
awaitTermination(compactionTaskSubmissionThreadPool, milliseconds);
- logger.info("Waiting for task taskExecutionPool to shut down");
+ logger.info("Waiting for task taskExecutionPool to shut down in {} ms",
milliseconds);
waitTermination();
storageGroupTasks.clear();
}
}
@TestOnly
- public synchronized void waitAllCompactionFinish() {
+ public void waitAllCompactionFinish() {
long sleepingStartTime = 0;
if (taskExecutionPool != null) {
while (taskExecutionPool.getActiveCount() > 0 ||
taskExecutionPool.getQueue().size() > 0) {
// wait
try {
- this.wait(200);
+ Thread.sleep(200);
sleepingStartTime += 200;
if (sleepingStartTime % 10000 == 0) {
logger.warn(
@@ -168,16 +169,17 @@ public class CompactionTaskManager implements IService {
}
}
storageGroupTasks.clear();
+ candidateCompactionTaskQueue.clear();
logger.info("All compaction task finish");
}
}
- private synchronized void waitTermination() {
+ private void waitTermination() {
long startTime = System.currentTimeMillis();
while (!taskExecutionPool.isTerminated()) {
int timeMillis = 0;
try {
- this.wait(200);
+ Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
@@ -192,7 +194,7 @@ public class CompactionTaskManager implements IService {
logger.info("CompactionManager stopped");
}
- private synchronized void awaitTermination(ExecutorService service, long
milliseconds) {
+ private void awaitTermination(ExecutorService service, long milliseconds) {
try {
service.shutdown();
service.awaitTermination(milliseconds, TimeUnit.MILLISECONDS);
@@ -301,28 +303,28 @@ public class CompactionTaskManager implements IService {
/**
* This method will directly submit a task to thread pool if there is
available thread.
*
- * @throws RejectedExecutionException
+ * @return the future of the task.
*/
- public synchronized void submitTask(AbstractCompactionTask compactionTask)
- throws RejectedExecutionException {
- if (taskExecutionPool != null && !taskExecutionPool.isTerminated()) {
- Future<Void> future = taskExecutionPool.submit(compactionTask);
+ public synchronized Future<CompactionTaskSummary> submitTask(
+ AbstractCompactionTask compactionTask) throws RejectedExecutionException
{
+ if (taskExecutionPool != null && !taskExecutionPool.isShutdown()) {
+ Future<CompactionTaskSummary> future =
taskExecutionPool.submit(compactionTask);
storageGroupTasks
.computeIfAbsent(compactionTask.getFullStorageGroupName(), x -> new
HashMap<>())
.put(compactionTask, future);
- return;
+ return future;
}
logger.warn(
"A CompactionTask failed to be submitted to CompactionTaskManager
because {}",
taskExecutionPool == null
? "taskExecutionPool is null"
: "taskExecutionPool is terminated");
+ return null;
}
public synchronized Future<Void> submitSubTask(Callable<Void>
subCompactionTask) {
- if (subCompactionTaskExecutionPool != null &&
!subCompactionTaskExecutionPool.isTerminated()) {
- Future<Void> future =
subCompactionTaskExecutionPool.submit(subCompactionTask);
- return future;
+ if (subCompactionTaskExecutionPool != null &&
!subCompactionTaskExecutionPool.isShutdown()) {
+ return subCompactionTaskExecutionPool.submit(subCompactionTask);
}
return null;
}
@@ -336,7 +338,7 @@ public class CompactionTaskManager implements IService {
public synchronized List<AbstractCompactionTask> abortCompaction(String
storageGroupName) {
List<AbstractCompactionTask> compactionTaskOfCurSG = new ArrayList<>();
if (storageGroupTasks.containsKey(storageGroupName)) {
- for (Map.Entry<AbstractCompactionTask, Future<Void>> taskFutureEntry :
+ for (Map.Entry<AbstractCompactionTask, Future<CompactionTaskSummary>>
taskFutureEntry :
storageGroupTasks.get(storageGroupName).entrySet()) {
taskFutureEntry.getValue().cancel(true);
compactionTaskOfCurSG.add(taskFutureEntry.getKey());
@@ -367,7 +369,8 @@ public class CompactionTaskManager implements IService {
public synchronized List<AbstractCompactionTask>
getRunningCompactionTaskList() {
List<AbstractCompactionTask> tasks = new ArrayList<>();
- for (Map<AbstractCompactionTask, Future<Void>> taskFutureMap :
storageGroupTasks.values()) {
+ for (Map<AbstractCompactionTask, Future<CompactionTaskSummary>>
taskFutureMap :
+ storageGroupTasks.values()) {
tasks.addAll(taskFutureMap.keySet());
}
return tasks;
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
index 8f45039fb4..3f3568ea82 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
@@ -149,7 +149,7 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
}
} catch (Throwable throwable) {
// catch throwable instead of exception to handle OOM errors
- LOGGER.error("Meet errors in cross space compaction, {}",
throwable.getMessage());
+ LOGGER.error("Meet errors in cross space compaction.");
CompactionExceptionHandler.handleException(
fullStorageGroupName,
logFile,
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java
index a7b5acaaa3..f2f9092084 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
@@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@@ -99,6 +101,8 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
targetTsFileResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(
selectedTsFileResourceList, sequence);
+ List<TsFileResource> targetTsFileList =
+ new ArrayList<>(Collections.singletonList(targetTsFileResource));
LOGGER.info(
"{} [Compaction] starting compaction task with {} files",
fullStorageGroupName,
@@ -113,8 +117,7 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
try {
compactionLogger = new CompactionLogger(logFile);
compactionLogger.logFiles(selectedTsFileResourceList,
CompactionLogger.STR_SOURCE_FILES);
- compactionLogger.logFiles(
- Collections.singletonList(targetTsFileResource),
CompactionLogger.STR_TARGET_FILES);
+ compactionLogger.logFiles(targetTsFileList,
CompactionLogger.STR_TARGET_FILES);
LOGGER.info("{} [InnerSpaceCompactionTask] Close the logger",
fullStorageGroupName);
compactionLogger.close();
LOGGER.info(
@@ -122,11 +125,12 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
// carry out the compaction
performer.setSourceFiles(selectedTsFileResourceList);
-
performer.setTargetFiles(Collections.singletonList(targetTsFileResource));
+ // As elements in targetFiles may be removed in
ReadPointCompactionPerformer, we should use a
+ // mutable list instead of Collections.singletonList()
+ performer.setTargetFiles(targetTsFileList);
performer.perform();
- CompactionUtils.moveTargetFile(
- Collections.singletonList(targetTsFileResource), true,
fullStorageGroupName);
+ CompactionUtils.moveTargetFile(targetTsFileList, true,
fullStorageGroupName);
LOGGER.info("{} [InnerSpaceCompactionTask] start to rename mods file",
fullStorageGroupName);
CompactionUtils.combineModsInInnerCompaction(
@@ -142,14 +146,14 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
tsFileManager.replace(
selectedTsFileResourceList,
Collections.emptyList(),
- Collections.singletonList(targetTsFileResource),
+ targetTsFileList,
timePartition,
true);
} else {
tsFileManager.replace(
Collections.emptyList(),
selectedTsFileResourceList,
- Collections.singletonList(targetTsFileResource),
+ targetTsFileList,
timePartition,
false);
}
@@ -166,10 +170,11 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
isHoldingWriteLock[i] = true;
}
- if (targetTsFileResource.getTsFile().length()
- < TSFileConfig.MAGIC_STRING.getBytes().length * 2L + Byte.BYTES) {
+ if (targetTsFileResource.getTsFile().exists()
+ && targetTsFileResource.getTsFile().length()
+ < TSFileConfig.MAGIC_STRING.getBytes().length * 2L + Byte.BYTES)
{
// the file size is smaller than magic string and version number
- throw new RuntimeException(
+ throw new TsFileNotCompleteException(
String.format(
"target file %s is smaller than magic string and version
number size",
targetTsFileResource));
@@ -194,10 +199,6 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
FileUtils.delete(logFile);
}
} catch (Throwable throwable) {
- LOGGER.error(
- "{} [Compaction] Throwable is caught during execution of
SizeTieredCompaction, {}",
- fullStorageGroupName,
- throwable.getMessage());
LOGGER.warn("{} [Compaction] Start to handle exception",
fullStorageGroupName);
if (throwable instanceof InterruptedException) {
Thread.currentThread().interrupt();
@@ -209,7 +210,7 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
CompactionExceptionHandler.handleException(
fullStorageGroupName,
logFile,
- Collections.singletonList(targetTsFileResource),
+ targetTsFileList,
selectedTsFileResourceList,
Collections.emptyList(),
tsFileManager,
@@ -220,7 +221,7 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
CompactionExceptionHandler.handleException(
fullStorageGroupName,
logFile,
- Collections.singletonList(targetTsFileResource),
+ targetTsFileList,
Collections.emptyList(),
selectedTsFileResourceList,
tsFileManager,
@@ -228,6 +229,7 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
true,
isSequence());
}
+ throw throwable;
} finally {
releaseFileLocksAndResetMergingStatus();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
index 43a121eff7..9d7c6789f4 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
@@ -192,8 +192,7 @@ public class ReadPointCompactionPerformer
AbstractCompactionWriter compactionWriter,
QueryContext queryContext,
QueryDataSource queryDataSource)
- throws MetadataException, IOException, InterruptedException {
- boolean hasStartChunkGroup = false;
+ throws IOException, InterruptedException {
MultiTsFileDeviceIterator.MeasurementIterator measurementIterator =
deviceIterator.iterateNotAlignedSeries(device, false);
Set<String> allMeasurements = measurementIterator.getAllMeasurements();
@@ -204,7 +203,7 @@ public class ReadPointCompactionPerformer
int idx = 0;
for (String measurement : allMeasurements) {
if (measurementsForEachSubTask[idx % subTaskNums] == null) {
- measurementsForEachSubTask[idx % subTaskNums] = new HashSet<String>();
+ measurementsForEachSubTask[idx % subTaskNums] = new HashSet<>();
}
measurementsForEachSubTask[idx++ % subTaskNums].add(measurement);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
index 3fbf180eb2..0fbe2e7d40 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
@@ -32,11 +32,12 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* AbstractCompactionTask is the base class for all compaction task, it
carries out the execution of
- * compaction. AbstractCompactionTask uses a template method, it execute the
abstract function
- * <i>doCompaction</i> implemented by subclass, and decrease the
currentTaskNum in
- * CompactionScheduler when the <i>doCompaction</i> finish.
+ * compaction. AbstractCompactionTask uses a template method, it executes the
abstract function
+ * {@link AbstractCompactionTask#doCompaction()} implemented by subclass, and
decrease the
+ * currentTaskNum in CompactionScheduler when the {@link
AbstractCompactionTask#doCompaction()} is
+ * finished. The future returns the {@link CompactionTaskSummary} of this task
execution.
*/
-public abstract class AbstractCompactionTask implements Callable<Void> {
+public abstract class AbstractCompactionTask implements
Callable<CompactionTaskSummary> {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
protected String fullStorageGroupName;
@@ -64,25 +65,25 @@ public abstract class AbstractCompactionTask implements
Callable<Void> {
protected abstract void doCompaction() throws Exception;
@Override
- public Void call() throws Exception {
+ public CompactionTaskSummary call() throws Exception {
ran = true;
long startTime = System.currentTimeMillis();
currentTaskNum.incrementAndGet();
+ boolean isSuccess = false;
try {
doCompaction();
+ isSuccess = true;
} catch (InterruptedException e) {
LOGGER.warn("Current task is interrupted");
- Thread.interrupted();
} catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
+ LOGGER.error("Running compaction task failed", e);
} finally {
this.currentTaskNum.decrementAndGet();
CompactionTaskManager.getInstance().removeRunningTaskFuture(this);
timeCost = System.currentTimeMillis() - startTime;
finished = true;
}
-
- return null;
+ return new CompactionTaskSummary(isSuccess);
}
public String getFullStorageGroupName() {
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionTaskSummary.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionTaskSummary.java
new file mode 100644
index 0000000000..a7380969ff
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionTaskSummary.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.task;
+
+/** The summary of one {@link AbstractCompactionTask} execution */
+public class CompactionTaskSummary {
+ private final boolean success;
+
+ public CompactionTaskSummary(boolean success) {
+ this.success = success;
+ }
+
+ public boolean isSuccess() {
+ return success;
+ }
+}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
index 977ebd1a43..923e9e3b77 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
@@ -210,7 +210,7 @@ public class CompactionTaskManagerTest extends
InnerCompactionTest {
new ReadChunkCompactionPerformer(seqResources),
new AtomicInteger(0));
CompactionTaskManager manager = CompactionTaskManager.getInstance();
- manager.addTaskToWaitingQueue(task1);
+ Assert.assertTrue(manager.addTaskToWaitingQueue(task1));
manager.submitTaskFromTaskQueue();
while (manager.getTotalTaskCount() > 0) {
Thread.sleep(10);
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java
new file mode 100644
index 0000000000..5b1306ff6a
--- /dev/null
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.inner;
+
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
+import
org.apache.iotdb.db.engine.compaction.performer.impl.ReadPointCompactionPerformer;
+import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary;
+import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class InnerCompactionEmptyTsFileTest extends InnerCompactionTest {
+
+ File tempSGDir;
+
+ @Before
+ public void setUp() throws Exception {
+ tempSGDir = new File(TestConstant.getTestTsFileDir("root.compactionTest",
0, 0));
+ if (!tempSGDir.exists()) {
+ Assert.assertTrue(tempSGDir.mkdirs());
+ }
+ seqFileNum = 0;
+ unseqFileNum = 4;
+ super.setUp();
+ tempSGDir = new File(TestConstant.BASE_OUTPUT_PATH.concat("tempSG"));
+ tempSGDir.mkdirs();
+ tsFileManager = new TsFileManager(COMPACTION_TEST_SG, "0",
tempSGDir.getAbsolutePath());
+ }
+
+ @After
+ public void tearDown() throws IOException, StorageEngineException {
+ super.tearDown();
+ FileUtils.deleteDirectory(tempSGDir);
+ }
+
+ @Override
+ void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum,
long valueOffset)
+ throws IOException, WriteProcessException {
+ // create some empty tsfiles
+ super.prepareFile(tsFileResource, 0, 0, 0);
+ }
+
+ @Test
+ public void testCompactWithPartialEmptyUnseqFiles() throws Exception {
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+
+ // Here we compact file 0-2
+ InnerSpaceCompactionTask task =
+ new InnerSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ unseqResources.subList(0, 3),
+ false,
+ new ReadPointCompactionPerformer(),
+ new AtomicInteger(0));
+ Future<CompactionTaskSummary> future =
CompactionTaskManager.getInstance().submitTask(task);
+ Assert.assertTrue(future.get().isSuccess());
+ }
+}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionTest.java
index b74a9a8dc6..11a1b13d7f 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionTest.java
@@ -74,10 +74,8 @@ public abstract class InnerCompactionTest {
protected List<TsFileResource> seqResources = new ArrayList<>();
protected List<TsFileResource> unseqResources = new ArrayList<>();
- private int prevMergeChunkThreshold;
-
@Before
- public void setUp() throws IOException, WriteProcessException,
MetadataException, Exception {
+ public void setUp() throws Exception {
EnvironmentUtils.envSetUp();
IoTDB.configManager.init();
prepareSeries();
@@ -213,63 +211,4 @@ public abstract class InnerCompactionTest {
}
fileWriter.close();
}
-
- List<TsFileResource> prepareTsFileResources() throws IOException,
WriteProcessException {
- List<TsFileResource> ret = new ArrayList<>();
- // prepare file 1
- File file1 =
- new File(
- TestConstant.getTestTsFileDir("root.compactionTest", 0, 0)
- .concat(
- System.nanoTime()
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + 0
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + 0
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + 0
- + ".tsfile"));
- TsFileResource tsFileResource1 = new TsFileResource(file1);
- tsFileResource1.setStatus(TsFileResourceStatus.CLOSED);
- tsFileResource1.updatePlanIndexes((long) 0);
- TsFileWriter fileWriter1 = new TsFileWriter(tsFileResource1.getTsFile());
- fileWriter1.registerTimeseries(new Path(deviceIds[0]),
measurementSchemas[0]);
- TSRecord record1 = new TSRecord(0, deviceIds[0]);
- record1.addTuple(
- DataPoint.getDataPoint(
- measurementSchemas[0].getType(),
- measurementSchemas[0].getMeasurementId(),
- String.valueOf(0)));
- fileWriter1.write(record1);
- fileWriter1.close();
- // prepare file 2
- File file2 =
- new File(
- TestConstant.getTestTsFileDir("root.compactionTest", 0, 0)
- .concat(
- System.nanoTime()
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + 1
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + 0
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + 0
- + ".tsfile"));
- TsFileResource tsFileResource2 = new TsFileResource(file2);
- tsFileResource2.setStatus(TsFileResourceStatus.CLOSED);
- tsFileResource2.updatePlanIndexes((long) 1);
- TsFileWriter fileWriter2 = new TsFileWriter(tsFileResource2.getTsFile());
- fileWriter2.registerTimeseries(new Path(deviceIds[0]),
measurementSchemas[1]);
- TSRecord record2 = new TSRecord(0, deviceIds[0]);
- record2.addTuple(
- DataPoint.getDataPoint(
- measurementSchemas[1].getType(),
- measurementSchemas[1].getMeasurementId(),
- String.valueOf(0)));
- fileWriter2.write(record2);
- fileWriter2.close();
- ret.add(tsFileResource1);
- ret.add(tsFileResource2);
- return ret;
- }
}