This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f7c850f  [IOTDB-497] Apache Flink Connector Support (#817)
f7c850f is described below

commit f7c850f89772579f7d1b8a1589929c28754efbca
Author: Xin Wang <[email protected]>
AuthorDate: Sat Mar 7 21:20:47 2020 +0800

    [IOTDB-497] Apache Flink Connector Support (#817)
---
 example/flink/README.md                            |  30 ++++
 example/flink/pom.xml                              |  45 +++++
 .../org/apache/iotdb/flink/FlinkIoTDBSink.java     |  85 ++++++++++
 example/pom.xml                                    |   1 +
 flink-iotdb-connector/README.md                    |  58 +++++++
 flink-iotdb-connector/pom.xml                      |  55 ++++++
 .../iotdb/flink/DefaultIoTSerializationSchema.java |  99 +++++++++++
 .../main/java/org/apache/iotdb/flink/Event.java    |  54 ++++++
 .../java/org/apache/iotdb/flink/IoTDBOptions.java  | 152 +++++++++++++++++
 .../java/org/apache/iotdb/flink/IoTDBSink.java     | 187 +++++++++++++++++++++
 .../apache/iotdb/flink/IoTSerializationSchema.java |  31 ++++
 .../flink/DefaultIoTSerializationSchemaTest.java   |  49 ++++++
 .../iotdb/flink/IoTDBSinkBatchInsertTest.java      | 102 +++++++++++
 .../iotdb/flink/IoTDBSinkBatchTimerTest.java       |  74 ++++++++
 .../apache/iotdb/flink/IoTDBSinkInsertTest.java    |  66 ++++++++
 pom.xml                                            |   4 +-
 16 files changed, 1091 insertions(+), 1 deletion(-)

diff --git a/example/flink/README.md b/example/flink/README.md
new file mode 100644
index 0000000..fa3b867
--- /dev/null
+++ b/example/flink/README.md
@@ -0,0 +1,30 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+# IoTDB-Flink-Connector Example
+
+## Function
+```
+The example is to show how to send data to a IoTDB server from a Flink job.
+```
+
+## Usage
+
+* Run `org.apache.iotdb.flink.FlinkIoTDBSink.java` to launch the local iotDB 
server and run the flink job on local mini cluster.
diff --git a/example/flink/pom.xml b/example/flink/pom.xml
new file mode 100644
index 0000000..efcd34d
--- /dev/null
+++ b/example/flink/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.iotdb</groupId>
+        <artifactId>iotdb-examples</artifactId>
+        <version>0.10.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <artifactId>flink-example</artifactId>
+    <name>IoTDB-Flink Examples</name>
+    <packaging>jar</packaging>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>flink-iotdb-connector</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-server</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git 
a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java 
b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
new file mode 100644
index 0000000..0d8bb2f
--- /dev/null
+++ b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.flink;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.iotdb.db.service.IoTDB;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+public class FlinkIoTDBSink {
+    public static void main(String[] args) throws Exception {
+        // launch the local iotDB server at default port: 6667
+        IoTDB.main(args);
+
+        Thread.sleep(3000);
+
+        // run the flink job on local mini cluster
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+        IoTDBOptions options = new IoTDBOptions();
+        options.setHost("127.0.0.1");
+        options.setPort(6667);
+        options.setUser("root");
+        options.setPassword("root");
+        options.setStorageGroup("root.sg");
+        options.setTimeseriesOptionList(Lists.newArrayList(new 
IoTDBOptions.TimeseriesOption("root.sg.d1.s1")));
+
+        IoTSerializationSchema serializationSchema = new 
DefaultIoTSerializationSchema();
+        IoTDBSink ioTDBSink = new IoTDBSink(options, serializationSchema)
+                // enable batching
+                .withBatchSize(10);
+
+        env.addSource(new SensorSource())
+                .name("sensor-source")
+                .setParallelism(1)
+                .addSink(ioTDBSink)
+                .name("iotdb-sink")
+                .setParallelism(1);
+
+        env.execute("iotdb-flink-example");
+    }
+
+    private static class SensorSource implements 
SourceFunction<Map<String,String>> {
+        boolean running = true;
+
+        @Override
+        public void run(SourceContext context) throws Exception {
+            Random random = new Random();
+            while (running) {
+                Map<String,String> tuple = new HashMap();
+                tuple.put("device", "root.sg.d1");
+                tuple.put("timestamp", 
String.valueOf(System.currentTimeMillis()));
+                tuple.put("measurements", "s1");
+                tuple.put("values", String.valueOf(random.nextDouble()));
+
+                context.collect(tuple);
+                Thread.sleep(1000);
+            }
+        }
+
+        @Override
+        public void cancel() {
+            running = false;
+        }
+    }
+}
diff --git a/example/pom.xml b/example/pom.xml
index 88c6bf0..283f2d8 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -40,6 +40,7 @@
         <module>tsfile</module>
         <module>jdbc</module>
         <module>hadoop</module>
+        <module>flink</module>
     </modules>
     <build>
         <pluginManagement>
diff --git a/flink-iotdb-connector/README.md b/flink-iotdb-connector/README.md
new file mode 100644
index 0000000..b88872f
--- /dev/null
+++ b/flink-iotdb-connector/README.md
@@ -0,0 +1,58 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+# IoTDB-Flink-Connector
+
+IoTDB integration for [Apache Flink](https://flink.apache.org/). This module 
includes the iotdb sink that allows a flink job to write events into timeseries.
+
+## IoTDBSink
+To use the `IoTDBSink`,  you need construct an instance of it by specifying 
`IoTDBOptions` and `IoTSerializationSchema` instances.
+The `IoTDBSink` send only one event after another by default, but you can 
change to batch by invoking `withBatchSize(int)`. 
+
+## Examples
+The following is an example which receiving events from sensor source and then 
sending events to iotdb.
+
+ ```java
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+        IoTDBOptions options = new IoTDBOptions();
+        options.setHost("127.0.0.1");
+        options.setPort(6667);
+        options.setUser("root");
+        options.setPassword("root");
+        options.setStorageGroup("root.sg");
+        options.setTimeseries(Lists.newArrayList("root.sg.d1.s1"));
+
+        IoTSerializationSchema serializationSchema = new 
DefaultIoTSerializationSchema();
+        IoTDBSink ioTDBSink = new IoTDBSink(options, serializationSchema)
+                // enable batching
+                .withBatchSize(10)
+                ;
+
+        env.addSource(new SensorSource())
+                .name("sensor-source")
+                .setParallelism(1)
+                .addSink(ioTDBSink)
+                .name("iotdb-sink")
+                .setParallelism(1)
+        ;
+
+        env.execute("iotdb-flink-example");
+ ```
diff --git a/flink-iotdb-connector/pom.xml b/flink-iotdb-connector/pom.xml
new file mode 100644
index 0000000..2538931
--- /dev/null
+++ b/flink-iotdb-connector/pom.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.iotdb</groupId>
+        <artifactId>iotdb-parent</artifactId>
+        <version>0.10.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <artifactId>flink-iotdb-connector</artifactId>
+    <packaging>jar</packaging>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <compile.version>1.8</compile.version>
+        <flink.version>1.10.0</flink.version>
+        <scala.binary.version>2.11</scala.binary.version>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-session</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git 
a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/DefaultIoTSerializationSchema.java
 
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/DefaultIoTSerializationSchema.java
new file mode 100644
index 0000000..4ac596b
--- /dev/null
+++ 
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/DefaultIoTSerializationSchema.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.flink;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @inheritDoc
+ * The default implementation of IoTSerializationSchema. Gets info from a map 
struct.
+ */
+public class DefaultIoTSerializationSchema implements 
IoTSerializationSchema<Map<String,String>> {
+    private String fieldDevice = "device";
+    private String fieldTimestamp = "timestamp";
+    private String fieldMeasurements = "measurements";
+    private String fieldValues = "values";
+    private String separator = ",";
+
+    @Override
+    public Event serialize(Map<String,String> tuple) {
+        if (tuple == null) {
+            return null;
+        }
+
+        String device = tuple.get(fieldDevice);
+
+        String ts = tuple.get(fieldTimestamp);
+        Long timestamp = ts == null ? System.currentTimeMillis() : 
Long.parseLong(ts);
+
+        List<String> measurements = null;
+        if (tuple.get(fieldMeasurements) != null) {
+            measurements = 
Arrays.asList(tuple.get(fieldMeasurements).split(separator));
+        }
+
+        List<String> values = null;
+        if (tuple.get(fieldValues) != null) {
+            values = Arrays.asList(tuple.get(fieldValues).split(separator));
+        }
+
+        return new Event(device, timestamp, measurements, values);
+    }
+
+    public String getFieldDevice() {
+        return fieldDevice;
+    }
+
+    public void setFieldDevice(String fieldDevice) {
+        this.fieldDevice = fieldDevice;
+    }
+
+    public String getFieldTimestamp() {
+        return fieldTimestamp;
+    }
+
+    public void setFieldTimestamp(String fieldTimestamp) {
+        this.fieldTimestamp = fieldTimestamp;
+    }
+
+    public String getFieldMeasurements() {
+        return fieldMeasurements;
+    }
+
+    public void setFieldMeasurements(String fieldMeasurements) {
+        this.fieldMeasurements = fieldMeasurements;
+    }
+
+    public String getFieldValues() {
+        return fieldValues;
+    }
+
+    public void setFieldValues(String fieldValues) {
+        this.fieldValues = fieldValues;
+    }
+
+    public String getSeparator() {
+        return separator;
+    }
+
+    public void setSeparator(String separator) {
+        this.separator = separator;
+    }
+}
diff --git 
a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/Event.java 
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/Event.java
new file mode 100644
index 0000000..6cccb23
--- /dev/null
+++ b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/Event.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.flink;
+
+import java.util.List;
+
+/**
+ * Event serializes the device/sensor related data, such as time, measurements 
etc.
+ */
+public class Event {
+    private String device;
+    private Long timestamp;
+    private List<String> measurements;
+    private List<String> values;
+
+    public Event(String device, Long timestamp, List<String> measurements, 
List<String> values) {
+        this.device = device;
+        this.timestamp = timestamp;
+        this.measurements = measurements;
+        this.values = values;
+    }
+
+    public String getDevice() {
+        return device;
+    }
+
+    public Long getTimestamp() {
+        return timestamp;
+    }
+
+    public List<String> getMeasurements() {
+        return measurements;
+    }
+
+    public List<String> getValues() {
+        return values;
+    }
+}
diff --git 
a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBOptions.java 
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBOptions.java
new file mode 100644
index 0000000..f2038ff
--- /dev/null
+++ 
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBOptions.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.flink;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * IoTDBOptions describes the configuration related information for IoTDB and 
timeseries.
+ */
+public class IoTDBOptions implements Serializable {
+    private String host;
+    private int port;
+    private String user;
+    private String password;
+    private String storageGroup;
+    private List<TimeseriesOption> timeseriesOptionList;
+
+    public IoTDBOptions() {
+    }
+
+    public IoTDBOptions(String host, int port, String user, String password,
+                        String storageGroup, List<TimeseriesOption> 
timeseriesOptionList) {
+        this.host = host;
+        this.port = port;
+        this.user = user;
+        this.password = password;
+        this.storageGroup = storageGroup;
+        this.timeseriesOptionList = timeseriesOptionList;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    public String getUser() {
+        return user;
+    }
+
+    public void setUser(String user) {
+        this.user = user;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public String getStorageGroup() {
+        return storageGroup;
+    }
+
+    public void setStorageGroup(String storageGroup) {
+        this.storageGroup = storageGroup;
+    }
+
+    public List<TimeseriesOption> getTimeseriesOptionList() {
+        return timeseriesOptionList;
+    }
+
+    public void setTimeseriesOptionList(List<TimeseriesOption> 
timeseriesOptionList) {
+        this.timeseriesOptionList = timeseriesOptionList;
+    }
+
+    public static class TimeseriesOption implements Serializable {
+        private String path;
+        private TSDataType dataType = TSDataType.TEXT;
+        private TSEncoding encoding = TSEncoding.PLAIN;
+        private CompressionType compressor = CompressionType.SNAPPY;
+
+        public TimeseriesOption() {
+        }
+
+        public TimeseriesOption(String path) {
+            this.path = path;
+        }
+
+        public TimeseriesOption(String path, TSDataType dataType, TSEncoding 
encoding, CompressionType compressor) {
+            this.path = path;
+            this.dataType = dataType;
+            this.encoding = encoding;
+            this.compressor = compressor;
+        }
+
+        public String getPath() {
+            return path;
+        }
+
+        public void setPath(String path) {
+            this.path = path;
+        }
+
+        public TSDataType getDataType() {
+            return dataType;
+        }
+
+        public void setDataType(TSDataType dataType) {
+            this.dataType = dataType;
+        }
+
+        public TSEncoding getEncoding() {
+            return encoding;
+        }
+
+        public void setEncoding(TSEncoding encoding) {
+            this.encoding = encoding;
+        }
+
+        public CompressionType getCompressor() {
+            return compressor;
+        }
+
+        public void setCompressor(CompressionType compressor) {
+            this.compressor = compressor;
+        }
+    }
+}
diff --git 
a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java 
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
new file mode 100644
index 0000000..1f405d6
--- /dev/null
+++ b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.flink;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The `IoTDBSink` allows flink jobs to write events into IoTDB timeseries.
+ * By default send only one event after another, but you can change to batch 
by invoking `withBatchSize(int)`.
+ * @param <IN> the input data type
+ */
+public class IoTDBSink<IN> extends RichSinkFunction<IN> {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = LoggerFactory.getLogger(IoTDBSink.class);
+
+    private IoTDBOptions options;
+    private IoTSerializationSchema<IN> serializationSchema;
+    private Map<String, IoTDBOptions.TimeseriesOption> timeseriesOptionMap;
+    private transient Session session;
+    private transient ScheduledExecutorService scheduledExecutor;
+
+    private int batchSize = 0;
+    private int flushIntervalMs = 3000;
+    private List<Event> batchList;
+
+    public IoTDBSink(IoTDBOptions options, IoTSerializationSchema<IN> schema) {
+        this.options = options;
+        this.serializationSchema = schema;
+        this.batchList = new LinkedList<>();
+        this.timeseriesOptionMap = new HashMap<>();
+        for (IoTDBOptions.TimeseriesOption timeseriesOption : 
options.getTimeseriesOptionList()) {
+            timeseriesOptionMap.put(timeseriesOption.getPath(), 
timeseriesOption);
+        }
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        initSession();
+        initScheduler();
+    }
+
+    void initSession() throws Exception {
+        session = new Session(options.getHost(), options.getPort(), 
options.getUser(), options.getPassword());
+        session.open();
+
+        session.setStorageGroup(options.getStorageGroup());
+        for (IoTDBOptions.TimeseriesOption option : 
options.getTimeseriesOptionList()) {
+            if (!session.checkTimeseriesExists(option.getPath())) {
+                session.createTimeseries(option.getPath(), 
option.getDataType(), option.getEncoding(), option.getCompressor());
+            }
+        }
+    }
+
+    void initScheduler() {
+        if (batchSize > 0) {
+            scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+            scheduledExecutor.scheduleAtFixedRate(() -> {
+                try {
+                    flush();
+                } catch (Exception e) {
+                    LOG.error("flush error", e);
+                }
+            }, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    //  for testing
+    void setSession(Session session) {
+        this.session = session;
+    }
+
+    @Override
+    public void invoke(IN input, Context context) throws Exception {
+        Event event = serializationSchema.serialize(input);
+        if (event == null) {
+            return;
+        }
+
+        if (batchSize > 0) {
+            synchronized (batchList) {
+                batchList.add(event);
+                if (batchList.size() >= batchSize) {
+                    flush();
+                }
+                return;
+            }
+        }
+
+        convertText(event.getDevice(), event.getMeasurements(), 
event.getValues());
+        TSStatus status = session.insert(event.getDevice(), 
event.getTimestamp(),
+                event.getMeasurements(), event.getValues());
+        LOG.debug("send event result: {}", status);
+    }
+
+    public IoTDBSink<IN> withBatchSize(int batchSize) {
+        Preconditions.checkArgument(batchSize >= 0);
+        this.batchSize = batchSize;
+        return this;
+    }
+
+    public IoTDBSink<IN> withFlushIntervalMs(int flushIntervalMs) {
+        Preconditions.checkArgument(flushIntervalMs > 0);
+        this.flushIntervalMs = flushIntervalMs;
+        return this;
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (session != null) {
+            try {
+                flush();
+            } catch (Exception e) {
+                LOG.error("flush error", e);
+            }
+            session.close();
+        }
+        if (scheduledExecutor != null) {
+            scheduledExecutor.shutdown();
+        }
+    }
+
+    private void convertText(String device, List<String> measurements, 
List<String> values) {
+        if (device != null && measurements != null && values != null && 
measurements.size() == values.size()) {
+            for (int i = 0; i < measurements.size(); i++) {
+                String measurement = device + "." + measurements.get(i);
+                IoTDBOptions.TimeseriesOption timeseriesOption = 
timeseriesOptionMap.get(measurement);
+                if (timeseriesOption!= null && 
TSDataType.TEXT.equals(timeseriesOption.getDataType())) {
+                    // The TEXT data type should be covered by " or '
+                    values.set(i, "'" + values.get(i) + "'");
+                }
+            }
+        }
+    }
+
+    private void flush() throws Exception {
+        if (batchSize > 0) {
+            synchronized (batchList) {
+                if (batchList.size() > 0) {
+                    List<String> deviceIds = new ArrayList<>();
+                    List<Long> timestamps = new ArrayList<>();
+                    List<List<String>> measurementsList = new ArrayList<>();
+                    List<List<String>> valuesList = new ArrayList<>();
+
+                    for (Event event : batchList) {
+                        convertText(event.getDevice(), 
event.getMeasurements(), event.getValues());
+                        deviceIds.add(event.getDevice());
+                        timestamps.add(event.getTimestamp());
+                        measurementsList.add(event.getMeasurements());
+                        valuesList.add(event.getValues());
+                    }
+                    List<TSStatus> statusList = 
session.insertInBatch(deviceIds, timestamps, measurementsList, valuesList);
+                    LOG.debug("send events result: {}", statusList);
+                    batchList.clear();
+                }
+            }
+        }
+    }
+}
diff --git 
a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTSerializationSchema.java
 
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTSerializationSchema.java
new file mode 100644
index 0000000..e0b72d4
--- /dev/null
+++ 
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTSerializationSchema.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.flink;
+
+import java.io.Serializable;
+
+/**
+ * IoTSerializationSchema serializes the input tuple data into events for 
inserting into IoTDB server.
+ * @param <T> the input data type
+ */
+public interface IoTSerializationSchema<T> extends Serializable {
+
+    Event serialize(T tuple);
+
+}
diff --git 
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/DefaultIoTSerializationSchemaTest.java
 
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/DefaultIoTSerializationSchemaTest.java
new file mode 100644
index 0000000..efb5a34
--- /dev/null
+++ 
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/DefaultIoTSerializationSchemaTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.flink;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+public class DefaultIoTSerializationSchemaTest {
+
+    @Test
+    public void serialize() {
+        IoTDBOptions options = new IoTDBOptions();
+        options.setTimeseriesOptionList(Lists.newArrayList(new 
IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
+        DefaultIoTSerializationSchema serializationSchema = new 
DefaultIoTSerializationSchema();
+
+        Map<String,String> tuple = new HashMap();
+        tuple.put("device", "root.sg.D01");
+        tuple.put("timestamp", "1581861293000");
+        tuple.put("measurements", "temperature");
+        tuple.put("values", "36.5");
+
+        Event event = serializationSchema.serialize(tuple);
+        assertEquals(tuple.get("device"), event.getDevice());
+        assertEquals(tuple.get("timestamp"), 
String.valueOf(event.getTimestamp()));
+        assertEquals(tuple.get("measurements"), 
event.getMeasurements().get(0));
+        assertEquals(tuple.get("values"), event.getValues().get(0));
+    }
+}
diff --git 
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
 
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
new file mode 100644
index 0000000..3692377
--- /dev/null
+++ 
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.flink;
+
+import com.google.common.collect.Lists;
+import org.apache.iotdb.session.Session;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+public class IoTDBSinkBatchInsertTest {
+
+    private IoTDBSink ioTDBSink;
+    private Session session;
+
+    @Before
+    public void setUp() throws Exception {
+        IoTDBOptions options = new IoTDBOptions();
+        options.setTimeseriesOptionList(Lists.newArrayList(new 
IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
+        ioTDBSink = new IoTDBSink(options, new 
DefaultIoTSerializationSchema());
+        ioTDBSink.withBatchSize(3);
+
+        session = mock(Session.class);
+        ioTDBSink.setSession(session);
+    }
+
+    @Test
+    public void testBatchInsert() throws Exception {
+        Map<String,String> tuple = new HashMap();
+        tuple.put("device", "root.sg.D01");
+        tuple.put("timestamp", "1581861293000");
+        tuple.put("measurements", "temperature");
+        tuple.put("values", "36.5");
+        ioTDBSink.invoke(tuple, null);
+
+        verifyZeroInteractions(session);
+
+        tuple = new HashMap();
+        tuple.put("device", "root.sg.D01");
+        tuple.put("timestamp", "1581861293001");
+        tuple.put("measurements", "temperature");
+        tuple.put("values", "37.2");
+        ioTDBSink.invoke(tuple, null);
+
+        verifyZeroInteractions(session);
+
+        tuple = new HashMap();
+        tuple.put("device", "root.sg.D01");
+        tuple.put("timestamp", "1581861293003");
+        tuple.put("measurements", "temperature");
+        tuple.put("values", "37.1");
+        ioTDBSink.invoke(tuple, null);
+
+        verify(session).insertInBatch(any(List.class), any(List.class), 
any(List.class), any(List.class));
+
+        tuple = new HashMap();
+        tuple.put("device", "root.sg.D01");
+        tuple.put("timestamp", "1581861293005");
+        tuple.put("measurements", "temperature");
+        tuple.put("values", "36.5");
+        ioTDBSink.invoke(tuple, null);
+
+        verifyZeroInteractions(session);
+    }
+
+    @Test
+    public void close() throws Exception {
+        Map<String,String> tuple = new HashMap();
+        tuple.put("device", "root.sg.D01");
+        tuple.put("timestamp", "1581861293005");
+        tuple.put("measurements", "temperature");
+        tuple.put("values", "36.5");
+        ioTDBSink.invoke(tuple, null);
+        verifyZeroInteractions(session);
+
+        ioTDBSink.close();
+        verify(session).insertInBatch(any(List.class), any(List.class), 
any(List.class), any(List.class));
+        verify(session).close();
+    }
+}
diff --git 
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
 
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
new file mode 100644
index 0000000..226e798
--- /dev/null
+++ 
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.flink;
+
+import com.google.common.collect.Lists;
+import org.apache.iotdb.session.Session;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+public class IoTDBSinkBatchTimerTest {
+
+    private IoTDBSink ioTDBSink;
+    private Session session;
+
+    @Before
+    public void setUp() throws Exception {
+        IoTDBOptions options = new IoTDBOptions();
+        options.setTimeseriesOptionList(Lists.newArrayList(new 
IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
+        ioTDBSink = new IoTDBSink(options, new 
DefaultIoTSerializationSchema());
+        ioTDBSink.withBatchSize(3);
+        ioTDBSink.withFlushIntervalMs(1000);
+        ioTDBSink.initScheduler();
+
+        session = mock(Session.class);
+        ioTDBSink.setSession(session);
+    }
+
+    @Test
+    public void testBatchInsert() throws Exception {
+        Map<String,String> tuple = new HashMap();
+        tuple.put("device", "root.sg.D01");
+        tuple.put("timestamp", "1581861293000");
+        tuple.put("measurements", "temperature");
+        tuple.put("values", "36.5");
+        ioTDBSink.invoke(tuple, null);
+
+        Thread.sleep(2500);
+
+        verify(session).insertInBatch(any(List.class), any(List.class), 
any(List.class), any(List.class));
+
+        Thread.sleep(1000);
+
+        verifyZeroInteractions(session);
+    }
+
+    @Test
+    public void close() throws Exception {
+        ioTDBSink.close();
+        verify(session).close();
+    }
+}
diff --git 
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
 
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
new file mode 100644
index 0000000..06fe7f3
--- /dev/null
+++ 
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.flink;
+
+import com.google.common.collect.Lists;
+import org.apache.iotdb.session.Session;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class IoTDBSinkInsertTest {
+
+    private IoTDBSink ioTDBSink;
+    private Session session;
+
+    @Before
+    public void setUp() throws Exception {
+        IoTDBOptions options = new IoTDBOptions();
+        options.setTimeseriesOptionList(Lists.newArrayList(new 
IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
+        ioTDBSink = new IoTDBSink(options, new 
DefaultIoTSerializationSchema());
+
+        session = mock(Session.class);
+        ioTDBSink.setSession(session);
+    }
+
+    @Test
+    public void testInsert() throws Exception {
+        Map<String,String> tuple = new HashMap();
+        tuple.put("device", "root.sg.D01");
+        tuple.put("timestamp", "1581861293000");
+        tuple.put("measurements", "temperature");
+        tuple.put("values", "36.5");
+
+        ioTDBSink.invoke(tuple, null);
+        verify(session).insert(any(String.class), any(Long.class), 
any(List.class), any(List.class));
+    }
+
+    @Test
+    public void close() throws Exception {
+        ioTDBSink.close();
+        verify(session).close();
+    }
+}
diff --git a/pom.xml b/pom.xml
index 8f754f1..b3d7f46 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,6 +56,7 @@
         <module>spark-tsfile</module>
         <module>hadoop</module>
         <module>spark-iotdb-connector</module>
+        <module>flink-iotdb-connector</module>
         <module>distribution</module>
         <module>hive-connector</module>
     </modules>
@@ -84,6 +85,7 @@
         <common.lang.version>2.6</common.lang.version>
         <common.lang3.version>3.8.1</common.lang3.version>
         <common.logging.version>1.1.3</common.logging.version>
+        <guava.version>21.0</guava.version>
         <jline.version>2.14.5</jline.version>
         <jetty.version>9.4.24.v20191120</jetty.version>
         <metrics.version>3.2.6</metrics.version>
@@ -150,7 +152,7 @@
             <dependency>
                 <groupId>com.google.guava</groupId>
                 <artifactId>guava</artifactId>
-                <version>21.0</version>
+                <version>${guava.version}</version>
             </dependency>
             <dependency>
                 <groupId>com.sun.istack</groupId>

Reply via email to