This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4e7be1c [IOTDB-615] Use binary rather than string in insert plan
(#1229)
4e7be1c is described below
commit 4e7be1c204acc47b362fe74b84f7e7b6d742e0b8
Author: SilverNarcissus <[email protected]>
AuthorDate: Thu May 28 13:56:23 2020 +0800
[IOTDB-615] Use binary rather than string in insert plan (#1229)
* add insert object interface in session
---
.../org/apache/iotdb/flink/FlinkIoTDBSink.java | 1 +
.../java/org/apache/iotdb/rocketmq/Constant.java | 40 +--
.../apache/iotdb/rocketmq/RocketMQConsumer.java | 79 ++++--
.../main/java/org/apache/iotdb/SessionExample.java | 43 ++-
.../java/org/apache/iotdb/SessionPoolExample.java | 17 +-
.../iotdb/flink/DefaultIoTSerializationSchema.java | 144 ++++++----
.../main/java/org/apache/iotdb/flink/Event.java | 67 +++--
.../java/org/apache/iotdb/flink/IoTDBSink.java | 282 +++++++++----------
.../flink/DefaultIoTSerializationSchemaTest.java | 11 +-
.../iotdb/flink/IoTDBSinkBatchInsertTest.java | 157 ++++++-----
.../iotdb/flink/IoTDBSinkBatchTimerTest.java | 80 +++---
.../apache/iotdb/flink/IoTDBSinkInsertTest.java | 68 ++---
.../iotdb/db/engine/memtable/AbstractMemTable.java | 32 +--
.../db/engine/storagegroup/TsFileProcessor.java | 2 +-
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 69 ++++-
.../iotdb/db/qp/physical/crud/InsertPlan.java | 209 +++++++++++++--
.../iotdb/db/qp/strategy/PhysicalGenerator.java | 1 +
.../org/apache/iotdb/db/service/TSServiceImpl.java | 42 +--
.../org/apache/iotdb/db/utils/CommonUtils.java | 44 ++-
.../iotdb/db/engine/storagegroup/TTLTest.java | 23 +-
.../org/apache/iotdb/db/tools/WalCheckerTest.java | 11 +-
.../apache/iotdb/db/writelog/PerformanceTest.java | 5 +-
.../iotdb/db/writelog/WriteLogNodeManagerTest.java | 2 +
.../apache/iotdb/db/writelog/WriteLogNodeTest.java | 6 +
.../iotdb/db/writelog/io/LogWriterReaderTest.java | 5 +-
.../iotdb/db/writelog/recover/LogReplayerTest.java | 19 +-
.../recover/RecoverResourceFromReaderTest.java | 19 +-
.../db/writelog/recover/SeqTsFileRecoverTest.java | 7 +-
.../writelog/recover/UnseqTsFileRecoverTest.java | 26 +-
service-rpc/rpc-changelist.md | 5 +-
service-rpc/src/main/thrift/rpc.thrift | 4 +-
.../java/org/apache/iotdb/session/Session.java | 297 ++++++++++++++++-----
.../org/apache/iotdb/session/pool/SessionPool.java | 48 ++--
.../org/apache/iotdb/session/IoTDBSessionIT.java | 187 ++++++++++++-
.../iotdb/session/IoTDBSessionIteratorIT.java | 21 +-
.../apache/iotdb/session/pool/SessionPoolTest.java | 18 +-
.../java/org/apache/iotdb/tsfile/utils/Binary.java | 9 +-
.../iotdb/tsfile/utils/ReadWriteIOUtils.java | 54 +++-
38 files changed, 1481 insertions(+), 673 deletions(-)
diff --git
a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
index 1048079..ca8d205 100644
--- a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
+++ b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
@@ -64,6 +64,7 @@ public class FlinkIoTDBSink {
tuple.put("device", "root.sg.d1");
tuple.put("timestamp",
String.valueOf(System.currentTimeMillis()));
tuple.put("measurements", "s1");
+ tuple.put("types", "DOUBLE");
tuple.put("values", String.valueOf(random.nextDouble()));
context.collect(tuple);
diff --git
a/example/rocketmq/src/main/java/org/apache/iotdb/rocketmq/Constant.java
b/example/rocketmq/src/main/java/org/apache/iotdb/rocketmq/Constant.java
index 58785a3..5c465e9 100644
--- a/example/rocketmq/src/main/java/org/apache/iotdb/rocketmq/Constant.java
+++ b/example/rocketmq/src/main/java/org/apache/iotdb/rocketmq/Constant.java
@@ -41,25 +41,25 @@ public class Constant {
{"root.test.d1.s0", "INT32", "PLAIN", "SNAPPY"},
};
public static final String[] ALL_DATA = {
- "root.vehicle.d0,10,s0,100",
- "root.vehicle.d0,12,s0:s1,101:'employeeId102'",
- "root.vehicle.d0,19,s1,'employeeId103'",
- "root.vehicle.d1,11,s2,104.0",
- "root.vehicle.d1,15,s2:s3,105.0:true",
- "root.vehicle.d1,17,s3,false",
- "root.vehicle.d0,20,s0,1000",
- "root.vehicle.d0,22,s0:s1,1001:'employeeId1002'",
- "root.vehicle.d0,29,s1,'employeeId1003'",
- "root.vehicle.d1,21,s2,1004.0",
- "root.vehicle.d1,25,s2:s3,1005.0:true",
- "root.vehicle.d1,27,s3,true",
- "root.test.d0,10,s0,106",
- "root.test.d0,14,s0:s1,107:'employeeId108'",
- "root.test.d0,16,s1,'employeeId109'",
- "root.test.d1,1,s0,110",
- "root.test.d0,30,s0,1006",
- "root.test.d0,34,s0:s1,1007:'employeeId1008'",
- "root.test.d0,36,s1,'employeeId1090'",
- "root.test.d1,10,s0,1100",
+ "root.vehicle.d0,10,s0,INT32,100",
+ "root.vehicle.d0,12,s0:s1,INT32:TEXT,101:'employeeId102'",
+ "root.vehicle.d0,19,s1,TEXT,'employeeId103'",
+ "root.vehicle.d1,11,s2,FLOAT,104.0",
+ "root.vehicle.d1,15,s2:s3,FLOAT:BOOLEAN,105.0:true",
+ "root.vehicle.d1,17,s3,BOOLEAN,false",
+ "root.vehicle.d0,20,s0,INT32,1000",
+ "root.vehicle.d0,22,s0:s1,INT32:TEXT,1001:'employeeId1002'",
+ "root.vehicle.d0,29,s1,TEXT,'employeeId1003'",
+ "root.vehicle.d1,21,s2,FLOAT,1004.0",
+ "root.vehicle.d1,25,s2:s3,FLOAT:BOOLEAN,1005.0:true",
+ "root.vehicle.d1,27,s3,BOOLEAN,true",
+ "root.test.d0,10,s0,INT32,106",
+ "root.test.d0,14,s0:s1,INT32:TEXT,107:'employeeId108'",
+ "root.test.d0,16,s1,TEXT,'employeeId109'",
+ "root.test.d1,1,s0,INT32,110",
+ "root.test.d0,30,s0,INT32,1006",
+ "root.test.d0,34,s0:s1,INT32:TEXT,1007:'employeeId1008'",
+ "root.test.d0,36,s1,TEXT,'employeeId1090'",
+ "root.test.d1,10,s0,INT32,1100",
};
}
diff --git
a/example/rocketmq/src/main/java/org/apache/iotdb/rocketmq/RocketMQConsumer.java
b/example/rocketmq/src/main/java/org/apache/iotdb/rocketmq/RocketMQConsumer.java
index 696b2b9..6a67446 100644
---
a/example/rocketmq/src/main/java/org/apache/iotdb/rocketmq/RocketMQConsumer.java
+++
b/example/rocketmq/src/main/java/org/apache/iotdb/rocketmq/RocketMQConsumer.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.rocketmq;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.iotdb.rpc.IoTDBConnectionException;
@@ -37,11 +38,11 @@ import org.slf4j.LoggerFactory;
public class RocketMQConsumer {
+ private static final Logger logger =
LoggerFactory.getLogger(RocketMQConsumer.class);
+ private static Session session;
private DefaultMQPushConsumer consumer;
private String producerGroup;
private String serverAddresses;
- private static Session session;
- private static final Logger logger =
LoggerFactory.getLogger(RocketMQConsumer.class);
public RocketMQConsumer(String producerGroup, String serverAddresses, String
connectionHost,
int connectionPort, String user, String password)
@@ -53,6 +54,22 @@ public class RocketMQConsumer {
initIoTDB(connectionHost, connectionPort, user, password);
}
+ public static void main(String[] args)
+ throws MQClientException, StatementExecutionException,
IoTDBConnectionException {
+ /**
+ *Instantiate with specified consumer group name and specify name server
addresses.
+ */
+ RocketMQConsumer consumer = new RocketMQConsumer(Constant.CONSUMER_GROUP,
+ Constant.SERVER_ADDRESS,
+ Constant.IOTDB_CONNECTION_HOST,
+ Constant.IOTDB_CONNECTION_PORT,
+ Constant.IOTDB_CONNECTION_USER,
+ Constant.IOTDB_CONNECTION_PASSWORD);
+ consumer.prepareConsume();
+ consumer.addStorageGroup(Constant.ADDITIONAL_STORAGE_GROUP);
+ consumer.start();
+ }
+
private void initIoTDB(String host, int port, String user, String password)
throws IoTDBConnectionException, StatementExecutionException {
if (host == null) {
@@ -76,7 +93,8 @@ public class RocketMQConsumer {
session.setStorageGroup(storageGroup);
}
- private void createTimeseries(String[] sql) throws
StatementExecutionException, IoTDBConnectionException {
+ private void createTimeseries(String[] sql)
+ throws StatementExecutionException, IoTDBConnectionException {
String timeseries = sql[0];
TSDataType dataType = TSDataType.valueOf(sql[1]);
TSEncoding encoding = TSEncoding.valueOf(sql[2]);
@@ -89,8 +107,37 @@ public class RocketMQConsumer {
String device = dataArray[0];
long time = Long.parseLong(dataArray[1]);
List<String> measurements = Arrays.asList(dataArray[2].split(":"));
- List<String> values = Arrays.asList(dataArray[3].split(":"));
- session.insertRecord(device, time, measurements, values);
+ List<TSDataType> types = new ArrayList<>();
+ for (String type : dataArray[3].split(":")) {
+ types.add(TSDataType.valueOf(type));
+ }
+
+ List<Object> values = new ArrayList<>();
+ String[] valuesStr = dataArray[4].split(":");
+ for (int i = 0; i < valuesStr.length; i++) {
+ switch (types.get(i)) {
+ case INT64:
+ values.add(Long.parseLong(valuesStr[i]));
+ break;
+ case DOUBLE:
+ values.add(Double.parseDouble(valuesStr[i]));
+ break;
+ case INT32:
+ values.add(Integer.parseInt(valuesStr[i]));
+ break;
+ case TEXT:
+ values.add(valuesStr[i]);
+ break;
+ case FLOAT:
+ values.add(Float.parseFloat(valuesStr[i]));
+ break;
+ case BOOLEAN:
+ values.add(Boolean.parseBoolean(valuesStr[i]));
+ break;
+ }
+ }
+
+ session.insertRecord(device, time, measurements, types, values);
}
public void start() throws MQClientException {
@@ -99,6 +146,7 @@ public class RocketMQConsumer {
/**
* Subscribe topic and add register Listener
+ *
* @throws MQClientException
*/
public void prepareConsume() throws MQClientException {
@@ -116,8 +164,9 @@ public class RocketMQConsumer {
*/
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context)
-> {
for (MessageExt msg : msgs) {
- logger.info(String.format("%s Receive New Messages: %s %n",
Thread.currentThread().getName(),
- new String(msg.getBody())));
+ logger
+ .info(String.format("%s Receive New Messages: %s %n",
Thread.currentThread().getName(),
+ new String(msg.getBody())));
try {
insert(new String(msg.getBody()));
} catch (Exception e) {
@@ -147,20 +196,4 @@ public class RocketMQConsumer {
public void setServerAddresses(String serverAddresses) {
this.serverAddresses = serverAddresses;
}
-
- public static void main(String[] args)
- throws MQClientException, StatementExecutionException,
IoTDBConnectionException {
- /**
- *Instantiate with specified consumer group name and specify name server
addresses.
- */
- RocketMQConsumer consumer = new RocketMQConsumer(Constant.CONSUMER_GROUP,
- Constant.SERVER_ADDRESS,
- Constant.IOTDB_CONNECTION_HOST,
- Constant.IOTDB_CONNECTION_PORT,
- Constant.IOTDB_CONNECTION_USER,
- Constant.IOTDB_CONNECTION_PASSWORD);
- consumer.prepareConsume();
- consumer.addStorageGroup(Constant.ADDITIONAL_STORAGE_GROUP);
- consumer.start();
- }
}
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 444d862..f7e7db9 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -136,15 +136,20 @@ public class SessionExample {
private static void insertRecord() throws IoTDBConnectionException,
StatementExecutionException {
String deviceId = "root.sg1.d1";
List<String> measurements = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
measurements.add("s1");
measurements.add("s2");
measurements.add("s3");
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+
for (long time = 0; time < 100; time++) {
- List<String> values = new ArrayList<>();
- values.add("1");
- values.add("2");
- values.add("3");
- session.insertRecord(deviceId, time, measurements, values);
+ List<Object> values = new ArrayList<>();
+ values.add(1L);
+ values.add(2L);
+ values.add(3L);
+ session.insertRecord(deviceId, time, measurements, types, values);
}
}
@@ -152,11 +157,16 @@ public class SessionExample {
throws IoTDBConnectionException, StatementExecutionException {
String deviceId = "root.sg1.d1";
List<String> measurements = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
measurements.add("s1");
measurements.add("s2");
measurements.add("s3");
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+
for (long time = 0; time < 100; time++) {
- session.insertRecord(deviceId, time, measurements, 1L, 1L, 1L);
+ session.insertRecord(deviceId, time, measurements, types, 1L, 1L, 1L);
}
}
@@ -168,21 +178,27 @@ public class SessionExample {
measurements.add("s3");
List<String> deviceIds = new ArrayList<>();
List<List<String>> measurementsList = new ArrayList<>();
- List<List<String>> valuesList = new ArrayList<>();
+ List<List<Object>> valuesList = new ArrayList<>();
List<Long> timestamps = new ArrayList<>();
+ List<List<TSDataType>> typesList = new ArrayList<>();
for (long time = 0; time < 500; time++) {
- List<String> values = new ArrayList<>();
- values.add("1");
- values.add("2");
- values.add("3");
+ List<Object> values = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
+ values.add(1L);
+ values.add(2L);
+ values.add(3L);
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
deviceIds.add(deviceId);
measurementsList.add(measurements);
valuesList.add(values);
+ typesList.add(types);
timestamps.add(time);
if (time != 0 && time % 100 == 0) {
- session.insertRecords(deviceIds, timestamps, measurementsList,
valuesList);
+ session.insertRecords(deviceIds, timestamps, measurementsList,
typesList, valuesList);
deviceIds.clear();
measurementsList.clear();
valuesList.clear();
@@ -190,9 +206,8 @@ public class SessionExample {
}
}
- session.insertRecords(deviceIds, timestamps, measurementsList, valuesList);
+ session.insertRecords(deviceIds, timestamps, measurementsList, typesList,
valuesList);
}
-
/**
* insert the data of a device. For each timestamp, the number of
measurements is the same.
*
diff --git
a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
index 96a660d..395312e 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
@@ -19,7 +19,6 @@
package org.apache.iotdb;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -28,6 +27,7 @@ import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.SessionDataSet.DataIterator;
import org.apache.iotdb.session.pool.SessionDataSetWrapper;
import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public class SessionPoolExample {
@@ -51,15 +51,20 @@ public class SessionPoolExample {
private static void insertRecord() throws StatementExecutionException,
IoTDBConnectionException {
String deviceId = "root.sg1.d1";
List<String> measurements = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
measurements.add("s1");
measurements.add("s2");
measurements.add("s3");
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+
for (long time = 0; time < 10; time++) {
- List<String> values = new ArrayList<>();
- values.add("1");
- values.add("2");
- values.add("3");
- pool.insertRecord(deviceId, time, measurements, values);
+ List<Object> values = new ArrayList<>();
+ values.add(1L);
+ values.add(2L);
+ values.add(3L);
+ pool.insertRecord(deviceId, time, measurements, types, values);
}
}
diff --git
a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/DefaultIoTSerializationSchema.java
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/DefaultIoTSerializationSchema.java
index 4ac596b..941b5ef 100644
---
a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/DefaultIoTSerializationSchema.java
+++
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/DefaultIoTSerializationSchema.java
@@ -18,82 +18,110 @@
package org.apache.iotdb.flink;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
/**
- * @inheritDoc
- * The default implementation of IoTSerializationSchema. Gets info from a map
struct.
+ * @inheritDoc The default implementation of IoTSerializationSchema. Gets info
from a map struct.
*/
-public class DefaultIoTSerializationSchema implements
IoTSerializationSchema<Map<String,String>> {
- private String fieldDevice = "device";
- private String fieldTimestamp = "timestamp";
- private String fieldMeasurements = "measurements";
- private String fieldValues = "values";
- private String separator = ",";
-
- @Override
- public Event serialize(Map<String,String> tuple) {
- if (tuple == null) {
- return null;
- }
-
- String device = tuple.get(fieldDevice);
-
- String ts = tuple.get(fieldTimestamp);
- Long timestamp = ts == null ? System.currentTimeMillis() :
Long.parseLong(ts);
-
- List<String> measurements = null;
- if (tuple.get(fieldMeasurements) != null) {
- measurements =
Arrays.asList(tuple.get(fieldMeasurements).split(separator));
- }
-
- List<String> values = null;
- if (tuple.get(fieldValues) != null) {
- values = Arrays.asList(tuple.get(fieldValues).split(separator));
- }
-
- return new Event(device, timestamp, measurements, values);
+public class DefaultIoTSerializationSchema implements
IoTSerializationSchema<Map<String, String>> {
+
+ private String fieldDevice = "device";
+ private String fieldTimestamp = "timestamp";
+ private String fieldMeasurements = "measurements";
+ private String fieldValues = "values";
+ private String fieldTypes = "types";
+ private String separator = ",";
+
+ @Override
+ public Event serialize(Map<String, String> tuple) {
+ if (tuple == null) {
+ return null;
}
- public String getFieldDevice() {
- return fieldDevice;
- }
+ String device = tuple.get(fieldDevice);
- public void setFieldDevice(String fieldDevice) {
- this.fieldDevice = fieldDevice;
- }
+ String ts = tuple.get(fieldTimestamp);
+ Long timestamp = ts == null ? System.currentTimeMillis() :
Long.parseLong(ts);
- public String getFieldTimestamp() {
- return fieldTimestamp;
+ List<String> measurements = null;
+ if (tuple.get(fieldMeasurements) != null) {
+ measurements =
Arrays.asList(tuple.get(fieldMeasurements).split(separator));
}
- public void setFieldTimestamp(String fieldTimestamp) {
- this.fieldTimestamp = fieldTimestamp;
+ List<TSDataType> types = new ArrayList<>();
+ for (String type : tuple.get(fieldTypes).split(separator)) {
+ types.add(TSDataType.valueOf(type));
}
- public String getFieldMeasurements() {
- return fieldMeasurements;
+ List<Object> values = new ArrayList<>();
+ String[] valuesStr = tuple.get(fieldValues).split(separator);
+ for (int i = 0; i < valuesStr.length; i++) {
+ switch (types.get(i)) {
+ case INT64:
+ values.add(Long.parseLong(valuesStr[i]));
+ break;
+ case DOUBLE:
+ values.add(Double.parseDouble(valuesStr[i]));
+ break;
+ case INT32:
+ values.add(Integer.parseInt(valuesStr[i]));
+ break;
+ case TEXT:
+ values.add(valuesStr[i]);
+ break;
+ case FLOAT:
+ values.add(Float.parseFloat(valuesStr[i]));
+ break;
+ case BOOLEAN:
+ values.add(Boolean.parseBoolean(valuesStr[i]));
+ break;
+ }
}
- public void setFieldMeasurements(String fieldMeasurements) {
- this.fieldMeasurements = fieldMeasurements;
- }
+ return new Event(device, timestamp, measurements, types, values);
+ }
- public String getFieldValues() {
- return fieldValues;
- }
+ public String getFieldDevice() {
+ return fieldDevice;
+ }
- public void setFieldValues(String fieldValues) {
- this.fieldValues = fieldValues;
- }
+ public void setFieldDevice(String fieldDevice) {
+ this.fieldDevice = fieldDevice;
+ }
- public String getSeparator() {
- return separator;
- }
+ public String getFieldTimestamp() {
+ return fieldTimestamp;
+ }
- public void setSeparator(String separator) {
- this.separator = separator;
- }
+ public void setFieldTimestamp(String fieldTimestamp) {
+ this.fieldTimestamp = fieldTimestamp;
+ }
+
+ public String getFieldMeasurements() {
+ return fieldMeasurements;
+ }
+
+ public void setFieldMeasurements(String fieldMeasurements) {
+ this.fieldMeasurements = fieldMeasurements;
+ }
+
+ public String getFieldValues() {
+ return fieldValues;
+ }
+
+ public void setFieldValues(String fieldValues) {
+ this.fieldValues = fieldValues;
+ }
+
+ public String getSeparator() {
+ return separator;
+ }
+
+ public void setSeparator(String separator) {
+ this.separator = separator;
+ }
}
diff --git
a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/Event.java
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/Event.java
index 6cccb23..83fc4a3 100644
--- a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/Event.java
+++ b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/Event.java
@@ -19,36 +19,49 @@
package org.apache.iotdb.flink;
import java.util.List;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
/**
* Event serializes the device/sensor related data, such as time, measurements
etc.
*/
public class Event {
- private String device;
- private Long timestamp;
- private List<String> measurements;
- private List<String> values;
-
- public Event(String device, Long timestamp, List<String> measurements,
List<String> values) {
- this.device = device;
- this.timestamp = timestamp;
- this.measurements = measurements;
- this.values = values;
- }
-
- public String getDevice() {
- return device;
- }
-
- public Long getTimestamp() {
- return timestamp;
- }
-
- public List<String> getMeasurements() {
- return measurements;
- }
-
- public List<String> getValues() {
- return values;
- }
+
+ private String device;
+ private Long timestamp;
+ private List<String> measurements;
+ private List<TSDataType> types;
+ private List<Object> values;
+
+ public Event(String device, Long timestamp, List<String> measurements,
List<TSDataType> types,
+ List<Object> values) {
+ this.device = device;
+ this.timestamp = timestamp;
+ this.measurements = measurements;
+ this.types = types;
+ this.values = values;
+ }
+
+ public List<TSDataType> getTypes() {
+ return types;
+ }
+
+ public void setTypes(List<TSDataType> types) {
+ this.types = types;
+ }
+
+ public String getDevice() {
+ return device;
+ }
+
+ public Long getTimestamp() {
+ return timestamp;
+ }
+
+ public List<String> getMeasurements() {
+ return measurements;
+ }
+
+ public List<Object> getValues() {
+ return values;
+ }
}
diff --git
a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
index bb3e042..c63871b 100644
--- a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
+++ b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
@@ -32,161 +32,167 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
- * The `IoTDBSink` allows flink jobs to write events into IoTDB timeseries.
- * By default send only one event after another, but you can change to batch
by invoking `withBatchSize(int)`.
+ * The `IoTDBSink` allows flink jobs to write events into IoTDB timeseries. By
default send only one
+ * event after another, but you can change to batch by invoking
`withBatchSize(int)`.
+ *
* @param <IN> the input data type
*/
public class IoTDBSink<IN> extends RichSinkFunction<IN> {
- private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(IoTDBSink.class);
-
- private IoTDBOptions options;
- private IoTSerializationSchema<IN> serializationSchema;
- private Map<String, IoTDBOptions.TimeseriesOption> timeseriesOptionMap;
- private transient SessionPool pool;
- private transient ScheduledExecutorService scheduledExecutor;
-
- private int batchSize = 0;
- private int flushIntervalMs = 3000;
- private List<Event> batchList;
- private int sessionPoolSize = 2;
-
- public IoTDBSink(IoTDBOptions options, IoTSerializationSchema<IN> schema) {
- this.options = options;
- this.serializationSchema = schema;
- this.batchList = new LinkedList<>();
- this.timeseriesOptionMap = new HashMap<>();
- for (IoTDBOptions.TimeseriesOption timeseriesOption :
options.getTimeseriesOptionList()) {
- timeseriesOptionMap.put(timeseriesOption.getPath(),
timeseriesOption);
- }
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(IoTDBSink.class);
+
+ private IoTDBOptions options;
+ private IoTSerializationSchema<IN> serializationSchema;
+ private Map<String, IoTDBOptions.TimeseriesOption> timeseriesOptionMap;
+ private transient SessionPool pool;
+ private transient ScheduledExecutorService scheduledExecutor;
+
+ private int batchSize = 0;
+ private int flushIntervalMs = 3000;
+ private List<Event> batchList;
+ private int sessionPoolSize = 2;
+
+ public IoTDBSink(IoTDBOptions options, IoTSerializationSchema<IN> schema) {
+ this.options = options;
+ this.serializationSchema = schema;
+ this.batchList = new LinkedList<>();
+ this.timeseriesOptionMap = new HashMap<>();
+ for (IoTDBOptions.TimeseriesOption timeseriesOption :
options.getTimeseriesOptionList()) {
+ timeseriesOptionMap.put(timeseriesOption.getPath(), timeseriesOption);
}
-
- @Override
- public void open(Configuration parameters) throws Exception {
- initSession();
- initScheduler();
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ initSession();
+ initScheduler();
+ }
+
+ void initSession() throws Exception {
+ pool = new SessionPool(options.getHost(), options.getPort(),
options.getUser(),
+ options.getPassword(), sessionPoolSize);
+
+ pool.setStorageGroup(options.getStorageGroup());
+ for (IoTDBOptions.TimeseriesOption option :
options.getTimeseriesOptionList()) {
+ if (!pool.checkTimeseriesExists(option.getPath())) {
+ pool.createTimeseries(option.getPath(), option.getDataType(),
option.getEncoding(),
+ option.getCompressor());
+ }
}
-
- void initSession() throws Exception {
- pool = new SessionPool(options.getHost(), options.getPort(),
options.getUser(), options.getPassword(), sessionPoolSize);
-
- pool.setStorageGroup(options.getStorageGroup());
- for (IoTDBOptions.TimeseriesOption option :
options.getTimeseriesOptionList()) {
- if (!pool.checkTimeseriesExists(option.getPath())) {
- pool.createTimeseries(option.getPath(), option.getDataType(),
option.getEncoding(), option.getCompressor());
- }
+ }
+
+ void initScheduler() {
+ if (batchSize > 0) {
+ scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+ scheduledExecutor.scheduleAtFixedRate(() -> {
+ try {
+ flush();
+ } catch (Exception e) {
+ LOG.error("flush error", e);
}
+ }, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
}
-
- void initScheduler() {
- if (batchSize > 0) {
- scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
- scheduledExecutor.scheduleAtFixedRate(() -> {
- try {
- flush();
- } catch (Exception e) {
- LOG.error("flush error", e);
- }
- }, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
- }
- }
-
- // for testing
- void setSessionPool(SessionPool pool) {
- this.pool = pool;
+ }
+
+ // for testing
+ void setSessionPool(SessionPool pool) {
+ this.pool = pool;
+ }
+
+ @Override
+ public void invoke(IN input, Context context) throws Exception {
+ Event event = serializationSchema.serialize(input);
+ if (event == null) {
+ return;
}
- @Override
- public void invoke(IN input, Context context) throws Exception {
- Event event = serializationSchema.serialize(input);
- if (event == null) {
- return;
- }
-
- if (batchSize > 0) {
- synchronized (batchList) {
- batchList.add(event);
- if (batchList.size() >= batchSize) {
- flush();
- }
- return;
- }
+ if (batchSize > 0) {
+ synchronized (batchList) {
+ batchList.add(event);
+ if (batchList.size() >= batchSize) {
+ flush();
}
-
- convertText(event.getDevice(), event.getMeasurements(),
event.getValues());
- pool.insertRecord(event.getDevice(), event.getTimestamp(),
event.getMeasurements(),
- event.getValues());
- LOG.debug("send event successfully");
- }
-
- public IoTDBSink<IN> withBatchSize(int batchSize) {
- Preconditions.checkArgument(batchSize >= 0);
- this.batchSize = batchSize;
- return this;
- }
-
- public IoTDBSink<IN> withFlushIntervalMs(int flushIntervalMs) {
- Preconditions.checkArgument(flushIntervalMs > 0);
- this.flushIntervalMs = flushIntervalMs;
- return this;
+ return;
+ }
}
- public IoTDBSink<IN> withSessionPoolSize(int sessionPoolSize) {
- Preconditions.checkArgument(sessionPoolSize > 0);
- this.sessionPoolSize = sessionPoolSize;
- return this;
+ convertText(event.getDevice(), event.getMeasurements(), event.getValues());
+ pool.insertRecord(event.getDevice(), event.getTimestamp(),
event.getMeasurements(),
+ event.getTypes(), event.getValues());
+ LOG.debug("send event successfully");
+ }
+
+ public IoTDBSink<IN> withBatchSize(int batchSize) {
+ Preconditions.checkArgument(batchSize >= 0);
+ this.batchSize = batchSize;
+ return this;
+ }
+
+ public IoTDBSink<IN> withFlushIntervalMs(int flushIntervalMs) {
+ Preconditions.checkArgument(flushIntervalMs > 0);
+ this.flushIntervalMs = flushIntervalMs;
+ return this;
+ }
+
+ public IoTDBSink<IN> withSessionPoolSize(int sessionPoolSize) {
+ Preconditions.checkArgument(sessionPoolSize > 0);
+ this.sessionPoolSize = sessionPoolSize;
+ return this;
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (pool != null) {
+ try {
+ flush();
+ } catch (Exception e) {
+ LOG.error("flush error", e);
+ }
+ pool.close();
}
-
- @Override
- public void close() throws Exception {
- if (pool != null) {
- try {
- flush();
- } catch (Exception e) {
- LOG.error("flush error", e);
- }
- pool.close();
- }
- if (scheduledExecutor != null) {
- scheduledExecutor.shutdown();
- }
+ if (scheduledExecutor != null) {
+ scheduledExecutor.shutdown();
}
-
- private void convertText(String device, List<String> measurements,
List<String> values) {
- if (device != null && measurements != null && values != null &&
measurements.size() == values.size()) {
- for (int i = 0; i < measurements.size(); i++) {
- String measurement = device + "." + measurements.get(i);
- IoTDBOptions.TimeseriesOption timeseriesOption =
timeseriesOptionMap.get(measurement);
- if (timeseriesOption!= null &&
TSDataType.TEXT.equals(timeseriesOption.getDataType())) {
- // The TEXT data type should be covered by " or '
- values.set(i, "'" + values.get(i) + "'");
- }
- }
+ }
+
+ private void convertText(String device, List<String> measurements,
List<Object> values) {
+ if (device != null && measurements != null && values != null &&
measurements.size() == values
+ .size()) {
+ for (int i = 0; i < measurements.size(); i++) {
+ String measurement = device + "." + measurements.get(i);
+ IoTDBOptions.TimeseriesOption timeseriesOption =
timeseriesOptionMap.get(measurement);
+ if (timeseriesOption != null &&
TSDataType.TEXT.equals(timeseriesOption.getDataType())) {
+ // The TEXT data type should be covered by " or '
+ values.set(i, "'" + values.get(i) + "'");
}
+ }
}
-
- private void flush() throws Exception {
- if (batchSize > 0) {
- synchronized (batchList) {
- if (batchList.size() > 0) {
- List<String> deviceIds = new ArrayList<>();
- List<Long> timestamps = new ArrayList<>();
- List<List<String>> measurementsList = new ArrayList<>();
- List<List<String>> valuesList = new ArrayList<>();
-
- for (Event event : batchList) {
- convertText(event.getDevice(),
event.getMeasurements(), event.getValues());
- deviceIds.add(event.getDevice());
- timestamps.add(event.getTimestamp());
- measurementsList.add(event.getMeasurements());
- valuesList.add(event.getValues());
- }
- pool.insertRecords(deviceIds, timestamps,
measurementsList, valuesList);
- LOG.debug("send event successfully");
- batchList.clear();
- }
- }
+ }
+
+ private void flush() throws Exception {
+ if (batchSize > 0) {
+ synchronized (batchList) {
+ if (batchList.size() > 0) {
+ List<String> deviceIds = new ArrayList<>();
+ List<Long> timestamps = new ArrayList<>();
+ List<List<String>> measurementsList = new ArrayList<>();
+ List<List<TSDataType>> typesList = new ArrayList<>();
+ List<List<Object>> valuesList = new ArrayList<>();
+
+ for (Event event : batchList) {
+ convertText(event.getDevice(), event.getMeasurements(),
event.getValues());
+ deviceIds.add(event.getDevice());
+ timestamps.add(event.getTimestamp());
+ measurementsList.add(event.getMeasurements());
+ typesList.add(event.getTypes());
+ valuesList.add(event.getValues());
+ }
+ pool.insertRecords(deviceIds, timestamps, measurementsList,
typesList, valuesList);
+ LOG.debug("send event successfully");
+ batchList.clear();
}
+ }
}
+ }
}
diff --git
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/DefaultIoTSerializationSchemaTest.java
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/DefaultIoTSerializationSchemaTest.java
index efb5a34..8e696e4 100644
---
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/DefaultIoTSerializationSchemaTest.java
+++
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/DefaultIoTSerializationSchemaTest.java
@@ -18,13 +18,12 @@
package org.apache.iotdb.flink;
-import com.google.common.collect.Lists;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.Map;
-
-import static org.junit.Assert.*;
+import org.junit.Test;
public class DefaultIoTSerializationSchemaTest {
@@ -38,12 +37,14 @@ public class DefaultIoTSerializationSchemaTest {
tuple.put("device", "root.sg.D01");
tuple.put("timestamp", "1581861293000");
tuple.put("measurements", "temperature");
+ tuple.put("types", "DOUBLE");
tuple.put("values", "36.5");
Event event = serializationSchema.serialize(tuple);
assertEquals(tuple.get("device"), event.getDevice());
assertEquals(tuple.get("timestamp"), String.valueOf(event.getTimestamp()));
assertEquals(tuple.get("measurements"), event.getMeasurements().get(0));
- assertEquals(tuple.get("values"), event.getValues().get(0));
+ assertEquals(tuple.get("types"), event.getTypes().get(0).toString());
+ assertEquals(tuple.get("values"), String.valueOf(event.getValues().get(0)));
}
}
diff --git a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
index 7f6fe5e..930565b 100644
--- a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
+++ b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
@@ -18,85 +18,94 @@
package org.apache.iotdb.flink;
-import com.google.common.collect.Lists;
-import org.apache.iotdb.session.pool.SessionPool;
-import org.junit.Before;
-import org.junit.Test;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.junit.Before;
+import org.junit.Test;
public class IoTDBSinkBatchInsertTest {
- private IoTDBSink ioTDBSink;
- private SessionPool pool;
-
- @Before
- public void setUp() throws Exception {
- IoTDBOptions options = new IoTDBOptions();
- options.setTimeseriesOptionList(Lists.newArrayList(new
IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
- ioTDBSink = new IoTDBSink(options, new
DefaultIoTSerializationSchema());
- ioTDBSink.withBatchSize(3);
-
- pool = mock(SessionPool.class);
- ioTDBSink.setSessionPool(pool);
- }
-
- @Test
- public void testBatchInsert() throws Exception {
- Map<String,String> tuple = new HashMap();
- tuple.put("device", "root.sg.D01");
- tuple.put("timestamp", "1581861293000");
- tuple.put("measurements", "temperature");
- tuple.put("values", "36.5");
- ioTDBSink.invoke(tuple, null);
-
- verifyZeroInteractions(pool);
-
- tuple = new HashMap();
- tuple.put("device", "root.sg.D01");
- tuple.put("timestamp", "1581861293001");
- tuple.put("measurements", "temperature");
- tuple.put("values", "37.2");
- ioTDBSink.invoke(tuple, null);
-
- verifyZeroInteractions(pool);
-
- tuple = new HashMap();
- tuple.put("device", "root.sg.D01");
- tuple.put("timestamp", "1581861293003");
- tuple.put("measurements", "temperature");
- tuple.put("values", "37.1");
- ioTDBSink.invoke(tuple, null);
-
- verify(pool).insertRecords(any(List.class), any(List.class),
any(List.class), any(List.class));
-
- tuple = new HashMap();
- tuple.put("device", "root.sg.D01");
- tuple.put("timestamp", "1581861293005");
- tuple.put("measurements", "temperature");
- tuple.put("values", "36.5");
- ioTDBSink.invoke(tuple, null);
-
- verifyZeroInteractions(pool);
- }
-
- @Test
- public void close() throws Exception {
- Map<String,String> tuple = new HashMap();
- tuple.put("device", "root.sg.D01");
- tuple.put("timestamp", "1581861293005");
- tuple.put("measurements", "temperature");
- tuple.put("values", "36.5");
- ioTDBSink.invoke(tuple, null);
- verifyZeroInteractions(pool);
-
- ioTDBSink.close();
- verify(pool).insertRecords(any(List.class), any(List.class),
any(List.class), any(List.class));
- verify(pool).close();
- }
+ private IoTDBSink ioTDBSink;
+ private SessionPool pool;
+
+ @Before
+ public void setUp() throws Exception {
+ IoTDBOptions options = new IoTDBOptions();
+ options.setTimeseriesOptionList(
+ Lists.newArrayList(new
IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
+ ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema());
+ ioTDBSink.withBatchSize(3);
+
+ pool = mock(SessionPool.class);
+ ioTDBSink.setSessionPool(pool);
+ }
+
+ @Test
+ public void testBatchInsert() throws Exception {
+ Map<String, String> tuple = new HashMap();
+ tuple.put("device", "root.sg.D01");
+ tuple.put("timestamp", "1581861293000");
+ tuple.put("measurements", "temperature");
+ tuple.put("types", "DOUBLE");
+ tuple.put("values", "36.5");
+ ioTDBSink.invoke(tuple, null);
+
+ verifyZeroInteractions(pool);
+
+ tuple = new HashMap();
+ tuple.put("device", "root.sg.D01");
+ tuple.put("timestamp", "1581861293001");
+ tuple.put("measurements", "temperature");
+ tuple.put("types", "DOUBLE");
+ tuple.put("values", "37.2");
+ ioTDBSink.invoke(tuple, null);
+
+ verifyZeroInteractions(pool);
+
+ tuple = new HashMap();
+ tuple.put("device", "root.sg.D01");
+ tuple.put("timestamp", "1581861293003");
+ tuple.put("measurements", "temperature");
+ tuple.put("types", "DOUBLE");
+ tuple.put("values", "37.1");
+ ioTDBSink.invoke(tuple, null);
+
+ verify(pool).insertRecords(any(List.class), any(List.class), any(List.class), any(List.class),
+ any(List.class));
+
+ tuple = new HashMap();
+ tuple.put("device", "root.sg.D01");
+ tuple.put("timestamp", "1581861293005");
+ tuple.put("measurements", "temperature");
+ tuple.put("types", "DOUBLE");
+ tuple.put("values", "36.5");
+ ioTDBSink.invoke(tuple, null);
+
+ verifyZeroInteractions(pool);
+ }
+
+ @Test
+ public void close() throws Exception {
+ Map<String, String> tuple = new HashMap();
+ tuple.put("device", "root.sg.D01");
+ tuple.put("timestamp", "1581861293005");
+ tuple.put("measurements", "temperature");
+ tuple.put("types", "DOUBLE");
+ tuple.put("values", "36.5");
+ ioTDBSink.invoke(tuple, null);
+ verifyZeroInteractions(pool);
+
+ ioTDBSink.close();
+ verify(pool).insertRecords(any(List.class), any(List.class), any(List.class), any(List.class),
+ any(List.class));
+ verify(pool).close();
+ }
}
diff --git a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
index f8c7f7e..f9e2d62 100644
--- a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
+++ b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
@@ -18,57 +18,61 @@
package org.apache.iotdb.flink;
-import com.google.common.collect.Lists;
-import org.apache.iotdb.session.pool.SessionPool;
-import org.junit.Before;
-import org.junit.Test;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.junit.Before;
+import org.junit.Test;
public class IoTDBSinkBatchTimerTest {
- private IoTDBSink ioTDBSink;
- private SessionPool pool;
+ private IoTDBSink ioTDBSink;
+ private SessionPool pool;
- @Before
- public void setUp() throws Exception {
- IoTDBOptions options = new IoTDBOptions();
- options.setTimeseriesOptionList(Lists.newArrayList(new
IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
- ioTDBSink = new IoTDBSink(options, new
DefaultIoTSerializationSchema());
- ioTDBSink.withBatchSize(3);
- ioTDBSink.withFlushIntervalMs(1000);
- ioTDBSink.initScheduler();
+ @Before
+ public void setUp() throws Exception {
+ IoTDBOptions options = new IoTDBOptions();
+ options.setTimeseriesOptionList(
+ Lists.newArrayList(new
IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
+ ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema());
+ ioTDBSink.withBatchSize(3);
+ ioTDBSink.withFlushIntervalMs(1000);
+ ioTDBSink.initScheduler();
- pool = mock(SessionPool.class);
- ioTDBSink.setSessionPool(pool);
- }
+ pool = mock(SessionPool.class);
+ ioTDBSink.setSessionPool(pool);
+ }
- @Test
- public void testBatchInsert() throws Exception {
- Map<String,String> tuple = new HashMap();
- tuple.put("device", "root.sg.D01");
- tuple.put("timestamp", "1581861293000");
- tuple.put("measurements", "temperature");
- tuple.put("values", "36.5");
- ioTDBSink.invoke(tuple, null);
+ @Test
+ public void testBatchInsert() throws Exception {
+ Map<String, String> tuple = new HashMap();
+ tuple.put("device", "root.sg.D01");
+ tuple.put("timestamp", "1581861293000");
+ tuple.put("measurements", "temperature");
+ tuple.put("types", "DOUBLE");
+ tuple.put("values", "36.5");
+ ioTDBSink.invoke(tuple, null);
- Thread.sleep(2500);
+ Thread.sleep(2500);
- verify(pool).insertRecords(any(List.class), any(List.class),
any(List.class), any(List.class));
+ verify(pool).insertRecords(any(List.class), any(List.class),
any(List.class), any(List.class),
+ any(List.class));
- Thread.sleep(1000);
+ Thread.sleep(1000);
- verifyZeroInteractions(pool);
- }
+ verifyZeroInteractions(pool);
+ }
- @Test
- public void close() throws Exception {
- ioTDBSink.close();
- verify(pool).close();
- }
+ @Test
+ public void close() throws Exception {
+ ioTDBSink.close();
+ verify(pool).close();
+ }
}
diff --git a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
index 3bcb367..6c268f6 100644
--- a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
+++ b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
@@ -18,49 +18,51 @@
package org.apache.iotdb.flink;
-import com.google.common.collect.Lists;
-import org.apache.iotdb.session.pool.SessionPool;
-import org.junit.Before;
-import org.junit.Test;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.junit.Before;
+import org.junit.Test;
public class IoTDBSinkInsertTest {
- private IoTDBSink ioTDBSink;
- private SessionPool pool;
+ private IoTDBSink ioTDBSink;
+ private SessionPool pool;
- @Before
- public void setUp() throws Exception {
- IoTDBOptions options = new IoTDBOptions();
- options.setTimeseriesOptionList(Lists.newArrayList(new
IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
- ioTDBSink = new IoTDBSink(options, new
DefaultIoTSerializationSchema());
+ @Before
+ public void setUp() throws Exception {
+ IoTDBOptions options = new IoTDBOptions();
+ options.setTimeseriesOptionList(
+ Lists.newArrayList(new
IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
+ ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema());
- pool = mock(SessionPool.class);
- ioTDBSink.setSessionPool(pool);
- }
+ pool = mock(SessionPool.class);
+ ioTDBSink.setSessionPool(pool);
+ }
- @Test
- public void testInsert() throws Exception {
- Map<String,String> tuple = new HashMap();
- tuple.put("device", "root.sg.D01");
- tuple.put("timestamp", "1581861293000");
- tuple.put("measurements", "temperature");
- tuple.put("values", "36.5");
+ @Test
+ public void testInsert() throws Exception {
+ Map<String, String> tuple = new HashMap();
+ tuple.put("device", "root.sg.D01");
+ tuple.put("timestamp", "1581861293000");
+ tuple.put("measurements", "temperature");
+ tuple.put("types", "DOUBLE");
+ tuple.put("values", "36.5");
- ioTDBSink.invoke(tuple, null);
- verify(pool).insertRecord(any(String.class), any(Long.class),
any(List.class), any(List.class));
- }
+ ioTDBSink.invoke(tuple, null);
+ verify(pool).insertRecord(any(String.class), any(Long.class),
any(List.class), any(List.class),
+ any(List.class));
+ }
- @Test
- public void close() throws Exception {
- ioTDBSink.close();
- verify(pool).close();
- }
+ @Test
+ public void close() throws Exception {
+ ioTDBSink.close();
+ verify(pool).close();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 5292938..dde8bee 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -32,7 +32,6 @@ import
org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.rescon.TVListAllocator;
-import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -41,12 +40,9 @@ import
org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
public abstract class AbstractMemTable implements IMemTable {
+ private final Map<String, Map<String, IWritableMemChunk>> memTableMap;
private long version = Long.MAX_VALUE;
-
private List<Modification> modifications = new ArrayList<>();
-
- private final Map<String, Map<String, IWritableMemChunk>> memTableMap;
-
private long memSize = 0;
public AbstractMemTable() {
@@ -86,20 +82,14 @@ public abstract class AbstractMemTable implements IMemTable
{
protected abstract IWritableMemChunk genMemSeries(MeasurementSchema schema);
@Override
- public void insert(InsertPlan insertPlan) throws WriteProcessException {
- try {
- for (int i = 0; i < insertPlan.getValues().length; i++) {
-
- Object value =
CommonUtils.parseValue(insertPlan.getSchemas()[i].getType(),
- insertPlan.getValues()[i]);
+ public void insert(InsertPlan insertPlan) {
+ for (int i = 0; i < insertPlan.getValues().length; i++) {
- memSize +=
MemUtils.getRecordSize(insertPlan.getSchemas()[i].getType(), value);
+ Object value = insertPlan.getValues()[i];
+ memSize += MemUtils.getRecordSize(insertPlan.getSchemas()[i].getType(),
value);
- write(insertPlan.getDeviceId(), insertPlan.getMeasurements()[i],
- insertPlan.getSchemas()[i], insertPlan.getTime(), value);
- }
- } catch (QueryProcessException e) {
- throw new WriteProcessException(e.getMessage());
+ write(insertPlan.getDeviceId(), insertPlan.getMeasurements()[i],
+ insertPlan.getSchemas()[i], insertPlan.getTime(), value);
}
}
@@ -209,14 +199,14 @@ public abstract class AbstractMemTable implements
IMemTable {
this.modifications.add(deletion);
}
- public void setVersion(long version) {
- this.version = version;
- }
-
public long getVersion() {
return version;
}
+ public void setVersion(long version) {
+ this.version = version;
+ }
+
@Override
public void release() {
for (Entry<String, Map<String, IWritableMemChunk>> entry :
memTableMap.entrySet()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index b0f308e..9433bcb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -57,8 +57,8 @@ import
org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Pair;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 63bae27..38ab58a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -168,7 +168,7 @@ public class PlanExecutor implements IPlanExecutor {
@Override
public QueryDataSet processQuery(PhysicalPlan queryPlan, QueryContext
context)
throws IOException, StorageEngineException,
QueryFilterOptimizationException,
- QueryProcessException, MetadataException {
+ QueryProcessException, MetadataException {
if (queryPlan instanceof QueryPlan) {
return processDataQuery((QueryPlan) queryPlan, context);
} else if (queryPlan instanceof AuthorPlan) {
@@ -255,7 +255,7 @@ public class PlanExecutor implements IPlanExecutor {
}
private void operateMerge(MergePlan plan) throws StorageEngineException {
- if(plan.getOperatorType() == OperatorType.FULL_MERGE) {
+ if (plan.getOperatorType() == OperatorType.FULL_MERGE) {
StorageEngine.getInstance().mergeAll(true);
} else {
StorageEngine.getInstance()
@@ -272,10 +272,10 @@ public class PlanExecutor implements IPlanExecutor {
}
private void operateFlush(FlushPlan plan) throws StorageGroupNotSetException
{
- if(plan.getPaths() == null) {
+ if (plan.getPaths() == null) {
StorageEngine.getInstance().syncCloseAllProcessor();
} else {
- if(plan.isSeq() == null) {
+ if (plan.isSeq() == null) {
for (Path storageGroup : plan.getPaths()) {
StorageEngine.getInstance().asyncCloseProcessor(storageGroup.toString(), true);
StorageEngine.getInstance().asyncCloseProcessor(storageGroup.toString(), false);
@@ -290,7 +290,7 @@ public class PlanExecutor implements IPlanExecutor {
protected QueryDataSet processDataQuery(QueryPlan queryPlan, QueryContext
context)
throws StorageEngineException, QueryFilterOptimizationException,
QueryProcessException,
- IOException {
+ IOException {
QueryDataSet queryDataSet;
if (queryPlan instanceof AlignByDevicePlan) {
queryDataSet = new AlignByDeviceDataSet((AlignByDevicePlan) queryPlan,
context, queryRouter);
@@ -869,7 +869,6 @@ public class PlanExecutor implements IPlanExecutor {
String[] measurementList = insertPlan.getMeasurements();
String deviceId = insertPlan.getDeviceId();
node = mManager.getDeviceNodeWithAutoCreateAndReadLock(deviceId);
- String[] strValues = insertPlan.getValues();
MeasurementSchema[] schemas = new
MeasurementSchema[measurementList.length];
for (int i = 0; i < measurementList.length; i++) {
@@ -878,7 +877,7 @@ public class PlanExecutor implements IPlanExecutor {
if
(!IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
throw new PathNotExistException(deviceId + PATH_SEPARATOR +
measurement);
}
- TSDataType dataType = TypeInferenceUtils.getPredictedDataType(strValues[i]);
+ TSDataType dataType = TypeInferenceUtils.getPredictedDataType(insertPlan.getValues()[i]);
Path path = new Path(deviceId, measurement);
internalCreateTimeseries(path.toString(), dataType);
}
@@ -886,6 +885,10 @@ public class PlanExecutor implements IPlanExecutor {
schemas[i] = measurementNode.getSchema();
// reset measurement to common name instead of alias
measurementList[i] = measurementNode.getName();
+
+ if(insertPlan.getStrValueList() == null) {
+ checkType(insertPlan, i, measurementNode.getSchema().getType());
+ }
}
insertPlan.setMeasurements(measurementList);
@@ -900,7 +903,53 @@ public class PlanExecutor implements IPlanExecutor {
}
}
- /** create timeseries with ignore PathAlreadyExistException */
+ private void checkType(InsertPlan plan, int loc, TSDataType type) {
+ plan.getTypes()[loc] = type;
+ try {
+ switch (type) {
+ case INT32:
+ if (!(plan.getValues()[loc] instanceof Integer)) {
+ plan.getValues()[loc] =
+ Integer.parseInt(((Binary)
plan.getValues()[loc]).getStringValue());
+ }
+ break;
+ case INT64:
+ if (!(plan.getValues()[loc] instanceof Long)) {
+ plan.getValues()[loc] =
+ Long.parseLong(((Binary)
plan.getValues()[loc]).getStringValue());
+ }
+ break;
+ case DOUBLE:
+ if (!(plan.getValues()[loc] instanceof Double)) {
+ plan.getValues()[loc] =
+ Double.parseDouble(((Binary)
plan.getValues()[loc]).getStringValue());
+ }
+ break;
+ case FLOAT:
+ if (!(plan.getValues()[loc] instanceof Float)) {
+ plan.getValues()[loc] =
+ Float.parseFloat(((Binary)
plan.getValues()[loc]).getStringValue());
+ }
+ break;
+ case BOOLEAN:
+ if (!(plan.getValues()[loc] instanceof Boolean)) {
+ plan.getValues()[loc] =
+ Boolean.parseBoolean(((Binary)
plan.getValues()[loc]).getStringValue());
+ }
+ break;
+ case TEXT:
+ // need to do nothing
+ break;
+ }
+ }
+ catch (ClassCastException e){
+ logger.error("inconsistent type between client and server");
+ }
+ }
+
+ /**
+ * create timeseries with ignore PathAlreadyExistException
+ */
private void internalCreateTimeseries(String path, TSDataType dataType)
throws MetadataException {
try {
mManager.createTimeseries(
@@ -919,7 +968,9 @@ public class PlanExecutor implements IPlanExecutor {
}
}
- /** Get default encoding by dataType */
+ /**
+ * Get default encoding by dataType
+ */
private TSEncoding getDefaultEncoding(TSDataType dataType) {
IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
switch (dataType) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index 5f334af..8174fc2 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -25,16 +25,18 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
-
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -44,21 +46,59 @@ public class InsertPlan extends PhysicalPlan {
private long time;
private String deviceId;
private String[] measurements;
- private String[] values;
+ private Object[] values;
+ private TSDataType[] types;
private MeasurementSchema[] schemas;
+ public String[] getStrValueList() {
+ return strValueList;
+ }
+
+ public void setStrValueList(String[] strValueList) {
+ this.strValueList = strValueList;
+ }
+
+ // only for sql
+ private String[] strValueList;
+
public InsertPlan() {
super(false, OperatorType.INSERT);
canbeSplit = false;
}
@TestOnly
- public InsertPlan(String deviceId, long insertTime, String measurement,
String insertValue) {
+ public InsertPlan(String deviceId, long insertTime, String[] measurements,
TSDataType[] types,
+ String[] insertValues) {
super(false, OperatorType.INSERT);
this.time = insertTime;
this.deviceId = deviceId;
- this.measurements = new String[] {measurement};
- this.values = new String[] {insertValue};
+ this.measurements = measurements;
+
+ this.types = types;
+ this.values = new Object[measurements.length];
+ for (int i = 0; i < measurements.length; i++) {
+ try {
+ values[i] = CommonUtils.parseValueForTest(types[i], insertValues[i]);
+ } catch (QueryProcessException e) {
+ e.printStackTrace();
+ }
+ }
+ canbeSplit = false;
+ }
+
+ @TestOnly
+ public InsertPlan(String deviceId, long insertTime, String measurement,
TSDataType type, String insertValue) {
+ super(false, OperatorType.INSERT);
+ this.time = insertTime;
+ this.deviceId = deviceId;
+ this.measurements = new String[]{measurement};
+ this.types = new TSDataType[]{type};
+ this.values = new Object[1];
+ try {
+ values[0] = CommonUtils.parseValueForTest(types[0], insertValue);
+ } catch (QueryProcessException e) {
+ e.printStackTrace();
+ }
canbeSplit = false;
}
@@ -68,25 +108,43 @@ public class InsertPlan extends PhysicalPlan {
this.time = tsRecord.time;
this.measurements = new String[tsRecord.dataPointList.size()];
this.schemas = new MeasurementSchema[tsRecord.dataPointList.size()];
- this.values = new String[tsRecord.dataPointList.size()];
+ this.types = new TSDataType[tsRecord.dataPointList.size()];
+ this.values = new Object[tsRecord.dataPointList.size()];
for (int i = 0; i < tsRecord.dataPointList.size(); i++) {
measurements[i] = tsRecord.dataPointList.get(i).getMeasurementId();
- schemas[i] = new MeasurementSchema(measurements[i],
tsRecord.dataPointList.get(i).getType(), TSEncoding.PLAIN);
- values[i] = tsRecord.dataPointList.get(i).getValue().toString();
+ schemas[i] = new MeasurementSchema(measurements[i],
tsRecord.dataPointList.get(i).getType(),
+ TSEncoding.PLAIN);
+ types[i] = tsRecord.dataPointList.get(i).getType();
+ values[i] = tsRecord.dataPointList.get(i).getValue();
}
canbeSplit = false;
}
+ public InsertPlan(String deviceId, long insertTime, String[]
measurementList, TSDataType[] types,
+ Object[] insertValues) {
+ super(false, Operator.OperatorType.INSERT);
+ this.time = insertTime;
+ this.deviceId = deviceId;
+ this.measurements = measurementList;
+ this.types = types;
+ this.values = insertValues;
+ canbeSplit = false;
+ }
+
public InsertPlan(String deviceId, long insertTime, String[] measurementList,
String[] insertValues) {
super(false, Operator.OperatorType.INSERT);
this.time = insertTime;
this.deviceId = deviceId;
this.measurements = measurementList;
- this.values = insertValues;
+ // build types and values
+ this.types = new TSDataType[measurements.length];
+ this.values = new Object[measurements.length];
+ this.strValueList = insertValues;
canbeSplit = false;
}
+
public long getTime() {
return time;
}
@@ -101,6 +159,17 @@ public class InsertPlan extends PhysicalPlan {
public void setSchemas(MeasurementSchema[] schemas) {
this.schemas = schemas;
+ if (strValueList != null) {
+ for (int i = 0; i < schemas.length; i++) {
+ types[i] = schemas[i].getType();
+ try {
+ values[i] = CommonUtils.parseValue(types[i], strValueList[i]);
+ } catch (QueryProcessException e) {
+ e.printStackTrace();
+ }
+ }
+ strValueList = null;
+ }
}
@Override
@@ -129,11 +198,11 @@ public class InsertPlan extends PhysicalPlan {
this.measurements = measurements;
}
- public String[] getValues() {
+ public Object[] getValues() {
return this.values;
}
- public void setValues(String[] values) {
+ public void setValues(Object[] values) {
this.values = values;
}
@@ -174,8 +243,102 @@ public class InsertPlan extends PhysicalPlan {
schema.serializeTo(stream);
}
- for (String m : values) {
- putString(stream, m);
+ try {
+ putValues(stream);
+ } catch (QueryProcessException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private void putValues(DataOutputStream outputStream) throws
QueryProcessException, IOException {
+ for (int i = 0; i < values.length; i++) {
+ ReadWriteIOUtils.write(types[i], outputStream);
+ switch (types[i]) {
+ case BOOLEAN:
+ ReadWriteIOUtils.write((Boolean) values[i], outputStream);
+ break;
+ case INT32:
+ ReadWriteIOUtils.write((Integer) values[i], outputStream);
+ break;
+ case INT64:
+ ReadWriteIOUtils.write((Long) values[i], outputStream);
+ break;
+ case FLOAT:
+ ReadWriteIOUtils.write((Float) values[i], outputStream);
+ break;
+ case DOUBLE:
+ ReadWriteIOUtils.write((Double) values[i], outputStream);
+ break;
+ case TEXT:
+ ReadWriteIOUtils.write((Binary) values[i], outputStream);
+ break;
+ default:
+ throw new QueryProcessException("Unsupported data type:" + types[i]);
+ }
+ }
+ }
+
+ private void putValues(ByteBuffer buffer) throws QueryProcessException {
+ for (int i = 0; i < values.length; i++) {
+ ReadWriteIOUtils.write(types[i], buffer);
+ switch (types[i]) {
+ case BOOLEAN:
+ ReadWriteIOUtils.write((Boolean) values[i], buffer);
+ break;
+ case INT32:
+ ReadWriteIOUtils.write((Integer) values[i], buffer);
+ break;
+ case INT64:
+ ReadWriteIOUtils.write((Long) values[i], buffer);
+ break;
+ case FLOAT:
+ ReadWriteIOUtils.write((Float) values[i], buffer);
+ break;
+ case DOUBLE:
+ ReadWriteIOUtils.write((Double) values[i], buffer);
+ break;
+ case TEXT:
+ ReadWriteIOUtils.write((Binary) values[i], buffer);
+ break;
+ default:
+ throw new QueryProcessException("Unsupported data type:" + types[i]);
+ }
+ }
+ }
+
+ public TSDataType[] getTypes() {
+ return types;
+ }
+
+ public void setTypes(TSDataType[] types) {
+ this.types = types;
+ }
+
+ public void setValues(ByteBuffer buffer) throws QueryProcessException {
+ for (int i = 0; i < measurements.length; i++) {
+ types[i] = ReadWriteIOUtils.readDataType(buffer);
+ switch (types[i]) {
+ case BOOLEAN:
+ values[i] = ReadWriteIOUtils.readBool(buffer);
+ break;
+ case INT32:
+ values[i] = ReadWriteIOUtils.readInt(buffer);
+ break;
+ case INT64:
+ values[i] = ReadWriteIOUtils.readLong(buffer);
+ break;
+ case FLOAT:
+ values[i] = ReadWriteIOUtils.readFloat(buffer);
+ break;
+ case DOUBLE:
+ values[i] = ReadWriteIOUtils.readDouble(buffer);
+ break;
+ case TEXT:
+ values[i] = ReadWriteIOUtils.readBinary(buffer);
+ break;
+ default:
+ throw new QueryProcessException("Unsupported data type:" + types[i]);
+ }
}
}
@@ -193,8 +356,10 @@ public class InsertPlan extends PhysicalPlan {
putString(buffer, m);
}
- for (String m : values) {
- putString(buffer, m);
+ try {
+ putValues(buffer);
+ } catch (QueryProcessException e) {
+ e.printStackTrace();
}
}
@@ -210,9 +375,12 @@ public class InsertPlan extends PhysicalPlan {
measurements[i] = readString(buffer);
}
- this.values = new String[measurementSize];
- for (int i = 0; i < measurementSize; i++) {
- values[i] = readString(buffer);
+ this.types = new TSDataType[measurementSize];
+ this.values = new Object[measurementSize];
+ try {
+ setValues(buffer);
+ } catch (QueryProcessException e) {
+ e.printStackTrace();
}
}
@@ -225,7 +393,8 @@ public class InsertPlan extends PhysicalPlan {
if (measurementIndex >= values.length) {
return null;
}
- Object value = CommonUtils.parseValue(schemas[measurementIndex].getType(),
values[measurementIndex]);
- return new TimeValuePair(time,
TsPrimitiveType.getByType(schemas[measurementIndex].getType(), value));
+ Object value = values[measurementIndex];
+ return new TimeValuePair(time,
+ TsPrimitiveType.getByType(schemas[measurementIndex].getType(), value));
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index a083856..5627616 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -178,6 +178,7 @@ public class PhysicalGenerator {
throw new LogicalOperatorException(
"For Insert command, cannot specified more than one seriesPath: " + paths);
}
+
return new InsertPlan(
paths.get(0).getFullPath(),
insert.getTime(),
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index b10ad42..fef8785 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -136,13 +136,10 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
private static final int DELETE_SIZE = 20;
private static final String ERROR_PARSING_SQL =
"meet error while parsing SQL to physical plan: {}";
-
- private boolean enableMetric =
IoTDBDescriptor.getInstance().getConfig().isEnableMetricService();
private static final List<SqlArgument> sqlArgumentList = new
ArrayList<>(MAX_SIZE);
-
protected Planner processor;
protected IPlanExecutor executor;
-
+ private boolean enableMetric =
IoTDBDescriptor.getInstance().getConfig().isEnableMetricService();
// Record the username for every rpc connection (session).
private Map<Long, String> sessionIdUsernameMap = new ConcurrentHashMap<>();
private Map<Long, ZoneId> sessionIdZoneIdMap = new ConcurrentHashMap<>();
@@ -170,6 +167,10 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
executor = new PlanExecutor();
}
+ public static List<SqlArgument> getSqlArgumentList() {
+ return sqlArgumentList;
+ }
+
@Override
public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException
{
logger.info(
@@ -1068,15 +1069,22 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
InsertPlan plan = new InsertPlan();
for (int i = 0; i < req.deviceIds.size(); i++) {
- plan.setDeviceId(req.getDeviceIds().get(i));
- plan.setTime(req.getTimestamps().get(i));
- plan.setMeasurements(req.getMeasurementsList().get(i).toArray(new
String[0]));
- plan.setValues(req.getValuesList().get(i).toArray(new String[0]));
- TSStatus status = checkAuthority(plan, req.getSessionId());
- if (status != null) {
- resp.addToStatusList(status);
- } else {
- resp.addToStatusList(executePlan(plan));
+ try {
+ plan.setDeviceId(req.getDeviceIds().get(i));
+ plan.setTime(req.getTimestamps().get(i));
+ plan.setMeasurements(req.getMeasurementsList().get(i).toArray(new
String[0]));
+ plan.setTypes(new TSDataType[plan.getMeasurements().length]);
+ plan.setValues(new Object[plan.getMeasurements().length]);
+ plan.setValues(req.valuesList.get(i));
+ TSStatus status = checkAuthority(plan, req.getSessionId());
+ if (status != null) {
+ resp.addToStatusList(status);
+ } else {
+ resp.addToStatusList(executePlan(plan));
+ }
+ } catch (Exception e) {
+ logger.error("meet error when insert in batch", e);
+
resp.addToStatusList(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
}
}
@@ -1113,7 +1121,9 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
plan.setDeviceId(req.getDeviceId());
plan.setTime(req.getTimestamp());
plan.setMeasurements(req.getMeasurements().toArray(new String[0]));
- plan.setValues(req.getValues().toArray(new String[0]));
+ plan.setTypes(new TSDataType[plan.getMeasurements().length]);
+ plan.setValues(new Object[plan.getMeasurements().length]);
+ plan.setValues(req.values);
TSStatus status = checkAuthority(plan, req.getSessionId());
if (status != null) {
@@ -1435,10 +1445,6 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
}
}
- public static List<SqlArgument> getSqlArgumentList() {
- return sqlArgumentList;
- }
-
private long generateQueryId(boolean isDataQuery) {
return QueryResourceManager.getInstance().assignQueryId(isDataQuery);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
b/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
index 3f9e43a..c6575de 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
@@ -31,7 +31,8 @@ import org.apache.iotdb.tsfile.utils.Binary;
public class CommonUtils {
- private CommonUtils(){}
+ private CommonUtils() {
+ }
/**
* get JDK version.
@@ -72,7 +73,8 @@ public class CommonUtils {
switch (dataType) {
case BOOLEAN:
value = value.toLowerCase();
- if (SQLConstant.BOOLEAN_FALSE_NUM.equals(value) ||
SQLConstant.BOOLEN_FALSE.equals(value)) {
+ if (SQLConstant.BOOLEAN_FALSE_NUM.equals(value) ||
SQLConstant.BOOLEN_FALSE
+ .equals(value)) {
return false;
}
if (SQLConstant.BOOLEAN_TRUE_NUM.equals(value) ||
SQLConstant.BOOLEN_TRUE.equals(value)) {
@@ -89,14 +91,48 @@ public class CommonUtils {
return Double.parseDouble(value);
case TEXT:
if ((value.startsWith(SQLConstant.QUOTE) &&
value.endsWith(SQLConstant.QUOTE))
- || (value.startsWith(SQLConstant.DQUOTE) &&
value.endsWith(SQLConstant.DQUOTE))) {
+ || (value.startsWith(SQLConstant.DQUOTE) &&
value.endsWith(SQLConstant.DQUOTE))) {
if (value.length() == 1) {
return new Binary(value);
} else {
return new Binary(value.substring(1, value.length() - 1));
}
}
- throw new QueryProcessException("The TEXT data type should be
covered by \" or '");
+
+ return new Binary(value);
+ default:
+ throw new QueryProcessException("Unsupported data type:" + dataType);
+ }
+ } catch (NumberFormatException e) {
+ throw new QueryProcessException(e.getMessage());
+ }
+ }
+
+ @TestOnly
+ public static Object parseValueForTest(TSDataType dataType, String value)
+ throws QueryProcessException {
+ try {
+ switch (dataType) {
+ case BOOLEAN:
+ value = value.toLowerCase();
+ if (SQLConstant.BOOLEAN_FALSE_NUM.equals(value) ||
SQLConstant.BOOLEN_FALSE
+ .equals(value)) {
+ return false;
+ }
+ if (SQLConstant.BOOLEAN_TRUE_NUM.equals(value) ||
SQLConstant.BOOLEN_TRUE.equals(value)) {
+ return true;
+ }
+ throw new QueryProcessException("The BOOLEAN should be true/TRUE,
false/FALSE or 0/1");
+ case INT32:
+ return Integer.parseInt(value);
+ case INT64:
+ return Long.parseLong(value);
+ case FLOAT:
+ return Float.parseFloat(value);
+ case DOUBLE:
+ return Double.parseDouble(value);
+ case TEXT:
+ return new Binary(value);
default:
throw new QueryProcessException("Unsupported data type:" + dataType);
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index f627484..18598a4 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -20,6 +20,17 @@
package org.apache.iotdb.db.engine.storagegroup;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
@@ -57,12 +68,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.io.File;
-import java.io.IOException;
-import java.util.*;
-
-import static org.junit.Assert.*;
-
public class TTLTest {
private String sg1 = "root.TTL_SG1";
@@ -126,7 +131,8 @@ public class TTLTest {
insertPlan.setDeviceId(sg1);
insertPlan.setTime(System.currentTimeMillis());
insertPlan.setMeasurements(new String[]{"s1"});
- insertPlan.setValues(new String[]{"1"});
+ insertPlan.setTypes(new TSDataType[]{TSDataType.INT64});
+ insertPlan.setValues(new Object[]{1L});
insertPlan.setSchemas(
new MeasurementSchema[]{new MeasurementSchema("s1", TSDataType.INT64,
TSEncoding.PLAIN)});
@@ -152,7 +158,8 @@ public class TTLTest {
insertPlan.setDeviceId(sg1);
insertPlan.setTime(System.currentTimeMillis());
insertPlan.setMeasurements(new String[]{"s1"});
- insertPlan.setValues(new String[]{"1"});
+ insertPlan.setTypes(new TSDataType[]{TSDataType.INT64});
+ insertPlan.setValues(new Object[]{1L});
insertPlan.setSchemas(
new MeasurementSchema[]{new MeasurementSchema("s1", TSDataType.INT64,
TSEncoding.PLAIN)});
diff --git a/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
b/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
index 1e7401a..2dc30bc 100644
--- a/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.exception.SystemCheckException;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.writelog.io.LogWriter;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.junit.Test;
public class WalCheckerTest {
@@ -73,12 +74,13 @@ public class WalCheckerTest {
LogWriter logWriter = new LogWriter(subDir.getPath() + File.separator
+ WAL_FILE_NAME);
- ByteBuffer binaryPlans = ByteBuffer.allocate(64*1024);
+ ByteBuffer binaryPlans = ByteBuffer.allocate(64 * 1024);
String deviceId = "device1";
String[] measurements = new String[]{"s1", "s2", "s3"};
+ TSDataType[] types = new TSDataType[]{TSDataType.INT64,
TSDataType.INT64, TSDataType.INT64};
String[] values = new String[]{"5", "6", "7"};
for (int j = 0; j < 10; j++) {
- new InsertPlan(deviceId, j, measurements,
values).serializeTo(binaryPlans);
+ new InsertPlan(deviceId, j, measurements, types,
values).serializeTo(binaryPlans);
}
binaryPlans.flip();
logWriter.write(binaryPlans);
@@ -106,12 +108,13 @@ public class WalCheckerTest {
LogWriter logWriter = new LogWriter(subDir.getPath() + File.separator
+ WAL_FILE_NAME);
- ByteBuffer binaryPlans = ByteBuffer.allocate(64*1024);
+ ByteBuffer binaryPlans = ByteBuffer.allocate(64 * 1024);
String deviceId = "device1";
String[] measurements = new String[]{"s1", "s2", "s3"};
+ TSDataType[] types = new TSDataType[]{TSDataType.INT64,
TSDataType.INT64, TSDataType.INT64};
String[] values = new String[]{"5", "6", "7"};
for (int j = 0; j < 10; j++) {
- new InsertPlan(deviceId, j, measurements,
values).serializeTo(binaryPlans);
+ new InsertPlan(deviceId, j, measurements, types,
values).serializeTo(binaryPlans);
}
if (i > 2) {
binaryPlans.put("not a wal".getBytes());
diff --git
a/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
b/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
index b118f05..269f93c 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
@@ -88,6 +88,7 @@ public class PerformanceTest {
for (int i = 0; i < 1000000; i++) {
InsertPlan bwInsertPlan = new InsertPlan("logTestDevice", 100,
new String[]{"s1", "s2", "s3", "s4"},
+ new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64,
TSDataType.TEXT, TSDataType.BOOLEAN},
new String[]{"1.0", "15", "str", "false"});
UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0",
new Path("root.logTestDevice.s1"));
@@ -147,7 +148,9 @@ public class PerformanceTest {
for (int i = 0; i < 1000000; i++) {
InsertPlan bwInsertPlan = new InsertPlan("root.logTestDevice", 100,
- new String[]{"s1", "s2", "s3", "s4"}, new String[]{"1.0", "15",
"str", "false"});
+ new String[]{"s1", "s2", "s3", "s4"},
+ new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64,
TSDataType.TEXT, TSDataType.BOOLEAN},
+ new String[]{"1.0", "15", "str", "false"});
UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0",
new Path("root.logTestDevice.s1"));
DeletePlan deletePlan = new DeletePlan(50, new
Path("root.logTestDevice.s1"));
diff --git
a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java
b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java
index 4aa8297..7125d88 100644
---
a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.db.writelog.manager.WriteLogNodeManager;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.junit.After;
import org.junit.Before;
@@ -86,6 +87,7 @@ public class WriteLogNodeManagerTest {
InsertPlan bwInsertPlan = new InsertPlan("logTestDevice", 100,
new String[]{"s1", "s2", "s3", "s4"},
+ new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT,
TSDataType.BOOLEAN},
new String[]{"1.0", "15", "str", "false"});
DeletePlan deletePlan = new DeletePlan(50, new
Path("root.logTestDevice.s1"));
diff --git
a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
index 63a5931..953a76e 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.writelog.io.ILogReader;
import org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.junit.After;
import org.junit.Before;
@@ -66,6 +67,7 @@ public class WriteLogNodeTest {
InsertPlan bwInsertPlan = new InsertPlan(identifier, 100,
new String[]{"s1", "s2", "s3", "s4"},
+ new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT,
TSDataType.BOOLEAN},
new String[]{"1.0", "15", "str", "false"});
DeletePlan deletePlan = new DeletePlan(50, new Path(identifier + ".s1"));
@@ -96,6 +98,7 @@ public class WriteLogNodeTest {
InsertPlan bwInsertPlan = new InsertPlan(identifier, 100,
new String[]{"s1", "s2", "s3", "s4"},
+ new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT,
TSDataType.BOOLEAN},
new String[]{"1.0", "15", "str", "false"});
DeletePlan deletePlan = new DeletePlan(50, new Path(identifier + ".s1"));
@@ -132,6 +135,7 @@ public class WriteLogNodeTest {
InsertPlan bwInsertPlan = new InsertPlan("root.logTestDevice", 100,
new String[]{"s1", "s2", "s3", "s4"},
+ new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT,
TSDataType.BOOLEAN},
new String[]{"1.0", "15", "str", "false"});
DeletePlan deletePlan = new DeletePlan(50, new
Path("root.logTestDevice.s1"));
@@ -157,6 +161,7 @@ public class WriteLogNodeTest {
InsertPlan bwInsertPlan = new InsertPlan("logTestDevice", 100,
new String[]{"s1", "s2", "s3", "s4"},
+ new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT,
TSDataType.BOOLEAN},
new String[]{"1.0", "15", "str", "false"});
DeletePlan deletePlan = new DeletePlan(50, new
Path("root.logTestDevice.s1"));
@@ -181,6 +186,7 @@ public class WriteLogNodeTest {
InsertPlan bwInsertPlan = new InsertPlan("root.logTestDevice.oversize",
100,
new String[]{"s1", "s2", "s3", "s4"},
+ new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT,
TSDataType.BOOLEAN},
new String[]{"1.0", "15", new String(new char[65 * 1024 * 1024]),
"false"});
boolean caught = false;
diff --git
a/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
b/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
index 6dea9ac..b2e087e 100644
---
a/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
@@ -29,6 +29,7 @@ import java.util.List;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.junit.Before;
import org.junit.Test;
@@ -36,7 +37,7 @@ import org.junit.Test;
public class LogWriterReaderTest {
private static String filePath = "logtest.test";
- ByteBuffer logsBuffer = ByteBuffer.allocate(64*1024);
+ ByteBuffer logsBuffer = ByteBuffer.allocate(64 * 1024);
List<PhysicalPlan> plans = new ArrayList<>();
@Before
@@ -45,8 +46,10 @@ public class LogWriterReaderTest {
new File(filePath).delete();
}
InsertPlan insertPlan1 = new InsertPlan("d1", 10L, new String[]{"s1",
"s2"},
+ new TSDataType[]{TSDataType.INT64, TSDataType.INT64},
new String[]{"1", "2"});
InsertPlan insertPlan2 = new InsertPlan("d1", 10L, new String[]{"s1",
"s2"},
+ new TSDataType[]{TSDataType.INT64, TSDataType.INT64},
new String[]{"1", "2"});
DeletePlan deletePlan = new DeletePlan(10L, new Path("root.d1.s1"));
plans.add(insertPlan1);
diff --git
a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
index fc1005c..37e263b 100644
---
a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
@@ -36,10 +36,10 @@ import
org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -48,10 +48,9 @@ import org.apache.iotdb.db.writelog.node.WriteLogNode;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -105,10 +104,13 @@ public class LogReplayerTest {
WriteLogNode node =
MultiFileLogNodeManager.getInstance().getNode(logNodePrefix +
tsFile.getName());
- node.write(new InsertPlan("root.sg.device0", 100, "sensor0",
String.valueOf(0)));
- node.write(new InsertPlan("root.sg.device0", 2, "sensor1",
String.valueOf(0)));
+ node.write(
+ new InsertPlan("root.sg.device0", 100, "sensor0", TSDataType.INT64,
String.valueOf(0)));
+ node.write(
+ new InsertPlan("root.sg.device0", 2, "sensor1", TSDataType.INT64,
String.valueOf(0)));
for (int i = 1; i < 5; i++) {
- node.write(new InsertPlan("root.sg.device" + i, i, "sensor" + i,
String.valueOf(i)));
+ node.write(new InsertPlan("root.sg.device" + i, i, "sensor" + i,
TSDataType.INT64,
+ String.valueOf(i)));
}
DeletePlan deletePlan = new DeletePlan(200, new Path("root.sg.device0",
"sensor0"));
node.write(deletePlan);
@@ -117,8 +119,9 @@ public class LogReplayerTest {
replayer.replayLogs();
for (int i = 0; i < 5; i++) {
- ReadOnlyMemChunk memChunk = memTable.query("root.sg.device" + i,
"sensor" + i, TSDataType.INT64,
- TSEncoding.RLE, Collections.emptyMap(), Long.MIN_VALUE);
+ ReadOnlyMemChunk memChunk = memTable
+ .query("root.sg.device" + i, "sensor" + i, TSDataType.INT64,
+ TSEncoding.RLE, Collections.emptyMap(), Long.MIN_VALUE);
IPointReader iterator = memChunk.getPointReader();
if (i == 0) {
assertFalse(iterator.hasNextTimeValuePair());
diff --git
a/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java
b/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java
index a881e01..3c1435b 100644
---
a/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java
@@ -88,7 +88,8 @@ public class RecoverResourceFromReaderTest {
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
Path path = new Path(("root.sg.device" + i), ("sensor" + j));
- MeasurementSchema measurementSchema = new MeasurementSchema("sensor" +
j, TSDataType.INT64, TSEncoding.PLAIN);
+ MeasurementSchema measurementSchema = new MeasurementSchema("sensor" +
j, TSDataType.INT64,
+ TSEncoding.PLAIN);
schema.registerTimeseries(path, measurementSchema);
MManager.getInstance().createTimeseries(path.getFullPath(),
measurementSchema.getType(),
measurementSchema.getEncodingType(),
measurementSchema.getCompressor(),
@@ -138,18 +139,23 @@ public class RecoverResourceFromReaderTest {
for (int j = 0; j < 10; j++) {
String[] measurements = new String[10];
String[] values = new String[10];
+ TSDataType[] types = new TSDataType[10];
for (int k = 0; k < 10; k++) {
measurements[k] = "sensor" + k;
+ types[k] = TSDataType.INT64;
values[k] = String.valueOf(k + 10);
}
- InsertPlan insertPlan = new InsertPlan("root.sg.device" + j, i,
measurements, values);
+ InsertPlan insertPlan = new InsertPlan("root.sg.device" + j, i,
measurements,
+ types, values);
node.write(insertPlan);
}
node.notifyStartFlush();
}
- InsertPlan insertPlan = new InsertPlan("root.sg.device99", 1, "sensor4",
"4");
+ InsertPlan insertPlan = new InsertPlan("root.sg.device99", 1, new
String[]{"sensor4"},
+ new TSDataType[]{TSDataType.INT64}, new String[]{"4"});
node.write(insertPlan);
- insertPlan = new InsertPlan("root.sg.device99", 300, "sensor2", "2");
+ insertPlan = new InsertPlan("root.sg.device99", 300, new
String[]{"sensor2"},
+ new TSDataType[]{TSDataType.INT64}, new String[]{"2"});
node.write(insertPlan);
node.close();
@@ -174,10 +180,11 @@ public class RecoverResourceFromReaderTest {
.getBufferedOutputStream(resourceFile.getPath())) {
ReadWriteIOUtils.write(123, outputStream);
}
-
+
TsFileRecoverPerformer performer = new
TsFileRecoverPerformer(logNodePrefix,
versionController, resource, true, false);
-
ActiveTimeSeriesCounter.getInstance().init(resource.getFile().getParentFile().getParentFile().getName());
+ ActiveTimeSeriesCounter.getInstance()
+ .init(resource.getFile().getParentFile().getParentFile().getName());
performer.recover();
assertEquals(1, (long) resource.getStartTimeMap().get("root.sg.device99"));
assertEquals(300, (long) resource.getEndTimeMap().get("root.sg.device99"));
diff --git
a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
index 5c51d9a..ac5ae05 100644
---
a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
@@ -142,17 +142,20 @@ public class SeqTsFileRecoverTest {
for (int i = 10; i < 20; i++) {
for (int j = 0; j < 10; j++) {
String[] measurements = new String[10];
+ TSDataType[] types = new TSDataType[10];
String[] values = new String[10];
for (int k = 0; k < 10; k++) {
measurements[k] = "sensor" + k;
+ types[k] = TSDataType.INT64;
values[k] = String.valueOf(k);
}
- InsertPlan insertPlan = new InsertPlan("root.sg.device" + j, i,
measurements, values);
+ InsertPlan insertPlan = new InsertPlan("root.sg.device" + j, i,
measurements, types,
+ values);
node.write(insertPlan);
}
node.notifyStartFlush();
}
-
+
resource = new TsFileResource(tsF);
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
index b285dd0..016fb60 100644
---
a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
@@ -19,6 +19,10 @@
package org.apache.iotdb.db.writelog.recover;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
import java.util.Collections;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
@@ -53,17 +57,12 @@ import
org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
-import org.apache.iotdb.tsfile.write.schema.Schema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.Schema;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.io.File;
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-
public class UnseqTsFileRecoverTest {
private File tsF;
@@ -96,7 +95,8 @@ public class UnseqTsFileRecoverTest {
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
Path path = new Path(("root.sg.device" + i), ("sensor" + j));
- MeasurementSchema measurementSchema = new MeasurementSchema("sensor" +
j, TSDataType.INT64, TSEncoding.PLAIN);
+ MeasurementSchema measurementSchema = new MeasurementSchema("sensor" +
j, TSDataType.INT64,
+ TSEncoding.PLAIN);
schema.registerTimeseries(path, measurementSchema);
MManager.getInstance().createTimeseries(path.getFullPath(),
measurementSchema.getType(),
measurementSchema.getEncodingType(),
measurementSchema.getCompressor(),
@@ -145,19 +145,22 @@ public class UnseqTsFileRecoverTest {
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
String[] measurements = new String[10];
+ TSDataType[] types = new TSDataType[10];
String[] values = new String[10];
for (int k = 0; k < 10; k++) {
measurements[k] = "sensor" + k;
+ types[k] = TSDataType.INT64;
values[k] = String.valueOf(k + 10);
}
- InsertPlan insertPlan = new InsertPlan("root.sg.device" + j, i,
measurements, values);
+ InsertPlan insertPlan = new InsertPlan("root.sg.device" + j, i,
measurements, types,
+ values);
node.write(insertPlan);
}
node.notifyStartFlush();
}
- InsertPlan insertPlan = new InsertPlan("root.sg.device99", 1, "sensor4",
"4");
+ InsertPlan insertPlan = new InsertPlan("root.sg.device99", 1, "sensor4",
TSDataType.INT64, "4");
node.write(insertPlan);
- insertPlan = new InsertPlan("root.sg.device99", 300, "sensor2", "2");
+ insertPlan = new InsertPlan("root.sg.device99", 300, "sensor2",
TSDataType.INT64, "2");
node.write(insertPlan);
node.close();
@@ -176,7 +179,8 @@ public class UnseqTsFileRecoverTest {
public void test() throws StorageGroupProcessorException, IOException {
TsFileRecoverPerformer performer = new
TsFileRecoverPerformer(logNodePrefix,
versionController, resource, true, false);
-
ActiveTimeSeriesCounter.getInstance().init(resource.getFile().getParentFile().getParentFile().getName());
+ ActiveTimeSeriesCounter.getInstance()
+ .init(resource.getFile().getParentFile().getParentFile().getName());
performer.recover();
assertEquals(1, (long) resource.getStartTimeMap().get("root.sg.device99"));
diff --git a/service-rpc/rpc-changelist.md b/service-rpc/rpc-changelist.md
index b2e528f..5814ac2 100644
--- a/service-rpc/rpc-changelist.md
+++ b/service-rpc/rpc-changelist.md
@@ -22,7 +22,7 @@
# 0.10.0 (version-1) -> version-2
-Last Updated on 2020-4-24 by Jialin Qiao.
+Last Updated on 2020-5-25 by Kaifeng Xue.
## 1. Delete Old
@@ -50,6 +50,8 @@ Last Updated on 2020-4-24 by Jialin Qiao.
| Rename TSStatusType to TSStatus | Jialin Qiao |
| Remove sessionId in TSExecuteBatchStatementResp | Jialin Qiao |
| Rename insertRows to insertRecords, insert to insertRecord, insertBatch to
insertTablet | Jialin Qiao |
+| Use TsDataType and binary rather than string in TSInsertInBatchReq and
TSInsertReq | Kaifeng Xue |
+
# 0.8.0 (version-0) -> version-1
@@ -121,3 +123,4 @@ Last Updated on 2019-10-27 by Lei Rui.
| Add required i64 statementId in TSExecuteStatementReq | Yuan Tian |
| Add required binary time, required list<binary> valueList, required
list<binary> bitmapList and remove required binary values, required i32
rowCount in TSQueryDataSet| Yuan Tian |
| Add optional i32 fetchSize in TSExecuteStatementReq,<br />Add optional
TSQueryDataSet in TSExecuteStatementResp| liutaohua |
+| Add optional map<string, string> props, optional map<string, string> tags,
optional map<string, string> attributes and optional string aliasPath in
TSCreateTimeseriesReq | Yuan Tian |
diff --git a/service-rpc/src/main/thrift/rpc.thrift
b/service-rpc/src/main/thrift/rpc.thrift
index 7c8e026..876dac4 100644
--- a/service-rpc/src/main/thrift/rpc.thrift
+++ b/service-rpc/src/main/thrift/rpc.thrift
@@ -169,7 +169,7 @@ struct TSInsertRecordReq {
1: required i64 sessionId
2: required string deviceId
3: required list<string> measurements
- 4: required list<string> values
+ 4: required binary values
5: required i64 timestamp
}
@@ -197,7 +197,7 @@ struct TSInsertRecordsReq {
1: required i64 sessionId
2: required list<string> deviceIds
3: required list<list<string>> measurementsList
- 4: required list<list<string>> valuesList
+ 4: required list<binary> valuesList
5: required list<i64> timestamps
}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java
b/session/src/main/java/org/apache/iotdb/session/Session.java
index b574c9f..244b602 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.session;
+import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
@@ -45,11 +46,13 @@ import
org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.thrift.TException;
@@ -186,57 +189,28 @@ public class Session {
}
/**
- * insert data in one row, if you want to improve your performance, please
use insertRecords method
- * or insertTablet method
+ * insert data in one row, if you want to improve your performance, please
use insertRecords
+ * method or insertTablet method
*
- * @see Session#insertRecords(List, List, List, List)
+ * @see Session#insertRecords(List, List, List, List, List)
* @see Session#insertTablet(Tablet)
*/
public void insertRecord(String deviceId, long time, List<String>
measurements,
+ List<TSDataType> types,
Object... values) throws IoTDBConnectionException,
StatementExecutionException {
- List<String> stringValues = new ArrayList<>();
- for (Object o : values) {
- stringValues.add(o.toString());
- }
+ List<Object> valuesList = new ArrayList<>(Arrays.asList(values));
- insertRecord(deviceId, time, measurements, stringValues);
- }
-
- /**
- * insert data in one row, if you want to improve your performance, please
use insertRecords method
- * or insertTablet method
- *
- * @see Session#insertRecords(List, List, List, List)
- * @see Session#insertTablet(Tablet)
- */
- public void insertRecord(String deviceId, long time, List<String>
measurements,
- List<String> values) throws IoTDBConnectionException,
StatementExecutionException {
- TSInsertRecordReq request = new TSInsertRecordReq();
- request.setSessionId(sessionId);
- request.setDeviceId(deviceId);
- request.setTimestamp(time);
- request.setMeasurements(measurements);
- request.setValues(values);
-
- try {
- RpcUtils.verifySuccess(client.insertRecord(request));
- } catch (TException e) {
- throw new IoTDBConnectionException(e);
- }
+ insertRecord(deviceId, time, measurements, types, valuesList);
}
/**
* insert the data of a device. For each timestamp, the number of
measurements is the same.
- *
- * a Tablet example:
- *
- * device1
- * time s1, s2, s3
- * 1, 1, 1, 1
- * 2, 2, 2, 2
- * 3, 3, 3, 3
- *
+ * <p>
+ * a Tablet example:
+ * <p>
+ * device1 time s1, s2, s3 1, 1, 1, 1 2, 2, 2, 2 3, 3, 3, 3
+ * <p>
* times in Tablet may be not in ascending order
*
* @param tablet data batch
@@ -281,9 +255,9 @@ public class Session {
}
/**
- * insert the data of several deivces.
- * Given a deivce, for each timestamp, the number of measurements is the
same.
- *
+ * insert the data of several devices. Given a device, for each timestamp,
the number of
+ * measurements is the same.
+ * <p>
* Times in each Tablet may not be in ascending order
*
* @param tablets data batch in multiple device
@@ -294,11 +268,11 @@ public class Session {
}
/**
- * insert the data of several devices.
- * Given a device, for each timestamp, the number of measurements is the
same.
+ * insert the data of several devices. Given a device, for each timestamp,
the number of
+ * measurements is the same.
*
* @param tablets data batch in multiple device
- * @param sorted whether times in each Tablet are in ascending order
+ * @param sorted whether times in each Tablet are in ascending order
*/
public void insertTablets(Map<String, Tablet> tablets, boolean sorted)
throws IoTDBConnectionException, BatchExecutionException {
@@ -337,10 +311,51 @@ public class Session {
}
/**
- * Insert multiple rows, which can reduce the overhead of network. This
method is just like
- * jdbc executeBatch, we pack some insert request in batch and send them to
server.
- * If you want improve your performance, please see insertTablet method
+ * Insert multiple rows, which can reduce the overhead of network. This
method is just like jdbc
+ * executeBatch, we pack some insert requests in batch and send them to
server. If you want to improve
+ * your performance, please see insertTablet method
+ * <p>
+ * Each row is independent, which could have different deviceId, time,
number of measurements
*
+ * @see Session#insertTablet(Tablet)
+ */
+ public void insertRecords(List<String> deviceIds, List<Long> times,
+ List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+ List<List<Object>> valuesList)
+ throws IoTDBConnectionException, BatchExecutionException {
+ // check params size; NOTE(review): typesList.size() is not part of this check
+ int len = deviceIds.size();
+ if (len != times.size() || len != measurementsList.size() || len !=
valuesList.size()) {
+ throw new IllegalArgumentException(
+ "deviceIds, times, measurementsList and valuesList's size should be
equal");
+ }
+
+ TSInsertRecordsReq request = new TSInsertRecordsReq();
+ request.setSessionId(sessionId);
+ request.setDeviceIds(deviceIds);
+ request.setTimestamps(times);
+ request.setMeasurementsList(measurementsList);
+ List<ByteBuffer> buffersList = new ArrayList<>(); // one serialized buffer per row
+ for (int i = 0; i < measurementsList.size(); i++) {
+ ByteBuffer buffer =
ByteBuffer.allocate(calculateLength(typesList.get(i), valuesList.get(i)));
+ putValues(typesList.get(i), valuesList.get(i), buffer); // type tag + value for every entry of row i
+ buffer.flip(); // limit = bytes written, position = 0, so the request reads from the start
+ buffersList.add(buffer);
+ }
+ request.setValuesList(buffersList);
+
+ try {
+ RpcUtils.verifySuccess(client.insertRecords(request).statusList);
+ } catch (TException e) {
+ throw new IoTDBConnectionException(e); // transport failure is rethrown as a connection error, cause kept
+ }
+ }
+
+ /**
+ * Insert multiple rows, which can reduce the overhead of network. This
method is just like jdbc
+ * executeBatch, we pack some insert requests in batch and send them to
server. If you want to improve
+ * your performance, please see insertTablet method
+ * <p>
* Each row is independent, which could have different deviceId, time,
number of measurements
*
* @see Session#insertTablet(Tablet)
@@ -360,7 +375,14 @@ public class Session {
request.setDeviceIds(deviceIds);
request.setTimestamps(times);
request.setMeasurementsList(measurementsList);
- request.setValuesList(valuesList);
+ List<ByteBuffer> buffersList = new ArrayList<>();
+ for (int i = 0; i < measurementsList.size(); i++) {
+ ByteBuffer buffer =
ByteBuffer.allocate(calculateStrLength(valuesList.get(i)));
+ putStrValues(valuesList.get(i), buffer);
+ buffer.flip();
+ buffersList.add(buffer);
+ }
+ request.setValuesList(buffersList);
try {
RpcUtils.verifySuccess(client.insertRecords(request).statusList);
@@ -369,26 +391,157 @@ public class Session {
}
}
+
/**
- * This method NOT insert data into database and the server just return
after accept the request,
- * this method should be used to test other time cost in client
+ * insert data in one row, if you want to improve your performance, please use
insertRecords method
+ * or insertTablet method
+ *
+ * @see Session#insertRecords(List, List, List, List, List)
+ * @see Session#insertTablet(Tablet)
*/
- public void testInsertRecord(String deviceId, long time, List<String>
measurements,
+ public void insertRecord(String deviceId, long time, List<String>
measurements,
+ List<TSDataType> types,
+ List<Object> values) throws IoTDBConnectionException,
StatementExecutionException {
+ TSInsertRecordReq request = new TSInsertRecordReq();
+ request.setSessionId(sessionId);
+ request.setDeviceId(deviceId);
+ request.setTimestamp(time);
+ request.setMeasurements(measurements);
+ ByteBuffer buffer = ByteBuffer.allocate(calculateLength(types, values)); // sized up front from the type list
+ putValues(types, values, buffer); // writes a type tag followed by the value for each entry
+ buffer.flip(); // limit = bytes written, position = 0, so the request reads from the start
+ request.setValues(buffer);
+
+ try {
+ RpcUtils.verifySuccess(client.insertRecord(request));
+ } catch (TException e) {
+ throw new IoTDBConnectionException(e); // transport failure is rethrown as a connection error, cause kept
+ }
+ }
+
+ /**
+ * insert data in one row, if you want to improve your performance, please use
insertRecords method
+ * or insertTablet method
+ *
+ * @see Session#insertRecords(List, List, List, List, List)
+ * @see Session#insertTablet(Tablet)
+ */
+ public void insertRecord(String deviceId, long time, List<String>
measurements,
List<String> values) throws IoTDBConnectionException,
StatementExecutionException {
TSInsertRecordReq request = new TSInsertRecordReq();
request.setSessionId(sessionId);
request.setDeviceId(deviceId);
request.setTimestamp(time);
request.setMeasurements(measurements);
- request.setValues(values);
+ ByteBuffer buffer = ByteBuffer.allocate(calculateStrLength(values));
+ putStrValues(values, buffer);
+ buffer.flip();
+ request.setValues(buffer);
try {
- RpcUtils.verifySuccess(client.testInsertRecord(request));
+ RpcUtils.verifySuccess(client.insertRecord(request));
} catch (TException e) {
throw new IoTDBConnectionException(e);
}
}
+ private void putStrValues(List<String> values, ByteBuffer buffer)
+ throws IoTDBConnectionException {
+ for (int i = 0; i < values.size(); i++) {
+ ReadWriteIOUtils.write(TSDataType.TEXT, buffer); // every string value is tagged as TEXT
+ byte[] bytes = ((String)
values.get(i)).getBytes(TSFileConfig.STRING_CHARSET);
+ ReadWriteIOUtils.write(bytes.length, buffer); // length prefix precedes the encoded bytes
+ buffer.put(bytes);
+ }
+ }
+
+
+ /**
+ * Write each value into the buffer as a type tag followed by the typed payload.
+ *
+ * @param types data type of each value, read at the same index as values
+ * @param values values to write; TEXT is written as length-prefixed bytes
+ * @param buffer buffer the tags and payloads are written into
+ * @throws IoTDBConnectionException if a type has no matching case below
+ */
+ private void putValues(List<TSDataType> types, List<Object> values,
ByteBuffer buffer)
+ throws IoTDBConnectionException {
+ for (int i = 0; i < values.size(); i++) { // NOTE(review): bounded by values.size(), while calculateLength is bounded by types.size()
+ ReadWriteIOUtils.write(types.get(i), buffer); // type tag comes first
+ switch (types.get(i)) {
+ case BOOLEAN:
+ ReadWriteIOUtils.write((Boolean) values.get(i), buffer);
+ break;
+ case INT32:
+ ReadWriteIOUtils.write((Integer) values.get(i), buffer);
+ break;
+ case INT64:
+ ReadWriteIOUtils.write((Long) values.get(i), buffer);
+ break;
+ case FLOAT:
+ ReadWriteIOUtils.write((Float) values.get(i), buffer);
+ break;
+ case DOUBLE:
+ ReadWriteIOUtils.write((Double) values.get(i), buffer);
+ break;
+ case TEXT:
+ byte[] bytes = ((String)
values.get(i)).getBytes(TSFileConfig.STRING_CHARSET);
+ ReadWriteIOUtils.write(bytes.length, buffer); // length prefix precedes the encoded bytes
+ buffer.put(bytes);
+ break;
+ default:
+ throw new IoTDBConnectionException("Unsupported data type:" +
types.get(i));
+ }
+ }
+ }
+
+ private int calculateStrLength(List<String> values) { // total bytes reserved for a list of string values
+ int res = 0;
+
+ for (int i = 0; i < values.size(); i++) {
+ // space for the type tag; NOTE(review): assumes the tag occupies Short.BYTES - confirm against ReadWriteIOUtils
+ res += Short.BYTES;
+ res += Integer.BYTES; // length prefix of the encoded string
+ res += values.get(i).getBytes(TSFileConfig.STRING_CHARSET).length;
+ }
+
+ return res;
+ }
+
+ private int calculateLength(List<TSDataType> types, List<Object> values)
+ throws IoTDBConnectionException {
+ int res = 0;
+ for (int i = 0; i < types.size(); i++) {
+ // space for the type tag; NOTE(review): assumes the tag occupies Short.BYTES - confirm against ReadWriteIOUtils
+ res += Short.BYTES;
+ switch (types.get(i)) {
+ case BOOLEAN:
+ res += 1; // one byte reserved per boolean
+ break;
+ case INT32:
+ res += Integer.BYTES;
+ break;
+ case INT64:
+ res += Long.BYTES;
+ break;
+ case FLOAT:
+ res += Float.BYTES;
+ break;
+ case DOUBLE:
+ res += Double.BYTES;
+ break;
+ case TEXT:
+ res += Integer.BYTES; // length prefix of the encoded string
+ res += ((String)
values.get(i)).getBytes(TSFileConfig.STRING_CHARSET).length;
+ break;
+ default:
+ throw new IoTDBConnectionException("Unsupported data type:" +
types.get(i));
+ }
+ }
+
+ return res;
+ }
+
/**
* This method NOT insert data into database and the server just return
after accept the request,
* this method should be used to test other time cost in client
@@ -432,7 +585,7 @@ public class Session {
request.setDeviceIds(deviceIds);
request.setTimestamps(times);
request.setMeasurementsList(measurementsList);
- request.setValuesList(valuesList);
+ request.setValuesList(new ArrayList<>());
try {
RpcUtils.verifySuccess(client.testInsertRecords(request).statusList);
@@ -442,6 +595,26 @@ public class Session {
}
/**
+ * This method NOT insert data into database and the server just return
after accept the request,
+ * this method should be used to test other time cost in client
+ */
+ public void testInsertRecord(String deviceId, long time, List<String>
measurements,
+ List<String> values) throws IoTDBConnectionException,
StatementExecutionException {
+ TSInsertRecordReq request = new TSInsertRecordReq();
+ request.setSessionId(sessionId);
+ request.setDeviceId(deviceId);
+ request.setTimestamp(time);
+ request.setMeasurements(measurements);
+ request.setValues(ByteBuffer.allocate(1)); // one-byte placeholder; the values argument is not serialized here
+
+ try {
+ RpcUtils.verifySuccess(client.testInsertRecord(request));
+ } catch (TException e) {
+ throw new IoTDBConnectionException(e); // transport failure is rethrown as a connection error, cause kept
+ }
+ }
+
+ /**
* delete a timeseries, including data and schema
*
* @param path timeseries to delete, should be a whole path
@@ -565,19 +738,19 @@ public class Session {
request.setPaths(paths);
List<Integer> dataTypeOrdinals = new ArrayList<>(paths.size());
- for (TSDataType dataType: dataTypes) {
+ for (TSDataType dataType : dataTypes) {
dataTypeOrdinals.add(dataType.ordinal());
}
request.setDataTypes(dataTypeOrdinals);
List<Integer> encodingOrdinals = new ArrayList<>(paths.size());
- for (TSEncoding encoding: encodings) {
+ for (TSEncoding encoding : encodings) {
encodingOrdinals.add(encoding.ordinal());
}
request.setEncodings(encodingOrdinals);
List<Integer> compressionOrdinals = new ArrayList<>(paths.size());
- for (CompressionType compression: compressors) {
+ for (CompressionType compression : compressors) {
compressionOrdinals.add(compression.ordinal());
}
request.setCompressors(compressionOrdinals);
@@ -651,8 +824,10 @@ public class Session {
}
RpcUtils.verifySuccess(execResp.getStatus());
- return new SessionDataSet(sql, execResp.getColumns(),
execResp.getDataTypeList(), execResp.columnNameIndexMap,
- execResp.getQueryId(), client, sessionId, execResp.queryDataSet,
execResp.isIgnoreTimeStamp());
+ return new SessionDataSet(sql, execResp.getColumns(),
execResp.getDataTypeList(),
+ execResp.columnNameIndexMap,
+ execResp.getQueryId(), client, sessionId, execResp.queryDataSet,
+ execResp.isIgnoreTimeStamp());
}
/**
diff --git
a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 17135fb..260ab65 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -37,45 +37,40 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * SessionPool is a wrapper of a Session Set.
- * Using SessionPool, the user do not need to consider how to reuse a session
connection.
- * Even if the session is disconnected, the session pool can recognize it and
remove the broken
- * session connection and create a new one.
- *
+ * SessionPool is a wrapper of a Session Set. Using SessionPool, the user do
not need to consider
+ * how to reuse a session connection. Even if the session is disconnected, the
session pool can
+ * recognize it and remove the broken session connection and create a new one.
+ * <p>
* If there is no available connections and the pool reaches its max size, the
all methods will hang
* until there is a available connection.
- *
+ * <p>
* If a user has waited for a session for more than 60 seconds, a warn log
will be printed.
- *
+ * <p>
* The only thing you have to remember is that:
- *
+ * <p>
* For a query, if you have get all data, i.e.,
SessionDataSetWrapper.hasNext() == false, it is ok.
- * Otherwise, i.e., you want to stop the query before you get all data
(SessionDataSetWrapper.hasNext() == true),
- * then you have to call closeResultSet(SessionDataSetWrapper wrapper)
manually.
- * Otherwise the connection is occupied by the query.
- *
- * Another case that you have to manually call closeResultSet() is that when
there is exception
- * when you call SessionDataSetWrapper.hasNext() or next()
- *
+ * Otherwise, i.e., you want to stop the query before you get all data
+ * (SessionDataSetWrapper.hasNext() == true), then you have to call
closeResultSet(SessionDataSetWrapper
+ * wrapper) manually. Otherwise the connection is occupied by the query.
+ * <p>
+ * Another case that you have to manually call closeResultSet() is that when
there is exception when
+ * you call SessionDataSetWrapper.hasNext() or next()
*/
public class SessionPool {
private static final Logger logger =
LoggerFactory.getLogger(SessionPool.class);
+ private static int RETRY = 3;
private ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
//for session whose resultSet is not released.
private ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
-
private int size = 0;
private int maxSize = 0;
private String ip;
private int port;
private String user;
private String password;
-
private int fetchSize;
-
private long timeout; //ms
- private static int RETRY = 3;
private static int FINAL_RETRY = RETRY - 1;
private boolean enableCompression = false;
@@ -328,12 +323,12 @@ public class SessionPool {
* @see Session#insertTablet(Tablet)
*/
public void insertRecords(List<String> deviceIds, List<Long> times,
- List<List<String>> measurementsList, List<List<String>> valuesList)
- throws IoTDBConnectionException, BatchExecutionException {
+ List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+ List<List<Object>> valuesList) throws IoTDBConnectionException,
BatchExecutionException {
for (int i = 0; i < RETRY; i++) {
Session session = getSession();
try {
- session.insertRecords(deviceIds, times, measurementsList, valuesList);
+ session.insertRecords(deviceIds, times, measurementsList, typesList,
valuesList);
putBack(session);
return;
} catch (IoTDBConnectionException e) {
@@ -350,15 +345,16 @@ public class SessionPool {
* insert data in one row, if you want improve your performance, please use
insertRecords method
* or insertTablet method
*
- * @see Session#insertRecords(List, List, List, List)
+ * @see Session#insertRecords(List, List, List, List, List)
* @see Session#insertTablet(Tablet)
*/
- public void insertRecord(String deviceId, long time, List<String>
measurements, List<String> values)
+ public void insertRecord(String deviceId, long time, List<String>
measurements,
+ List<TSDataType> types, List<Object> values)
throws IoTDBConnectionException, StatementExecutionException {
for (int i = 0; i < RETRY; i++) {
Session session = getSession();
try {
- session.insertRecord(deviceId, time, measurements, values);
+ session.insertRecord(deviceId, time, measurements, types, values);
putBack(session);
return;
} catch (IoTDBConnectionException e) {
@@ -512,7 +508,7 @@ public class SessionPool {
* delete data <= time in multiple timeseries
*
* @param paths data in which time series to delete
- * @param time data with time stamp less than or equal to time will be
deleted
+ * @param time data with time stamp less than or equal to time will be
deleted
*/
public void deleteData(List<String> paths, long time)
throws IoTDBConnectionException, StatementExecutionException {
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
index 4c4b1e2..8097e37 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
@@ -54,6 +54,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class IoTDBSessionIT {
+
private static Logger logger = LoggerFactory.getLogger(IoTDBSessionIT.class);
private Session session;
@@ -72,6 +73,25 @@ public class IoTDBSessionIT {
}
@Test
+ public void testInsertByStr() throws IoTDBConnectionException,
StatementExecutionException {
+ session = new Session("127.0.0.1", 6667, "root", "root");
+ session.open();
+
+ session.setStorageGroup("root.sg1");
+
+ createTimeseries();
+ insertByStr(); // string-valued insert path under test
+
+ // SQL-based insert followed by a query (helpers are defined elsewhere in this class)
+ insert_via_sql();
+ query3();
+
+ session.close();
+
+ }
+
+
+ @Test
public void testInsertByObject() throws IoTDBConnectionException,
StatementExecutionException {
session = new Session("127.0.0.1", 6667, "root", "root");
session.open();
@@ -200,6 +220,7 @@ public class IoTDBSessionIT {
createTimeseries();
+
// test insert tablet
List<MeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(new MeasurementSchema("s1", TSDataType.INT64,
TSEncoding.RLE));
@@ -351,6 +372,78 @@ public class IoTDBSessionIT {
}
@Test
+ public void testByStr() throws ClassNotFoundException, SQLException,
+ IoTDBConnectionException, StatementExecutionException,
BatchExecutionException {
+ session = new Session("127.0.0.1", 6667, "root", "root");
+ try {
+ session.open();
+ } catch (IoTDBConnectionException e) {
+ e.printStackTrace(); // NOTE(review): a failed open is only printed; the test keeps going
+ }
+
+ session.setStorageGroup("root.sg1");
+
+ createTimeseries();
+
+ insertByStr(); // string-valued single-row insert path
+
+ // SQL-based insert (helper is defined elsewhere in this class)
+ insert_via_sql();
+
+ query3();
+
+// insertTabletTest1();
+ deleteData();
+
+ query();
+
+ deleteTimeseries();
+
+ query2();
+
+ insertRecordsByStr(); // string-valued multi-row insert path
+
+ query4();
+
+ // timeseries names containing special characters
+ session.createTimeseries("root.sg1.d1.1_2", TSDataType.INT64,
TSEncoding.RLE,
+ CompressionType.SNAPPY);
+ session.createTimeseries("root.sg1.d1.\"1.2.3\"", TSDataType.INT64,
TSEncoding.RLE,
+ CompressionType.SNAPPY);
+ session.createTimeseries("root.sg1.d1.\'1.2.4\'", TSDataType.INT64,
TSEncoding.RLE,
+ CompressionType.SNAPPY);
+
+ session.setStorageGroup("root.1");
+ session.createTimeseries("root.1.2.3", TSDataType.INT64, TSEncoding.RLE,
+ CompressionType.SNAPPY);
+
+ // Add another storage group to test the deletion of storage group
+ session.setStorageGroup("root.sg2");
+ session.createTimeseries("root.sg2.d1.s1", TSDataType.INT64,
TSEncoding.RLE,
+ CompressionType.SNAPPY);
+
+ deleteStorageGroupTest();
+
+ // set storage group but do not create timeseries
+ session.setStorageGroup("root.sg3");
+ insertTabletTest1("root.sg3.d1");
+
+ // create timeseries but do not set storage group
+ session.createTimeseries("root.sg4.d1.s1", TSDataType.INT64,
TSEncoding.RLE,
+ CompressionType.SNAPPY);
+ session.createTimeseries("root.sg4.d1.s2", TSDataType.INT64,
TSEncoding.RLE,
+ CompressionType.SNAPPY);
+ session.createTimeseries("root.sg4.d1.s3", TSDataType.INT64,
TSEncoding.RLE,
+ CompressionType.SNAPPY);
+ insertTabletTest1("root.sg4.d1");
+
+ // do not set storage group and create timeseries
+ insertTabletTest1("root.sg5.d1");
+
+ session.close();
+ }
+
+ @Test
public void TestSessionInterfacesWithDisabledWAL()
throws StatementExecutionException, IoTDBConnectionException,
BatchExecutionException {
@@ -370,15 +463,19 @@ public class IoTDBSessionIT {
// test insert record
List<String> measurements = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
measurements.add("s1");
measurements.add("s2");
measurements.add("s3");
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
for (long time = 0; time < 100; time++) {
- List<String> values = new ArrayList<>();
- values.add("1");
- values.add("2");
- values.add("3");
- session.insertRecord(deviceId, time, measurements, values);
+ List<Object> values = new ArrayList<>();
+ values.add(1L);
+ values.add(2L);
+ values.add(3L);
+ session.insertRecord(deviceId, time, measurements, types, values);
}
// test insert tablet
@@ -460,10 +557,13 @@ public class IoTDBSessionIT {
}
String sensorId = ss[ss.length - 1];
List<String> measurements = new ArrayList<>();
- List<String> values = new ArrayList<>();
+ List<Object> values = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
+
measurements.add(sensorId);
- values.add("100");
- session.insertRecord(deviceId, i, measurements, values);
+ types.add(TSDataType.INT64);
+ values.add(100L);
+ session.insertRecord(deviceId, i, measurements, types, values);
}
}
}
@@ -476,6 +576,45 @@ public class IoTDBSessionIT {
measurements.add("s3");
List<String> deviceIds = new ArrayList<>();
List<List<String>> measurementsList = new ArrayList<>();
+ List<List<Object>> valuesList = new ArrayList<>();
+ List<Long> timestamps = new ArrayList<>();
+ List<List<TSDataType>> typesList = new ArrayList<>();
+
+ for (long time = 0; time < 500; time++) {
+ List<Object> values = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
+ values.add(1L);
+ values.add(2L);
+ values.add(3L);
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+
+ deviceIds.add(deviceId);
+ measurementsList.add(measurements);
+ valuesList.add(values);
+ typesList.add(types);
+ timestamps.add(time);
+ if (time != 0 && time % 100 == 0) {
+ session.insertRecords(deviceIds, timestamps, measurementsList,
typesList, valuesList);
+ deviceIds.clear();
+ measurementsList.clear();
+ valuesList.clear();
+ timestamps.clear();
+ }
+ }
+
+ session.insertRecords(deviceIds, timestamps, measurementsList, typesList,
valuesList);
+ }
+
+ private void insertRecordsByStr() throws IoTDBConnectionException,
BatchExecutionException {
+ String deviceId = "root.sg1.d2";
+ List<String> measurements = new ArrayList<>();
+ measurements.add("s1");
+ measurements.add("s2");
+ measurements.add("s3");
+ List<String> deviceIds = new ArrayList<>();
+ List<List<String>> measurementsList = new ArrayList<>();
List<List<String>> valuesList = new ArrayList<>();
List<Long> timestamps = new ArrayList<>();
@@ -504,20 +643,46 @@ public class IoTDBSessionIT {
private void insertInObject() throws IoTDBConnectionException,
StatementExecutionException {
String deviceId = "root.sg1.d1";
List<String> measurements = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
measurements.add("s1");
measurements.add("s2");
measurements.add("s3");
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+
for (long time = 0; time < 100; time++) {
- session.insertRecord(deviceId, time, measurements, 1L, 2L, 3L);
+ session.insertRecord(deviceId, time, measurements, types,1L, 2L, 3L);
}
}
private void insert() throws IoTDBConnectionException,
StatementExecutionException {
String deviceId = "root.sg1.d1";
List<String> measurements = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
+ measurements.add("s1");
+ measurements.add("s2");
+ measurements.add("s3");
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+
+ for (long time = 0; time < 100; time++) {
+ List<Object> values = new ArrayList<>();
+ values.add(1L);
+ values.add(2L);
+ values.add(3L);
+ session.insertRecord(deviceId, time, measurements, types, values);
+ }
+ }
+
+ private void insertByStr() throws IoTDBConnectionException,
StatementExecutionException {
+ String deviceId = "root.sg1.d1";
+ List<String> measurements = new ArrayList<>();
measurements.add("s1");
measurements.add("s2");
measurements.add("s3");
+
for (long time = 0; time < 100; time++) {
List<String> values = new ArrayList<>();
values.add("1");
@@ -529,6 +694,7 @@ public class IoTDBSessionIT {
private void insertTabletTest1(String deviceId)
throws IoTDBConnectionException, BatchExecutionException {
+
List<MeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(new MeasurementSchema("s1", TSDataType.INT64,
TSEncoding.RLE));
schemaList.add(new MeasurementSchema("s2", TSDataType.INT64,
TSEncoding.RLE));
@@ -803,6 +969,7 @@ public class IoTDBSessionIT {
private void insertTabletTest2(String deviceId)
throws IoTDBConnectionException, BatchExecutionException {
+
List<MeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(new MeasurementSchema("s1", TSDataType.INT64,
TSEncoding.RLE));
schemaList.add(new MeasurementSchema("s2", TSDataType.INT64,
TSEncoding.RLE));
@@ -834,6 +1001,7 @@ public class IoTDBSessionIT {
private void insertTabletTest3(String deviceId)
throws IoTDBConnectionException, BatchExecutionException {
+
List<MeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(new MeasurementSchema("s1", TSDataType.INT64,
TSEncoding.RLE));
schemaList.add(new MeasurementSchema("s2", TSDataType.INT64,
TSEncoding.RLE));
@@ -865,6 +1033,7 @@ public class IoTDBSessionIT {
private void insertTabletTestForTime(String deviceId)
throws IoTDBConnectionException, BatchExecutionException {
+
List<MeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(new MeasurementSchema("s1", TSDataType.INT64,
TSEncoding.RLE));
schemaList.add(new MeasurementSchema("s2", TSDataType.INT64,
TSEncoding.RLE));
diff --git
a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIteratorIT.java
b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIteratorIT.java
index 64c9fad..ed6a24b 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIteratorIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIteratorIT.java
@@ -153,22 +153,29 @@ public class IoTDBSessionIteratorIT {
CompressionType.SNAPPY);
String deviceId = "root.sg1.d1";
List<String> measurements = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
measurements.add("s1");
measurements.add("s2");
+ types.add(TSDataType.INT32);
+ types.add(TSDataType.FLOAT);
+
for (long time = 0; time < 10; time++) {
- List<String> values = new ArrayList<>();
- values.add("1");
- values.add("2");
- session.insertRecord(deviceId, time, measurements, values);
+ List<Object> values = new ArrayList<>();
+ values.add(1);
+ values.add(2f);
+ session.insertRecord(deviceId, time, measurements, types, values);
}
deviceId = "root.sg1.d2";
measurements = new ArrayList<>();
+ types = new ArrayList<>();
measurements.add("s1");
+ types.add(TSDataType.DOUBLE);
+
for (long time = 5; time < 10; time++) {
- List<String> values = new ArrayList<>();
- values.add("4");
- session.insertRecord(deviceId, time, measurements, values);
+ List<Object> values = new ArrayList<>();
+ values.add(4d);
+ session.insertRecord(deviceId, time, measurements, types, values);
}
}
diff --git
a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
index 7df3d1f..c35d044 100644
--- a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
+++ b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -60,7 +61,7 @@ public class SessionPoolTest {
service.submit(() -> {
try {
pool.insertRecord("root.sg1.d1", 1, Collections.singletonList("s" +
no),
- Collections.singletonList("3"));
+ Collections.singletonList(TSDataType.INT64),
Collections.singletonList(3L));
} catch (IoTDBConnectionException | StatementExecutionException e) {
fail();
}
@@ -84,7 +85,8 @@ public class SessionPoolTest {
assertEquals(0, pool.currentAvailableSize());
try {
pool.insertRecord(".root.sg1.d1", 1, Collections.singletonList("s"),
- Collections.singletonList("3"));
+ Collections.singletonList(TSDataType.INT64),
+ Collections.singletonList(3L));
} catch (IoTDBConnectionException | StatementExecutionException e) {
//do nothing
}
@@ -100,7 +102,8 @@ public class SessionPoolTest {
for (int i = 0; i < 10; i++) {
try {
pool.insertRecord("root.sg1.d1", i, Collections.singletonList("s" + i),
- Collections.singletonList("" + i));
+ Collections.singletonList(TSDataType.INT64),
+ Collections.singletonList((long) i));
} catch (IoTDBConnectionException | StatementExecutionException e) {
fail();
}
@@ -143,7 +146,8 @@ public class SessionPoolTest {
for (int i = 0; i < 10; i++) {
try {
pool.insertRecord("root.sg1.d1", i, Collections.singletonList("s" + i),
- Collections.singletonList("" + i));
+ Collections.singletonList(TSDataType.INT64),
+ Collections.singletonList((long) i));
} catch (IoTDBConnectionException | StatementExecutionException e) {
fail();
}
@@ -179,7 +183,8 @@ public class SessionPoolTest {
for (int i = 0; i < 10; i++) {
try {
pool.insertRecord("root.sg1.d1", i, Collections.singletonList("s" + i),
- Collections.singletonList("" + i));
+ Collections.singletonList(TSDataType.INT64),
+ Collections.singletonList((long) i));
} catch (IoTDBConnectionException | StatementExecutionException e) {
fail();
}
@@ -210,7 +215,8 @@ public class SessionPoolTest {
for (int i = 0; i < 10; i++) {
try {
pool.insertRecord("root.sg1.d1", i, Collections.singletonList("s" + i),
- Collections.singletonList("" + i));
+ Collections.singletonList(TSDataType.INT64),
+ Collections.singletonList((long) i));
} catch (IoTDBConnectionException | StatementExecutionException e) {
fail();
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/Binary.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/Binary.java
index 58e1260..1903a09 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/Binary.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/Binary.java
@@ -24,13 +24,12 @@ import java.util.Arrays;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
/**
- * Override compareTo() and equals() function to Binary class. This class is
- * used to accept Java String type
+ * Override compareTo() and equals() function to Binary class. This class is
used to accept Java
+ * String type
*/
public class Binary implements Comparable<Binary>, Serializable {
private static final long serialVersionUID = 6394197743397020735L;
-
private byte[] values;
/**
@@ -117,4 +116,8 @@ public class Binary implements Comparable<Binary>,
Serializable {
public byte[] getValues() {
return values;
}
+
+ public void setValues(byte[] values) { // replaces the backing byte array
+ this.values = values; // stores the caller's array reference; no copy is made
+ }
}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
index c8dbea3..66500ab 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
@@ -19,11 +19,14 @@
package org.apache.iotdb.tsfile.utils;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.reader.TsFileInput;
+import static
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.BINARY;
+import static
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.BOOLEAN;
+import static
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.DOUBLE;
+import static
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.FLOAT;
+import static
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.INTEGER;
+import static
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.LONG;
+import static
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.NULL;
+import static
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.STRING;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -31,10 +34,17 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
-
-import static
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.*;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.reader.TsFileInput;
/**
* ConverterUtils is a utility class. It provide conversion between normal
datatype and byte array.
@@ -213,6 +223,17 @@ public class ReadWriteIOUtils {
}
/**
+ * write a Binary n to byteBuffer: an int length followed by the bytes.
+ *
+ * @return The number of bytes used to represent n.
+ */
+ public static int write(Binary n, ByteBuffer buffer) {
+ buffer.putInt(n.getLength());
+ buffer.put(n.getValues());
+ return INT_LEN + n.getLength();
+ }
+
+ /**
* write a int n to outputStream.
*
* @return The number of bytes used to represent n.
@@ -287,6 +308,23 @@ public class ReadWriteIOUtils {
}
/**
+ * write a float n to byteBuffer.
+ */
+ public static int write(float n, ByteBuffer buffer) {
+ buffer.putFloat(n);
+ return FLOAT_LEN;
+ }
+
+ /**
+ * write a double n to byteBuffer.
+ */
+ public static int write(double n, ByteBuffer buffer) {
+ buffer.putDouble(n);
+ return DOUBLE_LEN;
+ }
+
+
+ /**
* write string to outputStream.
*
* @return the length of string represented by byte[].