This is an automated email from the ASF dual-hosted git repository. qiaojialin pushed a commit to branch add_batch_insert in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 03052d17993c0c857162d7af32173fc1af4a0fec Author: qiaojialin <[email protected]> AuthorDate: Fri May 24 19:56:34 2019 +0800 add batch --- .../org/apache/iotdb/db/service/TSServiceImpl.java | 103 ++++++++++++++------- 1 file changed, 69 insertions(+), 34 deletions(-) diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index b02f1c2..2bea855 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -1,19 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with the License. You may obtain + * a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations * under the License. */ package org.apache.iotdb.db.service; @@ -48,6 +44,7 @@ import org.apache.iotdb.db.qp.QueryProcessor; import org.apache.iotdb.db.qp.executor.OverflowQPExecutor; import org.apache.iotdb.db.qp.logical.Operator; import org.apache.iotdb.db.qp.physical.PhysicalPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.qp.physical.crud.QueryPlan; import org.apache.iotdb.db.qp.physical.sys.AuthorPlan; import org.apache.iotdb.db.query.context.QueryContext; @@ -84,6 +81,7 @@ import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; +import org.apache.iotdb.tsfile.utils.Pair; import org.apache.thrift.TException; import org.apache.thrift.server.ServerContext; import org.slf4j.Logger; @@ -444,30 +442,52 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { boolean isAllSuccessful = true; String batchErrorMessage = ""; - for (String statement : statements) { - try { - PhysicalPlan physicalPlan = processor.parseSQLToPhysicalPlan(statement, zoneIds.get()); - physicalPlan.setProposer(username.get()); - if (physicalPlan.isQuery()) { - return getTSBathExecuteStatementResp(TS_StatusCode.ERROR_STATUS, - "statement is query :" + statement, result); - } - TSExecuteStatementResp resp = executeUpdateStatement(physicalPlan); - if (resp.getStatus().getStatusCode().equals(TS_StatusCode.SUCCESS_STATUS)) { - result.add(Statement.SUCCESS_NO_INFO); - 
} else { + PhysicalPlan[] physicalPlans = new PhysicalPlan[statements.size()]; + boolean allInsert = true; + + for (int i = 0; i < physicalPlans.length; i++) { + PhysicalPlan physicalPlan = processor.parseSQLToPhysicalPlan(statements.get(i), zoneIds.get()); + physicalPlan.setProposer(username.get()); + physicalPlans[i] = physicalPlan; + if (!(physicalPlan instanceof InsertPlan)) { + allInsert = false; + } + } + + if (allInsert) { + Pair<List<Integer>, String> pair = executeBatchInsert(physicalPlans); + result = pair.left; + // only used when having failure + batchErrorMessage = pair.right; + if (batchErrorMessage != null) { + isAllSuccessful = false; + } + } else { + for (int i = 0; i < physicalPlans.length; i++) { + PhysicalPlan physicalPlan = physicalPlans[i]; + try { + physicalPlan.setProposer(username.get()); + if (physicalPlan.isQuery()) { + return getTSBathExecuteStatementResp(TS_StatusCode.ERROR_STATUS, + "statement is query :" + statements.get(i), result); + } + TSExecuteStatementResp resp = executeUpdateStatement(physicalPlan); + if (resp.getStatus().getStatusCode().equals(TS_StatusCode.SUCCESS_STATUS)) { + result.add(Statement.SUCCESS_NO_INFO); + } else { + result.add(Statement.EXECUTE_FAILED); + isAllSuccessful = false; + batchErrorMessage = resp.getStatus().getErrorMessage(); + } + } catch (Exception e) { + String errMessage = String.format( + "Fail to generate physical plan and execute for statement " + + "%s because %s", + statements.get(i), e.getMessage()); result.add(Statement.EXECUTE_FAILED); isAllSuccessful = false; - batchErrorMessage = resp.getStatus().getErrorMessage(); + batchErrorMessage = errMessage; } - } catch (Exception e) { - String errMessage = String.format( - "Fail to generate physcial plan and execute for statement " - + "%s beacuse %s", - statement, e.getMessage()); - result.add(Statement.EXECUTE_FAILED); - isAllSuccessful = false; - batchErrorMessage = errMessage; } } if (isAllSuccessful) { @@ -482,6 +502,21 @@ public class 
TSServiceImpl implements TSIService.Iface, ServerContext { } } + /** + * @param physicalPlans the plans of one batch; the caller passes them only when every plan is an InsertPlan + * @return a pair whose left value is the list of per-statement result codes and whose right value is an error message, or null if all inserts succeeded + */ + private Pair<List<Integer>, String> executeBatchInsert(PhysicalPlan[] physicalPlans) { + List<Integer> results = new ArrayList<>(); + + // null means all success + String message = null; + + + return new Pair<>(results, message); + } + + @Override public TSExecuteStatementResp executeStatement(TSExecuteStatementReq req) throws TException { try {
