This is an automated email from the ASF dual-hosted git repository. ericpai pushed a commit to branch bugfix/cp-iotdb-3018 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 22833be80705ef761103bd5909ce8e4836358b7e Author: Eric Pai <[email protected]> AuthorDate: Sat Apr 30 17:55:40 2022 +0800 [To rel/0.13][IOTDB-3018] Fix compaction bugs on handling deleted target file --- .../engine/compaction/CompactionTaskManager.java | 31 ++++---- .../task/RewriteCrossSpaceCompactionTask.java | 2 +- .../inner/sizetiered/SizeTieredCompactionTask.java | 31 ++++---- .../inner/utils/InnerSpaceCompactionUtils.java | 7 ++ .../compaction/task/AbstractCompactionTask.java | 17 ++-- .../compaction/task/CompactionTaskSummary.java | 32 ++++++++ .../compaction/CompactionTaskManagerTest.java | 2 +- .../inner/InnerCompactionEmptyTsFileTest.java | 90 ++++++++++++++++++++++ 8 files changed, 173 insertions(+), 39 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java index 96021ce239..280ea16c27 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java @@ -26,6 +26,7 @@ import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.compaction.constant.CompactionTaskStatus; import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask; +import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary; import org.apache.iotdb.db.service.IService; import org.apache.iotdb.db.service.ServiceType; import org.apache.iotdb.db.utils.TestOnly; @@ -68,7 +69,8 @@ public class CompactionTaskManager implements IService { new FixedPriorityBlockingQueue<>(1024, new CompactionTaskComparator()); // <logicalStorageGroupName,futureSet>, it is used to terminate all compaction tasks under the // logicalStorageGroup - private Map<String, Set<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>(); + private Map<String, 
Set<Future<CompactionTaskSummary>>> storageGroupTasks = + new ConcurrentHashMap<>(); private List<AbstractCompactionTask> runningCompactionTaskList = new ArrayList<>(); // The thread pool that periodically fetches and executes the compaction task from @@ -139,7 +141,7 @@ public class CompactionTaskManager implements IService { if (taskExecutionPool != null) { awaitTermination(taskExecutionPool, milliseconds); awaitTermination(compactionTaskSubmissionThreadPool, milliseconds); - logger.info("Waiting for task taskExecutionPool to shut down"); + logger.info("Waiting for task taskExecutionPool to shut down in {} ms", milliseconds); waitTermination(); storageGroupTasks.clear(); } @@ -168,6 +170,7 @@ public class CompactionTaskManager implements IService { } } storageGroupTasks.clear(); + candidateCompactionTaskQueue.clear(); logger.info("All compaction task finish"); } } @@ -297,25 +300,25 @@ public class CompactionTaskManager implements IService { /** * This method will directly submit a task to thread pool if there is available thread. * - * @throws RejectedExecutionException + * @return the future of the task. */ - public synchronized void submitTask(Callable<Void> compactionMergeTask) - throws RejectedExecutionException { - if (taskExecutionPool != null && !taskExecutionPool.isTerminated()) { - taskExecutionPool.submit(compactionMergeTask); - return; + public synchronized Future<CompactionTaskSummary> submitTask( + Callable<CompactionTaskSummary> compactionMergeTask) throws RejectedExecutionException { + if (taskExecutionPool != null && !taskExecutionPool.isShutdown()) { + Future<CompactionTaskSummary> future = taskExecutionPool.submit(compactionMergeTask); + return future; } logger.warn( "A CompactionTask failed to be submitted to CompactionTaskManager because {}", taskExecutionPool == null ? 
"taskExecutionPool is null" : "taskExecutionPool is terminated"); + return null; } public synchronized Future<Void> submitSubTask(Callable<Void> subCompactionTask) { - if (subCompactionTaskExecutionPool != null && !subCompactionTaskExecutionPool.isTerminated()) { - Future<Void> future = subCompactionTaskExecutionPool.submit(subCompactionTask); - return future; + if (subCompactionTaskExecutionPool != null && !subCompactionTaskExecutionPool.isShutdown()) { + return subCompactionTaskExecutionPool.submit(subCompactionTask); } return null; } @@ -325,12 +328,12 @@ public class CompactionTaskManager implements IService { * corresponding storage group. */ public void abortCompaction(String fullStorageGroupName) { - Set<Future<Void>> subTasks = + Set<Future<CompactionTaskSummary>> subTasks = storageGroupTasks.getOrDefault(fullStorageGroupName, Collections.emptySet()); candidateCompactionTaskQueue.clear(); - Iterator<Future<Void>> subIterator = subTasks.iterator(); + Iterator<Future<CompactionTaskSummary>> subIterator = subTasks.iterator(); while (subIterator.hasNext()) { - Future<Void> next = subIterator.next(); + Future<CompactionTaskSummary> next = subIterator.next(); if (!next.isDone() && !next.isCancelled()) { next.cancel(true); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java index 13f233f64c..a95b8d44d7 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java @@ -87,7 +87,7 @@ public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactio executeCompaction(); } catch (Throwable throwable) { // catch throwable instead of exception to handle OOM errors - 
logger.error("Meet errors in cross space compaction, {}", throwable.getMessage()); + logger.error("Meet errors in cross space compaction", throwable); CompactionExceptionHandler.handleException( fullStorageGroupName, logFile, diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java index 8779ead76b..8c01e06635 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java @@ -31,12 +31,14 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList; import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; +import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -92,6 +94,8 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask { TsFileResource targetTsFileResource = TsFileNameGenerator.getInnerCompactionTargetFileResource( selectedTsFileResourceList, sequence); + List<TsFileResource> targetTsFileList = + new ArrayList<>(Collections.singletonList(targetTsFileResource)); LOGGER.info( "{} [Compaction] starting compaction task with {} files", fullStorageGroupName, @@ -107,8 +111,7 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask { sizeTieredCompactionLogger = new CompactionLogger(logFile); sizeTieredCompactionLogger.logFiles( selectedTsFileResourceList, 
CompactionLogger.STR_SOURCE_FILES); - sizeTieredCompactionLogger.logFiles( - Collections.singletonList(targetTsFileResource), CompactionLogger.STR_TARGET_FILES); + sizeTieredCompactionLogger.logFiles(targetTsFileList, CompactionLogger.STR_TARGET_FILES); LOGGER.info("{} [SizeTiredCompactionTask] Close the logger", fullStorageGroupName); sizeTieredCompactionLogger.close(); LOGGER.info( @@ -119,9 +122,7 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask { InnerSpaceCompactionUtils.compact(targetTsFileResource, selectedTsFileResourceList); } else { CompactionUtils.compact( - Collections.emptyList(), - selectedTsFileResourceList, - Collections.singletonList(targetTsFileResource)); + Collections.emptyList(), selectedTsFileResourceList, targetTsFileList); } InnerSpaceCompactionUtils.moveTargetFile(targetTsFileResource, fullStorageGroupName); @@ -140,14 +141,14 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask { tsFileManager.replace( selectedTsFileResourceList, Collections.emptyList(), - Collections.singletonList(targetTsFileResource), + targetTsFileList, timePartition, true); } else { tsFileManager.replace( Collections.emptyList(), selectedTsFileResourceList, - Collections.singletonList(targetTsFileResource), + targetTsFileList, timePartition, false); } @@ -164,10 +165,11 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask { isHoldingWriteLock[i] = true; } - if (targetTsFileResource.getTsFile().length() - < TSFileConfig.MAGIC_STRING.getBytes().length * 2L + Byte.BYTES) { + if (targetTsFileResource.getTsFile().exists() + && targetTsFileResource.getTsFile().length() + < TSFileConfig.MAGIC_STRING.getBytes().length * 2L + Byte.BYTES) { // the file size is smaller than magic string and version number - throw new RuntimeException( + throw new TsFileNotCompleteException( String.format( "target file %s is smaller than magic string and version number size", targetTsFileResource)); @@ 
-193,10 +195,6 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask { FileUtils.delete(logFile); } } catch (Throwable throwable) { - LOGGER.error( - "{} [Compaction] Throwable is caught during execution of SizeTieredCompaction, {}", - fullStorageGroupName, - throwable); LOGGER.warn("{} [Compaction] Start to handle exception", fullStorageGroupName); if (sizeTieredCompactionLogger != null) { sizeTieredCompactionLogger.close(); @@ -205,7 +203,7 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask { CompactionExceptionHandler.handleException( fullStorageGroupName, logFile, - Collections.singletonList(targetTsFileResource), + targetTsFileList, selectedTsFileResourceList, Collections.emptyList(), tsFileManager, @@ -216,7 +214,7 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask { CompactionExceptionHandler.handleException( fullStorageGroupName, logFile, - Collections.singletonList(targetTsFileResource), + targetTsFileList, Collections.emptyList(), selectedTsFileResourceList, tsFileManager, @@ -224,6 +222,7 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask { true, isSequence()); } + throw throwable; } finally { releaseFileLocksAndResetMergingStatus(); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java index e5becdf2d2..cf402099c7 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java @@ -270,6 +270,13 @@ public class InnerSpaceCompactionUtils { */ public static void moveTargetFile(TsFileResource targetResource, String fullStorageGroupName) throws IOException { + if (!targetResource.getTsFile().exists()) { + logger.info( + 
"{} [Compaction] Tmp target tsfile {} may be deleted after compaction.", + fullStorageGroupName, + targetResource.getTsFilePath()); + return; + } if (!targetResource.getTsFilePath().endsWith(IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX)) { logger.warn( "{} [Compaction] Tmp target tsfile {} should be end with {}", diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java index 359e7027b0..089e888145 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java @@ -31,11 +31,12 @@ import java.util.concurrent.atomic.AtomicInteger; /** * AbstractCompactionTask is the base class for all compaction task, it carries out the execution of - * compaction. AbstractCompactionTask uses a template method, it execute the abstract function - * <i>doCompaction</i> implemented by subclass, and decrease the currentTaskNum in - * CompactionScheduler when the <i>doCompaction</i> finish. + * * compaction. AbstractCompactionTask uses a template method, it executes the abstract function * + * {@link AbstractCompactionTask#doCompaction()} implemented by subclass, and decrease the * + * currentTaskNum in CompactionScheduler when the {@link AbstractCompactionTask#doCompaction()} is * + * finished. The future returns the {@link CompactionTaskSummary} of this task execution. 
*/ -public abstract class AbstractCompactionTask implements Callable<Void> { +public abstract class AbstractCompactionTask implements Callable<CompactionTaskSummary> { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME); protected String fullStorageGroupName; @@ -60,20 +61,22 @@ public abstract class AbstractCompactionTask implements Callable<Void> { protected abstract void doCompaction() throws Exception; @Override - public Void call() throws Exception { + public CompactionTaskSummary call() throws Exception { long startTime = System.currentTimeMillis(); currentTaskNum.incrementAndGet(); + boolean isSuccess = false; try { doCompaction(); + isSuccess = true; } catch (Exception e) { - LOGGER.error(e.getMessage(), e); + LOGGER.error("Running compaction task failed", e); } finally { CompactionTaskManager.getInstance().removeRunningTaskFromList(this); timeCost = System.currentTimeMillis() - startTime; this.currentTaskNum.decrementAndGet(); } - return null; + return new CompactionTaskSummary(isSuccess); } public String getFullStorageGroupName() { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionTaskSummary.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionTaskSummary.java new file mode 100644 index 0000000000..a7380969ff --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionTaskSummary.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.engine.compaction.task; + +/** The summary of one {@link AbstractCompactionTask} execution */ +public class CompactionTaskSummary { + private final boolean success; + + public CompactionTaskSummary(boolean success) { + this.success = success; + } + + public boolean isSuccess() { + return success; + } +} diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java index 511cdb144f..fa6c4386c2 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java @@ -178,7 +178,7 @@ public class CompactionTaskManagerTest extends InnerCompactionTest { new SizeTieredCompactionTask( "root.compactionTest", "0", 0, tsFileManager, seqResources, true, new AtomicInteger(0)); CompactionTaskManager manager = CompactionTaskManager.getInstance(); - manager.addTaskToWaitingQueue(task1); + Assert.assertTrue(manager.addTaskToWaitingQueue(task1)); manager.submitTaskFromTaskQueue(); while (manager.getTotalTaskCount() > 0) { Thread.sleep(10); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java new file mode 100644 index 0000000000..fbc0a5620a --- /dev/null +++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.engine.compaction.inner; + +import org.apache.iotdb.db.constant.TestConstant; +import org.apache.iotdb.db.engine.compaction.CompactionTaskManager; +import org.apache.iotdb.db.engine.compaction.inner.sizetiered.SizeTieredCompactionTask; +import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary; +import org.apache.iotdb.db.engine.storagegroup.TsFileManager; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.tsfile.exception.write.WriteProcessException; + +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +public class InnerCompactionEmptyTsFileTest extends InnerCompactionTest { + + File tempSGDir; + + @Before + public void setUp() throws Exception { + tempSGDir = new 
File(TestConstant.getTestTsFileDir("root.compactionTest", 0, 0)); + if (!tempSGDir.exists()) { + Assert.assertTrue(tempSGDir.mkdirs()); + } + seqFileNum = 0; + unseqFileNum = 4; + super.setUp(); + tempSGDir = new File(TestConstant.BASE_OUTPUT_PATH.concat("tempSG")); + tempSGDir.mkdirs(); + tsFileManager = new TsFileManager(COMPACTION_TEST_SG, "0", tempSGDir.getAbsolutePath()); + } + + @After + public void tearDown() throws IOException, StorageEngineException { + super.tearDown(); + FileUtils.deleteDirectory(tempSGDir); + } + + @Override + void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum, long valueOffset) + throws IOException, WriteProcessException { + // create some empty tsfiles + super.prepareFile(tsFileResource, 0, 0, 0); + } + + @Test + public void testCompactWithPartialEmptyUnseqFiles() throws Exception { + tsFileManager.addAll(seqResources, true); + tsFileManager.addAll(unseqResources, false); + + // Here we compact file 0-2 + AbstractInnerSpaceCompactionTask task = + new SizeTieredCompactionTask( + "root.compactionTest", + "0", + 0, + tsFileManager, + unseqResources.subList(0, 3), + false, + new AtomicInteger(0)); + Future<CompactionTaskSummary> future = CompactionTaskManager.getInstance().submitTask(task); + Assert.assertTrue(future.get().isSuccess()); + } +}
