[BAHIR-178] Added option to create a new InfluxDB database. Closes #34
Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/898e913f Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/898e913f Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/898e913f Branch: refs/heads/master Commit: 898e913fe9bdc9837e6f5a9be8d80e41dc1ea020 Parents: f6c6362 Author: Wojciech Luczkow <[email protected]> Authored: Tue Sep 18 11:12:37 2018 +0200 Committer: Luciano Resende <[email protected]> Committed: Wed Nov 7 12:28:34 2018 -0800 ---------------------------------------------------------------------- .../connectors/influxdb/InfluxDBConfig.java | 18 ++++++++++++++++++ .../connectors/influxdb/InfluxDBSink.java | 7 ++++++- 2 files changed, 24 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/898e913f/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBConfig.java ---------------------------------------------------------------------- diff --git a/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBConfig.java b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBConfig.java index 9c1220d..eeafb7a 100644 --- a/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBConfig.java +++ b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBConfig.java @@ -39,6 +39,7 @@ public class InfluxDBConfig implements Serializable { private int flushDuration; private TimeUnit flushDurationTimeUnit; private boolean enableGzip; + private boolean createDatabase; public InfluxDBConfig(InfluxDBConfig.Builder builder) { Preconditions.checkArgument(builder != null, "InfluxDBConfig builder can not be null"); @@ -52,6 +53,7 @@ public class InfluxDBConfig implements Serializable 
{ this.flushDuration = builder.getFlushDuration(); this.flushDurationTimeUnit = builder.getFlushDurationTimeUnit(); this.enableGzip = builder.isEnableGzip(); + this.createDatabase = builder.isCreateDatabase(); } public String getUrl() { @@ -86,6 +88,8 @@ public class InfluxDBConfig implements Serializable { return enableGzip; } + public boolean isCreateDatabase() { return createDatabase; } + /** * Creates a new {@link InfluxDBConfig.Builder} instance. * <p/> @@ -114,6 +118,7 @@ public class InfluxDBConfig implements Serializable { private int flushDuration = DEFAULT_FLUSH_DURATION; private TimeUnit flushDurationTimeUnit = TimeUnit.MILLISECONDS; private boolean enableGzip = false; + private boolean createDatabase = false; /** * Creates a builder @@ -213,6 +218,17 @@ public class InfluxDBConfig implements Serializable { } /** + * Make InfluxDb sink create new database + * + * @param createDatabase createDatabase switch value + * @return this Builder to use it fluent + */ + public InfluxDBConfig.Builder createDatabase(boolean createDatabase) { + this.createDatabase = createDatabase; + return this; + } + + /** * Builds InfluxDBConfig. * * @return the InfluxDBConfig instance. 
@@ -253,5 +269,7 @@ public class InfluxDBConfig implements Serializable { public boolean isEnableGzip() { return enableGzip; } + + public boolean isCreateDatabase() { return createDatabase; } } } http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/898e913f/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java ---------------------------------------------------------------------- diff --git a/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java index e7f8916..61de76a 100644 --- a/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java +++ b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java @@ -55,7 +55,12 @@ public class InfluxDBSink extends RichSinkFunction<InfluxDBPoint> { influxDBClient = InfluxDBFactory.connect(influxDBConfig.getUrl(), influxDBConfig.getUsername(), influxDBConfig.getPassword()); if (!influxDBClient.databaseExists(influxDBConfig.getDatabase())) { - throw new RuntimeException("This " + influxDBConfig.getDatabase() + " database does not exist!"); + if(influxDBConfig.isCreateDatabase()) { + influxDBClient.createDatabase(influxDBConfig.getDatabase()); + } + else { + throw new RuntimeException("This " + influxDBConfig.getDatabase() + " database does not exist!"); + } } influxDBClient.setDatabase(influxDBConfig.getDatabase());
