This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fe73244 Refactor the execution path of InsertTabletPlan (#1361)
fe73244 is described below
commit fe73244ed5227c66dbeda8bb0a71346bcb2614c0
Author: Jiang Tian <[email protected]>
AuthorDate: Tue Jun 16 06:37:48 2020 -0500
Refactor the execution path of InsertTabletPlan (#1361)
* refactor the execution path of InsertTabletPlan
* set message when executing batch errored
---
.../main/java/org/apache/iotdb/SessionExample.java | 6 +-
.../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 20 ++--
.../test/java/org/apache/iotdb/jdbc/BatchTest.java | 27 ++++--
.../org/apache/iotdb/db/engine/StorageEngine.java | 12 +--
.../engine/storagegroup/StorageGroupProcessor.java | 44 ++++++---
.../db/exception/BatchInsertionException.java | 28 +++---
.../apache/iotdb/db/qp/executor/IPlanExecutor.java | 4 +-
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 8 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 101 +++++++--------------
service-rpc/rpc-changelist.md | 21 +++++
.../apache/iotdb/rpc/BatchExecutionException.java | 3 +-
.../main/java/org/apache/iotdb/rpc/RpcUtils.java | 39 +++-----
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
service-rpc/src/main/thrift/rpc.thrift | 22 ++---
.../java/org/apache/iotdb/session/Session.java | 48 +++++-----
.../org/apache/iotdb/session/pool/SessionPool.java | 40 ++++----
.../org/apache/iotdb/session/IoTDBSessionIT.java | 12 +--
17 files changed, 222 insertions(+), 214 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 01b05e5..3a63fec 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -186,7 +186,7 @@ public class SessionExample {
}
}
- private static void insertRecords() throws IoTDBConnectionException,
BatchExecutionException {
+ private static void insertRecords() throws IoTDBConnectionException,
StatementExecutionException {
String deviceId = "root.sg1.d1";
List<String> measurements = new ArrayList<>();
measurements.add("s1");
@@ -237,7 +237,7 @@ public class SessionExample {
*
* Users need to control the count of Tablet and write a batch when it
reaches the maxBatchSize
*/
- private static void insertTablet() throws IoTDBConnectionException,
BatchExecutionException {
+ private static void insertTablet() throws IoTDBConnectionException,
StatementExecutionException {
// The schema of sensors of one device
List<MeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(new MeasurementSchema("s1", TSDataType.INT64,
TSEncoding.RLE));
@@ -268,7 +268,7 @@ public class SessionExample {
}
}
- private static void insertTablets() throws IoTDBConnectionException,
BatchExecutionException {
+ private static void insertTablets() throws IoTDBConnectionException,
StatementExecutionException {
// The schema of sensors of one device
List<MeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(new MeasurementSchema("s1", TSDataType.INT64,
TSEncoding.RLE));
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index c38225b..1b21c5c 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSIService;
@@ -261,15 +260,22 @@ public class IoTDBStatement implements Statement {
private int[] executeBatchSQL() throws TException, BatchUpdateException {
isCancelled = false;
TSExecuteBatchStatementReq execReq = new
TSExecuteBatchStatementReq(sessionId, batchSQLList);
- TSExecuteBatchStatementResp execResp =
client.executeBatchStatement(execReq);
- int[] result = new int[execResp.statusList.size()];
+ TSStatus execResp = client.executeBatchStatement(execReq);
+ int[] result = new int[batchSQLList.size()];
boolean allSuccess = true;
String message = "";
for (int i = 0; i < result.length; i++) {
- result[i] = execResp.statusList.get(i).code;
- if (result[i] != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- allSuccess = false;
- message = execResp.statusList.get(i).message;
+ if (execResp.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ result[i] = execResp.getSubStatus().get(i).code;
+ if (result[i] != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ allSuccess = false;
+ message = execResp.getSubStatus().get(i).message;
+ }
+ } else {
+ allSuccess =
+ allSuccess && execResp.getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode();
+ result[i] = execResp.getCode();
+ message = execResp.getMessage();
}
}
if (!allSuccess) {
diff --git a/jdbc/src/test/java/org/apache/iotdb/jdbc/BatchTest.java
b/jdbc/src/test/java/org/apache/iotdb/jdbc/BatchTest.java
index 814ad12..01d7c9c 100644
--- a/jdbc/src/test/java/org/apache/iotdb/jdbc/BatchTest.java
+++ b/jdbc/src/test/java/org/apache/iotdb/jdbc/BatchTest.java
@@ -28,11 +28,11 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSIService;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.thrift.TException;
@@ -52,7 +52,7 @@ public class BatchTest {
@Mock
private IoTDBStatement statement;
private TSStatus errorStatus =
RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR);
- private TSExecuteBatchStatementResp resp;
+ private TSStatus resp;
private ZoneId zoneID = ZoneId.systemDefault();
@Before
@@ -71,8 +71,10 @@ public class BatchTest {
@Test
public void testExecuteBatchSQL1() throws SQLException, TException {
Statement statement = connection.createStatement();
- resp = new TSExecuteBatchStatementResp();
- resp =
RpcUtils.getTSBatchExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+ statement.addBatch("sql1");
+ resp = new TSStatus();
+ resp =
+
RpcUtils.getStatus(Collections.singletonList(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)));
when(client.executeBatchStatement(any(TSExecuteBatchStatementReq.class))).thenReturn(resp);
int[] result = statement.executeBatch();
assertEquals(1, result.length);
@@ -90,8 +92,9 @@ public class BatchTest {
add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
}
};
- resp.setStatusList(resExpected);
+ resp.setSubStatus(resExpected);
+ statement.clearBatch();
statement.addBatch("SET STORAGE GROUP TO root.ln.wf01.wt01");
statement.addBatch(
"CREATE TIMESERIES root.ln.wf01.wt01.status WITH DATATYPE=BOOLEAN,
ENCODING=PLAIN");
@@ -110,8 +113,8 @@ public class BatchTest {
statement.addBatch(
"insert into root.ln.wf01.wt01(timestamp,temperature)
vvvvvv(1509465720000,20.092794)");
result = statement.executeBatch();
- assertEquals(resp.statusList.size(), result.length);
- for (int i = 0; i < resp.statusList.size(); i++) {
+ assertEquals(resp.getSubStatus().size(), result.length);
+ for (int i = 0; i < resp.getSubStatus().size(); i++) {
assertEquals(resExpected.get(i).code, result[i]);
}
statement.clearBatch();
@@ -120,7 +123,9 @@ public class BatchTest {
@Test(expected = BatchUpdateException.class)
public void testExecuteBatchSQL2() throws SQLException, TException {
Statement statement = connection.createStatement();
- resp =
RpcUtils.getTSBatchExecuteStatementResp(TSStatusCode.SQL_PARSE_ERROR);
+ statement.addBatch("sql1");
+ resp =
+
RpcUtils.getStatus(Collections.singletonList(RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR)));
when(client.executeBatchStatement(any(TSExecuteBatchStatementReq.class))).thenReturn(resp);
statement.executeBatch();
@@ -130,14 +135,16 @@ public class BatchTest {
@Test
public void testExecuteBatchSQL3() throws SQLException, TException {
Statement statement = connection.createStatement();
- resp = RpcUtils.getTSBatchExecuteStatementResp(errorStatus);
+ resp = RpcUtils.getStatus(Collections.singletonList(errorStatus));
+ statement.addBatch("sql1");
+ statement.addBatch("sql1");
List<TSStatus> resExpected = new ArrayList<TSStatus>() {
{
add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
add(RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR));
}
};
- resp.setStatusList(resExpected);
+ resp.setSubStatus(resExpected);
when(client.executeBatchStatement(any(TSExecuteBatchStatementReq.class))).thenReturn(resp);
try {
statement.executeBatch();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 758c77f..544ea7f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -46,6 +46,7 @@ import
org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.BatchInsertionException;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -289,20 +290,19 @@ public class StorageEngine implements IService {
*
* @return result of each row
*/
- public TSStatus[] insertTablet(InsertTabletPlan insertTabletPlan) throws
StorageEngineException {
+ public void insertTablet(InsertTabletPlan insertTabletPlan)
+ throws StorageEngineException, BatchInsertionException {
StorageGroupProcessor storageGroupProcessor;
try {
storageGroupProcessor = getProcessor(insertTabletPlan.getDeviceId());
} catch (StorageEngineException e) {
- logger.warn("get StorageGroupProcessor of device {} failed, because {}",
- insertTabletPlan.getDeviceId(),
- e.getMessage(), e);
- throw new StorageEngineException(e);
+ throw new StorageEngineException(String.format("Get
StorageGroupProcessor of device %s "
+ + "failed", insertTabletPlan.getDeviceId()), e);
}
// TODO monitor: update statistics
try {
- return storageGroupProcessor.insertTablet(insertTabletPlan);
+ storageGroupProcessor.insertTablet(insertTabletPlan);
} catch (WriteProcessException e) {
throw new StorageEngineException(e);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 8ba9020..b4b3f72 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -25,6 +25,7 @@ import static
org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFF
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
@@ -62,6 +63,7 @@ import
org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
import org.apache.iotdb.db.engine.version.VersionController;
+import org.apache.iotdb.db.exception.BatchInsertionException;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.exception.MergeException;
@@ -622,10 +624,19 @@ public class StorageGroupProcessor {
}
}
- public TSStatus[] insertTablet(InsertTabletPlan insertTabletPlan) throws
WriteProcessException {
+ /**
+ * Insert a tablet (rows belonging to the same devices) into this storage
group.
+ * @param insertTabletPlan
+ * @throws WriteProcessException when update last cache failed
+ * @throws BatchInsertionException if some of the rows failed to be inserted
+ */
+ public void insertTablet(InsertTabletPlan insertTabletPlan) throws
WriteProcessException,
+ BatchInsertionException {
writeLock();
try {
TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
+ Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
+ boolean noFailure = true;
/*
* assume that batch has been sorted by client
@@ -638,13 +649,14 @@ public class StorageGroupProcessor {
results[loc] = RpcUtils.getStatus(TSStatusCode.OUT_OF_TTL_ERROR,
"time " + currTime + " in current line is out of TTL: " +
dataTTL);
loc++;
+ noFailure = false;
} else {
break;
}
}
// loc pointing at first legal position
if (loc == insertTabletPlan.getRowCount()) {
- return results;
+ throw new BatchInsertionException(results);
}
// before is first start point
int before = loc;
@@ -659,12 +671,12 @@ public class StorageGroupProcessor {
while (loc < insertTabletPlan.getRowCount()) {
long time = insertTabletPlan.getTimes()[loc];
long curTimePartition = StorageEngine.getTimePartition(time);
- results[loc] = RpcUtils.SUCCESS_STATUS;
// start next partition
if (curTimePartition != beforeTimePartition) {
// insert last time partition
- insertTabletToTsFileProcessor(insertTabletPlan, before, loc,
isSequence, results,
- beforeTimePartition);
+ noFailure = insertTabletToTsFileProcessor(insertTabletPlan, before,
loc, isSequence,
+ results,
+ beforeTimePartition) && noFailure;
// re initialize
before = loc;
beforeTimePartition = curTimePartition;
@@ -678,8 +690,8 @@ public class StorageGroupProcessor {
// judge if we should insert sequence
if (!isSequence && time > lastFlushTime) {
// insert into unsequence and then start sequence
- insertTabletToTsFileProcessor(insertTabletPlan, before, loc,
false, results,
- beforeTimePartition);
+ noFailure = insertTabletToTsFileProcessor(insertTabletPlan,
before, loc, false, results,
+ beforeTimePartition) && noFailure;
before = loc;
isSequence = true;
}
@@ -689,14 +701,16 @@ public class StorageGroupProcessor {
// do not forget last part
if (before < loc) {
- insertTabletToTsFileProcessor(insertTabletPlan, before, loc,
isSequence, results,
- beforeTimePartition);
+ noFailure = insertTabletToTsFileProcessor(insertTabletPlan, before,
loc, isSequence,
+ results, beforeTimePartition) && noFailure;
}
long globalLatestFlushedTime =
globalLatestFlushedTimeForEachDevice.getOrDefault(
insertTabletPlan.getDeviceId(), Long.MIN_VALUE);
tryToUpdateBatchInsertLastCache(insertTabletPlan,
globalLatestFlushedTime);
- return results;
+ if (!noFailure) {
+ throw new BatchInsertionException(results);
+ }
} finally {
writeUnlock();
}
@@ -719,12 +733,13 @@ public class StorageGroupProcessor {
* @param end end index of rows to be inserted in insertTabletPlan
* @param results result array
* @param timePartitionId time partition id
+ * @return false if any failure occurs when inserting the tablet, true
otherwise
*/
- private void insertTabletToTsFileProcessor(InsertTabletPlan insertTabletPlan,
+ private boolean insertTabletToTsFileProcessor(InsertTabletPlan
insertTabletPlan,
int start, int end, boolean sequence, TSStatus[] results, long
timePartitionId) {
// return when start >= end
if (start >= end) {
- return;
+ return true;
}
TsFileProcessor tsFileProcessor =
getOrCreateTsFileProcessor(timePartitionId, sequence);
@@ -733,14 +748,14 @@ public class StorageGroupProcessor {
results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR,
"can not create TsFileProcessor, timePartitionId: " +
timePartitionId);
}
- return;
+ return false;
}
try {
tsFileProcessor.insertTablet(insertTabletPlan, start, end, results);
} catch (WriteProcessException e) {
logger.error("insert to TsFileProcessor error ", e);
- return;
+ return false;
}
latestTimeForEachDevice.computeIfAbsent(timePartitionId, t -> new
HashMap<>());
@@ -756,6 +771,7 @@ public class StorageGroupProcessor {
if (tsFileProcessor.shouldFlush()) {
fileFlushPolicy.apply(this, tsFileProcessor, sequence);
}
+ return true;
}
private void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long
latestFlushedTime)
diff --git
a/service-rpc/src/main/java/org/apache/iotdb/rpc/BatchExecutionException.java
b/server/src/main/java/org/apache/iotdb/db/exception/BatchInsertionException.java
similarity index 63%
copy from
service-rpc/src/main/java/org/apache/iotdb/rpc/BatchExecutionException.java
copy to
server/src/main/java/org/apache/iotdb/db/exception/BatchInsertionException.java
index bce1d26..182b3e4 100644
---
a/service-rpc/src/main/java/org/apache/iotdb/rpc/BatchExecutionException.java
+++
b/server/src/main/java/org/apache/iotdb/db/exception/BatchInsertionException.java
@@ -16,30 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.rpc;
-import java.util.List;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
+package org.apache.iotdb.db.exception;
-public class BatchExecutionException extends Exception{
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
- private List<TSStatus> statusList;
+public class BatchInsertionException extends QueryProcessException {
- public BatchExecutionException(String message) {
- super(message);
- }
+ private TSStatus[] failingStatus;
- public BatchExecutionException(List<TSStatus> statusList) {
- this.statusList = statusList;
+ public BatchInsertionException(TSStatus[] failingStatus) {
+ super("Batch insertion failed");
+ this.failingStatus = failingStatus;
}
- public BatchExecutionException(List<TSStatus> statusList, String message) {
- super(message);
- this.statusList = statusList;
+ public void setFailingStatus(TSStatus[] failingStatus) {
+ this.failingStatus = failingStatus;
}
- public List<TSStatus> getStatusList() {
- return statusList;
+ public TSStatus[] getFailingStatus() {
+ return failingStatus;
}
-
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
index 24a2629..6fb5d98 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.qp.executor;
import java.io.IOException;
import java.sql.SQLException;
+import org.apache.iotdb.db.exception.BatchInsertionException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
@@ -93,6 +94,7 @@ public interface IPlanExecutor {
* execute batch insert plan
*
* @return result of each row
+ * @throws BatchInsertionException when some of the rows failed
*/
- TSStatus[] insertTablet(InsertTabletPlan insertTabletPlan) throws
QueryProcessException;
+ void insertTablet(InsertTabletPlan insertTabletPlan) throws
QueryProcessException;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 34510d4..5f3936f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -76,6 +76,7 @@ import
org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.merge.manage.MergeManager.TaskStatus;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.BatchInsertionException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -199,6 +200,9 @@ public class PlanExecutor implements IPlanExecutor {
case INSERT:
insert((InsertPlan) plan);
return true;
+ case BATCHINSERT:
+ insertTablet((InsertTabletPlan) plan);
+ return true;
case CREATE_ROLE:
case DELETE_ROLE:
case CREATE_USER:
@@ -1075,7 +1079,7 @@ public class PlanExecutor implements IPlanExecutor {
}
@Override
- public TSStatus[] insertTablet(InsertTabletPlan insertTabletPlan) throws
QueryProcessException {
+ public void insertTablet(InsertTabletPlan insertTabletPlan) throws
QueryProcessException {
MNode node = null;
try {
String[] measurementList = insertTabletPlan.getMeasurements();
@@ -1116,7 +1120,7 @@ public class PlanExecutor implements IPlanExecutor {
measurementList[i] = measurementNode.getName();
}
insertTabletPlan.setSchemas(schemas);
- return StorageEngine.getInstance().insertTablet(insertTabletPlan);
+ StorageEngine.getInstance().insertTablet(insertTabletPlan);
} catch (StorageEngineException | MetadataException e) {
throw new QueryProcessException(e);
} finally {
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 11e3ac3..b165552 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.cost.statistic.Measurement;
import org.apache.iotdb.db.cost.statistic.Operation;
+import org.apache.iotdb.db.exception.BatchInsertionException;
import org.apache.iotdb.db.exception.QueryInBatchStatementException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -80,7 +81,6 @@ import
org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
@@ -362,13 +362,13 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
}
@Override
- public TSExecuteBatchStatementResp
executeBatchStatement(TSExecuteBatchStatementReq req) {
+ public TSStatus executeBatchStatement(TSExecuteBatchStatementReq req) {
long t1 = System.currentTimeMillis();
List<TSStatus> result = new ArrayList<>();
try {
if (!checkLogin(req.getSessionId())) {
logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
- return
RpcUtils.getTSBatchExecuteStatementResp(TSStatusCode.NOT_LOGIN_ERROR);
+ return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
}
List<String> statements = req.getStatements();
@@ -382,15 +382,15 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_ONE_SQL_IN_BATCH,
t2);
}
if (isAllSuccessful) {
- return RpcUtils.getTSBatchExecuteStatementResp(
+ return RpcUtils.getStatus(
TSStatusCode.SUCCESS_STATUS, "Execute batch statements
successfully");
} else {
- return RpcUtils.getTSBatchExecuteStatementResp(result);
+ return RpcUtils.getStatus(result);
}
} catch (Exception e) {
logger.error("{}: server Internal Error: ",
IoTDBConstant.GLOBAL_DB_NAME, e);
return RpcUtils
- .getTSBatchExecuteStatementResp(TSStatusCode.INTERNAL_SERVER_ERROR,
e.getMessage());
+ .getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
} finally {
Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_JDBC_BATCH,
t1);
}
@@ -1059,14 +1059,13 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
}
@Override
- public TSExecuteBatchStatementResp insertRecords(TSInsertRecordsReq req) {
- TSExecuteBatchStatementResp resp = new TSExecuteBatchStatementResp();
+ public TSStatus insertRecords(TSInsertRecordsReq req) {
if (!checkLogin(req.getSessionId())) {
logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
- resp.addToStatusList(RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR));
- return resp;
+ return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
}
+ List<TSStatus> statusList = new ArrayList<>();
InsertPlan plan = new InsertPlan();
for (int i = 0; i < req.deviceIds.size(); i++) {
try {
@@ -1079,29 +1078,29 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
plan.setInferType(req.isInferType());
TSStatus status = checkAuthority(plan, req.getSessionId());
if (status != null) {
- resp.addToStatusList(status);
+ statusList.add(status);
} else {
- resp.addToStatusList(executePlan(plan));
+ statusList.add(executePlan(plan));
}
} catch (Exception e) {
logger.error("meet error when insert in batch", e);
-
resp.addToStatusList(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+ statusList.add(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
}
}
- return resp;
+ return RpcUtils.getStatus(statusList);
}
@Override
- public TSExecuteBatchStatementResp testInsertTablet(TSInsertTabletReq req) {
+ public TSStatus testInsertTablet(TSInsertTabletReq req) {
logger.debug("Test insert batch request receive.");
- return
RpcUtils.getTSBatchExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
@Override
- public TSExecuteBatchStatementResp testInsertTablets(TSInsertTabletsReq req)
throws TException {
+ public TSStatus testInsertTablets(TSInsertTabletsReq req) {
logger.debug("Test insert batch request receive.");
- return
RpcUtils.getTSBatchExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
@Override
@@ -1111,9 +1110,9 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
}
@Override
- public TSExecuteBatchStatementResp testInsertRecords(TSInsertRecordsReq req)
{
+ public TSStatus testInsertRecords(TSInsertRecordsReq req) {
logger.debug("Test insert row in batch request receive.");
- return
RpcUtils.getTSBatchExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
@Override
@@ -1167,12 +1166,12 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
}
@Override
- public TSExecuteBatchStatementResp insertTablet(TSInsertTabletReq req) {
+ public TSStatus insertTablet(TSInsertTabletReq req) {
long t1 = System.currentTimeMillis();
try {
if (!checkLogin(req.getSessionId())) {
logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
- return
RpcUtils.getTSBatchExecuteStatementResp(TSStatusCode.NOT_LOGIN_ERROR);
+ return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
}
InsertTabletPlan insertTabletPlan = new InsertTabletPlan(req.deviceId,
req.measurements);
@@ -1183,45 +1182,28 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
insertTabletPlan.setRowCount(req.size);
insertTabletPlan.setDataTypes(req.types);
- boolean isAllSuccessful = true;
TSStatus status = checkAuthority(insertTabletPlan, req.getSessionId());
if (status != null) {
- return RpcUtils.getTSBatchExecuteStatementResp(status);
- }
- TSStatus[] tsStatusArray = executor.insertTablet(insertTabletPlan);
-
- for (TSStatus tsStatus : tsStatusArray) {
- if (tsStatus.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- isAllSuccessful = false;
- break;
- }
+ return status;
}
- if (isAllSuccessful) {
- if (logger.isDebugEnabled()) {
- logger.debug("Insert one Tablet successfully");
- }
- return
RpcUtils.getTSBatchExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
- } else {
- logger.debug("Insert one Tablet failed!");
- return
RpcUtils.getTSBatchExecuteStatementResp(Arrays.asList(tsStatusArray));
- }
+ return executePlan(insertTabletPlan);
} catch (Exception e) {
logger.error("{}: error occurs when executing statements",
IoTDBConstant.GLOBAL_DB_NAME, e);
return RpcUtils
-
.getTSBatchExecuteStatementResp(TSStatusCode.EXECUTE_STATEMENT_ERROR,
e.getMessage());
+ .getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
} finally {
Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT,
t1);
}
}
@Override
- public TSExecuteBatchStatementResp insertTablets(TSInsertTabletsReq req) {
+ public TSStatus insertTablets(TSInsertTabletsReq req) {
long t1 = System.currentTimeMillis();
try {
if (!checkLogin(req.getSessionId())) {
logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
- return
RpcUtils.getTSBatchExecuteStatementResp(TSStatusCode.NOT_LOGIN_ERROR);
+ return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
}
List<TSStatus> statusList = new ArrayList<>();
@@ -1237,34 +1219,19 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
insertTabletPlan.setRowCount(req.sizeList.get(i));
insertTabletPlan.setDataTypes(req.typesList.get(i));
- boolean isCurrentTabletSuccessful = true;
TSStatus status = checkAuthority(insertTabletPlan, req.getSessionId());
if (status != null) {
statusList.add(status);
continue;
}
- TSStatus[] tsStatusArray = executor.insertTablet(insertTabletPlan);
- TSStatus failed =
RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR);
- for (TSStatus tsStatus : tsStatusArray) {
- if (tsStatus.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- isCurrentTabletSuccessful = false;
- failed = tsStatus;
- break;
- }
- }
-
- if (isCurrentTabletSuccessful) {
- statusList.add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
- } else {
- statusList.add(failed);
- }
+ statusList.add(executePlan(insertTabletPlan));
}
- return RpcUtils.getTSBatchExecuteStatementResp(statusList);
+ return RpcUtils.getStatus(statusList);
} catch (Exception e) {
logger.error("{}: error occurs when insertTablets",
IoTDBConstant.GLOBAL_DB_NAME, e);
return RpcUtils
-
.getTSBatchExecuteStatementResp(TSStatusCode.EXECUTE_STATEMENT_ERROR,
e.getMessage());
+ .getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
} finally {
Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT,
t1);
}
@@ -1332,10 +1299,10 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
}
@Override
- public TSExecuteBatchStatementResp
createMultiTimeseries(TSCreateMultiTimeseriesReq req) {
+ public TSStatus createMultiTimeseries(TSCreateMultiTimeseriesReq req) {
if (!checkLogin(req.getSessionId())) {
logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
- return
RpcUtils.getTSBatchExecuteStatementResp(TSStatusCode.NOT_LOGIN_ERROR);
+ return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
}
List<TSStatus> statusList = new ArrayList<>(req.paths.size());
for (int i = 0; i < req.paths.size(); i++) {
@@ -1376,10 +1343,10 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
if (logger.isDebugEnabled()) {
logger.debug("Create multiple timeseries successfully");
}
- return
RpcUtils.getTSBatchExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} else {
logger.debug("Create multiple timeseries failed!");
- return RpcUtils.getTSBatchExecuteStatementResp(statusList);
+ return RpcUtils.getStatus(statusList);
}
}
@@ -1430,6 +1397,8 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
boolean execRet;
try {
execRet = executeNonQuery(plan);
+ } catch (BatchInsertionException e) {
+ return RpcUtils.getStatus(Arrays.asList(e.getFailingStatus()));
} catch (QueryProcessException e) {
logger.debug("meet error while processing non-query. ", e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
diff --git a/service-rpc/rpc-changelist.md b/service-rpc/rpc-changelist.md
index 1b60515..dacb452 100644
--- a/service-rpc/rpc-changelist.md
+++ b/service-rpc/rpc-changelist.md
@@ -19,6 +19,26 @@
-->
+# 0.10.x (version-2) -> 0.11.x (version-3)
+
+Last Updated on 2020-6-15 by Tian Jiang.
+
+## 1. Delete Old
+
+| Latest Changes | Related Committers |
+| ---------------------------------- | ------------------ |
+| Remove TSBatchExecuteStatementResp | Tian Jiang |
+
+
+
+## 2. Add New
+
+## 3. Update
+
+| Latest Changes | Related
Committers |
+| ------------------------------------------------------------ |
---------------------- |
+| Add sub-status in TSStatus | Tian Jiang |
+
# 0.9.x (version-1) -> 0.10.x (version-2)
@@ -32,6 +52,7 @@ Last Updated on 2020-5-25 by Kaifeng Xue.
| Remove TS_SessionHandle,TSHandleIdentifier | Tian Jiang |
| Remove TSStatus,TSExecuteInsertRowInBatchResp | Jialin Qiao|
+
## 2. Add New
| Latest Changes | Related
Committers |
diff --git
a/service-rpc/src/main/java/org/apache/iotdb/rpc/BatchExecutionException.java
b/service-rpc/src/main/java/org/apache/iotdb/rpc/BatchExecutionException.java
index bce1d26..7c67d22 100644
---
a/service-rpc/src/main/java/org/apache/iotdb/rpc/BatchExecutionException.java
+++
b/service-rpc/src/main/java/org/apache/iotdb/rpc/BatchExecutionException.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.rpc;
import java.util.List;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
-public class BatchExecutionException extends Exception{
+public class BatchExecutionException extends StatementExecutionException {
private List<TSStatus> statusList;
@@ -30,6 +30,7 @@ public class BatchExecutionException extends Exception{
}
public BatchExecutionException(List<TSStatus> statusList) {
+ super("");
this.statusList = statusList;
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index db84839..2bd09c5 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.rpc;
import java.lang.reflect.Proxy;
import java.util.List;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
import org.apache.iotdb.service.rpc.thrift.TSIService;
@@ -28,6 +27,10 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
public class RpcUtils {
+ private RpcUtils() {
+ // util class
+ }
+
public static final TSStatus SUCCESS_STATUS = new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
public static TSIService.Iface newSynchronizedClient(TSIService.Iface
client) {
@@ -41,6 +44,10 @@ public class RpcUtils {
* @param status -status
*/
public static void verifySuccess(TSStatus status) throws
StatementExecutionException {
+ if (status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ verifySuccess(status.getSubStatus());
+ return;
+ }
if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new StatementExecutionException(status);
}
@@ -61,6 +68,12 @@ public class RpcUtils {
return new TSStatus(tsStatusCode.getStatusCode());
}
+ public static TSStatus getStatus(List<TSStatus> statusList) {
+ TSStatus status = new
TSStatus(TSStatusCode.MULTIPLE_ERROR.getStatusCode());
+ status.setSubStatus(statusList);
+ return status;
+ }
+
/**
* convert from TSStatusCode to TSStatus, which has message appending with
existed status message
*
@@ -96,30 +109,6 @@ public class RpcUtils {
return resp;
}
-
- public static TSExecuteBatchStatementResp
getTSBatchExecuteStatementResp(TSStatusCode tsStatusCode) {
- TSStatus status = getStatus(tsStatusCode);
- return getTSBatchExecuteStatementResp(status);
- }
-
- public static TSExecuteBatchStatementResp
getTSBatchExecuteStatementResp(TSStatusCode tsStatusCode, String message) {
- TSStatus status = getStatus(tsStatusCode, message);
- return getTSBatchExecuteStatementResp(status);
- }
-
- public static TSExecuteBatchStatementResp
getTSBatchExecuteStatementResp(TSStatus status) {
- TSExecuteBatchStatementResp resp = new TSExecuteBatchStatementResp();
- resp.addToStatusList(status);
- return resp;
- }
-
- public static TSExecuteBatchStatementResp
getTSBatchExecuteStatementResp(List<TSStatus> statusList) {
- TSExecuteBatchStatementResp resp = new TSExecuteBatchStatementResp();
- resp.setStatusList(statusList);
- return resp;
- }
-
-
public static TSFetchResultsResp getTSFetchResultsResp(TSStatusCode
tsStatusCode) {
TSStatus status = getStatus(tsStatusCode);
return getTSFetchResultsResp(status);
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index c3e19ae..48a064f 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -63,6 +63,7 @@ public enum TSStatusCode {
READ_ONLY_SYSTEM_ERROR(502),
DISK_SPACE_INSUFFICIENT_ERROR(503),
START_UP_ERROR(504),
+ MULTIPLE_ERROR(505),
WRONG_LOGIN_PASSWORD_ERROR(600),
NOT_LOGIN_ERROR(601),
diff --git a/service-rpc/src/main/thrift/rpc.thrift
b/service-rpc/src/main/thrift/rpc.thrift
index 6dd4edf..25bdcd8 100644
--- a/service-rpc/src/main/thrift/rpc.thrift
+++ b/service-rpc/src/main/thrift/rpc.thrift
@@ -22,6 +22,7 @@ namespace java org.apache.iotdb.service.rpc.thrift
struct TSStatus {
1: required i32 code
2: optional string message
+ 3: optional list<TSStatus> subStatus
}
struct TSExecuteStatementResp {
@@ -91,10 +92,6 @@ struct TSExecuteStatementReq {
4: optional i32 fetchSize
}
-struct TSExecuteBatchStatementResp{
- 1: required list<TSStatus> statusList
-}
-
struct TSExecuteBatchStatementReq{
// The session to execute the statement against
1: required i64 sessionId
@@ -255,7 +252,6 @@ struct TSQueryNonAlignDataSet{
2: required list<binary> valueList
}
-
service TSIService {
TSOpenSessionResp openSession(1:TSOpenSessionReq req);
@@ -263,7 +259,7 @@ service TSIService {
TSExecuteStatementResp executeStatement(1:TSExecuteStatementReq req);
- TSExecuteBatchStatementResp
executeBatchStatement(1:TSExecuteBatchStatementReq req);
+ TSStatus executeBatchStatement(1:TSExecuteBatchStatementReq req);
TSExecuteStatementResp executeQueryStatement(1:TSExecuteStatementReq
req);
@@ -287,7 +283,7 @@ service TSIService {
TSStatus createTimeseries(1:TSCreateTimeseriesReq req);
- TSExecuteBatchStatementResp
createMultiTimeseries(1:TSCreateMultiTimeseriesReq req);
+ TSStatus createMultiTimeseries(1:TSCreateMultiTimeseriesReq req);
TSStatus deleteTimeseries(1:i64 sessionId, 2:list<string> path)
@@ -295,19 +291,19 @@ service TSIService {
TSStatus insertRecord(1:TSInsertRecordReq req);
- TSExecuteBatchStatementResp insertTablet(1:TSInsertTabletReq req);
+ TSStatus insertTablet(1:TSInsertTabletReq req);
- TSExecuteBatchStatementResp insertTablets(1:TSInsertTabletsReq req);
+ TSStatus insertTablets(1:TSInsertTabletsReq req);
- TSExecuteBatchStatementResp insertRecords(1:TSInsertRecordsReq req);
+ TSStatus insertRecords(1:TSInsertRecordsReq req);
- TSExecuteBatchStatementResp testInsertTablet(1:TSInsertTabletReq req);
+ TSStatus testInsertTablet(1:TSInsertTabletReq req);
- TSExecuteBatchStatementResp testInsertTablets(1:TSInsertTabletsReq req);
+ TSStatus testInsertTablets(1:TSInsertTabletsReq req);
TSStatus testInsertRecord(1:TSInsertRecordReq req);
- TSExecuteBatchStatementResp testInsertRecords(1:TSInsertRecordsReq req);
+ TSStatus testInsertRecords(1:TSInsertRecordsReq req);
TSStatus deleteData(1:TSDeleteDataReq req);
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java
b/session/src/main/java/org/apache/iotdb/session/Session.java
index 02ef9b9..43bacda 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -220,7 +220,7 @@ public class Session {
* @param tablet data batch
*/
public void insertTablet(Tablet tablet)
- throws BatchExecutionException, IoTDBConnectionException {
+ throws StatementExecutionException, IoTDBConnectionException {
insertTablet(tablet, false);
}
@@ -231,10 +231,10 @@ public class Session {
* @param sorted whether times in Tablet are in ascending order
*/
public void insertTablet(Tablet tablet, boolean sorted)
- throws IoTDBConnectionException, BatchExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted);
try {
- RpcUtils.verifySuccess(client.insertTablet(request).statusList);
+ RpcUtils.verifySuccess(client.insertTablet(request));
} catch (TException e) {
throw new IoTDBConnectionException(e);
}
@@ -272,7 +272,7 @@ public class Session {
* @param tablets data batch in multiple device
*/
public void insertTablets(Map<String, Tablet> tablets)
- throws IoTDBConnectionException, BatchExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
insertTablets(tablets, false);
}
@@ -284,11 +284,11 @@ public class Session {
* @param sorted whether times in each Tablet are in ascending order
*/
public void insertTablets(Map<String, Tablet> tablets, boolean sorted)
- throws IoTDBConnectionException, BatchExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
TSInsertTabletsReq request = genTSInsertTabletsReq(tablets, sorted);
try {
- RpcUtils.verifySuccess(client.insertTablets(request).statusList);
+ RpcUtils.verifySuccess(client.insertTablets(request));
} catch (TException e) {
throw new IoTDBConnectionException(e);
}
@@ -336,11 +336,11 @@ public class Session {
public void insertRecords(List<String> deviceIds, List<Long> times,
List<List<String>> measurementsList, List<List<TSDataType>> typesList,
List<List<Object>> valuesList)
- throws IoTDBConnectionException, BatchExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
TSInsertRecordsReq request = genTSInsertRecordsReq(deviceIds, times,
measurementsList,
typesList, valuesList);
try {
- RpcUtils.verifySuccess(client.insertRecords(request).statusList);
+ RpcUtils.verifySuccess(client.insertRecords(request));
} catch (TException e) {
throw new IoTDBConnectionException(e);
}
@@ -383,11 +383,11 @@ public class Session {
*/
public void insertRecords(List<String> deviceIds, List<Long> times,
List<List<String>> measurementsList, List<List<String>> valuesList)
- throws IoTDBConnectionException, BatchExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
TSInsertRecordsReq request =genTSInsertRecordsReq(deviceIds, times,
measurementsList, valuesList);
try {
- RpcUtils.verifySuccess(client.insertRecords(request).statusList);
+ RpcUtils.verifySuccess(client.insertRecords(request));
} catch (TException e) {
throw new IoTDBConnectionException(e);
}
@@ -589,11 +589,11 @@ public class Session {
* this method should be used to test other time cost in client
*/
public void testInsertTablet(Tablet tablet)
- throws IoTDBConnectionException, BatchExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
TSInsertTabletReq request = genTSInsertTabletReq(tablet, false);
try {
- RpcUtils.verifySuccess(client.testInsertTablet(request).statusList);
+ RpcUtils.verifySuccess(client.testInsertTablet(request));
} catch (TException e) {
throw new IoTDBConnectionException(e);
}
@@ -604,11 +604,11 @@ public class Session {
* this method should be used to test other time cost in client
*/
public void testInsertTablet(Tablet tablet, boolean sorted)
- throws IoTDBConnectionException, BatchExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted);
try {
- RpcUtils.verifySuccess(client.testInsertTablet(request).statusList);
+ RpcUtils.verifySuccess(client.testInsertTablet(request));
} catch (TException e) {
throw new IoTDBConnectionException(e);
}
@@ -619,11 +619,11 @@ public class Session {
* this method should be used to test other time cost in client
*/
public void testInsertTablets(Map<String, Tablet> tablets)
- throws IoTDBConnectionException, BatchExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
TSInsertTabletsReq request = genTSInsertTabletsReq(tablets, false);
try {
- RpcUtils.verifySuccess(client.testInsertTablets(request).statusList);
+ RpcUtils.verifySuccess(client.testInsertTablets(request));
} catch (TException e) {
throw new IoTDBConnectionException(e);
}
@@ -634,11 +634,11 @@ public class Session {
* this method should be used to test other time cost in client
*/
public void testInsertTablets(Map<String, Tablet> tablets, boolean sorted)
- throws IoTDBConnectionException, BatchExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
TSInsertTabletsReq request = genTSInsertTabletsReq(tablets, sorted);
try {
- RpcUtils.verifySuccess(client.testInsertTablets(request).statusList);
+ RpcUtils.verifySuccess(client.testInsertTablets(request));
} catch (TException e) {
throw new IoTDBConnectionException(e);
}
@@ -650,11 +650,11 @@ public class Session {
*/
public void testInsertRecords(List<String> deviceIds, List<Long> times,
List<List<String>> measurementsList, List<List<String>> valuesList)
- throws IoTDBConnectionException, BatchExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
TSInsertRecordsReq request = genTSInsertRecordsReq(deviceIds, times,
measurementsList, valuesList);
try {
- RpcUtils.verifySuccess(client.testInsertRecords(request).statusList);
+ RpcUtils.verifySuccess(client.testInsertRecords(request));
} catch (TException e) {
throw new IoTDBConnectionException(e);
}
@@ -663,11 +663,11 @@ public class Session {
public void testInsertRecords(List<String> deviceIds, List<Long> times,
List<List<String>> measurementsList, List<List<TSDataType>> typesList,
List<List<Object>> valuesList)
- throws IoTDBConnectionException, BatchExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
TSInsertRecordsReq request = genTSInsertRecordsReq(deviceIds, times,
measurementsList,
typesList, valuesList);
try {
- RpcUtils.verifySuccess(client.testInsertRecords(request).statusList);
+ RpcUtils.verifySuccess(client.testInsertRecords(request));
} catch (TException e) {
throw new IoTDBConnectionException(e);
}
@@ -821,7 +821,7 @@ public class Session {
List<TSEncoding> encodings, List<CompressionType> compressors,
List<Map<String, String>> propsList, List<Map<String, String>> tagsList,
List<Map<String, String>> attributesList, List<String>
measurementAliasList)
- throws IoTDBConnectionException, BatchExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
TSCreateMultiTimeseriesReq request = new TSCreateMultiTimeseriesReq();
request.setSessionId(sessionId);
@@ -851,7 +851,7 @@ public class Session {
request.setMeasurementAliasList(measurementAliasList);
try {
- RpcUtils.verifySuccess(client.createMultiTimeseries(request).statusList);
+ RpcUtils.verifySuccess(client.createMultiTimeseries(request));
} catch (TException e) {
throw new IoTDBConnectionException(e);
}
diff --git
a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 8221f6b..4a03917 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -243,7 +243,7 @@ public class SessionPool {
* @param tablet data batch
*/
public void insertTablet(Tablet tablet)
- throws IoTDBConnectionException, BatchExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
insertTablet(tablet, false);
}
@@ -264,7 +264,7 @@ public class SessionPool {
* @param sorted whether times in Tablet are in ascending order
*/
public void insertTablet(Tablet tablet, boolean sorted)
- throws IoTDBConnectionException, BatchExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
for (int i = 0; i < RETRY; i++) {
Session session = getSession();
try {
@@ -274,7 +274,7 @@ public class SessionPool {
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new
one.
cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (BatchExecutionException e) {
+ } catch (StatementExecutionException e) {
putBack(session);
throw e;
}
@@ -288,7 +288,7 @@ public class SessionPool {
* @param tablets multiple batch
*/
public void insertTablets(Map<String, Tablet> tablets)
- throws IoTDBConnectionException, BatchExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
insertTablets(tablets, false);
}
@@ -298,7 +298,7 @@ public class SessionPool {
* @param tablets multiple batch
*/
public void insertTablets(Map<String, Tablet> tablets, boolean sorted)
- throws IoTDBConnectionException, BatchExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
for (int i = 0; i < RETRY; i++) {
Session session = getSession();
try {
@@ -308,7 +308,7 @@ public class SessionPool {
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new
one.
cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (BatchExecutionException e) {
+ } catch (StatementExecutionException e) {
putBack(session);
throw e;
}
@@ -324,7 +324,7 @@ public class SessionPool {
*/
public void insertRecords(List<String> deviceIds, List<Long> times,
List<List<String>> measurementsList, List<List<TSDataType>> typesList,
- List<List<Object>> valuesList) throws IoTDBConnectionException,
BatchExecutionException {
+ List<List<Object>> valuesList) throws IoTDBConnectionException,
StatementExecutionException {
for (int i = 0; i < RETRY; i++) {
Session session = getSession();
try {
@@ -334,7 +334,7 @@ public class SessionPool {
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new
one.
cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (BatchExecutionException e) {
+ } catch (StatementExecutionException e) {
putBack(session);
throw e;
}
@@ -351,7 +351,7 @@ public class SessionPool {
*/
public void insertRecords(List<String> deviceIds, List<Long> times,
List<List<String>> measurementsList, List<List<String>> valuesList)
- throws IoTDBConnectionException, BatchExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
for (int i = 0; i < RETRY; i++) {
Session session = getSession();
try {
@@ -361,7 +361,7 @@ public class SessionPool {
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new
one.
cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (BatchExecutionException e) {
+ } catch (StatementExecutionException e) {
putBack(session);
throw e;
}
@@ -425,7 +425,7 @@ public class SessionPool {
* this method should be used to test other time cost in client
*/
public void testInsertTablet(Tablet tablet)
- throws IoTDBConnectionException, BatchExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
for (int i = 0; i < RETRY; i++) {
Session session = getSession();
try {
@@ -435,7 +435,7 @@ public class SessionPool {
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new
one.
cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (BatchExecutionException e) {
+ } catch (StatementExecutionException e) {
putBack(session);
throw e;
}
@@ -447,7 +447,7 @@ public class SessionPool {
* this method should be used to test other time cost in client
*/
public void testInsertTablets(Map<String, Tablet> tablets)
- throws IoTDBConnectionException, BatchExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
for (int i = 0; i < RETRY; i++) {
Session session = getSession();
try {
@@ -457,7 +457,7 @@ public class SessionPool {
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new
one.
cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (BatchExecutionException e) {
+ } catch (StatementExecutionException e) {
putBack(session);
throw e;
}
@@ -470,7 +470,7 @@ public class SessionPool {
*/
public void testInsertRecords(List<String> deviceIds, List<Long> times,
List<List<String>> measurementsList, List<List<String>> valuesList)
- throws IoTDBConnectionException, BatchExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
for (int i = 0; i < RETRY; i++) {
Session session = getSession();
try {
@@ -480,7 +480,7 @@ public class SessionPool {
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new
one.
cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (BatchExecutionException e) {
+ } catch (StatementExecutionException e) {
putBack(session);
throw e;
}
@@ -494,7 +494,7 @@ public class SessionPool {
public void testInsertRecords(List<String> deviceIds, List<Long> times,
List<List<String>> measurementsList, List<List<TSDataType>> typesList,
List<List<Object>> valuesList)
- throws IoTDBConnectionException, BatchExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
for (int i = 0; i < RETRY; i++) {
Session session = getSession();
try {
@@ -504,7 +504,7 @@ public class SessionPool {
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new
one.
cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (BatchExecutionException e) {
+ } catch (StatementExecutionException e) {
putBack(session);
throw e;
}
@@ -746,7 +746,7 @@ public class SessionPool {
List<TSEncoding> encodings, List<CompressionType> compressors,
List<Map<String, String>> propsList, List<Map<String, String>> tagsList,
List<Map<String, String>> attributesList, List<String>
measurementAliasList)
- throws IoTDBConnectionException, BatchExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
for (int i = 0; i < RETRY; i++) {
Session session = getSession();
try {
@@ -757,7 +757,7 @@ public class SessionPool {
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new
one.
cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (BatchExecutionException e) {
+ } catch (StatementExecutionException e) {
putBack(session);
throw e;
}
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
index bc66859..7bc3a11 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
@@ -688,7 +688,7 @@ public class IoTDBSessionIT {
}
}
- private void insertRecords() throws IoTDBConnectionException,
BatchExecutionException {
+ private void insertRecords() throws IoTDBConnectionException,
StatementExecutionException {
String deviceId = "root.sg1.d2";
List<String> measurements = new ArrayList<>();
measurements.add("s1");
@@ -727,7 +727,7 @@ public class IoTDBSessionIT {
session.insertRecords(deviceIds, timestamps, measurementsList, typesList,
valuesList);
}
- private void insertRecordsByStr() throws IoTDBConnectionException,
BatchExecutionException {
+ private void insertRecordsByStr() throws IoTDBConnectionException,
StatementExecutionException {
String deviceId = "root.sg1.d2";
List<String> measurements = new ArrayList<>();
measurements.add("s1");
@@ -797,7 +797,7 @@ public class IoTDBSessionIT {
}
private void insertTabletTest1(String deviceId)
- throws IoTDBConnectionException, BatchExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
List<MeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(new MeasurementSchema("s1", TSDataType.INT64,
TSEncoding.RLE));
@@ -1072,7 +1072,7 @@ public class IoTDBSessionIT {
}
private void insertTabletTest2(String deviceId)
- throws IoTDBConnectionException, BatchExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
List<MeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(new MeasurementSchema("s1", TSDataType.INT64,
TSEncoding.RLE));
@@ -1104,7 +1104,7 @@ public class IoTDBSessionIT {
}
private void insertTabletTest3(String deviceId)
- throws IoTDBConnectionException, BatchExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
List<MeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(new MeasurementSchema("s1", TSDataType.INT64,
TSEncoding.RLE));
@@ -1136,7 +1136,7 @@ public class IoTDBSessionIT {
}
private void insertTabletTestForTime(String deviceId)
- throws IoTDBConnectionException, BatchExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
List<MeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(new MeasurementSchema("s1", TSDataType.INT64,
TSEncoding.RLE));