Repository: bahir-flink Updated Branches: refs/heads/master b580566f0 -> 4f0179a17
[BAHIR-134] Add InfluxDb sink for flink stream This closes #21 Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/f07276ee Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/f07276ee Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/f07276ee Branch: refs/heads/master Commit: f07276eef2babc52ffdb43c5fcb76f9d51b9153f Parents: b580566 Author: zhouhai02 <[email protected]> Authored: Sun Aug 27 19:35:31 2017 +0800 Committer: Robert Metzger <[email protected]> Committed: Wed Sep 20 15:05:28 2017 +0200 ---------------------------------------------------------------------- flink-connector-influxdb/README.md | 32 +++ .../examples/influxdb/InfluxDBSinkExample.java | 94 +++++++ .../src/main/resources/log4j.properties | 23 ++ flink-connector-influxdb/pom.xml | 78 ++++++ .../connectors/influxdb/InfluxDBConfig.java | 257 +++++++++++++++++++ .../connectors/influxdb/InfluxDBPoint.java | 78 ++++++ .../connectors/influxdb/InfluxDBSink.java | 106 ++++++++ pom.xml | 1 + 8 files changed, 669 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/f07276ee/flink-connector-influxdb/README.md ---------------------------------------------------------------------- diff --git a/flink-connector-influxdb/README.md b/flink-connector-influxdb/README.md new file mode 100644 index 0000000..0f9e477 --- /dev/null +++ b/flink-connector-influxdb/README.md @@ -0,0 +1,32 @@ +# Flink InfluxDB Connector + +This connector provides a sink that can send data to [InfluxDB](https://www.influxdata.com/). To use this connector, add the +following dependency to your project: + + <dependency> + <groupId>org.apache.bahir</groupId> + <artifactId>flink-connector-influxdb_2.11</artifactId> + <version>1.0-SNAPSHOT</version> + </dependency> + +*Version Compatibility*: This module is compatible with InfluxDB 1.3.x +*Requirements*: Java 1.8+ + +Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution. +See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html). + +## Installing InfluxDB +Follow the instructions from the [InfluxDB download page](https://portal.influxdata.com/downloads#influxdb). + +## Examples + +### JAVA API + + DataStream<InfluxDBPoint> dataStream = ... + InfluxDBConfig influxDBConfig = InfluxDBConfig.builder(String host, String username, String password, String dbName) + dataStream.addSink(new InfluxDBSink(influxDBConfig)); + + +See end-to-end examples at [InfluxDB Examples](https://github.com/apache/bahir-flink/tree/master/flink-connector-influxdb/examples) + + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/f07276ee/flink-connector-influxdb/examples/src/main/java/org/apache/flink/streaming/examples/influxdb/InfluxDBSinkExample.java ---------------------------------------------------------------------- diff --git a/flink-connector-influxdb/examples/src/main/java/org/apache/flink/streaming/examples/influxdb/InfluxDBSinkExample.java b/flink-connector-influxdb/examples/src/main/java/org/apache/flink/streaming/examples/influxdb/InfluxDBSinkExample.java new file mode 100644 index 0000000..3047743 --- /dev/null +++ b/flink-connector-influxdb/examples/src/main/java/org/apache/flink/streaming/examples/influxdb/InfluxDBSinkExample.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.influxdb; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.influxdb.InfluxDBConfig; +import org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint; +import org.apache.flink.streaming.connectors.influxdb.InfluxDBSink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * This is an example showing the to use the InfluxDB Sink in the Streaming API. + * <p> + * <p>The example assumes that a database exists in a local InfluxDB server, according to the following query: + * <p>curl -POST http://localhost:8086/query --data-urlencode "q=CREATE DATABASE db_flink_test" + */ +public class InfluxDBSinkExample { + private static final Logger LOG = LoggerFactory.getLogger(InfluxDBSinkExample.class); + + private static final int N = 10000; + + public static void main(String[] args) throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + List<String> dataList = new ArrayList<>(); + for (int i = 0; i < N; ++i) { + String id = "server" + String.valueOf(i); + dataList.add("cpu#" + id); + dataList.add("mem#" + id); + dataList.add("disk#" + id); + } + DataStream<String> source = env.fromElements(dataList.toArray(new String[0])); + + + DataStream<InfluxDBPoint> dataStream = source.map( + new RichMapFunction<String, InfluxDBPoint>() { + @Override + public InfluxDBPoint map(String s) throws Exception { + String[] input = s.split("#"); + + String measurement = input[0]; + long timestamp = System.currentTimeMillis(); + + HashMap<String, String> tags = new HashMap<>(); + tags.put("host", input[1]); + tags.put("region", "region#" + String.valueOf(input[1].hashCode() % 20)); + + HashMap<String, Object> fields = new HashMap<>(); + fields.put("value1", input[1].hashCode() % 100); + fields.put("value2", input[1].hashCode() % 50); + + return new InfluxDBPoint(measurement, timestamp, tags, fields); + } + } + ); + + //dataStream.print(); + + //InfluxDBConfig influxDBConfig = new InfluxDBConfig.Builder("http://localhost:8086", "root", "root", "db_flink_test") + InfluxDBConfig influxDBConfig = InfluxDBConfig.builder("http://localhost:8086", "root", "root", "db_flink_test") + .batchActions(1000) + .flushDuration(100, TimeUnit.MILLISECONDS) + .enableGzip(true) + .build(); + + dataStream.addSink(new InfluxDBSink(influxDBConfig)); + + env.execute("InfluxDB Sink Example"); + } + +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/f07276ee/flink-connector-influxdb/examples/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink-connector-influxdb/examples/src/main/resources/log4j.properties b/flink-connector-influxdb/examples/src/main/resources/log4j.properties new file mode 100644 index 0000000..6a59e09 --- /dev/null +++ b/flink-connector-influxdb/examples/src/main/resources/log4j.properties @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +log4j.rootLogger=INFO, console + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n + http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/f07276ee/flink-connector-influxdb/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connector-influxdb/pom.xml b/flink-connector-influxdb/pom.xml new file mode 100644 index 0000000..dce012e --- /dev/null +++ b/flink-connector-influxdb/pom.xml @@ -0,0 +1,78 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.bahir</groupId> + <artifactId>bahir-flink-parent_2.11</artifactId> + <version>1.1-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-influxdb_2.11</artifactId> + <name>flink-connector-influxdb</name> + + <packaging>jar</packaging> + + <properties> + <influxdb-client.version>2.7</influxdb-client.version> + </properties> + + <dependencies> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + </dependency> + + <dependency> + <groupId>org.influxdb</groupId> + <artifactId>influxdb-java</artifactId> + <version>${influxdb-client.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/f07276ee/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBConfig.java ---------------------------------------------------------------------- diff --git a/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBConfig.java b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBConfig.java new file mode 100644 index 0000000..9c1220d --- /dev/null +++ b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBConfig.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.influxdb; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.concurrent.TimeUnit; + +/** + * Configuration for InfluxDB. + */ +public class InfluxDBConfig implements Serializable { + private static final long serialVersionUID = 1L; + + private static final int DEFAULT_BATCH_ACTIONS = 2000; + private static final int DEFAULT_FLUSH_DURATION = 100; + + private String url; + private String username; + private String password; + private String database; + private int batchActions; + private int flushDuration; + private TimeUnit flushDurationTimeUnit; + private boolean enableGzip; + + public InfluxDBConfig(InfluxDBConfig.Builder builder) { + Preconditions.checkArgument(builder != null, "InfluxDBConfig builder can not be null"); + + this.url = Preconditions.checkNotNull(builder.getUrl(), "host can not be null"); + this.username = Preconditions.checkNotNull(builder.getUsername(), "username can not be null"); + this.password = Preconditions.checkNotNull(builder.getPassword(), "password can not be null"); + this.database = Preconditions.checkNotNull(builder.getDatabase(), "database name can not be null"); + + this.batchActions = builder.getBatchActions(); + this.flushDuration = builder.getFlushDuration(); + this.flushDurationTimeUnit = builder.getFlushDurationTimeUnit(); + this.enableGzip = builder.isEnableGzip(); + } + + public String getUrl() { + return url; + } + + public String getUsername() { + return username; + } + + public String getPassword() { + return password; + } + + public String getDatabase() { + return database; + } + + public int getBatchActions() { + return batchActions; + } + + public int getFlushDuration() { + return flushDuration; + } + + public TimeUnit getFlushDurationTimeUnit() { + return flushDurationTimeUnit; + } + + public boolean isEnableGzip() { + return enableGzip; + } + + /** + * Creates a new {@link InfluxDBConfig.Builder} instance. + * <p/> + * This is a convenience method for {@code new InfluxDBConfig.Builder()}. + * + * @param url the url to connect to + * @param username the username which is used to authorize against the influxDB instance + * @param password the password for the username which is used to authorize against the influxDB + * instance + * @param database the name of the database to write + * @return the new InfluxDBConfig builder. + */ + public static Builder builder(String url, String username, String password, String database) { + return new Builder(url, username, password, database); + } + + /** + * A builder used to create a build an instance of a InfluxDBConfig. + */ + public static class Builder { + private String url; + private String username; + private String password; + private String database; + private int batchActions = DEFAULT_BATCH_ACTIONS; + private int flushDuration = DEFAULT_FLUSH_DURATION; + private TimeUnit flushDurationTimeUnit = TimeUnit.MILLISECONDS; + private boolean enableGzip = false; + + /** + * Creates a builder + * + * @param url the url to connect to + * @param username the username which is used to authorize against the influxDB instance + * @param password the password for the username which is used to authorize against the influxDB + * instance + * @param database the name of the database to write + */ + public Builder(String url, String username, String password, String database) { + this.url = url; + this.username = username; + this.password = password; + this.database = database; + } + + /** + * Sets url. + * + * @param url the url to connect to + * @return this Builder to use it fluent + */ + public InfluxDBConfig.Builder url(String url) { + this.url = url; + return this; + } + + /** + * Sets username. + * + * @param username the username which is used to authorize against the influxDB instance + * @return this Builder to use it fluent + */ + public InfluxDBConfig.Builder username(String username) { + this.username = username; + return this; + } + + /** + * Sets password. + * + * @param password the password for the username which is used to authorize against the influxDB + * instance + * @return this Builder to use it fluent + */ + public InfluxDBConfig.Builder password(String password) { + this.password = password; + return this; + } + + /** + * Sets database name. + * + * @param database the name of the database to write + * @return this Builder to use it fluent + */ + public InfluxDBConfig.Builder database(String database) { + this.database = database; + return this; + } + + /** + * Sets when to flush a new bulk request based on the number of batch actions currently added. + * Defaults to <tt>DEFAULT_BATCH_ACTIONS</tt>. Can be set to <tt>-1</tt> to disable it. + * + * @param batchActions number of Points written after which a write must happen. + * @return this Builder to use it fluent + */ + public InfluxDBConfig.Builder batchActions(int batchActions) { + this.batchActions = batchActions; + return this; + } + + /** + * Sets a flush interval flushing *any* bulk actions pending if the interval passes. + * + * @param flushDuration the flush duration + * @param flushDurationTimeUnit the TimeUnit of the flush duration + * @return this Builder to use it fluent + */ + public Builder flushDuration(int flushDuration, TimeUnit flushDurationTimeUnit) { + this.flushDuration = flushDuration; + this.flushDurationTimeUnit = flushDurationTimeUnit; + return this; + } + + /** + * Enable Gzip compress for http request body. + * + * @param enableGzip the enableGzip value + * @return this Builder to use it fluent + */ + public InfluxDBConfig.Builder enableGzip(boolean enableGzip) { + this.enableGzip = enableGzip; + return this; + } + + /** + * Builds InfluxDBConfig. + * + * @return the InfluxDBConfig instance. + */ + public InfluxDBConfig build() { + return new InfluxDBConfig(this); + } + + + public String getUrl() { + return url; + } + + public String getUsername() { + return username; + } + + public String getPassword() { + return password; + } + + public String getDatabase() { + return database; + } + + public int getBatchActions() { + return batchActions; + } + + public int getFlushDuration() { + return flushDuration; + } + + public TimeUnit getFlushDurationTimeUnit() { + return flushDurationTimeUnit; + } + + public boolean isEnableGzip() { + return enableGzip; + } + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/f07276ee/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBPoint.java ---------------------------------------------------------------------- diff --git a/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBPoint.java b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBPoint.java new file mode 100644 index 0000000..3be9c90 --- /dev/null +++ b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBPoint.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.influxdb; + +import java.util.HashMap; +import java.util.Map; + +/** + * Representation of a InfluxDB database Point. + */ +public class InfluxDBPoint { + + private String measurement; + private long timestamp; + private Map<String, String> tags; + private Map<String, Object> fields; + + public InfluxDBPoint(String measurement, long timestamp) { + this.measurement = measurement; + this.timestamp = timestamp; + this.fields = new HashMap<>(); + this.tags = new HashMap<>(); + } + + public InfluxDBPoint(String measurement, long timestamp, Map<String, String> tags, Map<String, Object> fields) { + this.measurement = measurement; + this.timestamp = timestamp; + this.tags = tags; + this.fields = fields; + } + + public String getMeasurement() { + return measurement; + } + + public void setMeasurement(String measurement) { + this.measurement = measurement; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public Map<String, String> getTags() { + return tags; + } + + public void setTags(Map<String, String> tags) { + this.tags = tags; + } + + public Map<String, Object> getFields() { + return fields; + } + + public void setFields(Map<String, Object> fields) { + this.fields = fields; + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/f07276ee/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java ---------------------------------------------------------------------- diff --git a/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java new file mode 100644 index 0000000..03521b9 --- /dev/null +++ b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.influxdb; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDBFactory; +import org.influxdb.dto.Point; + +import java.util.concurrent.TimeUnit; + +/** + * Sink to save data into a InfluxDB cluster. + */ +public class InfluxDBSink extends RichSinkFunction<InfluxDBPoint> { + + private transient InfluxDB influxDBClient; + private final InfluxDBConfig influxDBConfig; + + /** + * Creates a new {@link InfluxDBSink} that connects to the InfluxDB server. + * + * @param influxDBConfig The configuration of {@link InfluxDBConfig} + */ + public InfluxDBSink(InfluxDBConfig influxDBConfig) { + this.influxDBConfig = Preconditions.checkNotNull(influxDBConfig, "InfluxDB client config should not be null"); + } + + /** + * Initializes the connection to InfluxDB by either cluster or sentinels or single server. + */ + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + influxDBClient = InfluxDBFactory.connect(influxDBConfig.getUrl(), influxDBConfig.getUsername(), influxDBConfig.getPassword()); + + if (!influxDBClient.databaseExists(influxDBConfig.getDatabase())) { + throw new RuntimeException("This " + influxDBConfig.getDatabase() + " database does not exist!"); + } + + influxDBClient.setDatabase(influxDBConfig.getDatabase()); + + if (influxDBConfig.getBatchActions() > 0) { + influxDBClient.enableBatch(influxDBConfig.getBatchActions(), influxDBConfig.getFlushDuration(), influxDBConfig.getFlushDurationTimeUnit()); + } + + if (influxDBConfig.isEnableGzip()) { + + influxDBClient.enableGzip(); + } + } + + /** + * Called when new data arrives to the sink, and forwards it to InfluxDB. + * + * @param dataPoint {@link InfluxDBPoint} + */ + @Override + public void invoke(InfluxDBPoint dataPoint) throws Exception { + if (StringUtils.isNullOrWhitespaceOnly(dataPoint.getMeasurement())) { + throw new RuntimeException("No measurement defined"); + } + + Point.Builder builder = Point.measurement(dataPoint.getMeasurement()) + .time(dataPoint.getTimestamp(), TimeUnit.MILLISECONDS); + + if (!CollectionUtil.isNullOrEmpty(dataPoint.getFields())) { + builder.fields(dataPoint.getFields()); + } + + if (!CollectionUtil.isNullOrEmpty(dataPoint.getTags())) { + builder.tag(dataPoint.getTags()); + } + + Point point = builder.build(); + influxDBClient.write(point); + } + + @Override + public void close() { + if (influxDBClient.isBatchEnabled()) { + influxDBClient.disableBatch(); + } + influxDBClient.close(); + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/f07276ee/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 2e39a83..4070698 100644 --- a/pom.xml +++ b/pom.xml @@ -76,6 +76,7 @@ <module>flink-connector-activemq</module> <module>flink-connector-netty</module> <module>flink-connector-akka</module> + <module>flink-connector-influxdb</module> </modules> <properties>
