This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cluster_new
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster_new by this push:
     new 4df949c  add cluster inserttablet
     new d8fe5bc  Merge pull request #1385 from 
LebronAl/cluster_new_add_cluster_inserttablet
4df949c is described below

commit 4df949c34e919d4de9689e38bacb982772867496
Author: LebronAl <[email protected]>
AuthorDate: Sun Jun 21 20:17:38 2020 +0800

    add cluster inserttablet
---
 .../java/org/apache/iotdb/cluster/ClientMain.java  |   6 +-
 .../iotdb/cluster/log/applier/BaseApplier.java     |  18 ++-
 .../iotdb/cluster/log/manage/RaftLogManager.java   |   2 +-
 .../iotdb/cluster/query/ClusterPlanExecutor.java   |  46 +++++--
 .../iotdb/cluster/query/ClusterPlanRouter.java     |   3 +-
 .../cluster/server/member/MetaGroupMember.java     | 144 +++++++++++++++------
 .../iotdb/cluster/server/member/RaftMember.java    |  14 +-
 .../apache/iotdb/cluster/utils/PartitionUtils.java |  22 +++-
 .../apache/iotdb/cluster/utils/StatusUtils.java    |   4 +-
 .../engine/storagegroup/StorageGroupProcessor.java |   5 +-
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  98 ++++++++------
 .../db/qp/physical/crud/InsertTabletPlan.java      |  87 ++++++++++---
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   |   7 +
 13 files changed, 326 insertions(+), 130 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java
