This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a commit to branch 
IOTDB-615-Use-binary-rather-than-string-in-insert-plan
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to 
refs/heads/IOTDB-615-Use-binary-rather-than-string-in-insert-plan by this push:
     new 5a3fc4e  fix bugs
5a3fc4e is described below

commit 5a3fc4ebc256700492e6f4860bd890cda5801bb4
Author: 151250176 <[email protected]>
AuthorDate: Wed May 20 01:18:31 2020 +0800

    fix bugs
---
 .../org/apache/iotdb/flink/FlinkIoTDBSink.java     |   1 +
 .../java/org/apache/iotdb/SessionPoolExample.java  |  17 +-
 .../iotdb/flink/DefaultIoTSerializationSchema.java |  36 ++-
 .../main/java/org/apache/iotdb/flink/Event.java    |  67 +++--
 .../java/org/apache/iotdb/flink/IoTDBSink.java     | 282 +++++++++++----------
 .../iotdb/flink/IoTDBSinkBatchInsertTest.java      | 157 ++++++------
 .../iotdb/flink/IoTDBSinkBatchTimerTest.java       |  80 +++---
 .../apache/iotdb/flink/IoTDBSinkInsertTest.java    |  68 ++---
 8 files changed, 388 insertions(+), 320 deletions(-)

