This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 421ee8b [IOTDB-2206] Rename StorageGroupProcessor to
VirtualStorageGroupProcessor (#4646)
421ee8b is described below
commit 421ee8b35b6af6bd8a14153cf80d74fc10cefede
Author: SzyWilliam <[email protected]>
AuthorDate: Wed Dec 29 10:43:43 2021 +0800
[IOTDB-2206] Rename StorageGroupProcessor to VirtualStorageGroupProcessor
(#4646)
---
.../iotdb/cluster/ClusterFileFlushPolicy.java | 8 +-
.../partition/slot/SlotTimePartitionFilter.java | 2 +-
.../iotdb/cluster/query/ClusterPlanExecutor.java | 2 +-
.../cluster/server/member/DataGroupMember.java | 2 +-
.../cluster/log/applier/DataLogApplierTest.java | 10 +-
.../cluster/log/snapshot/FileSnapshotTest.java | 14 +-
.../log/snapshot/PartitionedSnapshotTest.java | 4 +-
.../cluster/log/snapshot/PullSnapshotTaskTest.java | 4 +-
.../cluster/server/member/DataGroupMemberTest.java | 4 +-
.../cluster/server/member/MetaGroupMemberTest.java | 4 +-
docs/SystemDesign/StorageEngine/Recover.md | 2 +-
docs/SystemDesign/StorageEngine/StorageEngine.md | 2 +-
docs/zh/SystemDesign/StorageEngine/Recover.md | 4 +-
.../zh/SystemDesign/StorageEngine/StorageEngine.md | 2 +-
.../db/integration/IoTDBNewTsFileCompactionIT.java | 6 +-
.../org/apache/iotdb/db/engine/StorageEngine.java | 142 +++++++++---------
.../compaction/task/CompactionRecoverTask.java | 2 +-
.../iotdb/db/engine/flush/TsFileFlushPolicy.java | 11 +-
.../db/engine/storagegroup/StorageGroupInfo.java | 18 +--
.../db/engine/storagegroup/TsFileProcessor.java | 4 +-
.../db/engine/storagegroup/TsFileResource.java | 4 +-
...ssor.java => VirtualStorageGroupProcessor.java} | 6 +-
...eGroupManager.java => StorageGroupManager.java} | 167 ++++++++++++---------
.../apache/iotdb/db/metadata/tag/TagManager.java | 4 +-
.../org/apache/iotdb/db/monitor/StatMonitor.java | 8 +-
.../apache/iotdb/db/qp/executor/IPlanExecutor.java | 2 +-
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 2 +-
.../iotdb/db/qp/physical/crud/DeletePlan.java | 2 +-
.../db/qp/physical/sys/DeleteTimeSeriesPlan.java | 2 +-
.../groupby/GroupByWithValueFilterDataSet.java | 4 +-
.../groupby/GroupByWithoutValueFilterDataSet.java | 4 +-
.../db/query/executor/AggregationExecutor.java | 6 +-
.../iotdb/db/query/executor/FillQueryExecutor.java | 4 +-
.../iotdb/db/query/executor/LastQueryExecutor.java | 4 +-
.../db/query/executor/RawDataQueryExecutor.java | 6 +-
.../query/timegenerator/ServerTimeGenerator.java | 4 +-
.../org/apache/iotdb/db/rescon/SystemInfo.java | 10 +-
.../recover/SizeTieredCompactionRecoverTest.java | 4 +-
.../engine/modification/DeletionFileNodeTest.java | 6 +-
.../storagegroup/StorageGroupProcessorTest.java | 4 +-
.../iotdb/db/engine/storagegroup/TTLTest.java | 52 +++----
.../db/sync/receiver/load/FileLoaderTest.java | 12 +-
.../recover/SyncReceiverLogAnalyzerTest.java | 4 +-
43 files changed, 297 insertions(+), 271 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterFileFlushPolicy.java
b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterFileFlushPolicy.java
index c4e29cc..81ef9ed 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterFileFlushPolicy.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterFileFlushPolicy.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.cluster;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +57,9 @@ public class ClusterFileFlushPolicy implements
TsFileFlushPolicy {
@Override
public void apply(
- StorageGroupProcessor storageGroupProcessor, TsFileProcessor processor,
boolean isSeq) {
+ VirtualStorageGroupProcessor virtualStorageGroupProcessor,
+ TsFileProcessor processor,
+ boolean isSeq) {
logger.info(
"The memtable size reaches the threshold, async flush it to tsfile:
{}",
processor.getTsFileResource().getTsFile().getAbsolutePath());
@@ -68,7 +70,7 @@ public class ClusterFileFlushPolicy implements
TsFileFlushPolicy {
closePartitionExecutor.submit(
() ->
metaGroupMember.closePartition(
- storageGroupProcessor.getVirtualStorageGroupId(),
+ virtualStorageGroupProcessor.getVirtualStorageGroupId(),
processor.getTimeRangeId(),
isSeq));
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotTimePartitionFilter.java
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotTimePartitionFilter.java
index a864a1a..7a7825f 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotTimePartitionFilter.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotTimePartitionFilter.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.cluster.partition.slot;
import org.apache.iotdb.cluster.config.ClusterConstant;
-import
org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter;
+import
org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor.TimePartitionFilter;
import java.util.Objects;
import java.util.Set;
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index 0c35cb6..3485cfd 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -37,7 +37,7 @@ import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
-import
org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter;
+import
org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor.TimePartitionFilter;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 690a826..36ee967 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -75,7 +75,7 @@ import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
-import
org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter;
+import
org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor.TimePartitionFilter;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
index 8cf5feb..5f0f106 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
@@ -51,8 +51,8 @@ import
org.apache.iotdb.cluster.server.service.DataAsyncService;
import org.apache.iotdb.cluster.server.service.MetaAsyncService;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
-import
org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
+import
org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor.TimePartitionFilter;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -403,13 +403,13 @@ public class DataLogApplierTest extends IoTDBTest {
@Test
public void testApplyCloseFile() throws
org.apache.iotdb.db.exception.IoTDBException {
- StorageGroupProcessor storageGroupProcessor =
+ VirtualStorageGroupProcessor virtualStorageGroupProcessor =
StorageEngine.getInstance().getProcessor(new
PartialPath(TestUtils.getTestSg(0)));
-
TestCase.assertFalse(storageGroupProcessor.getWorkSequenceTsFileProcessors().isEmpty());
+
TestCase.assertFalse(virtualStorageGroupProcessor.getWorkSequenceTsFileProcessors().isEmpty());
CloseFileLog closeFileLog = new CloseFileLog(TestUtils.getTestSg(0), 0,
true);
applier.apply(closeFileLog);
-
TestCase.assertTrue(storageGroupProcessor.getWorkSequenceTsFileProcessors().isEmpty());
+
TestCase.assertTrue(virtualStorageGroupProcessor.getWorkSequenceTsFileProcessors().isEmpty());
}
@Test
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshotTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshotTest.java
index 2cd60e2..ab46385 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshotTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshotTest.java
@@ -27,8 +27,8 @@ import
org.apache.iotdb.cluster.partition.slot.SlotManager.SlotStatus;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.ModificationFile;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -116,7 +116,7 @@ public class FileSnapshotTest extends DataSnapshotTest {
for (TimeseriesSchema timeseriesSchema : timeseriesSchemas) {
assertTrue(IoTDB.metaManager.isPathExist(new
PartialPath(timeseriesSchema.getFullPath())));
}
- StorageGroupProcessor processor =
+ VirtualStorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new
PartialPath(TestUtils.getTestSg(0)));
assertEquals(9, processor.getPartitionMaxFileVersions(0));
List<TsFileResource> loadedFiles = processor.getSequenceFileTreeSet();
@@ -159,7 +159,7 @@ public class FileSnapshotTest extends DataSnapshotTest {
for (TimeseriesSchema timeseriesSchema : timeseriesSchemas) {
assertTrue(IoTDB.metaManager.isPathExist(new
PartialPath(timeseriesSchema.getFullPath())));
}
- StorageGroupProcessor processor =
+ VirtualStorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new
PartialPath(TestUtils.getTestSg(0)));
assertEquals(9, processor.getPartitionMaxFileVersions(0));
List<TsFileResource> loadedFiles = processor.getSequenceFileTreeSet();
@@ -208,7 +208,7 @@ public class FileSnapshotTest extends DataSnapshotTest {
for (TimeseriesSchema timeseriesSchema : timeseriesSchemas) {
assertTrue(IoTDB.metaManager.isPathExist(new
PartialPath(timeseriesSchema.getFullPath())));
}
- StorageGroupProcessor processor =
+ VirtualStorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new
PartialPath(TestUtils.getTestSg(0)));
assertEquals(9, processor.getPartitionMaxFileVersions(0));
List<TsFileResource> loadedFiles = processor.getSequenceFileTreeSet();
@@ -248,7 +248,7 @@ public class FileSnapshotTest extends DataSnapshotTest {
defaultInstaller.install(snapshotMap, false);
for (int j = 0; j < 10; j++) {
- StorageGroupProcessor processor =
+ VirtualStorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new
PartialPath(TestUtils.getTestSg(j)));
assertEquals(9, processor.getPartitionMaxFileVersions(0));
List<TsFileResource> loadedFiles = processor.getSequenceFileTreeSet();
@@ -273,7 +273,7 @@ public class FileSnapshotTest extends DataSnapshotTest {
timeseriesSchemas.add(TestUtils.getTestTimeSeriesSchema(0, i));
}
for (int i = 0; i < 5; i++) {
- StorageGroupProcessor processor =
+ VirtualStorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new
PartialPath(TestUtils.getTestSg(0)));
TsFileResource resource = tsFileResources.get(i);
String pathWithoutHardlinkSuffix =
@@ -293,7 +293,7 @@ public class FileSnapshotTest extends DataSnapshotTest {
for (TimeseriesSchema timeseriesSchema : timeseriesSchemas) {
assertTrue(IoTDB.metaManager.isPathExist(new
PartialPath(timeseriesSchema.getFullPath())));
}
- StorageGroupProcessor processor =
+ VirtualStorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new
PartialPath(TestUtils.getTestSg(0)));
assertEquals(10, processor.getPartitionMaxFileVersions(0));
List<TsFileResource> loadedFiles = processor.getSequenceFileTreeSet();
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshotTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshotTest.java
index f55b9cd..27a1735 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshotTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshotTest.java
@@ -23,8 +23,8 @@ import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.exception.SnapshotInstallationException;
import org.apache.iotdb.cluster.partition.slot.SlotManager.SlotStatus;
import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.PartialPath;
@@ -105,7 +105,7 @@ public class PartitionedSnapshotTest extends
DataSnapshotTest {
for (TimeseriesSchema timeseriesSchema : timeseriesSchemas) {
assertTrue(IoTDB.metaManager.isPathExist(new
PartialPath(timeseriesSchema.getFullPath())));
}
- StorageGroupProcessor processor =
+ VirtualStorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new
PartialPath(TestUtils.getTestSg(0)));
assertEquals(9, processor.getPartitionMaxFileVersions(0));
List<TsFileResource> loadedFiles = processor.getSequenceFileTreeSet();
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
index eee4320..aea2a58 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
@@ -36,8 +36,8 @@ import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.cluster.utils.IOUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -291,7 +291,7 @@ public class PullSnapshotTaskTest extends DataSnapshotTest {
for (TimeseriesSchema timeseriesSchema : timeseriesSchemas) {
assertTrue(IoTDB.metaManager.isPathExist(new
PartialPath(timeseriesSchema.getFullPath())));
}
- StorageGroupProcessor processor =
+ VirtualStorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new
PartialPath(TestUtils.getTestSg(0)));
assertEquals(9, processor.getPartitionMaxFileVersions(0));
List<TsFileResource> loadedFiles = processor.getSequenceFileTreeSet();
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index fcb7a64..6ff693f 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -63,8 +63,8 @@ import
org.apache.iotdb.cluster.server.service.DataAsyncService;
import org.apache.iotdb.cluster.utils.Constants;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.TriggerExecutionException;
import org.apache.iotdb.db.exception.WriteProcessException;
@@ -497,7 +497,7 @@ public class DataGroupMemberTest extends BaseMember {
snapshot.addFile(tsFileResource, TestUtils.getNode(0), true);
// create a local resource1
- StorageGroupProcessor processor;
+ VirtualStorageGroupProcessor processor;
while (true) {
try {
processor =
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index ecdc609..980c242 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -84,7 +84,7 @@ import
org.apache.iotdb.db.auth.authorizer.LocalFileAuthorizer;
import org.apache.iotdb.db.auth.entity.Role;
import org.apache.iotdb.db.auth.entity.User;
import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -613,7 +613,7 @@ public class MetaGroupMemberTest extends BaseMember {
ExecutorService testThreadPool = Executors.newFixedThreadPool(4);
assertTrue(testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true));
- StorageGroupProcessor processor =
+ VirtualStorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new
PartialPath(TestUtils.getTestSg(0)));
assertTrue(processor.getWorkSequenceTsFileProcessors().isEmpty());
diff --git a/docs/SystemDesign/StorageEngine/Recover.md
b/docs/SystemDesign/StorageEngine/Recover.md
index 4d0f64e..99baf52 100644
--- a/docs/SystemDesign/StorageEngine/Recover.md
+++ b/docs/SystemDesign/StorageEngine/Recover.md
@@ -57,7 +57,7 @@ Recovery are performed at the granularity of the storage
group, and the entry po
## Recover a TsFile(Seq/Unseq) of each partiton
-* org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.recoverTsFiles
+*
org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor.recoverTsFiles
This method is mainly responsible for traversing all TsFiles passed in and
recovering them one by one.
diff --git a/docs/SystemDesign/StorageEngine/StorageEngine.md
b/docs/SystemDesign/StorageEngine/StorageEngine.md
index b53f2df..70444bc 100644
--- a/docs/SystemDesign/StorageEngine/StorageEngine.md
+++ b/docs/SystemDesign/StorageEngine/StorageEngine.md
@@ -39,7 +39,7 @@ In addition, the storage engine includes asynchronous
persistence and file merge
Responsible for writing and accessing an IoTDB instance and managing all
StorageGroupProsessor.
-* org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor
+* org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor
Responsible for writing and accessing data within a time partition of a
storage group.
diff --git a/docs/zh/SystemDesign/StorageEngine/Recover.md
b/docs/zh/SystemDesign/StorageEngine/Recover.md
index 1793dec..34848b9 100644
--- a/docs/zh/SystemDesign/StorageEngine/Recover.md
+++ b/docs/zh/SystemDesign/StorageEngine/Recover.md
@@ -25,7 +25,7 @@
## 存储组恢复流程
-* org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.recover()
+*
org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor.recover()
* 首先获得该存储组下所有以 .tsfile 结尾的数据文件,返回 TsFileResource,共有如下几个文件列表
@@ -58,7 +58,7 @@
## 恢复一个分区的(顺序/乱序) TsFile
-* org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.recoverTsFiles
+*
org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor.recoverTsFiles
该方法主要负责遍历传进来的所有 TsFile,挨个进行恢复。
diff --git a/docs/zh/SystemDesign/StorageEngine/StorageEngine.md
b/docs/zh/SystemDesign/StorageEngine/StorageEngine.md
index 4497608..836f816 100644
--- a/docs/zh/SystemDesign/StorageEngine/StorageEngine.md
+++ b/docs/zh/SystemDesign/StorageEngine/StorageEngine.md
@@ -39,7 +39,7 @@
负责一个 IoTDB 实例的写入和访问,管理所有的 StorageGroupProsessor。
-* org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor
+* org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor
负责一个存储组一个时间分区内的数据写入和访问。管理所有分区的 TsFileProcessor。
diff --git
a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBNewTsFileCompactionIT.java
b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBNewTsFileCompactionIT.java
index 204ba32..11b97cf 100644
---
a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBNewTsFileCompactionIT.java
+++
b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBNewTsFileCompactionIT.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.db.integration;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -1049,9 +1049,9 @@ public class IoTDBNewTsFileCompactionIT {
/** wait until merge is finished */
private boolean waitForMergeFinish() throws StorageEngineException,
InterruptedException {
- StorageGroupProcessor storageGroupProcessor =
+ VirtualStorageGroupProcessor virtualStorageGroupProcessor =
StorageEngine.getInstance().getProcessor(storageGroupPath);
- TsFileManager resourceManager =
storageGroupProcessor.getTsFileResourceManager();
+ TsFileManager resourceManager =
virtualStorageGroupProcessor.getTsFileResourceManager();
long startTime = System.nanoTime();
// get the size of level 1's tsfile list to judge whether merge is finished
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index d3497ee..f208490 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -28,11 +28,11 @@ import org.apache.iotdb.db.engine.flush.FlushListener;
import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
-import
org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter;
import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import
org.apache.iotdb.db.engine.storagegroup.virtualSg.VirtualStorageGroupManager;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
+import
org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor.TimePartitionFilter;
+import org.apache.iotdb.db.engine.storagegroup.virtualSg.StorageGroupManager;
import org.apache.iotdb.db.exception.*;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -113,7 +113,7 @@ public class StorageEngine implements IService {
FilePathUtils.regularizePath(config.getSystemDir()) + "storage_groups";
/** storage group name -> storage group processor */
- private final ConcurrentHashMap<PartialPath, VirtualStorageGroupManager>
processorMap =
+ private final ConcurrentHashMap<PartialPath, StorageGroupManager>
processorMap =
new ConcurrentHashMap<>();
private AtomicBoolean isAllSgReady = new AtomicBoolean(false);
@@ -220,12 +220,12 @@ public class StorageEngine implements IService {
List<IStorageGroupMNode> sgNodes =
IoTDB.metaManager.getAllStorageGroupNodes();
List<Future<Void>> futures = new LinkedList<>();
for (IStorageGroupMNode storageGroup : sgNodes) {
- VirtualStorageGroupManager virtualStorageGroupManager =
+ StorageGroupManager storageGroupManager =
processorMap.computeIfAbsent(
- storageGroup.getPartialPath(), id -> new
VirtualStorageGroupManager(true));
+ storageGroup.getPartialPath(), id -> new
StorageGroupManager(true));
// recover all virtual storage groups in one logic storage group
- virtualStorageGroupManager.asyncRecover(storageGroup,
recoveryThreadPool, futures);
+ storageGroupManager.asyncRecover(storageGroup, recoveryThreadPool,
futures);
}
// operations after all virtual storage groups are recovered
@@ -279,7 +279,7 @@ public class StorageEngine implements IService {
private void checkTTL() {
try {
- for (VirtualStorageGroupManager processor : processorMap.values()) {
+ for (StorageGroupManager processor : processorMap.values()) {
processor.checkTTL();
}
} catch (ConcurrentModificationException e) {
@@ -324,7 +324,7 @@ public class StorageEngine implements IService {
private void timedFlushSeqMemTable() {
try {
- for (VirtualStorageGroupManager processor : processorMap.values()) {
+ for (StorageGroupManager processor : processorMap.values()) {
processor.timedFlushSeqMemTable();
}
} catch (Exception e) {
@@ -334,7 +334,7 @@ public class StorageEngine implements IService {
private void timedFlushUnseqMemTable() {
try {
- for (VirtualStorageGroupManager processor : processorMap.values()) {
+ for (StorageGroupManager processor : processorMap.values()) {
processor.timedFlushUnseqMemTable();
}
} catch (Exception e) {
@@ -344,7 +344,7 @@ public class StorageEngine implements IService {
private void timedCloseTsFileProcessor() {
try {
- for (VirtualStorageGroupManager processor : processorMap.values()) {
+ for (StorageGroupManager processor : processorMap.values()) {
processor.timedCloseTsFileProcessor();
}
} catch (Exception e) {
@@ -354,8 +354,8 @@ public class StorageEngine implements IService {
@Override
public void stop() {
- for (VirtualStorageGroupManager virtualStorageGroupManager :
processorMap.values()) {
- virtualStorageGroupManager.stopCompactionSchedulerPool();
+ for (StorageGroupManager storageGroupManager : processorMap.values()) {
+ storageGroupManager.stopCompactionSchedulerPool();
}
syncCloseAllProcessor();
stopTimedService(ttlCheckThread, "TTlCheckThread");
@@ -386,8 +386,8 @@ public class StorageEngine implements IService {
@Override
public void shutdown(long milliseconds) throws ShutdownException {
try {
- for (VirtualStorageGroupManager virtualStorageGroupManager :
processorMap.values()) {
- virtualStorageGroupManager.stopCompactionSchedulerPool();
+ for (StorageGroupManager storageGroupManager : processorMap.values()) {
+ storageGroupManager.stopCompactionSchedulerPool();
}
forceCloseAllProcessor();
} catch (TsFileProcessorException e) {
@@ -455,7 +455,7 @@ public class StorageEngine implements IService {
* @param path storage group path
* @return storage group processor
*/
- public StorageGroupProcessor getProcessorDirectly(PartialPath path)
+ public VirtualStorageGroupProcessor getProcessorDirectly(PartialPath path)
throws StorageEngineException {
PartialPath storageGroupPath;
try {
@@ -473,7 +473,7 @@ public class StorageEngine implements IService {
* @param path device path
* @return storage group processor
*/
- public StorageGroupProcessor getProcessor(PartialPath path) throws
StorageEngineException {
+ public VirtualStorageGroupProcessor getProcessor(PartialPath path) throws
StorageEngineException {
try {
IStorageGroupMNode storageGroupMNode =
IoTDB.metaManager.getStorageGroupNodeByPath(path);
return getStorageGroupProcessorByPath(path, storageGroupMNode);
@@ -492,9 +492,9 @@ public class StorageEngine implements IService {
List<String> lockHolderList = new ArrayList<>(pathList.size());
for (PartialPath path : pathList) {
IStorageGroupMNode storageGroupMNode =
IoTDB.metaManager.getStorageGroupNodeByPath(path);
- StorageGroupProcessor storageGroupProcessor =
+ VirtualStorageGroupProcessor virtualStorageGroupProcessor =
getStorageGroupProcessorByPath(path, storageGroupMNode);
- lockHolderList.add(storageGroupProcessor.getInsertWriteLockHolder());
+
lockHolderList.add(virtualStorageGroupProcessor.getInsertWriteLockHolder());
}
return lockHolderList;
} catch (StorageGroupProcessorException | MetadataException e) {
@@ -512,21 +512,20 @@ public class StorageEngine implements IService {
*/
@SuppressWarnings("java:S2445")
// actually storageGroupMNode is a unique object on the mtree, synchronize
it is reasonable
- private StorageGroupProcessor getStorageGroupProcessorByPath(
+ private VirtualStorageGroupProcessor getStorageGroupProcessorByPath(
PartialPath devicePath, IStorageGroupMNode storageGroupMNode)
throws StorageGroupProcessorException, StorageEngineException {
- VirtualStorageGroupManager virtualStorageGroupManager =
- processorMap.get(storageGroupMNode.getPartialPath());
- if (virtualStorageGroupManager == null) {
+ StorageGroupManager storageGroupManager =
processorMap.get(storageGroupMNode.getPartialPath());
+ if (storageGroupManager == null) {
synchronized (this) {
- virtualStorageGroupManager =
processorMap.get(storageGroupMNode.getPartialPath());
- if (virtualStorageGroupManager == null) {
- virtualStorageGroupManager = new VirtualStorageGroupManager();
- processorMap.put(storageGroupMNode.getPartialPath(),
virtualStorageGroupManager);
+ storageGroupManager =
processorMap.get(storageGroupMNode.getPartialPath());
+ if (storageGroupManager == null) {
+ storageGroupManager = new StorageGroupManager();
+ processorMap.put(storageGroupMNode.getPartialPath(),
storageGroupManager);
}
}
}
- return virtualStorageGroupManager.getProcessor(devicePath,
storageGroupMNode);
+ return storageGroupManager.getProcessor(devicePath, storageGroupMNode);
}
/**
@@ -535,18 +534,18 @@ public class StorageEngine implements IService {
* @param virtualStorageGroupId virtual storage group id e.g. 1
* @param logicalStorageGroupName logical storage group name e.g. root.sg1
*/
- public StorageGroupProcessor buildNewStorageGroupProcessor(
+ public VirtualStorageGroupProcessor buildNewStorageGroupProcessor(
PartialPath logicalStorageGroupName,
IStorageGroupMNode storageGroupMNode,
String virtualStorageGroupId)
throws StorageGroupProcessorException {
- StorageGroupProcessor processor;
+ VirtualStorageGroupProcessor processor;
logger.info(
"construct a processor instance, the storage group is {}, Thread is
{}",
logicalStorageGroupName,
Thread.currentThread().getId());
processor =
- new StorageGroupProcessor(
+ new VirtualStorageGroupProcessor(
systemDir + File.separator + logicalStorageGroupName,
virtualStorageGroupId,
fileFlushPolicy,
@@ -560,8 +559,8 @@ public class StorageEngine implements IService {
/** This function is just for unit test. */
@TestOnly
public synchronized void reset() {
- for (VirtualStorageGroupManager virtualStorageGroupManager :
processorMap.values()) {
- virtualStorageGroupManager.reset();
+ for (StorageGroupManager storageGroupManager : processorMap.values()) {
+ storageGroupManager.reset();
}
}
@@ -578,10 +577,11 @@ public class StorageEngine implements IService {
throw new StorageEngineException(e);
}
}
- StorageGroupProcessor storageGroupProcessor =
getProcessor(insertRowPlan.getDeviceId());
+ VirtualStorageGroupProcessor virtualStorageGroupProcessor =
+ getProcessor(insertRowPlan.getDeviceId());
try {
- storageGroupProcessor.insert(insertRowPlan);
+ virtualStorageGroupProcessor.insert(insertRowPlan);
if (config.isEnableStatMonitor()) {
try {
updateMonitorStatistics(
@@ -606,12 +606,12 @@ public class StorageEngine implements IService {
throw new StorageEngineException(e);
}
}
- StorageGroupProcessor storageGroupProcessor =
+ VirtualStorageGroupProcessor virtualStorageGroupProcessor =
getProcessor(insertRowsOfOneDevicePlan.getDeviceId());
// TODO monitor: update statistics
try {
- storageGroupProcessor.insert(insertRowsOfOneDevicePlan);
+ virtualStorageGroupProcessor.insert(insertRowsOfOneDevicePlan);
} catch (WriteProcessException e) {
throw new StorageEngineException(e);
}
@@ -629,9 +629,9 @@ public class StorageEngine implements IService {
throw new BatchProcessException(results);
}
}
- StorageGroupProcessor storageGroupProcessor;
+ VirtualStorageGroupProcessor virtualStorageGroupProcessor;
try {
- storageGroupProcessor = getProcessor(insertTabletPlan.getDeviceId());
+ virtualStorageGroupProcessor =
getProcessor(insertTabletPlan.getDeviceId());
} catch (StorageEngineException e) {
throw new StorageEngineException(
String.format(
@@ -639,7 +639,7 @@ public class StorageEngine implements IService {
e);
}
- storageGroupProcessor.insertTablet(insertTabletPlan);
+ virtualStorageGroupProcessor.insertTablet(insertTabletPlan);
if (config.isEnableStatMonitor()) {
try {
@@ -654,12 +654,12 @@ public class StorageEngine implements IService {
}
private void updateMonitorStatistics(
- VirtualStorageGroupManager virtualStorageGroupManager, InsertPlan
insertPlan) {
+ StorageGroupManager storageGroupManager, InsertPlan insertPlan) {
StatMonitor monitor = StatMonitor.getInstance();
int successPointsNum =
insertPlan.getMeasurements().length -
insertPlan.getFailedMeasurementNumber();
// update to storage group statistics
- virtualStorageGroupManager.updateMonitorSeriesValue(successPointsNum);
+ storageGroupManager.updateMonitorSeriesValue(successPointsNum);
// update to global statistics
monitor.updateStatGlobalValue(successPointsNum);
}
@@ -667,14 +667,14 @@ public class StorageEngine implements IService {
/** flush command Sync asyncCloseOneProcessor all file node processors. */
public void syncCloseAllProcessor() {
logger.info("Start closing all storage group processor");
- for (VirtualStorageGroupManager processor : processorMap.values()) {
+ for (StorageGroupManager processor : processorMap.values()) {
processor.syncCloseAllWorkingTsFileProcessors();
}
}
public void forceCloseAllProcessor() throws TsFileProcessorException {
logger.info("Start force closing all storage group processor");
- for (VirtualStorageGroupManager processor : processorMap.values()) {
+ for (StorageGroupManager processor : processorMap.values()) {
processor.forceCloseAllWorkingTsFileProcessors();
}
}
@@ -685,8 +685,8 @@ public class StorageEngine implements IService {
return;
}
- VirtualStorageGroupManager virtualStorageGroupManager =
processorMap.get(storageGroupPath);
- virtualStorageGroupManager.closeStorageGroupProcessor(isSeq, isSync);
+ StorageGroupManager storageGroupManager =
processorMap.get(storageGroupPath);
+ storageGroupManager.closeStorageGroupProcessor(isSeq, isSync);
}
/**
@@ -703,8 +703,8 @@ public class StorageEngine implements IService {
throw new StorageGroupNotSetException(storageGroupPath.getFullPath());
}
- VirtualStorageGroupManager virtualStorageGroupManager =
processorMap.get(storageGroupPath);
- virtualStorageGroupManager.closeStorageGroupProcessor(partitionId, isSeq,
isSync);
+ StorageGroupManager storageGroupManager =
processorMap.get(storageGroupPath);
+ storageGroupManager.closeStorageGroupProcessor(partitionId, isSeq, isSync);
}
public void delete(
@@ -759,8 +759,8 @@ public class StorageEngine implements IService {
PartialPath fullPath, Filter filter, QueryContext context,
QueryFileManager filePathsManager)
throws StorageEngineException, QueryProcessException {
PartialPath deviceId = fullPath.getDevicePath();
- StorageGroupProcessor storageGroupProcessor = getProcessor(deviceId);
- return storageGroupProcessor.query(fullPath, context, filePathsManager,
filter);
+ VirtualStorageGroupProcessor virtualStorageGroupProcessor =
getProcessor(deviceId);
+ return virtualStorageGroupProcessor.query(fullPath, context,
filePathsManager, filter);
}
/**
@@ -770,8 +770,8 @@ public class StorageEngine implements IService {
*/
public int countUpgradeFiles() {
int totalUpgradeFileNum = 0;
- for (VirtualStorageGroupManager virtualStorageGroupManager :
processorMap.values()) {
- totalUpgradeFileNum += virtualStorageGroupManager.countUpgradeFiles();
+ for (StorageGroupManager storageGroupManager : processorMap.values()) {
+ totalUpgradeFileNum += storageGroupManager.countUpgradeFiles();
}
return totalUpgradeFileNum;
}
@@ -786,8 +786,8 @@ public class StorageEngine implements IService {
throw new StorageEngineException(
"Current system mode is read only, does not support file upgrade");
}
- for (VirtualStorageGroupManager virtualStorageGroupManager :
processorMap.values()) {
- virtualStorageGroupManager.upgradeAll();
+ for (StorageGroupManager storageGroupManager : processorMap.values()) {
+ storageGroupManager.upgradeAll();
}
}
@@ -797,7 +797,7 @@ public class StorageEngine implements IService {
List<TsFileResource> unseqResourcesToBeSettled,
List<String> tsFilePaths)
throws StorageEngineException {
- VirtualStorageGroupManager vsg = processorMap.get(sgPath);
+ StorageGroupManager vsg = processorMap.get(sgPath);
if (vsg == null) {
throw new StorageEngineException(
"The Storage Group " + sgPath.toString() + " is not existed.");
@@ -826,8 +826,8 @@ public class StorageEngine implements IService {
throw new StorageEngineException("Current system mode is read only, does
not support merge");
}
- for (VirtualStorageGroupManager virtualStorageGroupManager :
processorMap.values()) {
- virtualStorageGroupManager.mergeAll(isFullMerge);
+ for (StorageGroupManager storageGroupManager : processorMap.values()) {
+ storageGroupManager.mergeAll(isFullMerge);
}
}
@@ -879,8 +879,8 @@ public class StorageEngine implements IService {
deleteAllDataFilesInOneStorageGroup(storageGroupPath);
releaseWalDirectByteBufferPoolInOneStorageGroup(storageGroupPath);
- VirtualStorageGroupManager virtualStorageGroupManager =
processorMap.remove(storageGroupPath);
- virtualStorageGroupManager.deleteStorageGroupSystemFolder(
+ StorageGroupManager storageGroupManager =
processorMap.remove(storageGroupPath);
+ storageGroupManager.deleteStorageGroupSystemFolder(
systemDir + File.pathSeparator + storageGroupPath);
}
@@ -934,7 +934,7 @@ public class StorageEngine implements IService {
/** @return TsFiles (seq or unseq) grouped by their storage group and
partition number. */
public Map<PartialPath, Map<Long, List<TsFileResource>>>
getAllClosedStorageGroupTsFile() {
Map<PartialPath, Map<Long, List<TsFileResource>>> ret = new HashMap<>();
- for (Entry<PartialPath, VirtualStorageGroupManager> entry :
processorMap.entrySet()) {
+ for (Entry<PartialPath, StorageGroupManager> entry :
processorMap.entrySet()) {
entry.getValue().getAllClosedStorageGroupTsFile(entry.getKey(), ret);
}
return ret;
@@ -946,8 +946,8 @@ public class StorageEngine implements IService {
public boolean isFileAlreadyExist(
TsFileResource tsFileResource, PartialPath storageGroup, long
partitionNum) {
- VirtualStorageGroupManager virtualStorageGroupManager =
processorMap.get(storageGroup);
- if (virtualStorageGroupManager == null) {
+ StorageGroupManager storageGroupManager = processorMap.get(storageGroup);
+ if (storageGroupManager == null) {
return false;
}
@@ -976,7 +976,7 @@ public class StorageEngine implements IService {
}
}
- public Map<PartialPath, VirtualStorageGroupManager> getProcessorMap() {
+ public Map<PartialPath, StorageGroupManager> getProcessorMap() {
return processorMap;
}
@@ -988,7 +988,7 @@ public class StorageEngine implements IService {
*/
public Map<String, List<Pair<Long, Boolean>>>
getWorkingStorageGroupPartitions() {
Map<String, List<Pair<Long, Boolean>>> res = new ConcurrentHashMap<>();
- for (Entry<PartialPath, VirtualStorageGroupManager> entry :
processorMap.entrySet()) {
+ for (Entry<PartialPath, StorageGroupManager> entry :
processorMap.entrySet()) {
entry.getValue().getWorkingStorageGroupPartitions(entry.getKey().getFullPath(),
res);
}
return res;
@@ -1015,23 +1015,23 @@ public class StorageEngine implements IService {
}
/** get all merge lock of the storage group processor related to the query */
- public List<StorageGroupProcessor> mergeLock(List<PartialPath> pathList)
+ public List<VirtualStorageGroupProcessor> mergeLock(List<PartialPath>
pathList)
throws StorageEngineException {
- Set<StorageGroupProcessor> set = new HashSet<>();
+ Set<VirtualStorageGroupProcessor> set = new HashSet<>();
for (PartialPath path : pathList) {
set.add(getProcessor(path.getDevicePath()));
}
- List<StorageGroupProcessor> list =
+ List<VirtualStorageGroupProcessor> list =
set.stream()
-
.sorted(Comparator.comparing(StorageGroupProcessor::getVirtualStorageGroupId))
+
.sorted(Comparator.comparing(VirtualStorageGroupProcessor::getVirtualStorageGroupId))
.collect(Collectors.toList());
- list.forEach(StorageGroupProcessor::readLock);
+ list.forEach(VirtualStorageGroupProcessor::readLock);
return list;
}
/** unlock all merge lock of the storage group processor related to the
query */
- public void mergeUnLock(List<StorageGroupProcessor> list) {
- list.forEach(StorageGroupProcessor::readUnlock);
+ public void mergeUnLock(List<VirtualStorageGroupProcessor> list) {
+ list.forEach(VirtualStorageGroupProcessor::readUnlock);
}
static class InstanceHolder {
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionRecoverTask.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionRecoverTask.java
index 63526d1..937a4a6 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionRecoverTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionRecoverTask.java
@@ -23,8 +23,8 @@ import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.compaction.CompactionScheduler;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import
org.apache.iotdb.db.engine.compaction.cross.inplace.recover.InplaceCompactionLogger;
-import
org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.CompactionRecoverCallBack;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import
org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor.CompactionRecoverCallBack;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
index 396847d..d71df87 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
@@ -19,8 +19,8 @@
package org.apache.iotdb.db.engine.flush;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
/**
* TsFileFlushPolicy is applied when a TsFileProcessor is full after
insertion. For standalone
@@ -29,17 +29,20 @@ import
org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
*/
public interface TsFileFlushPolicy {
- void apply(StorageGroupProcessor storageGroupProcessor, TsFileProcessor
processor, boolean isSeq);
+ void apply(
+ VirtualStorageGroupProcessor virtualStorageGroupProcessor,
+ TsFileProcessor processor,
+ boolean isSeq);
class DirectFlushPolicy implements TsFileFlushPolicy {
@Override
public void apply(
- StorageGroupProcessor storageGroupProcessor,
+ VirtualStorageGroupProcessor virtualStorageGroupProcessor,
TsFileProcessor tsFileProcessor,
boolean isSeq) {
if (tsFileProcessor.shouldClose()) {
- storageGroupProcessor.asyncCloseOneTsFileProcessor(isSeq,
tsFileProcessor);
+ virtualStorageGroupProcessor.asyncCloseOneTsFileProcessor(isSeq,
tsFileProcessor);
} else {
tsFileProcessor.asyncFlush();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
index c5ee561..d1a7a75 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
@@ -34,7 +34,7 @@ import java.util.function.Supplier;
/** The storageGroupInfo records the total memory cost of the Storage Group. */
public class StorageGroupInfo {
- private StorageGroupProcessor storageGroupProcessor;
+ private VirtualStorageGroupProcessor virtualStorageGroupProcessor;
/**
* The total Storage group memory cost, including unsealed TsFileResource,
ChunkMetadata, WAL,
@@ -51,13 +51,13 @@ public class StorageGroupInfo {
/** A set of all unclosed TsFileProcessors in this SG */
private List<TsFileProcessor> reportedTsps = new CopyOnWriteArrayList<>();
- public StorageGroupInfo(StorageGroupProcessor storageGroupProcessor) {
- this.storageGroupProcessor = storageGroupProcessor;
+ public StorageGroupInfo(VirtualStorageGroupProcessor
virtualStorageGroupProcessor) {
+ this.virtualStorageGroupProcessor = virtualStorageGroupProcessor;
memoryCost = new AtomicLong();
}
- public StorageGroupProcessor getStorageGroupProcessor() {
- return storageGroupProcessor;
+ public VirtualStorageGroupProcessor getVirtualStorageGroupProcessor() {
+ return virtualStorageGroupProcessor;
}
/** When create a new TsFileProcessor, call this method */
@@ -101,8 +101,8 @@ public class StorageGroupInfo {
}
public Supplier<ByteBuffer[]> getWalSupplier() {
- if (storageGroupProcessor != null) {
- return storageGroupProcessor::getWalDirectByteBuffer;
+ if (virtualStorageGroupProcessor != null) {
+ return virtualStorageGroupProcessor::getWalDirectByteBuffer;
} else { // only happens in test
return this::walSupplier;
}
@@ -119,8 +119,8 @@ public class StorageGroupInfo {
}
public Consumer<ByteBuffer[]> getWalConsumer() {
- if (storageGroupProcessor != null) {
- return storageGroupProcessor::releaseWalBuffer;
+ if (virtualStorageGroupProcessor != null) {
+ return virtualStorageGroupProcessor::releaseWalBuffer;
} else { // only happens in test
return this::walConsumer;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 32adc25..2ef0ddf 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -34,7 +34,7 @@ import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-import
org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.UpdateEndTimeCallBack;
+import
org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor.UpdateEndTimeCallBack;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.WriteProcessRejectException;
@@ -1354,7 +1354,7 @@ public class TsFileProcessor {
}
public void submitAFlushTask() {
-
this.storageGroupInfo.getStorageGroupProcessor().submitAFlushTaskWhenShouldFlush(this);
+
this.storageGroupInfo.getVirtualStorageGroupProcessor().submitAFlushTaskWhenShouldFlush(this);
}
public boolean alreadyMarkedClosing() {
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 59e83ba..70db68a3 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -22,9 +22,9 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-import
org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.SettleTsFileCallBack;
-import
org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.UpgradeTsFileResourceCallBack;
import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator.TsFileName;
+import
org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor.SettleTsFileCallBack;
+import
org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor.UpgradeTsFileResourceCallBack;
import org.apache.iotdb.db.engine.storagegroup.timeindex.DeviceTimeIndex;
import org.apache.iotdb.db.engine.storagegroup.timeindex.FileTimeIndex;
import org.apache.iotdb.db.engine.storagegroup.timeindex.ITimeIndex;
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
similarity index 99%
rename from
server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
rename to
server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
index d44b439..73be3cb 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
@@ -145,7 +145,7 @@ import static
org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFF
* <p>When a TsFileProcessor is closed, the
closeUnsealedTsFileProcessorCallBack() method will be
* called as a callback.
*/
-public class StorageGroupProcessor {
+public class VirtualStorageGroupProcessor {
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
private static final Logger DEBUG_LOGGER =
LoggerFactory.getLogger("QUERY_DEBUG");
@@ -156,7 +156,7 @@ public class StorageGroupProcessor {
*/
private static final int MERGE_MOD_START_VERSION_NUM = 1;
- private static final Logger logger =
LoggerFactory.getLogger(StorageGroupProcessor.class);
+ private static final Logger logger =
LoggerFactory.getLogger(VirtualStorageGroupProcessor.class);
/** indicating the file to be loaded already exists locally. */
private static final int POS_ALREADY_EXIST = -2;
/** indicating the file to be loaded overlap with some files. */
@@ -382,7 +382,7 @@ public class StorageGroupProcessor {
* @param fileFlushPolicy file flush policy
* @param logicalStorageGroupName logical storage group name e.g. root.sg1
*/
- public StorageGroupProcessor(
+ public VirtualStorageGroupProcessor(
String systemDir,
String virtualStorageGroupId,
TsFileFlushPolicy fileFlushPolicy,
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java
similarity index 69%
rename from
server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
rename to
server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java
index d37aefc..92b78bb 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java
@@ -19,10 +19,10 @@
package org.apache.iotdb.db.engine.storagegroup.virtualSg;
import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
-import
org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter;
import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
+import
org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor.TimePartitionFilter;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.StorageGroupNotReadyException;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
@@ -47,16 +47,16 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
/** Each storage group that set by users corresponds to a StorageGroupManager
*/
-public class VirtualStorageGroupManager {
+public class StorageGroupManager {
/** logger of this class */
- private static final Logger logger =
LoggerFactory.getLogger(VirtualStorageGroupManager.class);
+ private static final Logger logger =
LoggerFactory.getLogger(StorageGroupManager.class);
/** virtual storage group partitioner */
VirtualPartitioner partitioner = HashVirtualPartitioner.getInstance();
/** all virtual storage group processor */
- StorageGroupProcessor[] virtualStorageGroupProcessor;
+ VirtualStorageGroupProcessor[] virtualStorageGroupProcessor;
/**
* recover status of each virtual storage group processor, null if this
logical storage group is
@@ -69,12 +69,13 @@ public class VirtualStorageGroupManager {
/** value of root.stats."root.sg".TOTAL_POINTS */
private long monitorSeriesValue;
- public VirtualStorageGroupManager() {
+ public StorageGroupManager() {
this(false);
}
- public VirtualStorageGroupManager(boolean needRecovering) {
- virtualStorageGroupProcessor = new
StorageGroupProcessor[partitioner.getPartitionCount()];
+ public StorageGroupManager(boolean needRecovering) {
+ virtualStorageGroupProcessor =
+ new VirtualStorageGroupProcessor[partitioner.getPartitionCount()];
isVsgReady = new AtomicBoolean[partitioner.getPartitionCount()];
boolean recoverReady = !needRecovering;
for (int i = 0; i < partitioner.getPartitionCount(); i++) {
@@ -84,54 +85,60 @@ public class VirtualStorageGroupManager {
/** push forceCloseAllWorkingTsFileProcessors down to all sg */
public void forceCloseAllWorkingTsFileProcessors() throws
TsFileProcessorException {
- for (StorageGroupProcessor storageGroupProcessor :
virtualStorageGroupProcessor) {
- if (storageGroupProcessor != null) {
- storageGroupProcessor.forceCloseAllWorkingTsFileProcessors();
+ for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
+ this.virtualStorageGroupProcessor) {
+ if (virtualStorageGroupProcessor != null) {
+ virtualStorageGroupProcessor.forceCloseAllWorkingTsFileProcessors();
}
}
}
/** push syncCloseAllWorkingTsFileProcessors down to all sg */
public void syncCloseAllWorkingTsFileProcessors() {
- for (StorageGroupProcessor storageGroupProcessor :
virtualStorageGroupProcessor) {
- if (storageGroupProcessor != null) {
- storageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
+ for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
+ this.virtualStorageGroupProcessor) {
+ if (virtualStorageGroupProcessor != null) {
+ virtualStorageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
}
}
}
/** push check ttl down to all sg */
public void checkTTL() {
- for (StorageGroupProcessor storageGroupProcessor :
virtualStorageGroupProcessor) {
- if (storageGroupProcessor != null) {
- storageGroupProcessor.checkFilesTTL();
+ for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
+ this.virtualStorageGroupProcessor) {
+ if (virtualStorageGroupProcessor != null) {
+ virtualStorageGroupProcessor.checkFilesTTL();
}
}
}
/** push check sequence memtable flush interval down to all sg */
public void timedFlushSeqMemTable() {
- for (StorageGroupProcessor storageGroupProcessor :
virtualStorageGroupProcessor) {
- if (storageGroupProcessor != null) {
- storageGroupProcessor.timedFlushSeqMemTable();
+ for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
+ this.virtualStorageGroupProcessor) {
+ if (virtualStorageGroupProcessor != null) {
+ virtualStorageGroupProcessor.timedFlushSeqMemTable();
}
}
}
/** push check unsequence memtable flush interval down to all sg */
public void timedFlushUnseqMemTable() {
- for (StorageGroupProcessor storageGroupProcessor :
virtualStorageGroupProcessor) {
- if (storageGroupProcessor != null) {
- storageGroupProcessor.timedFlushUnseqMemTable();
+ for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
+ this.virtualStorageGroupProcessor) {
+ if (virtualStorageGroupProcessor != null) {
+ virtualStorageGroupProcessor.timedFlushUnseqMemTable();
}
}
}
/** push check TsFileProcessor close interval down to all sg */
public void timedCloseTsFileProcessor() {
- for (StorageGroupProcessor storageGroupProcessor :
virtualStorageGroupProcessor) {
- if (storageGroupProcessor != null) {
- storageGroupProcessor.timedCloseTsFileProcessor();
+ for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
+ this.virtualStorageGroupProcessor) {
+ if (virtualStorageGroupProcessor != null) {
+ virtualStorageGroupProcessor.timedCloseTsFileProcessor();
}
}
}
@@ -144,12 +151,12 @@ public class VirtualStorageGroupManager {
*/
@SuppressWarnings("java:S2445")
// actually storageGroupMNode is a unique object on the mtree, synchronize
it is reasonable
- public StorageGroupProcessor getProcessor(
+ public VirtualStorageGroupProcessor getProcessor(
PartialPath partialPath, IStorageGroupMNode storageGroupMNode)
throws StorageGroupProcessorException, StorageEngineException {
int loc = partitioner.deviceToVirtualStorageGroupId(partialPath);
- StorageGroupProcessor processor = virtualStorageGroupProcessor[loc];
+ VirtualStorageGroupProcessor processor = virtualStorageGroupProcessor[loc];
if (processor == null) {
// if finish recover
if (isVsgReady[loc].get()) {
@@ -187,7 +194,7 @@ public class VirtualStorageGroupManager {
Callable<Void> recoverVsgTask =
() -> {
isVsgReady[cur].set(false);
- StorageGroupProcessor processor = null;
+ VirtualStorageGroupProcessor processor = null;
try {
processor =
StorageEngine.getInstance()
@@ -224,7 +231,7 @@ public class VirtualStorageGroupManager {
/** push closeStorageGroupProcessor operation down to all virtual storage
group processors */
public void closeStorageGroupProcessor(boolean isSeq, boolean isSync) {
- for (StorageGroupProcessor processor : virtualStorageGroupProcessor) {
+ for (VirtualStorageGroupProcessor processor :
virtualStorageGroupProcessor) {
if (processor == null) {
continue;
}
@@ -268,7 +275,7 @@ public class VirtualStorageGroupManager {
/** push closeStorageGroupProcessor operation down to all virtual storage
group processors */
public void closeStorageGroupProcessor(long partitionId, boolean isSeq,
boolean isSync) {
- for (StorageGroupProcessor processor : virtualStorageGroupProcessor) {
+ for (VirtualStorageGroupProcessor processor :
virtualStorageGroupProcessor) {
if (processor != null) {
logger.info(
"async closing sg processor is called for closing {}, seq = {},
partitionId = {}",
@@ -307,9 +314,11 @@ public class VirtualStorageGroupManager {
long planIndex,
TimePartitionFilter timePartitionFilter)
throws IOException {
- for (StorageGroupProcessor storageGroupProcessor :
virtualStorageGroupProcessor) {
- if (storageGroupProcessor != null) {
- storageGroupProcessor.delete(path, startTime, endTime, planIndex,
timePartitionFilter);
+ for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
+ this.virtualStorageGroupProcessor) {
+ if (virtualStorageGroupProcessor != null) {
+ virtualStorageGroupProcessor.delete(
+ path, startTime, endTime, planIndex, timePartitionFilter);
}
}
}
@@ -317,9 +326,10 @@ public class VirtualStorageGroupManager {
/** push countUpgradeFiles operation down to all virtual storage group
processors */
public int countUpgradeFiles() {
int totalUpgradeFileNum = 0;
- for (StorageGroupProcessor storageGroupProcessor :
virtualStorageGroupProcessor) {
- if (storageGroupProcessor != null) {
- totalUpgradeFileNum += storageGroupProcessor.countUpgradeFiles();
+ for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
+ this.virtualStorageGroupProcessor) {
+ if (virtualStorageGroupProcessor != null) {
+ totalUpgradeFileNum +=
virtualStorageGroupProcessor.countUpgradeFiles();
}
}
@@ -328,9 +338,10 @@ public class VirtualStorageGroupManager {
/** push upgradeAll operation down to all virtual storage group processors */
public void upgradeAll() {
- for (StorageGroupProcessor storageGroupProcessor :
virtualStorageGroupProcessor) {
- if (storageGroupProcessor != null) {
- storageGroupProcessor.upgrade();
+ for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
+ this.virtualStorageGroupProcessor) {
+ if (virtualStorageGroupProcessor != null) {
+ virtualStorageGroupProcessor.upgrade();
}
}
}
@@ -339,9 +350,10 @@ public class VirtualStorageGroupManager {
List<TsFileResource> seqResourcesToBeSettled,
List<TsFileResource> unseqResourcesToBeSettled,
List<String> tsFilePaths) {
- for (StorageGroupProcessor storageGroupProcessor :
virtualStorageGroupProcessor) {
- if (storageGroupProcessor != null) {
- storageGroupProcessor.addSettleFilesToList(
+ for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
+ this.virtualStorageGroupProcessor) {
+ if (virtualStorageGroupProcessor != null) {
+ virtualStorageGroupProcessor.addSettleFilesToList(
seqResourcesToBeSettled, unseqResourcesToBeSettled, tsFilePaths);
}
}
@@ -349,34 +361,37 @@ public class VirtualStorageGroupManager {
/** push mergeAll operation down to all virtual storage group processors */
public void mergeAll(boolean isFullMerge) {
- for (StorageGroupProcessor storageGroupProcessor :
virtualStorageGroupProcessor) {
- if (storageGroupProcessor != null) {
- storageGroupProcessor.merge(isFullMerge);
+ for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
+ this.virtualStorageGroupProcessor) {
+ if (virtualStorageGroupProcessor != null) {
+ virtualStorageGroupProcessor.merge(isFullMerge);
}
}
}
/** push syncDeleteDataFiles operation down to all virtual storage group
processors */
public void syncDeleteDataFiles() {
- for (StorageGroupProcessor storageGroupProcessor :
virtualStorageGroupProcessor) {
- if (storageGroupProcessor != null) {
- storageGroupProcessor.syncDeleteDataFiles();
+ for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
+ this.virtualStorageGroupProcessor) {
+ if (virtualStorageGroupProcessor != null) {
+ virtualStorageGroupProcessor.syncDeleteDataFiles();
}
}
}
/** push setTTL operation down to all virtual storage group processors */
public void setTTL(long dataTTL) {
- for (StorageGroupProcessor storageGroupProcessor :
virtualStorageGroupProcessor) {
- if (storageGroupProcessor != null) {
- storageGroupProcessor.setDataTTL(dataTTL);
+ for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
+ this.virtualStorageGroupProcessor) {
+ if (virtualStorageGroupProcessor != null) {
+ virtualStorageGroupProcessor.setDataTTL(dataTTL);
}
}
}
/** push deleteStorageGroup operation down to all virtual storage group
processors */
public void deleteStorageGroupSystemFolder(String path) {
- for (StorageGroupProcessor processor : virtualStorageGroupProcessor) {
+ for (VirtualStorageGroupProcessor processor :
virtualStorageGroupProcessor) {
if (processor != null) {
processor.deleteFolder(path);
}
@@ -386,10 +401,11 @@ public class VirtualStorageGroupManager {
/** push getAllClosedStorageGroupTsFile operation down to all virtual
storage group processors */
public void getAllClosedStorageGroupTsFile(
PartialPath storageGroupName, Map<PartialPath, Map<Long,
List<TsFileResource>>> ret) {
- for (StorageGroupProcessor storageGroupProcessor :
virtualStorageGroupProcessor) {
- if (storageGroupProcessor != null) {
- List<TsFileResource> allResources =
storageGroupProcessor.getSequenceFileTreeSet();
- allResources.addAll(storageGroupProcessor.getUnSequenceFileList());
+ for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
+ this.virtualStorageGroupProcessor) {
+ if (virtualStorageGroupProcessor != null) {
+ List<TsFileResource> allResources =
virtualStorageGroupProcessor.getSequenceFileTreeSet();
+
allResources.addAll(virtualStorageGroupProcessor.getUnSequenceFileList());
for (TsFileResource tsfile : allResources) {
if (!tsfile.isClosed()) {
continue;
@@ -405,18 +421,20 @@ public class VirtualStorageGroupManager {
/** push setPartitionVersionToMax operation down to all virtual storage
group processors */
public void setPartitionVersionToMax(long partitionId, long newMaxVersion) {
- for (StorageGroupProcessor storageGroupProcessor :
virtualStorageGroupProcessor) {
- if (storageGroupProcessor != null) {
- storageGroupProcessor.setPartitionFileVersionToMax(partitionId,
newMaxVersion);
+ for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
+ this.virtualStorageGroupProcessor) {
+ if (virtualStorageGroupProcessor != null) {
+ virtualStorageGroupProcessor.setPartitionFileVersionToMax(partitionId,
newMaxVersion);
}
}
}
/** push removePartitions operation down to all virtual storage group
processors */
public void removePartitions(TimePartitionFilter filter) {
- for (StorageGroupProcessor storageGroupProcessor :
virtualStorageGroupProcessor) {
- if (storageGroupProcessor != null) {
- storageGroupProcessor.removePartitions(filter);
+ for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
+ this.virtualStorageGroupProcessor) {
+ if (virtualStorageGroupProcessor != null) {
+ virtualStorageGroupProcessor.removePartitions(filter);
}
}
}
@@ -426,17 +444,18 @@ public class VirtualStorageGroupManager {
*/
public void getWorkingStorageGroupPartitions(
String storageGroupName, Map<String, List<Pair<Long, Boolean>>> res) {
- for (StorageGroupProcessor storageGroupProcessor :
virtualStorageGroupProcessor) {
- if (storageGroupProcessor != null) {
+ for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
+ this.virtualStorageGroupProcessor) {
+ if (virtualStorageGroupProcessor != null) {
List<Pair<Long, Boolean>> partitionIdList = new ArrayList<>();
for (TsFileProcessor tsFileProcessor :
- storageGroupProcessor.getWorkSequenceTsFileProcessors()) {
+ virtualStorageGroupProcessor.getWorkSequenceTsFileProcessors()) {
Pair<Long, Boolean> tmpPair = new
Pair<>(tsFileProcessor.getTimeRangeId(), true);
partitionIdList.add(tmpPair);
}
for (TsFileProcessor tsFileProcessor :
- storageGroupProcessor.getWorkUnsequenceTsFileProcessors()) {
+ virtualStorageGroupProcessor.getWorkUnsequenceTsFileProcessors()) {
Pair<Long, Boolean> tmpPair = new
Pair<>(tsFileProcessor.getTimeRangeId(), false);
partitionIdList.add(tmpPair);
}
@@ -448,9 +467,10 @@ public class VirtualStorageGroupManager {
/** release resource of direct wal buffer */
public void releaseWalDirectByteBufferPool() {
- for (StorageGroupProcessor storageGroupProcessor :
virtualStorageGroupProcessor) {
- if (storageGroupProcessor != null) {
- storageGroupProcessor.releaseWalDirectByteBufferPool();
+ for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
+ this.virtualStorageGroupProcessor) {
+ if (virtualStorageGroupProcessor != null) {
+ virtualStorageGroupProcessor.releaseWalDirectByteBufferPool();
}
}
}
@@ -461,9 +481,10 @@ public class VirtualStorageGroupManager {
}
public void stopCompactionSchedulerPool() {
- for (StorageGroupProcessor storageGroupProcessor :
virtualStorageGroupProcessor) {
- if (storageGroupProcessor != null) {
- storageGroupProcessor.getTimedCompactionScheduleTask().shutdown();
+ for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
+ this.virtualStorageGroupProcessor) {
+ if (virtualStorageGroupProcessor != null) {
+
virtualStorageGroupProcessor.getTimedCompactionScheduleTask().shutdown();
}
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java
b/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java
index a2c543f..9926ff5 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.metadata.tag;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.MetadataConstant;
@@ -141,7 +141,7 @@ public class TagManager {
// if ordered by heat, we sort all the timeseries by the descending order
of the last insert
// timestamp
if (plan.isOrderByHeat()) {
- List<StorageGroupProcessor> list;
+ List<VirtualStorageGroupProcessor> list;
try {
list =
StorageEngine.getInstance()
diff --git a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
index efc778c..732c2fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
-import
org.apache.iotdb.db.engine.storagegroup.virtualSg.VirtualStorageGroupManager;
+import org.apache.iotdb.db.engine.storagegroup.virtualSg.StorageGroupManager;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -248,13 +248,13 @@ public class StatMonitor implements StatMonitorMBean,
IService {
@Override
public long getStorageGroupTotalPointsNum(String storageGroupName) {
try {
- VirtualStorageGroupManager virtualStorageGroupManager =
+ StorageGroupManager storageGroupManager =
storageEngine.getProcessorMap().get(new
PartialPath(storageGroupName));
- if (virtualStorageGroupManager == null) {
+ if (storageGroupManager == null) {
return 0;
}
- return virtualStorageGroupManager.getMonitorSeriesValue();
+ return storageGroupManager.getMonitorSeriesValue();
} catch (IllegalPathException e) {
logger.error(e.getMessage());
return -1;
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
index 7f0a4be..8b721de 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.qp.executor;
-import
org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter;
+import
org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor.TimePartitionFilter;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 633e8b6..af9b6de 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -36,8 +36,8 @@ import
org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.engine.compaction.cross.inplace.manage.MergeManager;
import
org.apache.iotdb.db.engine.compaction.cross.inplace.manage.MergeManager.TaskStatus;
import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
-import
org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import
org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor.TimePartitionFilter;
import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.ContinuousQueryException;
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
index 046e86f..a3005c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.qp.physical.crud;
-import
org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter;
+import
org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor.TimePartitionFilter;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.logical.Operator;
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
index 1ef49ac..6289709 100644
---
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
+++
b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.qp.physical.sys;
-import
org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter;
+import
org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor.TimePartitionFilter;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.logical.Operator;
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
index d052381..9b40234 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.query.dataset.groupby;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.AlignedPath;
@@ -98,7 +98,7 @@ public class GroupByWithValueFilterDataSet extends
GroupByEngineDataSet {
Map<AlignedPath, List<List<Integer>>> alignedPathToAggrIndexesMap =
MetaUtils.groupAlignedSeriesWithAggregations(pathToAggrIndexesMap);
- List<StorageGroupProcessor> list =
+ List<VirtualStorageGroupProcessor> list =
StorageEngine.getInstance()
.mergeLock(paths.stream().map(p -> (PartialPath)
p).collect(Collectors.toList()));
try {
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index 8de5276..b2a8563 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.db.query.dataset.groupby;
import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.AlignedPath;
@@ -98,7 +98,7 @@ public class GroupByWithoutValueFilterDataSet extends
GroupByEngineDataSet {
throw new QueryProcessException("TimeFilter cannot be null in GroupBy
query.");
}
- List<StorageGroupProcessor> list =
+ List<VirtualStorageGroupProcessor> list =
StorageEngine.getInstance()
.mergeLock(paths.stream().map(p -> (PartialPath)
p).collect(Collectors.toList()));
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index fdb0b88..fc33f26 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.AlignedPath;
@@ -111,7 +111,7 @@ public class AggregationExecutor {
Map<PartialPath, List<Integer>> pathToAggrIndexesMap =
MetaUtils.groupAggregationsBySeries(selectedSeries);
// TODO-Cluster: group the paths by storage group to reduce communications
- List<StorageGroupProcessor> list =
+ List<VirtualStorageGroupProcessor> list =
StorageEngine.getInstance().mergeLock(new
ArrayList<>(pathToAggrIndexesMap.keySet()));
// Attention: this method will REMOVE aligned path from
pathToAggrIndexesMap
@@ -598,7 +598,7 @@ public class AggregationExecutor {
Map<AlignedPath, List<List<Integer>>> alignedPathToAggrIndexesMap =
MetaUtils.groupAlignedSeriesWithAggregations(pathToAggrIndexesMap);
Map<IReaderByTimestamp, List<List<Integer>>> readerToAggrIndexesMap = new
HashMap<>();
- List<StorageGroupProcessor> list =
StorageEngine.getInstance().mergeLock(selectedSeries);
+ List<VirtualStorageGroupProcessor> list =
StorageEngine.getInstance().mergeLock(selectedSeries);
try {
for (PartialPath path : pathToAggrIndexesMap.keySet()) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
index d4331ea..262ee29 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.query.executor;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
@@ -78,7 +78,7 @@ public class FillQueryExecutor {
throws StorageEngineException, QueryProcessException, IOException {
RowRecord record = new RowRecord(queryTime);
- List<StorageGroupProcessor> list =
StorageEngine.getInstance().mergeLock(selectedSeries);
+ List<VirtualStorageGroupProcessor> list =
StorageEngine.getInstance().mergeLock(selectedSeries);
try {
List<TimeValuePair> timeValuePairs = getTimeValuePairs(context);
long defaultFillInterval =
IoTDBDescriptor.getInstance().getConfig().getDefaultFillInterval();
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
index 7508a60..94c77e1 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.query.executor;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -168,7 +168,7 @@ public class LastQueryExecutor {
// Acquire query resources for the rest series paths
List<LastPointReader> readerList = new ArrayList<>();
- List<StorageGroupProcessor> list =
StorageEngine.getInstance().mergeLock(nonCachedPaths);
+ List<VirtualStorageGroupProcessor> list =
StorageEngine.getInstance().mergeLock(nonCachedPaths);
try {
for (int i = 0; i < nonCachedPaths.size(); i++) {
QueryDataSource dataSource =
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
index 09f43cf..31bab8a 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.query.executor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
@@ -99,7 +99,7 @@ public class RawDataQueryExecutor {
}
List<ManagedSeriesReader> readersOfSelectedSeries = new ArrayList<>();
- List<StorageGroupProcessor> list =
+ List<VirtualStorageGroupProcessor> list =
StorageEngine.getInstance().mergeLock(queryPlan.getDeduplicatedPaths());
try {
List<PartialPath> paths = queryPlan.getDeduplicatedPaths();
@@ -163,7 +163,7 @@ public class RawDataQueryExecutor {
QueryContext context, RawDataQueryPlan queryPlan, List<Boolean> cached)
throws QueryProcessException, StorageEngineException {
List<IReaderByTimestamp> readersOfSelectedSeries = new ArrayList<>();
- List<StorageGroupProcessor> list =
+ List<VirtualStorageGroupProcessor> list =
StorageEngine.getInstance().mergeLock(queryPlan.getDeduplicatedPaths());
try {
for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java
b/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java
index 7bd4fe6..3bdc6fa 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.query.timegenerator;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
@@ -73,7 +73,7 @@ public class ServerTimeGenerator extends TimeGenerator {
throws IOException, StorageEngineException {
List<PartialPath> pathList = new ArrayList<>();
getAndTransformPartialPathFromExpression(expression, pathList);
- List<StorageGroupProcessor> list =
StorageEngine.getInstance().mergeLock(pathList);
+ List<VirtualStorageGroupProcessor> list =
StorageEngine.getInstance().mergeLock(pathList);
try {
operatorNode = construct(expression);
} finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index 4223fa1..334bb96 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -91,7 +91,7 @@ public class SystemInfo {
} else {
logger.info(
"Change system to reject status. Triggered by: logical SG ({}), mem
cost delta ({}), totalSgMemCost ({}).",
-
storageGroupInfo.getStorageGroupProcessor().getLogicalStorageGroupName(),
+
storageGroupInfo.getVirtualStorageGroupProcessor().getLogicalStorageGroupName(),
delta,
totalStorageGroupMemCost);
rejected = true;
@@ -131,13 +131,13 @@ public class SystemInfo {
&& totalStorageGroupMemCost < REJECT_THERSHOLD) {
logger.debug(
"SG ({}) released memory (delta: {}) but still exceeding flush
proportion (totalSgMemCost: {}), call flush.",
-
storageGroupInfo.getStorageGroupProcessor().getLogicalStorageGroupName(),
+
storageGroupInfo.getVirtualStorageGroupProcessor().getLogicalStorageGroupName(),
delta,
totalStorageGroupMemCost);
if (rejected) {
logger.info(
"SG ({}) released memory (delta: {}), set system to normal status
(totalSgMemCost: {}).",
-
storageGroupInfo.getStorageGroupProcessor().getLogicalStorageGroupName(),
+
storageGroupInfo.getVirtualStorageGroupProcessor().getLogicalStorageGroupName(),
delta,
totalStorageGroupMemCost);
}
@@ -146,7 +146,7 @@ public class SystemInfo {
} else if (totalStorageGroupMemCost >= REJECT_THERSHOLD) {
logger.warn(
"SG ({}) released memory (delta: {}), but system is still in reject
status (totalSgMemCost: {}).",
-
storageGroupInfo.getStorageGroupProcessor().getLogicalStorageGroupName(),
+
storageGroupInfo.getVirtualStorageGroupProcessor().getLogicalStorageGroupName(),
delta,
totalStorageGroupMemCost);
logCurrentTotalSGMemory();
@@ -154,7 +154,7 @@ public class SystemInfo {
} else {
logger.debug(
"SG ({}) released memory (delta: {}), system is in normal status
(totalSgMemCost: {}).",
-
storageGroupInfo.getStorageGroupProcessor().getLogicalStorageGroupName(),
+
storageGroupInfo.getVirtualStorageGroupProcessor().getLogicalStorageGroupName(),
delta,
totalStorageGroupMemCost);
logCurrentTotalSGMemory();
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/recover/SizeTieredCompactionRecoverTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/recover/SizeTieredCompactionRecoverTest.java
index 1ef63a0..2b43a0a 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/recover/SizeTieredCompactionRecoverTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/recover/SizeTieredCompactionRecoverTest.java
@@ -27,9 +27,9 @@ import
org.apache.iotdb.db.engine.compaction.inner.utils.InnerSpaceCompactionUti
import
org.apache.iotdb.db.engine.compaction.inner.utils.SizeTieredCompactionLogger;
import
org.apache.iotdb.db.engine.compaction.utils.CompactionFileGeneratorUtils;
import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
@@ -140,7 +140,7 @@ public class SizeTieredCompactionRecoverTest {
File timePartitionDir = new File(SEQ_FILE_DIR);
File f = new File(timePartitionDir.getParent() + File.separator +
"test.tmp");
f.createNewFile();
- new StorageGroupProcessor(
+ new VirtualStorageGroupProcessor(
TestConstant.BASE_OUTPUT_PATH + File.separator + "data" +
File.separator + "sequence",
"0",
new TsFileFlushPolicy.DirectFlushPolicy(),
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
index d585a5a..9d3adbf 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
@@ -25,8 +25,8 @@ import org.apache.iotdb.db.engine.StorageEngine;
import
org.apache.iotdb.db.engine.modification.io.LocalTextModificationAccessor;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -165,7 +165,7 @@ public class DeletionFileNodeTest {
SchemaTestUtils.getMeasurementPath(
processorName + TsFileConstant.PATH_SEPARATOR +
measurements[measurementIdx]),
null);
- List<StorageGroupProcessor> list =
+ List<VirtualStorageGroupProcessor> list =
StorageEngine.getInstance()
.mergeLock(Collections.singletonList((PartialPath)
expression.getSeriesPath()));
try {
@@ -303,7 +303,7 @@ public class DeletionFileNodeTest {
processorName + TsFileConstant.PATH_SEPARATOR +
measurements[5]),
null);
- List<StorageGroupProcessor> list =
+ List<VirtualStorageGroupProcessor> list =
StorageEngine.getInstance()
.mergeLock(Collections.singletonList((PartialPath)
expression.getSeriesPath()));
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 40e96fc..c330cff 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -73,7 +73,7 @@ public class StorageGroupProcessorTest {
private String systemDir = TestConstant.OUTPUT_DATA_DIR.concat("info");
private String deviceId = "root.vehicle.d0";
private String measurementId = "s0";
- private StorageGroupProcessor processor;
+ private VirtualStorageGroupProcessor processor;
private QueryContext context = EnvironmentUtils.TEST_QUERY_CONTEXT;
@Before
@@ -818,7 +818,7 @@ public class StorageGroupProcessorTest {
config.setCloseTsFileIntervalAfterFlushing(prevCloseTsFileInterval);
}
- class DummySGP extends StorageGroupProcessor {
+ class DummySGP extends VirtualStorageGroupProcessor {
DummySGP(String systemInfoDir, String storageGroupName) throws
StorageGroupProcessorException {
super(
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index bee794a..b866ba0 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -81,7 +81,7 @@ public class TTLTest {
private String sg1 = "root.TTL_SG1";
private String sg2 = "root.TTL_SG2";
private long ttl = 12345;
- private StorageGroupProcessor storageGroupProcessor;
+ private VirtualStorageGroupProcessor virtualStorageGroupProcessor;
private String s1 = "s1";
private String g1s1 = sg1 + IoTDBConstant.PATH_SEPARATOR + s1;
private long prevPartitionInterval;
@@ -96,7 +96,7 @@ public class TTLTest {
@After
public void tearDown() throws IOException, StorageEngineException {
- storageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
+ virtualStorageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
EnvironmentUtils.cleanEnv();
IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(prevPartitionInterval);
}
@@ -104,8 +104,8 @@ public class TTLTest {
private void createSchemas() throws MetadataException,
StorageGroupProcessorException {
IoTDB.metaManager.setStorageGroup(new PartialPath(sg1));
IoTDB.metaManager.setStorageGroup(new PartialPath(sg2));
- storageGroupProcessor =
- new StorageGroupProcessor(
+ virtualStorageGroupProcessor =
+ new VirtualStorageGroupProcessor(
IoTDBDescriptor.getInstance().getConfig().getSystemDir(),
sg1,
new DirectFlushPolicy(),
@@ -162,20 +162,20 @@ public class TTLTest {
plan.transferType();
// ok without ttl
- storageGroupProcessor.insert(plan);
+ virtualStorageGroupProcessor.insert(plan);
- storageGroupProcessor.setDataTTL(1000);
+ virtualStorageGroupProcessor.setDataTTL(1000);
// with ttl
plan.setTime(System.currentTimeMillis() - 1001);
boolean caught = false;
try {
- storageGroupProcessor.insert(plan);
+ virtualStorageGroupProcessor.insert(plan);
} catch (OutOfTTLException e) {
caught = true;
}
assertTrue(caught);
plan.setTime(System.currentTimeMillis() - 900);
- storageGroupProcessor.insert(plan);
+ virtualStorageGroupProcessor.insert(plan);
}
private void prepareData()
@@ -201,17 +201,17 @@ public class TTLTest {
// sequence data
for (int i = 1000; i < 2000; i++) {
plan.setTime(initTime - 2000 + i);
- storageGroupProcessor.insert(plan);
+ virtualStorageGroupProcessor.insert(plan);
if ((i + 1) % 300 == 0) {
- storageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
+ virtualStorageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
}
}
// unsequence data
for (int i = 0; i < 1000; i++) {
plan.setTime(initTime - 2000 + i);
- storageGroupProcessor.insert(plan);
+ virtualStorageGroupProcessor.insert(plan);
if ((i + 1) % 300 == 0) {
- storageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
+ virtualStorageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
}
}
}
@@ -224,7 +224,7 @@ public class TTLTest {
// files before ttl
QueryDataSource dataSource =
- storageGroupProcessor.query(
+ virtualStorageGroupProcessor.query(
SchemaTestUtils.getMeasurementPath(sg1 +
TsFileConstant.PATH_SEPARATOR + s1),
EnvironmentUtils.TEST_QUERY_CONTEXT,
null,
@@ -234,11 +234,11 @@ public class TTLTest {
assertEquals(4, seqResource.size());
assertEquals(4, unseqResource.size());
- storageGroupProcessor.setDataTTL(500);
+ virtualStorageGroupProcessor.setDataTTL(500);
// files after ttl
dataSource =
- storageGroupProcessor.query(
+ virtualStorageGroupProcessor.query(
new PartialPath(sg1, s1), EnvironmentUtils.TEST_QUERY_CONTEXT,
null, null);
seqResource = dataSource.getSeqResources();
unseqResource = dataSource.getUnseqResources();
@@ -272,9 +272,9 @@ public class TTLTest {
// we cannot offer the exact number since when exactly ttl will be checked
is unknown
assertTrue(cnt <= 1000);
- storageGroupProcessor.setDataTTL(0);
+ virtualStorageGroupProcessor.setDataTTL(0);
dataSource =
- storageGroupProcessor.query(
+ virtualStorageGroupProcessor.query(
new PartialPath(sg1, s1), EnvironmentUtils.TEST_QUERY_CONTEXT,
null, null);
seqResource = dataSource.getSeqResources();
unseqResource = dataSource.getUnseqResources();
@@ -288,7 +288,7 @@ public class TTLTest {
IllegalPathException {
prepareData();
- storageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
+ virtualStorageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
// files before ttl
File seqDir = new
File(DirectoryManager.getInstance().getNextFolderForSequenceFile(), sg1);
@@ -332,8 +332,8 @@ public class TTLTest {
} catch (InterruptedException e) {
e.printStackTrace();
}
- storageGroupProcessor.setDataTTL(500);
- storageGroupProcessor.checkFilesTTL();
+ virtualStorageGroupProcessor.setDataTTL(500);
+ virtualStorageGroupProcessor.checkFilesTTL();
// files after ttl
seqFiles = new ArrayList<>();
@@ -428,13 +428,13 @@ public class TTLTest {
throws WriteProcessException, QueryProcessException,
IllegalPathException,
TriggerExecutionException {
prepareData();
- storageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
+ virtualStorageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
- assertEquals(4, storageGroupProcessor.getSequenceFileTreeSet().size());
- assertEquals(4, storageGroupProcessor.getUnSequenceFileList().size());
+ assertEquals(4,
virtualStorageGroupProcessor.getSequenceFileTreeSet().size());
+ assertEquals(4,
virtualStorageGroupProcessor.getUnSequenceFileList().size());
- storageGroupProcessor.setDataTTL(0);
- assertEquals(0, storageGroupProcessor.getSequenceFileTreeSet().size());
- assertEquals(0, storageGroupProcessor.getUnSequenceFileList().size());
+ virtualStorageGroupProcessor.setDataTTL(0);
+ assertEquals(0,
virtualStorageGroupProcessor.getSequenceFileTreeSet().size());
+ assertEquals(0,
virtualStorageGroupProcessor.getUnSequenceFileList().size());
}
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java
b/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java
index 8d7edee..cd7334a 100644
---
a/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java
@@ -22,8 +22,8 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -166,7 +166,7 @@ public class FileLoaderTest {
}
for (int i = 0; i < 3; i++) {
- StorageGroupProcessor processor =
+ VirtualStorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME +
i));
assertTrue(processor.getSequenceFileTreeSet().isEmpty());
assertTrue(processor.getUnSequenceFileList().isEmpty());
@@ -199,7 +199,7 @@ public class FileLoaderTest {
assertFalse(new File(getReceiverFolderFile(),
SyncConstant.RECEIVER_DATA_FOLDER_NAME).exists());
Map<String, Set<String>> sequenceLoadedFileMap = new HashMap<>();
for (int i = 0; i < 3; i++) {
- StorageGroupProcessor processor =
+ VirtualStorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME +
i));
sequenceLoadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>());
assertEquals(10, processor.getSequenceFileTreeSet().size());
@@ -285,7 +285,7 @@ public class FileLoaderTest {
}
for (int i = 0; i < 3; i++) {
- StorageGroupProcessor processor =
+ VirtualStorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME +
i));
assertTrue(processor.getSequenceFileTreeSet().isEmpty());
assertTrue(processor.getUnSequenceFileList().isEmpty());
@@ -318,7 +318,7 @@ public class FileLoaderTest {
assertFalse(new File(getReceiverFolderFile(),
SyncConstant.RECEIVER_DATA_FOLDER_NAME).exists());
Map<String, Set<String>> loadedFileMap = new HashMap<>();
for (int i = 0; i < 3; i++) {
- StorageGroupProcessor processor =
+ VirtualStorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME +
i));
loadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>());
assertEquals(25, processor.getSequenceFileTreeSet().size());
@@ -376,7 +376,7 @@ public class FileLoaderTest {
loadedFileMap.clear();
for (int i = 0; i < 3; i++) {
- StorageGroupProcessor processor =
+ VirtualStorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME +
i));
loadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>());
for (TsFileResource tsFileResource : processor.getSequenceFileTreeSet())
{
diff --git
a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java
b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java
index 922cf1a..e51dd2b 100644
---
a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java
@@ -22,8 +22,8 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -156,7 +156,7 @@ public class SyncReceiverLogAnalyzerTest {
}
for (int i = 0; i < 3; i++) {
- StorageGroupProcessor processor =
+ VirtualStorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME +
i));
assertTrue(processor.getSequenceFileTreeSet().isEmpty());
assertTrue(processor.getUnSequenceFileList().isEmpty());