This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e7be1c  [IOTDB-615] Use binary rather than string in insert plan (#1229)
4e7be1c is described below

commit 4e7be1c204acc47b362fe74b84f7e7b6d742e0b8
Author: SilverNarcissus <[email protected]>
AuthorDate: Thu May 28 13:56:23 2020 +0800

    [IOTDB-615] Use binary rather than string in insert plan (#1229)
    
    * add insert object interface in session
---
 .../org/apache/iotdb/flink/FlinkIoTDBSink.java     |   1 +
 .../java/org/apache/iotdb/rocketmq/Constant.java   |  40 +--
 .../apache/iotdb/rocketmq/RocketMQConsumer.java    |  79 ++++--
 .../main/java/org/apache/iotdb/SessionExample.java |  43 ++-
 .../java/org/apache/iotdb/SessionPoolExample.java  |  17 +-
 .../iotdb/flink/DefaultIoTSerializationSchema.java | 144 ++++++----
 .../main/java/org/apache/iotdb/flink/Event.java    |  67 +++--
 .../java/org/apache/iotdb/flink/IoTDBSink.java     | 282 +++++++++----------
 .../flink/DefaultIoTSerializationSchemaTest.java   |  11 +-
 .../iotdb/flink/IoTDBSinkBatchInsertTest.java      | 157 ++++++-----
 .../iotdb/flink/IoTDBSinkBatchTimerTest.java       |  80 +++---
 .../apache/iotdb/flink/IoTDBSinkInsertTest.java    |  68 ++---
 .../iotdb/db/engine/memtable/AbstractMemTable.java |  32 +--
 .../db/engine/storagegroup/TsFileProcessor.java    |   2 +-
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  69 ++++-
 .../iotdb/db/qp/physical/crud/InsertPlan.java      | 209 +++++++++++++--
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    |   1 +
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  42 +--
 .../org/apache/iotdb/db/utils/CommonUtils.java     |  44 ++-
 .../iotdb/db/engine/storagegroup/TTLTest.java      |  23 +-
 .../org/apache/iotdb/db/tools/WalCheckerTest.java  |  11 +-
 .../apache/iotdb/db/writelog/PerformanceTest.java  |   5 +-
 .../iotdb/db/writelog/WriteLogNodeManagerTest.java |   2 +
 .../apache/iotdb/db/writelog/WriteLogNodeTest.java |   6 +
 .../iotdb/db/writelog/io/LogWriterReaderTest.java  |   5 +-
 .../iotdb/db/writelog/recover/LogReplayerTest.java |  19 +-
 .../recover/RecoverResourceFromReaderTest.java     |  19 +-
 .../db/writelog/recover/SeqTsFileRecoverTest.java  |   7 +-
 .../writelog/recover/UnseqTsFileRecoverTest.java   |  26 +-
 service-rpc/rpc-changelist.md                      |   5 +-
 service-rpc/src/main/thrift/rpc.thrift             |   4 +-
 .../java/org/apache/iotdb/session/Session.java     | 297 ++++++++++++++++-----
 .../org/apache/iotdb/session/pool/SessionPool.java |  48 ++--
 .../org/apache/iotdb/session/IoTDBSessionIT.java   | 187 ++++++++++++-
 .../iotdb/session/IoTDBSessionIteratorIT.java      |  21 +-
 .../apache/iotdb/session/pool/SessionPoolTest.java |  18 +-
 .../java/org/apache/iotdb/tsfile/utils/Binary.java |   9 +-
 .../iotdb/tsfile/utils/ReadWriteIOUtils.java       |  54 +++-
 38 files changed, 1481 insertions(+), 673 deletions(-)

diff --git a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
index 1048079..ca8d205 100644
--- a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
+++ b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
@@ -64,6 +64,7 @@ public class FlinkIoTDBSink {
                 tuple.put("device", "root.sg.d1");
                 tuple.put("timestamp", 
String.valueOf(System.currentTimeMillis()));
                 tuple.put("measurements", "s1");
+                tuple.put("types", "DOUBLE");
                 tuple.put("values", String.valueOf(random.nextDouble()));
 
                 context.collect(tuple);
diff --git a/example/rocketmq/src/main/java/org/apache/iotdb/rocketmq/Constant.java b/example/rocketmq/src/main/java/org/apache/iotdb/rocketmq/Constant.java
index 58785a3..5c465e9 100644
--- a/example/rocketmq/src/main/java/org/apache/iotdb/rocketmq/Constant.java
+++ b/example/rocketmq/src/main/java/org/apache/iotdb/rocketmq/Constant.java
@@ -41,25 +41,25 @@ public class Constant {
       {"root.test.d1.s0", "INT32", "PLAIN", "SNAPPY"},
   };
   public static final String[] ALL_DATA = {
-      "root.vehicle.d0,10,s0,100",
-      "root.vehicle.d0,12,s0:s1,101:'employeeId102'",
-      "root.vehicle.d0,19,s1,'employeeId103'",
-      "root.vehicle.d1,11,s2,104.0",
-      "root.vehicle.d1,15,s2:s3,105.0:true",
-      "root.vehicle.d1,17,s3,false",
-      "root.vehicle.d0,20,s0,1000",
-      "root.vehicle.d0,22,s0:s1,1001:'employeeId1002'",
-      "root.vehicle.d0,29,s1,'employeeId1003'",
-      "root.vehicle.d1,21,s2,1004.0",
-      "root.vehicle.d1,25,s2:s3,1005.0:true",
-      "root.vehicle.d1,27,s3,true",
-      "root.test.d0,10,s0,106",
-      "root.test.d0,14,s0:s1,107:'employeeId108'",
-      "root.test.d0,16,s1,'employeeId109'",
-      "root.test.d1,1,s0,110",
-      "root.test.d0,30,s0,1006",
-      "root.test.d0,34,s0:s1,1007:'employeeId1008'",
-      "root.test.d0,36,s1,'employeeId1090'",
-      "root.test.d1,10,s0,1100",
+      "root.vehicle.d0,10,s0,INT32,100",
+      "root.vehicle.d0,12,s0:s1,INT32:TEXT,101:'employeeId102'",
+      "root.vehicle.d0,19,s1,TEXT,'employeeId103'",
+      "root.vehicle.d1,11,s2,FLOAT,104.0",
+      "root.vehicle.d1,15,s2:s3,FLOAT:BOOLEAN,105.0:true",
+      "root.vehicle.d1,17,s3,BOOLEAN,false",
+      "root.vehicle.d0,20,s0,INT32,1000",
+      "root.vehicle.d0,22,s0:s1,INT32:TEXT,1001:'employeeId1002'",
+      "root.vehicle.d0,29,s1,TEXT,'employeeId1003'",
+      "root.vehicle.d1,21,s2,FLOAT,1004.0",
+      "root.vehicle.d1,25,s2:s3,FLOAT:BOOLEAN,1005.0:true",
+      "root.vehicle.d1,27,s3,BOOLEAN,true",
+      "root.test.d0,10,s0,INT32,106",
+      "root.test.d0,14,s0:s1,INT32:TEXT,107:'employeeId108'",
+      "root.test.d0,16,s1,TEXT,'employeeId109'",
+      "root.test.d1,1,s0,INT32,110",
+      "root.test.d0,30,s0,INT32,1006",
+      "root.test.d0,34,s0:s1,INT32:TEXT,1007:'employeeId1008'",
+      "root.test.d0,36,s1,TEXT,'employeeId1090'",
+      "root.test.d1,10,s0,INT32,1100",
   };
 }
diff --git a/example/rocketmq/src/main/java/org/apache/iotdb/rocketmq/RocketMQConsumer.java b/example/rocketmq/src/main/java/org/apache/iotdb/rocketmq/RocketMQConsumer.java
index 696b2b9..6a67446 100644
--- a/example/rocketmq/src/main/java/org/apache/iotdb/rocketmq/RocketMQConsumer.java
+++ b/example/rocketmq/src/main/java/org/apache/iotdb/rocketmq/RocketMQConsumer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.rocketmq;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
@@ -37,11 +38,11 @@ import org.slf4j.LoggerFactory;
 
 public class RocketMQConsumer {
 
+  private static final Logger logger = 
LoggerFactory.getLogger(RocketMQConsumer.class);
+  private static Session session;
   private DefaultMQPushConsumer consumer;
   private String producerGroup;
   private String serverAddresses;
-  private static Session session;
-  private static final Logger logger = 
LoggerFactory.getLogger(RocketMQConsumer.class);
 
   public RocketMQConsumer(String producerGroup, String serverAddresses, String 
connectionHost,
       int connectionPort, String user, String password)
@@ -53,6 +54,22 @@ public class RocketMQConsumer {
     initIoTDB(connectionHost, connectionPort, user, password);
   }
 
+  public static void main(String[] args)
+      throws MQClientException, StatementExecutionException, 
IoTDBConnectionException {
+    /**
+     *Instantiate with specified consumer group name and specify name server 
addresses.
+     */
+    RocketMQConsumer consumer = new RocketMQConsumer(Constant.CONSUMER_GROUP,
+        Constant.SERVER_ADDRESS,
+        Constant.IOTDB_CONNECTION_HOST,
+        Constant.IOTDB_CONNECTION_PORT,
+        Constant.IOTDB_CONNECTION_USER,
+        Constant.IOTDB_CONNECTION_PASSWORD);
+    consumer.prepareConsume();
+    consumer.addStorageGroup(Constant.ADDITIONAL_STORAGE_GROUP);
+    consumer.start();
+  }
+
   private void initIoTDB(String host, int port, String user, String password)
       throws IoTDBConnectionException, StatementExecutionException {
     if (host == null) {
@@ -76,7 +93,8 @@ public class RocketMQConsumer {
     session.setStorageGroup(storageGroup);
   }
 
-  private void createTimeseries(String[] sql) throws 
StatementExecutionException, IoTDBConnectionException {
+  private void createTimeseries(String[] sql)
+      throws StatementExecutionException, IoTDBConnectionException {
     String timeseries = sql[0];
     TSDataType dataType = TSDataType.valueOf(sql[1]);
     TSEncoding encoding = TSEncoding.valueOf(sql[2]);
@@ -89,8 +107,37 @@ public class RocketMQConsumer {
     String device = dataArray[0];
     long time = Long.parseLong(dataArray[1]);
     List<String> measurements = Arrays.asList(dataArray[2].split(":"));
-    List<String> values = Arrays.asList(dataArray[3].split(":"));
-    session.insertRecord(device, time, measurements, values);
+    List<TSDataType> types = new ArrayList<>();
+    for (String type : dataArray[3].split(":")) {
+      types.add(TSDataType.valueOf(type));
+    }
+
+    List<Object> values = new ArrayList<>();
+    String[] valuesStr = dataArray[4].split(":");
+    for (int i = 0; i < valuesStr.length; i++) {
+      switch (types.get(i)) {
+        case INT64:
+          values.add(Long.parseLong(valuesStr[i]));
+          break;
+        case DOUBLE:
+          values.add(Double.parseDouble(valuesStr[i]));
+          break;
+        case INT32:
+          values.add(Integer.parseInt(valuesStr[i]));
+          break;
+        case TEXT:
+          values.add(valuesStr[i]);
+          break;
+        case FLOAT:
+          values.add(Float.parseFloat(valuesStr[i]));
+          break;
+        case BOOLEAN:
+          values.add(Boolean.parseBoolean(valuesStr[i]));
+          break;
+      }
+    }
+
+    session.insertRecord(device, time, measurements, types, values);
   }
 
   public void start() throws MQClientException {
@@ -99,6 +146,7 @@ public class RocketMQConsumer {
 
   /**
    * Subscribe topic and add register Listener
+   *
    * @throws MQClientException
    */
   public void prepareConsume() throws MQClientException {
@@ -116,8 +164,9 @@ public class RocketMQConsumer {
      */
     consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) 
-> {
       for (MessageExt msg : msgs) {
-        logger.info(String.format("%s Receive New Messages: %s %n", 
Thread.currentThread().getName(),
-            new String(msg.getBody())));
+        logger
+            .info(String.format("%s Receive New Messages: %s %n", 
Thread.currentThread().getName(),
+                new String(msg.getBody())));
         try {
           insert(new String(msg.getBody()));
         } catch (Exception e) {
@@ -147,20 +196,4 @@ public class RocketMQConsumer {
   public void setServerAddresses(String serverAddresses) {
     this.serverAddresses = serverAddresses;
   }
-
-  public static void main(String[] args)
-      throws MQClientException, StatementExecutionException, 
IoTDBConnectionException {
-    /**
-     *Instantiate with specified consumer group name and specify name server 
addresses.
-     */
-    RocketMQConsumer consumer = new RocketMQConsumer(Constant.CONSUMER_GROUP,
-        Constant.SERVER_ADDRESS,
-        Constant.IOTDB_CONNECTION_HOST,
-        Constant.IOTDB_CONNECTION_PORT,
-        Constant.IOTDB_CONNECTION_USER,
-        Constant.IOTDB_CONNECTION_PASSWORD);
-    consumer.prepareConsume();
-    consumer.addStorageGroup(Constant.ADDITIONAL_STORAGE_GROUP);
-    consumer.start();
-  }
 }
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 444d862..f7e7db9 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -136,15 +136,20 @@ public class SessionExample {
   private static void insertRecord() throws IoTDBConnectionException, 
StatementExecutionException {
     String deviceId = "root.sg1.d1";
     List<String> measurements = new ArrayList<>();
+    List<TSDataType> types = new ArrayList<>();
     measurements.add("s1");
     measurements.add("s2");
     measurements.add("s3");
+    types.add(TSDataType.INT64);
+    types.add(TSDataType.INT64);
+    types.add(TSDataType.INT64);
+
     for (long time = 0; time < 100; time++) {
-      List<String> values = new ArrayList<>();
-      values.add("1");
-      values.add("2");
-      values.add("3");
-      session.insertRecord(deviceId, time, measurements, values);
+      List<Object> values = new ArrayList<>();
+      values.add(1L);
+      values.add(2L);
+      values.add(3L);
+      session.insertRecord(deviceId, time, measurements, types, values);
     }
   }
 
@@ -152,11 +157,16 @@ public class SessionExample {
       throws IoTDBConnectionException, StatementExecutionException {
     String deviceId = "root.sg1.d1";
     List<String> measurements = new ArrayList<>();
+    List<TSDataType> types = new ArrayList<>();
     measurements.add("s1");
     measurements.add("s2");
     measurements.add("s3");
+    types.add(TSDataType.INT64);
+    types.add(TSDataType.INT64);
+    types.add(TSDataType.INT64);
+
     for (long time = 0; time < 100; time++) {
-      session.insertRecord(deviceId, time, measurements, 1L, 1L, 1L);
+      session.insertRecord(deviceId, time, measurements, types, 1L, 1L, 1L);
     }
   }
 
@@ -168,21 +178,27 @@ public class SessionExample {
     measurements.add("s3");
     List<String> deviceIds = new ArrayList<>();
     List<List<String>> measurementsList = new ArrayList<>();
-    List<List<String>> valuesList = new ArrayList<>();
+    List<List<Object>> valuesList = new ArrayList<>();
     List<Long> timestamps = new ArrayList<>();
+    List<List<TSDataType>> typesList = new ArrayList<>();
 
     for (long time = 0; time < 500; time++) {
-      List<String> values = new ArrayList<>();
-      values.add("1");
-      values.add("2");
-      values.add("3");
+      List<Object> values = new ArrayList<>();
+      List<TSDataType> types = new ArrayList<>();
+      values.add(1L);
+      values.add(2L);
+      values.add(3L);
+      types.add(TSDataType.INT64);
+      types.add(TSDataType.INT64);
+      types.add(TSDataType.INT64);
 
       deviceIds.add(deviceId);
       measurementsList.add(measurements);
       valuesList.add(values);
+      typesList.add(types);
       timestamps.add(time);
       if (time != 0 && time % 100 == 0) {
-        session.insertRecords(deviceIds, timestamps, measurementsList, 
valuesList);
+        session.insertRecords(deviceIds, timestamps, measurementsList, 
typesList, valuesList);
         deviceIds.clear();
         measurementsList.clear();
         valuesList.clear();
@@ -190,9 +206,8 @@ public class SessionExample {
       }
     }
 
-    session.insertRecords(deviceIds, timestamps, measurementsList, valuesList);
+    session.insertRecords(deviceIds, timestamps, measurementsList, typesList, 
valuesList);
   }
-
   /**
    * insert the data of a device. For each timestamp, the number of 
measurements is the same.
    *
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
index 96a660d..395312e 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -28,6 +27,7 @@ import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.SessionDataSet.DataIterator;
 import org.apache.iotdb.session.pool.SessionDataSetWrapper;
 import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 public class SessionPoolExample {
 
@@ -51,15 +51,20 @@ public class SessionPoolExample {
   private static void insertRecord() throws StatementExecutionException, 
IoTDBConnectionException {
     String deviceId = "root.sg1.d1";
     List<String> measurements = new ArrayList<>();
+    List<TSDataType> types = new ArrayList<>();
     measurements.add("s1");
     measurements.add("s2");
     measurements.add("s3");
+    types.add(TSDataType.INT64);
+    types.add(TSDataType.INT64);
+    types.add(TSDataType.INT64);
+
     for (long time = 0; time < 10; time++) {
-      List<String> values = new ArrayList<>();
-      values.add("1");
-      values.add("2");
-      values.add("3");
-      pool.insertRecord(deviceId, time, measurements, values);
+      List<Object> values = new ArrayList<>();
+      values.add(1L);
+      values.add(2L);
+      values.add(3L);
+      pool.insertRecord(deviceId, time, measurements, types, values);
     }
   }
 
diff --git a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/DefaultIoTSerializationSchema.java b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/DefaultIoTSerializationSchema.java
index 4ac596b..941b5ef 100644
--- a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/DefaultIoTSerializationSchema.java
+++ b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/DefaultIoTSerializationSchema.java
@@ -18,82 +18,110 @@
 
 package org.apache.iotdb.flink;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 /**
- * @inheritDoc
- * The default implementation of IoTSerializationSchema. Gets info from a map 
struct.
+ * @inheritDoc The default implementation of IoTSerializationSchema. Gets info 
from a map struct.
  */
-public class DefaultIoTSerializationSchema implements 
IoTSerializationSchema<Map<String,String>> {
-    private String fieldDevice = "device";
-    private String fieldTimestamp = "timestamp";
-    private String fieldMeasurements = "measurements";
-    private String fieldValues = "values";
-    private String separator = ",";
-
-    @Override
-    public Event serialize(Map<String,String> tuple) {
-        if (tuple == null) {
-            return null;
-        }
-
-        String device = tuple.get(fieldDevice);
-
-        String ts = tuple.get(fieldTimestamp);
-        Long timestamp = ts == null ? System.currentTimeMillis() : 
Long.parseLong(ts);
-
-        List<String> measurements = null;
-        if (tuple.get(fieldMeasurements) != null) {
-            measurements = 
Arrays.asList(tuple.get(fieldMeasurements).split(separator));
-        }
-
-        List<String> values = null;
-        if (tuple.get(fieldValues) != null) {
-            values = Arrays.asList(tuple.get(fieldValues).split(separator));
-        }
-
-        return new Event(device, timestamp, measurements, values);
+public class DefaultIoTSerializationSchema implements 
IoTSerializationSchema<Map<String, String>> {
+
+  private String fieldDevice = "device";
+  private String fieldTimestamp = "timestamp";
+  private String fieldMeasurements = "measurements";
+  private String fieldValues = "values";
+  private String fieldTypes = "types";
+  private String separator = ",";
+
+  @Override
+  public Event serialize(Map<String, String> tuple) {
+    if (tuple == null) {
+      return null;
     }
 
-    public String getFieldDevice() {
-        return fieldDevice;
-    }
+    String device = tuple.get(fieldDevice);
 
-    public void setFieldDevice(String fieldDevice) {
-        this.fieldDevice = fieldDevice;
-    }
+    String ts = tuple.get(fieldTimestamp);
+    Long timestamp = ts == null ? System.currentTimeMillis() : 
Long.parseLong(ts);
 
-    public String getFieldTimestamp() {
-        return fieldTimestamp;
+    List<String> measurements = null;
+    if (tuple.get(fieldMeasurements) != null) {
+      measurements = 
Arrays.asList(tuple.get(fieldMeasurements).split(separator));
     }
 
-    public void setFieldTimestamp(String fieldTimestamp) {
-        this.fieldTimestamp = fieldTimestamp;
+    List<TSDataType> types = new ArrayList<>();
+    for (String type : tuple.get(fieldTypes).split(separator)) {
+      types.add(TSDataType.valueOf(type));
     }
 
-    public String getFieldMeasurements() {
-        return fieldMeasurements;
+    List<Object> values = new ArrayList<>();
+    String[] valuesStr = tuple.get(fieldValues).split(separator);
+    for (int i = 0; i < valuesStr.length; i++) {
+      switch (types.get(i)) {
+        case INT64:
+          values.add(Long.parseLong(valuesStr[i]));
+          break;
+        case DOUBLE:
+          values.add(Double.parseDouble(valuesStr[i]));
+          break;
+        case INT32:
+          values.add(Integer.parseInt(valuesStr[i]));
+          break;
+        case TEXT:
+          values.add(valuesStr[i]);
+          break;
+        case FLOAT:
+          values.add(Float.parseFloat(valuesStr[i]));
+          break;
+        case BOOLEAN:
+          values.add(Boolean.parseBoolean(valuesStr[i]));
+          break;
+      }
     }
 
-    public void setFieldMeasurements(String fieldMeasurements) {
-        this.fieldMeasurements = fieldMeasurements;
-    }
+    return new Event(device, timestamp, measurements, types, values);
+  }
 
-    public String getFieldValues() {
-        return fieldValues;
-    }
+  public String getFieldDevice() {
+    return fieldDevice;
+  }
 
-    public void setFieldValues(String fieldValues) {
-        this.fieldValues = fieldValues;
-    }
+  public void setFieldDevice(String fieldDevice) {
+    this.fieldDevice = fieldDevice;
+  }
 
-    public String getSeparator() {
-        return separator;
-    }
+  public String getFieldTimestamp() {
+    return fieldTimestamp;
+  }
 
-    public void setSeparator(String separator) {
-        this.separator = separator;
-    }
+  public void setFieldTimestamp(String fieldTimestamp) {
+    this.fieldTimestamp = fieldTimestamp;
+  }
+
+  public String getFieldMeasurements() {
+    return fieldMeasurements;
+  }
+
+  public void setFieldMeasurements(String fieldMeasurements) {
+    this.fieldMeasurements = fieldMeasurements;
+  }
+
+  public String getFieldValues() {
+    return fieldValues;
+  }
+
+  public void setFieldValues(String fieldValues) {
+    this.fieldValues = fieldValues;
+  }
+
+  public String getSeparator() {
+    return separator;
+  }
+
+  public void setSeparator(String separator) {
+    this.separator = separator;
+  }
 }
diff --git a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/Event.java b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/Event.java
index 6cccb23..83fc4a3 100644
--- a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/Event.java
+++ b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/Event.java
@@ -19,36 +19,49 @@
 package org.apache.iotdb.flink;
 
 import java.util.List;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 /**
  * Event serializes the device/sensor related data, such as time, measurements 
etc.
  */
 public class Event {
-    private String device;
-    private Long timestamp;
-    private List<String> measurements;
-    private List<String> values;
-
-    public Event(String device, Long timestamp, List<String> measurements, 
List<String> values) {
-        this.device = device;
-        this.timestamp = timestamp;
-        this.measurements = measurements;
-        this.values = values;
-    }
-
-    public String getDevice() {
-        return device;
-    }
-
-    public Long getTimestamp() {
-        return timestamp;
-    }
-
-    public List<String> getMeasurements() {
-        return measurements;
-    }
-
-    public List<String> getValues() {
-        return values;
-    }
+
+  private String device;
+  private Long timestamp;
+  private List<String> measurements;
+  private List<TSDataType> types;
+  private List<Object> values;
+
+  public Event(String device, Long timestamp, List<String> measurements, 
List<TSDataType> types,
+      List<Object> values) {
+    this.device = device;
+    this.timestamp = timestamp;
+    this.measurements = measurements;
+    this.types = types;
+    this.values = values;
+  }
+
+  public List<TSDataType> getTypes() {
+    return types;
+  }
+
+  public void setTypes(List<TSDataType> types) {
+    this.types = types;
+  }
+
+  public String getDevice() {
+    return device;
+  }
+
+  public Long getTimestamp() {
+    return timestamp;
+  }
+
+  public List<String> getMeasurements() {
+    return measurements;
+  }
+
+  public List<Object> getValues() {
+    return values;
+  }
 }
diff --git a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
index bb3e042..c63871b 100644
--- a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
+++ b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
@@ -32,161 +32,167 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 /**
- * The `IoTDBSink` allows flink jobs to write events into IoTDB timeseries.
- * By default send only one event after another, but you can change to batch 
by invoking `withBatchSize(int)`.
+ * The `IoTDBSink` allows flink jobs to write events into IoTDB timeseries. By 
default send only one
+ * event after another, but you can change to batch by invoking 
`withBatchSize(int)`.
+ *
  * @param <IN> the input data type
  */
 public class IoTDBSink<IN> extends RichSinkFunction<IN> {
 
-    private static final long serialVersionUID = 1L;
-    private static final Logger LOG = LoggerFactory.getLogger(IoTDBSink.class);
-
-    private IoTDBOptions options;
-    private IoTSerializationSchema<IN> serializationSchema;
-    private Map<String, IoTDBOptions.TimeseriesOption> timeseriesOptionMap;
-    private transient SessionPool pool;
-    private transient ScheduledExecutorService scheduledExecutor;
-
-    private int batchSize = 0;
-    private int flushIntervalMs = 3000;
-    private List<Event> batchList;
-    private int sessionPoolSize = 2;
-
-    public IoTDBSink(IoTDBOptions options, IoTSerializationSchema<IN> schema) {
-        this.options = options;
-        this.serializationSchema = schema;
-        this.batchList = new LinkedList<>();
-        this.timeseriesOptionMap = new HashMap<>();
-        for (IoTDBOptions.TimeseriesOption timeseriesOption : 
options.getTimeseriesOptionList()) {
-            timeseriesOptionMap.put(timeseriesOption.getPath(), 
timeseriesOption);
-        }
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(IoTDBSink.class);
+
+  private IoTDBOptions options;
+  private IoTSerializationSchema<IN> serializationSchema;
+  private Map<String, IoTDBOptions.TimeseriesOption> timeseriesOptionMap;
+  private transient SessionPool pool;
+  private transient ScheduledExecutorService scheduledExecutor;
+
+  private int batchSize = 0;
+  private int flushIntervalMs = 3000;
+  private List<Event> batchList;
+  private int sessionPoolSize = 2;
+
+  public IoTDBSink(IoTDBOptions options, IoTSerializationSchema<IN> schema) {
+    this.options = options;
+    this.serializationSchema = schema;
+    this.batchList = new LinkedList<>();
+    this.timeseriesOptionMap = new HashMap<>();
+    for (IoTDBOptions.TimeseriesOption timeseriesOption : 
options.getTimeseriesOptionList()) {
+      timeseriesOptionMap.put(timeseriesOption.getPath(), timeseriesOption);
     }
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        initSession();
-        initScheduler();
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    initSession();
+    initScheduler();
+  }
+
+  void initSession() throws Exception {
+    pool = new SessionPool(options.getHost(), options.getPort(), 
options.getUser(),
+        options.getPassword(), sessionPoolSize);
+
+    pool.setStorageGroup(options.getStorageGroup());
+    for (IoTDBOptions.TimeseriesOption option : 
options.getTimeseriesOptionList()) {
+      if (!pool.checkTimeseriesExists(option.getPath())) {
+        pool.createTimeseries(option.getPath(), option.getDataType(), 
option.getEncoding(),
+            option.getCompressor());
+      }
     }
-
-    void initSession() throws Exception {
-        pool = new SessionPool(options.getHost(), options.getPort(), 
options.getUser(), options.getPassword(), sessionPoolSize);
-
-        pool.setStorageGroup(options.getStorageGroup());
-        for (IoTDBOptions.TimeseriesOption option : 
options.getTimeseriesOptionList()) {
-            if (!pool.checkTimeseriesExists(option.getPath())) {
-                pool.createTimeseries(option.getPath(), option.getDataType(), 
option.getEncoding(), option.getCompressor());
-            }
+  }
+
+  void initScheduler() {
+    if (batchSize > 0) {
+      scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+      scheduledExecutor.scheduleAtFixedRate(() -> {
+        try {
+          flush();
+        } catch (Exception e) {
+          LOG.error("flush error", e);
         }
+      }, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
     }
-
-    void initScheduler() {
-        if (batchSize > 0) {
-            scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
-            scheduledExecutor.scheduleAtFixedRate(() -> {
-                try {
-                    flush();
-                } catch (Exception e) {
-                    LOG.error("flush error", e);
-                }
-            }, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
-        }
-    }
-
-    //  for testing
-    void setSessionPool(SessionPool pool) {
-        this.pool = pool;
+  }
+
+  //  for testing
+  void setSessionPool(SessionPool pool) {
+    this.pool = pool;
+  }
+
+  @Override
+  public void invoke(IN input, Context context) throws Exception {
+    Event event = serializationSchema.serialize(input);
+    if (event == null) {
+      return;
     }
 
-    @Override
-    public void invoke(IN input, Context context) throws Exception {
-        Event event = serializationSchema.serialize(input);
-        if (event == null) {
-            return;
-        }
-
-        if (batchSize > 0) {
-            synchronized (batchList) {
-                batchList.add(event);
-                if (batchList.size() >= batchSize) {
-                    flush();
-                }
-                return;
-            }
+    if (batchSize > 0) {
+      synchronized (batchList) {
+        batchList.add(event);
+        if (batchList.size() >= batchSize) {
+          flush();
         }
-
-        convertText(event.getDevice(), event.getMeasurements(), 
event.getValues());
-        pool.insertRecord(event.getDevice(), event.getTimestamp(), 
event.getMeasurements(),
-                event.getValues());
-        LOG.debug("send event successfully");
-    }
-
-    public IoTDBSink<IN> withBatchSize(int batchSize) {
-        Preconditions.checkArgument(batchSize >= 0);
-        this.batchSize = batchSize;
-        return this;
-    }
-
-    public IoTDBSink<IN> withFlushIntervalMs(int flushIntervalMs) {
-        Preconditions.checkArgument(flushIntervalMs > 0);
-        this.flushIntervalMs = flushIntervalMs;
-        return this;
+        return;
+      }
     }
 
-    public IoTDBSink<IN> withSessionPoolSize(int sessionPoolSize) {
-        Preconditions.checkArgument(sessionPoolSize > 0);
-        this.sessionPoolSize = sessionPoolSize;
-        return this;
+    convertText(event.getDevice(), event.getMeasurements(), event.getValues());
+    pool.insertRecord(event.getDevice(), event.getTimestamp(), event.getMeasurements(),
+        event.getTypes(), event.getValues());
+    LOG.debug("send event successfully");
+  }
+
+  public IoTDBSink<IN> withBatchSize(int batchSize) {
+    Preconditions.checkArgument(batchSize >= 0);
+    this.batchSize = batchSize;
+    return this;
+  }
+
+  public IoTDBSink<IN> withFlushIntervalMs(int flushIntervalMs) {
+    Preconditions.checkArgument(flushIntervalMs > 0);
+    this.flushIntervalMs = flushIntervalMs;
+    return this;
+  }
+
+  public IoTDBSink<IN> withSessionPoolSize(int sessionPoolSize) {
+    Preconditions.checkArgument(sessionPoolSize > 0);
+    this.sessionPoolSize = sessionPoolSize;
+    return this;
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (pool != null) {
+      try {
+        flush();
+      } catch (Exception e) {
+        LOG.error("flush error", e);
+      }
+      pool.close();
     }
-
-    @Override
-    public void close() throws Exception {
-        if (pool != null) {
-            try {
-                flush();
-            } catch (Exception e) {
-                LOG.error("flush error", e);
-            }
-            pool.close();
-        }
-        if (scheduledExecutor != null) {
-            scheduledExecutor.shutdown();
-        }
+    if (scheduledExecutor != null) {
+      scheduledExecutor.shutdown();
     }
-
-    private void convertText(String device, List<String> measurements, List<String> values) {
-        if (device != null && measurements != null && values != null && measurements.size() == values.size()) {
-            for (int i = 0; i < measurements.size(); i++) {
-                String measurement = device + "." + measurements.get(i);
-                IoTDBOptions.TimeseriesOption timeseriesOption = timeseriesOptionMap.get(measurement);
-                if (timeseriesOption!= null && TSDataType.TEXT.equals(timeseriesOption.getDataType())) {
-                    // The TEXT data type should be covered by " or '
-                    values.set(i, "'" + values.get(i) + "'");
-                }
-            }
+  }
+
+  private void convertText(String device, List<String> measurements, List<Object> values) {
+    if (device != null && measurements != null && values != null && measurements.size() == values
+        .size()) {
+      for (int i = 0; i < measurements.size(); i++) {
+        String measurement = device + "." + measurements.get(i);
+        IoTDBOptions.TimeseriesOption timeseriesOption = timeseriesOptionMap.get(measurement);
+        if (timeseriesOption != null && TSDataType.TEXT.equals(timeseriesOption.getDataType())) {
+          // The TEXT data type should be covered by " or '
+          values.set(i, "'" + values.get(i) + "'");
         }
+      }
     }
-
-    private void flush() throws Exception {
-        if (batchSize > 0) {
-            synchronized (batchList) {
-                if (batchList.size() > 0) {
-                    List<String> deviceIds = new ArrayList<>();
-                    List<Long> timestamps = new ArrayList<>();
-                    List<List<String>> measurementsList = new ArrayList<>();
-                    List<List<String>> valuesList = new ArrayList<>();
-
-                    for (Event event : batchList) {
-                        convertText(event.getDevice(), event.getMeasurements(), event.getValues());
-                        deviceIds.add(event.getDevice());
-                        timestamps.add(event.getTimestamp());
-                        measurementsList.add(event.getMeasurements());
-                        valuesList.add(event.getValues());
-                    }
-                    pool.insertRecords(deviceIds, timestamps, measurementsList, valuesList);
-                    LOG.debug("send event successfully");
-                    batchList.clear();
-                }
-            }
+  }
+
+  private void flush() throws Exception {
+    if (batchSize > 0) {
+      synchronized (batchList) {
+        if (batchList.size() > 0) {
+          List<String> deviceIds = new ArrayList<>();
+          List<Long> timestamps = new ArrayList<>();
+          List<List<String>> measurementsList = new ArrayList<>();
+          List<List<TSDataType>> typesList = new ArrayList<>();
+          List<List<Object>> valuesList = new ArrayList<>();
+
+          for (Event event : batchList) {
+            convertText(event.getDevice(), event.getMeasurements(), event.getValues());
+            deviceIds.add(event.getDevice());
+            timestamps.add(event.getTimestamp());
+            measurementsList.add(event.getMeasurements());
+            typesList.add(event.getTypes());
+            valuesList.add(event.getValues());
+          }
+          pool.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
+          LOG.debug("send event successfully");
+          batchList.clear();
         }
+      }
     }
+  }
 }
diff --git a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/DefaultIoTSerializationSchemaTest.java b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/DefaultIoTSerializationSchemaTest.java
index efb5a34..8e696e4 100644
--- a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/DefaultIoTSerializationSchemaTest.java
+++ b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/DefaultIoTSerializationSchemaTest.java
@@ -18,13 +18,12 @@
 
 package org.apache.iotdb.flink;
 
-import com.google.common.collect.Lists;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
 
+import com.google.common.collect.Lists;
 import java.util.HashMap;
 import java.util.Map;
-
-import static org.junit.Assert.*;
+import org.junit.Test;
 
 public class DefaultIoTSerializationSchemaTest {
 
@@ -38,12 +37,14 @@ public class DefaultIoTSerializationSchemaTest {
         tuple.put("device", "root.sg.D01");
         tuple.put("timestamp", "1581861293000");
         tuple.put("measurements", "temperature");
+        tuple.put("types", "DOUBLE");
         tuple.put("values", "36.5");
 
         Event event = serializationSchema.serialize(tuple);
         assertEquals(tuple.get("device"), event.getDevice());
         assertEquals(tuple.get("timestamp"), String.valueOf(event.getTimestamp()));
         assertEquals(tuple.get("measurements"), event.getMeasurements().get(0));
-        assertEquals(tuple.get("values"), event.getValues().get(0));
+        assertEquals(tuple.get("types"), event.getTypes().get(0).toString());
+        assertEquals(tuple.get("values"), String.valueOf(event.getValues().get(0)));
     }
 }
diff --git a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
index 7f6fe5e..930565b 100644
--- a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
+++ b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
@@ -18,85 +18,94 @@
 
 package org.apache.iotdb.flink;
 
-import com.google.common.collect.Lists;
-import org.apache.iotdb.session.pool.SessionPool;
-import org.junit.Before;
-import org.junit.Test;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
 
+import com.google.common.collect.Lists;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.junit.Before;
+import org.junit.Test;
 
 public class IoTDBSinkBatchInsertTest {
 
-    private IoTDBSink ioTDBSink;
-    private SessionPool pool;
-
-    @Before
-    public void setUp() throws Exception {
-        IoTDBOptions options = new IoTDBOptions();
-        options.setTimeseriesOptionList(Lists.newArrayList(new IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
-        ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema());
-        ioTDBSink.withBatchSize(3);
-
-        pool = mock(SessionPool.class);
-        ioTDBSink.setSessionPool(pool);
-    }
-
-    @Test
-    public void testBatchInsert() throws Exception {
-        Map<String,String> tuple = new HashMap();
-        tuple.put("device", "root.sg.D01");
-        tuple.put("timestamp", "1581861293000");
-        tuple.put("measurements", "temperature");
-        tuple.put("values", "36.5");
-        ioTDBSink.invoke(tuple, null);
-
-        verifyZeroInteractions(pool);
-
-        tuple = new HashMap();
-        tuple.put("device", "root.sg.D01");
-        tuple.put("timestamp", "1581861293001");
-        tuple.put("measurements", "temperature");
-        tuple.put("values", "37.2");
-        ioTDBSink.invoke(tuple, null);
-
-        verifyZeroInteractions(pool);
-
-        tuple = new HashMap();
-        tuple.put("device", "root.sg.D01");
-        tuple.put("timestamp", "1581861293003");
-        tuple.put("measurements", "temperature");
-        tuple.put("values", "37.1");
-        ioTDBSink.invoke(tuple, null);
-
-        verify(pool).insertRecords(any(List.class), any(List.class), any(List.class), any(List.class));
-
-        tuple = new HashMap();
-        tuple.put("device", "root.sg.D01");
-        tuple.put("timestamp", "1581861293005");
-        tuple.put("measurements", "temperature");
-        tuple.put("values", "36.5");
-        ioTDBSink.invoke(tuple, null);
-
-        verifyZeroInteractions(pool);
-    }
-
-    @Test
-    public void close() throws Exception {
-        Map<String,String> tuple = new HashMap();
-        tuple.put("device", "root.sg.D01");
-        tuple.put("timestamp", "1581861293005");
-        tuple.put("measurements", "temperature");
-        tuple.put("values", "36.5");
-        ioTDBSink.invoke(tuple, null);
-        verifyZeroInteractions(pool);
-
-        ioTDBSink.close();
-        verify(pool).insertRecords(any(List.class), any(List.class), any(List.class), any(List.class));
-        verify(pool).close();
-    }
+  private IoTDBSink ioTDBSink;
+  private SessionPool pool;
+
+  @Before
+  public void setUp() throws Exception {
+    IoTDBOptions options = new IoTDBOptions();
+    options.setTimeseriesOptionList(
+        Lists.newArrayList(new IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
+    ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema());
+    ioTDBSink.withBatchSize(3);
+
+    pool = mock(SessionPool.class);
+    ioTDBSink.setSessionPool(pool);
+  }
+
+  @Test
+  public void testBatchInsert() throws Exception {
+    Map<String, String> tuple = new HashMap();
+    tuple.put("device", "root.sg.D01");
+    tuple.put("timestamp", "1581861293000");
+    tuple.put("measurements", "temperature");
+    tuple.put("types", "DOUBLE");
+    tuple.put("values", "36.5");
+    ioTDBSink.invoke(tuple, null);
+
+    verifyZeroInteractions(pool);
+
+    tuple = new HashMap();
+    tuple.put("device", "root.sg.D01");
+    tuple.put("timestamp", "1581861293001");
+    tuple.put("measurements", "temperature");
+    tuple.put("types", "DOUBLE");
+    tuple.put("values", "37.2");
+    ioTDBSink.invoke(tuple, null);
+
+    verifyZeroInteractions(pool);
+
+    tuple = new HashMap();
+    tuple.put("device", "root.sg.D01");
+    tuple.put("timestamp", "1581861293003");
+    tuple.put("measurements", "temperature");
+    tuple.put("types", "DOUBLE");
+    tuple.put("values", "37.1");
+    ioTDBSink.invoke(tuple, null);
+
+    verify(pool).insertRecords(any(List.class), any(List.class), any(List.class), any(List.class),
+        any(List.class));
+
+    tuple = new HashMap();
+    tuple.put("device", "root.sg.D01");
+    tuple.put("timestamp", "1581861293005");
+    tuple.put("measurements", "temperature");
+    tuple.put("types", "DOUBLE");
+    tuple.put("values", "36.5");
+    ioTDBSink.invoke(tuple, null);
+
+    verifyZeroInteractions(pool);
+  }
+
+  @Test
+  public void close() throws Exception {
+    Map<String, String> tuple = new HashMap();
+    tuple.put("device", "root.sg.D01");
+    tuple.put("timestamp", "1581861293005");
+    tuple.put("measurements", "temperature");
+    tuple.put("types", "DOUBLE");
+    tuple.put("values", "36.5");
+    ioTDBSink.invoke(tuple, null);
+    verifyZeroInteractions(pool);
+
+    ioTDBSink.close();
+    verify(pool).insertRecords(any(List.class), any(List.class), any(List.class), any(List.class),
+        any(List.class));
+    verify(pool).close();
+  }
 }
diff --git a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
index f8c7f7e..f9e2d62 100644
--- a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
+++ b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
@@ -18,57 +18,61 @@
 
 package org.apache.iotdb.flink;
 
-import com.google.common.collect.Lists;
-import org.apache.iotdb.session.pool.SessionPool;
-import org.junit.Before;
-import org.junit.Test;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
 
+import com.google.common.collect.Lists;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.junit.Before;
+import org.junit.Test;
 
 public class IoTDBSinkBatchTimerTest {
 
-    private IoTDBSink ioTDBSink;
-    private SessionPool pool;
+  private IoTDBSink ioTDBSink;
+  private SessionPool pool;
 
-    @Before
-    public void setUp() throws Exception {
-        IoTDBOptions options = new IoTDBOptions();
-        options.setTimeseriesOptionList(Lists.newArrayList(new IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
-        ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema());
-        ioTDBSink.withBatchSize(3);
-        ioTDBSink.withFlushIntervalMs(1000);
-        ioTDBSink.initScheduler();
+  @Before
+  public void setUp() throws Exception {
+    IoTDBOptions options = new IoTDBOptions();
+    options.setTimeseriesOptionList(
+        Lists.newArrayList(new IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
+    ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema());
+    ioTDBSink.withBatchSize(3);
+    ioTDBSink.withFlushIntervalMs(1000);
+    ioTDBSink.initScheduler();
 
-        pool = mock(SessionPool.class);
-        ioTDBSink.setSessionPool(pool);
-    }
+    pool = mock(SessionPool.class);
+    ioTDBSink.setSessionPool(pool);
+  }
 
-    @Test
-    public void testBatchInsert() throws Exception {
-        Map<String,String> tuple = new HashMap();
-        tuple.put("device", "root.sg.D01");
-        tuple.put("timestamp", "1581861293000");
-        tuple.put("measurements", "temperature");
-        tuple.put("values", "36.5");
-        ioTDBSink.invoke(tuple, null);
+  @Test
+  public void testBatchInsert() throws Exception {
+    Map<String, String> tuple = new HashMap();
+    tuple.put("device", "root.sg.D01");
+    tuple.put("timestamp", "1581861293000");
+    tuple.put("measurements", "temperature");
+    tuple.put("types", "DOUBLE");
+    tuple.put("values", "36.5");
+    ioTDBSink.invoke(tuple, null);
 
-        Thread.sleep(2500);
+    Thread.sleep(2500);
 
-        verify(pool).insertRecords(any(List.class), any(List.class), any(List.class), any(List.class));
+    verify(pool).insertRecords(any(List.class), any(List.class), any(List.class), any(List.class),
+        any(List.class));
 
-        Thread.sleep(1000);
+    Thread.sleep(1000);
 
-        verifyZeroInteractions(pool);
-    }
+    verifyZeroInteractions(pool);
+  }
 
-    @Test
-    public void close() throws Exception {
-        ioTDBSink.close();
-        verify(pool).close();
-    }
+  @Test
+  public void close() throws Exception {
+    ioTDBSink.close();
+    verify(pool).close();
+  }
 }
diff --git a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
index 3bcb367..6c268f6 100644
--- a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
+++ b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
@@ -18,49 +18,51 @@
 
 package org.apache.iotdb.flink;
 
-import com.google.common.collect.Lists;
-import org.apache.iotdb.session.pool.SessionPool;
-import org.junit.Before;
-import org.junit.Test;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
+import com.google.common.collect.Lists;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.junit.Before;
+import org.junit.Test;
 
 public class IoTDBSinkInsertTest {
 
-    private IoTDBSink ioTDBSink;
-    private SessionPool pool;
+  private IoTDBSink ioTDBSink;
+  private SessionPool pool;
 
-    @Before
-    public void setUp() throws Exception {
-        IoTDBOptions options = new IoTDBOptions();
-        options.setTimeseriesOptionList(Lists.newArrayList(new IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
-        ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema());
+  @Before
+  public void setUp() throws Exception {
+    IoTDBOptions options = new IoTDBOptions();
+    options.setTimeseriesOptionList(
+        Lists.newArrayList(new IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
+    ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema());
 
-        pool = mock(SessionPool.class);
-        ioTDBSink.setSessionPool(pool);
-    }
+    pool = mock(SessionPool.class);
+    ioTDBSink.setSessionPool(pool);
+  }
 
-    @Test
-    public void testInsert() throws Exception {
-        Map<String,String> tuple = new HashMap();
-        tuple.put("device", "root.sg.D01");
-        tuple.put("timestamp", "1581861293000");
-        tuple.put("measurements", "temperature");
-        tuple.put("values", "36.5");
+  @Test
+  public void testInsert() throws Exception {
+    Map<String, String> tuple = new HashMap();
+    tuple.put("device", "root.sg.D01");
+    tuple.put("timestamp", "1581861293000");
+    tuple.put("measurements", "temperature");
+    tuple.put("types", "DOUBLE");
+    tuple.put("values", "36.5");
 
-        ioTDBSink.invoke(tuple, null);
-        verify(pool).insertRecord(any(String.class), any(Long.class), any(List.class), any(List.class));
-    }
+    ioTDBSink.invoke(tuple, null);
+    verify(pool).insertRecord(any(String.class), any(Long.class), any(List.class), any(List.class),
+        any(List.class));
+  }
 
-    @Test
-    public void close() throws Exception {
-        ioTDBSink.close();
-        verify(pool).close();
-    }
+  @Test
+  public void close() throws Exception {
+    ioTDBSink.close();
+    verify(pool).close();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 5292938..dde8bee 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.rescon.TVListAllocator;
-import org.apache.iotdb.db.utils.CommonUtils;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -41,12 +40,9 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 public abstract class AbstractMemTable implements IMemTable {
 
+  private final Map<String, Map<String, IWritableMemChunk>> memTableMap;
   private long version = Long.MAX_VALUE;
-
   private List<Modification> modifications = new ArrayList<>();
-
-  private final Map<String, Map<String, IWritableMemChunk>> memTableMap;
-
   private long memSize = 0;
 
   public AbstractMemTable() {
@@ -86,20 +82,14 @@ public abstract class AbstractMemTable implements IMemTable {
   protected abstract IWritableMemChunk genMemSeries(MeasurementSchema schema);
 
   @Override
-  public void insert(InsertPlan insertPlan) throws WriteProcessException {
-    try {
-      for (int i = 0; i < insertPlan.getValues().length; i++) {
-
-        Object value = CommonUtils.parseValue(insertPlan.getSchemas()[i].getType(),
-            insertPlan.getValues()[i]);
+  public void insert(InsertPlan insertPlan) {
+    for (int i = 0; i < insertPlan.getValues().length; i++) {
 
-        memSize += MemUtils.getRecordSize(insertPlan.getSchemas()[i].getType(), value);
+      Object value = insertPlan.getValues()[i];
+      memSize += MemUtils.getRecordSize(insertPlan.getSchemas()[i].getType(), value);
 
-        write(insertPlan.getDeviceId(), insertPlan.getMeasurements()[i],
-            insertPlan.getSchemas()[i], insertPlan.getTime(), value);
-      }
-    } catch (QueryProcessException e) {
-      throw new WriteProcessException(e.getMessage());
+      write(insertPlan.getDeviceId(), insertPlan.getMeasurements()[i],
+          insertPlan.getSchemas()[i], insertPlan.getTime(), value);
     }
   }
 
@@ -209,14 +199,14 @@ public abstract class AbstractMemTable implements IMemTable {
     this.modifications.add(deletion);
   }
 
-  public void setVersion(long version) {
-    this.version = version;
-  }
-
   public long getVersion() {
     return version;
   }
 
+  public void setVersion(long version) {
+    this.version = version;
+  }
+
   @Override
   public void release() {
     for (Entry<String, Map<String, IWritableMemChunk>> entry : memTableMap.entrySet()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index b0f308e..9433bcb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -57,8 +57,8 @@ import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
 import org.apache.iotdb.db.writelog.node.WriteLogNode;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.utils.Pair;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 63bae27..38ab58a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -168,7 +168,7 @@ public class PlanExecutor implements IPlanExecutor {
   @Override
   public QueryDataSet processQuery(PhysicalPlan queryPlan, QueryContext context)
       throws IOException, StorageEngineException, QueryFilterOptimizationException,
-          QueryProcessException, MetadataException {
+      QueryProcessException, MetadataException {
     if (queryPlan instanceof QueryPlan) {
       return processDataQuery((QueryPlan) queryPlan, context);
     } else if (queryPlan instanceof AuthorPlan) {
@@ -255,7 +255,7 @@ public class PlanExecutor implements IPlanExecutor {
   }
 
   private void operateMerge(MergePlan plan) throws StorageEngineException {
-    if(plan.getOperatorType() == OperatorType.FULL_MERGE) {
+    if (plan.getOperatorType() == OperatorType.FULL_MERGE) {
       StorageEngine.getInstance().mergeAll(true);
     } else {
       StorageEngine.getInstance()
@@ -272,10 +272,10 @@ public class PlanExecutor implements IPlanExecutor {
   }
 
   private void operateFlush(FlushPlan plan) throws StorageGroupNotSetException {
-    if(plan.getPaths() == null) {
+    if (plan.getPaths() == null) {
       StorageEngine.getInstance().syncCloseAllProcessor();
     } else {
-      if(plan.isSeq() == null) {
+      if (plan.isSeq() == null) {
         for (Path storageGroup : plan.getPaths()) {
           StorageEngine.getInstance().asyncCloseProcessor(storageGroup.toString(), true);
           StorageEngine.getInstance().asyncCloseProcessor(storageGroup.toString(), false);
@@ -290,7 +290,7 @@ public class PlanExecutor implements IPlanExecutor {
 
   protected QueryDataSet processDataQuery(QueryPlan queryPlan, QueryContext context)
       throws StorageEngineException, QueryFilterOptimizationException, QueryProcessException,
-          IOException {
+      IOException {
     QueryDataSet queryDataSet;
     if (queryPlan instanceof AlignByDevicePlan) {
       queryDataSet = new AlignByDeviceDataSet((AlignByDevicePlan) queryPlan, context, queryRouter);
@@ -869,7 +869,6 @@ public class PlanExecutor implements IPlanExecutor {
       String[] measurementList = insertPlan.getMeasurements();
       String deviceId = insertPlan.getDeviceId();
       node = mManager.getDeviceNodeWithAutoCreateAndReadLock(deviceId);
-      String[] strValues = insertPlan.getValues();
       MeasurementSchema[] schemas = new MeasurementSchema[measurementList.length];
 
       for (int i = 0; i < measurementList.length; i++) {
@@ -878,7 +877,7 @@ public class PlanExecutor implements IPlanExecutor {
           if (!IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
             throw new PathNotExistException(deviceId + PATH_SEPARATOR + measurement);
           }
-          TSDataType dataType = TypeInferenceUtils.getPredictedDataType(strValues[i]);
+          TSDataType dataType = TypeInferenceUtils.getPredictedDataType(insertPlan.getValues()[i]);
           Path path = new Path(deviceId, measurement);
           internalCreateTimeseries(path.toString(), dataType);
         }
@@ -886,6 +885,10 @@ public class PlanExecutor implements IPlanExecutor {
         schemas[i] = measurementNode.getSchema();
         // reset measurement to common name instead of alias
         measurementList[i] = measurementNode.getName();
+
+        if(insertPlan.getStrValueList() == null) {
+          checkType(insertPlan, i, measurementNode.getSchema().getType());
+        }
       }
 
       insertPlan.setMeasurements(measurementList);
@@ -900,7 +903,53 @@ public class PlanExecutor implements IPlanExecutor {
     }
   }
 
-  /** create timeseries with ignore PathAlreadyExistException */
+  private void checkType(InsertPlan plan, int loc, TSDataType type) {
+    plan.getTypes()[loc] = type;
+    try {
+      switch (type) {
+        case INT32:
+          if (!(plan.getValues()[loc] instanceof Integer)) {
+            plan.getValues()[loc] =
+                Integer.parseInt(((Binary) plan.getValues()[loc]).getStringValue());
+          }
+          break;
+        case INT64:
+          if (!(plan.getValues()[loc] instanceof Long)) {
+            plan.getValues()[loc] =
+                Long.parseLong(((Binary) plan.getValues()[loc]).getStringValue());
+          }
+          break;
+        case DOUBLE:
+          if (!(plan.getValues()[loc] instanceof Double)) {
+            plan.getValues()[loc] =
+                Double.parseDouble(((Binary) plan.getValues()[loc]).getStringValue());
+          }
+          break;
+        case FLOAT:
+          if (!(plan.getValues()[loc] instanceof Float)) {
+            plan.getValues()[loc] =
+                Float.parseFloat(((Binary) plan.getValues()[loc]).getStringValue());
+          }
+          break;
+        case BOOLEAN:
+          if (!(plan.getValues()[loc] instanceof Boolean)) {
+            plan.getValues()[loc] =
+                Boolean.parseBoolean(((Binary) plan.getValues()[loc]).getStringValue());
+          }
+          break;
+        case TEXT:
+          // need to do nothing
+          break;
+      }
+    }
+    catch (ClassCastException e){
+      logger.error("inconsistent type between client and server");
+    }
+  }
+
+  /**
+   * create timeseries with ignore PathAlreadyExistException
+   */
   private void internalCreateTimeseries(String path, TSDataType dataType) throws MetadataException {
     try {
       mManager.createTimeseries(
@@ -919,7 +968,9 @@ public class PlanExecutor implements IPlanExecutor {
     }
   }
 
-  /** Get default encoding by dataType */
+  /**
+   * Get default encoding by dataType
+   */
   private TSEncoding getDefaultEncoding(TSDataType dataType) {
     IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
     switch (dataType) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java 
b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index 5f334af..8174fc2 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -25,16 +25,18 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
-
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.utils.CommonUtils;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -44,21 +46,59 @@ public class InsertPlan extends PhysicalPlan {
   private long time;
   private String deviceId;
   private String[] measurements;
-  private String[] values;
+  private Object[] values;
+  private TSDataType[] types;
   private MeasurementSchema[] schemas;
 
+  public String[] getStrValueList() {
+    return strValueList;
+  }
+
+  public void setStrValueList(String[] strValueList) {
+    this.strValueList = strValueList;
+  }
+
+  // only for sql
+  private String[] strValueList;
+
   public InsertPlan() {
     super(false, OperatorType.INSERT);
     canbeSplit = false;
   }
 
   @TestOnly
-  public InsertPlan(String deviceId, long insertTime, String measurement, String insertValue) {
+  public InsertPlan(String deviceId, long insertTime, String[] measurements, TSDataType[] types,
+      String[] insertValues) {
     super(false, OperatorType.INSERT);
     this.time = insertTime;
     this.deviceId = deviceId;
-    this.measurements = new String[] {measurement};
-    this.values = new String[] {insertValue};
+    this.measurements = measurements;
+
+    this.types = types;
+    this.values = new Object[measurements.length];
+    for (int i = 0; i < measurements.length; i++) {
+      try {
+        values[i] = CommonUtils.parseValueForTest(types[i], insertValues[i]);
+      } catch (QueryProcessException e) {
+        e.printStackTrace();
+      }
+    }
+    canbeSplit = false;
+  }
+
+  @TestOnly
+  public InsertPlan(String deviceId, long insertTime, String measurement, TSDataType type, String insertValue) {
+    super(false, OperatorType.INSERT);
+    this.time = insertTime;
+    this.deviceId = deviceId;
+    this.measurements = new String[]{measurement};
+    this.types = new TSDataType[]{type};
+    this.values = new Object[1];
+    try {
+      values[0] = CommonUtils.parseValueForTest(types[0], insertValue);
+    } catch (QueryProcessException e) {
+      e.printStackTrace();
+    }
     canbeSplit = false;
   }
 
@@ -68,25 +108,43 @@ public class InsertPlan extends PhysicalPlan {
     this.time = tsRecord.time;
     this.measurements = new String[tsRecord.dataPointList.size()];
     this.schemas = new MeasurementSchema[tsRecord.dataPointList.size()];
-    this.values = new String[tsRecord.dataPointList.size()];
+    this.types = new TSDataType[tsRecord.dataPointList.size()];
+    this.values = new Object[tsRecord.dataPointList.size()];
     for (int i = 0; i < tsRecord.dataPointList.size(); i++) {
       measurements[i] = tsRecord.dataPointList.get(i).getMeasurementId();
-      schemas[i] = new MeasurementSchema(measurements[i], tsRecord.dataPointList.get(i).getType(), TSEncoding.PLAIN);
-      values[i] = tsRecord.dataPointList.get(i).getValue().toString();
+      schemas[i] = new MeasurementSchema(measurements[i], tsRecord.dataPointList.get(i).getType(),
+          TSEncoding.PLAIN);
+      types[i] = tsRecord.dataPointList.get(i).getType();
+      values[i] = tsRecord.dataPointList.get(i).getValue();
     }
     canbeSplit = false;
   }
 
+  public InsertPlan(String deviceId, long insertTime, String[] measurementList, TSDataType[] types,
+      Object[] insertValues) {
+    super(false, Operator.OperatorType.INSERT);
+    this.time = insertTime;
+    this.deviceId = deviceId;
+    this.measurements = measurementList;
+    this.types = types;
+    this.values = insertValues;
+    canbeSplit = false;
+  }
+
   public InsertPlan(String deviceId, long insertTime, String[] measurementList,
       String[] insertValues) {
     super(false, Operator.OperatorType.INSERT);
     this.time = insertTime;
     this.deviceId = deviceId;
     this.measurements = measurementList;
-    this.values = insertValues;
+    // build types and values
+    this.types = new TSDataType[measurements.length];
+    this.values = new Object[measurements.length];
+    this.strValueList = insertValues;
     canbeSplit = false;
   }
 
+
   public long getTime() {
     return time;
   }
@@ -101,6 +159,17 @@ public class InsertPlan extends PhysicalPlan {
 
   public void setSchemas(MeasurementSchema[] schemas) {
     this.schemas = schemas;
+    if (strValueList != null) {
+      for (int i = 0; i < schemas.length; i++) {
+        types[i] = schemas[i].getType();
+        try {
+          values[i] = CommonUtils.parseValue(types[i], strValueList[i]);
+        } catch (QueryProcessException e) {
+          e.printStackTrace();
+        }
+      }
+      strValueList = null;
+    }
   }
 
   @Override
@@ -129,11 +198,11 @@ public class InsertPlan extends PhysicalPlan {
     this.measurements = measurements;
   }
 
-  public String[] getValues() {
+  public Object[] getValues() {
     return this.values;
   }
 
-  public void setValues(String[] values) {
+  public void setValues(Object[] values) {
     this.values = values;
   }
 
@@ -174,8 +243,102 @@ public class InsertPlan extends PhysicalPlan {
       schema.serializeTo(stream);
     }
 
-    for (String m : values) {
-      putString(stream, m);
+    try {
+      putValues(stream);
+    } catch (QueryProcessException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private void putValues(DataOutputStream outputStream) throws QueryProcessException, IOException {
+    for (int i = 0; i < values.length; i++) {
+      ReadWriteIOUtils.write(types[i], outputStream);
+      switch (types[i]) {
+        case BOOLEAN:
+          ReadWriteIOUtils.write((Boolean) values[i], outputStream);
+          break;
+        case INT32:
+          ReadWriteIOUtils.write((Integer) values[i], outputStream);
+          break;
+        case INT64:
+          ReadWriteIOUtils.write((Long) values[i], outputStream);
+          break;
+        case FLOAT:
+          ReadWriteIOUtils.write((Float) values[i], outputStream);
+          break;
+        case DOUBLE:
+          ReadWriteIOUtils.write((Double) values[i], outputStream);
+          break;
+        case TEXT:
+          ReadWriteIOUtils.write((Binary) values[i], outputStream);
+          break;
+        default:
+          throw new QueryProcessException("Unsupported data type:" + types[i]);
+      }
+    }
+  }
+
+  private void putValues(ByteBuffer buffer) throws QueryProcessException {
+    for (int i = 0; i < values.length; i++) {
+      ReadWriteIOUtils.write(types[i], buffer);
+      switch (types[i]) {
+        case BOOLEAN:
+          ReadWriteIOUtils.write((Boolean) values[i], buffer);
+          break;
+        case INT32:
+          ReadWriteIOUtils.write((Integer) values[i], buffer);
+          break;
+        case INT64:
+          ReadWriteIOUtils.write((Long) values[i], buffer);
+          break;
+        case FLOAT:
+          ReadWriteIOUtils.write((Float) values[i], buffer);
+          break;
+        case DOUBLE:
+          ReadWriteIOUtils.write((Double) values[i], buffer);
+          break;
+        case TEXT:
+          ReadWriteIOUtils.write((Binary) values[i], buffer);
+          break;
+        default:
+          throw new QueryProcessException("Unsupported data type:" + types[i]);
+      }
+    }
+  }
+
+  public TSDataType[] getTypes() {
+    return types;
+  }
+
+  public void setTypes(TSDataType[] types) {
+    this.types = types;
+  }
+
+  public void setValues(ByteBuffer buffer) throws QueryProcessException {
+    for (int i = 0; i < measurements.length; i++) {
+      types[i] = ReadWriteIOUtils.readDataType(buffer);
+      switch (types[i]) {
+        case BOOLEAN:
+          values[i] = ReadWriteIOUtils.readBool(buffer);
+          break;
+        case INT32:
+          values[i] = ReadWriteIOUtils.readInt(buffer);
+          break;
+        case INT64:
+          values[i] = ReadWriteIOUtils.readLong(buffer);
+          break;
+        case FLOAT:
+          values[i] = ReadWriteIOUtils.readFloat(buffer);
+          break;
+        case DOUBLE:
+          values[i] = ReadWriteIOUtils.readDouble(buffer);
+          break;
+        case TEXT:
+          values[i] = ReadWriteIOUtils.readBinary(buffer);
+          break;
+        default:
+          throw new QueryProcessException("Unsupported data type:" + types[i]);
+      }
     }
   }
 
@@ -193,8 +356,10 @@ public class InsertPlan extends PhysicalPlan {
       putString(buffer, m);
     }
 
-    for (String m : values) {
-      putString(buffer, m);
+    try {
+      putValues(buffer);
+    } catch (QueryProcessException e) {
+      e.printStackTrace();
     }
   }
 
@@ -210,9 +375,12 @@ public class InsertPlan extends PhysicalPlan {
       measurements[i] = readString(buffer);
     }
 
-    this.values = new String[measurementSize];
-    for (int i = 0; i < measurementSize; i++) {
-      values[i] = readString(buffer);
+    this.types = new TSDataType[measurementSize];
+    this.values = new Object[measurementSize];
+    try {
+      setValues(buffer);
+    } catch (QueryProcessException e) {
+      e.printStackTrace();
     }
   }
 
@@ -225,7 +393,8 @@ public class InsertPlan extends PhysicalPlan {
     if (measurementIndex >= values.length) {
       return null;
     }
-    Object value = CommonUtils.parseValue(schemas[measurementIndex].getType(), values[measurementIndex]);
-    return new TimeValuePair(time, TsPrimitiveType.getByType(schemas[measurementIndex].getType(), value));
+    Object value = values[measurementIndex];
+    return new TimeValuePair(time,
+        TsPrimitiveType.getByType(schemas[measurementIndex].getType(), value));
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index a083856..5627616 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -178,6 +178,7 @@ public class PhysicalGenerator {
           throw new LogicalOperatorException(
               "For Insert command, cannot specified more than one seriesPath: " + paths);
         }
+
         return new InsertPlan(
             paths.get(0).getFullPath(),
             insert.getTime(),
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index b10ad42..fef8785 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -136,13 +136,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   private static final int DELETE_SIZE = 20;
   private static final String ERROR_PARSING_SQL =
       "meet error while parsing SQL to physical plan: {}";
-
-  private boolean enableMetric = IoTDBDescriptor.getInstance().getConfig().isEnableMetricService();
   private static final List<SqlArgument> sqlArgumentList = new ArrayList<>(MAX_SIZE);
-
   protected Planner processor;
   protected IPlanExecutor executor;
-
+  private boolean enableMetric = IoTDBDescriptor.getInstance().getConfig().isEnableMetricService();
   // Record the username for every rpc connection (session).
   private Map<Long, String> sessionIdUsernameMap = new ConcurrentHashMap<>();
   private Map<Long, ZoneId> sessionIdZoneIdMap = new ConcurrentHashMap<>();
@@ -170,6 +167,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     executor = new PlanExecutor();
   }
 
+  public static List<SqlArgument> getSqlArgumentList() {
+    return sqlArgumentList;
+  }
+
   @Override
   public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
     logger.info(
@@ -1068,15 +1069,22 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
     InsertPlan plan = new InsertPlan();
     for (int i = 0; i < req.deviceIds.size(); i++) {
-      plan.setDeviceId(req.getDeviceIds().get(i));
-      plan.setTime(req.getTimestamps().get(i));
-      plan.setMeasurements(req.getMeasurementsList().get(i).toArray(new String[0]));
-      plan.setValues(req.getValuesList().get(i).toArray(new String[0]));
-      TSStatus status = checkAuthority(plan, req.getSessionId());
-      if (status != null) {
-        resp.addToStatusList(status);
-      } else {
-        resp.addToStatusList(executePlan(plan));
+      try {
+        plan.setDeviceId(req.getDeviceIds().get(i));
+        plan.setTime(req.getTimestamps().get(i));
+        plan.setMeasurements(req.getMeasurementsList().get(i).toArray(new String[0]));
+        plan.setTypes(new TSDataType[plan.getMeasurements().length]);
+        plan.setValues(new Object[plan.getMeasurements().length]);
+        plan.setValues(req.valuesList.get(i));
+        TSStatus status = checkAuthority(plan, req.getSessionId());
+        if (status != null) {
+          resp.addToStatusList(status);
+        } else {
+          resp.addToStatusList(executePlan(plan));
+        }
+      } catch (Exception e) {
+        logger.error("meet error when insert in batch", e);
+        resp.addToStatusList(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
       }
     }
 
@@ -1113,7 +1121,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       plan.setDeviceId(req.getDeviceId());
       plan.setTime(req.getTimestamp());
       plan.setMeasurements(req.getMeasurements().toArray(new String[0]));
-      plan.setValues(req.getValues().toArray(new String[0]));
+      plan.setTypes(new TSDataType[plan.getMeasurements().length]);
+      plan.setValues(new Object[plan.getMeasurements().length]);
+      plan.setValues(req.values);
 
       TSStatus status = checkAuthority(plan, req.getSessionId());
       if (status != null) {
@@ -1435,10 +1445,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     }
   }
 
-  public static List<SqlArgument> getSqlArgumentList() {
-    return sqlArgumentList;
-  }
-
   private long generateQueryId(boolean isDataQuery) {
     return QueryResourceManager.getInstance().assignQueryId(isDataQuery);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
index 3f9e43a..c6575de 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
@@ -31,7 +31,8 @@ import org.apache.iotdb.tsfile.utils.Binary;
 
 public class CommonUtils {
 
-  private CommonUtils(){}
+  private CommonUtils() {
+  }
 
   /**
    * get JDK version.
@@ -72,7 +73,8 @@ public class CommonUtils {
       switch (dataType) {
         case BOOLEAN:
           value = value.toLowerCase();
-          if (SQLConstant.BOOLEAN_FALSE_NUM.equals(value) || SQLConstant.BOOLEN_FALSE.equals(value)) {
+          if (SQLConstant.BOOLEAN_FALSE_NUM.equals(value) || SQLConstant.BOOLEN_FALSE
+              .equals(value)) {
             return false;
           }
           if (SQLConstant.BOOLEAN_TRUE_NUM.equals(value) || SQLConstant.BOOLEN_TRUE.equals(value)) {
@@ -89,14 +91,48 @@ public class CommonUtils {
           return Double.parseDouble(value);
         case TEXT:
           if ((value.startsWith(SQLConstant.QUOTE) && value.endsWith(SQLConstant.QUOTE))
-                  || (value.startsWith(SQLConstant.DQUOTE) && value.endsWith(SQLConstant.DQUOTE))) {
+              || (value.startsWith(SQLConstant.DQUOTE) && value.endsWith(SQLConstant.DQUOTE))) {
             if (value.length() == 1) {
               return new Binary(value);
             } else {
               return new Binary(value.substring(1, value.length() - 1));
             }
           }
-          throw new QueryProcessException("The TEXT data type should be covered by \" or '");
+
+          return new Binary(value);
+        default:
+          throw new QueryProcessException("Unsupported data type:" + dataType);
+      }
+    } catch (NumberFormatException e) {
+      throw new QueryProcessException(e.getMessage());
+    }
+  }
+
+  @TestOnly
+  public static Object parseValueForTest(TSDataType dataType, String value)
+      throws QueryProcessException {
+    try {
+      switch (dataType) {
+        case BOOLEAN:
+          value = value.toLowerCase();
+          if (SQLConstant.BOOLEAN_FALSE_NUM.equals(value) || SQLConstant.BOOLEN_FALSE
+              .equals(value)) {
+            return false;
+          }
+          if (SQLConstant.BOOLEAN_TRUE_NUM.equals(value) || SQLConstant.BOOLEN_TRUE.equals(value)) {
+            return true;
+          }
+          throw new QueryProcessException("The BOOLEAN should be true/TRUE, false/FALSE or 0/1");
+        case INT32:
+          return Integer.parseInt(value);
+        case INT64:
+          return Long.parseLong(value);
+        case FLOAT:
+          return Float.parseFloat(value);
+        case DOUBLE:
+          return Double.parseDouble(value);
+        case TEXT:
+          return new Binary(value);
         default:
           throw new QueryProcessException("Unsupported data type:" + dataType);
       }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index f627484..18598a4 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -20,6 +20,17 @@
 
 package org.apache.iotdb.db.engine.storagegroup;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
@@ -57,12 +68,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.*;
-
-import static org.junit.Assert.*;
-
 public class TTLTest {
 
   private String sg1 = "root.TTL_SG1";
@@ -126,7 +131,8 @@ public class TTLTest {
     insertPlan.setDeviceId(sg1);
     insertPlan.setTime(System.currentTimeMillis());
     insertPlan.setMeasurements(new String[]{"s1"});
-    insertPlan.setValues(new String[]{"1"});
+    insertPlan.setTypes(new TSDataType[]{TSDataType.INT64});
+    insertPlan.setValues(new Object[]{1L});
     insertPlan.setSchemas(
         new MeasurementSchema[]{new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN)});
 
@@ -152,7 +158,8 @@ public class TTLTest {
     insertPlan.setDeviceId(sg1);
     insertPlan.setTime(System.currentTimeMillis());
     insertPlan.setMeasurements(new String[]{"s1"});
-    insertPlan.setValues(new String[]{"1"});
+    insertPlan.setTypes(new TSDataType[]{TSDataType.INT64});
+    insertPlan.setValues(new Object[]{1L});
     insertPlan.setSchemas(
         new MeasurementSchema[]{new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN)});
 
diff --git a/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java b/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
index 1e7401a..2dc30bc 100644
--- a/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.exception.SystemCheckException;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.writelog.io.LogWriter;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.junit.Test;
 
 public class WalCheckerTest {
@@ -73,12 +74,13 @@ public class WalCheckerTest {
         LogWriter logWriter = new LogWriter(subDir.getPath() + File.separator
             + WAL_FILE_NAME);
 
-        ByteBuffer binaryPlans = ByteBuffer.allocate(64*1024);
+        ByteBuffer binaryPlans = ByteBuffer.allocate(64 * 1024);
         String deviceId = "device1";
         String[] measurements = new String[]{"s1", "s2", "s3"};
+        TSDataType[] types = new TSDataType[]{TSDataType.INT64, TSDataType.INT64, TSDataType.INT64};
         String[] values = new String[]{"5", "6", "7"};
         for (int j = 0; j < 10; j++) {
-          new InsertPlan(deviceId, j, measurements, values).serializeTo(binaryPlans);
+          new InsertPlan(deviceId, j, measurements, types, values).serializeTo(binaryPlans);
         }
         binaryPlans.flip();
         logWriter.write(binaryPlans);
@@ -106,12 +108,13 @@ public class WalCheckerTest {
         LogWriter logWriter = new LogWriter(subDir.getPath() + File.separator
             + WAL_FILE_NAME);
 
-        ByteBuffer binaryPlans = ByteBuffer.allocate(64*1024);
+        ByteBuffer binaryPlans = ByteBuffer.allocate(64 * 1024);
         String deviceId = "device1";
         String[] measurements = new String[]{"s1", "s2", "s3"};
+        TSDataType[] types = new TSDataType[]{TSDataType.INT64, TSDataType.INT64, TSDataType.INT64};
         String[] values = new String[]{"5", "6", "7"};
         for (int j = 0; j < 10; j++) {
-          new InsertPlan(deviceId, j, measurements, values).serializeTo(binaryPlans);
+          new InsertPlan(deviceId, j, measurements, types, values).serializeTo(binaryPlans);
         }
         if (i > 2) {
           binaryPlans.put("not a wal".getBytes());
diff --git 
a/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java 
b/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
index b118f05..269f93c 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
@@ -88,6 +88,7 @@ public class PerformanceTest {
         for (int i = 0; i < 1000000; i++) {
           InsertPlan bwInsertPlan = new InsertPlan("logTestDevice", 100,
               new String[]{"s1", "s2", "s3", "s4"},
+              new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN},
               new String[]{"1.0", "15", "str", "false"});
           UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0",
               new Path("root.logTestDevice.s1"));
@@ -147,7 +148,9 @@ public class PerformanceTest {
 
     for (int i = 0; i < 1000000; i++) {
       InsertPlan bwInsertPlan = new InsertPlan("root.logTestDevice", 100,
-          new String[]{"s1", "s2", "s3", "s4"}, new String[]{"1.0", "15", "str", "false"});
+          new String[]{"s1", "s2", "s3", "s4"},
+          new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN},
+          new String[]{"1.0", "15", "str", "false"});
       UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0",
           new Path("root.logTestDevice.s1"));
       DeletePlan deletePlan = new DeletePlan(50, new Path("root.logTestDevice.s1"));
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java
index 4aa8297..7125d88 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
 import org.apache.iotdb.db.writelog.manager.WriteLogNodeManager;
 import org.apache.iotdb.db.writelog.node.WriteLogNode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.junit.After;
 import org.junit.Before;
@@ -86,6 +87,7 @@ public class WriteLogNodeManagerTest {
 
     InsertPlan bwInsertPlan = new InsertPlan("logTestDevice", 100,
         new String[]{"s1", "s2", "s3", "s4"},
+        new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN},
         new String[]{"1.0", "15", "str", "false"});
     DeletePlan deletePlan = new DeletePlan(50, new Path("root.logTestDevice.s1"));
 
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
index 63a5931..953a76e 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.writelog.io.ILogReader;
 import org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode;
 import org.apache.iotdb.db.writelog.node.WriteLogNode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.junit.After;
 import org.junit.Before;
@@ -66,6 +67,7 @@ public class WriteLogNodeTest {
 
     InsertPlan bwInsertPlan = new InsertPlan(identifier, 100,
         new String[]{"s1", "s2", "s3", "s4"},
+        new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN},
         new String[]{"1.0", "15", "str", "false"});
     DeletePlan deletePlan = new DeletePlan(50, new Path(identifier + ".s1"));
 
@@ -96,6 +98,7 @@ public class WriteLogNodeTest {
 
     InsertPlan bwInsertPlan = new InsertPlan(identifier, 100,
         new String[]{"s1", "s2", "s3", "s4"},
+        new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN},
         new String[]{"1.0", "15", "str", "false"});
     DeletePlan deletePlan = new DeletePlan(50, new Path(identifier + ".s1"));
 
@@ -132,6 +135,7 @@ public class WriteLogNodeTest {
 
     InsertPlan bwInsertPlan = new InsertPlan("root.logTestDevice", 100,
         new String[]{"s1", "s2", "s3", "s4"},
+        new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN},
         new String[]{"1.0", "15", "str", "false"});
     DeletePlan deletePlan = new DeletePlan(50, new Path("root.logTestDevice.s1"));
 
@@ -157,6 +161,7 @@ public class WriteLogNodeTest {
 
     InsertPlan bwInsertPlan = new InsertPlan("logTestDevice", 100,
         new String[]{"s1", "s2", "s3", "s4"},
+        new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN},
         new String[]{"1.0", "15", "str", "false"});
     DeletePlan deletePlan = new DeletePlan(50, new Path("root.logTestDevice.s1"));
 
@@ -181,6 +186,7 @@ public class WriteLogNodeTest {
 
     InsertPlan bwInsertPlan = new InsertPlan("root.logTestDevice.oversize", 100,
         new String[]{"s1", "s2", "s3", "s4"},
+        new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN},
         new String[]{"1.0", "15", new String(new char[65 * 1024 * 1024]), "false"});
 
     boolean caught = false;
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
index 6dea9ac..b2e087e 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
@@ -29,6 +29,7 @@ import java.util.List;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.junit.Before;
 import org.junit.Test;
@@ -36,7 +37,7 @@ import org.junit.Test;
 public class LogWriterReaderTest {
 
   private static String filePath = "logtest.test";
-  ByteBuffer logsBuffer = ByteBuffer.allocate(64*1024);
+  ByteBuffer logsBuffer = ByteBuffer.allocate(64 * 1024);
   List<PhysicalPlan> plans = new ArrayList<>();
 
   @Before
@@ -45,8 +46,10 @@ public class LogWriterReaderTest {
       new File(filePath).delete();
     }
     InsertPlan insertPlan1 = new InsertPlan("d1", 10L, new String[]{"s1", "s2"},
+        new TSDataType[]{TSDataType.INT64, TSDataType.INT64},
         new String[]{"1", "2"});
     InsertPlan insertPlan2 = new InsertPlan("d1", 10L, new String[]{"s1", "s2"},
+        new TSDataType[]{TSDataType.INT64, TSDataType.INT64},
         new String[]{"1", "2"});
     DeletePlan deletePlan = new DeletePlan(10L, new Path("root.d1.s1"));
     plans.add(insertPlan1);
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
index fc1005c..37e263b 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
@@ -36,10 +36,10 @@ import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.StorageGroupProcessorException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.exception.StorageGroupProcessorException;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -48,10 +48,9 @@ import org.apache.iotdb.db.writelog.node.WriteLogNode;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.reader.IPointReader;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -105,10 +104,13 @@ public class LogReplayerTest {
 
       WriteLogNode node =
           MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsFile.getName());
-      node.write(new InsertPlan("root.sg.device0", 100, "sensor0", String.valueOf(0)));
-      node.write(new InsertPlan("root.sg.device0", 2, "sensor1", String.valueOf(0)));
+      node.write(
+          new InsertPlan("root.sg.device0", 100, "sensor0", TSDataType.INT64, String.valueOf(0)));
+      node.write(
+          new InsertPlan("root.sg.device0", 2, "sensor1", TSDataType.INT64, String.valueOf(0)));
       for (int i = 1; i < 5; i++) {
-        node.write(new InsertPlan("root.sg.device" + i, i, "sensor" + i, String.valueOf(i)));
+        node.write(new InsertPlan("root.sg.device" + i, i, "sensor" + i, TSDataType.INT64,
+            String.valueOf(i)));
       }
       DeletePlan deletePlan = new DeletePlan(200, new Path("root.sg.device0", "sensor0"));
       node.write(deletePlan);
@@ -117,8 +119,9 @@ public class LogReplayerTest {
       replayer.replayLogs();
 
       for (int i = 0; i < 5; i++) {
-        ReadOnlyMemChunk memChunk = memTable.query("root.sg.device" + i, "sensor" + i, TSDataType.INT64,
-            TSEncoding.RLE, Collections.emptyMap(), Long.MIN_VALUE);
+        ReadOnlyMemChunk memChunk = memTable
+            .query("root.sg.device" + i, "sensor" + i, TSDataType.INT64,
+                TSEncoding.RLE, Collections.emptyMap(), Long.MIN_VALUE);
         IPointReader iterator = memChunk.getPointReader();
         if (i == 0) {
           assertFalse(iterator.hasNextTimeValuePair());
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java
index a881e01..3c1435b 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java
@@ -88,7 +88,8 @@ public class RecoverResourceFromReaderTest {
     for (int i = 0; i < 10; i++) {
       for (int j = 0; j < 10; j++) {
         Path path = new Path(("root.sg.device" + i), ("sensor" + j));
-        MeasurementSchema measurementSchema = new MeasurementSchema("sensor" + j, TSDataType.INT64, TSEncoding.PLAIN);
+        MeasurementSchema measurementSchema = new MeasurementSchema("sensor" + j, TSDataType.INT64,
+            TSEncoding.PLAIN);
         schema.registerTimeseries(path, measurementSchema);
         MManager.getInstance().createTimeseries(path.getFullPath(), measurementSchema.getType(),
             measurementSchema.getEncodingType(), measurementSchema.getCompressor(),
@@ -138,18 +139,23 @@ public class RecoverResourceFromReaderTest {
       for (int j = 0; j < 10; j++) {
         String[] measurements = new String[10];
         String[] values = new String[10];
+        TSDataType[] types = new TSDataType[10];
         for (int k = 0; k < 10; k++) {
           measurements[k] = "sensor" + k;
+          types[k] = TSDataType.INT64;
           values[k] = String.valueOf(k + 10);
         }
-        InsertPlan insertPlan = new InsertPlan("root.sg.device" + j, i, measurements, values);
+        InsertPlan insertPlan = new InsertPlan("root.sg.device" + j, i, measurements,
+            types, values);
         node.write(insertPlan);
       }
       node.notifyStartFlush();
     }
-    InsertPlan insertPlan = new InsertPlan("root.sg.device99", 1, "sensor4", "4");
+    InsertPlan insertPlan = new InsertPlan("root.sg.device99", 1, new String[]{"sensor4"},
+        new TSDataType[]{TSDataType.INT64}, new String[]{"4"});
     node.write(insertPlan);
-    insertPlan = new InsertPlan("root.sg.device99", 300, "sensor2", "2");
+    insertPlan = new InsertPlan("root.sg.device99", 300, new String[]{"sensor2"},
+        new TSDataType[]{TSDataType.INT64}, new String[]{"2"});
     node.write(insertPlan);
     node.close();
 
@@ -174,10 +180,11 @@ public class RecoverResourceFromReaderTest {
         .getBufferedOutputStream(resourceFile.getPath())) {
       ReadWriteIOUtils.write(123, outputStream);
     }
-    
+
     TsFileRecoverPerformer performer = new 
TsFileRecoverPerformer(logNodePrefix,
         versionController, resource, true, false);
-    
ActiveTimeSeriesCounter.getInstance().init(resource.getFile().getParentFile().getParentFile().getName());
+    ActiveTimeSeriesCounter.getInstance()
+        .init(resource.getFile().getParentFile().getParentFile().getName());
     performer.recover();
     assertEquals(1, (long) resource.getStartTimeMap().get("root.sg.device99"));
     assertEquals(300, (long) resource.getEndTimeMap().get("root.sg.device99"));
diff --git 
a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
 
b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
index 5c51d9a..ac5ae05 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
@@ -142,17 +142,20 @@ public class SeqTsFileRecoverTest {
     for (int i = 10; i < 20; i++) {
       for (int j = 0; j < 10; j++) {
         String[] measurements = new String[10];
+        TSDataType[] types = new TSDataType[10];
         String[] values = new String[10];
         for (int k = 0; k < 10; k++) {
           measurements[k] = "sensor" + k;
+          types[k] = TSDataType.INT64;
           values[k] = String.valueOf(k);
         }
-        InsertPlan insertPlan = new InsertPlan("root.sg.device" + j, i, 
measurements, values);
+        InsertPlan insertPlan = new InsertPlan("root.sg.device" + j, i, 
measurements, types,
+            values);
         node.write(insertPlan);
       }
       node.notifyStartFlush();
     }
-    
+
     resource = new TsFileResource(tsF);
   }
 
diff --git 
a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
 
b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
index b285dd0..016fb60 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
@@ -19,6 +19,10 @@
 
 package org.apache.iotdb.db.writelog.recover;
 
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
 import java.util.Collections;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
@@ -53,17 +57,12 @@ import 
org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.iotdb.tsfile.write.TsFileWriter;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
-import org.apache.iotdb.tsfile.write.schema.Schema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.Schema;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.File;
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-
 public class UnseqTsFileRecoverTest {
 
   private File tsF;
@@ -96,7 +95,8 @@ public class UnseqTsFileRecoverTest {
     for (int i = 0; i < 10; i++) {
       for (int j = 0; j < 10; j++) {
         Path path = new Path(("root.sg.device" + i), ("sensor" + j));
-        MeasurementSchema measurementSchema = new MeasurementSchema("sensor" + 
j, TSDataType.INT64, TSEncoding.PLAIN);
+        MeasurementSchema measurementSchema = new MeasurementSchema("sensor" + 
j, TSDataType.INT64,
+            TSEncoding.PLAIN);
         schema.registerTimeseries(path, measurementSchema);
         MManager.getInstance().createTimeseries(path.getFullPath(), 
measurementSchema.getType(),
             measurementSchema.getEncodingType(), 
measurementSchema.getCompressor(),
@@ -145,19 +145,22 @@ public class UnseqTsFileRecoverTest {
     for (int i = 0; i < 10; i++) {
       for (int j = 0; j < 10; j++) {
         String[] measurements = new String[10];
+        TSDataType[] types = new TSDataType[10];
         String[] values = new String[10];
         for (int k = 0; k < 10; k++) {
           measurements[k] = "sensor" + k;
+          types[k] = TSDataType.INT64;
           values[k] = String.valueOf(k + 10);
         }
-        InsertPlan insertPlan = new InsertPlan("root.sg.device" + j, i, 
measurements, values);
+        InsertPlan insertPlan = new InsertPlan("root.sg.device" + j, i, 
measurements, types,
+            values);
         node.write(insertPlan);
       }
       node.notifyStartFlush();
     }
-    InsertPlan insertPlan = new InsertPlan("root.sg.device99", 1, "sensor4", 
"4");
+    InsertPlan insertPlan = new InsertPlan("root.sg.device99", 1, "sensor4", 
TSDataType.INT64, "4");
     node.write(insertPlan);
-    insertPlan = new InsertPlan("root.sg.device99", 300, "sensor2", "2");
+    insertPlan = new InsertPlan("root.sg.device99", 300, "sensor2", 
TSDataType.INT64, "2");
     node.write(insertPlan);
     node.close();
 
@@ -176,7 +179,8 @@ public class UnseqTsFileRecoverTest {
   public void test() throws StorageGroupProcessorException, IOException {
     TsFileRecoverPerformer performer = new 
TsFileRecoverPerformer(logNodePrefix,
         versionController, resource, true, false);
-    
ActiveTimeSeriesCounter.getInstance().init(resource.getFile().getParentFile().getParentFile().getName());
+    ActiveTimeSeriesCounter.getInstance()
+        .init(resource.getFile().getParentFile().getParentFile().getName());
     performer.recover();
 
     assertEquals(1, (long) resource.getStartTimeMap().get("root.sg.device99"));
diff --git a/service-rpc/rpc-changelist.md b/service-rpc/rpc-changelist.md
index b2e528f..5814ac2 100644
--- a/service-rpc/rpc-changelist.md
+++ b/service-rpc/rpc-changelist.md
@@ -22,7 +22,7 @@
 
 # 0.10.0 (version-1) -> version-2
 
-Last Updated on 2020-4-24 by Jialin Qiao.
+Last Updated on 2020-5-25 by Kaifeng Xue.
 
 
 ## 1. Delete Old
@@ -50,6 +50,8 @@ Last Updated on 2020-4-24 by Jialin Qiao.
 | Rename TSStatusType to TSStatus   | Jialin Qiao   |
 | Remove sessionId in TSExecuteBatchStatementResp   | Jialin Qiao   |
 | Rename insertRows to insertReords, insert to insertRecord, insertBatch to 
insertTablet   | Jialin Qiao   |
+| Use TSDataType and binary rather than string in TSInsertRecordsReq and 
TSInsertRecordReq  | Kaifeng Xue  |
+
 
 
 # 0.8.0 (version-0) -> version-1
@@ -121,3 +123,4 @@ Last Updated on 2019-10-27 by Lei Rui.
 | Add required i64 statementId in TSExecuteStatementReq        | Yuan Tian |
 | Add required binary time, required list<binary> valueList, required 
list<binary> bitmapList and remove required binary values, required i32 
rowCount in TSQueryDataSet| Yuan Tian |
 | Add optional i32 fetchSize in TSExecuteStatementReq,<br />Add optional 
TSQueryDataSet in TSExecuteStatementResp| liutaohua |
+| Add optional map<string, string> props, optional map<string, string> tags, 
optional map<string, string> attributes and optional string aliasPath in 
TSCreateTimeseriesReq | Yuan Tian | 
diff --git a/service-rpc/src/main/thrift/rpc.thrift 
b/service-rpc/src/main/thrift/rpc.thrift
index 7c8e026..876dac4 100644
--- a/service-rpc/src/main/thrift/rpc.thrift
+++ b/service-rpc/src/main/thrift/rpc.thrift
@@ -169,7 +169,7 @@ struct TSInsertRecordReq {
     1: required i64 sessionId
     2: required string deviceId
     3: required list<string> measurements
-    4: required list<string> values
+    4: required binary values
     5: required i64 timestamp
 }
 
@@ -197,7 +197,7 @@ struct TSInsertRecordsReq {
     1: required i64 sessionId
     2: required list<string> deviceIds
     3: required list<list<string>> measurementsList
-    4: required list<list<string>> valuesList
+    4: required list<binary> valuesList
     5: required list<i64> timestamps
 }
 
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java 
b/session/src/main/java/org/apache/iotdb/session/Session.java
index b574c9f..244b602 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.session;
 
+import java.nio.ByteBuffer;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -45,11 +46,13 @@ import 
org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
 import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
 import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.thrift.TException;
@@ -186,57 +189,28 @@ public class Session {
   }
 
   /**
-   * insert data in one row, if you want to improve your performance, please 
use insertRecords method
-   * or insertTablet method
+   * insert data in one row, if you want to improve your performance, please 
use insertRecords
+   * method or insertTablet method
    *
-   * @see Session#insertRecords(List, List, List, List)
+   * @see Session#insertRecords(List, List, List, List, List)
    * @see Session#insertTablet(Tablet)
    */
   public void insertRecord(String deviceId, long time, List<String> 
measurements,
+      List<TSDataType> types,
       Object... values) throws IoTDBConnectionException, 
StatementExecutionException {
-    List<String> stringValues = new ArrayList<>();
-    for (Object o : values) {
-      stringValues.add(o.toString());
-    }
+    List<Object> valuesList = new ArrayList<>(Arrays.asList(values));
 
-    insertRecord(deviceId, time, measurements, stringValues);
-  }
-
-  /**
-   * insert data in one row, if you want to improve your performance, please 
use insertRecords method
-   * or insertTablet method
-   *
-   * @see Session#insertRecords(List, List, List, List)
-   * @see Session#insertTablet(Tablet)
-   */
-  public void insertRecord(String deviceId, long time, List<String> 
measurements,
-      List<String> values) throws IoTDBConnectionException, 
StatementExecutionException {
-    TSInsertRecordReq request = new TSInsertRecordReq();
-    request.setSessionId(sessionId);
-    request.setDeviceId(deviceId);
-    request.setTimestamp(time);
-    request.setMeasurements(measurements);
-    request.setValues(values);
-
-    try {
-      RpcUtils.verifySuccess(client.insertRecord(request));
-    } catch (TException e) {
-      throw new IoTDBConnectionException(e);
-    }
+    insertRecord(deviceId, time, measurements, types, valuesList);
   }
 
 
   /**
    * insert the data of a device. For each timestamp, the number of 
measurements is the same.
-   *
-   *  a Tablet example:
-   *
-   *        device1
-   *     time s1, s2, s3
-   *     1,   1,  1,  1
-   *     2,   2,  2,  2
-   *     3,   3,  3,  3
-   *
+   * <p>
+   * a Tablet example:
+   * <p>
+   * device1 time s1, s2, s3 1,   1,  1,  1 2,   2,  2,  2 3,   3,  3,  3
+   * <p>
    * times in Tablet may be not in ascending order
    *
    * @param tablet data batch
@@ -281,9 +255,9 @@ public class Session {
   }
 
   /**
-   * insert the data of several deivces.
-   * Given a deivce, for each timestamp, the number of measurements is the 
same.
-   *
+   * insert the data of several devices. Given a device, for each timestamp, 
the number of
+   * measurements is the same.
+   * <p>
    * Times in each Tablet may not be in ascending order
    *
    * @param tablets data batch in multiple device
@@ -294,11 +268,11 @@ public class Session {
   }
 
   /**
-   * insert the data of several devices.
-   * Given a device, for each timestamp, the number of measurements is the 
same.
+   * insert the data of several devices. Given a device, for each timestamp, 
the number of
+   * measurements is the same.
    *
    * @param tablets data batch in multiple device
-   * @param sorted whether times in each Tablet are in ascending order
+   * @param sorted  whether times in each Tablet are in ascending order
    */
   public void insertTablets(Map<String, Tablet> tablets, boolean sorted)
       throws IoTDBConnectionException, BatchExecutionException {
@@ -337,10 +311,51 @@ public class Session {
   }
 
   /**
-   * Insert multiple rows, which can reduce the overhead of network. This 
method is just like
-   * jdbc executeBatch, we pack some insert request in batch and send them to 
server.
-   * If you want improve your performance, please see insertTablet method
+   * Insert multiple rows, which can reduce the overhead of network. This 
method is just like JDBC
+   * executeBatch: we pack several insert requests in a batch and send them to the 
server. If you want to improve
+   * your performance, please see the insertTablet method
+   * <p>
+   * Each row is independent, which could have different deviceId, time, 
number of measurements
    *
+   * @see Session#insertTablet(Tablet)
+   */
+  public void insertRecords(List<String> deviceIds, List<Long> times,
+      List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList)
+      throws IoTDBConnectionException, BatchExecutionException {
+    // check params size
+    int len = deviceIds.size();
+    if (len != times.size() || len != measurementsList.size() || len != 
valuesList.size()) {
+      throw new IllegalArgumentException(
+          "deviceIds, times, measurementsList and valuesList's size should be 
equal");
+    }
+
+    TSInsertRecordsReq request = new TSInsertRecordsReq();
+    request.setSessionId(sessionId);
+    request.setDeviceIds(deviceIds);
+    request.setTimestamps(times);
+    request.setMeasurementsList(measurementsList);
+    List<ByteBuffer> buffersList = new ArrayList<>();
+    for (int i = 0; i < measurementsList.size(); i++) {
+      ByteBuffer buffer = 
ByteBuffer.allocate(calculateLength(typesList.get(i), valuesList.get(i)));
+      putValues(typesList.get(i), valuesList.get(i), buffer);
+      buffer.flip();
+      buffersList.add(buffer);
+    }
+    request.setValuesList(buffersList);
+
+    try {
+      RpcUtils.verifySuccess(client.insertRecords(request).statusList);
+    } catch (TException e) {
+      throw new IoTDBConnectionException(e);
+    }
+  }
+
+  /**
+   * Insert multiple rows, which can reduce the overhead of network. This 
method is just like JDBC
+   * executeBatch: we pack several insert requests in a batch and send them to the 
server. If you want to improve
+   * your performance, please see the insertTablet method
+   * <p>
    * Each row is independent, which could have different deviceId, time, 
number of measurements
    *
    * @see Session#insertTablet(Tablet)
@@ -360,7 +375,14 @@ public class Session {
     request.setDeviceIds(deviceIds);
     request.setTimestamps(times);
     request.setMeasurementsList(measurementsList);
-    request.setValuesList(valuesList);
+    List<ByteBuffer> buffersList = new ArrayList<>();
+    for (int i = 0; i < measurementsList.size(); i++) {
+      ByteBuffer buffer = 
ByteBuffer.allocate(calculateStrLength(valuesList.get(i)));
+      putStrValues(valuesList.get(i), buffer);
+      buffer.flip();
+      buffersList.add(buffer);
+    }
+    request.setValuesList(buffersList);
 
     try {
       RpcUtils.verifySuccess(client.insertRecords(request).statusList);
@@ -369,26 +391,157 @@ public class Session {
     }
   }
 
+
   /**
-   * This method NOT insert data into database and the server just return 
after accept the request,
-   * this method should be used to test other time cost in client
+   * insert data in one row, if you want to improve your performance, please use 
insertRecords method
+   * or insertTablet method
+   *
+   * @see Session#insertRecords(List, List, List, List, List)
+   * @see Session#insertTablet(Tablet)
    */
-  public void testInsertRecord(String deviceId, long time, List<String> 
measurements,
+  public void insertRecord(String deviceId, long time, List<String> 
measurements,
+      List<TSDataType> types,
+      List<Object> values) throws IoTDBConnectionException, 
StatementExecutionException {
+    TSInsertRecordReq request = new TSInsertRecordReq();
+    request.setSessionId(sessionId);
+    request.setDeviceId(deviceId);
+    request.setTimestamp(time);
+    request.setMeasurements(measurements);
+    ByteBuffer buffer = ByteBuffer.allocate(calculateLength(types, values));
+    putValues(types, values, buffer);
+    buffer.flip();
+    request.setValues(buffer);
+
+    try {
+      RpcUtils.verifySuccess(client.insertRecord(request));
+    } catch (TException e) {
+      throw new IoTDBConnectionException(e);
+    }
+  }
+
+  /**
+   * insert data in one row, if you want to improve your performance, please use 
insertRecords method
+   * or insertTablet method
+   *
+   * @see Session#insertRecords(List, List, List, List, List)
+   * @see Session#insertTablet(Tablet)
+   */
+  public void insertRecord(String deviceId, long time, List<String> 
measurements,
       List<String> values) throws IoTDBConnectionException, 
StatementExecutionException {
     TSInsertRecordReq request = new TSInsertRecordReq();
     request.setSessionId(sessionId);
     request.setDeviceId(deviceId);
     request.setTimestamp(time);
     request.setMeasurements(measurements);
-    request.setValues(values);
+    ByteBuffer buffer = ByteBuffer.allocate(calculateStrLength(values));
+    putStrValues(values, buffer);
+    buffer.flip();
+    request.setValues(buffer);
 
     try {
-      RpcUtils.verifySuccess(client.testInsertRecord(request));
+      RpcUtils.verifySuccess(client.insertRecord(request));
     } catch (TException e) {
       throw new IoTDBConnectionException(e);
     }
   }
 
+  private void putStrValues(List<String> values, ByteBuffer buffer)
+      throws IoTDBConnectionException {
+    for (int i = 0; i < values.size(); i++) {
+      ReadWriteIOUtils.write(TSDataType.TEXT, buffer);
+      byte[] bytes = ((String) 
values.get(i)).getBytes(TSFileConfig.STRING_CHARSET);
+      ReadWriteIOUtils.write(bytes.length, buffer);
+      buffer.put(bytes);
+    }
+  }
+
+
+  /**
+   * put value in buffer
+   *
+   * @param types  types list
+   * @param values values list
+   * @param buffer buffer to insert
+   * @throws IoTDBConnectionException if types contains an unsupported data type
+   */
+  private void putValues(List<TSDataType> types, List<Object> values, 
ByteBuffer buffer)
+      throws IoTDBConnectionException {
+    for (int i = 0; i < values.size(); i++) {
+      ReadWriteIOUtils.write(types.get(i), buffer);
+      switch (types.get(i)) {
+        case BOOLEAN:
+          ReadWriteIOUtils.write((Boolean) values.get(i), buffer);
+          break;
+        case INT32:
+          ReadWriteIOUtils.write((Integer) values.get(i), buffer);
+          break;
+        case INT64:
+          ReadWriteIOUtils.write((Long) values.get(i), buffer);
+          break;
+        case FLOAT:
+          ReadWriteIOUtils.write((Float) values.get(i), buffer);
+          break;
+        case DOUBLE:
+          ReadWriteIOUtils.write((Double) values.get(i), buffer);
+          break;
+        case TEXT:
+          byte[] bytes = ((String) 
values.get(i)).getBytes(TSFileConfig.STRING_CHARSET);
+          ReadWriteIOUtils.write(bytes.length, buffer);
+          buffer.put(bytes);
+          break;
+        default:
+          throw new IoTDBConnectionException("Unsupported data type:" + 
types.get(i));
+      }
+    }
+  }
+
+  private int calculateStrLength(List<String> values) {
+    int res = 0;
+
+    for (int i = 0; i < values.size(); i++) {
+      // types
+      res += Short.BYTES;
+      res += Integer.BYTES;
+      res += values.get(i).getBytes(TSFileConfig.STRING_CHARSET).length;
+    }
+
+    return res;
+  }
+
+  private int calculateLength(List<TSDataType> types, List<Object> values)
+      throws IoTDBConnectionException {
+    int res = 0;
+    for (int i = 0; i < types.size(); i++) {
+      // types
+      res += Short.BYTES;
+      switch (types.get(i)) {
+        case BOOLEAN:
+          res += 1;
+          break;
+        case INT32:
+          res += Integer.BYTES;
+          break;
+        case INT64:
+          res += Long.BYTES;
+          break;
+        case FLOAT:
+          res += Float.BYTES;
+          break;
+        case DOUBLE:
+          res += Double.BYTES;
+          break;
+        case TEXT:
+          res += Integer.BYTES;
+          res += ((String) 
values.get(i)).getBytes(TSFileConfig.STRING_CHARSET).length;
+          break;
+        default:
+          throw new IoTDBConnectionException("Unsupported data type:" + 
types.get(i));
+      }
+    }
+
+    return res;
+  }
+
   /**
    * This method NOT insert data into database and the server just return 
after accept the request,
    * this method should be used to test other time cost in client
@@ -432,7 +585,7 @@ public class Session {
     request.setDeviceIds(deviceIds);
     request.setTimestamps(times);
     request.setMeasurementsList(measurementsList);
-    request.setValuesList(valuesList);
+    request.setValuesList(new ArrayList<>());
 
     try {
       RpcUtils.verifySuccess(client.testInsertRecords(request).statusList);
@@ -442,6 +595,26 @@ public class Session {
   }
 
   /**
+   * This method does NOT insert data into the database; the server just returns 
after accepting the request.
+   * It should be used to test other time costs in the client.
+   */
+  public void testInsertRecord(String deviceId, long time, List<String> 
measurements,
+      List<String> values) throws IoTDBConnectionException, 
StatementExecutionException {
+    TSInsertRecordReq request = new TSInsertRecordReq();
+    request.setSessionId(sessionId);
+    request.setDeviceId(deviceId);
+    request.setTimestamp(time);
+    request.setMeasurements(measurements);
+    request.setValues(ByteBuffer.allocate(1));
+
+    try {
+      RpcUtils.verifySuccess(client.testInsertRecord(request));
+    } catch (TException e) {
+      throw new IoTDBConnectionException(e);
+    }
+  }
+
+  /**
    * delete a timeseries, including data and schema
    *
    * @param path timeseries to delete, should be a whole path
@@ -565,19 +738,19 @@ public class Session {
     request.setPaths(paths);
 
     List<Integer> dataTypeOrdinals = new ArrayList<>(paths.size());
-    for (TSDataType dataType: dataTypes) {
+    for (TSDataType dataType : dataTypes) {
       dataTypeOrdinals.add(dataType.ordinal());
     }
     request.setDataTypes(dataTypeOrdinals);
 
     List<Integer> encodingOrdinals = new ArrayList<>(paths.size());
-    for (TSEncoding encoding: encodings) {
+    for (TSEncoding encoding : encodings) {
       encodingOrdinals.add(encoding.ordinal());
     }
     request.setEncodings(encodingOrdinals);
 
     List<Integer> compressionOrdinals = new ArrayList<>(paths.size());
-    for (CompressionType compression: compressors) {
+    for (CompressionType compression : compressors) {
       compressionOrdinals.add(compression.ordinal());
     }
     request.setCompressors(compressionOrdinals);
@@ -651,8 +824,10 @@ public class Session {
     }
 
     RpcUtils.verifySuccess(execResp.getStatus());
-    return new SessionDataSet(sql, execResp.getColumns(), 
execResp.getDataTypeList(), execResp.columnNameIndexMap,
-        execResp.getQueryId(), client, sessionId, execResp.queryDataSet, 
execResp.isIgnoreTimeStamp());
+    return new SessionDataSet(sql, execResp.getColumns(), 
execResp.getDataTypeList(),
+        execResp.columnNameIndexMap,
+        execResp.getQueryId(), client, sessionId, execResp.queryDataSet,
+        execResp.isIgnoreTimeStamp());
   }
 
   /**
diff --git 
a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java 
b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 17135fb..260ab65 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -37,45 +37,40 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * SessionPool is a wrapper of a Session Set.
- * Using SessionPool, the user do not need to consider how to reuse a session 
connection.
- * Even if the session is disconnected, the session pool can recognize it and 
remove the broken
- * session connection and create a new one.
- *
+ * SessionPool is a wrapper of a Session Set. Using SessionPool, the user does 
not need to consider
+ * how to reuse a session connection. Even if the session is disconnected, the 
session pool can
+ * recognize it and remove the broken session connection and create a new one.
+ * <p>
  * If there is no available connections and the pool reaches its max size, the 
all methods will hang
  * until there is a available connection.
- *
+ * <p>
  * If a user has waited for a session for more than 60 seconds, a warn log 
will be printed.
- *
+ * <p>
  * The only thing you have to remember is that:
- *
+ * <p>
  * For a query, if you have get all data, i.e., 
SessionDataSetWrapper.hasNext() == false, it is ok.
- * Otherwise, i.e., you want to stop the query before you get all data 
(SessionDataSetWrapper.hasNext() == true),
- * then you have to call closeResultSet(SessionDataSetWrapper wrapper) 
manually.
- * Otherwise the connection is occupied by the query.
- *
- * Another case that you have to manually call closeResultSet() is that when 
there is exception
- * when you call SessionDataSetWrapper.hasNext() or next()
- *
+ * Otherwise, i.e., you want to stop the query before you get all data
+ * (SessionDataSetWrapper.hasNext() == true), then you have to call 
closeResultSet(SessionDataSetWrapper
+ * wrapper) manually. Otherwise the connection is occupied by the query.
+ * <p>
+ * Another case that you have to manually call closeResultSet() is that when 
there is exception when
+ * you call SessionDataSetWrapper.hasNext() or next()
  */
 public class SessionPool {
 
   private static final Logger logger = 
LoggerFactory.getLogger(SessionPool.class);
+  private static int RETRY = 3;
   private ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
   //for session whose resultSet is not released.
   private ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
-
   private int size = 0;
   private int maxSize = 0;
   private String ip;
   private int port;
   private String user;
   private String password;
-
   private int fetchSize;
-
   private long timeout; //ms
-  private static int RETRY = 3;
   private static int FINAL_RETRY = RETRY - 1;
   private boolean enableCompression = false;
 
@@ -328,12 +323,12 @@ public class SessionPool {
    * @see Session#insertTablet(Tablet)
    */
   public void insertRecords(List<String> deviceIds, List<Long> times,
-      List<List<String>> measurementsList, List<List<String>> valuesList)
-      throws IoTDBConnectionException, BatchExecutionException {
+      List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList) throws IoTDBConnectionException, 
BatchExecutionException {
     for (int i = 0; i < RETRY; i++) {
       Session session = getSession();
       try {
-        session.insertRecords(deviceIds, times, measurementsList, valuesList);
+        session.insertRecords(deviceIds, times, measurementsList, typesList, 
valuesList);
         putBack(session);
         return;
       } catch (IoTDBConnectionException e) {
@@ -350,15 +345,16 @@ public class SessionPool {
    * insert data in one row, if you want improve your performance, please use 
insertRecords method
    * or insertTablet method
    *
-   * @see Session#insertRecords(List, List, List, List)
+   * @see Session#insertRecords(List, List, List, List, List)
    * @see Session#insertTablet(Tablet)
    */
-  public void insertRecord(String deviceId, long time, List<String> 
measurements, List<String> values)
+  public void insertRecord(String deviceId, long time, List<String> 
measurements,
+      List<TSDataType> types, List<Object> values)
       throws IoTDBConnectionException, StatementExecutionException {
     for (int i = 0; i < RETRY; i++) {
       Session session = getSession();
       try {
-        session.insertRecord(deviceId, time, measurements, values);
+        session.insertRecord(deviceId, time, measurements, types, values);
         putBack(session);
         return;
       } catch (IoTDBConnectionException e) {
@@ -512,7 +508,7 @@ public class SessionPool {
    * delete data <= time in multiple timeseries
    *
    * @param paths data in which time series to delete
-   * @param time data with time stamp less than or equal to time will be 
deleted
+   * @param time  data with time stamp less than or equal to time will be 
deleted
    */
   public void deleteData(List<String> paths, long time)
       throws IoTDBConnectionException, StatementExecutionException {
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java 
b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
index 4c4b1e2..8097e37 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
@@ -54,6 +54,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class IoTDBSessionIT {
+
   private static Logger logger = LoggerFactory.getLogger(IoTDBSessionIT.class);
 
   private Session session;
@@ -72,6 +73,25 @@ public class IoTDBSessionIT {
   }
 
   @Test
+  public void testInsertByStr() throws IoTDBConnectionException, 
StatementExecutionException {
+    session = new Session("127.0.0.1", 6667, "root", "root");
+    session.open();
+
+    session.setStorageGroup("root.sg1");
+
+    createTimeseries();
+    insertByStr();
+
+    // sql test
+    insert_via_sql();
+    query3();
+
+    session.close();
+
+  }
+
+
+    @Test
   public void testInsertByObject() throws IoTDBConnectionException, 
StatementExecutionException {
     session = new Session("127.0.0.1", 6667, "root", "root");
     session.open();
@@ -200,6 +220,7 @@ public class IoTDBSessionIT {
 
     createTimeseries();
 
+
     // test insert tablet
     List<MeasurementSchema> schemaList = new ArrayList<>();
     schemaList.add(new MeasurementSchema("s1", TSDataType.INT64, 
TSEncoding.RLE));
@@ -351,6 +372,78 @@ public class IoTDBSessionIT {
   }
 
   @Test
+  public void testByStr() throws ClassNotFoundException, SQLException,
+      IoTDBConnectionException, StatementExecutionException, 
BatchExecutionException {
+    session = new Session("127.0.0.1", 6667, "root", "root");
+    try {
+      session.open();
+    } catch (IoTDBConnectionException e) {
+      e.printStackTrace();
+    }
+
+    session.setStorageGroup("root.sg1");
+
+    createTimeseries();
+
+    insertByStr();
+
+    // sql test
+    insert_via_sql();
+
+    query3();
+
+//    insertTabletTest1();
+    deleteData();
+
+    query();
+
+    deleteTimeseries();
+
+    query2();
+
+    insertRecordsByStr();
+
+    query4();
+
+    // special characters
+    session.createTimeseries("root.sg1.d1.1_2", TSDataType.INT64, 
TSEncoding.RLE,
+        CompressionType.SNAPPY);
+    session.createTimeseries("root.sg1.d1.\"1.2.3\"", TSDataType.INT64, 
TSEncoding.RLE,
+        CompressionType.SNAPPY);
+    session.createTimeseries("root.sg1.d1.\'1.2.4\'", TSDataType.INT64, 
TSEncoding.RLE,
+        CompressionType.SNAPPY);
+
+    session.setStorageGroup("root.1");
+    session.createTimeseries("root.1.2.3", TSDataType.INT64, TSEncoding.RLE,
+        CompressionType.SNAPPY);
+
+    // Add another storage group to test the deletion of storage group
+    session.setStorageGroup("root.sg2");
+    session.createTimeseries("root.sg2.d1.s1", TSDataType.INT64, 
TSEncoding.RLE,
+        CompressionType.SNAPPY);
+
+    deleteStorageGroupTest();
+
+    // set storage group but do not create timeseries
+    session.setStorageGroup("root.sg3");
+    insertTabletTest1("root.sg3.d1");
+
+    // create timeseries but do not set storage group
+    session.createTimeseries("root.sg4.d1.s1", TSDataType.INT64, 
TSEncoding.RLE,
+        CompressionType.SNAPPY);
+    session.createTimeseries("root.sg4.d1.s2", TSDataType.INT64, 
TSEncoding.RLE,
+        CompressionType.SNAPPY);
+    session.createTimeseries("root.sg4.d1.s3", TSDataType.INT64, 
TSEncoding.RLE,
+        CompressionType.SNAPPY);
+    insertTabletTest1("root.sg4.d1");
+
+    // do not set storage group and create timeseries
+    insertTabletTest1("root.sg5.d1");
+
+    session.close();
+  }
+
+  @Test
   public void TestSessionInterfacesWithDisabledWAL()
       throws StatementExecutionException, IoTDBConnectionException,
           BatchExecutionException {
@@ -370,15 +463,19 @@ public class IoTDBSessionIT {
 
     // test insert record
     List<String> measurements = new ArrayList<>();
+    List<TSDataType> types = new ArrayList<>();
     measurements.add("s1");
     measurements.add("s2");
     measurements.add("s3");
+    types.add(TSDataType.INT64);
+    types.add(TSDataType.INT64);
+    types.add(TSDataType.INT64);
     for (long time = 0; time < 100; time++) {
-      List<String> values = new ArrayList<>();
-      values.add("1");
-      values.add("2");
-      values.add("3");
-      session.insertRecord(deviceId, time, measurements, values);
+      List<Object> values = new ArrayList<>();
+      values.add(1L);
+      values.add(2L);
+      values.add(3L);
+      session.insertRecord(deviceId, time, measurements, types, values);
     }
 
     // test insert tablet
@@ -460,10 +557,13 @@ public class IoTDBSessionIT {
         }
         String sensorId = ss[ss.length - 1];
         List<String> measurements = new ArrayList<>();
-        List<String> values = new ArrayList<>();
+        List<Object> values = new ArrayList<>();
+        List<TSDataType> types = new ArrayList<>();
+
         measurements.add(sensorId);
-        values.add("100");
-        session.insertRecord(deviceId, i, measurements, values);
+        types.add(TSDataType.INT64);
+        values.add(100L);
+        session.insertRecord(deviceId, i, measurements, types, values);
       }
     }
   }
@@ -476,6 +576,45 @@ public class IoTDBSessionIT {
     measurements.add("s3");
     List<String> deviceIds = new ArrayList<>();
     List<List<String>> measurementsList = new ArrayList<>();
+    List<List<Object>> valuesList = new ArrayList<>();
+    List<Long> timestamps = new ArrayList<>();
+    List<List<TSDataType>> typesList = new ArrayList<>();
+
+    for (long time = 0; time < 500; time++) {
+      List<Object> values = new ArrayList<>();
+      List<TSDataType> types = new ArrayList<>();
+      values.add(1L);
+      values.add(2L);
+      values.add(3L);
+      types.add(TSDataType.INT64);
+      types.add(TSDataType.INT64);
+      types.add(TSDataType.INT64);
+
+      deviceIds.add(deviceId);
+      measurementsList.add(measurements);
+      valuesList.add(values);
+      typesList.add(types);
+      timestamps.add(time);
+      if (time != 0 && time % 100 == 0) {
+        session.insertRecords(deviceIds, timestamps, measurementsList, 
typesList, valuesList);
+        deviceIds.clear();
+        measurementsList.clear();
+        valuesList.clear();
+        timestamps.clear();
+      }
+    }
+
+    session.insertRecords(deviceIds, timestamps, measurementsList, typesList, 
valuesList);
+  }
+
+  private void insertRecordsByStr() throws IoTDBConnectionException, 
BatchExecutionException {
+    String deviceId = "root.sg1.d2";
+    List<String> measurements = new ArrayList<>();
+    measurements.add("s1");
+    measurements.add("s2");
+    measurements.add("s3");
+    List<String> deviceIds = new ArrayList<>();
+    List<List<String>> measurementsList = new ArrayList<>();
     List<List<String>> valuesList = new ArrayList<>();
     List<Long> timestamps = new ArrayList<>();
 
@@ -504,20 +643,46 @@ public class IoTDBSessionIT {
   private void insertInObject() throws IoTDBConnectionException, 
StatementExecutionException {
     String deviceId = "root.sg1.d1";
     List<String> measurements = new ArrayList<>();
+    List<TSDataType> types = new ArrayList<>();
     measurements.add("s1");
     measurements.add("s2");
     measurements.add("s3");
+    types.add(TSDataType.INT64);
+    types.add(TSDataType.INT64);
+    types.add(TSDataType.INT64);
+
     for (long time = 0; time < 100; time++) {
-      session.insertRecord(deviceId, time, measurements, 1L, 2L, 3L);
+      session.insertRecord(deviceId, time, measurements, types,1L, 2L, 3L);
     }
   }
 
   private void insert() throws IoTDBConnectionException, 
StatementExecutionException {
     String deviceId = "root.sg1.d1";
     List<String> measurements = new ArrayList<>();
+    List<TSDataType> types = new ArrayList<>();
+    measurements.add("s1");
+    measurements.add("s2");
+    measurements.add("s3");
+    types.add(TSDataType.INT64);
+    types.add(TSDataType.INT64);
+    types.add(TSDataType.INT64);
+
+    for (long time = 0; time < 100; time++) {
+      List<Object> values = new ArrayList<>();
+      values.add(1L);
+      values.add(2L);
+      values.add(3L);
+      session.insertRecord(deviceId, time, measurements, types, values);
+    }
+  }
+
+  private void insertByStr() throws IoTDBConnectionException, 
StatementExecutionException {
+    String deviceId = "root.sg1.d1";
+    List<String> measurements = new ArrayList<>();
     measurements.add("s1");
     measurements.add("s2");
     measurements.add("s3");
+
     for (long time = 0; time < 100; time++) {
       List<String> values = new ArrayList<>();
       values.add("1");
@@ -529,6 +694,7 @@ public class IoTDBSessionIT {
 
   private void insertTabletTest1(String deviceId)
       throws IoTDBConnectionException, BatchExecutionException {
+
     List<MeasurementSchema> schemaList = new ArrayList<>();
     schemaList.add(new MeasurementSchema("s1", TSDataType.INT64, 
TSEncoding.RLE));
     schemaList.add(new MeasurementSchema("s2", TSDataType.INT64, 
TSEncoding.RLE));
@@ -803,6 +969,7 @@ public class IoTDBSessionIT {
 
   private void insertTabletTest2(String deviceId)
       throws IoTDBConnectionException, BatchExecutionException {
+
     List<MeasurementSchema> schemaList = new ArrayList<>();
     schemaList.add(new MeasurementSchema("s1", TSDataType.INT64, 
TSEncoding.RLE));
     schemaList.add(new MeasurementSchema("s2", TSDataType.INT64, 
TSEncoding.RLE));
@@ -834,6 +1001,7 @@ public class IoTDBSessionIT {
 
   private void insertTabletTest3(String deviceId)
       throws IoTDBConnectionException, BatchExecutionException {
+
     List<MeasurementSchema> schemaList = new ArrayList<>();
     schemaList.add(new MeasurementSchema("s1", TSDataType.INT64, 
TSEncoding.RLE));
     schemaList.add(new MeasurementSchema("s2", TSDataType.INT64, 
TSEncoding.RLE));
@@ -865,6 +1033,7 @@ public class IoTDBSessionIT {
 
   private void insertTabletTestForTime(String deviceId)
       throws IoTDBConnectionException, BatchExecutionException {
+
     List<MeasurementSchema> schemaList = new ArrayList<>();
     schemaList.add(new MeasurementSchema("s1", TSDataType.INT64, 
TSEncoding.RLE));
     schemaList.add(new MeasurementSchema("s2", TSDataType.INT64, 
TSEncoding.RLE));
diff --git 
a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIteratorIT.java 
b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIteratorIT.java
index 64c9fad..ed6a24b 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIteratorIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIteratorIT.java
@@ -153,22 +153,29 @@ public class IoTDBSessionIteratorIT {
         CompressionType.SNAPPY);
     String deviceId = "root.sg1.d1";
     List<String> measurements = new ArrayList<>();
+    List<TSDataType> types = new ArrayList<>();
     measurements.add("s1");
     measurements.add("s2");
+    types.add(TSDataType.INT32);
+    types.add(TSDataType.FLOAT);
+
     for (long time = 0; time < 10; time++) {
-      List<String> values = new ArrayList<>();
-      values.add("1");
-      values.add("2");
-      session.insertRecord(deviceId, time, measurements, values);
+      List<Object> values = new ArrayList<>();
+      values.add(1);
+      values.add(2f);
+      session.insertRecord(deviceId, time, measurements, types, values);
     }
 
     deviceId = "root.sg1.d2";
     measurements = new ArrayList<>();
+    types = new ArrayList<>();
     measurements.add("s1");
+    types.add(TSDataType.DOUBLE);
+
     for (long time = 5; time < 10; time++) {
-      List<String> values = new ArrayList<>();
-      values.add("4");
-      session.insertRecord(deviceId, time, measurements, values);
+      List<Object> values = new ArrayList<>();
+      values.add(4d);
+      session.insertRecord(deviceId, time, measurements, types, values);
     }
   }
 
diff --git 
a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java 
b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
index 7df3d1f..c35d044 100644
--- a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
+++ b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -60,7 +61,7 @@ public class SessionPoolTest {
       service.submit(() -> {
         try {
           pool.insertRecord("root.sg1.d1", 1, Collections.singletonList("s" + 
no),
-              Collections.singletonList("3"));
+              Collections.singletonList(TSDataType.INT64), 
Collections.singletonList(3L));
         } catch (IoTDBConnectionException | StatementExecutionException e) {
           fail();
         }
@@ -84,7 +85,8 @@ public class SessionPoolTest {
     assertEquals(0, pool.currentAvailableSize());
     try {
       pool.insertRecord(".root.sg1.d1", 1, Collections.singletonList("s"),
-          Collections.singletonList("3"));
+          Collections.singletonList(TSDataType.INT64),
+          Collections.singletonList(3L));
     } catch (IoTDBConnectionException | StatementExecutionException e) {
       //do nothing
     }
@@ -100,7 +102,8 @@ public class SessionPoolTest {
     for (int i = 0; i < 10; i++) {
       try {
         pool.insertRecord("root.sg1.d1", i, Collections.singletonList("s" + i),
-            Collections.singletonList("" + i));
+            Collections.singletonList(TSDataType.INT64),
+            Collections.singletonList((long) i));
       } catch (IoTDBConnectionException | StatementExecutionException e) {
         fail();
       }
@@ -143,7 +146,8 @@ public class SessionPoolTest {
     for (int i = 0; i < 10; i++) {
       try {
         pool.insertRecord("root.sg1.d1", i, Collections.singletonList("s" + i),
-            Collections.singletonList("" + i));
+            Collections.singletonList(TSDataType.INT64),
+            Collections.singletonList((long) i));
       } catch (IoTDBConnectionException | StatementExecutionException e) {
         fail();
       }
@@ -179,7 +183,8 @@ public class SessionPoolTest {
     for (int i = 0; i < 10; i++) {
       try {
         pool.insertRecord("root.sg1.d1", i, Collections.singletonList("s" + i),
-            Collections.singletonList("" + i));
+            Collections.singletonList(TSDataType.INT64),
+            Collections.singletonList((long) i));
       } catch (IoTDBConnectionException | StatementExecutionException e) {
         fail();
       }
@@ -210,7 +215,8 @@ public class SessionPoolTest {
     for (int i = 0; i < 10; i++) {
       try {
         pool.insertRecord("root.sg1.d1", i, Collections.singletonList("s" + i),
-            Collections.singletonList("" + i));
+            Collections.singletonList(TSDataType.INT64),
+            Collections.singletonList((long) i));
       } catch (IoTDBConnectionException | StatementExecutionException e) {
         fail();
       }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/Binary.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/Binary.java
index 58e1260..1903a09 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/Binary.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/Binary.java
@@ -24,13 +24,12 @@ import java.util.Arrays;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 
 /**
- * Override compareTo() and equals() function to Binary class. This class is
- * used to accept Java String type
+ * Override compareTo() and equals() function to Binary class. This class is 
used to accept Java
+ * String type
  */
 public class Binary implements Comparable<Binary>, Serializable {
 
   private static final long serialVersionUID = 6394197743397020735L;
-
   private byte[] values;
 
   /**
@@ -117,4 +116,8 @@ public class Binary implements Comparable<Binary>, 
Serializable {
   public byte[] getValues() {
     return values;
   }
+
+  public void setValues(byte[] values) {
+    this.values = values;
+  }
 }
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
index c8dbea3..66500ab 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
@@ -19,11 +19,14 @@
 
 package org.apache.iotdb.tsfile.utils;
 
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.reader.TsFileInput;
+import static 
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.BINARY;
+import static 
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.BOOLEAN;
+import static 
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.DOUBLE;
+import static 
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.FLOAT;
+import static 
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.INTEGER;
+import static 
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.LONG;
+import static 
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.NULL;
+import static 
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.STRING;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -31,10 +34,17 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
-
-import static 
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.*;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.reader.TsFileInput;
 
 /**
  * ConverterUtils is a utility class. It provide conversion between normal 
datatype and byte array.
@@ -213,6 +223,17 @@ public class ReadWriteIOUtils {
   }
 
   /**
+   * write a Binary n to byteBuffer: its length as an int, then its bytes.
+   *
+   * @return The number of bytes used to represent n.
+   */
+  public static int write(Binary n, ByteBuffer buffer) {
+    buffer.putInt(n.getLength());
+    buffer.put(n.getValues());
+    return INT_LEN + n.getLength();
+  }
+
+  /**
    * write a int n to outputStream.
    *
    * @return The number of bytes used to represent n.
@@ -287,6 +308,23 @@ public class ReadWriteIOUtils {
   }
 
   /**
+   * write a float n to byteBuffer.
+   */
+  public static int write(float n, ByteBuffer buffer) {
+    buffer.putFloat(n);
+    return FLOAT_LEN;
+  }
+
+  /**
+   * write a double n to byteBuffer.
+   */
+  public static int write(double n, ByteBuffer buffer) {
+    buffer.putDouble(n);
+    return DOUBLE_LEN;
+  }
+
+
+  /**
    * write string to outputStream.
    *
    * @return the length of string represented by byte[].

Reply via email to