diff --git 
a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java 
b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
index 1048079..ca8d205 100644
--- a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
+++ b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
@@ -64,6 +64,7 @@ public class FlinkIoTDBSink {
                 tuple.put("device", "root.sg.d1");
                 tuple.put("timestamp", 
String.valueOf(System.currentTimeMillis()));
                 tuple.put("measurements", "s1");
+                tuple.put("types", "DOUBLE");
                 tuple.put("values", String.valueOf(random.nextDouble()));
 
                 context.collect(tuple);
diff --git 
a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java 
b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
index 96a660d..395312e 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -28,6 +27,7 @@ import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.SessionDataSet.DataIterator;
 import org.apache.iotdb.session.pool.SessionDataSetWrapper;
 import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 public class SessionPoolExample {
 
@@ -51,15 +51,20 @@ public class SessionPoolExample {
   private static void insertRecord() throws StatementExecutionException, 
IoTDBConnectionException {
     String deviceId = "root.sg1.d1";
     List<String> measurements = new ArrayList<>();
+    List<TSDataType> types = new ArrayList<>();
     measurements.add("s1");
     measurements.add("s2");
     measurements.add("s3");
+    types.add(TSDataType.INT64);
+    types.add(TSDataType.INT64);
+    types.add(TSDataType.INT64);
+
     for (long time = 0; time < 10; time++) {
-      List<String> values = new ArrayList<>();
-      values.add("1");
-      values.add("2");
-      values.add("3");
-      pool.insertRecord(deviceId, time, measurements, values);
+      List<Object> values = new ArrayList<>();
+      values.add(1L);
+      values.add(2L);
+      values.add(3L);
+      pool.insertRecord(deviceId, time, measurements, types, values);
     }
   }
 
diff --git 
a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/DefaultIoTSerializationSchema.java
 
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/DefaultIoTSerializationSchema.java
index 4ac596b..2781906 100644
--- 
a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/DefaultIoTSerializationSchema.java
+++ 
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/DefaultIoTSerializationSchema.java
@@ -18,9 +18,11 @@
 
 package org.apache.iotdb.flink;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 /**
  * @inheritDoc
@@ -31,6 +33,7 @@ public class DefaultIoTSerializationSchema implements 
IoTSerializationSchema<Map
     private String fieldTimestamp = "timestamp";
     private String fieldMeasurements = "measurements";
     private String fieldValues = "values";
+    private String fieldTypes = "types";
     private String separator = ",";
 
     @Override
@@ -49,12 +52,37 @@ public class DefaultIoTSerializationSchema implements 
IoTSerializationSchema<Map
             measurements = 
Arrays.asList(tuple.get(fieldMeasurements).split(separator));
         }
 
-        List<String> values = null;
-        if (tuple.get(fieldValues) != null) {
-            values = Arrays.asList(tuple.get(fieldValues).split(separator));
+        List<TSDataType> types = new ArrayList<>();
+        for(String type : tuple.get(fieldTypes).split(separator)){
+            types.add(TSDataType.valueOf(type));
         }
 
-        return new Event(device, timestamp, measurements, values);
+        List<Object> values = new ArrayList<>();
+        String[] valuesStr = tuple.get(fieldValues).split(separator);
+        for(int i = 0; i < valuesStr.length; i++){
+            switch (types.get(i)){
+                case INT64:
+                    values.add(Long.parseLong(valuesStr[i]));
+                    break;
+                case DOUBLE:
+                    values.add(Double.parseDouble(valuesStr[i]));
+                    break;
+                case INT32:
+                    values.add(Integer.parseInt(valuesStr[i]));
+                    break;
+                case TEXT:
+                    values.add(valuesStr[i]);
+                    break;
+                case FLOAT:
+                    values.add(Float.parseFloat(valuesStr[i]));
+                    break;
+                case BOOLEAN:
+                    values.add(Boolean.parseBoolean(valuesStr[i]));
+                    break;
+            }
+        }
+
+        return new Event(device, timestamp, measurements, types, values);
     }
 
     public String getFieldDevice() {
diff --git 
a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/Event.java 
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/Event.java
index 6cccb23..83fc4a3 100644
--- a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/Event.java
+++ b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/Event.java
@@ -19,36 +19,49 @@
 package org.apache.iotdb.flink;
 
 import java.util.List;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 /**
  * Event serializes the device/sensor related data, such as time, measurements 
etc.
  */
 public class Event {
-    private String device;
-    private Long timestamp;
-    private List<String> measurements;
-    private List<String> values;
-
-    public Event(String device, Long timestamp, List<String> measurements, 
List<String> values) {
-        this.device = device;
-        this.timestamp = timestamp;
-        this.measurements = measurements;
-        this.values = values;
-    }
-
-    public String getDevice() {
-        return device;
-    }
-
-    public Long getTimestamp() {
-        return timestamp;
-    }
-
-    public List<String> getMeasurements() {
-        return measurements;
-    }
-
-    public List<String> getValues() {
-        return values;
-    }
+
+  private String device;
+  private Long timestamp;
+  private List<String> measurements;
+  private List<TSDataType> types;
+  private List<Object> values;
+
+  public Event(String device, Long timestamp, List<String> measurements, 
List<TSDataType> types,
+      List<Object> values) {
+    this.device = device;
+    this.timestamp = timestamp;
+    this.measurements = measurements;
+    this.types = types;
+    this.values = values;
+  }
+
+  public List<TSDataType> getTypes() {
+    return types;
+  }
+
+  public void setTypes(List<TSDataType> types) {
+    this.types = types;
+  }
+
+  public String getDevice() {
+    return device;
+  }
+
+  public Long getTimestamp() {
+    return timestamp;
+  }
+
+  public List<String> getMeasurements() {
+    return measurements;
+  }
+
+  public List<Object> getValues() {
+    return values;
+  }
 }
diff --git 
a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java 
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
index bb3e042..c63871b 100644
--- a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
+++ b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
@@ -32,161 +32,167 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 /**
- * The `IoTDBSink` allows flink jobs to write events into IoTDB timeseries.
- * By default send only one event after another, but you can change to batch 
by invoking `withBatchSize(int)`.
+ * The `IoTDBSink` allows flink jobs to write events into IoTDB timeseries. By 
default send only one
+ * event after another, but you can change to batch by invoking 
`withBatchSize(int)`.
+ *
  * @param <IN> the input data type
  */
 public class IoTDBSink<IN> extends RichSinkFunction<IN> {
 
-    private static final long serialVersionUID = 1L;
-    private static final Logger LOG = LoggerFactory.getLogger(IoTDBSink.class);
-
-    private IoTDBOptions options;
-    private IoTSerializationSchema<IN> serializationSchema;
-    private Map<String, IoTDBOptions.TimeseriesOption> timeseriesOptionMap;
-    private transient SessionPool pool;
-    private transient ScheduledExecutorService scheduledExecutor;
-
-    private int batchSize = 0;
-    private int flushIntervalMs = 3000;
-    private List<Event> batchList;
-    private int sessionPoolSize = 2;
-
-    public IoTDBSink(IoTDBOptions options, IoTSerializationSchema<IN> schema) {
-        this.options = options;
-        this.serializationSchema = schema;
-        this.batchList = new LinkedList<>();
-        this.timeseriesOptionMap = new HashMap<>();
-        for (IoTDBOptions.TimeseriesOption timeseriesOption : 
options.getTimeseriesOptionList()) {
-            timeseriesOptionMap.put(timeseriesOption.getPath(), 
timeseriesOption);
-        }
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(IoTDBSink.class);
+
+  private IoTDBOptions options;
+  private IoTSerializationSchema<IN> serializationSchema;
+  private Map<String, IoTDBOptions.TimeseriesOption> timeseriesOptionMap;
+  private transient SessionPool pool;
+  private transient ScheduledExecutorService scheduledExecutor;
+
+  private int batchSize = 0;
+  private int flushIntervalMs = 3000;
+  private List<Event> batchList;
+  private int sessionPoolSize = 2;
+
+  public IoTDBSink(IoTDBOptions options, IoTSerializationSchema<IN> schema) {
+    this.options = options;
+    this.serializationSchema = schema;
+    this.batchList = new LinkedList<>();
+    this.timeseriesOptionMap = new HashMap<>();
+    for (IoTDBOptions.TimeseriesOption timeseriesOption : 
options.getTimeseriesOptionList()) {
+      timeseriesOptionMap.put(timeseriesOption.getPath(), timeseriesOption);
     }
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        initSession();
-        initScheduler();
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    initSession();
+    initScheduler();
+  }
+
+  void initSession() throws Exception {
+    pool = new SessionPool(options.getHost(), options.getPort(), 
options.getUser(),
+        options.getPassword(), sessionPoolSize);
+
+    pool.setStorageGroup(options.getStorageGroup());
+    for (IoTDBOptions.TimeseriesOption option : 
options.getTimeseriesOptionList()) {
+      if (!pool.checkTimeseriesExists(option.getPath())) {
+        pool.createTimeseries(option.getPath(), option.getDataType(), 
option.getEncoding(),
+            option.getCompressor());
+      }
     }
-
-    void initSession() throws Exception {
-        pool = new SessionPool(options.getHost(), options.getPort(), 
options.getUser(), options.getPassword(), sessionPoolSize);
-
-        pool.setStorageGroup(options.getStorageGroup());
-        for (IoTDBOptions.TimeseriesOption option : 
options.getTimeseriesOptionList()) {
-            if (!pool.checkTimeseriesExists(option.getPath())) {
-                pool.createTimeseries(option.getPath(), option.getDataType(), 
option.getEncoding(), option.getCompressor());
-            }
+  }
+
+  void initScheduler() {
+    if (batchSize > 0) {
+      scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+      scheduledExecutor.scheduleAtFixedRate(() -> {
+        try {
+          flush();
+        } catch (Exception e) {
+          LOG.error("flush error", e);
         }
+      }, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
     }
-
-    void initScheduler() {
-        if (batchSize > 0) {
-            scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
-            scheduledExecutor.scheduleAtFixedRate(() -> {
-                try {
-                    flush();
-                } catch (Exception e) {
-                    LOG.error("flush error", e);
-                }
-            }, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
-        }
-    }
-
-    //  for testing
-    void setSessionPool(SessionPool pool) {
-        this.pool = pool;
+  }
+
+  //  for testing
+  void setSessionPool(SessionPool pool) {
+    this.pool = pool;
+  }
+
+  @Override
+  public void invoke(IN input, Context context) throws Exception {
+    Event event = serializationSchema.serialize(input);
+    if (event == null) {
+      return;
     }
 
-    @Override
-    public void invoke(IN input, Context context) throws Exception {
-        Event event = serializationSchema.serialize(input);
-        if (event == null) {
-            return;
-        }
-
-        if (batchSize > 0) {
-            synchronized (batchList) {
-                batchList.add(event);
-                if (batchList.size() >= batchSize) {
-                    flush();
-                }
-                return;
-            }
+    if (batchSize > 0) {
+      synchronized (batchList) {
+        batchList.add(event);
+        if (batchList.size() >= batchSize) {
+          flush();
         }
-
-        convertText(event.getDevice(), event.getMeasurements(), 
event.getValues());
-        pool.insertRecord(event.getDevice(), event.getTimestamp(), 
event.getMeasurements(),
-                event.getValues());
-        LOG.debug("send event successfully");
-    }
-
-    public IoTDBSink<IN> withBatchSize(int batchSize) {
-        Preconditions.checkArgument(batchSize >= 0);
-        this.batchSize = batchSize;
-        return this;
-    }
-
-    public IoTDBSink<IN> withFlushIntervalMs(int flushIntervalMs) {
-        Preconditions.checkArgument(flushIntervalMs > 0);
-        this.flushIntervalMs = flushIntervalMs;
-        return this;
+        return;
+      }
     }
 
-    public IoTDBSink<IN> withSessionPoolSize(int sessionPoolSize) {
-        Preconditions.checkArgument(sessionPoolSize > 0);
-        this.sessionPoolSize = sessionPoolSize;
-        return this;
+    convertText(event.getDevice(), event.getMeasurements(), event.getValues());
+    pool.insertRecord(event.getDevice(), event.getTimestamp(), 
event.getMeasurements(),
+        event.getTypes(), event.getValues());
+    LOG.debug("send event successfully");
+  }
+
+  public IoTDBSink<IN> withBatchSize(int batchSize) {
+    Preconditions.checkArgument(batchSize >= 0);
+    this.batchSize = batchSize;
+    return this;
+  }
+
+  public IoTDBSink<IN> withFlushIntervalMs(int flushIntervalMs) {
+    Preconditions.checkArgument(flushIntervalMs > 0);
+    this.flushIntervalMs = flushIntervalMs;
+    return this;
+  }
+
+  public IoTDBSink<IN> withSessionPoolSize(int sessionPoolSize) {
+    Preconditions.checkArgument(sessionPoolSize > 0);
+    this.sessionPoolSize = sessionPoolSize;
+    return this;
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (pool != null) {
+      try {
+        flush();
+      } catch (Exception e) {
+        LOG.error("flush error", e);
+      }
+      pool.close();
     }
-
-    @Override
-    public void close() throws Exception {
-        if (pool != null) {
-            try {
-                flush();
-            } catch (Exception e) {
-                LOG.error("flush error", e);
-            }
-            pool.close();
-        }
-        if (scheduledExecutor != null) {
-            scheduledExecutor.shutdown();
-        }
+    if (scheduledExecutor != null) {
+      scheduledExecutor.shutdown();
     }
-
-    private void convertText(String device, List<String> measurements, 
List<String> values) {
-        if (device != null && measurements != null && values != null && 
measurements.size() == values.size()) {
-            for (int i = 0; i < measurements.size(); i++) {
-                String measurement = device + "." + measurements.get(i);
-                IoTDBOptions.TimeseriesOption timeseriesOption = 
timeseriesOptionMap.get(measurement);
-                if (timeseriesOption!= null && 
TSDataType.TEXT.equals(timeseriesOption.getDataType())) {
-                    // The TEXT data type should be covered by " or '
-                    values.set(i, "'" + values.get(i) + "'");
-                }
-            }
+  }
+
+  private void convertText(String device, List<String> measurements, 
List<Object> values) {
+    if (device != null && measurements != null && values != null && 
measurements.size() == values
+        .size()) {
+      for (int i = 0; i < measurements.size(); i++) {
+        String measurement = device + "." + measurements.get(i);
+        IoTDBOptions.TimeseriesOption timeseriesOption = 
timeseriesOptionMap.get(measurement);
+        if (timeseriesOption != null && 
TSDataType.TEXT.equals(timeseriesOption.getDataType())) {
+          // The TEXT data type should be covered by " or '
+          values.set(i, "'" + values.get(i) + "'");
         }
+      }
     }
-
-    private void flush() throws Exception {
-        if (batchSize > 0) {
-            synchronized (batchList) {
-                if (batchList.size() > 0) {
-                    List<String> deviceIds = new ArrayList<>();
-                    List<Long> timestamps = new ArrayList<>();
-                    List<List<String>> measurementsList = new ArrayList<>();
-                    List<List<String>> valuesList = new ArrayList<>();
-
-                    for (Event event : batchList) {
-                        convertText(event.getDevice(), 
event.getMeasurements(), event.getValues());
-                        deviceIds.add(event.getDevice());
-                        timestamps.add(event.getTimestamp());
-                        measurementsList.add(event.getMeasurements());
-                        valuesList.add(event.getValues());
-                    }
-                    pool.insertRecords(deviceIds, timestamps, 
measurementsList, valuesList);
-                    LOG.debug("send event successfully");
-                    batchList.clear();
-                }
-            }
+  }
+
+  private void flush() throws Exception {
+    if (batchSize > 0) {
+      synchronized (batchList) {
+        if (batchList.size() > 0) {
+          List<String> deviceIds = new ArrayList<>();
+          List<Long> timestamps = new ArrayList<>();
+          List<List<String>> measurementsList = new ArrayList<>();
+          List<List<TSDataType>> typesList = new ArrayList<>();
+          List<List<Object>> valuesList = new ArrayList<>();
+
+          for (Event event : batchList) {
+            convertText(event.getDevice(), event.getMeasurements(), 
event.getValues());
+            deviceIds.add(event.getDevice());
+            timestamps.add(event.getTimestamp());
+            measurementsList.add(event.getMeasurements());
+            typesList.add(event.getTypes());
+            valuesList.add(event.getValues());
+          }
+          pool.insertRecords(deviceIds, timestamps, measurementsList, 
typesList, valuesList);
+          LOG.debug("send event successfully");
+          batchList.clear();
         }
+      }
     }
+  }
 }
diff --git 
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
 
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
index 7f6fe5e..930565b 100644
--- 
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
+++ 
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
@@ -18,85 +18,94 @@
 
 package org.apache.iotdb.flink;
 
-import com.google.common.collect.Lists;
-import org.apache.iotdb.session.pool.SessionPool;
-import org.junit.Before;
-import org.junit.Test;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
 
+import com.google.common.collect.Lists;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.junit.Before;
+import org.junit.Test;
 
 public class IoTDBSinkBatchInsertTest {
 
-    private IoTDBSink ioTDBSink;
-    private SessionPool pool;
-
-    @Before
-    public void setUp() throws Exception {
-        IoTDBOptions options = new IoTDBOptions();
-        options.setTimeseriesOptionList(Lists.newArrayList(new 
IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
-        ioTDBSink = new IoTDBSink(options, new 
DefaultIoTSerializationSchema());
-        ioTDBSink.withBatchSize(3);
-
-        pool = mock(SessionPool.class);
-        ioTDBSink.setSessionPool(pool);
-    }
-
-    @Test
-    public void testBatchInsert() throws Exception {
-        Map<String,String> tuple = new HashMap();
-        tuple.put("device", "root.sg.D01");
-        tuple.put("timestamp", "1581861293000");
-        tuple.put("measurements", "temperature");
-        tuple.put("values", "36.5");
-        ioTDBSink.invoke(tuple, null);
-
-        verifyZeroInteractions(pool);
-
-        tuple = new HashMap();
-        tuple.put("device", "root.sg.D01");
-        tuple.put("timestamp", "1581861293001");
-        tuple.put("measurements", "temperature");
-        tuple.put("values", "37.2");
-        ioTDBSink.invoke(tuple, null);
-
-        verifyZeroInteractions(pool);
-
-        tuple = new HashMap();
-        tuple.put("device", "root.sg.D01");
-        tuple.put("timestamp", "1581861293003");
-        tuple.put("measurements", "temperature");
-        tuple.put("values", "37.1");
-        ioTDBSink.invoke(tuple, null);
-
-        verify(pool).insertRecords(any(List.class), any(List.class), 
any(List.class), any(List.class));
-
-        tuple = new HashMap();
-        tuple.put("device", "root.sg.D01");
-        tuple.put("timestamp", "1581861293005");
-        tuple.put("measurements", "temperature");
-        tuple.put("values", "36.5");
-        ioTDBSink.invoke(tuple, null);
-
-        verifyZeroInteractions(pool);
-    }
-
-    @Test
-    public void close() throws Exception {
-        Map<String,String> tuple = new HashMap();
-        tuple.put("device", "root.sg.D01");
-        tuple.put("timestamp", "1581861293005");
-        tuple.put("measurements", "temperature");
-        tuple.put("values", "36.5");
-        ioTDBSink.invoke(tuple, null);
-        verifyZeroInteractions(pool);
-
-        ioTDBSink.close();
-        verify(pool).insertRecords(any(List.class), any(List.class), 
any(List.class), any(List.class));
-        verify(pool).close();
-    }
+  private IoTDBSink ioTDBSink;
+  private SessionPool pool;
+
+  @Before
+  public void setUp() throws Exception {
+    IoTDBOptions options = new IoTDBOptions();
+    options.setTimeseriesOptionList(
+        Lists.newArrayList(new 
IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
+    ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema());
+    ioTDBSink.withBatchSize(3);
+
+    pool = mock(SessionPool.class);
+    ioTDBSink.setSessionPool(pool);
+  }
+
+  @Test
+  public void testBatchInsert() throws Exception {
+    Map<String, String> tuple = new HashMap();
+    tuple.put("device", "root.sg.D01");
+    tuple.put("timestamp", "1581861293000");
+    tuple.put("measurements", "temperature");
+    tuple.put("types", "DOUBLE");
+    tuple.put("values", "36.5");
+    ioTDBSink.invoke(tuple, null);
+
+    verifyZeroInteractions(pool);
+
+    tuple = new HashMap();
+    tuple.put("device", "root.sg.D01");
+    tuple.put("timestamp", "1581861293001");
+    tuple.put("measurements", "temperature");
+    tuple.put("types", "DOUBLE");
+    tuple.put("values", "37.2");
+    ioTDBSink.invoke(tuple, null);
+
+    verifyZeroInteractions(pool);
+
+    tuple = new HashMap();
+    tuple.put("device", "root.sg.D01");
+    tuple.put("timestamp", "1581861293003");
+    tuple.put("measurements", "temperature");
+    tuple.put("types", "DOUBLE");
+    tuple.put("values", "37.1");
+    ioTDBSink.invoke(tuple, null);
+
+    verify(pool).insertRecords(any(List.class), any(List.class), 
any(List.class), any(List.class),
+        any(List.class));
+
+    tuple = new HashMap();
+    tuple.put("device", "root.sg.D01");
+    tuple.put("timestamp", "1581861293005");
+    tuple.put("measurements", "temperature");
+    tuple.put("types", "DOUBLE");
+    tuple.put("values", "36.5");
+    ioTDBSink.invoke(tuple, null);
+
+    verifyZeroInteractions(pool);
+  }
+
+  @Test
+  public void close() throws Exception {
+    Map<String, String> tuple = new HashMap();
+    tuple.put("device", "root.sg.D01");
+    tuple.put("timestamp", "1581861293005");
+    tuple.put("measurements", "temperature");
+    tuple.put("types", "DOUBLE");
+    tuple.put("values", "36.5");
+    ioTDBSink.invoke(tuple, null);
+    verifyZeroInteractions(pool);
+
+    ioTDBSink.close();
+    verify(pool).insertRecords(any(List.class), any(List.class), 
any(List.class), any(List.class),
+        any(List.class));
+    verify(pool).close();
+  }
 }
diff --git 
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
 
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
index f8c7f7e..f9e2d62 100644
--- 
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
+++ 
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
@@ -18,57 +18,61 @@
 
 package org.apache.iotdb.flink;
 
-import com.google.common.collect.Lists;
-import org.apache.iotdb.session.pool.SessionPool;
-import org.junit.Before;
-import org.junit.Test;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
 
+import com.google.common.collect.Lists;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.junit.Before;
+import org.junit.Test;
 
 public class IoTDBSinkBatchTimerTest {
 
-    private IoTDBSink ioTDBSink;
-    private SessionPool pool;
+  private IoTDBSink ioTDBSink;
+  private SessionPool pool;
 
-    @Before
-    public void setUp() throws Exception {
-        IoTDBOptions options = new IoTDBOptions();
-        options.setTimeseriesOptionList(Lists.newArrayList(new 
IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
-        ioTDBSink = new IoTDBSink(options, new 
DefaultIoTSerializationSchema());
-        ioTDBSink.withBatchSize(3);
-        ioTDBSink.withFlushIntervalMs(1000);
-        ioTDBSink.initScheduler();
+  @Before
+  public void setUp() throws Exception {
+    IoTDBOptions options = new IoTDBOptions();
+    options.setTimeseriesOptionList(
+        Lists.newArrayList(new 
IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
+    ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema());
+    ioTDBSink.withBatchSize(3);
+    ioTDBSink.withFlushIntervalMs(1000);
+    ioTDBSink.initScheduler();
 
-        pool = mock(SessionPool.class);
-        ioTDBSink.setSessionPool(pool);
-    }
+    pool = mock(SessionPool.class);
+    ioTDBSink.setSessionPool(pool);
+  }
 
-    @Test
-    public void testBatchInsert() throws Exception {
-        Map<String,String> tuple = new HashMap();
-        tuple.put("device", "root.sg.D01");
-        tuple.put("timestamp", "1581861293000");
-        tuple.put("measurements", "temperature");
-        tuple.put("values", "36.5");
-        ioTDBSink.invoke(tuple, null);
+  @Test
+  public void testBatchInsert() throws Exception {
+    Map<String, String> tuple = new HashMap();
+    tuple.put("device", "root.sg.D01");
+    tuple.put("timestamp", "1581861293000");
+    tuple.put("measurements", "temperature");
+    tuple.put("types", "DOUBLE");
+    tuple.put("values", "36.5");
+    ioTDBSink.invoke(tuple, null);
 
-        Thread.sleep(2500);
+    Thread.sleep(2500);
 
-        verify(pool).insertRecords(any(List.class), any(List.class), 
any(List.class), any(List.class));
+    verify(pool).insertRecords(any(List.class), any(List.class), 
any(List.class), any(List.class),
+        any(List.class));
 
-        Thread.sleep(1000);
+    Thread.sleep(1000);
 
-        verifyZeroInteractions(pool);
-    }
+    verifyZeroInteractions(pool);
+  }
 
-    @Test
-    public void close() throws Exception {
-        ioTDBSink.close();
-        verify(pool).close();
-    }
+  @Test
+  public void close() throws Exception {
+    ioTDBSink.close();
+    verify(pool).close();
+  }
 }
diff --git 
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
 
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
index 3bcb367..6c268f6 100644
--- 
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
+++ 
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
@@ -18,49 +18,51 @@
 
 package org.apache.iotdb.flink;
 
-import com.google.common.collect.Lists;
-import org.apache.iotdb.session.pool.SessionPool;
-import org.junit.Before;
-import org.junit.Test;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
+import com.google.common.collect.Lists;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.junit.Before;
+import org.junit.Test;
 
 public class IoTDBSinkInsertTest {
 
-    private IoTDBSink ioTDBSink;
-    private SessionPool pool;
+  private IoTDBSink ioTDBSink;
+  private SessionPool pool;
 
-    @Before
-    public void setUp() throws Exception {
-        IoTDBOptions options = new IoTDBOptions();
-        options.setTimeseriesOptionList(Lists.newArrayList(new 
IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
-        ioTDBSink = new IoTDBSink(options, new 
DefaultIoTSerializationSchema());
+  @Before
+  public void setUp() throws Exception {
+    IoTDBOptions options = new IoTDBOptions();
+    options.setTimeseriesOptionList(
+        Lists.newArrayList(new 
IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
+    ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema());
 
-        pool = mock(SessionPool.class);
-        ioTDBSink.setSessionPool(pool);
-    }
+    pool = mock(SessionPool.class);
+    ioTDBSink.setSessionPool(pool);
+  }
 
-    @Test
-    public void testInsert() throws Exception {
-        Map<String,String> tuple = new HashMap();
-        tuple.put("device", "root.sg.D01");
-        tuple.put("timestamp", "1581861293000");
-        tuple.put("measurements", "temperature");
-        tuple.put("values", "36.5");
+  @Test
+  public void testInsert() throws Exception {
+    Map<String, String> tuple = new HashMap();
+    tuple.put("device", "root.sg.D01");
+    tuple.put("timestamp", "1581861293000");
+    tuple.put("measurements", "temperature");
+    tuple.put("types", "DOUBLE");
+    tuple.put("values", "36.5");
 
-        ioTDBSink.invoke(tuple, null);
-        verify(pool).insertRecord(any(String.class), any(Long.class), 
any(List.class), any(List.class));
-    }
+    ioTDBSink.invoke(tuple, null);
+    verify(pool).insertRecord(any(String.class), any(Long.class), 
any(List.class), any(List.class),
+        any(List.class));
+  }
 
-    @Test
-    public void close() throws Exception {
-        ioTDBSink.close();
-        verify(pool).close();
-    }
+  @Test
+  public void close() throws Exception {
+    ioTDBSink.close();
+    verify(pool).close();
+  }
 }

Reply via email to