This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new 2a4e7a1eff [To rel/0.13] [IOTDB-3813] Remove access to meta manager in
compaction (#6788)
2a4e7a1eff is described below
commit 2a4e7a1eff389db2f5932b9c316a20898a830858
Author: Liu Xuxin <[email protected]>
AuthorDate: Fri Jul 29 11:10:16 2022 +0800
[To rel/0.13] [IOTDB-3813] Remove access to meta manager in compaction
(#6788)
---
.../db/engine/compaction/CompactionUtils.java | 137 ++++++++++++++++++---
.../manage/CrossSpaceCompactionResource.java | 6 -
.../cross/rewrite/task/SubCompactionTask.java | 19 ++-
.../inner/utils/InnerSpaceCompactionUtils.java | 12 +-
.../utils/SingleSeriesCompactionExecutor.java | 26 +++-
.../java/org/apache/iotdb/db/service/IoTDB.java | 3 +
.../apache/iotdb/db/service/IoTDBShutdownHook.java | 2 -
.../InnerSpaceCompactionUtilsNoAlignedTest.java | 6 +-
8 files changed, 158 insertions(+), 53 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
index c51d2dab98..e689ccadfe 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.engine.compaction;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.cache.ChunkCache;
import
org.apache.iotdb.db.engine.compaction.cross.rewrite.task.SubCompactionTask;
import
org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator;
import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter;
@@ -33,24 +34,29 @@ import
org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
@@ -61,6 +67,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -91,6 +98,7 @@ public class CompactionUtils {
QueryResourceManager.getInstance()
.getQueryFileManager()
.addUsedFilesForQuery(queryId, queryDataSource);
+ Map<TsFileResource, TsFileSequenceReader> readerCacheMap = new HashMap<>();
try (AbstractCompactionWriter compactionWriter =
getCompactionWriter(seqFileResources, unseqFileResources,
targetFileResources)) {
@@ -106,10 +114,20 @@ public class CompactionUtils {
if (isAligned) {
compactAlignedSeries(
- device, deviceIterator, compactionWriter, queryContext,
queryDataSource);
+ device,
+ deviceIterator,
+ compactionWriter,
+ queryContext,
+ queryDataSource,
+ readerCacheMap);
} else {
compactNonAlignedSeries(
- device, deviceIterator, compactionWriter, queryContext,
queryDataSource);
+ device,
+ deviceIterator,
+ compactionWriter,
+ queryContext,
+ queryDataSource,
+ readerCacheMap);
}
}
@@ -117,6 +135,7 @@ public class CompactionUtils {
updateDeviceStartTimeAndEndTime(targetFileResources, compactionWriter);
updatePlanIndexes(targetFileResources, seqFileResources,
unseqFileResources);
} finally {
+ clearReaderCache(readerCacheMap);
QueryResourceManager.getInstance().endQuery(queryId);
}
}
@@ -126,21 +145,20 @@ public class CompactionUtils {
MultiTsFileDeviceIterator deviceIterator,
AbstractCompactionWriter compactionWriter,
QueryContext queryContext,
- QueryDataSource queryDataSource)
+ QueryDataSource queryDataSource,
+ Map<TsFileResource, TsFileSequenceReader> readerCacheMap)
throws IOException, MetadataException {
MultiTsFileDeviceIterator.AlignedMeasurementIterator
alignedMeasurementIterator =
deviceIterator.iterateAlignedSeries(device);
Set<String> allMeasurements =
alignedMeasurementIterator.getAllMeasurements();
- List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
- for (String measurement : allMeasurements) {
- // TODO: use IDTable
- try {
- measurementSchemas.add(
- IoTDB.metaManager.getSeriesSchema(new PartialPath(device,
measurement)));
- } catch (PathNotExistException e) {
- logger.info("A deleted path is skipped: {}", e.getMessage());
- }
- }
+ Map<String, MeasurementSchema> schemaMap =
+ getMeasurementSchema(
+ device,
+ allMeasurements,
+ queryDataSource.getSeqResources(),
+ queryDataSource.getUnseqResources(),
+ readerCacheMap);
+ List<IMeasurementSchema> measurementSchemas = new
ArrayList<>(schemaMap.values());
if (measurementSchemas.isEmpty()) {
return;
}
@@ -173,12 +191,20 @@ public class CompactionUtils {
MultiTsFileDeviceIterator deviceIterator,
AbstractCompactionWriter compactionWriter,
QueryContext queryContext,
- QueryDataSource queryDataSource)
- throws IOException, InterruptedException {
+ QueryDataSource queryDataSource,
+ Map<TsFileResource, TsFileSequenceReader> readerCacheMap)
+ throws IOException, InterruptedException, IllegalPathException {
MultiTsFileDeviceIterator.MeasurementIterator measurementIterator =
deviceIterator.iterateNotAlignedSeries(device, false);
Set<String> allMeasurements = measurementIterator.getAllMeasurements();
int subTaskNums = Math.min(allMeasurements.size(), subTaskNum);
+ Map<String, MeasurementSchema> schemaMap =
+ getMeasurementSchema(
+ device,
+ allMeasurements,
+ queryDataSource.getSeqResources(),
+ queryDataSource.getUnseqResources(),
+ readerCacheMap);
// assign all measurements to different sub tasks
Set<String>[] measurementsForEachSubTask = new HashSet[subTaskNums];
@@ -203,6 +229,7 @@ public class CompactionUtils {
queryContext,
queryDataSource,
compactionWriter,
+ schemaMap,
i)));
}
@@ -220,6 +247,60 @@ public class CompactionUtils {
compactionWriter.endChunkGroup();
}
+ /**
+  * Collects the schema of each requested measurement of a device by reading chunk headers from
+  * the source TsFiles, so that compaction does not have to access the meta manager.
+  *
+  * <p>Files are visited in descending version order and the first file that yields a schema for
+  * a measurement wins. A measurement for which no file yields a schema is absent from the
+  * returned map.
+  *
+  * @param device the device whose measurements are resolved
+  * @param measurements the measurement names to resolve
+  * @param seqFiles the sequence source files
+  * @param unseqFiles the unsequence source files
+  * @param readerCacheMap readers already obtained for this compaction; a reader obtained here is
+  *     added to the map after its reference has been increased in the FileReaderManager
+  * @return a map from measurement name to the schema rebuilt from a chunk header
+  * @throws IllegalPathException if device and measurement do not form a legal path
+  * @throws IOException if reading chunk metadata or a chunk fails
+  * @throws RuntimeException if a reader for a source file cannot be obtained
+  */
+ private static Map<String, MeasurementSchema> getMeasurementSchema(
+     String device,
+     Set<String> measurements,
+     List<TsFileResource> seqFiles,
+     List<TsFileResource> unseqFiles,
+     Map<TsFileResource, TsFileSequenceReader> readerCacheMap)
+     throws IllegalPathException, IOException {
+   HashMap<String, MeasurementSchema> schemaMap = new HashMap<>();
+   List<TsFileResource> allResources = new LinkedList<>(seqFiles);
+   allResources.addAll(unseqFiles);
+   // sort the tsfile by version, so that we can iterate the tsfile from the newest to oldest
+   allResources.sort(
+       (o1, o2) -> {
+         try {
+           TsFileNameGenerator.TsFileName n1 =
+               TsFileNameGenerator.getTsFileName(o1.getTsFile().getName());
+           TsFileNameGenerator.TsFileName n2 =
+               TsFileNameGenerator.getTsFileName(o2.getTsFile().getName());
+           // Long.compare cannot overflow, unlike casting the difference of two versions to int
+           return Long.compare(n2.getVersion(), n1.getVersion());
+         } catch (IOException e) {
+           // a file whose name cannot be parsed is ordered as equal to the other file
+           return 0;
+         }
+       });
+   for (String measurement : measurements) {
+     for (TsFileResource tsFileResource : allResources) {
+       if (!tsFileResource.mayContainsDevice(device)) {
+         continue;
+       }
+       MeasurementSchema schema =
+           getMeasurementSchemaFromReader(
+               tsFileResource,
+               readerCacheMap.computeIfAbsent(
+                   tsFileResource,
+                   x -> {
+                     try {
+                       FileReaderManager.getInstance().increaseFileReaderReference(x, true);
+                       return FileReaderManager.getInstance().get(x.getTsFilePath(), true);
+                     } catch (IOException e) {
+                       // keep the IOException as the cause so the root failure is not lost
+                       throw new RuntimeException(
+                           String.format(
+                               "Failed to construct sequence reader for %s", tsFileResource),
+                           e);
+                     }
+                   }),
+               device,
+               measurement);
+       if (schema != null) {
+         // the first file in the sorted order that knows the measurement decides its schema
+         schemaMap.put(measurement, schema);
+         break;
+       }
+     }
+   }
+   return schemaMap;
+ }
+
public static void writeWithReader(
AbstractCompactionWriter writer, IBatchReader reader, int subTaskId)
throws IOException {
while (reader.hasNextBatch()) {
@@ -231,6 +312,21 @@ public class CompactionUtils {
}
}
+ /**
+  * Rebuilds the schema of one measurement from the header of its first chunk in a TsFile.
+  *
+  * @param resource the TsFile resource; its path is set on the chunk metadata before the chunk
+  *     is fetched from the ChunkCache
+  * @param reader the reader used to list the chunk metadata of the series
+  * @param device the device of the series
+  * @param measurement the measurement name of the series
+  * @return the schema built from the chunk header's data type, encoding and compression, or
+  *     {@code null} if the reader returns no chunk metadata for the series
+  */
+ private static MeasurementSchema getMeasurementSchemaFromReader(
+     TsFileResource resource, TsFileSequenceReader reader, String device, String measurement)
+     throws IllegalPathException, IOException {
+   List<ChunkMetadata> metadataOfSeries =
+       reader.getChunkMetadataList(new PartialPath(device, measurement));
+   if (metadataOfSeries.isEmpty()) {
+     return null;
+   }
+   ChunkMetadata firstChunkMetadata = metadataOfSeries.get(0);
+   firstChunkMetadata.setFilePath(resource.getTsFilePath());
+   ChunkHeader header = ChunkCache.getInstance().get(firstChunkMetadata).getHeader();
+   return new MeasurementSchema(
+       measurement, header.getDataType(), header.getEncodingType(), header.getCompressionType());
+ }
+
/**
* @param measurementIds if device is aligned, then measurementIds contain
all measurements. If
* device is not aligned, then measurementIds only contain one
measurement.
@@ -427,10 +523,17 @@ public class CompactionUtils {
private static void checkThreadInterrupted(List<TsFileResource>
tsFileResource)
throws InterruptedException {
- if (Thread.currentThread().isInterrupted()) {
+ if (Thread.currentThread().isInterrupted() || !IoTDB.activated) {
throw new InterruptedException(
String.format(
"[Compaction] compaction for target file %s abort",
tsFileResource.toString()));
}
}
+
+ /**
+  * Decreases the FileReaderManager reference of every resource that is a key of the given map.
+  *
+  * @param readerCacheMap the readers obtained during this compaction, keyed by their resource
+  */
+ private static void clearReaderCache(Map<TsFileResource, TsFileSequenceReader> readerCacheMap)
+     throws IOException {
+   for (TsFileResource cachedResource : readerCacheMap.keySet()) {
+     FileReaderManager.getInstance().decreaseFileReaderReference(cachedResource, true);
+   }
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/manage/CrossSpaceCompactionResource.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/manage/CrossSpaceCompactionResource.java
index a19fef77f6..7ced2fbcfc 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/manage/CrossSpaceCompactionResource.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/manage/CrossSpaceCompactionResource.java
@@ -21,9 +21,7 @@ package
org.apache.iotdb.db.engine.compaction.cross.rewrite.manage;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
@@ -111,10 +109,6 @@ public class CrossSpaceCompactionResource {
chunkWriterCache.clear();
}
- public IMeasurementSchema getSchema(PartialPath path) throws
MetadataException {
- return IoTDB.metaManager.getSeriesSchema(path);
- }
-
/**
* Construct a new or get an existing TsFileSequenceReader of a TsFile.
*
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
index 08f63c6e8b..f95fec9033 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
@@ -22,19 +22,17 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.engine.compaction.CompactionUtils;
import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
-import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -51,6 +49,7 @@ public class SubCompactionTask implements Callable<Void> {
private final QueryContext queryContext;
private final QueryDataSource queryDataSource;
private final AbstractCompactionWriter compactionWriter;
+ private final Map<String, MeasurementSchema> schemaMap;
private final int taskId;
public SubCompactionTask(
@@ -59,26 +58,22 @@ public class SubCompactionTask implements Callable<Void> {
QueryContext queryContext,
QueryDataSource queryDataSource,
AbstractCompactionWriter compactionWriter,
+ Map<String, MeasurementSchema> schemaMap,
int taskId) {
this.device = device;
this.measurementList = measurementList;
this.queryContext = queryContext;
this.queryDataSource = queryDataSource;
this.compactionWriter = compactionWriter;
+ this.schemaMap = schemaMap;
this.taskId = taskId;
}
@Override
public Void call() throws Exception {
for (String measurement : measurementList) {
- List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
- try {
- measurementSchemas.add(
- IoTDB.metaManager.getSeriesSchema(new PartialPath(device,
measurement)));
- } catch (PathNotExistException e) {
- logger.info("A deleted path is skipped: {}", e.getMessage());
- continue;
- }
+ List<IMeasurementSchema> measurementSchemas =
+ Collections.singletonList(schemaMap.get(measurement));
IBatchReader dataBatchReader =
CompactionUtils.constructReader(
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
index cf402099c7..1f48abf35d 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.service.IoTDB;
@@ -94,7 +93,7 @@ public class InnerSpaceCompactionUtils {
private static void checkThreadInterrupted(TsFileResource tsFileResource)
throws InterruptedException {
- if (Thread.currentThread().isInterrupted()) {
+ if (Thread.currentThread().isInterrupted() || !IoTDB.activated) {
throw new InterruptedException(
String.format(
"[Compaction] compaction for target file %s abort",
tsFileResource.toString()));
@@ -119,15 +118,8 @@ public class InnerSpaceCompactionUtils {
// dead-loop.
LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
readerAndChunkMetadataList =
seriesIterator.getMetadataListForCurrentSeries();
- try {
- measurementSchema = IoTDB.metaManager.getSeriesSchema(p);
- } catch (PathNotExistException e) {
- logger.info("A deleted path is skipped: {}", e.getMessage());
- continue;
- }
SingleSeriesCompactionExecutor compactionExecutorOfCurrentTimeSeries =
- new SingleSeriesCompactionExecutor(
- p, measurementSchema, readerAndChunkMetadataList, writer,
targetResource);
+ new SingleSeriesCompactionExecutor(p, readerAndChunkMetadataList,
writer, targetResource);
compactionExecutorOfCurrentTimeSeries.execute();
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
index cf9a04f41d..2d49094f44 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
@@ -26,6 +26,7 @@ import
org.apache.iotdb.db.engine.compaction.constant.ProcessChunkType;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@@ -36,6 +37,7 @@ import
org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import com.google.common.util.concurrent.RateLimiter;
@@ -46,12 +48,12 @@ import java.util.List;
/** This class is used to compact one series during inner space compaction. */
public class SingleSeriesCompactionExecutor {
+ private PartialPath series;
+ private IMeasurementSchema schema;
private String device;
private LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
readerAndChunkMetadataList;
private TsFileIOWriter fileWriter;
private TsFileResource targetResource;
-
- private IMeasurementSchema schema;
private ChunkWriterImpl chunkWriter;
private Chunk cachedChunk;
private ChunkMetadata cachedChunkMetadata;
@@ -75,15 +77,14 @@ public class SingleSeriesCompactionExecutor {
public SingleSeriesCompactionExecutor(
PartialPath series,
- IMeasurementSchema measurementSchema,
LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
readerAndChunkMetadataList,
TsFileIOWriter fileWriter,
TsFileResource targetResource) {
this.device = series.getDevice();
this.readerAndChunkMetadataList = readerAndChunkMetadataList;
this.fileWriter = fileWriter;
- this.schema = measurementSchema;
- this.chunkWriter = new ChunkWriterImpl(this.schema);
+ this.series = series;
+ this.chunkWriter = null;
this.cachedChunk = null;
this.cachedChunkMetadata = null;
this.targetResource = targetResource;
@@ -101,6 +102,10 @@ public class SingleSeriesCompactionExecutor {
List<ChunkMetadata> chunkMetadataList = readerListPair.right;
for (ChunkMetadata chunkMetadata : chunkMetadataList) {
Chunk currentChunk = reader.readMemChunk(chunkMetadata);
+ if (chunkWriter == null) {
+ constructChunkWriterFromReadChunk(currentChunk);
+ }
+
CompactionMetricsManager.recordReadInfo(
currentChunk.getHeader().getSerializedSize() +
currentChunk.getHeader().getDataSize());
@@ -135,6 +140,17 @@ public class SingleSeriesCompactionExecutor {
targetResource.updateEndTime(device, maxEndTimestamp);
}
+ /**
+  * Builds the schema of the series from the header of the given chunk and creates the chunk
+  * writer from that schema.
+  *
+  * @param chunk a chunk of the series whose header supplies data type, encoding and compression
+  */
+ private void constructChunkWriterFromReadChunk(Chunk chunk) {
+   ChunkHeader headerOfChunk = chunk.getHeader();
+   String measurementId = series.getMeasurement();
+   this.schema =
+       new MeasurementSchema(
+           measurementId,
+           headerOfChunk.getDataType(),
+           headerOfChunk.getEncodingType(),
+           headerOfChunk.getCompressionType());
+   this.chunkWriter = new ChunkWriterImpl(this.schema);
+ }
+
private long getChunkSize(Chunk chunk) {
return chunk.getHeader().getSerializedSize() +
chunk.getHeader().getDataSize();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 4090c376fb..e103071b74 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -62,6 +62,7 @@ public class IoTDB implements IoTDBMBean {
private static final RegisterManager registerManager = new RegisterManager();
public static MManager metaManager = MManager.getInstance();
public static ServiceProvider serviceProvider;
+ public static volatile boolean activated = false;
private static boolean clusterMode = false;
public static IoTDB getInstance() {
@@ -125,6 +126,7 @@ public class IoTDB implements IoTDBMBean {
// reset config
config.setAutoCreateSchemaEnabled(prevIsAutoCreateSchemaEnabled);
config.setEnablePartialInsert(prevIsEnablePartialInsert);
+ activated = true;
logger.info("{} has started.", IoTDBConstant.GLOBAL_DB_NAME);
}
@@ -238,6 +240,7 @@ public class IoTDB implements IoTDBMBean {
PrimitiveArrayManager.close();
SystemInfo.getInstance().close();
JMXService.deregisterMBean(mbeanName);
+ activated = false;
logger.info("IoTDB is deactivated.");
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
b/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
index 939f172d54..403b714d35 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.service;
-import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.utils.MemUtils;
import org.slf4j.Logger;
@@ -30,7 +29,6 @@ public class IoTDBShutdownHook extends Thread {
@Override
public void run() {
- CompactionTaskManager.getInstance().stop();
if (logger.isInfoEnabled()) {
logger.info(
"IoTDB exits. Jvm memory usage: {}",
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionUtilsNoAlignedTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionUtilsNoAlignedTest.java
index 8251b0d33f..b5e00e9dd0 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionUtilsNoAlignedTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionUtilsNoAlignedTest.java
@@ -38,6 +38,7 @@ import
org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -446,7 +447,10 @@ public class InnerSpaceCompactionUtilsNoAlignedTest {
new TsFileResource(new File(SEQ_DIRS,
String.format("%d-%d-0-0.tsfile", i + 1, i + 1)));
sourceFiles.add(resource);
CompactionFileGeneratorUtils.writeTsFile(
- fullPathSetWithDeleted, chunkPagePointsNum, i * 1500L, resource);
+ fullPathSetWithDeleted, chunkPagePointsNum, i * 2000L, resource);
+ Map<String, Pair<Long, Long>> deletionMap = new HashMap<>();
+ deletionMap.put(deletedPath, new Pair<>(i * 2000L, (i + 1) * 2000L));
+ CompactionFileGeneratorUtils.generateMods(deletionMap, resource,
false);
}
Map<PartialPath, List<TimeValuePair>> originData =
CompactionCheckerUtils.getDataByQuery(paths, schemaList,
sourceFiles, new ArrayList<>());