index 423789c..4b6c660 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java
@@ -31,7 +31,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -61,7 +60,6 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.session.SessionDataSet;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.filter.operator.In;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.thrift.TException;
@@ -391,10 +389,10 @@ public class ClientMain {
     List<String> paths = new ArrayList<>();
     for (String measurement : MEASUREMENTS) {
       for (String device : DEVICES) {
-        paths.add(measurement + "." + device);
+        paths.add(device + "." + measurement);
       }
     }
-    client.deleteTimeseries(sessionId, paths);
+    logger.info(client.deleteTimeseries(sessionId, paths).toString());
   }
 
   private static int calculateStrLength(List<String> values) {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
index 291b382..985368f 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.utils.SchemaUtils;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.slf4j.Logger;
@@ -56,8 +57,8 @@ abstract class BaseApplier implements LogApplier {
 
   void applyPhysicalPlan(PhysicalPlan plan)
       throws QueryProcessException, StorageGroupNotSetException, 
StorageEngineException {
-    if (plan instanceof InsertPlan) {
-      processInsertPlan((InsertPlan) plan);
+    if (plan instanceof InsertPlan || plan instanceof InsertTabletPlan) {
+      processPlanWithTolerance(plan);
     } else if (!plan.isQuery()) {
       try {
         getQueryExecutor().processNonQuery(plan);
@@ -78,7 +79,8 @@ abstract class BaseApplier implements LogApplier {
     }
   }
 
-  private void processInsertPlan(InsertPlan plan)
+
+  private void processPlanWithTolerance(PhysicalPlan plan)
       throws QueryProcessException, StorageGroupNotSetException, 
StorageEngineException {
     try {
       getQueryExecutor().processNonQuery(plan);
@@ -94,11 +96,17 @@ abstract class BaseApplier implements LogApplier {
               metaGroupMember.getName(), e.getCause().getMessage());
         }
         try {
+          String path;
+          if (plan instanceof InsertPlan) {
+            path = ((InsertPlan) plan).getDeviceId();
+          } else {
+            path = ((InsertTabletPlan) plan).getDeviceId();
+          }
           List<MeasurementSchema> schemas = metaGroupMember
-              
.pullTimeSeriesSchemas(Collections.singletonList(plan.getDeviceId()));
+              .pullTimeSeriesSchemas(Collections.singletonList(path));
           for (MeasurementSchema schema : schemas) {
             registerMeasurement(
-                plan.getDeviceId() + IoTDBConstant.PATH_SEPARATOR + 
schema.getMeasurementId(),
+                path + IoTDBConstant.PATH_SEPARATOR + 
schema.getMeasurementId(),
                 schema);
           }
         } catch (MetadataException e1) {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index b694ed6..4d0a886 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -460,7 +460,7 @@ public class RaftLogManager {
         logApplier.apply(entry);
       } catch (Exception e) {
         if (ignoreExecutionException) {
-          logger.error("Cannot apply a log {} in snapshot, ignored", entry, e);
+          logger.error("Cannot apply a log {}, ignored", entry, e);
         } else {
           throw new LogExecutionException(e);
         }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index 4493756..33c1c46 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -62,6 +62,7 @@ import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
 import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
@@ -599,6 +600,38 @@ public class ClusterPlanExecutor extends PlanExecutor {
     String[] measurementList = insertPlan.getMeasurements();
     String deviceId = insertPlan.getDeviceId();
 
+    if (getSeriesSchemas(deviceId, measurementList)) {
+      return super.getSeriesSchemas(insertPlan);
+    }
+
+  // some schemas do not exist locally, fetch them from the remote side
+    pullSeriesSchemas(deviceId, measurementList);
+
+    // we have pulled schemas as much as we can, those not pulled will depend 
on whether
+    // auto-creation is enabled
+    return super.getSeriesSchemas(insertPlan);
+  }
+
+  @Override
+  protected MeasurementSchema[] getSeriesSchemas(InsertTabletPlan 
insertTabletPlan)
+      throws MetadataException, QueryProcessException {
+    String[] measurementList = insertTabletPlan.getMeasurements();
+    String deviceId = insertTabletPlan.getDeviceId();
+
+    if (getSeriesSchemas(deviceId, measurementList)) {
+      return super.getSeriesSchemas(insertTabletPlan);
+    }
+
+  // some schemas do not exist locally, fetch them from the remote side
+    pullSeriesSchemas(deviceId, measurementList);
+
+    // we have pulled schemas as much as we can, those not pulled will depend 
on whether
+    // auto-creation is enabled
+    return super.getSeriesSchemas(insertTabletPlan);
+  }
+
+  public boolean getSeriesSchemas(String deviceId, String[] measurementList)
+      throws MetadataException {
     MNode node = null;
     boolean allSeriesExists = true;
     try {
@@ -621,12 +654,11 @@ public class ClusterPlanExecutor extends PlanExecutor {
         node.readUnlock();
       }
     }
+    return allSeriesExists;
+  }
 
-    if (allSeriesExists) {
-      return super.getSeriesSchemas(insertPlan);
-    }
-
-    // some schemas does not exist locally, fetch them from the remote side
+  public void pullSeriesSchemas(String deviceId, String[] measurementList)
+      throws MetadataException {
     List<String> schemasToPull = new ArrayList<>();
     for (String s : measurementList) {
       schemasToPull.add(deviceId + IoTDBConstant.PATH_SEPARATOR + s);
@@ -637,10 +669,6 @@ public class ClusterPlanExecutor extends PlanExecutor {
           .cacheSchema(deviceId + IoTDBConstant.PATH_SEPARATOR + 
schema.getMeasurementId(), schema);
     }
     logger.debug("Pulled {}/{} schemas from remote", schemas.size(), 
measurementList.length);
-
-    // we have pulled schemas as much as we can, those not pulled will depend 
on whether
-    // auto-creation is enabled
-    return super.getSeriesSchemas(insertPlan);
   }
 
   @Override
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
index e46a583..8403d24 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
@@ -156,7 +156,7 @@ public class ClusterPlanRouter {
     int startLoc = 0; //included
 
     Map<PartitionGroup, List<Integer>> splitMap = new HashMap<>();
-    //for each List in split, they are range1.start, range.end, range2.start, 
range2.end, ...
+    //for each List in split, they are range1.start, range1.end, range2.start, 
range2.end, ...
     for (int i = 1; i < times.length; i++) {// times are sorted in session API.
       if (times[i] >= endTime) {
         // a new range.
@@ -200,6 +200,7 @@ public class ClusterPlanRouter {
         destLoc += end - start;
       }
       InsertTabletPlan newBatch = PartitionUtils.copy(plan, subTimes, values);
+      newBatch.setRange(locs);
       result.put(newBatch, entry.getKey());
     }
     return result;
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index dfe7fde..45041dd 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -143,6 +143,7 @@ import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
 import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.context.QueryContext;
@@ -1488,8 +1489,8 @@ public class MetaGroupMember extends RaftMember 
implements TSMetaService.AsyncIf
       syncLeaderWithConsistencyCheck();
       List<PartitionGroup> globalGroups = partitionTable.getGlobalGroups();
       logger.debug("Forwarding global data plan {} to {} groups", plan, 
globalGroups.size());
-      return forwardPlan(plan, globalGroups);
-    }catch (CheckConsistencyException e) {
+      return forwardPlan(globalGroups, plan);
+    } catch (CheckConsistencyException e) {
       logger.debug("Forwarding global data plan {} to meta leader {}", plan, 
leader);
       return forwardPlan(plan, leader, null);
     }
@@ -1535,7 +1536,7 @@ public class MetaGroupMember extends RaftMember 
implements TSMetaService.AsyncIf
       return StatusUtils.NO_STORAGE_GROUP;
     }
     logger.debug("{}: The data groups of {} are {}", name, plan, planGroupMap);
-    return forwardPlan(planGroupMap);
+    return forwardPlan(planGroupMap, plan);
   }
 
   /**
@@ -1545,76 +1546,141 @@ public class MetaGroupMember extends RaftMember 
implements TSMetaService.AsyncIf
    * @param planGroupMap
    * @return
    */
-  TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap) {
-    TSStatus status;
+  TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, 
PhysicalPlan plan) {
     // the error codes from the groups that cannot execute the plan
-    List<String> errorCodePartitionGroups = new ArrayList<>();
-    TSStatus subStatus = StatusUtils.OK;
-    for (Map.Entry<PhysicalPlan, PartitionGroup> entry : 
planGroupMap.entrySet()) {
+    TSStatus status;
+    if (planGroupMap.size() == 1) {
+      Map.Entry<PhysicalPlan, PartitionGroup> entry = 
planGroupMap.entrySet().iterator().next();
       if (entry.getValue().contains(thisNode)) {
         // the query should be handled by a group the local node is in, handle 
it with in the group
-        logger.debug("Execute {} in a local group of {}", entry.getKey(), 
entry.getValue().getHeader());
-        subStatus = getLocalDataMember(entry.getValue().getHeader())
+        logger.debug("Execute {} in a local group of {}", entry.getKey(),
+            entry.getValue().getHeader());
+        status = getLocalDataMember(entry.getValue().getHeader())
             .executeNonQuery(entry.getKey());
       } else {
         // forward the query to the group that should handle it
         logger.debug("Forward {} to a remote group of {}", entry.getKey(),
             entry.getValue().getHeader());
-        subStatus = forwardPlan(entry.getKey(), entry.getValue());
-      }
-      if (subStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        // execution failed, record the error message
-        errorCodePartitionGroups.add(String.format("[%s@%s:%s]",
-            subStatus.getCode(), entry.getValue().getHeader(),
-            subStatus.getMessage()));
+        status = forwardPlan(entry.getKey(), entry.getValue());
       }
-    }
-    if (errorCodePartitionGroups.size() <= 1) {
-      // when size = 0, no error occurs, the plan is successfully executed, 
return OK
-      // when size = 1, one error occurs, set status = subStatus and return
-      status = subStatus;
     } else {
-      status = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy();
-      status.setMessage("The following errors occurred when executing the 
query, "
-          + "please retry or contact the DBA: " + 
errorCodePartitionGroups.toString());
+      TSStatus tmpStatus;
+      List<String> errorCodePartitionGroups = new ArrayList<>();
+      if (plan instanceof InsertTabletPlan) {
+        TSStatus[] subStatus = new TSStatus[((InsertTabletPlan) 
plan).getRowCount()];
+        boolean noFailure = true;
+        boolean isBatchFailure = false;
+        for (Map.Entry<PhysicalPlan, PartitionGroup> entry : 
planGroupMap.entrySet()) {
+          if (entry.getValue().contains(thisNode)) {
+            // the query should be handled by a group the local node is in, 
handle it within the group
+            logger.debug("Execute {} in a local group of {}", entry.getKey(),
+                entry.getValue().getHeader());
+            tmpStatus = getLocalDataMember(entry.getValue().getHeader())
+                .executeNonQuery(entry.getKey());
+          } else {
+            // forward the query to the group that should handle it
+            logger.debug("Forward {} to a remote group of {}", entry.getKey(),
+                entry.getValue().getHeader());
+            tmpStatus = forwardPlan(entry.getKey(), entry.getValue());
+          }
+          logger.debug("{}: from {},{},{}", name, entry.getKey(), 
entry.getValue(), tmpStatus);
+          noFailure =
+              (tmpStatus.getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) && noFailure;
+          isBatchFailure = (tmpStatus.getCode() == 
TSStatusCode.MULTIPLE_ERROR.getStatusCode())
+              || isBatchFailure;
+          PartitionUtils.reordering((InsertTabletPlan) entry.getKey(), 
subStatus,
+              tmpStatus.subStatus == null ? RpcUtils
+                  .getStatus(((InsertTabletPlan) entry.getKey()).getRowCount())
+                  : tmpStatus.subStatus.toArray(new TSStatus[]{}));
+          if (tmpStatus.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            // execution failed, record the error message
+            errorCodePartitionGroups.add(String.format("[%s@%s:%s:%s]",
+                tmpStatus.getCode(), entry.getValue().getHeader(),
+                tmpStatus.getMessage(), tmpStatus.subStatus));
+          }
+        }
+        if (noFailure) {
+          status = StatusUtils.OK;
+        } else if (isBatchFailure) {
+          status = RpcUtils.getStatus(Arrays.asList(subStatus));
+        } else {
+          status = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy();
+          status.setMessage("The following errors occurred when executing the 
query, "
+              + "please retry or contact the DBA: " + 
errorCodePartitionGroups.toString());
+        }
+      } else {
+        for (Map.Entry<PhysicalPlan, PartitionGroup> entry : 
planGroupMap.entrySet()) {
+          if (entry.getValue().contains(thisNode)) {
+            // the query should be handled by a group the local node is in, 
handle it within the group
+            logger.debug("Execute {} in a local group of {}", entry.getKey(),
+                entry.getValue().getHeader());
+            tmpStatus = getLocalDataMember(entry.getValue().getHeader())
+                .executeNonQuery(entry.getKey());
+          } else {
+            // forward the query to the group that should handle it
+            logger.debug("Forward {} to a remote group of {}", entry.getKey(),
+                entry.getValue().getHeader());
+            tmpStatus = forwardPlan(entry.getKey(), entry.getValue());
+          }
+          if (tmpStatus.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            // execution failed, record the error message
+            errorCodePartitionGroups.add(String.format("[%s@%s:%s]",
+                tmpStatus.getCode(), entry.getValue().getHeader(),
+                tmpStatus.getMessage()));
+          }
+        }
+        if (errorCodePartitionGroups.size() == 0) {
+          status = StatusUtils.OK;
+        } else {
+          status = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy();
+          status.setMessage("The following errors occurred when executing the 
query, "
+              + "please retry or contact the DBA: " + 
errorCodePartitionGroups.toString());
+        }
+      }
     }
+    logger.debug("{}: executed {} with answer {}", name, plan, status);
     return status;
   }
 
-  TSStatus forwardPlan(PhysicalPlan plan, List<PartitionGroup> 
partitionGroups) {
-    TSStatus status;
+  /**
+   * Forward plans to all DataGroupMember groups. Only when all nodes time 
out, will a TIME_OUT be
+   * returned.
+   *
+   * @param partitionGroups
+   * @param plan
+   * @return
+   */
+  TSStatus forwardPlan(List<PartitionGroup> partitionGroups, PhysicalPlan 
plan) {
     // the error codes from the groups that cannot execute the plan
+    TSStatus status;
     List<String> errorCodePartitionGroups = new ArrayList<>();
-    TSStatus subStatus = StatusUtils.OK;
     for (PartitionGroup partitionGroup : partitionGroups) {
       if (partitionGroup.contains(thisNode)) {
         // the query should be handled by a group the local node is in, handle 
it with in the group
-        logger.debug("Execute {} in a local group of {}", plan,
-            partitionGroup.getHeader());
-        subStatus = getLocalDataMember(partitionGroup.getHeader())
+        logger.debug("Execute {} in a local group of {}", plan, 
partitionGroup.getHeader());
+        status = getLocalDataMember(partitionGroup.getHeader())
             .executeNonQuery(plan);
       } else {
         // forward the query to the group that should handle it
         logger.debug("Forward {} to a remote group of {}", plan,
             partitionGroup.getHeader());
-        subStatus = forwardPlan(plan, partitionGroup);
+        status = forwardPlan(plan, partitionGroup);
       }
-      if (subStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         // execution failed, record the error message
         errorCodePartitionGroups.add(String.format("[%s@%s:%s]",
-            subStatus.getCode(), partitionGroup.getHeader(),
-            subStatus.getMessage()));
+            status.getCode(), partitionGroup.getHeader(),
+            status.getMessage()));
       }
     }
-    if (errorCodePartitionGroups.size() <= 1) {
-      // when size = 0, no error occurs, the plan is successfully executed, 
return OK
-      // when size = 1, one error occurs, set status = subStatus and return
-      status = subStatus;
+    if (errorCodePartitionGroups.size() == 0) {
+      status = StatusUtils.OK;
     } else {
       status = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy();
       status.setMessage("The following errors occurred when executing the 
query, "
           + "please retry or contact the DBA: " + 
errorCodePartitionGroups.toString());
     }
+    logger.debug("{}: executed {} with answer {}", name, plan, status);
     return status;
   }
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 98b7c4e..2eaee35 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -27,6 +27,7 @@ import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.ConcurrentModificationException;
 import java.util.List;
@@ -73,6 +74,7 @@ import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
 import org.apache.iotdb.cluster.utils.StatusUtils;
+import org.apache.iotdb.db.exception.BatchInsertionException;
 import org.apache.iotdb.db.exception.IoTDBException;
 import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
@@ -80,6 +82,7 @@ import 
org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
@@ -997,8 +1000,12 @@ public abstract class RaftMember implements 
RaftService.AsyncIface {
         return StatusUtils.OK;
       }
     } catch (LogExecutionException e) {
-      TSStatus tsStatus = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy();
       Throwable cause = getRootCause(e);
+      if (cause instanceof BatchInsertionException) {
+        return RpcUtils
+            .getStatus(Arrays.asList(((BatchInsertionException) 
cause).getFailingStatus()));
+      }
+      TSStatus tsStatus = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy();
       if (cause instanceof IoTDBException) {
         tsStatus.setCode(((IoTDBException) cause).getErrorCode());
       }
@@ -1083,8 +1090,9 @@ public abstract class RaftMember implements 
RaftService.AsyncIface {
     try {
       // process the plan locally
       PhysicalPlan plan = PhysicalPlan.Factory.create(request.planBytes);
-      logger.debug("{}: Received a plan {}", name, plan);
-      resultHandler.onComplete(executeNonQuery(plan));
+      TSStatus answer = executeNonQuery(plan);
+      logger.debug("{}: Received a plan {}, executed answer: {}", name, plan, 
answer);
+      resultHandler.onComplete(answer);
     } catch (Exception e) {
       resultHandler.onError(e);
     }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
index f371792..b0813e9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.utils;
 import static org.apache.iotdb.cluster.config.ClusterConstant.HASH_SALT;
 
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -40,6 +41,7 @@ import org.apache.iotdb.db.qp.physical.sys.OperateFilePlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
 import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeEq;
 import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeGt;
@@ -131,6 +133,17 @@ public class PartitionUtils {
     return newPlan;
   }
 
+  public static void reordering(InsertTabletPlan plan, TSStatus[] status, 
TSStatus[] subStatus) {
+    List<Integer> range = plan.getRange();
+    int destLoc = 0;
+    for (int i = 0; i < range.size(); i += 2) {
+      int start = range.get(i);
+      int end = range.get(i + 1);
+      System.arraycopy(subStatus, destLoc, status, start, end - start);
+      destLoc += end - start;
+    }
+  }
+
   public static Intervals extractTimeInterval(Filter filter) {
     if (filter == null) {
       return Intervals.ALL_INTERVAL;
@@ -301,10 +314,11 @@ public class PartitionUtils {
     }
 
     /**
-     * Merge an interval of [lowerBound, upperBound] with the last interval if 
they can be
-     * merged, or just add it as the last interval if its lowerBound is larger 
than the
-     * upperBound of the last interval. If the upperBound of the new interval 
is less than the
-     * lowerBound of the last interval, nothing will be done.
+     * Merge an interval of [lowerBound, upperBound] with the last interval if 
they can be merged,
+     * or just add it as the last interval if its lowerBound is larger than 
the upperBound of the
+     * last interval. If the upperBound of the new interval is less than the 
lowerBound of the last
+     * interval, nothing will be done.
+     *
      * @param lowerBound
      * @param upperBound
      */
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
index d136bd8..4be9208 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
@@ -47,6 +47,9 @@ public class StatusUtils {
     TSStatus status = new TSStatus();
     status.setCode(statusCode.getStatusCode());
     switch (statusCode) {
+      case SUCCESS_STATUS:
+        status.setMessage("Executed successfully. ");
+        break;
       case TIME_OUT:
         status.setMessage("Request timed out. ");
         break;
@@ -60,7 +63,6 @@ public class StatusUtils {
         status
             .setMessage("Current node is read-only, please retry to find 
another available node. ");
         break;
-
       case INCOMPATIBLE_VERSION:
         status.setMessage("Incompatible version. ");
         break;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index f9cc0f6..a6ea2b9 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -789,8 +789,7 @@ public class StorageGroupProcessor {
     return true;
   }
 
-  private void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long 
latestFlushedTime)
-      throws WriteProcessException {
+  private void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long 
latestFlushedTime) {
     MNode node = null;
     try {
       MManager manager = MManager.getInstance();
@@ -802,7 +801,7 @@ public class StorageGroupProcessor {
             .updateCachedLast(plan.composeLastTimeValuePair(i), true, 
latestFlushedTime);
       }
     } catch (MetadataException e) {
-      throw new WriteProcessException(e);
+      // skip last cache update if the local MTree does not contain the schema
     } finally {
       if (node != null) {
         node.readUnlock();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java 
b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 45ea1fd..b2f1313 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -960,6 +960,61 @@ public class PlanExecutor implements IPlanExecutor {
     return schemas;
   }
 
+  protected MeasurementSchema[] getSeriesSchemas(InsertTabletPlan 
insertTabletPlan)
+      throws MetadataException, QueryProcessException {
+    String[] measurementList = insertTabletPlan.getMeasurements();
+    String deviceId = insertTabletPlan.getDeviceId();
+    MeasurementSchema[] schemas = new 
MeasurementSchema[measurementList.length];
+
+    MNode node = null;
+    try {
+      node = mManager.getDeviceNodeWithAutoCreateAndReadLock(deviceId);
+      // To reduce the String number in memory, set the deviceId from MManager 
to insertPlan
+      insertTabletPlan.setDeviceId(node.getFullPath());
+    } catch (PathNotExistException e) {
+      // ignore
+    }
+    try {
+      TSDataType[] dataTypes = insertTabletPlan.getDataTypes();
+      IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
+      String measurement;
+      for (int i = 0; i < measurementList.length; i++) {
+        measurement = measurementList[i];
+        if (node == null) {
+          schemas[i] = mManager.getSeriesSchema(deviceId, measurement);
+        } else {
+          if (!node.hasChild(measurement)) {
+            if (!conf.isAutoCreateSchemaEnabled()) {
+              throw new QueryProcessException(String.format(
+                  "Current deviceId[%s] does not contain measurement:%s", 
deviceId, measurement));
+            }
+            Path path = new Path(deviceId, measurement);
+            TSDataType dataType = dataTypes[i];
+            internalCreateTimeseries(path.getFullPath(), dataType);
+          }
+          MeasurementMNode measurementNode = (MeasurementMNode) mManager
+              .getChild(node, measurement);
+
+          // check data type
+          if (measurementNode.getSchema().getType() != 
insertTabletPlan.getDataTypes()[i]) {
+            throw new QueryProcessException(String.format(
+                "Datatype mismatch, Insert measurement %s type %s, metadata 
tree type %s",
+                measurement, insertTabletPlan.getDataTypes()[i],
+                measurementNode.getSchema().getType()));
+          }
+          schemas[i] = measurementNode.getSchema();
+          // reset measurement to common name instead of alias
+          measurementList[i] = measurementNode.getName();
+        }
+      }
+    } finally {
+      if (node != null) {
+        node.readUnlock();
+      }
+    }
+    return schemas;
+  }
+
   /**
    * @param loc index of measurement in insertPlan
    */
@@ -1092,53 +1147,12 @@ public class PlanExecutor implements IPlanExecutor {
 
   @Override
   public void insertTablet(InsertTabletPlan insertTabletPlan) throws QueryProcessException {
-    MNode node = null;
     try {
-      String[] measurementList = insertTabletPlan.getMeasurements();
-      String deviceId = insertTabletPlan.getDeviceId();
-      node = mManager.getDeviceNodeWithAutoCreateAndReadLock(deviceId);
-      // To reduce the String number in memory, use the deviceId from MManager
-      deviceId = node.getFullPath();
-      insertTabletPlan.setDeviceId(deviceId);
-      TSDataType[] dataTypes = insertTabletPlan.getDataTypes();
-      IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
-      MeasurementSchema[] schemas = new MeasurementSchema[measurementList.length];
-
-      String measurement;
-      for (int i = 0; i < measurementList.length; i++) {
-        measurement = measurementList[i];
-        // check if timeseries exists
-        if (!node.hasChild(measurement)) {
-          if (!conf.isAutoCreateSchemaEnabled()) {
-            throw new QueryProcessException(String.format(
-                "Current deviceId[%s] does not contain measurement:%s", deviceId, measurement));
-          }
-          Path path = new Path(deviceId, measurement);
-          TSDataType dataType = dataTypes[i];
-          internalCreateTimeseries(path.getFullPath(), dataType);
-
-        }
-        MeasurementMNode measurementNode = (MeasurementMNode) mManager.getChild(node, measurement);
-
-        // check data type
-        if (measurementNode.getSchema().getType() != insertTabletPlan.getDataTypes()[i]) {
-          throw new QueryProcessException(String.format(
-              "Datatype mismatch, Insert measurement %s type %s, metadata tree type %s",
-              measurement, insertTabletPlan.getDataTypes()[i],
-              measurementNode.getSchema().getType()));
-        }
-        schemas[i] = measurementNode.getSchema();
-        // reset measurement to common name instead of alias
-        measurementList[i] = measurementNode.getName();
-      }
+      MeasurementSchema[] schemas = getSeriesSchemas(insertTabletPlan);
       insertTabletPlan.setSchemas(schemas);
       StorageEngine.getInstance().insertTablet(insertTabletPlan);
     } catch (StorageEngineException | MetadataException e) {
       throw new QueryProcessException(e);
-    } finally {
-      if (node != null) {
-        node.readUnlock();
-      }
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
index 690ead1..66c8f66 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
@@ -22,6 +22,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -57,12 +58,15 @@ public class InsertTabletPlan extends PhysicalPlan {
   private Object[] columns;
   private ByteBuffer valueBuffer;
   private int rowCount = 0;
+  // Indicates whether 'start' or 'end' has been set on this plan, so that plans can be transmitted without data loss in the cluster version
+  boolean isExecuting = false;
   // cached values
   private Long maxTime = null;
   private Long minTime = null;
   private List<Path> paths;
   private int start;
   private int end;
+  private List<Integer> range;
 
   public InsertTabletPlan() {
     super(false, OperatorType.BATCHINSERT);
@@ -92,6 +96,7 @@ public class InsertTabletPlan extends PhysicalPlan {
   }
 
   public void setStart(int start) {
+    this.isExecuting = true;
     this.start = start;
   }
 
@@ -100,9 +105,18 @@ public class InsertTabletPlan extends PhysicalPlan {
   }
 
   public void setEnd(int end) {
+    this.isExecuting = true;
     this.end = end;
   }
 
+  public List<Integer> getRange() {
+    return range;
+  }
+
+  public void setRange(List<Integer> range) {
+    this.range = range;
+  }
+
   @Override
   public List<Path> getPaths() {
     if (paths != null) {
@@ -132,11 +146,21 @@ public class InsertTabletPlan extends PhysicalPlan {
       stream.writeShort(dataType.serialize());
     }
 
-    stream.writeInt(end - start);
+    if (isExecuting) {
+      stream.writeInt(end - start);
+    } else {
+      stream.writeInt(rowCount);
+    }
 
     if (timeBuffer == null) {
-      for (int i = start; i < end; i++) {
-        stream.writeLong(times[i]);
+      if (isExecuting) {
+        for (int i = start; i < end; i++) {
+          stream.writeLong(times[i]);
+        }
+      } else {
+        for (long time : times) {
+          stream.writeLong(time);
+        }
       }
     } else {
       stream.write(timeBuffer.array());
@@ -168,11 +192,21 @@ public class InsertTabletPlan extends PhysicalPlan {
       dataType.serializeTo(buffer);
     }
 
-    buffer.putInt(end - start);
+    if (isExecuting) {
+      buffer.putInt(end - start);
+    } else {
+      buffer.putInt(rowCount);
+    }
 
     if (timeBuffer == null) {
-      for (int i = start; i < end; i++) {
-        buffer.putLong(times[i]);
+      if (isExecuting) {
+        for (int i = start; i < end; i++) {
+          buffer.putLong(times[i]);
+        }
+      } else {
+        for (long time : times) {
+          buffer.putLong(time);
+        }
       }
     } else {
       buffer.put(timeBuffer.array());
@@ -201,40 +235,42 @@ public class InsertTabletPlan extends PhysicalPlan {
 
   private void serializeColumn(TSDataType dataType, Object column, ByteBuffer buffer,
       int start, int end) {
+    int curStart = isExecuting ? start : 0;
+    int curEnd = isExecuting ? end : rowCount;
     switch (dataType) {
       case INT32:
         int[] intValues = (int[]) column;
-        for (int j = start; j < end; j++) {
+        for (int j = curStart; j < curEnd; j++) {
           buffer.putInt(intValues[j]);
         }
         break;
       case INT64:
         long[] longValues = (long[]) column;
-        for (int j = start; j < end; j++) {
+        for (int j = curStart; j < curEnd; j++) {
           buffer.putLong(longValues[j]);
         }
         break;
       case FLOAT:
         float[] floatValues = (float[]) column;
-        for (int j = start; j < end; j++) {
+        for (int j = curStart; j < curEnd; j++) {
           buffer.putFloat(floatValues[j]);
         }
         break;
       case DOUBLE:
         double[] doubleValues = (double[]) column;
-        for (int j = start; j < end; j++) {
+        for (int j = curStart; j < curEnd; j++) {
           buffer.putDouble(doubleValues[j]);
         }
         break;
       case BOOLEAN:
         boolean[] boolValues = (boolean[]) column;
-        for (int j = start; j < end; j++) {
+        for (int j = curStart; j < curEnd; j++) {
           buffer.putInt(BytesUtils.boolToByte(boolValues[j]));
         }
         break;
       case TEXT:
         Binary[] binaryValues = (Binary[]) column;
-        for (int j = start; j < end; j++) {
+        for (int j = curStart; j < curEnd; j++) {
           buffer.putInt(binaryValues[j].getLength());
           buffer.put(binaryValues[j].getValues());
         }
@@ -247,40 +283,42 @@ public class InsertTabletPlan extends PhysicalPlan {
 
   private void serializeColumn(TSDataType dataType, Object column, DataOutputStream outputStream,
       int start, int end) throws IOException {
+    int curStart = isExecuting ? start : 0;
+    int curEnd = isExecuting ? end : rowCount;
     switch (dataType) {
       case INT32:
         int[] intValues = (int[]) column;
-        for (int j = start; j < end; j++) {
+        for (int j = curStart; j < curEnd; j++) {
           outputStream.writeInt(intValues[j]);
         }
         break;
       case INT64:
         long[] longValues = (long[]) column;
-        for (int j = start; j < end; j++) {
+        for (int j = curStart; j < curEnd; j++) {
           outputStream.writeLong(longValues[j]);
         }
         break;
       case FLOAT:
         float[] floatValues = (float[]) column;
-        for (int j = start; j < end; j++) {
+        for (int j = curStart; j < curEnd; j++) {
           outputStream.writeFloat(floatValues[j]);
         }
         break;
       case DOUBLE:
         double[] doubleValues = (double[]) column;
-        for (int j = start; j < end; j++) {
+        for (int j = curStart; j < curEnd; j++) {
           outputStream.writeDouble(doubleValues[j]);
         }
         break;
       case BOOLEAN:
         boolean[] boolValues = (boolean[]) column;
-        for (int j = start; j < end; j++) {
+        for (int j = curStart; j < curEnd; j++) {
           outputStream.writeByte(BytesUtils.boolToByte(boolValues[j]));
         }
         break;
       case TEXT:
         Binary[] binaryValues = (Binary[]) column;
-        for (int j = start; j < end; j++) {
+        for (int j = curStart; j < curEnd; j++) {
           outputStream.writeInt(binaryValues[j].getLength());
           outputStream.write(binaryValues[j].getValues());
         }
@@ -460,4 +498,17 @@ public class InsertTabletPlan extends PhysicalPlan {
     this.rowCount = size;
   }
 
+  @Override
+  public String toString() {
+    return "InsertTabletPlan {" +
+        "deviceId:" + deviceId +
+        ", dataTypes:" + Arrays.toString(dataTypes) +
+        ", schemas:" + Arrays.toString(schemas) +
+        ", times:" + Arrays.toString(times) +
+        ", columns:" + Arrays.toString(columns) +
+        ", rowCount:" + rowCount +
+        ", start:" + start +
+        ", end:" + end +
+        '}';
+  }
 }
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index 2bd09c5..312128b 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.rpc;
 
 import java.lang.reflect.Proxy;
+import java.util.Arrays;
 import java.util.List;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
 import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
@@ -92,6 +93,12 @@ public class RpcUtils {
     return status;
   }
 
+  public static TSStatus[] getStatus(int length) {
+    TSStatus[] status = new TSStatus[length];
+    Arrays.fill(status, RpcUtils.SUCCESS_STATUS);
+    return status;
+  }
+
   public static TSExecuteStatementResp getTSExecuteStatementResp(TSStatusCode tsStatusCode) {
     TSStatus status = getStatus(tsStatusCode);
     return getTSExecuteStatementResp(status);

Reply via email to