This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch add_batch_insert
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/add_batch_insert by this push:
     new 98b5818  prepare for FileNodeManager insertBatch
98b5818 is described below

commit 98b5818a927fee8c249c6bc8b87999423415d2ab
Author: qiaojialin <[email protected]>
AuthorDate: Sat May 25 15:31:51 2019 +0800

    prepare for FileNodeManager insertBatch
---
 .../iotdb/db/engine/filenode/FileNodeManager.java  | 36 +++++++++++++++
 .../db/qp/executor/IQueryProcessExecutor.java      | 13 ++++++
 .../iotdb/db/qp/executor/OverflowQPExecutor.java   | 54 ++++++++++++++++++++++
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 32 +++++++++----
 .../apache/iotdb/db/qp/utils/MemIntQpExecutor.java |  6 +++
 5 files changed, 131 insertions(+), 10 deletions(-)

diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index 516abdc..1b5d65c 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine.filenode;
 
 import java.io.File;
 import java.io.IOException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -67,6 +68,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
 import org.slf4j.Logger;
@@ -318,6 +320,40 @@ public class FileNodeManager implements IStatistic, 
IService {
     return insertType;
   }
 
+  /**
+   *
+   * @param tsRecords
+   * @param partialResult
+   * @param message
+   * @param isMonitor
+   * @return
+   */
+  public Pair<List<Integer>, String> insertBatch(TSRecord[] tsRecords, 
List<Integer> partialResult,
+      String message, boolean isMonitor) {
+    Map<String, List<Integer>> fileNode2RecordIndexes = new HashMap<>();
+    Map<String, List<TSRecord>> fileNode2Records = new HashMap<>();
+    for (int i = 0; i < tsRecords.length; i++) {
+      if (partialResult.get(i) == Statement.EXECUTE_FAILED) {
+        continue;
+      }
+      TSRecord record = tsRecords[i];
+      try {
+        checkTimestamp(record);
+        updateStat(isMonitor, record);
+        String fileNode = 
MManager.getInstance().getFileNameByPath(record.deviceId);
+        fileNode2RecordIndexes.computeIfAbsent(fileNode, f -> new 
ArrayList<>()).add(i);
+        fileNode2Records.computeIfAbsent(fileNode, f -> new 
ArrayList<>()).add(record);
+      } catch (Exception e) {
+        e.printStackTrace();
+        partialResult.set(i, Statement.EXECUTE_FAILED);
+        message = e.getMessage();
+      }
+    }
+
+    return null;
+  }
+
+
   private void writeLog(TSRecord tsRecord, boolean isMonitor, WriteLogNode 
logNode)
       throws FileNodeManagerException {
     try {
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
index 920aeef..ef758b2 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.fill.IFill;
@@ -45,6 +46,18 @@ public interface IQueryProcessExecutor {
    */
   boolean processNonQuery(PhysicalPlan plan) throws ProcessorException;
 
+
+  /**
+   * process a batch insert plan, we group the insert plans by storage group 
(FileNode).
+   * Then, for each storage group, we execute a batch of insertions.
+   *
+   * @param insertPlans InsertPlans
+   * @param partialResult the unauthorized plan is marked as 
Statement.EXECUTE_FAILED
+   * @param message error message that already get
+   * @return result of each statement with
+   */
+  Pair<List<Integer>, String> processBatchInsert(InsertPlan[] insertPlans, 
List<Integer> partialResult, String message);
+
   /**
    * process query plan of qp layer, construct queryDataSet.
    *
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java 
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
index 9ca337e..6f26dd8 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.qp.executor;
 
 import java.io.IOException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -162,6 +163,59 @@ public class OverflowQPExecutor extends 
QueryProcessExecutor {
   }
 
   @Override
+  public Pair<List<Integer>, String> processBatchInsert(InsertPlan[] 
insertPlans,
+      List<Integer> partialResult, String message) {
+    String msg = message;
+    TSRecord[] tsRecords = new TSRecord[insertPlans.length];
+
+    for (int i = 0; i < insertPlans.length; i++) {
+      // skip the bad plan
+      if (partialResult.get(i) == Statement.EXECUTE_FAILED) {
+        continue;
+      }
+      try {
+        boolean hasError = false;
+        InsertPlan insertPlan = insertPlans[i];
+        String deviceId = insertPlan.getDeviceId();
+        String[] measurementList = insertPlan.getMeasurements();
+        TSRecord tsRecord = new TSRecord(insertPlan.getTime(), deviceId);
+        MNode node = mManager.getNodeByDeviceIdFromCache(deviceId);
+        for (int j = 0; j < measurementList.length; j++) {
+          if (!node.hasChild(measurementList[j])) {
+            partialResult.set(i, Statement.EXECUTE_FAILED);
+            msg = String.format("Current deviceId[%s] does not contains 
measurement:%s",
+                deviceId, measurementList[j]);
+            hasError = true;
+            break;
+          }
+          MNode measurementNode = node.getChild(measurementList[j]);
+          if (!measurementNode.isLeaf()) {
+            partialResult.set(i, Statement.EXECUTE_FAILED);
+            msg = String.format("Current Path is not leaf node. %s.%s", 
deviceId,
+                measurementList[j]);
+            hasError = true;
+            break;
+          }
+
+          TSDataType dataType = measurementNode.getSchema().getType();
+          String value = insertPlan.getValues()[j];
+          value = checkValue(dataType, value);
+          DataPoint dataPoint = DataPoint.getDataPoint(dataType, 
measurementList[i], value);
+          tsRecord.addTuple(dataPoint);
+        }
+        if (!hasError) {
+          tsRecords[i] = tsRecord;
+        }
+      } catch (Exception e) {
+        partialResult.set(i, Statement.EXECUTE_FAILED);
+        msg = e.getMessage();
+      }
+
+    }
+    return fileNodeManager.insertBatch(tsRecords, partialResult, msg, false);
+  }
+
+  @Override
   public TSDataType getSeriesType(Path path) throws PathErrorException {
     if (path.equals(SQLConstant.RESERVED_TIME)) {
       return TSDataType.INT64;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java 
b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 741567a..c89e41a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -459,7 +459,8 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
       }
 
       if (allInsert) {
-        Pair<List<Integer>, String> pair = executeBatchInsert(physicalPlans);
+        // execute batch insert
+        Pair<List<Integer>, String> pair = 
executeBatchInsert((InsertPlan[])physicalPlans);
         result = pair.left;
         // only used when having failure
         batchErrorMessage = pair.right;
@@ -467,6 +468,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
             isAllSuccessful = false;
         }
       } else {
+        // execute one by one
         for (int i = 0; i < physicalPlans.length; i++) {
           PhysicalPlan physicalPlan = physicalPlans[i];
           try {
@@ -507,17 +509,31 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
   }
 
   /**
-   * @param physicalPlans
+   * @param insertPlans
    * @return a list of return code and message
    */
-  private Pair<List<Integer>, String> executeBatchInsert(PhysicalPlan[] 
physicalPlans) {
-    List<Integer> results = new ArrayList<>();
+  private Pair<List<Integer>, String> executeBatchInsert(InsertPlan[] 
insertPlans) {
+    List<Integer> results = new ArrayList<>(insertPlans.length);
 
     // null means all success
     String message = null;
 
+    for (int i = 0; i < insertPlans.length; i++) {
+      PhysicalPlan physicalPlan = insertPlans[i];
+      List<Path> paths = physicalPlan.getPaths();
+      try {
+        if (!checkAuthorization(paths, physicalPlan)) {
+          results.set(i, Statement.EXECUTE_FAILED);
+          message = "No permissions for this operation " + 
physicalPlan.getOperatorType();
+        }
+      } catch (AuthException e) {
+        LOGGER.error("meet error while checking authorization.", e);
+        results.set(i, Statement.EXECUTE_FAILED);
+        message = "Uninitialized authorizer " + e.getMessage();
+      }
+    }
 
-    return new Pair<>(results, message);
+    return processor.getExecutor().processBatchInsert(insertPlans, results, 
message);
   }
 
 
@@ -793,7 +809,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
     // Do we need to add extra information of executive condition
     boolean execRet;
     try {
-      execRet = executeNonQuery(plan);
+      execRet = processor.getExecutor().processNonQuery(plan);
     } catch (ProcessorException e) {
       LOGGER.debug("meet error while processing non-query. {}", 
e.getMessage());
       return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, 
e.getMessage());
@@ -810,10 +826,6 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
     return resp;
   }
 
-  protected boolean executeNonQuery(PhysicalPlan plan) throws 
ProcessorException {
-    return processor.getExecutor().processNonQuery(plan);
-  }
-
   private TSExecuteStatementResp executeUpdateStatement(String statement)
       throws ProcessorException {
 
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java 
b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
index 71df644..bf839af 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
@@ -110,6 +110,12 @@ public class MemIntQpExecutor extends QueryProcessExecutor 
{
   }
 
   @Override
+  public Pair<List<Integer>, String> processBatchInsert(InsertPlan[] 
insertPlans,
+      List<Integer> partialResult, String message) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public QueryDataSet aggregate(List<Path> paths, List<String> aggres, 
IExpression expression,
       QueryContext context)
       throws ProcessorException, IOException, PathErrorException, 
FileNodeManagerException,

Reply via email to