This is an automated email from the ASF dual-hosted git repository.
jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f7c850f [IOTDB-497] Apache Flink Connector Support (#817)
f7c850f is described below
commit f7c850f89772579f7d1b8a1589929c28754efbca
Author: Xin Wang <[email protected]>
AuthorDate: Sat Mar 7 21:20:47 2020 +0800
[IOTDB-497] Apache Flink Connector Support (#817)
---
example/flink/README.md | 30 ++++
example/flink/pom.xml | 45 +++++
.../org/apache/iotdb/flink/FlinkIoTDBSink.java | 85 ++++++++++
example/pom.xml | 1 +
flink-iotdb-connector/README.md | 58 +++++++
flink-iotdb-connector/pom.xml | 55 ++++++
.../iotdb/flink/DefaultIoTSerializationSchema.java | 99 +++++++++++
.../main/java/org/apache/iotdb/flink/Event.java | 54 ++++++
.../java/org/apache/iotdb/flink/IoTDBOptions.java | 152 +++++++++++++++++
.../java/org/apache/iotdb/flink/IoTDBSink.java | 187 +++++++++++++++++++++
.../apache/iotdb/flink/IoTSerializationSchema.java | 31 ++++
.../flink/DefaultIoTSerializationSchemaTest.java | 49 ++++++
.../iotdb/flink/IoTDBSinkBatchInsertTest.java | 102 +++++++++++
.../iotdb/flink/IoTDBSinkBatchTimerTest.java | 74 ++++++++
.../apache/iotdb/flink/IoTDBSinkInsertTest.java | 66 ++++++++
pom.xml | 4 +-
16 files changed, 1091 insertions(+), 1 deletion(-)
diff --git a/example/flink/README.md b/example/flink/README.md
new file mode 100644
index 0000000..fa3b867
--- /dev/null
+++ b/example/flink/README.md
@@ -0,0 +1,30 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+# IoTDB-Flink-Connector Example
+
+## Function
+```
+This example shows how to send data to an IoTDB server from a Flink job.
+```
+
+## Usage
+
+* Run `org.apache.iotdb.flink.FlinkIoTDBSink.java` to launch a local IoTDB server and run the Flink job on a local mini cluster.
diff --git a/example/flink/pom.xml b/example/flink/pom.xml
new file mode 100644
index 0000000..efcd34d
--- /dev/null
+++ b/example/flink/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-examples</artifactId>
+ <version>0.10.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <artifactId>flink-example</artifactId>
+ <name>IoTDB-Flink Examples</name>
+ <packaging>jar</packaging>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>flink-iotdb-connector</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-server</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
new file mode 100644
index 0000000..0d8bb2f
--- /dev/null
+++ b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.flink;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.iotdb.db.service.IoTDB;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Example that launches a local IoTDB server and runs a Flink job which writes randomly
+ * generated sensor readings into it through {@link IoTDBSink}.
+ */
+public class FlinkIoTDBSink {
+
+  /**
+   * Starts the IoTDB server, builds the Flink pipeline (sensor source -> IoTDB sink) and executes
+   * it.
+   *
+   * @param args command-line arguments, handed unchanged to {@code IoTDB.main}
+   * @throws Exception if starting the server or executing the job fails
+   */
+  public static void main(String[] args) throws Exception {
+    // launch the local iotDB server at default port: 6667
+    IoTDB.main(args);
+
+    // NOTE(review): presumably gives the server time to finish starting before the sink connects
+    Thread.sleep(3000);
+
+    // run the flink job on local mini cluster
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    IoTDBOptions options = new IoTDBOptions();
+    options.setHost("127.0.0.1");
+    options.setPort(6667);
+    options.setUser("root");
+    options.setPassword("root");
+    options.setStorageGroup("root.sg");
+    options.setTimeseriesOptionList(
+        Lists.newArrayList(new IoTDBOptions.TimeseriesOption("root.sg.d1.s1")));
+
+    IoTSerializationSchema<Map<String, String>> serializationSchema =
+        new DefaultIoTSerializationSchema();
+    IoTDBSink<Map<String, String>> ioTDBSink =
+        new IoTDBSink<Map<String, String>>(options, serializationSchema)
+            // enable batching
+            .withBatchSize(10);
+
+    env.addSource(new SensorSource())
+        .name("sensor-source")
+        .setParallelism(1)
+        .addSink(ioTDBSink)
+        .name("iotdb-sink")
+        .setParallelism(1);
+
+    env.execute("iotdb-flink-example");
+  }
+
+  /**
+   * Source that emits one tuple per second for device {@code root.sg.d1}, measurement
+   * {@code s1}, carrying the current time and a random double, until it is cancelled.
+   */
+  private static class SensorSource implements SourceFunction<Map<String, String>> {
+    // volatile: cancel() is called from a different thread than the one executing run(),
+    // so the write must be visible to the loop below
+    private volatile boolean running = true;
+
+    @Override
+    public void run(SourceContext<Map<String, String>> context) throws Exception {
+      Random random = new Random();
+      while (running) {
+        Map<String, String> tuple = new HashMap<>();
+        tuple.put("device", "root.sg.d1");
+        tuple.put("timestamp", String.valueOf(System.currentTimeMillis()));
+        tuple.put("measurements", "s1");
+        tuple.put("values", String.valueOf(random.nextDouble()));
+
+        context.collect(tuple);
+        Thread.sleep(1000);
+      }
+    }
+
+    @Override
+    public void cancel() {
+      running = false;
+    }
+  }
+}
diff --git a/example/pom.xml b/example/pom.xml
index 88c6bf0..283f2d8 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -40,6 +40,7 @@
<module>tsfile</module>
<module>jdbc</module>
<module>hadoop</module>
+ <module>flink</module>
</modules>
<build>
<pluginManagement>
diff --git a/flink-iotdb-connector/README.md b/flink-iotdb-connector/README.md
new file mode 100644
index 0000000..b88872f
--- /dev/null
+++ b/flink-iotdb-connector/README.md
@@ -0,0 +1,58 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+# IoTDB-Flink-Connector
+
+IoTDB integration for [Apache Flink](https://flink.apache.org/). This module includes the IoTDB sink, which allows a Flink job to write events into IoTDB timeseries.
+
+## IoTDBSink
+To use the `IoTDBSink`, you need to construct an instance of it by specifying `IoTDBOptions` and `IoTSerializationSchema` instances.
+By default the `IoTDBSink` sends events one at a time, but you can switch to batching by invoking `withBatchSize(int)`.
+
+## Examples
+The following example receives events from a sensor source and then sends them to IoTDB.
+
+ ```java
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ IoTDBOptions options = new IoTDBOptions();
+ options.setHost("127.0.0.1");
+ options.setPort(6667);
+ options.setUser("root");
+ options.setPassword("root");
+ options.setStorageGroup("root.sg");
+ options.setTimeseriesOptionList(Lists.newArrayList(new IoTDBOptions.TimeseriesOption("root.sg.d1.s1")));
+
+ IoTSerializationSchema serializationSchema = new
DefaultIoTSerializationSchema();
+ IoTDBSink ioTDBSink = new IoTDBSink(options, serializationSchema)
+ // enable batching
+ .withBatchSize(10)
+ ;
+
+ env.addSource(new SensorSource())
+ .name("sensor-source")
+ .setParallelism(1)
+ .addSink(ioTDBSink)
+ .name("iotdb-sink")
+ .setParallelism(1)
+ ;
+
+ env.execute("iotdb-flink-example");
+ ```
diff --git a/flink-iotdb-connector/pom.xml b/flink-iotdb-connector/pom.xml
new file mode 100644
index 0000000..2538931
--- /dev/null
+++ b/flink-iotdb-connector/pom.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-parent</artifactId>
+ <version>0.10.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <artifactId>flink-iotdb-connector</artifactId>
+ <packaging>jar</packaging>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <compile.version>1.8</compile.version>
+ <flink.version>1.10.0</flink.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-session</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/DefaultIoTSerializationSchema.java
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/DefaultIoTSerializationSchema.java
new file mode 100644
index 0000000..4ac596b
--- /dev/null
+++
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/DefaultIoTSerializationSchema.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.flink;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The default implementation of {@link IoTSerializationSchema}. Builds an {@link Event} from a
+ * {@code Map<String, String>} tuple; the map keys that hold the device, timestamp, measurements
+ * and values entries are configurable through the setters of this class.
+ */
+public class DefaultIoTSerializationSchema implements
IoTSerializationSchema<Map<String,String>> {
+ // Keys that are looked up in the incoming tuple.
+ private String fieldDevice = "device";
+ private String fieldTimestamp = "timestamp";
+ private String fieldMeasurements = "measurements";
+ private String fieldValues = "values";
+ // Delimiter between the entries of the measurements/values strings. It is handed to
+ // String.split, which interprets it as a regular expression, so regex metacharacters
+ // such as "|" or "." are not matched literally.
+ private String separator = ",";
+
+ /**
+ * Converts a tuple into an {@link Event}.
+ *
+ * @param tuple the input map; may be null
+ * @return the event built from the tuple, or null when {@code tuple} is null
+ * @throws NumberFormatException if the timestamp entry is present but is not a parsable long
+ */
+ @Override
+ public Event serialize(Map<String,String> tuple) {
+ if (tuple == null) {
+ return null;
+ }
+
+ String device = tuple.get(fieldDevice);
+
+ // a missing timestamp entry falls back to the current wall-clock time
+ String ts = tuple.get(fieldTimestamp);
+ Long timestamp = ts == null ? System.currentTimeMillis() :
Long.parseLong(ts);
+
+ // measurements stays null when the tuple has no such entry
+ List<String> measurements = null;
+ if (tuple.get(fieldMeasurements) != null) {
+ measurements =
Arrays.asList(tuple.get(fieldMeasurements).split(separator));
+ }
+
+ // values stays null when the tuple has no such entry; Arrays.asList yields a fixed-size list
+ List<String> values = null;
+ if (tuple.get(fieldValues) != null) {
+ values = Arrays.asList(tuple.get(fieldValues).split(separator));
+ }
+
+ return new Event(device, timestamp, measurements, values);
+ }
+
+ public String getFieldDevice() {
+ return fieldDevice;
+ }
+
+ public void setFieldDevice(String fieldDevice) {
+ this.fieldDevice = fieldDevice;
+ }
+
+ public String getFieldTimestamp() {
+ return fieldTimestamp;
+ }
+
+ public void setFieldTimestamp(String fieldTimestamp) {
+ this.fieldTimestamp = fieldTimestamp;
+ }
+
+ public String getFieldMeasurements() {
+ return fieldMeasurements;
+ }
+
+ public void setFieldMeasurements(String fieldMeasurements) {
+ this.fieldMeasurements = fieldMeasurements;
+ }
+
+ public String getFieldValues() {
+ return fieldValues;
+ }
+
+ public void setFieldValues(String fieldValues) {
+ this.fieldValues = fieldValues;
+ }
+
+ public String getSeparator() {
+ return separator;
+ }
+
+ /**
+ * Sets the delimiter used to split the measurements and values strings.
+ *
+ * @param separator the delimiter; it is used as a regular expression by {@code String.split}
+ */
+ public void setSeparator(String separator) {
+ this.separator = separator;
+ }
+}
diff --git
a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/Event.java
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/Event.java
new file mode 100644
index 0000000..6cccb23
--- /dev/null
+++ b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/Event.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.flink;
+
+import java.util.List;
+
+/**
+ * Event holds the data of one write to a device: the device path, a timestamp, the measurement
+ * names and their values. All fields are set once in the constructor and exposed through getters.
+ */
+public class Event {
+ private String device;
+ private Long timestamp;
+ // presumably parallel lists: values.get(i) belongs to measurements.get(i)
+ private List<String> measurements;
+ private List<String> values;
+
+ /**
+ * Creates an event; the arguments are stored as given, without copying or validation.
+ *
+ * @param device the device the data belongs to
+ * @param timestamp the time of the data
+ * @param measurements the measurement names
+ * @param values the values, as strings
+ */
+ public Event(String device, Long timestamp, List<String> measurements,
List<String> values) {
+ this.device = device;
+ this.timestamp = timestamp;
+ this.measurements = measurements;
+ this.values = values;
+ }
+
+ public String getDevice() {
+ return device;
+ }
+
+ public Long getTimestamp() {
+ return timestamp;
+ }
+
+ public List<String> getMeasurements() {
+ return measurements;
+ }
+
+ public List<String> getValues() {
+ return values;
+ }
+}
diff --git
a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBOptions.java
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBOptions.java
new file mode 100644
index 0000000..f2038ff
--- /dev/null
+++
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBOptions.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.flink;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * IoTDBOptions describes the configuration related information for IoTDB and timeseries:
+ * the connection settings (host, port, user, password), the storage group and the list of
+ * timeseries definitions. It is a plain serializable bean; no value is validated here.
+ */
+public class IoTDBOptions implements Serializable {
+ // connection settings of the IoTDB server
+ private String host;
+ private int port;
+ private String user;
+ private String password;
+ private String storageGroup;
+ // timeseries definitions; stays null until set through the constructor or the setter
+ private List<TimeseriesOption> timeseriesOptionList;
+
+ /** Creates empty options; every field has to be filled in through the setters. */
+ public IoTDBOptions() {
+ }
+
+ /**
+ * Creates fully populated options.
+ *
+ * @param host the server host
+ * @param port the server port
+ * @param user the user name
+ * @param password the password
+ * @param storageGroup the storage group
+ * @param timeseriesOptionList the timeseries definitions
+ */
+ public IoTDBOptions(String host, int port, String user, String password,
+ String storageGroup, List<TimeseriesOption>
timeseriesOptionList) {
+ this.host = host;
+ this.port = port;
+ this.user = user;
+ this.password = password;
+ this.storageGroup = storageGroup;
+ this.timeseriesOptionList = timeseriesOptionList;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getStorageGroup() {
+ return storageGroup;
+ }
+
+ public void setStorageGroup(String storageGroup) {
+ this.storageGroup = storageGroup;
+ }
+
+ public List<TimeseriesOption> getTimeseriesOptionList() {
+ return timeseriesOptionList;
+ }
+
+ public void setTimeseriesOptionList(List<TimeseriesOption>
timeseriesOptionList) {
+ this.timeseriesOptionList = timeseriesOptionList;
+ }
+
+ /**
+ * TimeseriesOption describes one timeseries: its path plus data type, encoding and
+ * compressor. Unless overridden, the data type is TEXT, the encoding PLAIN and the
+ * compressor SNAPPY.
+ */
+ public static class TimeseriesOption implements Serializable {
+ private String path;
+ private TSDataType dataType = TSDataType.TEXT;
+ private TSEncoding encoding = TSEncoding.PLAIN;
+ private CompressionType compressor = CompressionType.SNAPPY;
+
+ /** Creates an option without a path and with the default type, encoding and compressor. */
+ public TimeseriesOption() {
+ }
+
+ /**
+ * Creates an option for the given path with the default type, encoding and compressor.
+ *
+ * @param path the timeseries path
+ */
+ public TimeseriesOption(String path) {
+ this.path = path;
+ }
+
+ /**
+ * Creates a fully specified option.
+ *
+ * @param path the timeseries path
+ * @param dataType the data type
+ * @param encoding the encoding
+ * @param compressor the compressor
+ */
+ public TimeseriesOption(String path, TSDataType dataType, TSEncoding
encoding, CompressionType compressor) {
+ this.path = path;
+ this.dataType = dataType;
+ this.encoding = encoding;
+ this.compressor = compressor;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public TSDataType getDataType() {
+ return dataType;
+ }
+
+ public void setDataType(TSDataType dataType) {
+ this.dataType = dataType;
+ }
+
+ public TSEncoding getEncoding() {
+ return encoding;
+ }
+
+ public void setEncoding(TSEncoding encoding) {
+ this.encoding = encoding;
+ }
+
+ public CompressionType getCompressor() {
+ return compressor;
+ }
+
+ public void setCompressor(CompressionType compressor) {
+ this.compressor = compressor;
+ }
+ }
+}
diff --git
a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
new file mode 100644
index 0000000..1f405d6
--- /dev/null
+++ b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.flink;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The {@code IoTDBSink} allows Flink jobs to write events into IoTDB timeseries.
+ *
+ * <p>By default every event is sent on its own. Calling {@link #withBatchSize(int)} with a
+ * positive size switches to batching: events are buffered and sent together once the buffer
+ * reaches the batch size, and additionally by a timer that fires every {@code flushIntervalMs}
+ * milliseconds (see {@link #withFlushIntervalMs(int)}).
+ *
+ * @param <IN> the input data type
+ */
+public class IoTDBSink<IN> extends RichSinkFunction<IN> {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(IoTDBSink.class);
+
+  private IoTDBOptions options;
+  private IoTSerializationSchema<IN> serializationSchema;
+  // timeseries path -> its definition, used to find out which values are of type TEXT
+  private Map<String, IoTDBOptions.TimeseriesOption> timeseriesOptionMap;
+  private transient Session session;
+  private transient ScheduledExecutorService scheduledExecutor;
+
+  // 0 disables batching
+  private int batchSize = 0;
+  private int flushIntervalMs = 3000;
+  // buffered events; also the monitor guarding the buffer against the timer thread
+  private List<Event> batchList;
+
+  /**
+   * Creates a sink that sends events one at a time.
+   *
+   * @param options the IoTDB connection and timeseries settings
+   * @param schema converts the input records into {@link Event}s
+   */
+  public IoTDBSink(IoTDBOptions options, IoTSerializationSchema<IN> schema) {
+    this.options = options;
+    this.serializationSchema = schema;
+    this.batchList = new LinkedList<>();
+    this.timeseriesOptionMap = new HashMap<>();
+    for (IoTDBOptions.TimeseriesOption timeseriesOption : configuredTimeseries()) {
+      timeseriesOptionMap.put(timeseriesOption.getPath(), timeseriesOption);
+    }
+  }
+
+  /** Returns the configured timeseries definitions, or an empty list when none were set. */
+  private List<IoTDBOptions.TimeseriesOption> configuredTimeseries() {
+    List<IoTDBOptions.TimeseriesOption> configured = options.getTimeseriesOptionList();
+    if (configured == null) {
+      return Collections.<IoTDBOptions.TimeseriesOption>emptyList();
+    }
+    return configured;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    initSession();
+    initScheduler();
+  }
+
+  /**
+   * Opens the session, sets the storage group and creates every configured timeseries that does
+   * not exist yet.
+   *
+   * @throws Exception if the session cannot be opened or a request fails
+   */
+  void initSession() throws Exception {
+    session = new Session(options.getHost(), options.getPort(),
+        options.getUser(), options.getPassword());
+    session.open();
+
+    session.setStorageGroup(options.getStorageGroup());
+    for (IoTDBOptions.TimeseriesOption option : configuredTimeseries()) {
+      if (!session.checkTimeseriesExists(option.getPath())) {
+        session.createTimeseries(option.getPath(),
+            option.getDataType(), option.getEncoding(), option.getCompressor());
+      }
+    }
+  }
+
+  /** Starts the periodic flush timer; does nothing unless batching is enabled. */
+  void initScheduler() {
+    if (batchSize > 0) {
+      scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+      scheduledExecutor.scheduleAtFixedRate(() -> {
+        // the exception must not escape: scheduleAtFixedRate would cancel all later runs
+        try {
+          flush();
+        } catch (Exception e) {
+          LOG.error("flush error", e);
+        }
+      }, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
+    }
+  }
+
+  // for testing
+  void setSession(Session session) {
+    this.session = session;
+  }
+
+  /**
+   * Serializes the input into an {@link Event} and either sends it immediately or, in batch
+   * mode, buffers it and flushes once the buffer has reached the batch size. Inputs that
+   * serialize to null are dropped.
+   *
+   * @throws Exception if sending to IoTDB fails
+   */
+  @Override
+  public void invoke(IN input, Context context) throws Exception {
+    Event event = serializationSchema.serialize(input);
+    if (event == null) {
+      return;
+    }
+
+    if (batchSize > 0) {
+      synchronized (batchList) {
+        batchList.add(event);
+        if (batchList.size() >= batchSize) {
+          flush();
+        }
+      }
+      return;
+    }
+
+    TSStatus status = session.insert(event.getDevice(), event.getTimestamp(),
+        event.getMeasurements(), convertText(event));
+    LOG.debug("send event result: {}", status);
+  }
+
+  /**
+   * Sets the number of events that are collected before they are sent together.
+   *
+   * @param batchSize the batch size; 0 disables batching
+   * @return this sink
+   * @throws IllegalArgumentException if {@code batchSize} is negative
+   */
+  public IoTDBSink<IN> withBatchSize(int batchSize) {
+    Preconditions.checkArgument(batchSize >= 0);
+    this.batchSize = batchSize;
+    return this;
+  }
+
+  /**
+   * Sets the period of the flush timer used in batch mode.
+   *
+   * @param flushIntervalMs the period in milliseconds
+   * @return this sink
+   * @throws IllegalArgumentException if {@code flushIntervalMs} is not positive
+   */
+  public IoTDBSink<IN> withFlushIntervalMs(int flushIntervalMs) {
+    Preconditions.checkArgument(flushIntervalMs > 0);
+    this.flushIntervalMs = flushIntervalMs;
+    return this;
+  }
+
+  /**
+   * Stops the flush timer, sends the events that are still buffered and closes the session.
+   * A failure of the final flush is logged and does not prevent the session from being closed.
+   *
+   * @throws Exception if closing the session fails
+   */
+  @Override
+  public void close() throws Exception {
+    // Stop the timer first: it is then shut down even if closing the session throws, and no
+    // further timer run is scheduled against a session that is being closed.
+    if (scheduledExecutor != null) {
+      scheduledExecutor.shutdown();
+    }
+    if (session != null) {
+      try {
+        flush();
+      } catch (Exception e) {
+        LOG.error("flush error", e);
+      }
+      session.close();
+    }
+  }
+
+  /**
+   * Returns the values of the event in the form expected by the session: values that belong to
+   * a timeseries of type TEXT are wrapped in single quotes. The event itself is left untouched,
+   * so an event that stays buffered after a failed flush is not quoted twice on the retry, and
+   * unmodifiable value lists are supported. When the device, the measurements or the values are
+   * missing, or the two lists differ in size, the values are returned unchanged.
+   */
+  private List<String> convertText(Event event) {
+    String device = event.getDevice();
+    List<String> measurements = event.getMeasurements();
+    List<String> values = event.getValues();
+    if (device == null || measurements == null || values == null
+        || measurements.size() != values.size()) {
+      return values;
+    }
+
+    List<String> converted = new ArrayList<>(values);
+    for (int i = 0; i < measurements.size(); i++) {
+      String measurement = device + "." + measurements.get(i);
+      IoTDBOptions.TimeseriesOption timeseriesOption = timeseriesOptionMap.get(measurement);
+      if (timeseriesOption != null && TSDataType.TEXT.equals(timeseriesOption.getDataType())) {
+        // The TEXT data type should be covered by " or '
+        converted.set(i, "'" + values.get(i) + "'");
+      }
+    }
+    return converted;
+  }
+
+  /**
+   * Sends all buffered events in one batch and empties the buffer. Does nothing when batching is
+   * disabled or the buffer is empty. When sending fails the buffer is kept, so the events are
+   * sent again by the next flush.
+   */
+  private void flush() throws Exception {
+    if (batchSize <= 0) {
+      return;
+    }
+    synchronized (batchList) {
+      if (batchList.isEmpty()) {
+        return;
+      }
+      List<String> deviceIds = new ArrayList<>();
+      List<Long> timestamps = new ArrayList<>();
+      List<List<String>> measurementsList = new ArrayList<>();
+      List<List<String>> valuesList = new ArrayList<>();
+
+      for (Event event : batchList) {
+        deviceIds.add(event.getDevice());
+        timestamps.add(event.getTimestamp());
+        measurementsList.add(event.getMeasurements());
+        valuesList.add(convertText(event));
+      }
+      List<TSStatus> statusList =
+          session.insertInBatch(deviceIds, timestamps, measurementsList, valuesList);
+      LOG.debug("send events result: {}", statusList);
+      batchList.clear();
+    }
+  }
+}
diff --git
a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTSerializationSchema.java
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTSerializationSchema.java
new file mode 100644
index 0000000..e0b72d4
--- /dev/null
+++
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTSerializationSchema.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.flink;
+
+import java.io.Serializable;
+
+/**
+ * IoTSerializationSchema serializes the input tuple data into events for inserting into IoTDB
+ * server. Implementations have to be serializable.
+ * @param <T> the input data type
+ */
+public interface IoTSerializationSchema<T> extends Serializable {
+
+ /**
+ * Converts one input record into an {@link Event}.
+ *
+ * @param tuple the input record
+ * @return the event to insert; IoTDBSink skips the record when null is returned
+ */
+ Event serialize(T tuple);
+
+}
diff --git
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/DefaultIoTSerializationSchemaTest.java
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/DefaultIoTSerializationSchemaTest.java
new file mode 100644
index 0000000..efb5a34
--- /dev/null
+++
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/DefaultIoTSerializationSchemaTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.flink;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+public class DefaultIoTSerializationSchemaTest {
+
+  /** A tuple with one measurement must be mapped field by field onto the resulting event. */
+  @Test
+  public void serialize() {
+    DefaultIoTSerializationSchema serializationSchema = new DefaultIoTSerializationSchema();
+
+    Map<String, String> tuple = new HashMap<>();
+    tuple.put("device", "root.sg.D01");
+    tuple.put("timestamp", "1581861293000");
+    tuple.put("measurements", "temperature");
+    tuple.put("values", "36.5");
+
+    Event event = serializationSchema.serialize(tuple);
+    assertEquals(tuple.get("device"), event.getDevice());
+    assertEquals(tuple.get("timestamp"), String.valueOf(event.getTimestamp()));
+    assertEquals(tuple.get("measurements"), event.getMeasurements().get(0));
+    assertEquals(tuple.get("values"), event.getValues().get(0));
+  }
+}
diff --git
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
new file mode 100644
index 0000000..3692377
--- /dev/null
+++
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.flink;
+
+import com.google.common.collect.Lists;
+import org.apache.iotdb.session.Session;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests the batching behaviour of {@link IoTDBSink} against a mocked
+ * {@link Session}. The sink is configured with a batch size of 3.
+ */
+public class IoTDBSinkBatchInsertTest {
+
+  private IoTDBSink ioTDBSink;
+  private Session session;
+
+  /**
+   * Builds a sink with batch size 3 and replaces its session with a mock so
+   * that every call made by the sink can be verified.
+   */
+  @Before
+  public void setUp() throws Exception {
+    IoTDBOptions options = new IoTDBOptions();
+    options.setTimeseriesOptionList(Lists.newArrayList(
+        new IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
+    ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema());
+    ioTDBSink.withBatchSize(3);
+
+    session = mock(Session.class);
+    ioTDBSink.setSession(session);
+  }
+
+  /**
+   * Feeds four tuples into the sink: the first two must not reach the
+   * session, the third must trigger exactly one {@code insertInBatch} call,
+   * and the fourth must not cause any further session call.
+   */
+  @Test
+  public void testBatchInsert() throws Exception {
+    ioTDBSink.invoke(tuple("1581861293000", "36.5"), null);
+    verifyZeroInteractions(session);
+
+    ioTDBSink.invoke(tuple("1581861293001", "37.2"), null);
+    verifyZeroInteractions(session);
+
+    // Third tuple reaches the configured batch size of 3.
+    ioTDBSink.invoke(tuple("1581861293003", "37.1"), null);
+    verify(session).insertInBatch(
+        any(List.class), any(List.class), any(List.class), any(List.class));
+
+    // The session has already been used once at this point, so the intent is
+    // "nothing beyond the verified insertInBatch call".
+    ioTDBSink.invoke(tuple("1581861293005", "36.5"), null);
+    verifyNoMoreInteractions(session);
+  }
+
+  /**
+   * Checks that closing the sink sends the single pending tuple through
+   * {@code insertInBatch} and then closes the session.
+   */
+  @Test
+  public void close() throws Exception {
+    ioTDBSink.invoke(tuple("1581861293005", "36.5"), null);
+    verifyZeroInteractions(session);
+
+    ioTDBSink.close();
+    verify(session).insertInBatch(
+        any(List.class), any(List.class), any(List.class), any(List.class));
+    verify(session).close();
+  }
+
+  /**
+   * Builds an input tuple for device {@code root.sg.D01} and measurement
+   * {@code temperature}.
+   *
+   * @param timestamp the timestamp, as text
+   * @param value the measurement value, as text
+   * @return a new mutable map holding the four keys the tests feed the sink
+   */
+  private static Map<String, String> tuple(String timestamp, String value) {
+    Map<String, String> tuple = new HashMap<>();
+    tuple.put("device", "root.sg.D01");
+    tuple.put("timestamp", timestamp);
+    tuple.put("measurements", "temperature");
+    tuple.put("values", value);
+    return tuple;
+  }
+}
diff --git
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
new file mode 100644
index 0000000..226e798
--- /dev/null
+++
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.flink;
+
+import com.google.common.collect.Lists;
+import org.apache.iotdb.session.Session;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests the timer-driven flush of {@link IoTDBSink} against a mocked
+ * {@link Session}. The sink uses a batch size of 3 and a flush interval of
+ * 1000 ms.
+ */
+public class IoTDBSinkBatchTimerTest {
+
+  private IoTDBSink ioTDBSink;
+  private Session session;
+
+  /**
+   * Builds a sink with batch size 3 and a 1000 ms flush interval, starts its
+   * scheduler, and replaces its session with a mock.
+   */
+  @Before
+  public void setUp() throws Exception {
+    IoTDBOptions options = new IoTDBOptions();
+    options.setTimeseriesOptionList(Lists.newArrayList(
+        new IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
+    ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema());
+    ioTDBSink.withBatchSize(3);
+    ioTDBSink.withFlushIntervalMs(1000);
+    ioTDBSink.initScheduler();
+
+    session = mock(Session.class);
+    ioTDBSink.setSession(session);
+  }
+
+  /**
+   * Sends a single tuple (below the batch size) and expects exactly one
+   * {@code insertInBatch} call after waiting, then no further session calls
+   * after waiting again.
+   */
+  @Test
+  public void testBatchInsert() throws Exception {
+    Map<String, String> tuple = new HashMap<>();
+    tuple.put("device", "root.sg.D01");
+    tuple.put("timestamp", "1581861293000");
+    tuple.put("measurements", "temperature");
+    tuple.put("values", "36.5");
+    ioTDBSink.invoke(tuple, null);
+
+    // 2500 ms is longer than two 1000 ms flush intervals; presumably chosen
+    // so that a repeated flush of an empty batch would be noticed - confirm.
+    Thread.sleep(2500);
+
+    verify(session).insertInBatch(
+        any(List.class), any(List.class), any(List.class), any(List.class));
+
+    // Wait one more flush interval to see whether the timer calls the
+    // session again.
+    Thread.sleep(1000);
+
+    // The session has already been used once, so the check is "nothing
+    // beyond the verified insertInBatch call".
+    verifyNoMoreInteractions(session);
+  }
+
+  /**
+   * Checks that closing the sink closes the session.
+   */
+  @Test
+  public void close() throws Exception {
+    ioTDBSink.close();
+    verify(session).close();
+  }
+}
diff --git
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
new file mode 100644
index 0000000..06fe7f3
--- /dev/null
+++
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.flink;
+
+import com.google.common.collect.Lists;
+import org.apache.iotdb.session.Session;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests {@link IoTDBSink} without any batch configuration against a mocked
+ * {@link Session}.
+ */
+public class IoTDBSinkInsertTest {
+
+  private IoTDBSink ioTDBSink;
+  private Session session;
+
+  /**
+   * Builds a sink with no batch size or flush interval set and replaces its
+   * session with a mock.
+   */
+  @Before
+  public void setUp() throws Exception {
+    IoTDBOptions options = new IoTDBOptions();
+    options.setTimeseriesOptionList(Lists.newArrayList(
+        new IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
+    ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema());
+
+    session = mock(Session.class);
+    ioTDBSink.setSession(session);
+  }
+
+  /**
+   * Sends one tuple and expects it to be forwarded to the session through a
+   * single {@code insert} call.
+   */
+  @Test
+  public void testInsert() throws Exception {
+    // Parameterized with the diamond operator instead of the raw HashMap
+    // type, which produced an unchecked-conversion warning.
+    Map<String, String> tuple = new HashMap<>();
+    tuple.put("device", "root.sg.D01");
+    tuple.put("timestamp", "1581861293000");
+    tuple.put("measurements", "temperature");
+    tuple.put("values", "36.5");
+
+    ioTDBSink.invoke(tuple, null);
+    verify(session).insert(
+        any(String.class), any(Long.class),
+        any(List.class), any(List.class));
+  }
+
+  /**
+   * Checks that closing the sink closes the session.
+   */
+  @Test
+  public void close() throws Exception {
+    ioTDBSink.close();
+    verify(session).close();
+  }
+}
diff --git a/pom.xml b/pom.xml
index 8f754f1..b3d7f46 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,6 +56,7 @@
<module>spark-tsfile</module>
<module>hadoop</module>
<module>spark-iotdb-connector</module>
+ <module>flink-iotdb-connector</module>
<module>distribution</module>
<module>hive-connector</module>
</modules>
@@ -84,6 +85,7 @@
<common.lang.version>2.6</common.lang.version>
<common.lang3.version>3.8.1</common.lang3.version>
<common.logging.version>1.1.3</common.logging.version>
+ <guava.version>21.0</guava.version>
<jline.version>2.14.5</jline.version>
<jetty.version>9.4.24.v20191120</jetty.version>
<metrics.version>3.2.6</metrics.version>
@@ -150,7 +152,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- <version>21.0</version>
+ <version>${guava.version}</version>
</dependency>
<dependency>
<groupId>com.sun.istack</groupId>