This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8070bae8500 Pipe: Fixed the bug that historical alias/attributes/tags
may not be transferred in meta pipe when the time series already exists in
receiver & create timeseries non idempotent in PBTree
8070bae8500 is described below
commit 8070bae8500bc11d6f2b50f4ea85b0c334b70314
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jul 17 17:42:05 2024 +0800
Pipe: Fixed the bug that historical alias/attributes/tags may not be
transferred in meta pipe when the time series already exists in receiver &
create timeseries non idempotent in PBTree
---
.../pipe/it/autocreate/IoTDBPipeIdempotentIT.java | 84 +++----
.../it/manual/IoTDBPipeMetaLeaderChangeIT.java | 22 +-
.../pipe/extractor/ConfigRegionListeningQueue.java | 25 ++-
.../ConfigRegionListeningQueueTest.java | 13 +-
.../schemaregion/SchemaExecutionVisitor.java | 155 ++++++++-----
.../schemaregion/SchemaRegionListeningQueue.java | 22 +-
.../execution/executor/RegionWriteExecutor.java | 81 +++----
.../schemaengine/schemaregion/ISchemaRegion.java | 4 +-
.../schemaregion/impl/SchemaRegionMemoryImpl.java | 125 ++++++++---
.../schemaregion/impl/SchemaRegionPBTreeImpl.java | 130 +++++++----
.../mtree/impl/mem/MTreeBelowSGMemoryImpl.java | 107 +++++----
.../mtree/impl/pbtree/MTreeBelowSGCachedImpl.java | 118 ++++++----
.../mtree/impl/pbtree/mnode/ICachedMNode.java | 1 +
.../write/req/SchemaRegionWritePlanFactory.java | 2 +-
.../req/impl/CreateAlignedTimeSeriesPlanImpl.java | 45 ++--
.../write/req/impl/CreateTimeSeriesPlanImpl.java | 9 +
.../schemaRegion/AbstractSchemaRegionTest.java | 9 +-
.../schemaRegion/SchemaRegionBasicTest.java | 245 +++++++++++++++++----
.../schemaRegion/SchemaRegionManagementTest.java | 4 +-
.../SchemaRegionSimpleRecoverTest.java | 135 +++++++++++-
.../schemaRegion/SchemaRegionTemplateTest.java | 2 +-
.../schemaRegion/SchemaRegionTestUtil.java | 41 +++-
.../schemaRegion/SchemaStatisticsTest.java | 28 +--
.../extractor/SchemaRegionListeningQueueTest.java | 15 +-
.../org/apache/iotdb/db/tools/MLogParserTest.java | 2 +-
.../db/utils/SchemaRegionSnapshotParserTest.java | 10 +-
.../commons/schema/node/utils/IMNodeFactory.java | 1 +
27 files changed, 1000 insertions(+), 435 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java
index 2cd7e287796..fab2ef9597a 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java
@@ -82,82 +82,82 @@ public class IoTDBPipeIdempotentIT extends
AbstractPipeDualAutoIT {
}
@Test
- public void testCreateTimeseriesIdempotent() throws Exception {
+ public void testCreateTimeSeriesIdempotent() throws Exception {
testIdempotent(
Collections.emptyList(),
- "create timeseries root.ln.wf01.wt01.status0 with
datatype=BOOLEAN,encoding=PLAIN",
- "create timeseries root.ln.wf01.wt01.status1 with
datatype=BOOLEAN,encoding=PLAIN",
- "count timeseries",
+ "create timeSeries root.ln.wf01.wt01.status0 with
datatype=BOOLEAN,encoding=PLAIN",
+ "create timeSeries root.ln.wf01.wt01.status1 with
datatype=BOOLEAN,encoding=PLAIN",
+ "count timeSeries",
"count(timeseries),",
Collections.singleton("2,"));
}
@Test
- public void testCreateAlignedTimeseriesIdempotent() throws Exception {
+ public void testCreateAlignedTimeSeriesIdempotent() throws Exception {
testIdempotent(
Collections.emptyList(),
"CREATE ALIGNED TIMESERIES root.ln.wf01.GPS(latitude FLOAT
encoding=PLAIN compressor=SNAPPY, longitude FLOAT encoding=PLAIN
compressor=SNAPPY)",
- "create timeseries root.ln.wf01.wt01.status1(status) with
datatype=BOOLEAN,encoding=PLAIN",
- "count timeseries",
+ "create timeSeries root.ln.wf01.wt01.status1(status) with
datatype=BOOLEAN,encoding=PLAIN",
+ "count timeSeries",
"count(timeseries),",
Collections.singleton("3,"));
}
@Test
- public void testCreateTimeseriesWithAliasIdempotent() throws Exception {
+ public void testCreateTimeSeriesWithAliasIdempotent() throws Exception {
testIdempotent(
Collections.emptyList(),
- "create timeseries root.ln.wf01.wt01.status0(status0) with
datatype=BOOLEAN,encoding=PLAIN",
- "create timeseries root.ln.wf01.wt01.status1(status1) with
datatype=BOOLEAN,encoding=PLAIN",
- "count timeseries",
+ "create timeSeries root.ln.wf01.wt01.status0(status0) with
datatype=BOOLEAN,encoding=PLAIN",
+ "create timeSeries root.ln.wf01.wt01.status1(status1) with
datatype=BOOLEAN,encoding=PLAIN",
+ "count timeSeries",
"count(timeseries),",
Collections.singleton("2,"));
}
@Test
- public void testInternalCreateTimeseriesIdempotent() throws Exception {
+ public void testInternalCreateTimeSeriesIdempotent() throws Exception {
testIdempotent(
Collections.emptyList(),
"insert into root.ln.wf01.wt01(time, status0) values(now(), false);",
- "create timeseries root.ln.wf01.wt01.status1 with
datatype=BOOLEAN,encoding=PLAIN",
- "count timeseries",
+ "create timeSeries root.ln.wf01.wt01.status1 with
datatype=BOOLEAN,encoding=PLAIN",
+ "count timeSeries",
"count(timeseries),",
Collections.singleton("2,"));
}
@Test
- public void testAlterTimeseriesAddTagIdempotent() throws Exception {
+ public void testAlterTimeSeriesAddTagIdempotent() throws Exception {
testIdempotent(
Collections.singletonList(
- "create timeseries root.ln.wf01.wt01.status0 with
datatype=BOOLEAN,encoding=PLAIN"),
- "ALTER timeseries root.ln.wf01.wt01.status0 ADD TAGS tag3=v3;",
- "create timeseries root.ln.wf01.wt01.status1 with
datatype=BOOLEAN,encoding=PLAIN",
- "count timeseries",
+ "create timeSeries root.ln.wf01.wt01.status0 with
datatype=BOOLEAN,encoding=PLAIN"),
+ "ALTER timeSeries root.ln.wf01.wt01.status0 ADD TAGS tag3=v3;",
+ "create timeSeries root.ln.wf01.wt01.status1 with
datatype=BOOLEAN,encoding=PLAIN",
+ "count timeSeries",
"count(timeseries),",
Collections.singleton("2,"));
}
@Test
- public void testAlterTimeseriesAddAttrIdempotent() throws Exception {
+ public void testAlterTimeSeriesAddAttrIdempotent() throws Exception {
testIdempotent(
Collections.singletonList(
- "create timeseries root.ln.wf01.wt01.status0 with
datatype=BOOLEAN,encoding=PLAIN"),
- "ALTER timeseries root.ln.wf01.wt01.status0 ADD ATTRIBUTES
attr1=newV1;",
- "create timeseries root.ln.wf01.wt01.status1 with
datatype=BOOLEAN,encoding=PLAIN",
- "count timeseries",
+ "create timeSeries root.ln.wf01.wt01.status0 with
datatype=BOOLEAN,encoding=PLAIN"),
+ "ALTER timeSeries root.ln.wf01.wt01.status0 ADD ATTRIBUTES
attr1=newV1;",
+ "create timeSeries root.ln.wf01.wt01.status1 with
datatype=BOOLEAN,encoding=PLAIN",
+ "count timeSeries",
"count(timeseries),",
Collections.singleton("2,"));
}
@Test
- public void testAlterTimeseriesRenameIdempotent() throws Exception {
+ public void testAlterTimeSeriesRenameIdempotent() throws Exception {
testIdempotent(
Arrays.asList(
- "create timeseries root.ln.wf01.wt01.status0 with
datatype=BOOLEAN,encoding=PLAIN",
- "ALTER timeseries root.ln.wf01.wt01.status0 ADD ATTRIBUTES
attr1=newV1;"),
- "ALTER timeseries root.ln.wf01.wt01.status0 RENAME attr1 TO newAttr1;",
- "create timeseries root.ln.wf01.wt01.status1 with
datatype=BOOLEAN,encoding=PLAIN",
- "count timeseries",
+ "create timeSeries root.ln.wf01.wt01.status0 with
datatype=BOOLEAN,encoding=PLAIN",
+ "ALTER timeSeries root.ln.wf01.wt01.status0 ADD ATTRIBUTES
attr1=newV1;"),
+ "ALTER timeSeries root.ln.wf01.wt01.status0 RENAME attr1 TO newAttr1;",
+ "create timeSeries root.ln.wf01.wt01.status1 with
datatype=BOOLEAN,encoding=PLAIN",
+ "count timeSeries",
"count(timeseries),",
Collections.singleton("2,"));
}
@@ -166,10 +166,10 @@ public class IoTDBPipeIdempotentIT extends
AbstractPipeDualAutoIT {
public void testDeleteTimeSeriesIdempotent() throws Exception {
testIdempotent(
Collections.singletonList(
- "create timeseries root.ln.wf01.wt01.status0(status0) with
datatype=BOOLEAN,encoding=PLAIN"),
- "delete timeseries root.ln.wf01.wt01.status0",
- "create timeseries root.ln.wf01.wt01.status2(status2) with
datatype=BOOLEAN,encoding=PLAIN",
- "count timeseries",
+ "create timeSeries root.ln.wf01.wt01.status0(status0) with
datatype=BOOLEAN,encoding=PLAIN"),
+ "delete timeSeries root.ln.wf01.wt01.status0",
+ "create timeSeries root.ln.wf01.wt01.status2(status2) with
datatype=BOOLEAN,encoding=PLAIN",
+ "count timeSeries",
"count(timeseries),",
Collections.singleton("1,"));
}
@@ -243,9 +243,9 @@ public class IoTDBPipeIdempotentIT extends
AbstractPipeDualAutoIT {
"create schema template t1 (s1 INT64 encoding=RLE, s2 INT64
encoding=RLE, s3 INT64 encoding=RLE compression=SNAPPY)",
"create database root.sg1",
"set schema template t1 to root.sg1"),
- "create timeseries using device template on root.sg1.d1",
- "create timeseries using device template on root.sg1.d2",
- "count timeseries",
+ "create timeSeries using device template on root.sg1.d1",
+ "create timeSeries using device template on root.sg1.d2",
+ "count timeSeries",
"count(timeseries),",
Collections.singleton("6,"));
}
@@ -257,11 +257,11 @@ public class IoTDBPipeIdempotentIT extends
AbstractPipeDualAutoIT {
"create schema template t1 (s1 INT64 encoding=RLE, s2 INT64
encoding=RLE, s3 INT64 encoding=RLE compression=SNAPPY)",
"create database root.sg1",
"set schema template t1 to root.sg1",
- "create timeseries using device template on root.sg1.d1",
- "create timeseries using device template on root.sg1.d2"),
- "delete timeseries of schema template t1 from root.sg1.*",
- "create timeseries using device template on root.sg1.d3",
- "count timeseries",
+ "create timeSeries using device template on root.sg1.d1",
+ "create timeSeries using device template on root.sg1.d2"),
+ "delete timeSeries of schema template t1 from root.sg1.*",
+ "create timeSeries using device template on root.sg1.d3",
+ "count timeSeries",
"count(timeseries),",
Collections.singleton("3,"));
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaLeaderChangeIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaLeaderChangeIT.java
index b05e695ad3b..de386f7f3dc 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaLeaderChangeIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaLeaderChangeIT.java
@@ -163,7 +163,7 @@ public class IoTDBPipeMetaLeaderChangeIT extends
AbstractPipeDualManualIT {
if (!TestUtils.tryExecuteNonQueryWithRetry(
senderEnv,
String.format(
- "create timeseries root.ln.wf01.GPS.status%s with
datatype=BOOLEAN,encoding=PLAIN",
+ "create timeSeries root.ln.wf01.GPS.status%s with
datatype=BOOLEAN,encoding=PLAIN",
i))) {
return;
}
@@ -178,18 +178,34 @@ public class IoTDBPipeMetaLeaderChangeIT extends
AbstractPipeDualManualIT {
return;
}
+ // Include "alter" in historical transfer for new leader as possible
+ // in order to test the "withMerge" creation in receiver side
+ if (!TestUtils.tryExecuteNonQueryOnSpecifiedDataNodeWithRetry(
+ senderEnv,
+ senderEnv.getDataNodeWrapper(index == 0 ? 1 : 0),
+ "ALTER timeSeries root.ln.wf01.GPS.status0 UPSERT ALIAS=newAlias
TAGS(tag3=v3) ATTRIBUTES(attr4=v4)")) {
+ return;
+ }
+
for (int i = 10; i < 20; ++i) {
if (!TestUtils.tryExecuteNonQueryOnSpecifiedDataNodeWithRetry(
senderEnv,
senderEnv.getDataNodeWrapper(index == 0 ? 1 : 0),
String.format(
- "create timeseries root.ln.wf01.GPS.status%s with
datatype=BOOLEAN,encoding=PLAIN",
+ "create timeSeries root.ln.wf01.GPS.status%s with
datatype=BOOLEAN,encoding=PLAIN",
i))) {
return;
}
}
TestUtils.assertDataEventuallyOnEnv(
- receiverEnv, "count timeseries", "count(timeseries),",
Collections.singleton("20,"));
+ receiverEnv,
+ "show timeSeries root.ln.wf01.GPS.status0",
+
"Timeseries,Alias,Database,DataType,Encoding,Compression,Tags,Attributes,Deadband,DeadbandParameters,ViewType,",
+ Collections.singleton(
+
"root.ln.wf01.GPS.status0,newAlias,root.ln,BOOLEAN,PLAIN,LZ4,{\"tag3\":\"v3\"},{\"attr4\":\"v4\"},null,null,BASE,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv, "count timeSeries", "count(timeseries),",
Collections.singleton("20,"));
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningQueue.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningQueue.java
index 9f76e2d3748..4c908667a9f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningQueue.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningQueue.java
@@ -61,9 +61,10 @@ public class ConfigRegionListeningQueue extends
AbstractPipeListeningQueue
/////////////////////////////// Function ///////////////////////////////
- public synchronized void tryListenToPlan(ConfigPhysicalPlan plan, boolean
isGeneratedByPipe) {
+ public synchronized void tryListenToPlan(
+ final ConfigPhysicalPlan plan, final boolean isGeneratedByPipe) {
if (ConfigRegionListeningFilter.shouldPlanBeListened(plan)) {
- PipeConfigRegionWritePlanEvent event;
+ final PipeConfigRegionWritePlanEvent event;
switch (plan.getType()) {
case PipeEnriched:
tryListenToPlan(((PipeEnrichedPlan) plan).getInnerPlan(), true);
@@ -96,9 +97,9 @@ public class ConfigRegionListeningQueue extends
AbstractPipeListeningQueue
}
public synchronized void tryListenToSnapshots(
- List<Pair<Pair<Path, Path>, CNSnapshotFileType>> snapshotPathInfoList) {
- List<PipeSnapshotEvent> events = new ArrayList<>();
- for (Pair<Pair<Path, Path>, CNSnapshotFileType> snapshotPathInfo :
snapshotPathInfoList) {
+ final List<Pair<Pair<Path, Path>, CNSnapshotFileType>>
snapshotPathInfoList) {
+ final List<PipeSnapshotEvent> events = new ArrayList<>();
+ for (final Pair<Pair<Path, Path>, CNSnapshotFileType> snapshotPathInfo :
snapshotPathInfoList) {
final Path snapshotPath = snapshotPathInfo.getLeft().getLeft();
final CNSnapshotFileType type = snapshotPathInfo.getRight();
// Filter empty and superuser snapshots
@@ -133,19 +134,19 @@ public class ConfigRegionListeningQueue extends
AbstractPipeListeningQueue
/////////////////////////////// Element Ser / De Method
////////////////////////////////
@Override
- protected ByteBuffer serializeToByteBuffer(Event event) {
+ protected ByteBuffer serializeToByteBuffer(final Event event) {
return ((SerializableEvent) event).serializeToByteBuffer();
}
@Override
- protected Event deserializeFromByteBuffer(ByteBuffer byteBuffer) {
+ protected Event deserializeFromByteBuffer(final ByteBuffer byteBuffer) {
try {
- SerializableEvent result =
PipeConfigSerializableEventType.deserialize(byteBuffer);
+ final SerializableEvent result =
PipeConfigSerializableEventType.deserialize(byteBuffer);
// We assume the caller of this method will put the deserialize result
into a queue,
// so we increase the reference count here.
((EnrichedEvent)
result).increaseReferenceCount(ConfigRegionListeningQueue.class.getName());
return result;
- } catch (IOException e) {
+ } catch (final IOException e) {
LOGGER.error("Failed to load snapshot from byteBuffer {}.", byteBuffer);
}
return null;
@@ -154,12 +155,14 @@ public class ConfigRegionListeningQueue extends
AbstractPipeListeningQueue
/////////////////////////////// Snapshot ///////////////////////////////
@Override
- public synchronized boolean processTakeSnapshot(File snapshotDir) throws
TException, IOException {
+ public synchronized boolean processTakeSnapshot(final File snapshotDir)
+ throws TException, IOException {
return super.serializeToFile(new File(snapshotDir, SNAPSHOT_FILE_NAME));
}
@Override
- public synchronized void processLoadSnapshot(File snapshotDir) throws
TException, IOException {
+ public synchronized void processLoadSnapshot(final File snapshotDir)
+ throws TException, IOException {
super.deserializeFromFile(new File(snapshotDir, SNAPSHOT_FILE_NAME));
}
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ConfigRegionListeningQueueTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ConfigRegionListeningQueueTest.java
index f9e72c59d6e..937c1a9e2dd 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ConfigRegionListeningQueueTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ConfigRegionListeningQueueTest.java
@@ -52,6 +52,8 @@ public class ConfigRegionListeningQueueTest {
if (!snapshotDir.exists()) {
snapshotDir.mkdirs();
}
+ PipeConfigNodeAgent.runtime().listener().open();
+ PipeConfigNodeAgent.runtime().notifyLeaderReady();
}
@AfterClass
@@ -64,13 +66,10 @@ public class ConfigRegionListeningQueueTest {
@Test
public void testSnapshot() throws TException, IOException, AuthException {
- PipeConfigNodeAgent.runtime().listener().open();
- PipeConfigNodeAgent.runtime().notifyLeaderReady();
-
- DatabaseSchemaPlan plan1 =
+ final DatabaseSchemaPlan plan1 =
new DatabaseSchemaPlan(
ConfigPhysicalPlanType.CreateDatabase, new
TDatabaseSchema("root.test1"));
- PipeEnrichedPlan plan2 =
+ final PipeEnrichedPlan plan2 =
new PipeEnrichedPlan(
new AuthorPlan(
ConfigPhysicalPlanType.CreateUser,
@@ -96,10 +95,10 @@ public class ConfigRegionListeningQueueTest {
ConcurrentIterableLinkedQueue<Event>.DynamicIterator itr =
PipeConfigNodeAgent.runtime().listener().newIterator(0);
- Event event1 = itr.next(0);
+ final Event event1 = itr.next(0);
Assert.assertEquals(plan1, ((PipeConfigRegionWritePlanEvent)
event1).getConfigPhysicalPlan());
- Event event2 = itr.next(0);
+ final Event event2 = itr.next(0);
Assert.assertEquals(
plan2.getInnerPlan(), ((PipeConfigRegionWritePlanEvent)
event2).getConfigPhysicalPlan());
Assert.assertTrue(((PipeConfigRegionWritePlanEvent)
event2).isGeneratedByPipe());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
index 94d8dda19ac..6efdeed997e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import
org.apache.iotdb.db.pipe.extractor.schemaregion.SchemaRegionListeningQueue;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.ActivateTemplateNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.BatchActivateTemplateNode;
@@ -60,6 +61,8 @@ import
org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.ICreateAlignedTimeSeriesPlan;
import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.ICreateTimeSeriesPlan;
import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.SchemaRegionWritePlanFactory;
+import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.impl.CreateAlignedTimeSeriesPlanImpl;
+import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.impl.CreateTimeSeriesPlanImpl;
import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
import org.apache.iotdb.db.schemaengine.template.Template;
import org.apache.iotdb.rpc.RpcUtils;
@@ -84,7 +87,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
@Override
public TSStatus visitCreateTimeSeries(CreateTimeSeriesNode node,
ISchemaRegion schemaRegion) {
try {
- schemaRegion.createTimeseries(node, -1);
+ schemaRegion.createTimeSeries(node, -1);
} catch (MetadataException e) {
logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
@@ -119,7 +122,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
// todo implement batch creation of one device in SchemaRegion
for (int i = 0; i < size; i++) {
try {
- schemaRegion.createTimeseries(
+ schemaRegion.createTimeSeries(
transformToCreateTimeSeriesPlan(devicePath, measurementGroup,
i), -1);
} catch (MetadataException e) {
logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME,
e);
@@ -153,27 +156,37 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
@Override
public TSStatus visitInternalCreateTimeSeries(
- InternalCreateTimeSeriesNode node, ISchemaRegion schemaRegion) {
- PartialPath devicePath = node.getDevicePath();
- MeasurementGroup measurementGroup = node.getMeasurementGroup();
+ final InternalCreateTimeSeriesNode node, final ISchemaRegion
schemaRegion) {
+ final PartialPath devicePath = node.getDevicePath();
+ final MeasurementGroup measurementGroup = node.getMeasurementGroup();
- List<TSStatus> alreadyExistingTimeseries = new ArrayList<>();
- List<TSStatus> failingStatus = new ArrayList<>();
+ final List<TSStatus> alreadyExistingTimeSeries = new ArrayList<>();
+ final List<TSStatus> failingStatus = new ArrayList<>();
if (node.isAligned()) {
- executeInternalCreateAlignedTimeseries(
- devicePath, measurementGroup, schemaRegion,
alreadyExistingTimeseries, failingStatus);
+ executeInternalCreateAlignedTimeSeries(
+ devicePath,
+ measurementGroup,
+ schemaRegion,
+ alreadyExistingTimeSeries,
+ failingStatus,
+ false);
} else {
- executeInternalCreateTimeseries(
- devicePath, measurementGroup, schemaRegion,
alreadyExistingTimeseries, failingStatus);
+ executeInternalCreateTimeSeries(
+ devicePath,
+ measurementGroup,
+ schemaRegion,
+ alreadyExistingTimeSeries,
+ failingStatus,
+ false);
}
if (!failingStatus.isEmpty()) {
return RpcUtils.getStatus(failingStatus);
}
- if (!alreadyExistingTimeseries.isEmpty()) {
- return RpcUtils.getStatus(alreadyExistingTimeseries);
+ if (!alreadyExistingTimeSeries.isEmpty()) {
+ return RpcUtils.getStatus(alreadyExistingTimeSeries);
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute
successfully");
@@ -181,23 +194,33 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
@Override
public TSStatus visitInternalCreateMultiTimeSeries(
- InternalCreateMultiTimeSeriesNode node, ISchemaRegion schemaRegion) {
+ final InternalCreateMultiTimeSeriesNode node, final ISchemaRegion
schemaRegion) {
PartialPath devicePath;
MeasurementGroup measurementGroup;
- List<TSStatus> alreadyExistingTimeseries = new ArrayList<>();
- List<TSStatus> failingStatus = new ArrayList<>();
+ final List<TSStatus> alreadyExistingTimeSeries = new ArrayList<>();
+ final List<TSStatus> failingStatus = new ArrayList<>();
- for (Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>> deviceEntry :
+ for (final Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>>
deviceEntry :
node.getDeviceMap().entrySet()) {
devicePath = deviceEntry.getKey();
measurementGroup = deviceEntry.getValue().right;
- if (deviceEntry.getValue().left) {
- executeInternalCreateAlignedTimeseries(
- devicePath, measurementGroup, schemaRegion,
alreadyExistingTimeseries, failingStatus);
+ if (Boolean.TRUE.equals(deviceEntry.getValue().left)) {
+ executeInternalCreateAlignedTimeSeries(
+ devicePath,
+ measurementGroup,
+ schemaRegion,
+ alreadyExistingTimeSeries,
+ failingStatus,
+ node.isGeneratedByPipe());
} else {
- executeInternalCreateTimeseries(
- devicePath, measurementGroup, schemaRegion,
alreadyExistingTimeseries, failingStatus);
+ executeInternalCreateTimeSeries(
+ devicePath,
+ measurementGroup,
+ schemaRegion,
+ alreadyExistingTimeSeries,
+ failingStatus,
+ node.isGeneratedByPipe());
}
}
@@ -205,49 +228,56 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
return RpcUtils.getStatus(failingStatus);
}
- if (!alreadyExistingTimeseries.isEmpty()) {
- return RpcUtils.getStatus(alreadyExistingTimeseries);
+ if (!alreadyExistingTimeSeries.isEmpty()) {
+ return RpcUtils.getStatus(alreadyExistingTimeSeries);
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute
successfully");
}
- private void executeInternalCreateTimeseries(
- PartialPath devicePath,
- MeasurementGroup measurementGroup,
- ISchemaRegion schemaRegion,
- List<TSStatus> alreadyExistingTimeseries,
- List<TSStatus> failingStatus) {
-
- int size = measurementGroup.getMeasurements().size();
+ private void executeInternalCreateTimeSeries(
+ final PartialPath devicePath,
+ final MeasurementGroup measurementGroup,
+ final ISchemaRegion schemaRegion,
+ final List<TSStatus> alreadyExistingTimeSeries,
+ final List<TSStatus> failingStatus,
+ final boolean withMerge) {
+ final int size = measurementGroup.getMeasurements().size();
// todo implement batch creation of one device in SchemaRegion
for (int i = 0; i < size; i++) {
try {
- schemaRegion.createTimeseries(
- transformToCreateTimeSeriesPlan(devicePath, measurementGroup, i),
-1);
- } catch (MeasurementAlreadyExistException e) {
- // There's no need to internal create timeseries.
- alreadyExistingTimeseries.add(
+ final ICreateTimeSeriesPlan createTimeSeriesPlan =
+ transformToCreateTimeSeriesPlan(devicePath, measurementGroup, i);
+ // With merge is only true for pipe to upsert the receiver
alias/tags/attributes in
+ // historical transfer.
+ // For normal internal creation, the alias/tags/attributes are not set
+ // Thus the original ones are not altered
+ ((CreateTimeSeriesPlanImpl)
createTimeSeriesPlan).setWithMerge(withMerge);
+ schemaRegion.createTimeSeries(createTimeSeriesPlan, -1);
+ } catch (final MeasurementAlreadyExistException e) {
+ // There's no need to internal create time series.
+ alreadyExistingTimeSeries.add(
RpcUtils.getStatus(
e.getErrorCode(),
MeasurementPath.transformDataToString(e.getMeasurementPath())));
- } catch (MetadataException e) {
+ } catch (final MetadataException e) {
logger.warn("{}: MetaData error: ", e.getMessage(), e);
failingStatus.add(RpcUtils.getStatus(e.getErrorCode(),
e.getMessage()));
}
}
}
- private void executeInternalCreateAlignedTimeseries(
- PartialPath devicePath,
- MeasurementGroup measurementGroup,
- ISchemaRegion schemaRegion,
- List<TSStatus> alreadyExistingTimeseries,
- List<TSStatus> failingStatus) {
- List<String> measurementList = measurementGroup.getMeasurements();
- List<TSDataType> dataTypeList = measurementGroup.getDataTypes();
- List<TSEncoding> encodingList = measurementGroup.getEncodings();
- List<CompressionType> compressionTypeList =
measurementGroup.getCompressors();
- ICreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan =
+ private void executeInternalCreateAlignedTimeSeries(
+ final PartialPath devicePath,
+ final MeasurementGroup measurementGroup,
+ final ISchemaRegion schemaRegion,
+ final List<TSStatus> alreadyExistingTimeSeries,
+ final List<TSStatus> failingStatus,
+ final boolean withMerge) {
+ final List<String> measurementList = measurementGroup.getMeasurements();
+ final List<TSDataType> dataTypeList = measurementGroup.getDataTypes();
+ final List<TSEncoding> encodingList = measurementGroup.getEncodings();
+ final List<CompressionType> compressionTypeList =
measurementGroup.getCompressors();
+ final ICreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan =
SchemaRegionWritePlanFactory.getCreateAlignedTimeSeriesPlan(
devicePath,
measurementList,
@@ -257,22 +287,27 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
null,
null,
null);
+ // With merge is only true for pipe to upsert the receiver
alias/tags/attributes in historical
+ // transfer.
+ // For normal internal creation, the alias/tags/attributes are not set
+ // Thus the original ones are not altered
+ ((CreateAlignedTimeSeriesPlanImpl)
createAlignedTimeSeriesPlan).setWithMerge(withMerge);
boolean shouldRetry = true;
while (shouldRetry) {
try {
schemaRegion.createAlignedTimeSeries(createAlignedTimeSeriesPlan);
shouldRetry = false;
- } catch (MeasurementAlreadyExistException e) {
- // the existence check will be executed before truly creation
- // There's no need to internal create timeseries.
- MeasurementPath measurementPath = e.getMeasurementPath();
- alreadyExistingTimeseries.add(
+ } catch (final MeasurementAlreadyExistException e) {
+ // The existence check will be executed before truly creation
+ // There's no need to internal create time series.
+ final MeasurementPath measurementPath = e.getMeasurementPath();
+ alreadyExistingTimeSeries.add(
RpcUtils.getStatus(
e.getErrorCode(),
MeasurementPath.transformDataToString(e.getMeasurementPath())));
- // remove the existing timeseries from plan
- int index = measurementList.indexOf(measurementPath.getMeasurement());
+ // remove the existing time series from plan
+ final int index =
measurementList.indexOf(measurementPath.getMeasurement());
measurementList.remove(index);
dataTypeList.remove(index);
encodingList.remove(index);
@@ -282,7 +317,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
shouldRetry = false;
}
- } catch (MetadataException e) {
+ } catch (final MetadataException e) {
logger.warn("{}: MetaData error: ", e.getMessage(), e);
failingStatus.add(RpcUtils.getStatus(e.getErrorCode(),
e.getMessage()));
shouldRetry = false;
@@ -533,12 +568,16 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
@Override
public TSStatus visitPipeEnrichedWritePlanNode(
final PipeEnrichedWritePlanNode node, final ISchemaRegion schemaRegion) {
- return node.getWritePlanNode().accept(this, schemaRegion);
+ final WritePlanNode innerNode = node.getWritePlanNode();
+ innerNode.markAsGeneratedByPipe();
+ return innerNode.accept(this, schemaRegion);
}
@Override
public TSStatus visitPipeEnrichedNonWritePlanNode(
final PipeEnrichedNonWritePlanNode node, final ISchemaRegion
schemaRegion) {
+ final PlanNode innerNode = node.getNonWritePlanNode();
+ innerNode.markAsGeneratedByPipe();
return node.getNonWritePlanNode().accept(this, schemaRegion);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/SchemaRegionListeningQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/SchemaRegionListeningQueue.java
index 517f93d2aab..70ad8b6df24 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/SchemaRegionListeningQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/SchemaRegionListeningQueue.java
@@ -47,9 +47,9 @@ public class SchemaRegionListeningQueue extends
AbstractPipeListeningQueue {
/////////////////////////////// Function ///////////////////////////////
- public synchronized void tryListenToNode(PlanNode node) {
+ public synchronized void tryListenToNode(final PlanNode node) {
if (SchemaRegionListeningFilter.shouldPlanBeListened(node)) {
- PipeSchemaRegionWritePlanEvent event;
+ final PipeSchemaRegionWritePlanEvent event;
switch (node.getType()) {
case PIPE_ENRICHED_WRITE:
event =
@@ -69,7 +69,7 @@ public class SchemaRegionListeningQueue extends
AbstractPipeListeningQueue {
}
public synchronized void tryListenToSnapshot(
- String mTreeSnapshotPath, String tLogPath, String databaseName) {
+ final String mTreeSnapshotPath, final String tLogPath, final String
databaseName) {
tryListen(
Objects.nonNull(mTreeSnapshotPath)
? Collections.singletonList(
@@ -80,19 +80,19 @@ public class SchemaRegionListeningQueue extends
AbstractPipeListeningQueue {
/////////////////////////////// Element Ser / De Method
////////////////////////////////
@Override
- protected ByteBuffer serializeToByteBuffer(Event event) {
+ protected ByteBuffer serializeToByteBuffer(final Event event) {
return ((SerializableEvent) event).serializeToByteBuffer();
}
@Override
- protected Event deserializeFromByteBuffer(ByteBuffer byteBuffer) {
+ protected Event deserializeFromByteBuffer(final ByteBuffer byteBuffer) {
try {
- SerializableEvent result =
PipeSchemaSerializableEventType.deserialize(byteBuffer);
+ final SerializableEvent result =
PipeSchemaSerializableEventType.deserialize(byteBuffer);
// We assume the caller of this method will put the deserialize result
into a queue,
// so we increase the reference count here.
((EnrichedEvent)
result).increaseReferenceCount(SchemaRegionListeningQueue.class.getName());
return result;
- } catch (IOException e) {
+ } catch (final IOException e) {
LOGGER.error("Failed to load snapshot from byteBuffer {}.", byteBuffer);
}
return null;
@@ -100,19 +100,19 @@ public class SchemaRegionListeningQueue extends
AbstractPipeListeningQueue {
/////////////////////////////// Snapshot ///////////////////////////////
- public synchronized boolean createSnapshot(File snapshotDir) {
+ public synchronized boolean createSnapshot(final File snapshotDir) {
try {
return super.serializeToFile(new File(snapshotDir, SNAPSHOT_FILE_NAME));
- } catch (IOException e) {
+ } catch (final IOException e) {
LOGGER.warn("Take snapshot error: {}", e.getMessage());
return false;
}
}
- public synchronized void loadSnapshot(File snapshotDir) {
+ public synchronized void loadSnapshot(final File snapshotDir) {
try {
super.deserializeFromFile(new File(snapshotDir, SNAPSHOT_FILE_NAME));
- } catch (IOException e) {
+ } catch (final IOException e) {
LOGGER.error("Failed to load snapshot {}", e.getMessage());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
index 41dc339c23d..ee4ab738f80 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
@@ -611,13 +611,13 @@ public class RegionWriteExecutor {
}
private RegionExecutionResult executeInternalCreateMultiTimeSeries(
- InternalCreateMultiTimeSeriesNode node,
- WritePlanNodeExecutionContext context,
- boolean receivedFromPipe) {
- ISchemaRegion schemaRegion =
+ final InternalCreateMultiTimeSeriesNode node,
+ final WritePlanNodeExecutionContext context,
+ final boolean receivedFromPipe) {
+ final ISchemaRegion schemaRegion =
schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
RegionExecutionResult result;
- for (Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>> deviceEntry
:
+ for (final Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>>
deviceEntry :
node.getDeviceMap().entrySet()) {
result =
checkQuotaBeforeCreatingTimeSeries(
@@ -629,41 +629,46 @@ public class RegionWriteExecutor {
if
(CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS))
{
context.getRegionWriteValidationRWLock().writeLock().lock();
try {
- List<TSStatus> failingStatus = new ArrayList<>();
- List<TSStatus> alreadyExistingStatus = new ArrayList<>();
-
- MeasurementGroup measurementGroup;
- Map<Integer, MetadataException> failingMeasurementMap;
- MetadataException metadataException;
- for (Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>>
deviceEntry :
- node.getDeviceMap().entrySet()) {
- measurementGroup = deviceEntry.getValue().right;
- failingMeasurementMap =
- schemaRegion.checkMeasurementExistence(
- deviceEntry.getKey(),
- measurementGroup.getMeasurements(),
- measurementGroup.getAliasList());
- // filter failed measurement and keep the rest for execution
- for (Map.Entry<Integer, MetadataException> failingMeasurement :
- failingMeasurementMap.entrySet()) {
- metadataException = failingMeasurement.getValue();
- if (metadataException.getErrorCode()
- == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
- // There's no need to internal create timeseries.
- alreadyExistingStatus.add(
- RpcUtils.getStatus(
- metadataException.getErrorCode(),
- MeasurementPath.transformDataToString(
- ((MeasurementAlreadyExistException)
metadataException)
- .getMeasurementPath())));
- } else {
- LOGGER.warn(METADATA_ERROR_MSG, metadataException);
- failingStatus.add(
- RpcUtils.getStatus(
- metadataException.getErrorCode(),
metadataException.getMessage()));
+ final List<TSStatus> failingStatus = new ArrayList<>();
+ final List<TSStatus> alreadyExistingStatus = new ArrayList<>();
+
+          // Do not filter measurements if the node is generated by pipe,
+          // because pipe may use the InternalCreateMultiTimeSeriesNode to
transfer historical data,
+          // and the alias/tags/attributes may need to be updated for existing
time series.
+ if (!receivedFromPipe) {
+ MeasurementGroup measurementGroup;
+ Map<Integer, MetadataException> failingMeasurementMap;
+ MetadataException metadataException;
+ for (final Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>>
deviceEntry :
+ node.getDeviceMap().entrySet()) {
+ measurementGroup = deviceEntry.getValue().right;
+ failingMeasurementMap =
+ schemaRegion.checkMeasurementExistence(
+ deviceEntry.getKey(),
+ measurementGroup.getMeasurements(),
+ measurementGroup.getAliasList());
+ // filter failed measurement and keep the rest for execution
+ for (final Map.Entry<Integer, MetadataException>
failingMeasurement :
+ failingMeasurementMap.entrySet()) {
+ metadataException = failingMeasurement.getValue();
+ if (metadataException.getErrorCode()
+ == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
+                  // There's no need to internally create the time series.
+ alreadyExistingStatus.add(
+ RpcUtils.getStatus(
+ metadataException.getErrorCode(),
+ MeasurementPath.transformDataToString(
+ ((MeasurementAlreadyExistException)
metadataException)
+ .getMeasurementPath())));
+ } else {
+ LOGGER.warn(METADATA_ERROR_MSG, metadataException);
+ failingStatus.add(
+ RpcUtils.getStatus(
+ metadataException.getErrorCode(),
metadataException.getMessage()));
+ }
}
+
measurementGroup.removeMeasurements(failingMeasurementMap.keySet());
}
-
measurementGroup.removeMeasurements(failingMeasurementMap.keySet());
}
return processExecutionResultOfInternalCreateSchema(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java
index da9c3f0a7b3..12536f0dbed 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java
@@ -112,7 +112,7 @@ public interface ISchemaRegion {
* @param offset
* @throws MetadataException
*/
- void createTimeseries(ICreateTimeSeriesPlan plan, long offset) throws
MetadataException;
+ void createTimeSeries(ICreateTimeSeriesPlan plan, long offset) throws
MetadataException;
/**
* Create aligned timeseries.
@@ -198,7 +198,7 @@ public interface ISchemaRegion {
// region Interfaces for metadata info Query
- // region Interfaces for timeseries, measurement and schema info Query
+ // region Interfaces for timeSeries, measurement and schema info Query
MeasurementPath fetchMeasurementPath(PartialPath fullPath) throws
MetadataException;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
index 7647702e839..25eaa8fe211 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
@@ -79,6 +79,8 @@ import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.IPreDeleteTimeSer
import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.IRollbackPreDeactivateTemplatePlan;
import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.IRollbackPreDeleteTimeSeriesPlan;
import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.SchemaRegionWritePlanFactory;
+import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.impl.CreateAlignedTimeSeriesPlanImpl;
+import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.impl.CreateTimeSeriesPlanImpl;
import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.view.IAlterLogicalViewPlan;
import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.view.ICreateLogicalViewPlan;
import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.view.IDeleteLogicalViewPlan;
@@ -89,6 +91,7 @@ import
org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
@@ -101,6 +104,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -551,52 +555,62 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
// region Interfaces and Implementation for Timeseries operation
// including create and delete
- public void createTimeseries(ICreateTimeSeriesPlan plan) throws
MetadataException {
- createTimeseries(plan, -1);
+ public void createTimeseries(final ICreateTimeSeriesPlan plan) throws
MetadataException {
+ createTimeSeries(plan, -1);
}
@Override
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
- public void createTimeseries(ICreateTimeSeriesPlan plan, long offset) throws
MetadataException {
+ public void createTimeSeries(final ICreateTimeSeriesPlan plan, long offset)
+ throws MetadataException {
if (!regionStatistics.isAllowToCreateNewSeries()) {
throw new SeriesOverflowException(
regionStatistics.getGlobalMemoryUsage(),
regionStatistics.getGlobalSeriesNumber());
}
- IMeasurementMNode<IMemMNode> leafMNode;
+ final IMeasurementMNode<IMemMNode> leafMNode;
try {
- PartialPath path = plan.getPath();
+ final PartialPath path = plan.getPath();
SchemaUtils.checkDataTypeWithEncoding(plan.getDataType(),
plan.getEncoding());
- TSDataType type = plan.getDataType();
- // create time series in MTree
+ final TSDataType type = plan.getDataType();
+ // Create time series in MTree
leafMNode =
- mtree.createTimeseries(
+ mtree.createTimeSeries(
path,
type,
plan.getEncoding(),
plan.getCompressor(),
plan.getProps(),
- plan.getAlias());
+ plan.getAlias(),
+ (plan instanceof CreateTimeSeriesPlanImpl
+ && ((CreateTimeSeriesPlanImpl) plan).isWithMerge()));
+
+ // Should merge
+ if (Objects.isNull(leafMNode)) {
+ // Write an upsert plan directly
+ upsertAliasAndTagsAndAttributes(
+ plan.getAlias(), plan.getTags(), plan.getAttributes(), path);
+ return;
+ }
- // update statistics and schemaDataTypeNumMap
+ // Update statistics and schemaDataTypeNumMap
regionStatistics.addMeasurement(1L);
- // update tag index
+ // Update tag index
if (offset != -1 && isRecovering) {
- // the timeseries has already been created and now system is
recovering, using the tag
- // info
- // in tagFile to recover index directly
+ // The time series has already been created and now system is
recovering, using the tag
+ // info in tagFile to recover index directly
tagManager.recoverIndex(offset, leafMNode);
} else if (plan.getTags() != null) {
- // tag key, tag value
+ // Tag key, tag value
tagManager.addIndex(plan.getTags(), leafMNode);
}
- // write log
+ // Write log
if (!isRecovering) {
- // either tags or attributes is not empty
+ // Either tags or attributes is not empty
if ((plan.getTags() != null && !plan.getTags().isEmpty())
|| (plan.getAttributes() != null &&
!plan.getAttributes().isEmpty())) {
offset = tagManager.writeTagFile(plan.getTags(),
plan.getAttributes());
@@ -619,42 +633,81 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
* @param plan CreateAlignedTimeSeriesPlan
*/
@Override
- public void createAlignedTimeSeries(ICreateAlignedTimeSeriesPlan plan)
throws MetadataException {
- int seriesCount = plan.getMeasurements().size();
+ public void createAlignedTimeSeries(final ICreateAlignedTimeSeriesPlan plan)
+ throws MetadataException {
if (!regionStatistics.isAllowToCreateNewSeries()) {
throw new SeriesOverflowException(
regionStatistics.getGlobalMemoryUsage(),
regionStatistics.getGlobalSeriesNumber());
}
try {
- PartialPath prefixPath = plan.getDevicePath();
- List<String> measurements = plan.getMeasurements();
- List<TSDataType> dataTypes = plan.getDataTypes();
- List<TSEncoding> encodings = plan.getEncodings();
- List<Map<String, String>> tagsList = plan.getTagsList();
- List<Map<String, String>> attributesList = plan.getAttributesList();
- List<IMeasurementMNode<IMemMNode>> measurementMNodeList;
+ final PartialPath prefixPath = plan.getDevicePath();
+ final List<String> measurements = plan.getMeasurements();
+ final List<TSDataType> dataTypes = plan.getDataTypes();
+ final List<TSEncoding> encodings = plan.getEncodings();
+ final List<CompressionType> compressors = plan.getCompressors();
+ final List<String> aliasList = plan.getAliasList();
+ final List<Map<String, String>> tagsList = plan.getTagsList();
+ final List<Map<String, String>> attributesList =
plan.getAttributesList();
+ final List<IMeasurementMNode<IMemMNode>> measurementMNodeList;
for (int i = 0; i < measurements.size(); i++) {
SchemaUtils.checkDataTypeWithEncoding(dataTypes.get(i),
encodings.get(i));
}
+ // Used iff with merge
+ final Set<Integer> existingMeasurementIndexes = new HashSet<>();
+
// create time series in MTree
measurementMNodeList =
- mtree.createAlignedTimeseries(
+ mtree.createAlignedTimeSeries(
prefixPath,
measurements,
- plan.getDataTypes(),
- plan.getEncodings(),
- plan.getCompressors(),
- plan.getAliasList());
+ dataTypes,
+ encodings,
+ compressors,
+ aliasList,
+ (plan instanceof CreateAlignedTimeSeriesPlanImpl
+ && ((CreateAlignedTimeSeriesPlanImpl) plan).isWithMerge()),
+ existingMeasurementIndexes);
// update statistics and schemaDataTypeNumMap
- regionStatistics.addMeasurement(seriesCount);
+ regionStatistics.addMeasurement(measurementMNodeList.size());
List<Long> tagOffsets = plan.getTagOffsets();
+
+ // Merge the existing ones
+ // The existing measurements are written into the "upsert" but not
written
+ // to the "createSeries" in mLog
+ for (int i = measurements.size() - 1; i >= 0; --i) {
+ if (existingMeasurementIndexes.isEmpty()) {
+ break;
+ }
+ if (!existingMeasurementIndexes.remove(i)) {
+ continue;
+ }
+        // WARNING: The input lists must be mutable (not immutable) when
"withMerge" is set.
+ upsertAliasAndTagsAndAttributes(
+ Objects.nonNull(aliasList) ? aliasList.remove(i) : null,
+ Objects.nonNull(tagsList) ? tagsList.remove(i) : null,
+ Objects.nonNull(attributesList) ? attributesList.remove(i) : null,
+ prefixPath.concatNode(measurements.get(i)));
+ if (Objects.nonNull(tagOffsets) && !tagOffsets.isEmpty()) {
+ tagOffsets.remove(i);
+ }
+ // Nonnull
+ measurements.remove(i);
+ dataTypes.remove(i);
+ encodings.remove(i);
+ compressors.remove(i);
+ }
+
+ if (measurementMNodeList.isEmpty()) {
+ return;
+ }
+
for (int i = 0; i < measurements.size(); i++) {
- if (tagOffsets != null && !plan.getTagOffsets().isEmpty() &&
isRecovering) {
+ if (tagOffsets != null && !tagOffsets.isEmpty() && isRecovering) {
if (tagOffsets.get(i) != -1) {
tagManager.recoverIndex(plan.getTagOffsets().get(i),
measurementMNodeList.get(i));
}
@@ -666,7 +719,7 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
}
}
- // write log
+ // Write log
tagOffsets = new ArrayList<>();
if (!isRecovering) {
if ((tagsList != null && !tagsList.isEmpty())
@@ -696,7 +749,7 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
measurementMNodeList.get(i).setOffset(tagOffsets.get(i));
}
}
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new MetadataException(e);
}
}
@@ -1339,7 +1392,7 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
public RecoverOperationResult visitCreateTimeSeries(
ICreateTimeSeriesPlan createTimeSeriesPlan, SchemaRegionMemoryImpl
context) {
try {
- createTimeseries(createTimeSeriesPlan,
createTimeSeriesPlan.getTagOffset());
+ createTimeSeries(createTimeSeriesPlan,
createTimeSeriesPlan.getTagOffset());
return RecoverOperationResult.SUCCESS;
} catch (MetadataException e) {
return new RecoverOperationResult(e);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
index f8659a4d02d..e57620c37e5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
@@ -82,6 +82,8 @@ import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.IPreDeleteTimeSer
import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.IRollbackPreDeactivateTemplatePlan;
import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.IRollbackPreDeleteTimeSeriesPlan;
import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.SchemaRegionWritePlanFactory;
+import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.impl.CreateAlignedTimeSeriesPlanImpl;
+import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.impl.CreateTimeSeriesPlanImpl;
import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.view.IAlterLogicalViewPlan;
import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.view.ICreateLogicalViewPlan;
import org.apache.iotdb.db.schemaengine.template.Template;
@@ -102,6 +104,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
@@ -587,18 +590,18 @@ public class SchemaRegionPBTreeImpl implements
ISchemaRegion {
// endregion
- // region Interfaces and Implementation for Timeseries operation
+ // region Interfaces and Implementation for Time series operation
// including create and delete
- public void createTimeseries(ICreateTimeSeriesPlan plan) throws
MetadataException {
- createTimeseries(plan, -1);
+ public void createTimeSeries(ICreateTimeSeriesPlan plan) throws
MetadataException {
+ createTimeSeries(plan, -1);
}
- public void recoverTimeseries(ICreateTimeSeriesPlan plan, long offset)
throws MetadataException {
+ public void recoverTimeSeries(ICreateTimeSeriesPlan plan, long offset)
throws MetadataException {
boolean done = false;
while (!done) {
try {
- createTimeseries(plan, offset);
+ createTimeSeries(plan, offset);
done = true;
} catch (AliasAlreadyExistException | PathAlreadyExistException e) {
// skip
@@ -609,40 +612,51 @@ public class SchemaRegionPBTreeImpl implements
ISchemaRegion {
@Override
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
- public void createTimeseries(ICreateTimeSeriesPlan plan, long offset) throws
MetadataException {
+ public void createTimeSeries(final ICreateTimeSeriesPlan plan, long offset)
+ throws MetadataException {
while (!regionStatistics.isAllowToCreateNewSeries()) {
ReleaseFlushMonitor.getInstance().waitIfReleasing();
}
- PartialPath path = plan.getPath();
- IMeasurementMNode<ICachedMNode> leafMNode;
+ final PartialPath path = plan.getPath();
+ final IMeasurementMNode<ICachedMNode> leafMNode;
try {
SchemaUtils.checkDataTypeWithEncoding(plan.getDataType(),
plan.getEncoding());
- TSDataType type = plan.getDataType();
- // create time series in MTree
+ final TSDataType type = plan.getDataType();
+ // Create time series in MTree
leafMNode =
- mtree.createTimeseriesWithPinnedReturn(
+ mtree.createTimeSeriesWithPinnedReturn(
path,
type,
plan.getEncoding(),
plan.getCompressor(),
plan.getProps(),
- plan.getAlias());
+ plan.getAlias(),
+ (plan instanceof CreateTimeSeriesPlanImpl
+ && ((CreateTimeSeriesPlanImpl) plan).isWithMerge()));
try {
- // update statistics and schemaDataTypeNumMap
+ // Should merge
+ if (Objects.isNull(leafMNode)) {
+ // Write an upsert plan directly
+ // Note that the "pin" and "unpin" is reentrant
+ upsertAliasAndTagsAndAttributes(
+ plan.getAlias(), plan.getTags(), plan.getAttributes(), path);
+ return;
+ }
+
+ // Update statistics and schemaDataTypeNumMap
regionStatistics.addMeasurement(1L);
- // update tag index
+ // Update tag index
if (offset != -1 && isRecovering) {
- // the timeseries has already been created and now system is
recovering, using the tag
- // info
- // in tagFile to recover index directly
+ // The time series has already been created and now system is
recovering, using the tag
+ // info in tagFile to recover index directly
tagManager.recoverIndex(offset, leafMNode);
mtree.pinMNode(leafMNode.getAsMNode());
} else if (plan.getTags() != null) {
- // tag key, tag value
+ // Tag key, tag value
tagManager.addIndex(plan.getTags(), leafMNode);
mtree.pinMNode(leafMNode.getAsMNode());
}
@@ -664,7 +678,9 @@ public class SchemaRegionPBTreeImpl implements
ISchemaRegion {
}
} finally {
- mtree.unPinMNode(leafMNode.getAsMNode());
+ if (Objects.nonNull(leafMNode)) {
+ mtree.unPinMNode(leafMNode.getAsMNode());
+ }
}
} catch (IOException e) {
@@ -694,41 +710,79 @@ public class SchemaRegionPBTreeImpl implements
ISchemaRegion {
* @param plan CreateAlignedTimeSeriesPlan
*/
@Override
- public void createAlignedTimeSeries(ICreateAlignedTimeSeriesPlan plan)
throws MetadataException {
- int seriesCount = plan.getMeasurements().size();
+ public void createAlignedTimeSeries(final ICreateAlignedTimeSeriesPlan plan)
+ throws MetadataException {
while (!regionStatistics.isAllowToCreateNewSeries()) {
ReleaseFlushMonitor.getInstance().waitIfReleasing();
}
try {
- PartialPath prefixPath = plan.getDevicePath();
- List<String> measurements = plan.getMeasurements();
- List<TSDataType> dataTypes = plan.getDataTypes();
- List<TSEncoding> encodings = plan.getEncodings();
- List<Map<String, String>> tagsList = plan.getTagsList();
- List<Map<String, String>> attributesList = plan.getAttributesList();
- List<IMeasurementMNode<ICachedMNode>> measurementMNodeList;
+ final PartialPath prefixPath = plan.getDevicePath();
+ final List<String> measurements = plan.getMeasurements();
+ final List<TSDataType> dataTypes = plan.getDataTypes();
+ final List<TSEncoding> encodings = plan.getEncodings();
+ final List<CompressionType> compressors = plan.getCompressors();
+ final List<String> aliasList = plan.getAliasList();
+ final List<Map<String, String>> tagsList = plan.getTagsList();
+ final List<Map<String, String>> attributesList =
plan.getAttributesList();
+ final List<IMeasurementMNode<ICachedMNode>> measurementMNodeList;
for (int i = 0; i < measurements.size(); i++) {
SchemaUtils.checkDataTypeWithEncoding(dataTypes.get(i),
encodings.get(i));
}
+ // Used iff with merge
+ final Set<Integer> existingMeasurementIndexes = new HashSet<>();
+
// Create time series in MTree
measurementMNodeList =
- mtree.createAlignedTimeseries(
+ mtree.createAlignedTimeSeries(
prefixPath,
measurements,
- plan.getDataTypes(),
- plan.getEncodings(),
- plan.getCompressors(),
- plan.getAliasList());
+ dataTypes,
+ encodings,
+ compressors,
+ aliasList,
+ (plan instanceof CreateAlignedTimeSeriesPlanImpl
+ && ((CreateAlignedTimeSeriesPlanImpl) plan).isWithMerge()),
+ existingMeasurementIndexes);
try {
-
// Update statistics and schemaDataTypeNumMap
- regionStatistics.addMeasurement(seriesCount);
+ regionStatistics.addMeasurement(measurementMNodeList.size());
List<Long> tagOffsets = plan.getTagOffsets();
+
+ // Merge the existing ones
+ // The existing measurements are written into the "upsert" but not
written
+ // to the "createSeries" in mLog
+ for (int i = measurements.size() - 1; i >= 0; --i) {
+ if (existingMeasurementIndexes.isEmpty()) {
+ break;
+ }
+ if (!existingMeasurementIndexes.remove(i)) {
+ continue;
+ }
+          // WARNING: The input lists must be mutable (not immutable) when
"withMerge" is set.
+ upsertAliasAndTagsAndAttributes(
+ Objects.nonNull(aliasList) ? aliasList.remove(i) : null,
+ Objects.nonNull(tagsList) ? tagsList.remove(i) : null,
+ Objects.nonNull(attributesList) ? attributesList.remove(i) :
null,
+ prefixPath.concatNode(measurements.get(i)));
+ if (Objects.nonNull(tagOffsets) && !tagOffsets.isEmpty()) {
+ tagOffsets.remove(i);
+ }
+ // Nonnull
+ measurements.remove(i);
+ dataTypes.remove(i);
+ encodings.remove(i);
+ compressors.remove(i);
+ }
+
+ if (measurementMNodeList.isEmpty()) {
+ return;
+ }
+
for (int i = 0; i < measurements.size(); i++) {
if (tagOffsets != null && !plan.getTagOffsets().isEmpty() &&
isRecovering) {
if (tagOffsets.get(i) != -1) {
@@ -778,11 +832,11 @@ public class SchemaRegionPBTreeImpl implements
ISchemaRegion {
}
}
} finally {
- for (IMeasurementMNode<ICachedMNode> measurementMNode :
measurementMNodeList) {
+ for (final IMeasurementMNode<ICachedMNode> measurementMNode :
measurementMNodeList) {
mtree.unPinMNode(measurementMNode.getAsMNode());
}
}
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new MetadataException(e);
}
}
@@ -1444,7 +1498,7 @@ public class SchemaRegionPBTreeImpl implements
ISchemaRegion {
public RecoverOperationResult visitCreateTimeSeries(
ICreateTimeSeriesPlan createTimeSeriesPlan, SchemaRegionPBTreeImpl
context) {
try {
- recoverTimeseries(createTimeSeriesPlan,
createTimeSeriesPlan.getTagOffset());
+ recoverTimeSeries(createTimeSeriesPlan,
createTimeSeriesPlan.getTagOffset());
return RecoverOperationResult.SUCCESS;
} catch (MetadataException e) {
return new RecoverOperationResult(e);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
index 615e83aaf19..2710b754fe5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
@@ -190,56 +190,62 @@ public class MTreeBelowSGMemoryImpl {
// endregion
- // region Timeseries operation, including create and delete
+ // region time series operation, including creation and deletion
/**
- * Create a timeseries with a full path from root to leaf node. Before
creating a timeseries, the
- * database should be set first, throw exception otherwise
+ * Create a time series with a full path from root to leaf node. Before
creating a time series,
+ * the database should be set first, throw exception otherwise
*
- * @param path timeseries path
+ * @param path time series path
* @param dataType data type
* @param encoding encoding
* @param compressor compressor
* @param props props
* @param alias alias of measurement
*/
- public IMeasurementMNode<IMemMNode> createTimeseries(
- PartialPath path,
- TSDataType dataType,
- TSEncoding encoding,
- CompressionType compressor,
- Map<String, String> props,
- String alias)
+ public IMeasurementMNode<IMemMNode> createTimeSeries(
+ final PartialPath path,
+ final TSDataType dataType,
+ final TSEncoding encoding,
+ final CompressionType compressor,
+ final Map<String, String> props,
+ final String alias,
+ final boolean withMerge)
throws MetadataException {
- String[] nodeNames = path.getNodes();
+ final String[] nodeNames = path.getNodes();
if (nodeNames.length <= 2) {
throw new IllegalPathException(path.getFullPath());
}
MetaFormatUtils.checkTimeseries(path);
- PartialPath devicePath = path.getDevicePath();
- IMemMNode deviceParent = checkAndAutoCreateInternalPath(devicePath);
+ final PartialPath devicePath = path.getDevicePath();
+ final IMemMNode deviceParent = checkAndAutoCreateInternalPath(devicePath);
// synchronize check and add, we need addChild and add Alias become atomic
operation
- // only write on mtree will be synchronized
+ // only write on mTree will be synchronized
synchronized (this) {
- IMemMNode device =
checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent);
+ final IMemMNode device =
checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent);
MetaFormatUtils.checkTimeseriesProps(path.getFullPath(), props);
- String leafName = path.getMeasurement();
+ final String leafName = path.getMeasurement();
- if (alias != null && device.hasChild(alias)) {
+ if (!withMerge && alias != null && device.hasChild(alias)) {
throw new AliasAlreadyExistException(path.getFullPath(), alias);
}
if (device.hasChild(leafName)) {
- IMemMNode node = device.getChild(leafName);
+ final IMemMNode node = device.getChild(leafName);
if (node.isMeasurement()) {
- if (node.getAsMeasurementMNode().isPreDeleted()) {
+ final IMeasurementMNode<IMemMNode> measurementNode =
node.getAsMeasurementMNode();
+ if (measurementNode.isPreDeleted()) {
throw new MeasurementInBlackListException(path);
- } else {
+ } else if (!withMerge || measurementNode.getDataType() != dataType) {
+ // Report conflict if the types are different
throw new MeasurementAlreadyExistException(
path.getFullPath(),
node.getAsMeasurementMNode().getMeasurementPath());
+ } else {
+ // Return null iff we should merge the time series with the
existing one
+ return null;
}
} else {
throw new PathAlreadyExistException(path.getFullPath());
@@ -250,40 +256,43 @@ public class MTreeBelowSGMemoryImpl {
&& device.getAsDeviceMNode().isAlignedNullable() != null
&& device.getAsDeviceMNode().isAligned()) {
throw new AlignedTimeseriesException(
- "timeseries under this device is aligned, please use
createAlignedTimeseries or change device.",
+ "time series under this device is aligned, please use
createAlignedTimeseries or change device.",
device.getFullPath());
}
- IDeviceMNode<IMemMNode> entityMNode;
+ final IDeviceMNode<IMemMNode> entityMNode;
if (device.isDevice()) {
entityMNode = device.getAsDeviceMNode();
} else {
entityMNode = store.setToEntity(device);
}
- // create a non-aligned timeseries
+ // create a non-aligned time series
if (entityMNode.isAlignedNullable() == null) {
entityMNode.setAligned(false);
}
- IMeasurementMNode<IMemMNode> measurementMNode =
+ final IMeasurementMNode<IMemMNode> measurementMNode =
nodeFactory.createMeasurementMNode(
entityMNode,
leafName,
new MeasurementSchema(leafName, dataType, encoding, compressor,
props),
alias);
+
store.addChild(entityMNode.getAsMNode(), leafName,
measurementMNode.getAsMNode());
+
// link alias to LeafMNode
if (alias != null) {
entityMNode.addAlias(alias, measurementMNode);
}
+
return measurementMNode;
}
}
/**
- * Create aligned timeseries with full paths from root to one leaf node.
Before creating
- * timeseries, the * database should be set first, throw exception otherwise
+ * Create aligned time series with full paths from root to one leaf node.
Before creating time
+ * series, the database should be set first, throw exception otherwise
*
* @param devicePath device path
* @param measurements measurements list
@@ -291,33 +300,39 @@ public class MTreeBelowSGMemoryImpl {
* @param encodings encodings list
* @param compressors compressor
*/
- public List<IMeasurementMNode<IMemMNode>> createAlignedTimeseries(
- PartialPath devicePath,
- List<String> measurements,
- List<TSDataType> dataTypes,
- List<TSEncoding> encodings,
- List<CompressionType> compressors,
- List<String> aliasList)
+ public List<IMeasurementMNode<IMemMNode>> createAlignedTimeSeries(
+ final PartialPath devicePath,
+ final List<String> measurements,
+ final List<TSDataType> dataTypes,
+ final List<TSEncoding> encodings,
+ final List<CompressionType> compressors,
+ final List<String> aliasList,
+ final boolean withMerge,
+ final Set<Integer> existingMeasurementIndexes)
throws MetadataException {
- List<IMeasurementMNode<IMemMNode>> measurementMNodeList = new
ArrayList<>();
+ final List<IMeasurementMNode<IMemMNode>> measurementMNodeList = new
ArrayList<>();
MetaFormatUtils.checkSchemaMeasurementNames(measurements);
- IMemMNode deviceParent = checkAndAutoCreateInternalPath(devicePath);
+ final IMemMNode deviceParent = checkAndAutoCreateInternalPath(devicePath);
// synchronize check and add, we need addChild operation be atomic.
- // only write operations on mtree will be synchronized
+ // only write operations on mTree will be synchronized
synchronized (this) {
- IMemMNode device =
checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent);
+ final IMemMNode device =
checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent);
for (int i = 0; i < measurements.size(); i++) {
if (device.hasChild(measurements.get(i))) {
- IMemMNode node = device.getChild(measurements.get(i));
+ final IMemMNode node = device.getChild(measurements.get(i));
if (node.isMeasurement()) {
+ final IMeasurementMNode<IMemMNode> measurementNode =
node.getAsMeasurementMNode();
if (node.getAsMeasurementMNode().isPreDeleted()) {
throw new
MeasurementInBlackListException(devicePath.concatNode(measurements.get(i)));
- } else {
+ } else if (!withMerge || measurementNode.getDataType() !=
dataTypes.get(i)) {
throw new MeasurementAlreadyExistException(
devicePath.getFullPath() + "." + measurements.get(i),
node.getAsMeasurementMNode().getMeasurementPath());
+ } else {
+ existingMeasurementIndexes.add(i);
+ continue;
}
} else {
throw new PathAlreadyExistException(
@@ -334,11 +349,11 @@ public class MTreeBelowSGMemoryImpl {
&& device.getAsDeviceMNode().isAlignedNullable() != null
&& !device.getAsDeviceMNode().isAligned()) {
throw new AlignedTimeseriesException(
- "Timeseries under this device is not aligned, please use
createTimeseries or change device.",
+ "Time series under this device is not aligned, please use
createTimeSeries or change device.",
devicePath.getFullPath());
}
- IDeviceMNode<IMemMNode> entityMNode;
+ final IDeviceMNode<IMemMNode> entityMNode;
if (device.isDevice()) {
entityMNode = device.getAsDeviceMNode();
} else {
@@ -346,13 +361,17 @@ public class MTreeBelowSGMemoryImpl {
entityMNode.setAligned(true);
}
- // create a aligned timeseries
+ // create an aligned time series
if (entityMNode.isAlignedNullable() == null) {
entityMNode.setAligned(true);
}
for (int i = 0; i < measurements.size(); i++) {
- IMeasurementMNode<IMemMNode> measurementMNode =
+ if (existingMeasurementIndexes.contains(i)) {
+ continue;
+ }
+
+ final IMeasurementMNode<IMemMNode> measurementMNode =
nodeFactory.createMeasurementMNode(
entityMNode,
measurements.get(i),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java
index b1237304e5c..192e1209f10 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java
@@ -256,7 +256,7 @@ public class MTreeBelowSGCachedImpl {
// region Timeseries operation, including create and delete
- public IMeasurementMNode<ICachedMNode> createTimeseries(
+ public IMeasurementMNode<ICachedMNode> createTimeSeries(
PartialPath path,
TSDataType dataType,
TSEncoding encoding,
@@ -265,66 +265,82 @@ public class MTreeBelowSGCachedImpl {
String alias)
throws MetadataException {
IMeasurementMNode<ICachedMNode> measurementMNode =
- createTimeseriesWithPinnedReturn(path, dataType, encoding, compressor,
props, alias);
+ createTimeSeriesWithPinnedReturn(path, dataType, encoding, compressor,
props, alias, false);
unPinMNode(measurementMNode.getAsMNode());
return measurementMNode;
}
/**
- * Create a timeseries with a full path from root to leaf node. Before
creating a timeseries, the
- * database should be set first, throw exception otherwise
+ * Create a time series with a full path from root to leaf node. Before
creating a time series,
+ * the database should be set first, throw exception otherwise
*
- * @param path timeseries path
+ * @param path time series path
* @param dataType data type
* @param encoding encoding
* @param compressor compressor
* @param props props
* @param alias alias of measurement
*/
- public IMeasurementMNode<ICachedMNode> createTimeseriesWithPinnedReturn(
- PartialPath path,
- TSDataType dataType,
- TSEncoding encoding,
- CompressionType compressor,
- Map<String, String> props,
- String alias)
+ public IMeasurementMNode<ICachedMNode> createTimeSeriesWithPinnedReturn(
+ final PartialPath path,
+ final TSDataType dataType,
+ final TSEncoding encoding,
+ final CompressionType compressor,
+ final Map<String, String> props,
+ final String alias,
+ final boolean withMerge)
throws MetadataException {
- String[] nodeNames = path.getNodes();
+ final String[] nodeNames = path.getNodes();
if (nodeNames.length <= 2) {
throw new IllegalPathException(path.getFullPath());
}
MetaFormatUtils.checkTimeseries(path);
- PartialPath devicePath = path.getDevicePath();
- ICachedMNode deviceParent = checkAndAutoCreateInternalPath(devicePath);
+ final PartialPath devicePath = path.getDevicePath();
+ final ICachedMNode deviceParent =
checkAndAutoCreateInternalPath(devicePath);
try {
// synchronize check and add, we need addChild and add Alias become
atomic operation
- // only write on mtree will be synchronized
+ // only write on mTree will be synchronized
synchronized (this) {
ICachedMNode device =
checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent);
try {
MetaFormatUtils.checkTimeseriesProps(path.getFullPath(), props);
- String leafName = path.getMeasurement();
+ final String leafName = path.getMeasurement();
- if (alias != null && store.hasChild(device, alias)) {
+ if (!withMerge && alias != null && store.hasChild(device, alias)) {
throw new AliasAlreadyExistException(path.getFullPath(), alias);
}
if (store.hasChild(device, leafName)) {
- throw new PathAlreadyExistException(path.getFullPath());
+ final ICachedMNode node = device.getChild(leafName);
+ if (node.isMeasurement()) {
+ final IMeasurementMNode<ICachedMNode> measurementNode =
node.getAsMeasurementMNode();
+ if (measurementNode.isPreDeleted()) {
+ throw new MeasurementInBlackListException(path);
+ } else if (!withMerge || measurementNode.getDataType() !=
dataType) {
+ // Report conflict if the types are different
+ throw new MeasurementAlreadyExistException(
+ path.getFullPath(),
node.getAsMeasurementMNode().getMeasurementPath());
+ } else {
+ // Return null iff we should merge the time series with the
existing one
+ return null;
+ }
+ } else {
+ throw new PathAlreadyExistException(path.getFullPath());
+ }
}
if (device.isDevice()
&& device.getAsDeviceMNode().isAlignedNullable() != null
&& device.getAsDeviceMNode().isAligned()) {
throw new AlignedTimeseriesException(
- "timeseries under this device is aligned, please use
createAlignedTimeseries or change device.",
+ "Time series under this device is aligned, please use
createAlignedTimeSeries or change device.",
device.getFullPath());
}
- IDeviceMNode<ICachedMNode> entityMNode;
+ final IDeviceMNode<ICachedMNode> entityMNode;
if (device.isDevice()) {
entityMNode = device.getAsDeviceMNode();
} else {
@@ -332,12 +348,12 @@ public class MTreeBelowSGCachedImpl {
device = entityMNode.getAsMNode();
}
- // create a non-aligned timeseries
+ // create a non-aligned time series
if (entityMNode.isAlignedNullable() == null) {
entityMNode.setAligned(false);
}
- IMeasurementMNode<ICachedMNode> measurementMNode =
+ final IMeasurementMNode<ICachedMNode> measurementMNode =
nodeFactory.createMeasurementMNode(
entityMNode,
leafName,
@@ -370,30 +386,50 @@ public class MTreeBelowSGCachedImpl {
* @param encodings encodings list
* @param compressors compressor
*/
- public List<IMeasurementMNode<ICachedMNode>> createAlignedTimeseries(
- PartialPath devicePath,
- List<String> measurements,
- List<TSDataType> dataTypes,
- List<TSEncoding> encodings,
- List<CompressionType> compressors,
- List<String> aliasList)
+ public List<IMeasurementMNode<ICachedMNode>> createAlignedTimeSeries(
+ final PartialPath devicePath,
+ final List<String> measurements,
+ final List<TSDataType> dataTypes,
+ final List<TSEncoding> encodings,
+ final List<CompressionType> compressors,
+ final List<String> aliasList,
+ final boolean withMerge,
+ final Set<Integer> existingMeasurementIndexes)
throws MetadataException {
- List<IMeasurementMNode<ICachedMNode>> measurementMNodeList = new
ArrayList<>();
+ final List<IMeasurementMNode<ICachedMNode>> measurementMNodeList = new
ArrayList<>();
MetaFormatUtils.checkSchemaMeasurementNames(measurements);
- ICachedMNode deviceParent = checkAndAutoCreateInternalPath(devicePath);
+ final ICachedMNode deviceParent =
checkAndAutoCreateInternalPath(devicePath);
try {
// synchronize check and add, we need addChild operation be atomic.
- // only write operations on mtree will be synchronized
+ // only write operations on mTree will be synchronized
synchronized (this) {
ICachedMNode device =
checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent);
try {
for (int i = 0; i < measurements.size(); i++) {
if (store.hasChild(device, measurements.get(i))) {
- throw new PathAlreadyExistException(
- devicePath.getFullPath() + "." + measurements.get(i));
+ final ICachedMNode node = device.getChild(measurements.get(i));
+ if (node.isMeasurement()) {
+ final IMeasurementMNode<ICachedMNode> measurementNode =
+ node.getAsMeasurementMNode();
+ if (node.getAsMeasurementMNode().isPreDeleted()) {
+ throw new MeasurementInBlackListException(
+ devicePath.concatNode(measurements.get(i)));
+ } else if (!withMerge || measurementNode.getDataType() !=
dataTypes.get(i)) {
+ throw new MeasurementAlreadyExistException(
+ devicePath.getFullPath() + "." + measurements.get(i),
+ node.getAsMeasurementMNode().getMeasurementPath());
+ } else {
+ existingMeasurementIndexes.add(i);
+ continue;
+ }
+ } else {
+ throw new PathAlreadyExistException(
+ devicePath.getFullPath() + "." + measurements.get(i));
+ }
}
+
if (aliasList != null
&& aliasList.get(i) != null
&& store.hasChild(device, aliasList.get(i))) {
@@ -406,11 +442,11 @@ public class MTreeBelowSGCachedImpl {
&& device.getAsDeviceMNode().isAlignedNullable() != null
&& !device.getAsDeviceMNode().isAligned()) {
throw new AlignedTimeseriesException(
- "Timeseries under this device is not aligned, please use
createTimeseries or change device.",
+ "TimeSeries under this device is not aligned, please use
createTimeSeries or change device.",
devicePath.getFullPath());
}
- IDeviceMNode<ICachedMNode> entityMNode;
+ final IDeviceMNode<ICachedMNode> entityMNode;
if (device.isDevice()) {
entityMNode = device.getAsDeviceMNode();
} else {
@@ -419,13 +455,17 @@ public class MTreeBelowSGCachedImpl {
device = entityMNode.getAsMNode();
}
- // create a aligned timeseries
+ // create an aligned time series
if (entityMNode.isAlignedNullable() == null) {
entityMNode.setAligned(true);
}
for (int i = 0; i < measurements.size(); i++) {
- IMeasurementMNode<ICachedMNode> measurementMNode =
+ if (existingMeasurementIndexes.contains(i)) {
+ continue;
+ }
+
+ final IMeasurementMNode<ICachedMNode> measurementMNode =
nodeFactory.createMeasurementMNode(
entityMNode,
measurements.get(i),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/ICachedMNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/ICachedMNode.java
index cac0ee2d988..489b1a642aa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/ICachedMNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/ICachedMNode.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode;
import org.apache.iotdb.commons.schema.node.IMNode;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/SchemaRegionWritePlanFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/SchemaRegionWritePlanFactory.java
index 16b00d9b8cb..42a2c530e2a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/SchemaRegionWritePlanFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/SchemaRegionWritePlanFactory.java
@@ -57,7 +57,7 @@ public class SchemaRegionWritePlanFactory {
private SchemaRegionWritePlanFactory() {}
- public static ISchemaRegionPlan getEmptyPlan(SchemaRegionPlanType planType) {
+ public static ISchemaRegionPlan getEmptyPlan(final SchemaRegionPlanType
planType) {
switch (planType) {
case CREATE_TIMESERIES:
return new CreateTimeSeriesPlanImpl();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateAlignedTimeSeriesPlanImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateAlignedTimeSeriesPlanImpl.java
index f8b6e747e61..f02aa52f4c0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateAlignedTimeSeriesPlanImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateAlignedTimeSeriesPlanImpl.java
@@ -43,18 +43,19 @@ public class CreateAlignedTimeSeriesPlanImpl implements
ICreateAlignedTimeSeries
private List<Map<String, String>> tagsList;
private List<Map<String, String>> attributesList;
private List<Long> tagOffsets = null;
+ private transient boolean withMerge;
public CreateAlignedTimeSeriesPlanImpl() {}
public CreateAlignedTimeSeriesPlanImpl(
- PartialPath devicePath,
- List<String> measurements,
- List<TSDataType> dataTypes,
- List<TSEncoding> encodings,
- List<CompressionType> compressors,
- List<String> aliasList,
- List<Map<String, String>> tagsList,
- List<Map<String, String>> attributesList) {
+ final PartialPath devicePath,
+ final List<String> measurements,
+ final List<TSDataType> dataTypes,
+ final List<TSEncoding> encodings,
+ final List<CompressionType> compressors,
+ final List<String> aliasList,
+ final List<Map<String, String>> tagsList,
+ final List<Map<String, String>> attributesList) {
this.devicePath = devicePath;
this.measurements = measurements;
this.dataTypes = dataTypes;
@@ -66,7 +67,7 @@ public class CreateAlignedTimeSeriesPlanImpl implements
ICreateAlignedTimeSeries
}
public CreateAlignedTimeSeriesPlanImpl(
- PartialPath devicePath, String measurement, MeasurementSchema schema) {
+ final PartialPath devicePath, final String measurement, final
MeasurementSchema schema) {
this.devicePath = devicePath;
this.measurements = Collections.singletonList(measurement);
this.dataTypes = Collections.singletonList(schema.getType());
@@ -80,7 +81,7 @@ public class CreateAlignedTimeSeriesPlanImpl implements
ICreateAlignedTimeSeries
}
@Override
- public void setDevicePath(PartialPath devicePath) {
+ public void setDevicePath(final PartialPath devicePath) {
this.devicePath = devicePath;
}
@@ -90,7 +91,7 @@ public class CreateAlignedTimeSeriesPlanImpl implements
ICreateAlignedTimeSeries
}
@Override
- public void setMeasurements(List<String> measurements) {
+ public void setMeasurements(final List<String> measurements) {
this.measurements = measurements;
}
@@ -100,7 +101,7 @@ public class CreateAlignedTimeSeriesPlanImpl implements
ICreateAlignedTimeSeries
}
@Override
- public void setDataTypes(List<TSDataType> dataTypes) {
+ public void setDataTypes(final List<TSDataType> dataTypes) {
this.dataTypes = dataTypes;
}
@@ -110,7 +111,7 @@ public class CreateAlignedTimeSeriesPlanImpl implements
ICreateAlignedTimeSeries
}
@Override
- public void setEncodings(List<TSEncoding> encodings) {
+ public void setEncodings(final List<TSEncoding> encodings) {
this.encodings = encodings;
}
@@ -120,7 +121,7 @@ public class CreateAlignedTimeSeriesPlanImpl implements
ICreateAlignedTimeSeries
}
@Override
- public void setCompressors(List<CompressionType> compressors) {
+ public void setCompressors(final List<CompressionType> compressors) {
this.compressors = compressors;
}
@@ -130,7 +131,7 @@ public class CreateAlignedTimeSeriesPlanImpl implements
ICreateAlignedTimeSeries
}
@Override
- public void setAliasList(List<String> aliasList) {
+ public void setAliasList(final List<String> aliasList) {
this.aliasList = aliasList;
}
@@ -140,7 +141,7 @@ public class CreateAlignedTimeSeriesPlanImpl implements
ICreateAlignedTimeSeries
}
@Override
- public void setTagsList(List<Map<String, String>> tagsList) {
+ public void setTagsList(final List<Map<String, String>> tagsList) {
this.tagsList = tagsList;
}
@@ -150,7 +151,7 @@ public class CreateAlignedTimeSeriesPlanImpl implements
ICreateAlignedTimeSeries
}
@Override
- public void setAttributesList(List<Map<String, String>> attributesList) {
+ public void setAttributesList(final List<Map<String, String>>
attributesList) {
this.attributesList = attributesList;
}
@@ -166,7 +167,15 @@ public class CreateAlignedTimeSeriesPlanImpl implements
ICreateAlignedTimeSeries
}
@Override
- public void setTagOffsets(List<Long> tagOffsets) {
+ public void setTagOffsets(final List<Long> tagOffsets) {
this.tagOffsets = tagOffsets;
}
+
+ public boolean isWithMerge() {
+ return withMerge;
+ }
+
+ public void setWithMerge(final boolean withMerge) {
+ this.withMerge = withMerge;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateTimeSeriesPlanImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateTimeSeriesPlanImpl.java
index 0e0e8d6bdbb..1f5aa8e81ea 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateTimeSeriesPlanImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateTimeSeriesPlanImpl.java
@@ -40,6 +40,7 @@ public class CreateTimeSeriesPlanImpl implements
ICreateTimeSeriesPlan {
private Map<String, String> tags = null;
private Map<String, String> attributes = null;
private long tagOffset = -1;
+ private transient boolean withMerge;
public CreateTimeSeriesPlanImpl() {}
@@ -161,4 +162,12 @@ public class CreateTimeSeriesPlanImpl implements
ICreateTimeSeriesPlan {
public void setTagOffset(long tagOffset) {
this.tagOffset = tagOffset;
}
+
+ public boolean isWithMerge() {
+ return withMerge;
+ }
+
+ public void setWithMerge(final boolean withMerge) {
+ this.withMerge = withMerge;
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/AbstractSchemaRegionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/AbstractSchemaRegionTest.java
index 4df89c425a1..9b5b141d4b2 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/AbstractSchemaRegionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/AbstractSchemaRegionTest.java
@@ -58,7 +58,7 @@ public abstract class AbstractSchemaRegionTest {
new SchemaRegionTestParams("PBTree-NonMemory", "PBTree", 0));
}
- public AbstractSchemaRegionTest(SchemaRegionTestParams testParams) {
+ public AbstractSchemaRegionTest(final SchemaRegionTestParams testParams) {
this.testParams = testParams;
}
@@ -91,7 +91,8 @@ public abstract class AbstractSchemaRegionTest {
SchemaEngine.getInstance().init();
}
- protected ISchemaRegion getSchemaRegion(String database, int schemaRegionId)
throws Exception {
+ protected ISchemaRegion getSchemaRegion(final String database, final int
schemaRegionId)
+ throws Exception {
SchemaRegionId regionId = new SchemaRegionId(schemaRegionId);
if (SchemaEngine.getInstance().getSchemaRegion(regionId) == null) {
SchemaEngine.getInstance().createSchemaRegion(new PartialPath(database),
regionId);
@@ -99,7 +100,7 @@ public abstract class AbstractSchemaRegionTest {
return SchemaEngine.getInstance().getSchemaRegion(regionId);
}
- protected static class SchemaRegionTestParams {
+ public static class SchemaRegionTestParams {
private final String testModeName;
@@ -108,7 +109,7 @@ public abstract class AbstractSchemaRegionTest {
private final int cachedMNodeSize;
private SchemaRegionTestParams(
- String testModeName, String schemaEngineMode, int cachedMNodeSize) {
+ final String testModeName, final String schemaEngineMode, final int
cachedMNodeSize) {
this.testModeName = testModeName;
this.schemaEngineMode = schemaEngineMode;
this.cachedMNodeSize = cachedMNodeSize;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionBasicTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionBasicTest.java
index 9e74729cab7..667ec4bc9b4 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionBasicTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionBasicTest.java
@@ -34,7 +34,11 @@ import
org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.IDeviceSchem
import
org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.ITimeSeriesSchemaInfo;
import
org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.impl.ShowDevicesResult;
import
org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.impl.ShowNodesResult;
+import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.ICreateAlignedTimeSeriesPlan;
+import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.ICreateTimeSeriesPlan;
import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.SchemaRegionWritePlanFactory;
+import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.impl.CreateAlignedTimeSeriesPlanImpl;
+import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.impl.CreateTimeSeriesPlanImpl;
import org.apache.commons.lang3.StringUtils;
import org.apache.tsfile.enums.TSDataType;
@@ -45,6 +49,7 @@ import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -58,7 +63,8 @@ import java.util.stream.IntStream;
import static org.apache.iotdb.commons.schema.SchemaConstant.ALL_MATCH_PATTERN;
import static org.apache.iotdb.commons.schema.SchemaConstant.ALL_MATCH_SCOPE;
-import static
org.apache.iotdb.db.metadata.schemaRegion.SchemaRegionTestUtil.getAllTimeseriesCount;
+import static
org.apache.iotdb.db.metadata.schemaRegion.SchemaRegionTestUtil.checkSingleTimeSeries;
+import static
org.apache.iotdb.db.metadata.schemaRegion.SchemaRegionTestUtil.getAllTimeSeriesCount;
import static
org.apache.iotdb.db.metadata.schemaRegion.SchemaRegionTestUtil.getChildNodePathInNextLevel;
import static
org.apache.iotdb.db.metadata.schemaRegion.SchemaRegionTestUtil.getDevicesNum;
import static
org.apache.iotdb.db.metadata.schemaRegion.SchemaRegionTestUtil.getMeasurementCountGroupByLevel;
@@ -76,15 +82,15 @@ public class SchemaRegionBasicTest extends
AbstractSchemaRegionTest {
}
@Test
- @Ignore
- public void testFetchSchemaPerfomance() throws Exception {
+ @Ignore("This is just a performance test and shall not be run in auto test
environment")
+ public void testFetchSchemaPerformance() throws Exception {
System.out.println(testParams.getTestModeName());
final int deviceNum = 1000;
final int measurementNum = 40;
final ISchemaRegion schemaRegion = getSchemaRegion("root.sg", 0);
for (int i = 0; i < deviceNum; i++) {
for (int j = 0; j < measurementNum; j++) {
- schemaRegion.createTimeseries(
+ schemaRegion.createTimeSeries(
SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
new PartialPath("root.sg.d" + i + ".s" + j),
TSDataType.BOOLEAN,
@@ -110,14 +116,14 @@ public class SchemaRegionBasicTest extends
AbstractSchemaRegionTest {
schemaRegion.fetchSeriesSchema(
patternTree, Collections.emptyMap(), false, false, true, false);
}
- System.out.println("cost time: " + (System.currentTimeMillis() -
startTime));
+ System.out.println("Cost time: " + (System.currentTimeMillis() -
startTime));
}
@Test
public void testFetchSchema() throws Exception {
final ISchemaRegion schemaRegion = getSchemaRegion("root.sg", 0);
- schemaRegion.createTimeseries(
+ schemaRegion.createTimeSeries(
SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
new PartialPath("root.sg.wf01.wt01.status"),
TSDataType.BOOLEAN,
@@ -128,7 +134,7 @@ public class SchemaRegionBasicTest extends
AbstractSchemaRegionTest {
null,
null),
-1);
- schemaRegion.createTimeseries(
+ schemaRegion.createTimeSeries(
SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
new PartialPath("root.sg.wf01.wt01.temperature"),
TSDataType.FLOAT,
@@ -229,7 +235,7 @@ public class SchemaRegionBasicTest extends
AbstractSchemaRegionTest {
}
@Test
- public void testCreateAlignedTimeseries() throws Exception {
+ public void testCreateAlignedTimeSeries() throws Exception {
final ISchemaRegion schemaRegion = getSchemaRegion("root.sg", 0);
schemaRegion.createAlignedTimeSeries(
SchemaRegionWritePlanFactory.getCreateAlignedTimeSeriesPlan(
@@ -249,10 +255,93 @@ public class SchemaRegionBasicTest extends
AbstractSchemaRegionTest {
Assert.assertTrue(checkRes.get(1) instanceof
MeasurementAlreadyExistException);
}
+ @Test
+ public void testCreateAlignedTimeSeriesWithMerge() throws Exception {
+ final ISchemaRegion schemaRegion = getSchemaRegion("root.sg", 0);
+
+ final Map<String, String> oldTagMap = Collections.singletonMap("tagK",
"tagV");
+ final Map<String, String> oldAttrMap = Collections.singletonMap("attrK1",
"attrV1");
+ schemaRegion.createAlignedTimeSeries(
+ SchemaRegionWritePlanFactory.getCreateAlignedTimeSeriesPlan(
+ new PartialPath("root.sg.wf02.wt01"),
+ Arrays.asList("temperature", "status"),
+ Arrays.asList(TSDataType.valueOf("FLOAT"),
TSDataType.valueOf("INT32")),
+ Arrays.asList(TSEncoding.valueOf("RLE"),
TSEncoding.valueOf("RLE")),
+ Arrays.asList(CompressionType.SNAPPY, CompressionType.SNAPPY),
+ null,
+ Arrays.asList(Collections.emptyMap(), oldTagMap),
+ Arrays.asList(Collections.emptyMap(), oldAttrMap)));
+
+ final Map<String, String> newTagMap = Collections.singletonMap("tagK",
"newTagV");
+ final Map<String, String> newAttrMap = Collections.singletonMap("attrK2",
"attrV2");
+ ICreateAlignedTimeSeriesPlan mergePlan =
+ SchemaRegionWritePlanFactory.getCreateAlignedTimeSeriesPlan(
+ new PartialPath("root.sg.wf02.wt01"),
+ // The lists must be mutable
+ new ArrayList<>(Arrays.asList("status", "height")),
+ new ArrayList<>(
+ Arrays.asList(TSDataType.valueOf("INT32"),
TSDataType.valueOf("INT64"))),
+ new ArrayList<>(
+ Arrays.asList(TSEncoding.valueOf("PLAIN"),
TSEncoding.valueOf("PLAIN"))),
+ new ArrayList<>(Arrays.asList(CompressionType.ZSTD,
CompressionType.GZIP)),
+ new ArrayList<>(Arrays.asList("alias2", null)),
+ new ArrayList<>(Arrays.asList(newTagMap, oldTagMap)),
+ new ArrayList<>(Arrays.asList(newAttrMap, oldAttrMap)));
+ ((CreateAlignedTimeSeriesPlanImpl) mergePlan).setWithMerge(true);
+ schemaRegion.createAlignedTimeSeries(mergePlan);
+
+ // The encoding and compressor won't be changed
+ // The alias/tags/attributes are updated
+
+ final Map<String, String> resultAttrMap = new HashMap<>(oldAttrMap);
+ resultAttrMap.putAll(newAttrMap);
+
+ checkSingleTimeSeries(
+ schemaRegion,
+ new PartialPath("root.sg.wf02.wt01.status"),
+ true,
+ TSDataType.INT32,
+ TSEncoding.RLE,
+ CompressionType.SNAPPY,
+ "alias2",
+ newTagMap,
+ resultAttrMap);
+
+ checkSingleTimeSeries(
+ schemaRegion,
+ new PartialPath("root.sg.wf02.wt01.height"),
+ true,
+ TSDataType.INT64,
+ TSEncoding.PLAIN,
+ CompressionType.GZIP,
+ null,
+ oldTagMap,
+ oldAttrMap);
+
+ // Test illegal plan
+ try {
+ mergePlan =
+ SchemaRegionWritePlanFactory.getCreateAlignedTimeSeriesPlan(
+ new PartialPath("root.sg.wf02.wt01"),
+ Collections.singletonList("temperature"),
+ Collections.singletonList(TSDataType.valueOf("BOOLEAN")),
+ Collections.singletonList(TSEncoding.valueOf("PLAIN")),
+ Collections.singletonList(CompressionType.ZSTD),
+ Collections.singletonList("alias2"),
+ Collections.singletonList(newTagMap),
+ Collections.singletonList(newAttrMap));
+ ((CreateAlignedTimeSeriesPlanImpl) mergePlan).setWithMerge(true);
+ schemaRegion.createAlignedTimeSeries(mergePlan);
+ Assert.fail("Create aligned time series with merge shall fail if the
types are different");
+ } catch (final MeasurementAlreadyExistException e) {
+ // Success
+ }
+ }
+
@Test
public void testCheckMeasurementExistence() throws Exception {
final ISchemaRegion schemaRegion = getSchemaRegion("root.sg", 0);
- schemaRegion.createTimeseries(
+ schemaRegion.createTimeSeries(
SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
new PartialPath("root.sg.wf01.wt01.status"),
TSDataType.BOOLEAN,
@@ -263,7 +352,7 @@ public class SchemaRegionBasicTest extends
AbstractSchemaRegionTest {
null,
null),
-1);
- schemaRegion.createTimeseries(
+ schemaRegion.createTimeSeries(
SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
new PartialPath("root.sg.wf01.wt01.v1.s1"),
TSDataType.BOOLEAN,
@@ -274,7 +363,7 @@ public class SchemaRegionBasicTest extends
AbstractSchemaRegionTest {
null,
null),
-1);
- schemaRegion.createTimeseries(
+ schemaRegion.createTimeSeries(
SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
new PartialPath("root.sg.wf01.wt01.temperature"),
TSDataType.FLOAT,
@@ -320,15 +409,85 @@ public class SchemaRegionBasicTest extends
AbstractSchemaRegionTest {
Assert.assertTrue(res3.get(2) instanceof PathAlreadyExistException);
}
+ @Test
+ public void testCreateTimeSeriesWithMerge() throws Exception {
+ final ISchemaRegion schemaRegion = getSchemaRegion("root.sg", 0);
+
+ final Map<String, String> oldTagMap = Collections.singletonMap("tagK",
"tagV");
+ final Map<String, String> oldAttrMap = Collections.singletonMap("attrK1",
"attrV1");
+ schemaRegion.createTimeSeries(
+ SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
+ new PartialPath("root.sg.wf01.wt01.v1.s1"),
+ TSDataType.BOOLEAN,
+ TSEncoding.PLAIN,
+ CompressionType.SNAPPY,
+ null,
+ oldTagMap,
+ oldAttrMap,
+ null),
+ -1);
+
+ final Map<String, String> newTagMap = Collections.singletonMap("tagK",
"newTagV");
+ final Map<String, String> newAttrMap = Collections.singletonMap("attrK2",
"attrV2");
+ ICreateTimeSeriesPlan mergePlan =
+ SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
+ new PartialPath("root.sg.wf01.wt01.v1.s1"),
+ TSDataType.BOOLEAN,
+ TSEncoding.RLE,
+ CompressionType.ZSTD,
+ null,
+ newTagMap,
+ newAttrMap,
+ "alias2");
+ ((CreateTimeSeriesPlanImpl) mergePlan).setWithMerge(true);
+ schemaRegion.createTimeSeries(mergePlan, -1);
+
+ // The encoding and compressor won't be changed
+ // The alias/tags/attributes are updated
+
+ final Map<String, String> resultAttrMap = new HashMap<>(oldAttrMap);
+ resultAttrMap.putAll(newAttrMap);
+
+ checkSingleTimeSeries(
+ schemaRegion,
+ new PartialPath("root.sg.wf01.wt01.v1.s1"),
+ false,
+ TSDataType.BOOLEAN,
+ TSEncoding.PLAIN,
+ CompressionType.SNAPPY,
+ "alias2",
+ newTagMap,
+ resultAttrMap);
+
+ // Test illegal plan
+ try {
+ mergePlan =
+ SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
+ new PartialPath("root.sg.wf01.wt01.v1.s1"),
+ TSDataType.INT64,
+ TSEncoding.PLAIN,
+ CompressionType.ZSTD,
+ null,
+ oldTagMap,
+ oldAttrMap,
+ null);
+ ((CreateTimeSeriesPlanImpl) mergePlan).setWithMerge(true);
+ schemaRegion.createTimeSeries(mergePlan, -1);
+ Assert.fail("Create time series with merge shall fail if the types are
different");
+ } catch (final MeasurementAlreadyExistException e) {
+ // Success
+ }
+ }
+
/**
* Test {@link ISchemaRegion#constructSchemaBlackList}, {@link
* ISchemaRegion#rollbackSchemaBlackList}, {@link
ISchemaRegion#fetchSchemaBlackList} and{@link
* ISchemaRegion#deleteTimeseriesInBlackList}
*/
@Test
- public void testDeleteTimeseries() throws Exception {
+ public void testDeleteTimeSeries() throws Exception {
final ISchemaRegion schemaRegion = getSchemaRegion("root.sg", 0);
- schemaRegion.createTimeseries(
+ schemaRegion.createTimeSeries(
SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
new PartialPath("root.sg.wf01.wt01.status"),
TSDataType.BOOLEAN,
@@ -339,7 +498,7 @@ public class SchemaRegionBasicTest extends
AbstractSchemaRegionTest {
null,
null),
-1);
- schemaRegion.createTimeseries(
+ schemaRegion.createTimeSeries(
SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
new PartialPath("root.sg.wf01.wt02.status"),
TSDataType.BOOLEAN,
@@ -350,7 +509,7 @@ public class SchemaRegionBasicTest extends
AbstractSchemaRegionTest {
null,
null),
-1);
- schemaRegion.createTimeseries(
+ schemaRegion.createTimeSeries(
SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
new PartialPath("root.sg.wf01.wt01.temperature"),
TSDataType.FLOAT,
@@ -361,7 +520,7 @@ public class SchemaRegionBasicTest extends
AbstractSchemaRegionTest {
null,
null),
-1);
- schemaRegion.createTimeseries(
+ schemaRegion.createTimeSeries(
SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
new PartialPath("root.sg.wf02.wt01.temperature"),
TSDataType.FLOAT,
@@ -411,7 +570,7 @@ public class SchemaRegionBasicTest extends
AbstractSchemaRegionTest {
public void testGetAllTimeSeriesCount() throws Exception {
final ISchemaRegion schemaRegion = getSchemaRegion("root.laptop", 0);
- SchemaRegionTestUtil.createSimpleTimeseriesByList(
+ SchemaRegionTestUtil.createSimpleTimeSeriesByList(
schemaRegion,
Arrays.asList(
"root.laptop.d0",
@@ -423,46 +582,46 @@ public class SchemaRegionBasicTest extends
AbstractSchemaRegionTest {
// for Non prefix matched path
Assert.assertEquals(
- 6, getAllTimeseriesCount(schemaRegion, new PartialPath("root.**"),
null, false));
+ 6, getAllTimeSeriesCount(schemaRegion, new PartialPath("root.**"),
null, false));
Assert.assertEquals(
- 6, getAllTimeseriesCount(schemaRegion, new
PartialPath("root.laptop.**"), null, false));
+ 6, getAllTimeSeriesCount(schemaRegion, new
PartialPath("root.laptop.**"), null, false));
Assert.assertEquals(
- 1, getAllTimeseriesCount(schemaRegion, new
PartialPath("root.laptop.*"), null, false));
+ 1, getAllTimeSeriesCount(schemaRegion, new
PartialPath("root.laptop.*"), null, false));
Assert.assertEquals(
- 4, getAllTimeseriesCount(schemaRegion, new
PartialPath("root.laptop.*.*"), null, false));
+ 4, getAllTimeSeriesCount(schemaRegion, new
PartialPath("root.laptop.*.*"), null, false));
Assert.assertEquals(
- 5, getAllTimeseriesCount(schemaRegion, new
PartialPath("root.laptop.*.**"), null, false));
+ 5, getAllTimeSeriesCount(schemaRegion, new
PartialPath("root.laptop.*.**"), null, false));
Assert.assertEquals(
- 1, getAllTimeseriesCount(schemaRegion, new
PartialPath("root.laptop.*.*.t1"), null, false));
+ 1, getAllTimeSeriesCount(schemaRegion, new
PartialPath("root.laptop.*.*.t1"), null, false));
Assert.assertEquals(
- 2, getAllTimeseriesCount(schemaRegion, new
PartialPath("root.laptop.*.s1"), null, false));
+ 2, getAllTimeSeriesCount(schemaRegion, new
PartialPath("root.laptop.*.s1"), null, false));
Assert.assertEquals(
- 3, getAllTimeseriesCount(schemaRegion, new
PartialPath("root.laptop.d1.**"), null, false));
+ 3, getAllTimeSeriesCount(schemaRegion, new
PartialPath("root.laptop.d1.**"), null, false));
Assert.assertEquals(
- 2, getAllTimeseriesCount(schemaRegion, new
PartialPath("root.laptop.d1.*"), null, false));
+ 2, getAllTimeSeriesCount(schemaRegion, new
PartialPath("root.laptop.d1.*"), null, false));
Assert.assertEquals(
- 1, getAllTimeseriesCount(schemaRegion, new
PartialPath("root.laptop.d2.s1"), null, false));
+ 1, getAllTimeSeriesCount(schemaRegion, new
PartialPath("root.laptop.d2.s1"), null, false));
Assert.assertEquals(
- 2, getAllTimeseriesCount(schemaRegion, new
PartialPath("root.laptop.d2.**"), null, false));
+ 2, getAllTimeSeriesCount(schemaRegion, new
PartialPath("root.laptop.d2.**"), null, false));
Assert.assertEquals(
- 0, getAllTimeseriesCount(schemaRegion, new PartialPath("root.laptop"),
null, false));
+ 0, getAllTimeSeriesCount(schemaRegion, new PartialPath("root.laptop"),
null, false));
Assert.assertEquals(
- 0, getAllTimeseriesCount(schemaRegion, new
PartialPath("root.laptop.d3.s1"), null, false));
+ 0, getAllTimeSeriesCount(schemaRegion, new
PartialPath("root.laptop.d3.s1"), null, false));
// for prefix matched path
Assert.assertEquals(
- 6, getAllTimeseriesCount(schemaRegion, new PartialPath("root"), null,
true));
+ 6, getAllTimeSeriesCount(schemaRegion, new PartialPath("root"), null,
true));
Assert.assertEquals(
- 6, getAllTimeseriesCount(schemaRegion, new PartialPath("root.laptop"),
null, true));
+ 6, getAllTimeSeriesCount(schemaRegion, new PartialPath("root.laptop"),
null, true));
Assert.assertEquals(
- 2, getAllTimeseriesCount(schemaRegion, new
PartialPath("root.laptop.d2"), null, true));
+ 2, getAllTimeSeriesCount(schemaRegion, new
PartialPath("root.laptop.d2"), null, true));
}
@Test
public void testGetMeasurementCountGroupByLevel() throws Exception {
final ISchemaRegion schemaRegion = getSchemaRegion("root.laptop", 0);
- SchemaRegionTestUtil.createSimpleTimeseriesByList(
+ SchemaRegionTestUtil.createSimpleTimeSeriesByList(
schemaRegion,
Arrays.asList(
"root.laptop.d0",
@@ -559,7 +718,7 @@ public class SchemaRegionBasicTest extends
AbstractSchemaRegionTest {
public void testGetDevicesNum() throws Exception {
final ISchemaRegion schemaRegion = getSchemaRegion("root.laptop", 0);
- SchemaRegionTestUtil.createSimpleTimeseriesByList(
+ SchemaRegionTestUtil.createSimpleTimeSeriesByList(
schemaRegion,
Arrays.asList(
"root.laptop.d0",
@@ -592,7 +751,7 @@ public class SchemaRegionBasicTest extends
AbstractSchemaRegionTest {
public void testGetNodesListInGivenLevel() throws Exception {
final ISchemaRegion schemaRegion = getSchemaRegion("root.laptop", 0);
- SchemaRegionTestUtil.createSimpleTimeseriesByList(
+ SchemaRegionTestUtil.createSimpleTimeSeriesByList(
schemaRegion,
Arrays.asList(
"root.laptop.d0",
@@ -647,7 +806,7 @@ public class SchemaRegionBasicTest extends
AbstractSchemaRegionTest {
public void testGetChildNodePathInNextLevel() throws Exception {
final ISchemaRegion schemaRegion = getSchemaRegion("root.laptop", 0);
- SchemaRegionTestUtil.createSimpleTimeseriesByList(
+ SchemaRegionTestUtil.createSimpleTimeSeriesByList(
schemaRegion,
Arrays.asList(
"root.laptop.d0",
@@ -695,7 +854,7 @@ public class SchemaRegionBasicTest extends
AbstractSchemaRegionTest {
public void testGetMatchedDevices() throws Exception {
final ISchemaRegion schemaRegion = getSchemaRegion("root.laptop", 0);
- SchemaRegionTestUtil.createSimpleTimeseriesByList(
+ SchemaRegionTestUtil.createSimpleTimeSeriesByList(
schemaRegion,
Arrays.asList(
"root.laptop.d0",
@@ -814,7 +973,7 @@ public class SchemaRegionBasicTest extends
AbstractSchemaRegionTest {
public void testShowTimeseries() throws Exception {
final ISchemaRegion schemaRegion = getSchemaRegion("root.laptop", 0);
- SchemaRegionTestUtil.createSimpleTimeseriesByList(
+ SchemaRegionTestUtil.createSimpleTimeSeriesByList(
schemaRegion,
Arrays.asList(
"root.laptop.d0",
@@ -995,7 +1154,7 @@ public class SchemaRegionBasicTest extends
AbstractSchemaRegionTest {
public void testGetMatchedDevicesWithSpecialPattern() throws Exception {
final ISchemaRegion schemaRegion = getSchemaRegion("root.test", 0);
- SchemaRegionTestUtil.createSimpleTimeseriesByList(
+ SchemaRegionTestUtil.createSimpleTimeSeriesByList(
schemaRegion,
Arrays.asList("root.test.d1.s", "root.test.dac.device1.s",
"root.test.dac.device1.d1.s"));
@@ -1030,7 +1189,7 @@ public class SchemaRegionBasicTest extends
AbstractSchemaRegionTest {
public void testGetMatchedDevicesWithSpecialPattern2() throws Exception {
final ISchemaRegion schemaRegion = getSchemaRegion("root.test", 0);
- SchemaRegionTestUtil.createSimpleTimeseriesByList(
+ SchemaRegionTestUtil.createSimpleTimeSeriesByList(
schemaRegion,
Arrays.asList(
"root.test.abc57.bcde22.def89.efg1",
@@ -1050,7 +1209,7 @@ public class SchemaRegionBasicTest extends
AbstractSchemaRegionTest {
final Set<IDeviceSchemaInfo> actualHashset = new HashSet<>(actualResult);
Assert.assertEquals(expectedHashset, actualHashset);
- // case2: show timeseries root.**.*e*.*e*
+ // case2: show time series root.**.*e*.*e*
List<ITimeSeriesSchemaInfo> result =
SchemaRegionTestUtil.showTimeseries(schemaRegion, new
PartialPath("root.**.*e*.*e*"));
final Set<String> expectedPathList =
@@ -1068,7 +1227,7 @@ public class SchemaRegionBasicTest extends
AbstractSchemaRegionTest {
}
Assert.assertEquals(expectedPathList, actualPathList);
- // case3: show timeseries root.**.*e*
+ // case3: show time series root.**.*e*
result = SchemaRegionTestUtil.showTimeseries(schemaRegion, new
PartialPath("root.**.*e*"));
Assert.assertEquals(expectedSize, result.size());
actualPathList = new HashSet<>();
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionManagementTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionManagementTest.java
index 50c89a06688..d0278711219 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionManagementTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionManagementTest.java
@@ -71,7 +71,7 @@ public class SchemaRegionManagementTest extends
AbstractSchemaRegionTest {
Map<String, String> tags = new HashMap<>();
tags.put("tag-key", "tag-value");
- schemaRegion.createTimeseries(
+ schemaRegion.createTimeSeries(
SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
new PartialPath("root.sg.d1.s1"),
TSDataType.INT32,
@@ -204,7 +204,7 @@ public class SchemaRegionManagementTest extends
AbstractSchemaRegionTest {
long time = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
for (int j = 0; j < 1000; j++) {
- schemaRegion.createTimeseries(
+ schemaRegion.createTimeSeries(
SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
new PartialPath("root.sg.d" + i + ".s" + j),
TSDataType.INT32,
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionSimpleRecoverTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionSimpleRecoverTest.java
index 0811e3c35ef..ab09314cb5d 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionSimpleRecoverTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionSimpleRecoverTest.java
@@ -24,7 +24,11 @@ import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree;
import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
+import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.ICreateAlignedTimeSeriesPlan;
+import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.ICreateTimeSeriesPlan;
import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.SchemaRegionWritePlanFactory;
+import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.impl.CreateAlignedTimeSeriesPlanImpl;
+import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.impl.CreateTimeSeriesPlanImpl;
import org.apache.iotdb.db.schemaengine.template.Template;
import org.apache.tsfile.enums.TSDataType;
@@ -35,16 +39,20 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import static org.apache.iotdb.commons.schema.SchemaConstant.ALL_MATCH_SCOPE;
+import static
org.apache.iotdb.db.metadata.schemaRegion.SchemaRegionTestUtil.checkSingleTimeSeries;
public class SchemaRegionSimpleRecoverTest extends AbstractSchemaRegionTest {
private String schemaRegionConsensusProtocolClass;
- public SchemaRegionSimpleRecoverTest(SchemaRegionTestParams testParams) {
+ public SchemaRegionSimpleRecoverTest(final SchemaRegionTestParams
testParams) {
super(testParams);
}
@@ -69,8 +77,8 @@ public class SchemaRegionSimpleRecoverTest extends
AbstractSchemaRegionTest {
@Test
public void testRecoverWithAlignedTemplate() throws Exception {
ISchemaRegion schemaRegion = getSchemaRegion("root.sg", 0);
- int templateId = 1;
- Template template =
+ final int templateId = 1;
+ final Template template =
new Template(
"t1",
Arrays.asList("s1", "s2"),
@@ -105,4 +113,125 @@ public class SchemaRegionSimpleRecoverTest extends
AbstractSchemaRegionTest {
false);
Assert.assertTrue(schemaTree.getAllDevices().get(0).isAligned());
}
+
+ @Test
+ public void testRecoverAfterCreateAlignedTimeSeriesWithMerge() throws
Exception {
+ ISchemaRegion schemaRegion = getSchemaRegion("root.sg", 0);
+
+ final Map<String, String> oldTagMap = Collections.singletonMap("tagK",
"tagV");
+ final Map<String, String> oldAttrMap = Collections.singletonMap("attrK1",
"attrV1");
+ schemaRegion.createAlignedTimeSeries(
+ SchemaRegionWritePlanFactory.getCreateAlignedTimeSeriesPlan(
+ new PartialPath("root.sg.wf02.wt01"),
+ Arrays.asList("temperature", "status"),
+ Arrays.asList(TSDataType.valueOf("FLOAT"),
TSDataType.valueOf("INT32")),
+ Arrays.asList(TSEncoding.valueOf("RLE"),
TSEncoding.valueOf("RLE")),
+ Arrays.asList(CompressionType.SNAPPY, CompressionType.SNAPPY),
+ null,
+ Arrays.asList(Collections.emptyMap(), oldTagMap),
+ Arrays.asList(Collections.emptyMap(), oldAttrMap)));
+
+ final Map<String, String> newTagMap = Collections.singletonMap("tagK",
"newTagV");
+ final Map<String, String> newAttrMap = Collections.singletonMap("attrK2",
"attrV2");
+ final ICreateAlignedTimeSeriesPlan mergePlan =
+ SchemaRegionWritePlanFactory.getCreateAlignedTimeSeriesPlan(
+ new PartialPath("root.sg.wf02.wt01"),
+ // The lists must be mutable
+ new ArrayList<>(Arrays.asList("status", "height")),
+ new ArrayList<>(
+ Arrays.asList(TSDataType.valueOf("INT32"),
TSDataType.valueOf("INT64"))),
+ new ArrayList<>(
+ Arrays.asList(TSEncoding.valueOf("PLAIN"),
TSEncoding.valueOf("PLAIN"))),
+ new ArrayList<>(Arrays.asList(CompressionType.ZSTD,
CompressionType.GZIP)),
+ new ArrayList<>(Arrays.asList("alias2", null)),
+ new ArrayList<>(Arrays.asList(newTagMap, oldTagMap)),
+ new ArrayList<>(Arrays.asList(newAttrMap, oldAttrMap)));
+ ((CreateAlignedTimeSeriesPlanImpl) mergePlan).setWithMerge(true);
+ schemaRegion.createAlignedTimeSeries(mergePlan);
+
+ simulateRestart();
+ schemaRegion = getSchemaRegion("root.sg", 0);
+
+ // The encoding and compressor won't be changed
+ // The alias/tags/attributes are updated
+
+ final Map<String, String> resultAttrMap = new HashMap<>(oldAttrMap);
+ resultAttrMap.putAll(newAttrMap);
+
+ checkSingleTimeSeries(
+ schemaRegion,
+ new PartialPath("root.sg.wf02.wt01.status"),
+ true,
+ TSDataType.INT32,
+ TSEncoding.RLE,
+ CompressionType.SNAPPY,
+ "alias2",
+ newTagMap,
+ resultAttrMap);
+
+ checkSingleTimeSeries(
+ schemaRegion,
+ new PartialPath("root.sg.wf02.wt01.height"),
+ true,
+ TSDataType.INT64,
+ TSEncoding.PLAIN,
+ CompressionType.GZIP,
+ null,
+ oldTagMap,
+ oldAttrMap);
+ }
+
+ @Test
+ public void testRecoverAfterCreateTimeSeriesWithMerge() throws Exception {
+ ISchemaRegion schemaRegion = getSchemaRegion("root.sg", 0);
+
+ final Map<String, String> oldTagMap = Collections.singletonMap("tagK",
"tagV");
+ final Map<String, String> oldAttrMap = Collections.singletonMap("attrK1",
"attrV1");
+ schemaRegion.createTimeSeries(
+ SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
+ new PartialPath("root.sg.wf01.wt01.v1.s1"),
+ TSDataType.BOOLEAN,
+ TSEncoding.PLAIN,
+ CompressionType.SNAPPY,
+ null,
+ oldTagMap,
+ oldAttrMap,
+ null),
+ -1);
+
+ final Map<String, String> newTagMap = Collections.singletonMap("tagK",
"newTagV");
+ final Map<String, String> newAttrMap = Collections.singletonMap("attrK2",
"attrV2");
+ final ICreateTimeSeriesPlan mergePlan =
+ SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
+ new PartialPath("root.sg.wf01.wt01.v1.s1"),
+ TSDataType.BOOLEAN,
+ TSEncoding.RLE,
+ CompressionType.ZSTD,
+ null,
+ newTagMap,
+ newAttrMap,
+ "alias2");
+ ((CreateTimeSeriesPlanImpl) mergePlan).setWithMerge(true);
+ schemaRegion.createTimeSeries(mergePlan, -1);
+
+ simulateRestart();
+ schemaRegion = getSchemaRegion("root.sg", 0);
+
+ // The encoding and compressor won't be changed
+ // The alias/tags/attributes are updated
+
+ final Map<String, String> resultAttrMap = new HashMap<>(oldAttrMap);
+ resultAttrMap.putAll(newAttrMap);
+
+ checkSingleTimeSeries(
+ schemaRegion,
+ new PartialPath("root.sg.wf01.wt01.v1.s1"),
+ false,
+ TSDataType.BOOLEAN,
+ TSEncoding.PLAIN,
+ CompressionType.SNAPPY,
+ "alias2",
+ newTagMap,
+ resultAttrMap);
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTemplateTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTemplateTest.java
index b1efefbd917..4da6ce191e1 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTemplateTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTemplateTest.java
@@ -193,7 +193,7 @@ public class SchemaRegionTemplateTest extends
AbstractSchemaRegionTest {
@Test
public void testFetchSchemaWithTemplate() throws Exception {
ISchemaRegion schemaRegion = getSchemaRegion("root.sg", 0);
- SchemaRegionTestUtil.createSimpleTimeseriesByList(
+ SchemaRegionTestUtil.createSimpleTimeSeriesByList(
schemaRegion, Arrays.asList("root.sg.wf01.wt01.status",
"root.sg.wf01.wt01.temperature"));
int templateId = 1;
Template template =
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTestUtil.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTestUtil.java
index 528335009b9..3bf1a7c1a00 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTestUtil.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTestUtil.java
@@ -38,6 +38,7 @@ import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.utils.Pair;
+import org.junit.Assert;
import java.util.ArrayList;
import java.util.Arrays;
@@ -64,7 +65,7 @@ public class SchemaRegionTestUtil {
Map<String, String> attributes,
String alias)
throws MetadataException {
- schemaRegion.createTimeseries(
+ schemaRegion.createTimeSeries(
SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
new PartialPath(fullPath),
dataType,
@@ -89,7 +90,7 @@ public class SchemaRegionTestUtil {
List<String> alias)
throws MetadataException {
for (int i = 0; i < fullPaths.size(); i++) {
- schemaRegion.createTimeseries(
+ schemaRegion.createTimeSeries(
SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
new PartialPath(fullPaths.get(i)),
dataTypes.get(i),
@@ -147,19 +148,19 @@ public class SchemaRegionTestUtil {
}
/**
- * Create timeseries quickly using createSimpleTimeSeriesInt64 with given
string list of paths.
+ * Create time series quickly using createSimpleTimeSeriesInt64 with given
string list of paths.
*
- * @param schemaRegion schemaRegion which you want to create timeseries
+ * @param schemaRegion schemaRegion which you want to create time series
* @param pathList
*/
- public static void createSimpleTimeseriesByList(ISchemaRegion schemaRegion,
List<String> pathList)
+ public static void createSimpleTimeSeriesByList(ISchemaRegion schemaRegion,
List<String> pathList)
throws Exception {
for (String path : pathList) {
SchemaRegionTestUtil.createSimpleTimeSeriesInt64(schemaRegion, path);
}
}
- public static long getAllTimeseriesCount(
+ public static long getAllTimeSeriesCount(
ISchemaRegion schemaRegion,
PartialPath pathPattern,
Map<Integer, Template> templateMap,
@@ -179,6 +180,34 @@ public class SchemaRegionTestUtil {
}
}
+ public static void checkSingleTimeSeries(
+ final ISchemaRegion schemaRegion,
+ final PartialPath pathPattern,
+ final boolean isAligned,
+ final TSDataType type,
+ final TSEncoding encoding,
+ final CompressionType compressor,
+ final String alias,
+ final Map<String, String> tags,
+ final Map<String, String> attributes) {
+ try (final ISchemaReader<ITimeSeriesSchemaInfo> timeSeriesReader =
+ schemaRegion.getTimeSeriesReader(
+ SchemaRegionReadPlanFactory.getShowTimeSeriesPlan(
+ pathPattern, Collections.emptyMap(), 0, 0, false, null, false,
ALL_MATCH_SCOPE))) {
+ Assert.assertTrue(timeSeriesReader.hasNext());
+ final ITimeSeriesSchemaInfo info = timeSeriesReader.next();
+ Assert.assertEquals(isAligned, info.isUnderAlignedDevice());
+ Assert.assertEquals(type, info.getSchema().getType());
+ Assert.assertEquals(encoding, info.getSchema().getEncodingType());
+ Assert.assertEquals(compressor, info.getSchema().getCompressor());
+ Assert.assertEquals(alias, info.getAlias());
+ Assert.assertEquals(tags, info.getTags());
+ Assert.assertEquals(attributes, info.getAttributes());
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
public static long getDevicesNum(
ISchemaRegion schemaRegion, PartialPath pathPattern, boolean
isPrefixMatch) {
try (ISchemaReader<IDeviceSchemaInfo> deviceReader =
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
index 14bb5d726a1..04643a88c41 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
@@ -60,9 +60,9 @@ public class SchemaStatisticsTest extends
AbstractSchemaRegionTest {
final ISchemaEngineStatistics engineStatistics =
SchemaEngine.getInstance().getSchemaEngineStatistics();
- SchemaRegionTestUtil.createSimpleTimeseriesByList(
+ SchemaRegionTestUtil.createSimpleTimeSeriesByList(
schemaRegion1, Arrays.asList("root.sg1.n.s0", "root.sg1.n.v.d1.s1",
"root.sg1.n.v.d2.s2"));
- SchemaRegionTestUtil.createSimpleTimeseriesByList(
+ SchemaRegionTestUtil.createSimpleTimeSeriesByList(
schemaRegion1, Arrays.asList("root.sg1.d0.s0"));
final PathPatternTree patternTree = new PathPatternTree();
patternTree.appendPathPattern(new PartialPath("root.**.s1"));
@@ -105,9 +105,9 @@ public class SchemaStatisticsTest extends
AbstractSchemaRegionTest {
final ISchemaEngineStatistics engineStatistics =
SchemaEngine.getInstance().getSchemaEngineStatistics();
- SchemaRegionTestUtil.createSimpleTimeseriesByList(
+ SchemaRegionTestUtil.createSimpleTimeSeriesByList(
schemaRegion1, Arrays.asList("root.sg1.v.d0", "root.sg1.d1.v.s1",
"root.sg1.d1.s2.v.t1"));
- SchemaRegionTestUtil.createSimpleTimeseriesByList(
+ SchemaRegionTestUtil.createSimpleTimeSeriesByList(
schemaRegion2, Arrays.asList("root.sg2.d1.v.s3", "root.sg2.d2.v.s1",
"root.sg2.d2.v.s2"));
final PathPatternTree patternTree = new PathPatternTree();
patternTree.appendPathPattern(new PartialPath("root.**.s1"));
@@ -215,9 +215,9 @@ public class SchemaStatisticsTest extends
AbstractSchemaRegionTest {
final ISchemaEngineStatistics engineStatistics =
SchemaEngine.getInstance().getSchemaEngineStatistics();
- SchemaRegionTestUtil.createSimpleTimeseriesByList(
+ SchemaRegionTestUtil.createSimpleTimeSeriesByList(
schemaRegion1, Arrays.asList("root.sg1.d0", "root.sg1.d1.s1",
"root.sg1.d1.s2.t1"));
- SchemaRegionTestUtil.createSimpleTimeseriesByList(
+ SchemaRegionTestUtil.createSimpleTimeSeriesByList(
schemaRegion2, Arrays.asList("root.sg2.d1.s3", "root.sg2.d2.s1",
"root.sg2.d2.s2"));
final PathPatternTree patternTree = new PathPatternTree();
patternTree.appendPathPattern(new PartialPath("root.**.s1"));
@@ -350,9 +350,9 @@ public class SchemaStatisticsTest extends
AbstractSchemaRegionTest {
final ISchemaEngineStatistics engineStatistics =
SchemaEngine.getInstance().getSchemaEngineStatistics();
- SchemaRegionTestUtil.createSimpleTimeseriesByList(
+ SchemaRegionTestUtil.createSimpleTimeSeriesByList(
schemaRegion1, Arrays.asList("root.sg1.d0", "root.sg1.d1.s1",
"root.sg1.d1.s2.t1"));
- SchemaRegionTestUtil.createSimpleTimeseriesByList(
+ SchemaRegionTestUtil.createSimpleTimeSeriesByList(
schemaRegion2, Arrays.asList("root.sg2.d1.s3", "root.sg2.d2.s1",
"root.sg2.d2.s2"));
final PathPatternTree patternTree = new PathPatternTree();
patternTree.appendPathPattern(new PartialPath("root.**.s1"));
@@ -375,11 +375,11 @@ public class SchemaStatisticsTest extends
AbstractSchemaRegionTest {
final ISchemaEngineStatistics engineStatistics =
SchemaEngine.getInstance().getSchemaEngineStatistics();
- SchemaRegionTestUtil.createSimpleTimeseriesByList(
+ SchemaRegionTestUtil.createSimpleTimeSeriesByList(
schemaRegion1, Arrays.asList("root.sg1.d0", "root.sg1.d1.s1",
"root.sg1.d1.s2.t1"));
- SchemaRegionTestUtil.createSimpleTimeseriesByList(
+ SchemaRegionTestUtil.createSimpleTimeSeriesByList(
schemaRegion2, Arrays.asList("root.sg2.d1.s3", "root.sg2.d2.s1",
"root.sg2.d2.s2"));
- SchemaRegionTestUtil.createSimpleTimeseriesByList(
+ SchemaRegionTestUtil.createSimpleTimeSeriesByList(
schemaRegion2, Collections.singletonList("root.sg2.s1"));
// Check device number
Assert.assertEquals(3,
schemaRegion1.getSchemaRegionStatistics().getDevicesNumber());
@@ -408,9 +408,9 @@ public class SchemaStatisticsTest extends
AbstractSchemaRegionTest {
SchemaEngine.getInstance()
.getSchemaEngineStatistics()
.getAsCachedSchemaEngineStatistics();
- SchemaRegionTestUtil.createSimpleTimeseriesByList(
+ SchemaRegionTestUtil.createSimpleTimeSeriesByList(
schemaRegion1, Arrays.asList("root.sg1.d0", "root.sg1.d1.s1",
"root.sg1.d1.s2.t1"));
- SchemaRegionTestUtil.createSimpleTimeseriesByList(
+ SchemaRegionTestUtil.createSimpleTimeSeriesByList(
schemaRegion2, Arrays.asList("root.sg2.d1.s3", "root.sg2.d2.s1",
"root.sg2.d2.s2"));
final PathPatternTree patternTree = new PathPatternTree();
patternTree.appendPathPattern(new PartialPath("root.**.s1"));
@@ -468,7 +468,7 @@ public class SchemaStatisticsTest extends
AbstractSchemaRegionTest {
SchemaEngine.getInstance().getSchemaEngineStatistics();
ISchemaRegion schemaRegion1 = getSchemaRegion("root.sg1", 0);
ISchemaRegion schemaRegion2 = getSchemaRegion("root.sg2", 1);
- schemaRegion1.createTimeseries(
+ schemaRegion1.createTimeSeries(
SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
new PartialPath("root.sg.wf01.wt01.status"),
TSDataType.BOOLEAN,
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/SchemaRegionListeningQueueTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/SchemaRegionListeningQueueTest.java
index daafda17c5b..b9e1972b0b1 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/SchemaRegionListeningQueueTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/SchemaRegionListeningQueueTest.java
@@ -55,6 +55,8 @@ public class SchemaRegionListeningQueueTest {
if (!snapshotDir.exists()) {
snapshotDir.mkdirs();
}
+ PipeDataNodeAgent.runtime().schemaListener(new SchemaRegionId(0)).open();
+ PipeDataNodeAgent.runtime().notifySchemaLeaderReady(new SchemaRegionId(0));
}
@AfterClass
@@ -67,10 +69,7 @@ public class SchemaRegionListeningQueueTest {
@Test
public void testSnapshot() throws TException, IOException, AuthException,
IllegalPathException {
- PipeDataNodeAgent.runtime().schemaListener(new SchemaRegionId(0)).open();
- PipeDataNodeAgent.runtime().notifySchemaLeaderReady(new SchemaRegionId(0));
-
- CreateTimeSeriesNode node1 =
+ final CreateTimeSeriesNode node1 =
new CreateTimeSeriesNode(
new PlanNodeId("CreateTimeSeriesNode"),
new PartialPath("root.db.d1.s1"),
@@ -82,7 +81,7 @@ public class SchemaRegionListeningQueueTest {
null,
"alias");
- PipeEnrichedWritePlanNode node2 =
+ final PipeEnrichedWritePlanNode node2 =
new PipeEnrichedWritePlanNode(
new ActivateTemplateNode(
new PlanNodeId("ActivateTemplateNode"), new
PartialPath("root.sg.d1.s1"), 2, 1));
@@ -98,13 +97,13 @@ public class SchemaRegionListeningQueueTest {
PipeDataNodeAgent.runtime().schemaListener(new
SchemaRegionId(0)).loadSnapshot(snapshotDir);
Assert.assertTrue(PipeDataNodeAgent.runtime().schemaListener(new
SchemaRegionId(0)).isOpened());
- ConcurrentIterableLinkedQueue<Event>.DynamicIterator itr =
+ final ConcurrentIterableLinkedQueue<Event>.DynamicIterator itr =
PipeDataNodeAgent.runtime().schemaListener(new
SchemaRegionId(0)).newIterator(0);
- Event event1 = itr.next(0);
+ final Event event1 = itr.next(0);
Assert.assertEquals(node1, ((PipeSchemaRegionWritePlanEvent)
event1).getPlanNode());
- Event event2 = itr.next(0);
+ final Event event2 = itr.next(0);
Assert.assertEquals(
node2.getWritePlanNode(), ((PipeSchemaRegionWritePlanEvent)
event2).getPlanNode());
Assert.assertTrue(((PipeSchemaRegionWritePlanEvent)
event2).isGeneratedByPipe());
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/tools/MLogParserTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/tools/MLogParserTest.java
index 6b5e59d7a15..ad747a8ea62 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/tools/MLogParserTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/tools/MLogParserTest.java
@@ -100,7 +100,7 @@ public class MLogParserTest {
try {
schemaEngine
.getSchemaRegion(new SchemaRegionId(schemaRegionIds[i]))
- .createTimeseries(
+ .createTimeSeries(
SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
new PartialPath("root.sg" + i + "." + "device" + j +
"." + "s" + k),
TSDataType.INT32,
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/SchemaRegionSnapshotParserTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/SchemaRegionSnapshotParserTest.java
index 8ff09c5008b..3c9e6cf1148 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/SchemaRegionSnapshotParserTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/SchemaRegionSnapshotParserTest.java
@@ -210,7 +210,7 @@ public class SchemaRegionSnapshotParserTest {
null,
null));
for (ICreateTimeSeriesPlan plan : planMap.values()) {
- schemaRegion.createTimeseries(plan, -1);
+ schemaRegion.createTimeSeries(plan, -1);
}
File snapshotDir = new File(config.getSchemaDir() + File.separator +
"snapshot");
@@ -398,7 +398,7 @@ public class SchemaRegionSnapshotParserTest {
}
ISchemaRegion schemaRegion = getSchemaRegion("root.sg", 0);
PartialPath databasePath = new PartialPath("root.sg");
- schemaRegion.createTimeseries(
+ schemaRegion.createTimeSeries(
SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
new PartialPath("root.sg.s1.g1.temp"),
TSDataType.FLOAT,
@@ -409,7 +409,7 @@ public class SchemaRegionSnapshotParserTest {
null,
null),
0);
- schemaRegion.createTimeseries(
+ schemaRegion.createTimeSeries(
SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
new PartialPath("root.sg.s1.g3.temp"),
TSDataType.FLOAT,
@@ -420,7 +420,7 @@ public class SchemaRegionSnapshotParserTest {
null,
null),
0);
- schemaRegion.createTimeseries(
+ schemaRegion.createTimeSeries(
SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
new PartialPath("root.sg.s2.g1.temp"),
TSDataType.FLOAT,
@@ -662,7 +662,7 @@ public class SchemaRegionSnapshotParserTest {
}));
for (ISchemaRegionPlan plan : planMap.values()) {
if (plan instanceof ICreateTimeSeriesPlan) {
- schemaRegion.createTimeseries((ICreateTimeSeriesPlan) plan, -1);
+ schemaRegion.createTimeSeries((ICreateTimeSeriesPlan) plan, -1);
} else if (plan instanceof ICreateAlignedTimeSeriesPlan) {
schemaRegion.createAlignedTimeSeries((ICreateAlignedTimeSeriesPlan)
plan);
} else if (plan instanceof IActivateTemplateInClusterPlan) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/node/utils/IMNodeFactory.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/node/utils/IMNodeFactory.java
index 24b60a78fb0..93ca7406d9b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/node/utils/IMNodeFactory.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/node/utils/IMNodeFactory.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.commons.schema.node.utils;
import org.apache.iotdb.commons.schema.node.IMNode;