This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new adcdbd6 [To rel/0.12] [IOTDB-1484] auto create schema on 0.12 (#3999)
adcdbd6 is described below
commit adcdbd613fd2aae5fccf75c7f7bc29ef14541681
Author: Mrquan <[email protected]>
AuthorDate: Wed Sep 22 22:30:03 2021 +0800
[To rel/0.12] [IOTDB-1484] auto create schema on 0.12 (#3999)
* auto create schema
* auto create schema
* auto create schema
---
.../iotdb/cluster/log/applier/BaseApplier.java | 89 ++++++--
.../iotdb/cluster/log/applier/DataLogApplier.java | 59 ++++--
.../apache/iotdb/cluster/metadata/CMManager.java | 21 ++
.../cluster/server/member/DataGroupMember.java | 117 ++++++++--
.../cluster/log/applier/DataLogApplierTest.java | 9 +-
docs/UserGuide/Cluster/Cluster-Setup-Example.md | 200 +++++++++++++++--
docs/zh/UserGuide/Cluster/Cluster-Setup-Example.md | 236 ++++++++++++++++++---
pom.xml | 53 ++---
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 11 +-
.../org/apache/iotdb/db/qp/physical/BatchPlan.java | 9 +
.../db/qp/physical/crud/InsertMultiTabletPlan.java | 19 +-
.../iotdb/db/qp/physical/crud/InsertRowPlan.java | 7 +-
.../physical/crud/InsertRowsOfOneDevicePlan.java | 13 +-
.../iotdb/db/qp/physical/crud/InsertRowsPlan.java | 15 +-
.../qp/physical/sys/CreateMultiTimeSeriesPlan.java | 12 +-
15 files changed, 709 insertions(+), 161 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
index d1e6d6a..dd1d735 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.cluster.query.ClusterPlanExecutor;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -93,6 +94,47 @@ abstract class BaseApplier implements LogApplier {
}
}
+ private void handleBatchProcessException(
+ BatchProcessException e, InsertPlan plan, DataGroupMember
dataGroupMember)
+ throws QueryProcessException, StorageGroupNotSetException,
StorageEngineException {
+ if (IoTDBDescriptor.getInstance().getConfig().isEnablePartition()) {
+ TSStatus[] failingStatus = e.getFailingStatus();
+ for (int i = 0; i < failingStatus.length; i++) {
+ TSStatus status = failingStatus[i];
+ // skip succeeded plans in later execution
+ if (status != null
+ && status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && plan instanceof BatchPlan) {
+ ((BatchPlan) plan).setIsExecuted(i);
+ }
+ }
+
+ boolean needRetry = false, hasError = false;
+ for (int i = 0, failingStatusLength = failingStatus.length; i <
failingStatusLength; i++) {
+ TSStatus status = failingStatus[i];
+ if (status != null) {
+ if (status.getCode() ==
TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
+ && plan instanceof BatchPlan) {
+ ((BatchPlan) plan).unsetIsExecuted(i);
+ needRetry = true;
+ } else if (status.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ hasError = true;
+ }
+ }
+ }
+ if (hasError) {
+ throw e;
+ }
+ if (needRetry) {
+ pullTimeseriesSchema(plan, dataGroupMember.getHeader());
+ plan.recoverFromFailure();
+ getQueryExecutor().processNonQuery(plan);
+ }
+ } else {
+ throw e;
+ }
+ }
+
private void handleBatchProcessException(BatchProcessException e,
PhysicalPlan plan)
throws QueryProcessException, StorageEngineException,
StorageGroupNotSetException {
TSStatus[] failingStatus = e.getFailingStatus();
@@ -158,24 +200,26 @@ abstract class BaseApplier implements LogApplier {
throws QueryProcessException, StorageGroupNotSetException,
StorageEngineException {
try {
getQueryExecutor().processNonQuery(plan);
+ } catch (BatchProcessException e) {
+ handleBatchProcessException(e, plan, dataGroupMember);
} catch (QueryProcessException | StorageGroupNotSetException |
StorageEngineException e) {
- // check if this is caused by metadata missing, if so, pull metadata and
retry
- Throwable metaMissingException = SchemaUtils.findMetaMissingException(e);
- boolean causedByPathNotExist = metaMissingException instanceof
PathNotExistException;
-
- if (causedByPathNotExist) {
- if (logger.isDebugEnabled()) {
- logger.debug(
- "Timeseries is not found locally[{}], try pulling it from
another group: {}",
- metaGroupMember.getName(),
- e.getCause().getMessage());
- }
- pullTimeseriesSchema(plan, dataGroupMember.getHeader());
- plan.recoverFromFailure();
- getQueryExecutor().processNonQuery(plan);
- } else {
- throw e;
- }
+ if (IoTDBDescriptor.getInstance().getConfig().isEnablePartition()) {
+ // check if this is caused by metadata missing, if so, pull metadata
and retry
+ Throwable metaMissingException =
SchemaUtils.findMetaMissingException(e);
+ boolean causedByPathNotExist = metaMissingException instanceof
PathNotExistException;
+
+ if (causedByPathNotExist) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Timeseries is not found locally[{}], try pulling it from
another group: {}",
+ metaGroupMember.getName(),
+ e.getCause().getMessage());
+ }
+ pullTimeseriesSchema(plan, dataGroupMember.getHeader());
+ plan.recoverFromFailure();
+ getQueryExecutor().processNonQuery(plan);
+ } else throw e;
+ } else throw e;
}
}
@@ -187,9 +231,14 @@ abstract class BaseApplier implements LogApplier {
private void pullTimeseriesSchema(InsertPlan plan, Node ignoredGroup)
throws QueryProcessException {
try {
- PartialPath path = plan.getDeviceId();
- ((CMManager) IoTDB.metaManager)
- .pullTimeSeriesSchemas(Collections.singletonList(path),
ignoredGroup);
+ if (plan instanceof BatchPlan) {
+ ((CMManager) IoTDB.metaManager)
+ .pullTimeSeriesSchemas(((BatchPlan) plan).getPrefixPaths(),
ignoredGroup);
+ } else {
+ PartialPath path = plan.getDeviceId();
+ ((CMManager) IoTDB.metaManager)
+ .pullTimeSeriesSchemas(Collections.singletonList(path),
ignoredGroup);
+ }
} catch (MetadataException e1) {
throw new QueryProcessException(e1);
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
index 7d11d3e..ba84ee6 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
@@ -52,7 +52,7 @@ public class DataLogApplier extends BaseApplier {
private static final Logger logger =
LoggerFactory.getLogger(DataLogApplier.class);
- private DataGroupMember dataGroupMember;
+ protected DataGroupMember dataGroupMember;
public DataLogApplier(MetaGroupMember metaGroupMember, DataGroupMember
dataGroupMember) {
super(metaGroupMember);
@@ -67,15 +67,7 @@ public class DataLogApplier extends BaseApplier {
if (log instanceof PhysicalPlanLog) {
PhysicalPlanLog physicalPlanLog = (PhysicalPlanLog) log;
PhysicalPlan plan = physicalPlanLog.getPlan();
- if (plan instanceof InsertMultiTabletPlan) {
- applyInsert((InsertMultiTabletPlan) plan);
- } else if (plan instanceof InsertRowsPlan) {
- applyInsert((InsertRowsPlan) plan);
- } else if (plan instanceof InsertPlan) {
- applyInsert((InsertPlan) plan);
- } else {
- applyPhysicalPlan(plan, dataGroupMember);
- }
+ applyPhysicalPlan(plan);
} else if (log instanceof CloseFileLog) {
CloseFileLog closeFileLog = ((CloseFileLog) log);
StorageEngine.getInstance()
@@ -98,18 +90,61 @@ public class DataLogApplier extends BaseApplier {
}
}
+ public void applyPhysicalPlan(PhysicalPlan plan)
+ throws QueryProcessException, StorageGroupNotSetException,
StorageEngineException {
+ if (plan instanceof InsertMultiTabletPlan) {
+ applyInsert((InsertMultiTabletPlan) plan);
+ } else if (plan instanceof InsertRowsPlan) {
+ applyInsert((InsertRowsPlan) plan);
+ } else if (plan instanceof InsertPlan) {
+ applyInsert((InsertPlan) plan);
+ } else {
+ applyPhysicalPlan(plan, dataGroupMember);
+ }
+ }
+
private void applyInsert(InsertMultiTabletPlan plan)
throws StorageGroupNotSetException, QueryProcessException,
StorageEngineException {
+ boolean hasSync = false;
for (InsertTabletPlan insertTabletPlan : plan.getInsertTabletPlanList()) {
- applyInsert(insertTabletPlan);
+ try {
+ IoTDB.metaManager.getStorageGroupPath(insertTabletPlan.getDeviceId());
+ } catch (StorageGroupNotSetException e) {
+ try {
+ if (!hasSync) {
+ metaGroupMember.syncLeaderWithConsistencyCheck(true);
+ hasSync = true;
+ } else {
+ throw new StorageEngineException(e.getMessage());
+ }
+ } catch (CheckConsistencyException ce) {
+ throw new QueryProcessException(ce.getMessage());
+ }
+ }
}
+ applyPhysicalPlan(plan, dataGroupMember);
}
private void applyInsert(InsertRowsPlan plan)
throws StorageGroupNotSetException, QueryProcessException,
StorageEngineException {
+ boolean hasSync = false;
for (InsertRowPlan insertRowPlan : plan.getInsertRowPlanList()) {
- applyInsert(insertRowPlan);
+ try {
+ IoTDB.metaManager.getStorageGroupPath(insertRowPlan.getDeviceId());
+ } catch (StorageGroupNotSetException e) {
+ try {
+ if (!hasSync) {
+ metaGroupMember.syncLeaderWithConsistencyCheck(true);
+ hasSync = true;
+ } else {
+ throw new StorageEngineException(e.getMessage());
+ }
+ } catch (CheckConsistencyException ce) {
+ throw new QueryProcessException(ce.getMessage());
+ }
+ }
}
+ applyPhysicalPlan(plan, dataGroupMember);
}
private void applyInsert(InsertPlan plan)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index 05d61a4..75ff989 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -51,6 +51,7 @@ import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.physical.BatchPlan;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.*;
import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
@@ -642,6 +643,22 @@ public class CMManager extends MManager {
return allSuccess;
}
+ public boolean createTimeseries(InsertRowsOfOneDevicePlan
insertRowsOfOneDevicePlan)
+ throws CheckConsistencyException, IllegalPathException {
+ boolean allSuccess = true;
+ for (InsertRowPlan insertRowPlan :
insertRowsOfOneDevicePlan.getRowPlans()) {
+ boolean success = createTimeseries(insertRowPlan);
+ allSuccess = allSuccess && success;
+ if (!success) {
+ logger.error(
+ "create timeseries for device={} failed, plan={}",
+ insertRowPlan.getDeviceId(),
+ insertRowPlan);
+ }
+ }
+ return allSuccess;
+ }
+
/**
* Create timeseries automatically for an InsertPlan.
*
@@ -658,6 +675,10 @@ public class CMManager extends MManager {
return createTimeseries((InsertRowsPlan) insertPlan);
}
+ if (insertPlan instanceof InsertRowsOfOneDevicePlan) {
+ return createTimeseries((InsertRowsOfOneDevicePlan) insertPlan);
+ }
+
List<String> seriesList = new ArrayList<>();
PartialPath deviceId = insertPlan.getDeviceId();
PartialPath storageGroupName;
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 6f6936a..599a08e 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.cluster.log.snapshot.FileSnapshot;
import org.apache.iotdb.cluster.log.snapshot.PartitionedSnapshot;
import org.apache.iotdb.cluster.log.snapshot.PullSnapshotTask;
import org.apache.iotdb.cluster.log.snapshot.PullSnapshotTaskDescriptor;
+import org.apache.iotdb.cluster.metadata.CMManager;
import org.apache.iotdb.cluster.partition.NodeAdditionResult;
import org.apache.iotdb.cluster.partition.NodeRemovalResult;
import org.apache.iotdb.cluster.partition.PartitionGroup;
@@ -71,17 +72,22 @@ import org.apache.iotdb.cluster.utils.StatusUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import
org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter;
+import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
-import org.apache.iotdb.db.exception.metadata.UndefinedTemplateException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.qp.physical.BatchPlan;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.*;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -145,6 +151,7 @@ public class DataGroupMember extends RaftMember {
private LocalQueryExecutor localQueryExecutor;
+ LogApplier dataLogApplier;
/**
* When a new partition table is installed, all data members will be checked
if unchanged. If not,
* such members will be removed.
@@ -175,13 +182,14 @@ public class DataGroupMember extends RaftMember {
allNodes = nodes;
setQueryManager(new ClusterQueryManager());
slotManager = new SlotManager(ClusterConstant.SLOT_NUM, getMemberDir());
- LogApplier applier = new DataLogApplier(metaGroupMember, this);
- if (ClusterDescriptor.getInstance().getConfig().isUseAsyncApplier()) {
- applier = new AsyncDataLogApplier(applier, name);
+ dataLogApplier = new DataLogApplier(metaGroupMember, this);
+ if (ClusterDescriptor.getInstance().getConfig().isUseAsyncApplier()
+ && ClusterDescriptor.getInstance().getConfig().getReplicationNum() !=
1) {
+ dataLogApplier = new AsyncDataLogApplier(dataLogApplier, name);
}
logManager =
new FilePartitionedSnapshotLogManager(
- applier, metaGroupMember.getPartitionTable(), allNodes.get(0),
thisNode, this);
+ dataLogApplier, metaGroupMember.getPartitionTable(),
allNodes.get(0), thisNode, this);
initPeerMap();
term.set(logManager.getHardState().getCurrentTerm());
voteFor = logManager.getHardState().getVoteFor();
@@ -698,24 +706,34 @@ public class DataGroupMember extends RaftMember {
public TSStatus executeNonQueryPlan(PhysicalPlan plan) {
if (ClusterDescriptor.getInstance().getConfig().getReplicationNum() == 1) {
try {
- getLocalExecutor().processNonQuery(plan);
+ ((DataLogApplier) dataLogApplier).applyPhysicalPlan(plan);
return StatusUtils.OK;
} catch (Exception e) {
Throwable cause = IOUtils.getRootCause(e);
- if (cause instanceof StorageGroupNotSetException
- || cause instanceof UndefinedTemplateException) {
- try {
- metaGroupMember.syncLeaderWithConsistencyCheck(true);
- if (plan instanceof InsertPlan && ((InsertPlan)
plan).getFailedMeasurements() != null) {
- ((InsertPlan) plan).recoverFromFailure();
+ boolean hasCreated = false;
+ try {
+ if (plan instanceof InsertPlan
+ &&
ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
+ if (plan instanceof InsertRowsPlan || plan instanceof
InsertMultiTabletPlan) {
+ if (e instanceof BatchProcessException) {
+ for (TSStatus status : ((BatchProcessException)
e).getFailingStatus()) {
+ if (status.getCode() ==
TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()) {
+ hasCreated =
createTimeseriesForFailedInsertion(((InsertPlan) plan));
+ ((BatchPlan) plan).getResults().clear();
+ break;
+ }
+ }
+ }
+ } else if (cause instanceof PathNotExistException) {
+ hasCreated = createTimeseriesForFailedInsertion(((InsertPlan)
plan));
}
- getLocalExecutor().processNonQuery(plan);
- return StatusUtils.OK;
- } catch (CheckConsistencyException ce) {
- return StatusUtils.getStatus(StatusUtils.CONSISTENCY_FAILURE,
ce.getMessage());
- } catch (Exception ne) {
- return handleLogExecutionException(plan, IOUtils.getRootCause(ne));
}
+ } catch (MetadataException | CheckConsistencyException ex) {
+ logger.error("{}: Cannot auto-create timeseries for {}", name, plan,
e);
+ return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR,
ex.getMessage());
+ }
+ if (hasCreated) {
+ return executeNonQueryPlan(plan);
}
return handleLogExecutionException(plan, cause);
}
@@ -737,6 +755,34 @@ public class DataGroupMember extends RaftMember {
if (character == NodeCharacter.LEADER) {
long startTime =
Statistic.DATA_GROUP_MEMBER_LOCAL_EXECUTION.getOperationStartTime();
TSStatus status = processPlanLocally(plan);
+ boolean hasCreated = false;
+ try {
+ if (plan instanceof InsertPlan
+ &&
ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
+ if (plan instanceof InsertRowsPlan || plan instanceof
InsertMultiTabletPlan) {
+ if (status.getCode() ==
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ for (TSStatus tmpStatus : status.getSubStatus()) {
+ if (tmpStatus.getCode() ==
TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()) {
+ hasCreated =
createTimeseriesForFailedInsertion(((InsertPlan) plan));
+ ((BatchPlan) plan).getResults().clear();
+ break;
+ }
+ }
+ }
+ } else {
+ if (status.getCode() ==
TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()) {
+ hasCreated = createTimeseriesForFailedInsertion(((InsertPlan)
plan));
+ }
+ }
+ }
+ } catch (MetadataException | CheckConsistencyException e) {
+ logger.error("{}: Cannot auto-create timeseries for {}", name, plan,
e);
+ return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR,
e.getMessage());
+ }
+
+ if (hasCreated) {
+ status = processPlanLocally(plan);
+ }
Statistic.DATA_GROUP_MEMBER_LOCAL_EXECUTION.calOperationCostTimeFromStart(startTime);
if (status != null) {
return status;
@@ -754,6 +800,41 @@ public class DataGroupMember extends RaftMember {
return StatusUtils.NO_LEADER;
}
+ private boolean createTimeseriesForFailedInsertion(InsertPlan plan)
+ throws CheckConsistencyException, IllegalPathException {
+ logger.info("create time series for failed insertion {}", plan);
+ // apply measurements according to failed measurements
+ if (plan instanceof InsertMultiTabletPlan) {
+ for (InsertTabletPlan insertPlan : ((InsertMultiTabletPlan)
plan).getInsertTabletPlanList()) {
+ if (insertPlan.getFailedMeasurements() != null) {
+ insertPlan.getPlanFromFailed();
+ }
+ }
+ }
+
+ if (plan instanceof InsertRowsPlan) {
+ for (InsertRowPlan insertPlan : ((InsertRowsPlan)
plan).getInsertRowPlanList()) {
+ if (insertPlan.getFailedMeasurements() != null) {
+ insertPlan.getPlanFromFailed();
+ }
+ }
+ }
+
+ if (plan instanceof InsertRowsOfOneDevicePlan) {
+ for (InsertRowPlan insertPlan : ((InsertRowsOfOneDevicePlan)
plan).getRowPlans()) {
+ if (insertPlan.getFailedMeasurements() != null) {
+ insertPlan.getPlanFromFailed();
+ }
+ }
+ }
+
+ if (plan.getFailedMeasurements() != null) {
+ plan.getPlanFromFailed();
+ }
+
+ return ((CMManager) IoTDB.metaManager).createTimeseries(plan);
+ }
+
/**
* When the node does not play a member in a group any more, the
corresponding local data should
* be removed.
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
index 3dbf0c3..d20086c 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
@@ -95,7 +95,7 @@ public class DataLogApplierTest extends IoTDBTest {
private static final Logger logger =
LoggerFactory.getLogger(DataLogApplierTest.class);
private boolean partialWriteEnabled;
-
+ private boolean isPartitionEnabled;
private TestMetaGroupMember testMetaGroupMember =
new TestMetaGroupMember() {
@Override
@@ -168,6 +168,8 @@ public class DataLogApplierTest extends IoTDBTest {
NodeStatusManager.getINSTANCE().setMetaGroupMember(testMetaGroupMember);
partialWriteEnabled =
IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert();
IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(false);
+ isPartitionEnabled =
IoTDBDescriptor.getInstance().getConfig().isEnablePartition();
+ IoTDBDescriptor.getInstance().getConfig().setEnablePartition(true);
testMetaGroupMember.setClientProvider(
new DataClientProvider(new Factory()) {
@Override
@@ -197,6 +199,10 @@ public class DataLogApplierTest extends IoTDBTest {
for (int i = 0; i < 10; i++) {
timeseriesSchemas.add(TestUtils.getTestTimeSeriesSchema(4, i));
}
+ } else if (path.startsWith(TestUtils.getTestSg(1))
+ || path.startsWith(TestUtils.getTestSg(2))
+ || path.startsWith(TestUtils.getTestSg(3))) {
+ // do nothing
} else if
(!path.startsWith(TestUtils.getTestSg(5))) {
resultHandler.onError(new
StorageGroupNotSetException(path));
return;
@@ -246,6 +252,7 @@ public class DataLogApplierTest extends IoTDBTest {
super.tearDown();
NodeStatusManager.getINSTANCE().setMetaGroupMember(null);
IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(partialWriteEnabled);
+
IoTDBDescriptor.getInstance().getConfig().setEnablePartition(isPartitionEnabled);
}
@Test
diff --git a/docs/UserGuide/Cluster/Cluster-Setup-Example.md
b/docs/UserGuide/Cluster/Cluster-Setup-Example.md
index 43aa65a..81b9ee3 100644
--- a/docs/UserGuide/Cluster/Cluster-Setup-Example.md
+++ b/docs/UserGuide/Cluster/Cluster-Setup-Example.md
@@ -19,29 +19,199 @@
-->
-__NOTICE: CURRENT IoTDB CLUSTER IS FOR TESTING NOW!
-PLEASE BE DELIBERATE IF YOU RUN IT IN PRODUCT ENVIRONMENT.__
+## Prerequisite
+Note: Please install MinGW or WSL or git bash if you are using Windows.
+
+## Example of distributed configurations for 1 node and 1 replica
+
+### Compile from source code:
+
+```
+mvn clean package -DskipTests
+chmod -R 777 ./cluster/target/
+nohup ./cluster/target/iotdb-cluster-0.13.0-SNAPSHOT/sbin/start-node.sh
>/dev/null 2>&1 &
+```
+
+### Use the official website release version:
+
+```
+curl -O
https://downloads.apache.org/iotdb/0.12.1/apache-iotdb-0.12.1-cluster-bin.zip
+unzip apache-iotdb-0.12.1-cluster-bin.zip
+cd apache-iotdb-0.12.1-cluster-bin
+sed -i -e
's/^seed_nodes=127.0.0.1:9003,127.0.0.1:9005,127.0.0.1:9007$/seed_nodes=127.0.0.1:9003/g'
conf/iotdb-cluster.properties
+sed -i -e 's/^default_replica_num=3$/default_replica_num=1/g'
conf/iotdb-cluster.properties
+nohup ./sbin/start-node.sh >/dev/null 2>&1 &
+```
+
+## Example of distributed configurations for 3 nodes and 1 replica on a single
machine
+
+### Configurations
+
+You can start multiple instances on a single machine by modifying the
configurations yourself to handle port and file directory conflicts.
+
+**Node1**:**(default)**
+
+***iotdb-cluster.properties***
+
+seed\_nodes = 127.0.0.1:9003,127.0.0.1:9005,127.0.0.1:9007
+
+default\_replica\_num = 1
+
+internal\_meta\_port = 9003
+
+internal\_data\_port = 40010
+
+***iotdb-engine.properties***
+
+rpc\_port=6667
+
+system\_dir=data/system
+data\_dirs=data/data
+wal\_dir=data/wal
+index\_root\_dir=data/index
+udf\_root\_dir=ext/udf
+tracing\_dir=data/tracing
+
+**Node2**:
+
+***iotdb-cluster.properties***
+
+seed\_nodes = 127.0.0.1:9003,127.0.0.1:9005,127.0.0.1:9007
+
+default\_replica\_num = 1
+
+internal\_meta\_port = 9005
+
+internal\_data\_port = 40012
+
+***iotdb-engine.properties***
+
+rpc\_port=6669
+
+system\_dir=node2/system
+data\_dirs=node2/data
+wal\_dir=node2/wal
+index\_root\_dir=node2/index
+udf\_root\_dir=node2/ext/udf
+tracing\_dir=node2/tracing
+
+**Node3**:
+
+***iotdb-cluster.properties***
+
+seed\_nodes = 127.0.0.1:9003,127.0.0.1:9005,127.0.0.1:9007
+
+default\_replica\_num = 1
+
+internal\_meta\_port = 9007
+
+internal\_data\_port = 40014
+
+***iotdb-engine.properties***
+
+rpc\_port=6671
+
+system\_dir=node3/system
+data\_dirs=node3/data
+wal\_dir=node3/wal
+index\_root\_dir=node3/index
+udf\_root\_dir=node3/ext/udf
+tracing\_dir=node3/tracing
+
+### Compile from source code:
-## Example of pseudo-distributed scaffolding for 3 nodes and 3 replicas
```
mvn clean package -DskipTests
chmod -R 777 ./cluster/target/
-nohup ./cluster/target/iotdb-cluster-0.12.0-SNAPSHOT/sbin/start-node.sh
./cluster/target/test-classes/node1conf/ >/dev/null 2>&1 &
-nohup ./cluster/target/iotdb-cluster-0.12.0-SNAPSHOT/sbin/start-node.sh
./cluster/target/test-classes/node2conf/ >/dev/null 2>&1 &
-nohup ./cluster/target/iotdb-cluster-0.12.0-SNAPSHOT/sbin/start-node.sh
./cluster/target/test-classes/node3conf/ >/dev/null 2>&1 &
+nohup ./cluster/target/iotdb-cluster-0.13.0-SNAPSHOT/sbin/start-node.sh
./cluster/target/test-classes/node1conf/ >/dev/null 2>&1 &
+nohup ./cluster/target/iotdb-cluster-0.13.0-SNAPSHOT/sbin/start-node.sh
./cluster/target/test-classes/node2conf/ >/dev/null 2>&1 &
+nohup ./cluster/target/iotdb-cluster-0.13.0-SNAPSHOT/sbin/start-node.sh
./cluster/target/test-classes/node3conf/ >/dev/null 2>&1 &
```
+### Use the official website release version:
-## Example of distributed scaffolding for 3 nodes and 3 replicas
+Download the release version:
+```
+curl -O
https://downloads.apache.org/iotdb/0.12.1/apache-iotdb-0.12.1-cluster-bin.zip
+```
+
+Unzip the package:
+
+```
+unzip apache-iotdb-0.12.1-cluster-bin.zip
+```
+
+Enter IoTDB cluster root directory:
+
+```
+cd apache-iotdb-0.12.1-cluster-bin
+```
+
+Set default\_replica\_num = 1:
+
+```
+sed -i -e 's/^default_replica_num=3$/default_replica_num=1/g'
conf/iotdb-cluster.properties
+```
+
+Create conf\_dir for node2 and node3:
+
+```
+cp -r conf node2_conf
+cp -r conf node3_conf
+```
+
+Handle port and file directory conflicts:
+
+```
+sed -i -e 's/^internal_meta_port=9003$/internal_meta_port=9005/g' -e
's/^internal_data_port=40010$/internal_data_port=40012/g'
node2_conf/iotdb-cluster.properties
+sed -i -e 's/^internal_meta_port=9003$/internal_meta_port=9007/g' -e
's/^internal_data_port=40010$/internal_data_port=40014/g'
node3_conf/iotdb-cluster.properties
+sed -i -e 's/^rpc_port=6667$/rpc_port=6669/g'
node2_conf/iotdb-engine.properties
+sed -i -e 's/^rpc_port=6667$/rpc_port=6671/g'
node3_conf/iotdb-engine.properties
+sed -i -e 's/^# data_dirs=data\/data$/data_dirs=node2\/data/g' -e 's/^#
wal_dir=data\/wal$/wal_dir=node2\/wal/g' -e 's/^#
tracing_dir=data\/tracing$/tracing_dir=node2\/tracing/g' -e 's/^#
system_dir=data\/system$/system_dir=node2\/system/g' -e 's/^#
udf_root_dir=ext\/udf$/udf_root_dir=node2\/ext\/udf/g' -e 's/^#
index_root_dir=data\/index$/index_root_dir=node2\/index/g'
node2_conf/iotdb-engine.properties
+sed -i -e 's/^# data_dirs=data\/data$/data_dirs=node3\/data/g' -e 's/^#
wal_dir=data\/wal$/wal_dir=node3\/wal/g' -e 's/^#
tracing_dir=data\/tracing$/tracing_dir=node3\/tracing/g' -e 's/^#
system_dir=data\/system$/system_dir=node3\/system/g' -e 's/^#
udf_root_dir=ext\/udf$/udf_root_dir=node3\/ext\/udf/g' -e 's/^#
index_root_dir=data\/index$/index_root_dir=node3\/index/g'
node3_conf/iotdb-engine.properties
+```
+
+**You can modify the configuration items yourself instead of using the "sed"
command.**
+
+Start the three nodes with their configurations:
-Suppose we need to deploy the distributed IoTDB on three physical nodes, A, B,
and C, whose public network IP is a_public_IP, b_public_IP, and c_public_IP,
and private network IP is a_private_IP, b_private_IP, and c_private_IP.
+
+```
+nohup ./sbin/start-node.sh >/dev/null 2>&1 &
+nohup ./sbin/start-node.sh ./node2_conf/ >/dev/null 2>&1 &
+nohup ./sbin/start-node.sh ./node3_conf/ >/dev/null 2>&1 &
+```
+
+
+
+## Example of distributed configurations for 3 nodes and 3 replicas
+
+Suppose we need to deploy the distributed IoTDB on three physical nodes, A, B,
and C, whose public network IP is *A\_public\_IP*, *B\_public\_IP*, and
*C\_public\_IP*, and private network IP is *A\_private\_IP*, *B\_private\_IP*,
and *C\_private\_IP*.
Note: If there is no public network IP or private network IP, both can be set
to the same, just need to ensure that the client can access the server.
+### Configurations
+
+**Each Node:**
+
+***iotdb-cluster.properties***
+
+seed\_nodes = *A\_private\_Ip*:9003,*B\_private\_Ip*:9003,*C\_private\_Ip*:9003
+
+default\_replica\_num = 3
+
+internal\_ip = *A\_private\_Ip* (or *B\_private\_Ip*, *C\_private\_Ip*)
+
+***iotdb-engine.properties***
+
+rpc\_address = *A\_public\_Ip* (or *B\_public\_Ip*, *C\_public\_Ip*)
+
+### Start IoTDB cluster
+
The operation steps are as follows:
-1. Make sure ports 6667, 9003, 9004, 40010, 40011 and 31999 are open on all
three nodes.
-2. Use 'mvn clean package -pl cluster -am -DskipTests' to compile the
distributed module.
-3. Send the generated package (iotdb-cluster-0.12.0-SNAPSHOT) to all servers.
-4. Configure all nodes' seed_nodes in conf/iotdb-cluster.properties as
"A_private_ip:9003,B_private_ip:9003,C_private_ip:9003"
-5. Configure the internal_ip in conf/iotdb-cluster.properties to be the
private_ip of each node.
-6. Configure rpc_address in conf/iotdb-engine.properties to be the public_ip
of each node.
-7. Run sh sbin/start-node.sh on each of the three nodes (or run in the
background).
\ No newline at end of file
+* Use 'mvn clean package -pl cluster -am -DskipTests' to compile the
distributed module, or go directly to the
[website](https://iotdb.apache.org/Download/) to download the latest version.
+* Make sure ports 6567, 6667, 9003, 9004, 40010, 40011 and 31999 are open on
all three nodes.
+
+* Send the package to all servers.
+
+* Modify the configuration items.
+* Run sh sbin/start-node.sh on each of the three nodes (or run in the
background).
diff --git a/docs/zh/UserGuide/Cluster/Cluster-Setup-Example.md
b/docs/zh/UserGuide/Cluster/Cluster-Setup-Example.md
index 3fd72f1..78889a7 100644
--- a/docs/zh/UserGuide/Cluster/Cluster-Setup-Example.md
+++ b/docs/zh/UserGuide/Cluster/Cluster-Setup-Example.md
@@ -1,46 +1,220 @@
<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
+```
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+```
-->
-__集群模式目前是测试版!请谨慎在生产环境中使用。__
+## 前提条件
+
+如果您在使用 Windows 系统,请安装 MinGW,WSL 或者 git bash。
+
+## 1 节点 1 副本分布式搭建示例
+
+### 源码编译:
+
+```
+mvn clean package -DskipTests
+chmod -R 777 ./cluster/target/
+nohup ./cluster/target/iotdb-cluster-0.13.0-SNAPSHOT/sbin/start-node.sh
>/dev/null 2>&1 &
+```
+
+### 使用官网发布版本:
+
+```
+curl -O
https://downloads.apache.org/iotdb/0.12.1/apache-iotdb-0.12.1-cluster-bin.zip
+unzip apache-iotdb-0.12.1-cluster-bin.zip
+cd apache-iotdb-0.12.1-cluster-bin
+sed -i -e
's/^seed_nodes=127.0.0.1:9003,127.0.0.1:9005,127.0.0.1:9007$/seed_nodes=127.0.0.1:9003/g'
conf/iotdb-cluster.properties
+sed -i -e 's/^default_replica_num=3$/default_replica_num=1/g'
conf/iotdb-cluster.properties
+nohup ./sbin/start-node.sh >/dev/null 2>&1 &
+```
+
+## 单机部署 3 节点 1 副本示例
+
+### 配置
+
+通过自己修改配置来处理端口和文件目录冲突,可以在一台机器上启动多个实例。
+
+**节点1**:**(默认)**
+
+***iotdb-cluster.properties***
+
+seed\_nodes = 127.0.0.1:9003,127.0.0.1:9005,127.0.0.1:9007
+
+default\_replica\_num = 1
+
+internal\_meta\_port = 9003
+
+internal\_data\_port = 40010
+
+***iotdb-engine.properties***
+
+rpc\_port=6667
+
+system\_dir=data/system
+data\_dirs=data/data
+wal\_dir=data/wal
+index\_root\_dir=data/index
+udf\_root\_dir=ext/udf
+tracing\_dir=data/tracing
+
+**节点2**:
+
+***iotdb-cluster.properties***
+
+seed\_nodes = 127.0.0.1:9003,127.0.0.1:9005,127.0.0.1:9007
+
+default\_replica\_num = 1
+
+internal\_meta\_port = 9005
+
+internal\_data\_port = 40012
+
+***iotdb-engine.properties***
+
+rpc\_port=6669
+
+system\_dir=node2/system
+data\_dirs=node2/data
+wal\_dir=node2/wal
+index\_root\_dir=node2/index
+udf\_root\_dir=node2/ext/udf
+tracing\_dir=node2/tracing
+
+**节点3**:
+
+***iotdb-cluster.properties***
+
+seed\_nodes = 127.0.0.1:9003,127.0.0.1:9005,127.0.0.1:9007
+
+default\_replica\_num = 1
+
+internal\_meta\_port = 9007
+
+internal\_data\_port = 40014
+
+***iotdb-engine.properties***
+
+rpc\_port=6671
+
+system\_dir=node3/system
+data\_dirs=node3/data
+wal\_dir=node3/wal
+index\_root\_dir=node3/index
+udf\_root\_dir=node3/ext/udf
+tracing\_dir=node3/tracing
+
+### 源码编译:
-## 3节点3副本伪分布式搭建示例
```
mvn clean package -DskipTests
chmod -R 777 ./cluster/target/
-nohup ./cluster/target/iotdb-cluster-0.12.0-SNAPSHOT/sbin/start-node.sh
./cluster/target/test-classes/node1conf/ >/dev/null 2>&1 &
-nohup ./cluster/target/iotdb-cluster-0.12.0-SNAPSHOT/sbin/start-node.sh
./cluster/target/test-classes/node2conf/ >/dev/null 2>&1 &
-nohup ./cluster/target/iotdb-cluster-0.12.0-SNAPSHOT/sbin/start-node.sh
./cluster/target/test-classes/node3conf/ >/dev/null 2>&1 &
+nohup ./cluster/target/iotdb-cluster-0.13.0-SNAPSHOT/sbin/start-node.sh
./cluster/target/test-classes/node1conf/ >/dev/null 2>&1 &
+nohup ./cluster/target/iotdb-cluster-0.13.0-SNAPSHOT/sbin/start-node.sh
./cluster/target/test-classes/node2conf/ >/dev/null 2>&1 &
+nohup ./cluster/target/iotdb-cluster-0.13.0-SNAPSHOT/sbin/start-node.sh
./cluster/target/test-classes/node3conf/ >/dev/null 2>&1 &
+```
+
+### 使用官网发布版本:
+
+下载发布版本:
+
+```
+curl -O
https://downloads.apache.org/iotdb/0.12.1/apache-iotdb-0.12.1-cluster-bin.zip
+```
+
+解压压缩包:
+
+```
+unzip apache-iotdb-0.12.1-cluster-bin.zip
+```
+
+进入IoTDB集群根目录:
+
+```
+cd apache-iotdb-0.12.1-cluster-bin
+```
+
+设置 default\_replica\_num = 1:
+
+```
+sed -i -e 's/^default_replica_num=3$/default_replica_num=1/g'
conf/iotdb-cluster.properties
+```
+
+为节点2,节点3创建conf\_dir:
+
+```
+cp -r conf node2_conf
+cp -r conf node3_conf
+```
+
+解决端口和文件目录冲突:
+
+```
+sed -i -e 's/^internal_meta_port=9003$/internal_meta_port=9005/g' -e
's/^internal_data_port=40010$/internal_data_port=40012/g'
node2_conf/iotdb-cluster.properties
+sed -i -e 's/^internal_meta_port=9003$/internal_meta_port=9007/g' -e
's/^internal_data_port=40010$/internal_data_port=40014/g'
node3_conf/iotdb-cluster.properties
+sed -i -e 's/^rpc_port=6667$/rpc_port=6669/g'
node2_conf/iotdb-engine.properties
+sed -i -e 's/^rpc_port=6667$/rpc_port=6671/g'
node3_conf/iotdb-engine.properties
+sed -i -e 's/^# data_dirs=data\/data$/data_dirs=node2\/data/g' -e 's/^#
wal_dir=data\/wal$/wal_dir=node2\/wal/g' -e 's/^#
tracing_dir=data\/tracing$/tracing_dir=node2\/tracing/g' -e 's/^#
system_dir=data\/system$/system_dir=node2\/system/g' -e 's/^#
udf_root_dir=ext\/udf$/udf_root_dir=node2\/ext\/udf/g' -e 's/^#
index_root_dir=data\/index$/index_root_dir=node2\/index/g'
node2_conf/iotdb-engine.properties
+sed -i -e 's/^# data_dirs=data\/data$/data_dirs=node3\/data/g' -e 's/^#
wal_dir=data\/wal$/wal_dir=node3\/wal/g' -e 's/^#
tracing_dir=data\/tracing$/tracing_dir=node3\/tracing/g' -e 's/^#
system_dir=data\/system$/system_dir=node3\/system/g' -e 's/^#
udf_root_dir=ext\/udf$/udf_root_dir=node3\/ext\/udf/g' -e 's/^#
index_root_dir=data\/index$/index_root_dir=node3\/index/g'
node3_conf/iotdb-engine.properties
+```
+
+**你可以自己修改配置项而不使用“sed”命令**
+
+根据配置文件路径启动三个节点:
+
+
+```
+nohup ./sbin/start-node.sh >/dev/null 2>&1 &
+nohup ./sbin/start-node.sh ./node2_conf/ >/dev/null 2>&1 &
+nohup ./sbin/start-node.sh ./node3_conf/ >/dev/null 2>&1 &
```
-## 3节点3副本分布式搭建示例
-假设我们需要在三个物理节点上部署分布式 IoTDB,这三个节点分别为 A, B 和 C,其公网 ip 分别为 A_public_ip,B_public_ip
和 C_public_ip,私网 ip 分别为 A_private_ip,B_private_ip 和 C_private_ip。
-注:如果没有公网 ip 或者私网 ip 则两者设置成一致即可, 只需要保证客户端能够访问到服务端即可。
+
+## 3 节点 3 副本分布式搭建示例
+
+假设我们需要在三个物理节点上部署分布式 IoTDB,这三个节点分别为 A, B 和 C,其公网 ip 分别为 *A\_public\_IP*,
*B\_public\_IP* 和 *C\_public\_IP*,私网 ip 分别为 *A\_private\_IP*,
*B\_private\_IP* 和 *C\_private\_IP*。
+注:如果没有公网 ip 或者私网 ip 则两者设置成一致即可,只需要保证客户端能够访问到服务端即可。
+
+### 配置
+
+**各个节点:**
+
+***iotdb-cluster.properties***
+
+seed\_nodes = *A\_private\_Ip*:9003,*B\_private\_Ip*:9003,*C\_private\_Ip*:9003
+
+default\_replica\_num = 3
+
+internal\_ip = *A\_private\_Ip* (or *B\_private\_Ip*, *C\_private\_Ip*)
+
+***iotdb-engine.properties***
+
+rpc\_address = *A\_public\_Ip* (or *B\_public\_Ip*, *C\_public\_Ip*)
+
+### 启动IoTDB集群
以下为操作步骤:
-1. 保证三个节点的 6667, 9003, 9004, 40010, 40011 和 31999 端口是开放的。
-2. 使用 `mvn clean package -pl cluster -am -DskipTests` 编译分布式模块。
-3. 将打出来的包(iotdb-cluster-0.12.0-SNAPSHOT)传到所有的服务器上。
-4. 配置所有节点 conf/iotdb-cluster.properties 配置文件中的 seed_nodes 为
"A_private_ip:9003,B_private_ip:9003,C_private_ip:9003"
-5. 配置所有节点 conf/iotdb-cluster.properties 配置文件中的 internal_ip 为各自节点的 private_ip。
-6. 配置所有节点 conf/iotdb-engine.properties 配置文件中的 rpc_address 为各自节点的 public_ip。
-7. 在 3 个节点上分别运行 sh sbin/start-node.sh 即可(后台运行也可)。
\ No newline at end of file
+* 使用 `mvn clean package -pl cluster -am -DskipTests` 编译分布式模块或直接到
[官网](https://iotdb.apache.org/Download/) 下载最新版本。
+* 保证三个节点的 6567, 6667, 9003, 9004, 40010, 40011 和 31999 端口是开放的。
+
+* 将包上传到所有的服务器上。
+
+* 修改配置项。
+* 在 3 个节点上分别运行 sh sbin/start-node.sh 即可(后台运行也可)。
+
diff --git a/pom.xml b/pom.xml
index b5b2f17..81cc46d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -19,8 +19,7 @@
under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache</groupId>
@@ -32,9 +31,7 @@
<version>0.12.3-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Apache IoTDB Project Parent POM</name>
- <description>This is the top level project that builds, packages the
tsfile, iotdb engine, jdbc, and integration
- libs.
- </description>
+ <description>This is the top level project that builds, packages the
tsfile, iotdb engine, jdbc, and integration libs.</description>
<licenses>
<license>
<name>The Apache License, Version 2.0</name>
@@ -150,8 +147,7 @@
<!-- Exclude all generated code -->
<sonar.exclusions>**/generated-sources</sonar.exclusions>
<sonar.java.checkstyle.reportPaths>target/checkstyle-report.xml</sonar.java.checkstyle.reportPaths>
-
<sonar.coverage.jacoco.xmlReportPaths>target/jacoco-merged-reports/jacoco.xml
- </sonar.coverage.jacoco.xmlReportPaths>
+
<sonar.coverage.jacoco.xmlReportPaths>target/jacoco-merged-reports/jacoco.xml</sonar.coverage.jacoco.xmlReportPaths>
<sonar.junit.reportPaths>target/surefire-reports,target/failsafe-reports</sonar.junit.reportPaths>
<!-- By default, the argLine is empty-->
<gson.version>2.8.6</gson.version>
@@ -960,9 +956,7 @@
</activation>
<properties>
<os.classifier>windows-x86_64</os.classifier>
- <thrift.download-url>
-
http://artfiles.org/apache.org/thrift/${thrift.version}/thrift-${thrift.version}.exe
- </thrift.download-url>
+
<thrift.download-url>http://artfiles.org/apache.org/thrift/${thrift.version}/thrift-${thrift.version}.exe</thrift.download-url>
<thrift.executable>thrift-${thrift.version}-win-x86_64.exe</thrift.executable>
<thrift.skip-making-executable>true</thrift.skip-making-executable>
<thrift.exec-cmd.executable>echo</thrift.exec-cmd.executable>
@@ -979,9 +973,7 @@
</activation>
<properties>
<os.classifier>linux-x86_64</os.classifier>
- <thrift.download-url>
-
https://github.com/apache/iotdb-bin-resources/raw/main/compile-tools/thrift-0.14-ubuntu
- </thrift.download-url>
+
<thrift.download-url>https://github.com/apache/iotdb-bin-resources/raw/main/compile-tools/thrift-0.14-ubuntu</thrift.download-url>
<thrift.executable>thrift_0.14.1_linux.exe</thrift.executable>
<thrift.skip-making-executable>false</thrift.skip-making-executable>
<thrift.exec-cmd.executable>chmod</thrift.exec-cmd.executable>
@@ -997,9 +989,7 @@
</activation>
<properties>
<os.classifier>mac-x86_64</os.classifier>
- <thrift.download-url>
-
https://github.com/apache/iotdb-bin-resources/raw/main/compile-tools/thrift-0.14-MacOS
- </thrift.download-url>
+
<thrift.download-url>https://github.com/apache/iotdb-bin-resources/raw/main/compile-tools/thrift-0.14-MacOS</thrift.download-url>
<thrift.executable>thrift_0.14.1_mac.exe</thrift.executable>
<thrift.skip-making-executable>false</thrift.skip-making-executable>
<thrift.exec-cmd.executable>chmod</thrift.exec-cmd.executable>
@@ -1049,8 +1039,7 @@
</file>
</activation>
<properties>
-
<thrift.exec.absolute.path>${project.build.directory}/tools/${thrift.executable}
- </thrift.exec.absolute.path>
+
<thrift.exec.absolute.path>${project.build.directory}/tools/${thrift.executable}</thrift.exec.absolute.path>
</properties>
<build>
<plugins>
@@ -1119,8 +1108,7 @@
<generator>py</generator>
<thriftExecutable>${thrift.exec.absolute.path}</thriftExecutable>
<thriftSourceRoot>${basedir}/src/main/thrift</thriftSourceRoot>
-
<outputDirectory>${project.build.directory}/generated-sources-python/
- </outputDirectory>
+
<outputDirectory>${project.build.directory}/generated-sources-python/</outputDirectory>
</configuration>
</execution>
<execution>
@@ -1272,9 +1260,7 @@
<goal>prepare-agent</goal>
</goals>
<configuration>
- <destFile>
-
${project.build.directory}/${project.build.finalName}-jacoco-unit-tests.exec
- </destFile>
+
<destFile>${project.build.directory}/${project.build.finalName}-jacoco-unit-tests.exec</destFile>
<propertyName>surefire.jacoco.args</propertyName>
</configuration>
</execution>
@@ -1287,9 +1273,7 @@
<goal>check</goal>
</goals>
<configuration>
- <dataFile>
-
${project.build.directory}/${project.build.finalName}-jacoco-unit-tests.exec
- </dataFile>
+
<dataFile>${project.build.directory}/${project.build.finalName}-jacoco-unit-tests.exec</dataFile>
<outputDirectory>${project.build.directory}/jacoco-unit-reports</outputDirectory>
</configuration>
</execution>
@@ -1301,9 +1285,7 @@
<goal>prepare-agent</goal>
</goals>
<configuration>
- <destFile>
-
${project.build.directory}/${project.build.finalName}-jacoco-integration-tests.exec
- </destFile>
+
<destFile>${project.build.directory}/${project.build.finalName}-jacoco-integration-tests.exec</destFile>
<propertyName>failsafe.jacoco.args</propertyName>
</configuration>
</execution>
@@ -1315,11 +1297,8 @@
<goal>check</goal>
</goals>
<configuration>
- <dataFile>
-
${project.build.directory}/${project.build.finalName}-jacoco-integration-tests.exec
- </dataFile>
-
<outputDirectory>${project.build.directory}/jacoco-integration-reports
- </outputDirectory>
+
<dataFile>${project.build.directory}/${project.build.finalName}-jacoco-integration-tests.exec</dataFile>
+
<outputDirectory>${project.build.directory}/jacoco-integration-reports</outputDirectory>
</configuration>
</execution>
<execution>
@@ -1337,8 +1316,7 @@
</includes>
</fileSet>
</fileSets>
-
<destFile>${project.build.directory}/${project.build.finalName}-merged.exec
- </destFile>
+
<destFile>${project.build.directory}/${project.build.finalName}-merged.exec</destFile>
</configuration>
</execution>
<execution>
@@ -1349,8 +1327,7 @@
<goal>check</goal>
</goals>
<configuration>
-
<dataFile>${project.build.directory}/${project.build.finalName}-merged.exec
- </dataFile>
+
<dataFile>${project.build.directory}/${project.build.finalName}-merged.exec</dataFile>
<outputDirectory>${project.build.directory}/jacoco-merged-reports</outputDirectory>
</configuration>
</execution>
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index d4363ad..d860a72 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -1407,7 +1407,16 @@ public class PlanExecutor implements IPlanExecutor {
|| insertMultiTabletPlan.isExecuted(i)) {
continue;
}
- insertTablet(insertMultiTabletPlan.getInsertTabletPlanList().get(i));
+ try {
+ insertTablet(insertMultiTabletPlan.getInsertTabletPlanList().get(i));
+ } catch (QueryProcessException e) {
+ insertMultiTabletPlan
+ .getResults()
+ .put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+ }
+ }
+ if (!insertMultiTabletPlan.getResults().isEmpty()) {
+ throw new
BatchProcessException(insertMultiTabletPlan.getFailingStatus());
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/physical/BatchPlan.java
b/server/src/main/java/org/apache/iotdb/db/qp/physical/BatchPlan.java
index 318b628..493af85 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/BatchPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/BatchPlan.java
@@ -19,8 +19,10 @@
package org.apache.iotdb.db.qp.physical;
+import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import java.util.List;
import java.util.Map;
/** BatchPlan contains multiple sub-plans. */
@@ -61,4 +63,11 @@ public interface BatchPlan {
* @return execution status for each path
*/
Map<Integer, TSStatus> getResults();
+
+ /**
+ * Return prefix paths of all sub-plans
+ *
+ * @return prefix paths of all sub-plans
+ */
+ List<PartialPath> getPrefixPaths();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
index 9cc807f..953fb78 100644
---
a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
+++
b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
@@ -23,16 +23,13 @@ import
org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.BatchPlan;
+import org.apache.iotdb.db.utils.StatusUtils;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.TreeMap;
+import java.util.*;
/**
* Mainly used in the distributed version, when multiple InsertTabletPlans
belong to a raft
@@ -126,6 +123,14 @@ public class InsertMultiTabletPlan extends InsertPlan
implements BatchPlan {
return result;
}
+ public List<PartialPath> getPrefixPaths() {
+ Set<PartialPath> result = new HashSet<>();
+ for (InsertTabletPlan insertTabletPlan : insertTabletPlanList) {
+ result.add(insertTabletPlan.getDeviceId());
+ }
+ return new ArrayList<>(result);
+ }
+
@Override
public long getMinTime() {
long minTime = Long.MAX_VALUE;
@@ -226,6 +231,10 @@ public class InsertMultiTabletPlan extends InsertPlan
implements BatchPlan {
return insertTabletPlanList;
}
+ public TSStatus[] getFailingStatus() {
+ return StatusUtils.getFailingStatus(results, insertTabletPlanList.size());
+ }
+
public void setResults(Map<Integer, TSStatus> results) {
this.results = results;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index 8399527..e4b7eb1 100644
---
a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++
b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -246,6 +246,9 @@ public class InsertRowPlan extends InsertPlan {
}
failedValues.add(values[index]);
values[index] = null;
+ if (isNeedInferType) {
+ dataTypes[index] = null;
+ }
}
@Override
@@ -330,7 +333,7 @@ public class InsertRowPlan extends InsertPlan {
// and is forwarded to other nodes
if (dataTypes == null || dataTypes[i] == null) {
ReadWriteIOUtils.write(TYPE_RAW_STRING, outputStream);
- ReadWriteIOUtils.write((String) values[i], outputStream);
+ ReadWriteIOUtils.write(values[i].toString(), outputStream);
} else {
ReadWriteIOUtils.write(dataTypes[i], outputStream);
switch (dataTypes[i]) {
@@ -368,7 +371,7 @@ public class InsertRowPlan extends InsertPlan {
// and is forwarded to other nodes
if (dataTypes == null || dataTypes[i] == null) {
ReadWriteIOUtils.write(TYPE_RAW_STRING, buffer);
- ReadWriteIOUtils.write((String) values[i], buffer);
+ ReadWriteIOUtils.write(values[i].toString(), buffer);
} else {
ReadWriteIOUtils.write(dataTypes[i], buffer);
switch (dataTypes[i]) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
index 3f3177e..acf41a8 100644
---
a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
+++
b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
@@ -28,13 +28,7 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan
{
@@ -231,6 +225,11 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan
implements BatchPlan {
}
@Override
+ public List<PartialPath> getPrefixPaths() {
+ return Collections.singletonList(this.deviceId);
+ }
+
+ @Override
public int getBatchSize() {
return rowPlans.length;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java
b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java
index cc4685a..5c10901 100644
---
a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java
+++
b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java
@@ -29,11 +29,7 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
public class InsertRowsPlan extends InsertPlan implements BatchPlan {
@@ -84,6 +80,15 @@ public class InsertRowsPlan extends InsertPlan implements
BatchPlan {
}
@Override
+ public List<PartialPath> getPrefixPaths() {
+ Set<PartialPath> result = new HashSet<>();
+ for (InsertRowPlan insertRowPlan : insertRowPlanList) {
+ result.add(insertRowPlan.getDeviceId());
+ }
+ return new ArrayList<>(result);
+ }
+
+ @Override
public void checkIntegrity() throws QueryProcessException {
if (insertRowPlanList.isEmpty()) {
throw new QueryProcessException("sub plan are empty.");
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
index 0fecf01..2897334 100644
---
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
+++
b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
@@ -34,12 +34,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.TreeMap;
+import java.util.*;
/**
* create multiple timeSeries, could be split to several sub Plans to execute
in different DataGroup
@@ -144,6 +139,11 @@ public class CreateMultiTimeSeriesPlan extends
PhysicalPlan implements BatchPlan
return results;
}
+ @Override
+ public List<PartialPath> getPrefixPaths() {
+ return Collections.emptyList();
+ }
+
public TSStatus[] getFailingStatus() {
return StatusUtils.getFailingStatus(results, paths.size());
}