This is an automated email from the ASF dual-hosted git repository.
xuekaifeng pushed a commit to branch
IOTDB-615-Use-binary-rather-than-string-in-insert-plan
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to
refs/heads/IOTDB-615-Use-binary-rather-than-string-in-insert-plan by this push:
new 5a3fc4e fix bugs
5a3fc4e is described below
commit 5a3fc4ebc256700492e6f4860bd890cda5801bb4
Author: 151250176 <[email protected]>
AuthorDate: Wed May 20 01:18:31 2020 +0800
fix bugs
---
.../org/apache/iotdb/flink/FlinkIoTDBSink.java | 1 +
.../java/org/apache/iotdb/SessionPoolExample.java | 17 +-
.../iotdb/flink/DefaultIoTSerializationSchema.java | 36 ++-
.../main/java/org/apache/iotdb/flink/Event.java | 67 +++--
.../java/org/apache/iotdb/flink/IoTDBSink.java | 282 +++++++++++----------
.../iotdb/flink/IoTDBSinkBatchInsertTest.java | 157 ++++++------
.../iotdb/flink/IoTDBSinkBatchTimerTest.java | 80 +++---
.../apache/iotdb/flink/IoTDBSinkInsertTest.java | 68 ++---
8 files changed, 388 insertions(+), 320 deletions(-)
diff --git
a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
index 1048079..ca8d205 100644
--- a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
+++ b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
@@ -64,6 +64,7 @@ public class FlinkIoTDBSink {
tuple.put("device", "root.sg.d1");
tuple.put("timestamp",
String.valueOf(System.currentTimeMillis()));
tuple.put("measurements", "s1");
+ tuple.put("types", "DOUBLE");
tuple.put("values", String.valueOf(random.nextDouble()));
context.collect(tuple);
diff --git
a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
index 96a660d..395312e 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
@@ -19,7 +19,6 @@
package org.apache.iotdb;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -28,6 +27,7 @@ import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.SessionDataSet.DataIterator;
import org.apache.iotdb.session.pool.SessionDataSetWrapper;
import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public class SessionPoolExample {
@@ -51,15 +51,20 @@ public class SessionPoolExample {
private static void insertRecord() throws StatementExecutionException,
IoTDBConnectionException {
String deviceId = "root.sg1.d1";
List<String> measurements = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
measurements.add("s1");
measurements.add("s2");
measurements.add("s3");
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+
for (long time = 0; time < 10; time++) {
- List<String> values = new ArrayList<>();
- values.add("1");
- values.add("2");
- values.add("3");
- pool.insertRecord(deviceId, time, measurements, values);
+ List<Object> values = new ArrayList<>();
+ values.add(1L);
+ values.add(2L);
+ values.add(3L);
+ pool.insertRecord(deviceId, time, measurements, types, values);
}
}
diff --git
a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/DefaultIoTSerializationSchema.java
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/DefaultIoTSerializationSchema.java
index 4ac596b..2781906 100644
---
a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/DefaultIoTSerializationSchema.java
+++
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/DefaultIoTSerializationSchema.java
@@ -18,9 +18,11 @@
package org.apache.iotdb.flink;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
/**
* @inheritDoc
@@ -31,6 +33,7 @@ public class DefaultIoTSerializationSchema implements
IoTSerializationSchema<Map
private String fieldTimestamp = "timestamp";
private String fieldMeasurements = "measurements";
private String fieldValues = "values";
+ private String fieldTypes = "types";
private String separator = ",";
@Override
@@ -49,12 +52,37 @@ public class DefaultIoTSerializationSchema implements
IoTSerializationSchema<Map
measurements =
Arrays.asList(tuple.get(fieldMeasurements).split(separator));
}
- List<String> values = null;
- if (tuple.get(fieldValues) != null) {
- values = Arrays.asList(tuple.get(fieldValues).split(separator));
+ List<TSDataType> types = new ArrayList<>();
+ for(String type : tuple.get(fieldTypes).split(separator)){
+ types.add(TSDataType.valueOf(type));
}
- return new Event(device, timestamp, measurements, values);
+ List<Object> values = new ArrayList<>();
+ String[] valuesStr = tuple.get(fieldValues).split(separator);
+ for(int i = 0; i < valuesStr.length; i++){
+ switch (types.get(i)){
+ case INT64:
+ values.add(Long.parseLong(valuesStr[i]));
+ break;
+ case DOUBLE:
+ values.add(Double.parseDouble(valuesStr[i]));
+ break;
+ case INT32:
+ values.add(Integer.parseInt(valuesStr[i]));
+ break;
+ case TEXT:
+ values.add(valuesStr[i]);
+ break;
+ case FLOAT:
+ values.add(Float.parseFloat(valuesStr[i]));
+ break;
+ case BOOLEAN:
+ values.add(Boolean.parseBoolean(valuesStr[i]));
+ break;
+ }
+ }
+
+ return new Event(device, timestamp, measurements, types, values);
}
public String getFieldDevice() {
diff --git
a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/Event.java
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/Event.java
index 6cccb23..83fc4a3 100644
--- a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/Event.java
+++ b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/Event.java
@@ -19,36 +19,49 @@
package org.apache.iotdb.flink;
import java.util.List;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
/**
* Event serializes the device/sensor related data, such as time, measurements
etc.
*/
public class Event {
- private String device;
- private Long timestamp;
- private List<String> measurements;
- private List<String> values;
-
- public Event(String device, Long timestamp, List<String> measurements,
List<String> values) {
- this.device = device;
- this.timestamp = timestamp;
- this.measurements = measurements;
- this.values = values;
- }
-
- public String getDevice() {
- return device;
- }
-
- public Long getTimestamp() {
- return timestamp;
- }
-
- public List<String> getMeasurements() {
- return measurements;
- }
-
- public List<String> getValues() {
- return values;
- }
+
+ private String device;
+ private Long timestamp;
+ private List<String> measurements;
+ private List<TSDataType> types;
+ private List<Object> values;
+
+ public Event(String device, Long timestamp, List<String> measurements,
List<TSDataType> types,
+ List<Object> values) {
+ this.device = device;
+ this.timestamp = timestamp;
+ this.measurements = measurements;
+ this.types = types;
+ this.values = values;
+ }
+
+ public List<TSDataType> getTypes() {
+ return types;
+ }
+
+ public void setTypes(List<TSDataType> types) {
+ this.types = types;
+ }
+
+ public String getDevice() {
+ return device;
+ }
+
+ public Long getTimestamp() {
+ return timestamp;
+ }
+
+ public List<String> getMeasurements() {
+ return measurements;
+ }
+
+ public List<Object> getValues() {
+ return values;
+ }
}
diff --git
a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
index bb3e042..c63871b 100644
--- a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
+++ b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
@@ -32,161 +32,167 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
- * The `IoTDBSink` allows flink jobs to write events into IoTDB timeseries.
- * By default send only one event after another, but you can change to batch
by invoking `withBatchSize(int)`.
+ * The `IoTDBSink` allows flink jobs to write events into IoTDB timeseries. By
default send only one
+ * event after another, but you can change to batch by invoking
`withBatchSize(int)`.
+ *
* @param <IN> the input data type
*/
public class IoTDBSink<IN> extends RichSinkFunction<IN> {
- private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(IoTDBSink.class);
-
- private IoTDBOptions options;
- private IoTSerializationSchema<IN> serializationSchema;
- private Map<String, IoTDBOptions.TimeseriesOption> timeseriesOptionMap;
- private transient SessionPool pool;
- private transient ScheduledExecutorService scheduledExecutor;
-
- private int batchSize = 0;
- private int flushIntervalMs = 3000;
- private List<Event> batchList;
- private int sessionPoolSize = 2;
-
- public IoTDBSink(IoTDBOptions options, IoTSerializationSchema<IN> schema) {
- this.options = options;
- this.serializationSchema = schema;
- this.batchList = new LinkedList<>();
- this.timeseriesOptionMap = new HashMap<>();
- for (IoTDBOptions.TimeseriesOption timeseriesOption :
options.getTimeseriesOptionList()) {
- timeseriesOptionMap.put(timeseriesOption.getPath(),
timeseriesOption);
- }
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(IoTDBSink.class);
+
+ private IoTDBOptions options;
+ private IoTSerializationSchema<IN> serializationSchema;
+ private Map<String, IoTDBOptions.TimeseriesOption> timeseriesOptionMap;
+ private transient SessionPool pool;
+ private transient ScheduledExecutorService scheduledExecutor;
+
+ private int batchSize = 0;
+ private int flushIntervalMs = 3000;
+ private List<Event> batchList;
+ private int sessionPoolSize = 2;
+
+ public IoTDBSink(IoTDBOptions options, IoTSerializationSchema<IN> schema) {
+ this.options = options;
+ this.serializationSchema = schema;
+ this.batchList = new LinkedList<>();
+ this.timeseriesOptionMap = new HashMap<>();
+ for (IoTDBOptions.TimeseriesOption timeseriesOption :
options.getTimeseriesOptionList()) {
+ timeseriesOptionMap.put(timeseriesOption.getPath(), timeseriesOption);
}
-
- @Override
- public void open(Configuration parameters) throws Exception {
- initSession();
- initScheduler();
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ initSession();
+ initScheduler();
+ }
+
+ void initSession() throws Exception {
+ pool = new SessionPool(options.getHost(), options.getPort(),
options.getUser(),
+ options.getPassword(), sessionPoolSize);
+
+ pool.setStorageGroup(options.getStorageGroup());
+ for (IoTDBOptions.TimeseriesOption option :
options.getTimeseriesOptionList()) {
+ if (!pool.checkTimeseriesExists(option.getPath())) {
+ pool.createTimeseries(option.getPath(), option.getDataType(),
option.getEncoding(),
+ option.getCompressor());
+ }
}
-
- void initSession() throws Exception {
- pool = new SessionPool(options.getHost(), options.getPort(),
options.getUser(), options.getPassword(), sessionPoolSize);
-
- pool.setStorageGroup(options.getStorageGroup());
- for (IoTDBOptions.TimeseriesOption option :
options.getTimeseriesOptionList()) {
- if (!pool.checkTimeseriesExists(option.getPath())) {
- pool.createTimeseries(option.getPath(), option.getDataType(),
option.getEncoding(), option.getCompressor());
- }
+ }
+
+ void initScheduler() {
+ if (batchSize > 0) {
+ scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+ scheduledExecutor.scheduleAtFixedRate(() -> {
+ try {
+ flush();
+ } catch (Exception e) {
+ LOG.error("flush error", e);
}
+ }, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
}
-
- void initScheduler() {
- if (batchSize > 0) {
- scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
- scheduledExecutor.scheduleAtFixedRate(() -> {
- try {
- flush();
- } catch (Exception e) {
- LOG.error("flush error", e);
- }
- }, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
- }
- }
-
- // for testing
- void setSessionPool(SessionPool pool) {
- this.pool = pool;
+ }
+
+ // for testing
+ void setSessionPool(SessionPool pool) {
+ this.pool = pool;
+ }
+
+ @Override
+ public void invoke(IN input, Context context) throws Exception {
+ Event event = serializationSchema.serialize(input);
+ if (event == null) {
+ return;
}
- @Override
- public void invoke(IN input, Context context) throws Exception {
- Event event = serializationSchema.serialize(input);
- if (event == null) {
- return;
- }
-
- if (batchSize > 0) {
- synchronized (batchList) {
- batchList.add(event);
- if (batchList.size() >= batchSize) {
- flush();
- }
- return;
- }
+ if (batchSize > 0) {
+ synchronized (batchList) {
+ batchList.add(event);
+ if (batchList.size() >= batchSize) {
+ flush();
}
-
- convertText(event.getDevice(), event.getMeasurements(),
event.getValues());
- pool.insertRecord(event.getDevice(), event.getTimestamp(),
event.getMeasurements(),
- event.getValues());
- LOG.debug("send event successfully");
- }
-
- public IoTDBSink<IN> withBatchSize(int batchSize) {
- Preconditions.checkArgument(batchSize >= 0);
- this.batchSize = batchSize;
- return this;
- }
-
- public IoTDBSink<IN> withFlushIntervalMs(int flushIntervalMs) {
- Preconditions.checkArgument(flushIntervalMs > 0);
- this.flushIntervalMs = flushIntervalMs;
- return this;
+ return;
+ }
}
- public IoTDBSink<IN> withSessionPoolSize(int sessionPoolSize) {
- Preconditions.checkArgument(sessionPoolSize > 0);
- this.sessionPoolSize = sessionPoolSize;
- return this;
+ convertText(event.getDevice(), event.getMeasurements(), event.getValues());
+ pool.insertRecord(event.getDevice(), event.getTimestamp(),
event.getMeasurements(),
+ event.getTypes(), event.getValues());
+ LOG.debug("send event successfully");
+ }
+
+ public IoTDBSink<IN> withBatchSize(int batchSize) {
+ Preconditions.checkArgument(batchSize >= 0);
+ this.batchSize = batchSize;
+ return this;
+ }
+
+ public IoTDBSink<IN> withFlushIntervalMs(int flushIntervalMs) {
+ Preconditions.checkArgument(flushIntervalMs > 0);
+ this.flushIntervalMs = flushIntervalMs;
+ return this;
+ }
+
+ public IoTDBSink<IN> withSessionPoolSize(int sessionPoolSize) {
+ Preconditions.checkArgument(sessionPoolSize > 0);
+ this.sessionPoolSize = sessionPoolSize;
+ return this;
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (pool != null) {
+ try {
+ flush();
+ } catch (Exception e) {
+ LOG.error("flush error", e);
+ }
+ pool.close();
}
-
- @Override
- public void close() throws Exception {
- if (pool != null) {
- try {
- flush();
- } catch (Exception e) {
- LOG.error("flush error", e);
- }
- pool.close();
- }
- if (scheduledExecutor != null) {
- scheduledExecutor.shutdown();
- }
+ if (scheduledExecutor != null) {
+ scheduledExecutor.shutdown();
}
-
- private void convertText(String device, List<String> measurements,
List<String> values) {
- if (device != null && measurements != null && values != null &&
measurements.size() == values.size()) {
- for (int i = 0; i < measurements.size(); i++) {
- String measurement = device + "." + measurements.get(i);
- IoTDBOptions.TimeseriesOption timeseriesOption =
timeseriesOptionMap.get(measurement);
- if (timeseriesOption!= null &&
TSDataType.TEXT.equals(timeseriesOption.getDataType())) {
- // The TEXT data type should be covered by " or '
- values.set(i, "'" + values.get(i) + "'");
- }
- }
+ }
+
+ private void convertText(String device, List<String> measurements,
List<Object> values) {
+ if (device != null && measurements != null && values != null &&
measurements.size() == values
+ .size()) {
+ for (int i = 0; i < measurements.size(); i++) {
+ String measurement = device + "." + measurements.get(i);
+ IoTDBOptions.TimeseriesOption timeseriesOption =
timeseriesOptionMap.get(measurement);
+ if (timeseriesOption != null &&
TSDataType.TEXT.equals(timeseriesOption.getDataType())) {
+ // The TEXT data type should be covered by " or '
+ values.set(i, "'" + values.get(i) + "'");
}
+ }
}
-
- private void flush() throws Exception {
- if (batchSize > 0) {
- synchronized (batchList) {
- if (batchList.size() > 0) {
- List<String> deviceIds = new ArrayList<>();
- List<Long> timestamps = new ArrayList<>();
- List<List<String>> measurementsList = new ArrayList<>();
- List<List<String>> valuesList = new ArrayList<>();
-
- for (Event event : batchList) {
- convertText(event.getDevice(),
event.getMeasurements(), event.getValues());
- deviceIds.add(event.getDevice());
- timestamps.add(event.getTimestamp());
- measurementsList.add(event.getMeasurements());
- valuesList.add(event.getValues());
- }
- pool.insertRecords(deviceIds, timestamps,
measurementsList, valuesList);
- LOG.debug("send event successfully");
- batchList.clear();
- }
- }
+ }
+
+ private void flush() throws Exception {
+ if (batchSize > 0) {
+ synchronized (batchList) {
+ if (batchList.size() > 0) {
+ List<String> deviceIds = new ArrayList<>();
+ List<Long> timestamps = new ArrayList<>();
+ List<List<String>> measurementsList = new ArrayList<>();
+ List<List<TSDataType>> typesList = new ArrayList<>();
+ List<List<Object>> valuesList = new ArrayList<>();
+
+ for (Event event : batchList) {
+ convertText(event.getDevice(), event.getMeasurements(),
event.getValues());
+ deviceIds.add(event.getDevice());
+ timestamps.add(event.getTimestamp());
+ measurementsList.add(event.getMeasurements());
+ typesList.add(event.getTypes());
+ valuesList.add(event.getValues());
+ }
+ pool.insertRecords(deviceIds, timestamps, measurementsList,
typesList, valuesList);
+ LOG.debug("send event successfully");
+ batchList.clear();
}
+ }
}
+ }
}
diff --git
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
index 7f6fe5e..930565b 100644
---
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
+++
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
@@ -18,85 +18,94 @@
package org.apache.iotdb.flink;
-import com.google.common.collect.Lists;
-import org.apache.iotdb.session.pool.SessionPool;
-import org.junit.Before;
-import org.junit.Test;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.junit.Before;
+import org.junit.Test;
public class IoTDBSinkBatchInsertTest {
- private IoTDBSink ioTDBSink;
- private SessionPool pool;
-
- @Before
- public void setUp() throws Exception {
- IoTDBOptions options = new IoTDBOptions();
- options.setTimeseriesOptionList(Lists.newArrayList(new
IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
- ioTDBSink = new IoTDBSink(options, new
DefaultIoTSerializationSchema());
- ioTDBSink.withBatchSize(3);
-
- pool = mock(SessionPool.class);
- ioTDBSink.setSessionPool(pool);
- }
-
- @Test
- public void testBatchInsert() throws Exception {
- Map<String,String> tuple = new HashMap();
- tuple.put("device", "root.sg.D01");
- tuple.put("timestamp", "1581861293000");
- tuple.put("measurements", "temperature");
- tuple.put("values", "36.5");
- ioTDBSink.invoke(tuple, null);
-
- verifyZeroInteractions(pool);
-
- tuple = new HashMap();
- tuple.put("device", "root.sg.D01");
- tuple.put("timestamp", "1581861293001");
- tuple.put("measurements", "temperature");
- tuple.put("values", "37.2");
- ioTDBSink.invoke(tuple, null);
-
- verifyZeroInteractions(pool);
-
- tuple = new HashMap();
- tuple.put("device", "root.sg.D01");
- tuple.put("timestamp", "1581861293003");
- tuple.put("measurements", "temperature");
- tuple.put("values", "37.1");
- ioTDBSink.invoke(tuple, null);
-
- verify(pool).insertRecords(any(List.class), any(List.class),
any(List.class), any(List.class));
-
- tuple = new HashMap();
- tuple.put("device", "root.sg.D01");
- tuple.put("timestamp", "1581861293005");
- tuple.put("measurements", "temperature");
- tuple.put("values", "36.5");
- ioTDBSink.invoke(tuple, null);
-
- verifyZeroInteractions(pool);
- }
-
- @Test
- public void close() throws Exception {
- Map<String,String> tuple = new HashMap();
- tuple.put("device", "root.sg.D01");
- tuple.put("timestamp", "1581861293005");
- tuple.put("measurements", "temperature");
- tuple.put("values", "36.5");
- ioTDBSink.invoke(tuple, null);
- verifyZeroInteractions(pool);
-
- ioTDBSink.close();
- verify(pool).insertRecords(any(List.class), any(List.class),
any(List.class), any(List.class));
- verify(pool).close();
- }
+ private IoTDBSink ioTDBSink;
+ private SessionPool pool;
+
+ @Before
+ public void setUp() throws Exception {
+ IoTDBOptions options = new IoTDBOptions();
+ options.setTimeseriesOptionList(
+ Lists.newArrayList(new
IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
+ ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema());
+ ioTDBSink.withBatchSize(3);
+
+ pool = mock(SessionPool.class);
+ ioTDBSink.setSessionPool(pool);
+ }
+
+ @Test
+ public void testBatchInsert() throws Exception {
+ Map<String, String> tuple = new HashMap();
+ tuple.put("device", "root.sg.D01");
+ tuple.put("timestamp", "1581861293000");
+ tuple.put("measurements", "temperature");
+ tuple.put("types", "DOUBLE");
+ tuple.put("values", "36.5");
+ ioTDBSink.invoke(tuple, null);
+
+ verifyZeroInteractions(pool);
+
+ tuple = new HashMap();
+ tuple.put("device", "root.sg.D01");
+ tuple.put("timestamp", "1581861293001");
+ tuple.put("measurements", "temperature");
+ tuple.put("types", "DOUBLE");
+ tuple.put("values", "37.2");
+ ioTDBSink.invoke(tuple, null);
+
+ verifyZeroInteractions(pool);
+
+ tuple = new HashMap();
+ tuple.put("device", "root.sg.D01");
+ tuple.put("timestamp", "1581861293003");
+ tuple.put("measurements", "temperature");
+ tuple.put("types", "DOUBLE");
+ tuple.put("values", "37.1");
+ ioTDBSink.invoke(tuple, null);
+
+ verify(pool).insertRecords(any(List.class), any(List.class),
any(List.class), any(List.class),
+ any(List.class));
+
+ tuple = new HashMap();
+ tuple.put("device", "root.sg.D01");
+ tuple.put("timestamp", "1581861293005");
+ tuple.put("measurements", "temperature");
+ tuple.put("types", "DOUBLE");
+ tuple.put("values", "36.5");
+ ioTDBSink.invoke(tuple, null);
+
+ verifyZeroInteractions(pool);
+ }
+
+ @Test
+ public void close() throws Exception {
+ Map<String, String> tuple = new HashMap();
+ tuple.put("device", "root.sg.D01");
+ tuple.put("timestamp", "1581861293005");
+ tuple.put("measurements", "temperature");
+ tuple.put("types", "DOUBLE");
+ tuple.put("values", "36.5");
+ ioTDBSink.invoke(tuple, null);
+ verifyZeroInteractions(pool);
+
+ ioTDBSink.close();
+ verify(pool).insertRecords(any(List.class), any(List.class),
any(List.class), any(List.class),
+ any(List.class));
+ verify(pool).close();
+ }
}
diff --git
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
index f8c7f7e..f9e2d62 100644
---
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
+++
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
@@ -18,57 +18,61 @@
package org.apache.iotdb.flink;
-import com.google.common.collect.Lists;
-import org.apache.iotdb.session.pool.SessionPool;
-import org.junit.Before;
-import org.junit.Test;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.junit.Before;
+import org.junit.Test;
public class IoTDBSinkBatchTimerTest {
- private IoTDBSink ioTDBSink;
- private SessionPool pool;
+ private IoTDBSink ioTDBSink;
+ private SessionPool pool;
- @Before
- public void setUp() throws Exception {
- IoTDBOptions options = new IoTDBOptions();
- options.setTimeseriesOptionList(Lists.newArrayList(new
IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
- ioTDBSink = new IoTDBSink(options, new
DefaultIoTSerializationSchema());
- ioTDBSink.withBatchSize(3);
- ioTDBSink.withFlushIntervalMs(1000);
- ioTDBSink.initScheduler();
+ @Before
+ public void setUp() throws Exception {
+ IoTDBOptions options = new IoTDBOptions();
+ options.setTimeseriesOptionList(
+ Lists.newArrayList(new
IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
+ ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema());
+ ioTDBSink.withBatchSize(3);
+ ioTDBSink.withFlushIntervalMs(1000);
+ ioTDBSink.initScheduler();
- pool = mock(SessionPool.class);
- ioTDBSink.setSessionPool(pool);
- }
+ pool = mock(SessionPool.class);
+ ioTDBSink.setSessionPool(pool);
+ }
- @Test
- public void testBatchInsert() throws Exception {
- Map<String,String> tuple = new HashMap();
- tuple.put("device", "root.sg.D01");
- tuple.put("timestamp", "1581861293000");
- tuple.put("measurements", "temperature");
- tuple.put("values", "36.5");
- ioTDBSink.invoke(tuple, null);
+ @Test
+ public void testBatchInsert() throws Exception {
+ Map<String, String> tuple = new HashMap();
+ tuple.put("device", "root.sg.D01");
+ tuple.put("timestamp", "1581861293000");
+ tuple.put("measurements", "temperature");
+ tuple.put("types", "DOUBLE");
+ tuple.put("values", "36.5");
+ ioTDBSink.invoke(tuple, null);
- Thread.sleep(2500);
+ Thread.sleep(2500);
- verify(pool).insertRecords(any(List.class), any(List.class),
any(List.class), any(List.class));
+ verify(pool).insertRecords(any(List.class), any(List.class),
any(List.class), any(List.class),
+ any(List.class));
- Thread.sleep(1000);
+ Thread.sleep(1000);
- verifyZeroInteractions(pool);
- }
+ verifyZeroInteractions(pool);
+ }
- @Test
- public void close() throws Exception {
- ioTDBSink.close();
- verify(pool).close();
- }
+ @Test
+ public void close() throws Exception {
+ ioTDBSink.close();
+ verify(pool).close();
+ }
}
diff --git
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
index 3bcb367..6c268f6 100644
---
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
+++
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
@@ -18,49 +18,51 @@
package org.apache.iotdb.flink;
-import com.google.common.collect.Lists;
-import org.apache.iotdb.session.pool.SessionPool;
-import org.junit.Before;
-import org.junit.Test;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.junit.Before;
+import org.junit.Test;
public class IoTDBSinkInsertTest {
- private IoTDBSink ioTDBSink;
- private SessionPool pool;
+ private IoTDBSink ioTDBSink;
+ private SessionPool pool;
- @Before
- public void setUp() throws Exception {
- IoTDBOptions options = new IoTDBOptions();
- options.setTimeseriesOptionList(Lists.newArrayList(new
IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
- ioTDBSink = new IoTDBSink(options, new
DefaultIoTSerializationSchema());
+ @Before
+ public void setUp() throws Exception {
+ IoTDBOptions options = new IoTDBOptions();
+ options.setTimeseriesOptionList(
+ Lists.newArrayList(new
IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
+ ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema());
- pool = mock(SessionPool.class);
- ioTDBSink.setSessionPool(pool);
- }
+ pool = mock(SessionPool.class);
+ ioTDBSink.setSessionPool(pool);
+ }
- @Test
- public void testInsert() throws Exception {
- Map<String,String> tuple = new HashMap();
- tuple.put("device", "root.sg.D01");
- tuple.put("timestamp", "1581861293000");
- tuple.put("measurements", "temperature");
- tuple.put("values", "36.5");
+ @Test
+ public void testInsert() throws Exception {
+ Map<String, String> tuple = new HashMap();
+ tuple.put("device", "root.sg.D01");
+ tuple.put("timestamp", "1581861293000");
+ tuple.put("measurements", "temperature");
+ tuple.put("types", "DOUBLE");
+ tuple.put("values", "36.5");
- ioTDBSink.invoke(tuple, null);
- verify(pool).insertRecord(any(String.class), any(Long.class),
any(List.class), any(List.class));
- }
+ ioTDBSink.invoke(tuple, null);
+ verify(pool).insertRecord(any(String.class), any(Long.class),
any(List.class), any(List.class),
+ any(List.class));
+ }
- @Test
- public void close() throws Exception {
- ioTDBSink.close();
- verify(pool).close();
- }
+ @Test
+ public void close() throws Exception {
+ ioTDBSink.close();
+ verify(pool).close();
+ }
}