This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.9
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/rel/0.9 by this push:
new 68ee917 Add test insertion interfaces for session and add
primitiveArray pool config (#1245)
68ee917 is described below
commit 68ee917f7c324b7e5d89455f8033fe78431691d7
Author: wshao08 <[email protected]>
AuthorDate: Sat May 23 13:57:37 2020 +0800
Add test insertion interfaces for session and add primitiveArray pool
config (#1245)
* Add test insertion interfaces for session and session pool
* add primitive_array_size config
Co-authored-by: Xiangdong Huang <[email protected]>
Co-authored-by: Jialin Qiao <[email protected]>
---
.../resources/conf/iotdb-engine.properties | 5 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 +++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 3 +
.../apache/iotdb/db/rescon/PrimitiveArrayPool.java | 4 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 20 ++++++
service-rpc/src/main/thrift/rpc.thrift | 8 ++-
.../java/org/apache/iotdb/session/Session.java | 77 ++++++++++++++++++++
.../org/apache/iotdb/session/pool/SessionPool.java | 83 ++++++++++++++++++++++
8 files changed, 207 insertions(+), 3 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties
b/server/src/assembly/resources/conf/iotdb-engine.properties
index 3a5b4e2..8a6dec5 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -207,6 +207,9 @@ chunk_buffer_pool_enable=false
# data.
# default_ttl=36000000
+# primitive array size (length of each array) in array pool
+primitive_array_size=128
+
####################
### Upgrade Configurations
####################
@@ -417,4 +420,4 @@ value_encoder=PLAIN
compressor=SNAPPY
# bloom filter error rate in TsFile
-bloom_filter_error_rate=0.05
\ No newline at end of file
+bloom_filter_error_rate=0.05
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 4154566..ad2a436 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -458,6 +458,8 @@ public class IoTDBConfig {
*/
private long defaultTTL = Long.MAX_VALUE;
+ private int primitiveArraySize = 128;
+
public IoTDBConfig() {
// empty constructor
}
@@ -1278,4 +1280,12 @@ public class IoTDBConfig {
public void setDefaultTTL(long defaultTTL) {
this.defaultTTL = defaultTTL;
}
+
+ /**
+ * Returns the configured length of each array in the primitive array pool
+ * (property {@code primitive_array_size}; the field defaults to 128).
+ */
+ public int getPrimitiveArraySize() {
+ return primitiveArraySize;
+ }
+
+ /**
+ * Sets the length of each array in the primitive array pool. The value is stored as given;
+ * no range check is performed here.
+ *
+ * NOTE(review): PrimitiveArrayPool copies this value into a static final ARRAY_SIZE, so a
+ * call made after that class has been initialized presumably has no effect on the pool --
+ * confirm before relying on runtime changes.
+ */
+ public void setPrimitiveArraySize(int primitiveArraySize) {
+ this.primitiveArraySize = primitiveArraySize;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 90a38f4..2251cf6 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -334,6 +334,9 @@ public class IoTDBDescriptor {
conf.setDefaultTTL(Long.parseLong(properties.getProperty("default_ttl",
String.valueOf(conf.getDefaultTTL()))));
+ conf.setPrimitiveArraySize((Integer.parseInt(
+ properties.getProperty(
+ "primitive_array_size",
String.valueOf(conf.getPrimitiveArraySize())))));
// At the same time, set TSFileConfig
TSFileDescriptor.getInstance().getConfig()
diff --git
a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayPool.java
b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayPool.java
index a5de880..dae85c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayPool.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayPool.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.rescon;
import java.util.ArrayDeque;
import java.util.EnumMap;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
@@ -34,7 +35,8 @@ public class PrimitiveArrayPool {
*/
private static final EnumMap<TSDataType, ArrayDeque> primitiveArraysMap =
new EnumMap<>(TSDataType.class);
- public static final int ARRAY_SIZE = 128;
+ public static final int ARRAY_SIZE =
+ IoTDBDescriptor.getInstance().getConfig().getPrimitiveArraySize();
static {
primitiveArraysMap.put(TSDataType.BOOLEAN, new ArrayDeque());
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 2deb833..5a2b02b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -1243,6 +1243,26 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
}
@Override
+  public TSExecuteBatchStatementResp testInsertBatch(TSBatchInsertionReq req) {
+    // Test-only endpoint: the request is not executed, a success status is returned as-is.
+    logger.debug("Test insert batch request receive.");
+    TSStatus successStatus = getStatus(TSStatusCode.SUCCESS_STATUS);
+    return new TSExecuteBatchStatementResp(successStatus);
+  }
+
+ @Override
+  public TSStatus testInsertRow(TSInsertReq req) {
+    // Test-only endpoint: the row in the request is not executed, only success is reported.
+    logger.debug("Test insert row request receive.");
+    TSStatus successStatus = getStatus(TSStatusCode.SUCCESS_STATUS);
+    return new TSStatus(successStatus);
+  }
+
+ @Override
+  public TSExecuteInsertRowInBatchResp testInsertRowInBatch(TSInsertInBatchReq req) {
+    // Test-only endpoint: the request is not executed. The response carries exactly one
+    // success status, independent of how many rows the request holds.
+    logger.debug("Test insert row in batch request receive.");
+    TSExecuteInsertRowInBatchResp response = new TSExecuteInsertRowInBatchResp();
+    TSStatus successStatus = getStatus(TSStatusCode.SUCCESS_STATUS);
+    response.addToStatusList(successStatus);
+    return response;
+  }
+
+ @Override
public TSStatus deleteData(TSDeleteDataReq req) {
if (!checkLogin()) {
logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
diff --git a/service-rpc/src/main/thrift/rpc.thrift
b/service-rpc/src/main/thrift/rpc.thrift
index 533aa3d..1309b54 100644
--- a/service-rpc/src/main/thrift/rpc.thrift
+++ b/service-rpc/src/main/thrift/rpc.thrift
@@ -299,6 +299,8 @@ service TSIService {
TSExecuteStatementResp insert(1:TSInsertionReq req);
+ TSStatus insertRow(1:TSInsertReq req);
+
TSExecuteBatchStatementResp insertBatch(1:TSBatchInsertionReq req);
TSExecuteInsertRowInBatchResp insertRowInBatch(1:TSInsertInBatchReq req);
@@ -311,7 +313,11 @@ service TSIService {
TSStatus deleteStorageGroups(1:list<string> storageGroup);
- TSStatus insertRow(1:TSInsertReq req);
+ TSStatus testInsertRow(1:TSInsertReq req);
+
+ TSExecuteBatchStatementResp testInsertBatch(1:TSBatchInsertionReq req);
+
+  // Test-only RPC: the server replies with a success status without executing the insertion.
+  TSExecuteInsertRowInBatchResp testInsertRowInBatch(1:TSInsertInBatchReq req);
TSStatus deleteData(1:TSDeleteDataReq req);
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java
b/session/src/main/java/org/apache/iotdb/session/Session.java
index 2aa1559..31028ec 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -237,6 +237,83 @@ public class Session {
}
/**
+   * Sends a single-row insertion request that the server acknowledges without writing any
+   * data. Intended for measuring the time spent outside of the actual insertion (client side
+   * and RPC).
+   *
+   * @param deviceId device the row is addressed to
+   * @param time timestamp of the row
+   * @param measurements measurement names of the row
+   * @param values values of the row, passed as strings
+   * @throws IoTDBSessionException if the RPC fails or the returned status does not verify as
+   *     success
+   */
+  public void testInsertRow(String deviceId, long time, List<String> measurements,
+      List<String> values) throws IoTDBSessionException {
+    TSInsertReq req = new TSInsertReq();
+    req.setDeviceId(deviceId);
+    req.setTimestamp(time);
+    req.setMeasurements(measurements);
+    req.setValues(values);
+
+    try {
+      TSStatus status = client.testInsertRow(req);
+      RpcUtils.verifySuccess(status);
+    } catch (TException | IoTDBRPCException e) {
+      throw new IoTDBSessionException(e);
+    }
+  }
+
+  /**
+   * Sends a batch insertion request that the server acknowledges without writing any data.
+   * Intended for measuring the time spent outside of the actual insertion (client side and
+   * RPC).
+   *
+   * @param rowBatch batch whose device id, measurement schemas, timestamps, values and size
+   *     are copied into the request
+   * @throws IoTDBSessionException if the RPC fails or the returned status does not verify as
+   *     success
+   */
+  public void testInsertBatch(RowBatch rowBatch) throws IoTDBSessionException {
+    TSBatchInsertionReq req = new TSBatchInsertionReq();
+    req.deviceId = rowBatch.deviceId;
+    req.setSize(rowBatch.batchSize);
+    for (MeasurementSchema schema : rowBatch.measurements) {
+      req.addToMeasurements(schema.getMeasurementId());
+      // data types travel over the wire as enum ordinals
+      req.addToTypes(schema.getType().ordinal());
+    }
+    req.setTimestamps(SessionUtils.getTimeBuffer(rowBatch));
+    req.setValues(SessionUtils.getValueBuffer(rowBatch));
+
+    try {
+      TSStatus status = client.testInsertBatch(req).status;
+      RpcUtils.verifySuccess(status);
+    } catch (TException | IoTDBRPCException e) {
+      throw new IoTDBSessionException(e);
+    }
+  }
+
+  /**
+   * Sends a multi-row insertion request that the server acknowledges without writing any
+   * data. Intended for measuring the time spent outside of the actual insertion (client side
+   * and RPC).
+   *
+   * <p>The i-th entries of the four lists together describe one row.
+   *
+   * @param deviceIds device of each row
+   * @param times timestamp of each row
+   * @param measurementsList measurement names of each row
+   * @param valuesList values of each row, passed as strings
+   * @throws IllegalArgumentException if the four lists differ in size, or if a row's
+   *     measurements and values differ in size
+   * @throws IoTDBSessionException if the RPC fails or any returned status does not verify as
+   *     success
+   */
+  public void testInsertRowInBatch(List<String> deviceIds, List<Long> times,
+      List<List<String>> measurementsList, List<List<String>> valuesList)
+      throws IoTDBSessionException {
+    int len = deviceIds.size();
+    if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
+      throw new IllegalArgumentException(
+          "deviceIds, times, measurementsList and valuesList's size should be equal");
+    }
+    for (int i = 0; i < len; i++) {
+      if (measurementsList.get(i).size() != valuesList.get(i).size()) {
+        throw new IllegalArgumentException(
+            "each measurements size and values size should be equal");
+      }
+    }
+
+    TSInsertInBatchReq request = new TSInsertInBatchReq();
+    request.setDeviceIds(deviceIds);
+    request.setTimestamps(times);
+    request.setMeasurementsList(measurementsList);
+    request.setValuesList(valuesList);
+
+    try {
+      // Must hit the test RPC: calling insertRowInBatch here would really write the rows.
+      for (TSStatus cur : client.testInsertRowInBatch(request).getStatusList()) {
+        RpcUtils.verifySuccess(cur);
+      }
+    } catch (TException | IoTDBRPCException e) {
+      throw new IoTDBSessionException(e);
+    }
+  }
+
+ /**
* delete a timeseries, including data and schema
*
* @param path timeseries to delete, should be a whole path
diff --git
a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 5b76191..414b42c 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -304,6 +304,89 @@ public class SessionPool {
}
/**
+   * Pooled variant of the single-row test insertion: the server acknowledges the request
+   * without writing any data, so the call can be used to measure time spent outside of the
+   * actual insertion.
+   *
+   * <p>Makes up to {@code RETRY} attempts, each on a session obtained from the pool. An
+   * attempt is repeated only when the failure was caused by a {@code TException}.
+   *
+   * @throws IoTDBSessionException if an attempt fails for a reason other than a
+   *     {@code TException}, or if all {@code RETRY} attempts fail
+   */
+  public void testInsertRow(String deviceId, long time, List<String> measurements,
+      List<String> values) throws IoTDBSessionException {
+    for (int attempt = 0; attempt < RETRY; attempt++) {
+      Session session = getSession();
+      try {
+        session.testInsertRow(deviceId, time, measurements, values);
+        putBack(session);
+        return;
+      } catch (IoTDBSessionException e) {
+        if (!(e.getCause() instanceof TException)) {
+          // not a transport failure: hand the session back and propagate the error
+          putBack(session);
+          throw e;
+        }
+        // TException means the connection is broken, remove it and get a new one.
+        closeSession(session);
+        removeSession();
+      }
+    }
+    throw new IoTDBSessionException(
+        String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+  }
+
+  /**
+   * Pooled variant of the batch test insertion: the server acknowledges the request without
+   * writing any data, so the call can be used to measure time spent outside of the actual
+   * insertion.
+   *
+   * <p>Makes up to {@code RETRY} attempts, each on a session obtained from the pool. An
+   * attempt is repeated only when the failure was caused by a {@code TException}.
+   *
+   * @throws IoTDBSessionException if an attempt fails for a reason other than a
+   *     {@code TException}, or if all {@code RETRY} attempts fail
+   */
+  public void testInsertBatch(RowBatch rowBatch) throws IoTDBSessionException {
+    for (int attempt = 0; attempt < RETRY; attempt++) {
+      Session session = getSession();
+      try {
+        session.testInsertBatch(rowBatch);
+        putBack(session);
+        return;
+      } catch (IoTDBSessionException e) {
+        if (!(e.getCause() instanceof TException)) {
+          // not a transport failure: hand the session back and propagate the error
+          putBack(session);
+          throw e;
+        }
+        // TException means the connection is broken, remove it and get a new one.
+        closeSession(session);
+        removeSession();
+      }
+    }
+    throw new IoTDBSessionException(
+        String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+  }
+
+  /**
+   * Pooled variant of the multi-row test insertion: the call is delegated to
+   * {@code Session.testInsertRowInBatch}, which is documented as not writing any data, so it
+   * can be used to measure time spent outside of the actual insertion.
+   *
+   * <p>Makes up to {@code RETRY} attempts, each on a session obtained from the pool. An
+   * attempt is repeated only when the failure was caused by a {@code TException}.
+   *
+   * @throws IoTDBSessionException if an attempt fails for a reason other than a
+   *     {@code TException}, or if all {@code RETRY} attempts fail
+   */
+  public void testInsertRowInBatch(List<String> deviceIds, List<Long> times,
+      List<List<String>> measurementsList, List<List<String>> valuesList)
+      throws IoTDBSessionException {
+    for (int attempt = 0; attempt < RETRY; attempt++) {
+      Session session = getSession();
+      try {
+        session.testInsertRowInBatch(deviceIds, times, measurementsList, valuesList);
+        putBack(session);
+        return;
+      } catch (IoTDBSessionException e) {
+        if (!(e.getCause() instanceof TException)) {
+          // not a transport failure: hand the session back and propagate the error
+          putBack(session);
+          throw e;
+        }
+        // TException means the connection is broken, remove it and get a new one.
+        closeSession(session);
+        removeSession();
+      }
+    }
+    throw new IoTDBSessionException(
+        String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+  }
+
+ /**
* delete a timeseries, including data and schema
*
* @param paths timeseries to delete, should be a whole path