This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch add_batch_insert
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/add_batch_insert by this push:
new 98b5818 prepare for FileNodeManager insertBatch
98b5818 is described below
commit 98b5818a927fee8c249c6bc8b87999423415d2ab
Author: qiaojialin <[email protected]>
AuthorDate: Sat May 25 15:31:51 2019 +0800
prepare for FileNodeManager insertBatch
---
.../iotdb/db/engine/filenode/FileNodeManager.java | 36 +++++++++++++++
.../db/qp/executor/IQueryProcessExecutor.java | 13 ++++++
.../iotdb/db/qp/executor/OverflowQPExecutor.java | 54 ++++++++++++++++++++++
.../org/apache/iotdb/db/service/TSServiceImpl.java | 32 +++++++++----
.../apache/iotdb/db/qp/utils/MemIntQpExecutor.java | 6 +++
5 files changed, 131 insertions(+), 10 deletions(-)
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index 516abdc..1b5d65c 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine.filenode;
import java.io.File;
import java.io.IOException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -67,6 +68,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.slf4j.Logger;
@@ -318,6 +320,40 @@ public class FileNodeManager implements IStatistic,
IService {
return insertType;
}
+ /**
+  * Validates a batch of records and groups them by the file node (storage group) that owns
+  * each record's device.
+  *
+  * <p>Records whose slot in {@code partialResult} is already Statement.EXECUTE_FAILED are
+  * skipped. A record that fails the timestamp check, the statistics update or the file node
+  * lookup is marked Statement.EXECUTE_FAILED and its exception message becomes the error
+  * message.
+  *
+  * @param tsRecords records to insert; slot i may be null when partialResult.get(i) is failed
+  * @param partialResult status per record, updated in place; must hold one entry per record
+  * @param message error message collected before this call
+  * @param isMonitor forwarded to updateStat
+  * @return the (updated) status list paired with the most recent error message
+  */
+ public Pair<List<Integer>, String> insertBatch(TSRecord[] tsRecords,
+     List<Integer> partialResult, String message, boolean isMonitor) {
+   String errorMessage = message;
+   Map<String, List<Integer>> fileNode2RecordIndexes = new HashMap<>();
+   Map<String, List<TSRecord>> fileNode2Records = new HashMap<>();
+   for (int i = 0; i < tsRecords.length; i++) {
+     if (partialResult.get(i) == Statement.EXECUTE_FAILED) {
+       continue;
+     }
+     TSRecord record = tsRecords[i];
+     try {
+       checkTimestamp(record);
+       updateStat(isMonitor, record);
+       String fileNode = MManager.getInstance().getFileNameByPath(record.deviceId);
+       fileNode2RecordIndexes.computeIfAbsent(fileNode, f -> new ArrayList<>()).add(i);
+       fileNode2Records.computeIfAbsent(fileNode, f -> new ArrayList<>()).add(record);
+     } catch (Exception e) {
+       // NOTE(review): prefer this class's logger over printStackTrace
+       e.printStackTrace();
+       partialResult.set(i, Statement.EXECUTE_FAILED);
+       errorMessage = e.getMessage();
+     }
+   }
+
+   // TODO: the grouped records are not written to their file nodes yet.
+   // Return the collected state instead of null so callers can read pair.left / pair.right.
+   return new Pair<>(partialResult, errorMessage);
+ }
+
+
private void writeLog(TSRecord tsRecord, boolean isMonitor, WriteLogNode
logNode)
throws FileNodeManagerException {
try {
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
index 920aeef..ef758b2 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.fill.IFill;
@@ -45,6 +46,18 @@ public interface IQueryProcessExecutor {
*/
boolean processNonQuery(PhysicalPlan plan) throws ProcessorException;
+
+ /**
+ * Process a batch of insert plans. The insert plans are grouped by storage group (FileNode);
+ * then, for each storage group, a batch of insertions is executed.
+ *
+ * @param insertPlans the insert plans to execute
+ * @param partialResult the unauthorized plan is marked as Statement.EXECUTE_FAILED
+ * @param message the error message that has already been collected
+ * @return the result of each statement, paired with an error message
+ */
+ Pair<List<Integer>, String> processBatchInsert(InsertPlan[] insertPlans,
List<Integer> partialResult, String message);
+
/**
* process query plan of qp layer, construct queryDataSet.
*
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
index 9ca337e..6f26dd8 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.qp.executor;
import java.io.IOException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -162,6 +163,59 @@ public class OverflowQPExecutor extends
QueryProcessExecutor {
}
+ /**
+  * Converts a batch of insert plans into TSRecords and forwards them to the FileNodeManager.
+  *
+  * <p>Plans already marked Statement.EXECUTE_FAILED in {@code partialResult} are skipped. A
+  * plan is marked Statement.EXECUTE_FAILED, and its slot in the record array stays null, when
+  * one of its measurements is missing under the device node, is not a leaf node, or when the
+  * conversion throws.
+  *
+  * @param insertPlans the insert plans of the batch
+  * @param partialResult status per plan, updated in place; must hold one entry per plan
+  * @param message error message collected before this call; replaced by the latest failure
+  * @return the result of fileNodeManager.insertBatch for the converted records
+  */
@Override
+ public Pair<List<Integer>, String> processBatchInsert(InsertPlan[] insertPlans,
+     List<Integer> partialResult, String message) {
+   String msg = message;
+   TSRecord[] tsRecords = new TSRecord[insertPlans.length];
+
+   for (int i = 0; i < insertPlans.length; i++) {
+     // skip the bad plan
+     if (partialResult.get(i) == Statement.EXECUTE_FAILED) {
+       continue;
+     }
+     try {
+       boolean hasError = false;
+       InsertPlan insertPlan = insertPlans[i];
+       String deviceId = insertPlan.getDeviceId();
+       String[] measurementList = insertPlan.getMeasurements();
+       TSRecord tsRecord = new TSRecord(insertPlan.getTime(), deviceId);
+       MNode node = mManager.getNodeByDeviceIdFromCache(deviceId);
+       for (int j = 0; j < measurementList.length; j++) {
+         if (!node.hasChild(measurementList[j])) {
+           partialResult.set(i, Statement.EXECUTE_FAILED);
+           msg = String.format("Current deviceId[%s] does not contains measurement:%s",
+               deviceId, measurementList[j]);
+           hasError = true;
+           break;
+         }
+         MNode measurementNode = node.getChild(measurementList[j]);
+         if (!measurementNode.isLeaf()) {
+           partialResult.set(i, Statement.EXECUTE_FAILED);
+           msg = String.format("Current Path is not leaf node. %s.%s", deviceId,
+               measurementList[j]);
+           hasError = true;
+           break;
+         }
+
+         TSDataType dataType = measurementNode.getSchema().getType();
+         String value = insertPlan.getValues()[j];
+         value = checkValue(dataType, value);
+         // the data point is named after measurement j; i is the index of the plan
+         DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementList[j], value);
+         tsRecord.addTuple(dataPoint);
+       }
+       if (!hasError) {
+         tsRecords[i] = tsRecord;
+       }
+     } catch (Exception e) {
+       partialResult.set(i, Statement.EXECUTE_FAILED);
+       msg = e.getMessage();
+     }
+
+   }
+   // the last argument is insertBatch's isMonitor flag
+   return fileNodeManager.insertBatch(tsRecords, partialResult, msg, false);
+ }
+
+ @Override
public TSDataType getSeriesType(Path path) throws PathErrorException {
if (path.equals(SQLConstant.RESERVED_TIME)) {
return TSDataType.INT64;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 741567a..c89e41a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -459,7 +459,8 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
}
if (allInsert) {
- Pair<List<Integer>, String> pair = executeBatchInsert(physicalPlans);
+ // execute batch insert
+ Pair<List<Integer>, String> pair =
executeBatchInsert((InsertPlan[])physicalPlans);
result = pair.left;
// only used when having failure
batchErrorMessage = pair.right;
@@ -467,6 +468,7 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
isAllSuccessful = false;
}
} else {
+ // execute one by one
for (int i = 0; i < physicalPlans.length; i++) {
PhysicalPlan physicalPlan = physicalPlans[i];
try {
@@ -507,17 +509,31 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
}
/**
- * @param physicalPlans
+ * Checks the authorization of every insert plan, then hands the whole batch to the executor.
+ * A plan that fails the check is marked Statement.EXECUTE_FAILED; every other plan starts as
+ * Statement.SUCCESS_NO_INFO.
+ *
+ * @param insertPlans the insert plans to execute as one batch
* @return a list of return code and message
*/
- private Pair<List<Integer>, String> executeBatchInsert(PhysicalPlan[] physicalPlans) {
- List<Integer> results = new ArrayList<>();
+ private Pair<List<Integer>, String> executeBatchInsert(InsertPlan[] insertPlans) {
+   List<Integer> results = new ArrayList<>(insertPlans.length);
  // null means all success
  String message = null;
+   for (int i = 0; i < insertPlans.length; i++) {
+     // The constructor argument above is only a capacity; the list is still empty.
+     // Give plan i a slot so results.set(i, ...) and the executor's get(i) are in range.
+     results.add(Statement.SUCCESS_NO_INFO);
+     PhysicalPlan physicalPlan = insertPlans[i];
+     List<Path> paths = physicalPlan.getPaths();
+     try {
+       if (!checkAuthorization(paths, physicalPlan)) {
+         results.set(i, Statement.EXECUTE_FAILED);
+         message = "No permissions for this operation " + physicalPlan.getOperatorType();
+       }
+     } catch (AuthException e) {
+       LOGGER.error("meet error while checking authorization.", e);
+       results.set(i, Statement.EXECUTE_FAILED);
+       message = "Uninitialized authorizer " + e.getMessage();
+     }
+   }
- return new Pair<>(results, message);
+   return processor.getExecutor().processBatchInsert(insertPlans, results, message);
}
@@ -793,7 +809,7 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
// Do we need to add extra information of executive condition
boolean execRet;
try {
- execRet = executeNonQuery(plan);
+ execRet = processor.getExecutor().processNonQuery(plan);
} catch (ProcessorException e) {
LOGGER.debug("meet error while processing non-query. {}",
e.getMessage());
return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS,
e.getMessage());
@@ -810,10 +826,6 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
return resp;
}
- protected boolean executeNonQuery(PhysicalPlan plan) throws
ProcessorException {
- return processor.getExecutor().processNonQuery(plan);
- }
-
private TSExecuteStatementResp executeUpdateStatement(String statement)
throws ProcessorException {
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
index 71df644..bf839af 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
@@ -110,6 +110,12 @@ public class MemIntQpExecutor extends QueryProcessExecutor
{
}
+ /**
+  * Batch insertion is not supported by this test executor.
+  *
+  * @throws UnsupportedOperationException always
+  */
@Override
+ public Pair<List<Integer>, String> processBatchInsert(
+     InsertPlan[] insertPlans, List<Integer> partialResult, String message) {
+   throw new UnsupportedOperationException();
+ }
+
+ @Override
public QueryDataSet aggregate(List<Path> paths, List<String> aggres,
IExpression expression,
QueryContext context)
throws ProcessorException, IOException, PathErrorException,
FileNodeManagerException,