This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch cluster_new
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_new by this push:
new 4df949c add cluster inserttablet
new d8fe5bc Merge pull request #1385 from
LebronAl/cluster_new_add_cluster_inserttablet
4df949c is described below
commit 4df949c34e919d4de9689e38bacb982772867496
Author: LebronAl <[email protected]>
AuthorDate: Sun Jun 21 20:17:38 2020 +0800
add cluster inserttablet
---
.../java/org/apache/iotdb/cluster/ClientMain.java | 6 +-
.../iotdb/cluster/log/applier/BaseApplier.java | 18 ++-
.../iotdb/cluster/log/manage/RaftLogManager.java | 2 +-
.../iotdb/cluster/query/ClusterPlanExecutor.java | 46 +++++--
.../iotdb/cluster/query/ClusterPlanRouter.java | 3 +-
.../cluster/server/member/MetaGroupMember.java | 144 +++++++++++++++------
.../iotdb/cluster/server/member/RaftMember.java | 14 +-
.../apache/iotdb/cluster/utils/PartitionUtils.java | 22 +++-
.../apache/iotdb/cluster/utils/StatusUtils.java | 4 +-
.../engine/storagegroup/StorageGroupProcessor.java | 5 +-
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 98 ++++++++------
.../db/qp/physical/crud/InsertTabletPlan.java | 87 ++++++++++---
.../main/java/org/apache/iotdb/rpc/RpcUtils.java | 7 +
13 files changed, 326 insertions(+), 130 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java
b/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java
index 423789c..4b6c660 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java
@@ -31,7 +31,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -61,7 +60,6 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.filter.operator.In;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.thrift.TException;
@@ -391,10 +389,10 @@ public class ClientMain {
List<String> paths = new ArrayList<>();
for (String measurement : MEASUREMENTS) {
for (String device : DEVICES) {
- paths.add(measurement + "." + device);
+ paths.add(device + "." + measurement);
}
}
- client.deleteTimeseries(sessionId, paths);
+ logger.info(client.deleteTimeseries(sessionId, paths).toString());
}
private static int calculateStrLength(List<String> values) {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
index 291b382..985368f 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
@@ -56,8 +57,8 @@ abstract class BaseApplier implements LogApplier {
void applyPhysicalPlan(PhysicalPlan plan)
throws QueryProcessException, StorageGroupNotSetException,
StorageEngineException {
- if (plan instanceof InsertPlan) {
- processInsertPlan((InsertPlan) plan);
+ if (plan instanceof InsertPlan || plan instanceof InsertTabletPlan) {
+ processPlanWithTolerance(plan);
} else if (!plan.isQuery()) {
try {
getQueryExecutor().processNonQuery(plan);
@@ -78,7 +79,8 @@ abstract class BaseApplier implements LogApplier {
}
}
- private void processInsertPlan(InsertPlan plan)
+
+ private void processPlanWithTolerance(PhysicalPlan plan)
throws QueryProcessException, StorageGroupNotSetException,
StorageEngineException {
try {
getQueryExecutor().processNonQuery(plan);
@@ -94,11 +96,17 @@ abstract class BaseApplier implements LogApplier {
metaGroupMember.getName(), e.getCause().getMessage());
}
try {
+ String path;
+ if (plan instanceof InsertPlan) {
+ path = ((InsertPlan) plan).getDeviceId();
+ } else {
+ path = ((InsertTabletPlan) plan).getDeviceId();
+ }
List<MeasurementSchema> schemas = metaGroupMember
-
.pullTimeSeriesSchemas(Collections.singletonList(plan.getDeviceId()));
+ .pullTimeSeriesSchemas(Collections.singletonList(path));
for (MeasurementSchema schema : schemas) {
registerMeasurement(
- plan.getDeviceId() + IoTDBConstant.PATH_SEPARATOR +
schema.getMeasurementId(),
+ path + IoTDBConstant.PATH_SEPARATOR +
schema.getMeasurementId(),
schema);
}
} catch (MetadataException e1) {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index b694ed6..4d0a886 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -460,7 +460,7 @@ public class RaftLogManager {
logApplier.apply(entry);
} catch (Exception e) {
if (ignoreExecutionException) {
- logger.error("Cannot apply a log {} in snapshot, ignored", entry, e);
+ logger.error("Cannot apply a log {}, ignored", entry, e);
} else {
throw new LogExecutionException(e);
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index 4493756..33c1c46 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -62,6 +62,7 @@ import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
@@ -599,6 +600,38 @@ public class ClusterPlanExecutor extends PlanExecutor {
String[] measurementList = insertPlan.getMeasurements();
String deviceId = insertPlan.getDeviceId();
+ if (getSeriesSchemas(deviceId, measurementList)) {
+ return super.getSeriesSchemas(insertPlan);
+ }
+
+ // some schemas does not exist locally, fetch them from the remote side
+ pullSeriesSchemas(deviceId, measurementList);
+
+ // we have pulled schemas as much as we can, those not pulled will depend
on whether
+ // auto-creation is enabled
+ return super.getSeriesSchemas(insertPlan);
+ }
+
+ @Override
+ protected MeasurementSchema[] getSeriesSchemas(InsertTabletPlan
insertTabletPlan)
+ throws MetadataException, QueryProcessException {
+ String[] measurementList = insertTabletPlan.getMeasurements();
+ String deviceId = insertTabletPlan.getDeviceId();
+
+ if (getSeriesSchemas(deviceId, measurementList)) {
+ return super.getSeriesSchemas(insertTabletPlan);
+ }
+
+ // some schemas does not exist locally, fetch them from the remote side
+ pullSeriesSchemas(deviceId, measurementList);
+
+ // we have pulled schemas as much as we can, those not pulled will depend
on whether
+ // auto-creation is enabled
+ return super.getSeriesSchemas(insertTabletPlan);
+ }
+
+ public boolean getSeriesSchemas(String deviceId, String[] measurementList)
+ throws MetadataException {
MNode node = null;
boolean allSeriesExists = true;
try {
@@ -621,12 +654,11 @@ public class ClusterPlanExecutor extends PlanExecutor {
node.readUnlock();
}
}
+ return allSeriesExists;
+ }
- if (allSeriesExists) {
- return super.getSeriesSchemas(insertPlan);
- }
-
- // some schemas does not exist locally, fetch them from the remote side
+ public void pullSeriesSchemas(String deviceId, String[] measurementList)
+ throws MetadataException {
List<String> schemasToPull = new ArrayList<>();
for (String s : measurementList) {
schemasToPull.add(deviceId + IoTDBConstant.PATH_SEPARATOR + s);
@@ -637,10 +669,6 @@ public class ClusterPlanExecutor extends PlanExecutor {
.cacheSchema(deviceId + IoTDBConstant.PATH_SEPARATOR +
schema.getMeasurementId(), schema);
}
logger.debug("Pulled {}/{} schemas from remote", schemas.size(),
measurementList.length);
-
- // we have pulled schemas as much as we can, those not pulled will depend
on whether
- // auto-creation is enabled
- return super.getSeriesSchemas(insertPlan);
}
@Override
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
index e46a583..8403d24 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
@@ -156,7 +156,7 @@ public class ClusterPlanRouter {
int startLoc = 0; //included
Map<PartitionGroup, List<Integer>> splitMap = new HashMap<>();
- //for each List in split, they are range1.start, range.end, range2.start,
range2.end, ...
+ //for each List in split, they are range1.start, range1.end, range2.start,
range2.end, ...
for (int i = 1; i < times.length; i++) {// times are sorted in session API.
if (times[i] >= endTime) {
// a new range.
@@ -200,6 +200,7 @@ public class ClusterPlanRouter {
destLoc += end - start;
}
InsertTabletPlan newBatch = PartitionUtils.copy(plan, subTimes, values);
+ newBatch.setRange(locs);
result.put(newBatch, entry.getKey());
}
return result;
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index dfe7fde..45041dd 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -143,6 +143,7 @@ import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.context.QueryContext;
@@ -1488,8 +1489,8 @@ public class MetaGroupMember extends RaftMember
implements TSMetaService.AsyncIf
syncLeaderWithConsistencyCheck();
List<PartitionGroup> globalGroups = partitionTable.getGlobalGroups();
logger.debug("Forwarding global data plan {} to {} groups", plan,
globalGroups.size());
- return forwardPlan(plan, globalGroups);
- }catch (CheckConsistencyException e) {
+ return forwardPlan(globalGroups, plan);
+ } catch (CheckConsistencyException e) {
logger.debug("Forwarding global data plan {} to meta leader {}", plan,
leader);
return forwardPlan(plan, leader, null);
}
@@ -1535,7 +1536,7 @@ public class MetaGroupMember extends RaftMember
implements TSMetaService.AsyncIf
return StatusUtils.NO_STORAGE_GROUP;
}
logger.debug("{}: The data groups of {} are {}", name, plan, planGroupMap);
- return forwardPlan(planGroupMap);
+ return forwardPlan(planGroupMap, plan);
}
/**
@@ -1545,76 +1546,141 @@ public class MetaGroupMember extends RaftMember
implements TSMetaService.AsyncIf
* @param planGroupMap
* @return
*/
- TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap) {
- TSStatus status;
+ TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap,
PhysicalPlan plan) {
// the error codes from the groups that cannot execute the plan
- List<String> errorCodePartitionGroups = new ArrayList<>();
- TSStatus subStatus = StatusUtils.OK;
- for (Map.Entry<PhysicalPlan, PartitionGroup> entry :
planGroupMap.entrySet()) {
+ TSStatus status;
+ if (planGroupMap.size() == 1) {
+ Map.Entry<PhysicalPlan, PartitionGroup> entry =
planGroupMap.entrySet().iterator().next();
if (entry.getValue().contains(thisNode)) {
// the query should be handled by a group the local node is in, handle
it with in the group
- logger.debug("Execute {} in a local group of {}", entry.getKey(),
entry.getValue().getHeader());
- subStatus = getLocalDataMember(entry.getValue().getHeader())
+ logger.debug("Execute {} in a local group of {}", entry.getKey(),
+ entry.getValue().getHeader());
+ status = getLocalDataMember(entry.getValue().getHeader())
.executeNonQuery(entry.getKey());
} else {
// forward the query to the group that should handle it
logger.debug("Forward {} to a remote group of {}", entry.getKey(),
entry.getValue().getHeader());
- subStatus = forwardPlan(entry.getKey(), entry.getValue());
- }
- if (subStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- // execution failed, record the error message
- errorCodePartitionGroups.add(String.format("[%s@%s:%s]",
- subStatus.getCode(), entry.getValue().getHeader(),
- subStatus.getMessage()));
+ status = forwardPlan(entry.getKey(), entry.getValue());
}
- }
- if (errorCodePartitionGroups.size() <= 1) {
- // when size = 0, no error occurs, the plan is successfully executed,
return OK
- // when size = 1, one error occurs, set status = subStatus and return
- status = subStatus;
} else {
- status = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy();
- status.setMessage("The following errors occurred when executing the
query, "
- + "please retry or contact the DBA: " +
errorCodePartitionGroups.toString());
+ TSStatus tmpStatus;
+ List<String> errorCodePartitionGroups = new ArrayList<>();
+ if (plan instanceof InsertTabletPlan) {
+ TSStatus[] subStatus = new TSStatus[((InsertTabletPlan)
plan).getRowCount()];
+ boolean noFailure = true;
+ boolean isBatchFailure = false;
+ for (Map.Entry<PhysicalPlan, PartitionGroup> entry :
planGroupMap.entrySet()) {
+ if (entry.getValue().contains(thisNode)) {
+ // the query should be handled by a group the local node is in,
handle it with in the group
+ logger.debug("Execute {} in a local group of {}", entry.getKey(),
+ entry.getValue().getHeader());
+ tmpStatus = getLocalDataMember(entry.getValue().getHeader())
+ .executeNonQuery(entry.getKey());
+ } else {
+ // forward the query to the group that should handle it
+ logger.debug("Forward {} to a remote group of {}", entry.getKey(),
+ entry.getValue().getHeader());
+ tmpStatus = forwardPlan(entry.getKey(), entry.getValue());
+ }
+ logger.debug("{}: from {},{},{}", name, entry.getKey(),
entry.getValue(), tmpStatus);
+ noFailure =
+ (tmpStatus.getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) && noFailure;
+ isBatchFailure = (tmpStatus.getCode() ==
TSStatusCode.MULTIPLE_ERROR.getStatusCode())
+ || isBatchFailure;
+ PartitionUtils.reordering((InsertTabletPlan) entry.getKey(),
subStatus,
+ tmpStatus.subStatus == null ? RpcUtils
+ .getStatus(((InsertTabletPlan) entry.getKey()).getRowCount())
+ : tmpStatus.subStatus.toArray(new TSStatus[]{}));
+ if (tmpStatus.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ // execution failed, record the error message
+ errorCodePartitionGroups.add(String.format("[%s@%s:%s:%s]",
+ tmpStatus.getCode(), entry.getValue().getHeader(),
+ tmpStatus.getMessage(), tmpStatus.subStatus));
+ }
+ }
+ if (noFailure) {
+ status = StatusUtils.OK;
+ } else if (isBatchFailure) {
+ status = RpcUtils.getStatus(Arrays.asList(subStatus));
+ } else {
+ status = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy();
+ status.setMessage("The following errors occurred when executing the
query, "
+ + "please retry or contact the DBA: " +
errorCodePartitionGroups.toString());
+ }
+ } else {
+ for (Map.Entry<PhysicalPlan, PartitionGroup> entry :
planGroupMap.entrySet()) {
+ if (entry.getValue().contains(thisNode)) {
+ // the query should be handled by a group the local node is in,
handle it with in the group
+ logger.debug("Execute {} in a local group of {}", entry.getKey(),
+ entry.getValue().getHeader());
+ tmpStatus = getLocalDataMember(entry.getValue().getHeader())
+ .executeNonQuery(entry.getKey());
+ } else {
+ // forward the query to the group that should handle it
+ logger.debug("Forward {} to a remote group of {}", entry.getKey(),
+ entry.getValue().getHeader());
+ tmpStatus = forwardPlan(entry.getKey(), entry.getValue());
+ }
+ if (tmpStatus.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ // execution failed, record the error message
+ errorCodePartitionGroups.add(String.format("[%s@%s:%s]",
+ tmpStatus.getCode(), entry.getValue().getHeader(),
+ tmpStatus.getMessage()));
+ }
+ }
+ if (errorCodePartitionGroups.size() == 0) {
+ status = StatusUtils.OK;
+ } else {
+ status = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy();
+ status.setMessage("The following errors occurred when executing the
query, "
+ + "please retry or contact the DBA: " +
errorCodePartitionGroups.toString());
+ }
+ }
}
+ logger.debug("{}: executed {} with answer {}", name, plan, status);
return status;
}
- TSStatus forwardPlan(PhysicalPlan plan, List<PartitionGroup>
partitionGroups) {
- TSStatus status;
+ /**
+ * Forward plans to all DataGroupMember groups. Only when all nodes time
out, will a TIME_OUT be
+ * returned.
+ *
+ * @param partitionGroups
+ * @return
+ * @para plan
+ */
+ TSStatus forwardPlan(List<PartitionGroup> partitionGroups, PhysicalPlan
plan) {
// the error codes from the groups that cannot execute the plan
+ TSStatus status;
List<String> errorCodePartitionGroups = new ArrayList<>();
- TSStatus subStatus = StatusUtils.OK;
for (PartitionGroup partitionGroup : partitionGroups) {
if (partitionGroup.contains(thisNode)) {
// the query should be handled by a group the local node is in, handle
it with in the group
- logger.debug("Execute {} in a local group of {}", plan,
- partitionGroup.getHeader());
- subStatus = getLocalDataMember(partitionGroup.getHeader())
+ logger.debug("Execute {} in a local group of {}", plan,
partitionGroup.getHeader());
+ status = getLocalDataMember(partitionGroup.getHeader())
.executeNonQuery(plan);
} else {
// forward the query to the group that should handle it
logger.debug("Forward {} to a remote group of {}", plan,
partitionGroup.getHeader());
- subStatus = forwardPlan(plan, partitionGroup);
+ status = forwardPlan(plan, partitionGroup);
}
- if (subStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// execution failed, record the error message
errorCodePartitionGroups.add(String.format("[%s@%s:%s]",
- subStatus.getCode(), partitionGroup.getHeader(),
- subStatus.getMessage()));
+ status.getCode(), partitionGroup.getHeader(),
+ status.getMessage()));
}
}
- if (errorCodePartitionGroups.size() <= 1) {
- // when size = 0, no error occurs, the plan is successfully executed,
return OK
- // when size = 1, one error occurs, set status = subStatus and return
- status = subStatus;
+ if (errorCodePartitionGroups.size() == 0) {
+ status = StatusUtils.OK;
} else {
status = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy();
status.setMessage("The following errors occurred when executing the
query, "
+ "please retry or contact the DBA: " +
errorCodePartitionGroups.toString());
}
+ logger.debug("{}: executed {} with answer {}", name, plan, status);
return status;
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 98b7c4e..2eaee35 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -27,6 +27,7 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.List;
@@ -73,6 +74,7 @@ import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
import org.apache.iotdb.cluster.utils.StatusUtils;
+import org.apache.iotdb.db.exception.BatchInsertionException;
import org.apache.iotdb.db.exception.IoTDBException;
import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
@@ -80,6 +82,7 @@ import
org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
@@ -997,8 +1000,12 @@ public abstract class RaftMember implements
RaftService.AsyncIface {
return StatusUtils.OK;
}
} catch (LogExecutionException e) {
- TSStatus tsStatus = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy();
Throwable cause = getRootCause(e);
+ if (cause instanceof BatchInsertionException) {
+ return RpcUtils
+ .getStatus(Arrays.asList(((BatchInsertionException)
cause).getFailingStatus()));
+ }
+ TSStatus tsStatus = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy();
if (cause instanceof IoTDBException) {
tsStatus.setCode(((IoTDBException) cause).getErrorCode());
}
@@ -1083,8 +1090,9 @@ public abstract class RaftMember implements
RaftService.AsyncIface {
try {
// process the plan locally
PhysicalPlan plan = PhysicalPlan.Factory.create(request.planBytes);
- logger.debug("{}: Received a plan {}", name, plan);
- resultHandler.onComplete(executeNonQuery(plan));
+ TSStatus answer = executeNonQuery(plan);
+ logger.debug("{}: Received a plan {}, executed answer: {}", name, plan,
answer);
+ resultHandler.onComplete(answer);
} catch (Exception e) {
resultHandler.onError(e);
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
index f371792..b0813e9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.utils;
import static org.apache.iotdb.cluster.config.ClusterConstant.HASH_SALT;
import java.util.ArrayList;
+import java.util.List;
import java.util.Set;
import org.apache.iotdb.cluster.partition.PartitionTable;
import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -40,6 +41,7 @@ import org.apache.iotdb.db.qp.physical.sys.OperateFilePlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeEq;
import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeGt;
@@ -131,6 +133,17 @@ public class PartitionUtils {
return newPlan;
}
+ public static void reordering(InsertTabletPlan plan, TSStatus[] status,
TSStatus[] subStatus) {
+ List<Integer> range = plan.getRange();
+ int destLoc = 0;
+ for (int i = 0; i < range.size(); i += 2) {
+ int start = range.get(i);
+ int end = range.get(i + 1);
+ System.arraycopy(subStatus, destLoc, status, start, end - start);
+ destLoc += end - start;
+ }
+ }
+
public static Intervals extractTimeInterval(Filter filter) {
if (filter == null) {
return Intervals.ALL_INTERVAL;
@@ -301,10 +314,11 @@ public class PartitionUtils {
}
/**
- * Merge an interval of [lowerBound, upperBound] with the last interval if
they can be
- * merged, or just add it as the last interval if its lowerBound is larger
than the
- * upperBound of the last interval. If the upperBound of the new interval
is less than the
- * lowerBound of the last interval, nothing will be done.
+ * Merge an interval of [lowerBound, upperBound] with the last interval if
they can be merged,
+ * or just add it as the last interval if its lowerBound is larger than
the upperBound of the
+ * last interval. If the upperBound of the new interval is less than the
lowerBound of the last
+ * interval, nothing will be done.
+ *
* @param lowerBound
* @param upperBound
*/
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
index d136bd8..4be9208 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
@@ -47,6 +47,9 @@ public class StatusUtils {
TSStatus status = new TSStatus();
status.setCode(statusCode.getStatusCode());
switch (statusCode) {
+ case SUCCESS_STATUS:
+ status.setMessage("Executed successfully. ");
+ break;
case TIME_OUT:
status.setMessage("Request timed out. ");
break;
@@ -60,7 +63,6 @@ public class StatusUtils {
status
.setMessage("Current node is read-only, please retry to find
another available node. ");
break;
-
case INCOMPATIBLE_VERSION:
status.setMessage("Incompatible version. ");
break;
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index f9cc0f6..a6ea2b9 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -789,8 +789,7 @@ public class StorageGroupProcessor {
return true;
}
- private void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long
latestFlushedTime)
- throws WriteProcessException {
+ private void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long
latestFlushedTime) {
MNode node = null;
try {
MManager manager = MManager.getInstance();
@@ -802,7 +801,7 @@ public class StorageGroupProcessor {
.updateCachedLast(plan.composeLastTimeValuePair(i), true,
latestFlushedTime);
}
} catch (MetadataException e) {
- throw new WriteProcessException(e);
+ // skip last cache update if the local MTree does not contain the schema
} finally {
if (node != null) {
node.readUnlock();
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 45ea1fd..b2f1313 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -960,6 +960,61 @@ public class PlanExecutor implements IPlanExecutor {
return schemas;
}
+ protected MeasurementSchema[] getSeriesSchemas(InsertTabletPlan
insertTabletPlan)
+ throws MetadataException, QueryProcessException {
+ String[] measurementList = insertTabletPlan.getMeasurements();
+ String deviceId = insertTabletPlan.getDeviceId();
+ MeasurementSchema[] schemas = new
MeasurementSchema[measurementList.length];
+
+ MNode node = null;
+ try {
+ node = mManager.getDeviceNodeWithAutoCreateAndReadLock(deviceId);
+ // To reduce the String number in memory, set the deviceId from MManager
to insertPlan
+ insertTabletPlan.setDeviceId(node.getFullPath());
+ } catch (PathNotExistException e) {
+ // ignore
+ }
+ try {
+ TSDataType[] dataTypes = insertTabletPlan.getDataTypes();
+ IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
+ String measurement;
+ for (int i = 0; i < measurementList.length; i++) {
+ measurement = measurementList[i];
+ if (node == null) {
+ schemas[i] = mManager.getSeriesSchema(deviceId, measurement);
+ } else {
+ if (!node.hasChild(measurement)) {
+ if (!conf.isAutoCreateSchemaEnabled()) {
+ throw new QueryProcessException(String.format(
+ "Current deviceId[%s] does not contain measurement:%s",
deviceId, measurement));
+ }
+ Path path = new Path(deviceId, measurement);
+ TSDataType dataType = dataTypes[i];
+ internalCreateTimeseries(path.getFullPath(), dataType);
+ }
+ MeasurementMNode measurementNode = (MeasurementMNode) mManager
+ .getChild(node, measurement);
+
+ // check data type
+ if (measurementNode.getSchema().getType() !=
insertTabletPlan.getDataTypes()[i]) {
+ throw new QueryProcessException(String.format(
+ "Datatype mismatch, Insert measurement %s type %s, metadata
tree type %s",
+ measurement, insertTabletPlan.getDataTypes()[i],
+ measurementNode.getSchema().getType()));
+ }
+ schemas[i] = measurementNode.getSchema();
+ // reset measurement to common name instead of alias
+ measurementList[i] = measurementNode.getName();
+ }
+ }
+ } finally {
+ if (node != null) {
+ node.readUnlock();
+ }
+ }
+ return schemas;
+ }
+
/**
* @param loc index of measurement in insertPlan
*/
@@ -1092,53 +1147,12 @@ public class PlanExecutor implements IPlanExecutor {
@Override
public void insertTablet(InsertTabletPlan insertTabletPlan) throws
QueryProcessException {
- MNode node = null;
try {
- String[] measurementList = insertTabletPlan.getMeasurements();
- String deviceId = insertTabletPlan.getDeviceId();
- node = mManager.getDeviceNodeWithAutoCreateAndReadLock(deviceId);
- // To reduce the String number in memory, use the deviceId from MManager
- deviceId = node.getFullPath();
- insertTabletPlan.setDeviceId(deviceId);
- TSDataType[] dataTypes = insertTabletPlan.getDataTypes();
- IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
- MeasurementSchema[] schemas = new
MeasurementSchema[measurementList.length];
-
- String measurement;
- for (int i = 0; i < measurementList.length; i++) {
- measurement = measurementList[i];
- // check if timeseries exists
- if (!node.hasChild(measurement)) {
- if (!conf.isAutoCreateSchemaEnabled()) {
- throw new QueryProcessException(String.format(
- "Current deviceId[%s] does not contain measurement:%s",
deviceId, measurement));
- }
- Path path = new Path(deviceId, measurement);
- TSDataType dataType = dataTypes[i];
- internalCreateTimeseries(path.getFullPath(), dataType);
-
- }
- MeasurementMNode measurementNode = (MeasurementMNode)
mManager.getChild(node, measurement);
-
- // check data type
- if (measurementNode.getSchema().getType() !=
insertTabletPlan.getDataTypes()[i]) {
- throw new QueryProcessException(String.format(
- "Datatype mismatch, Insert measurement %s type %s, metadata tree
type %s",
- measurement, insertTabletPlan.getDataTypes()[i],
- measurementNode.getSchema().getType()));
- }
- schemas[i] = measurementNode.getSchema();
- // reset measurement to common name instead of alias
- measurementList[i] = measurementNode.getName();
- }
+ MeasurementSchema[] schemas = getSeriesSchemas(insertTabletPlan);
insertTabletPlan.setSchemas(schemas);
StorageEngine.getInstance().insertTablet(insertTabletPlan);
} catch (StorageEngineException | MetadataException e) {
throw new QueryProcessException(e);
- } finally {
- if (node != null) {
- node.readUnlock();
- }
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
index 690ead1..66c8f66 100644
---
a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
+++
b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
@@ -22,6 +22,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -57,12 +58,15 @@ public class InsertTabletPlan extends PhysicalPlan {
private Object[] columns;
private ByteBuffer valueBuffer;
private int rowCount = 0;
+ // indicate whether this plan has been set 'start' or 'end' in order to
support plan transmission without data loss in cluster version
+ boolean isExecuting = false;
// cached values
private Long maxTime = null;
private Long minTime = null;
private List<Path> paths;
private int start;
private int end;
+ private List<Integer> range;
public InsertTabletPlan() {
super(false, OperatorType.BATCHINSERT);
@@ -92,6 +96,7 @@ public class InsertTabletPlan extends PhysicalPlan {
}
public void setStart(int start) {
+ this.isExecuting = true;
this.start = start;
}
@@ -100,9 +105,18 @@ public class InsertTabletPlan extends PhysicalPlan {
}
public void setEnd(int end) {
+ this.isExecuting = true;
this.end = end;
}
+ public List<Integer> getRange() {
+ return range;
+ }
+
+ public void setRange(List<Integer> range) {
+ this.range = range;
+ }
+
@Override
public List<Path> getPaths() {
if (paths != null) {
@@ -132,11 +146,21 @@ public class InsertTabletPlan extends PhysicalPlan {
stream.writeShort(dataType.serialize());
}
- stream.writeInt(end - start);
+ if (isExecuting) {
+ stream.writeInt(end - start);
+ } else {
+ stream.writeInt(rowCount);
+ }
if (timeBuffer == null) {
- for (int i = start; i < end; i++) {
- stream.writeLong(times[i]);
+ if (isExecuting) {
+ for (int i = start; i < end; i++) {
+ stream.writeLong(times[i]);
+ }
+ } else {
+ for (long time : times) {
+ stream.writeLong(time);
+ }
}
} else {
stream.write(timeBuffer.array());
@@ -168,11 +192,21 @@ public class InsertTabletPlan extends PhysicalPlan {
dataType.serializeTo(buffer);
}
- buffer.putInt(end - start);
+ if (isExecuting) {
+ buffer.putInt(end - start);
+ } else {
+ buffer.putInt(rowCount);
+ }
if (timeBuffer == null) {
- for (int i = start; i < end; i++) {
- buffer.putLong(times[i]);
+ if (isExecuting) {
+ for (int i = start; i < end; i++) {
+ buffer.putLong(times[i]);
+ }
+ } else {
+ for (long time : times) {
+ buffer.putLong(time);
+ }
}
} else {
buffer.put(timeBuffer.array());
@@ -201,40 +235,42 @@ public class InsertTabletPlan extends PhysicalPlan {
private void serializeColumn(TSDataType dataType, Object column, ByteBuffer
buffer,
int start, int end) {
+ int curStart = isExecuting ? start : 0;
+ int curEnd = isExecuting ? end : rowCount;
switch (dataType) {
case INT32:
int[] intValues = (int[]) column;
- for (int j = start; j < end; j++) {
+ for (int j = curStart; j < curEnd; j++) {
buffer.putInt(intValues[j]);
}
break;
case INT64:
long[] longValues = (long[]) column;
- for (int j = start; j < end; j++) {
+ for (int j = curStart; j < curEnd; j++) {
buffer.putLong(longValues[j]);
}
break;
case FLOAT:
float[] floatValues = (float[]) column;
- for (int j = start; j < end; j++) {
+ for (int j = curStart; j < curEnd; j++) {
buffer.putFloat(floatValues[j]);
}
break;
case DOUBLE:
double[] doubleValues = (double[]) column;
- for (int j = start; j < end; j++) {
+ for (int j = curStart; j < curEnd; j++) {
buffer.putDouble(doubleValues[j]);
}
break;
case BOOLEAN:
boolean[] boolValues = (boolean[]) column;
- for (int j = start; j < end; j++) {
+ for (int j = curStart; j < curEnd; j++) {
buffer.putInt(BytesUtils.boolToByte(boolValues[j]));
}
break;
case TEXT:
Binary[] binaryValues = (Binary[]) column;
- for (int j = start; j < end; j++) {
+ for (int j = curStart; j < curEnd; j++) {
buffer.putInt(binaryValues[j].getLength());
buffer.put(binaryValues[j].getValues());
}
@@ -247,40 +283,42 @@ public class InsertTabletPlan extends PhysicalPlan {
private void serializeColumn(TSDataType dataType, Object column,
DataOutputStream outputStream,
int start, int end) throws IOException {
+ int curStart = isExecuting ? start : 0;
+ int curEnd = isExecuting ? end : rowCount;
switch (dataType) {
case INT32:
int[] intValues = (int[]) column;
- for (int j = start; j < end; j++) {
+ for (int j = curStart; j < curEnd; j++) {
outputStream.writeInt(intValues[j]);
}
break;
case INT64:
long[] longValues = (long[]) column;
- for (int j = start; j < end; j++) {
+ for (int j = curStart; j < curEnd; j++) {
outputStream.writeLong(longValues[j]);
}
break;
case FLOAT:
float[] floatValues = (float[]) column;
- for (int j = start; j < end; j++) {
+ for (int j = curStart; j < curEnd; j++) {
outputStream.writeFloat(floatValues[j]);
}
break;
case DOUBLE:
double[] doubleValues = (double[]) column;
- for (int j = start; j < end; j++) {
+ for (int j = curStart; j < curEnd; j++) {
outputStream.writeDouble(doubleValues[j]);
}
break;
case BOOLEAN:
boolean[] boolValues = (boolean[]) column;
- for (int j = start; j < end; j++) {
+ for (int j = curStart; j < curEnd; j++) {
outputStream.writeByte(BytesUtils.boolToByte(boolValues[j]));
}
break;
case TEXT:
Binary[] binaryValues = (Binary[]) column;
- for (int j = start; j < end; j++) {
+ for (int j = curStart; j < curEnd; j++) {
outputStream.writeInt(binaryValues[j].getLength());
outputStream.write(binaryValues[j].getValues());
}
@@ -460,4 +498,17 @@ public class InsertTabletPlan extends PhysicalPlan {
this.rowCount = size;
}
+ @Override
+ public String toString() {
+ return "InsertTabletPlan {" +
+ "deviceId:" + deviceId +
+ ", dataTypes:" + Arrays.toString(dataTypes) +
+ ", schemas:" + Arrays.toString(schemas) +
+ ", times:" + Arrays.toString(times) +
+ ", columns:" + Arrays.toString(columns) +
+ ", rowCount:" + rowCount +
+ ", start:" + start +
+ ", end:" + end +
+ '}';
+ }
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index 2bd09c5..312128b 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.rpc;
import java.lang.reflect.Proxy;
+import java.util.Arrays;
import java.util.List;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
@@ -92,6 +93,12 @@ public class RpcUtils {
return status;
}
+ public static TSStatus[] getStatus(int length) {
+ TSStatus[] status = new TSStatus[length];
+ Arrays.fill(status, RpcUtils.SUCCESS_STATUS);
+ return status;
+ }
+
public static TSExecuteStatementResp getTSExecuteStatementResp(TSStatusCode
tsStatusCode) {
TSStatus status = getStatus(tsStatusCode);
return getTSExecuteStatementResp(status);