[BAHIR-178] Added option to create new InfluxDb database

Closes #34


Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/898e913f
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/898e913f
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/898e913f

Branch: refs/heads/master
Commit: 898e913fe9bdc9837e6f5a9be8d80e41dc1ea020
Parents: f6c6362
Author: Wojciech Luczkow <[email protected]>
Authored: Tue Sep 18 11:12:37 2018 +0200
Committer: Luciano Resende <[email protected]>
Committed: Wed Nov 7 12:28:34 2018 -0800

----------------------------------------------------------------------
 .../connectors/influxdb/InfluxDBConfig.java       | 18 ++++++++++++++++++
 .../connectors/influxdb/InfluxDBSink.java         |  7 ++++++-
 2 files changed, 24 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/898e913f/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBConfig.java
 
b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBConfig.java
index 9c1220d..eeafb7a 100644
--- 
a/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBConfig.java
+++ 
b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBConfig.java
@@ -39,6 +39,7 @@ public class InfluxDBConfig implements Serializable {
     private int flushDuration;
     private TimeUnit flushDurationTimeUnit;
     private boolean enableGzip;
+    private boolean createDatabase;
 
     public InfluxDBConfig(InfluxDBConfig.Builder builder) {
         Preconditions.checkArgument(builder != null, "InfluxDBConfig builder 
can not be null");
@@ -52,6 +53,7 @@ public class InfluxDBConfig implements Serializable {
         this.flushDuration = builder.getFlushDuration();
         this.flushDurationTimeUnit = builder.getFlushDurationTimeUnit();
         this.enableGzip = builder.isEnableGzip();
+        this.createDatabase = builder.isCreateDatabase();
     }
 
     public String getUrl() {
@@ -86,6 +88,8 @@ public class InfluxDBConfig implements Serializable {
         return enableGzip;
     }
 
+    public boolean isCreateDatabase() { return createDatabase; }
+
     /**
      * Creates a new {@link InfluxDBConfig.Builder} instance.
      * <p/>
@@ -114,6 +118,7 @@ public class InfluxDBConfig implements Serializable {
         private int flushDuration = DEFAULT_FLUSH_DURATION;
         private TimeUnit flushDurationTimeUnit = TimeUnit.MILLISECONDS;
         private boolean enableGzip = false;
+        private boolean createDatabase = false;
 
         /**
          * Creates a builder
@@ -213,6 +218,17 @@ public class InfluxDBConfig implements Serializable {
         }
 
         /**
+         * Make InfluxDb sink create new database
+         *
+         * @param createDatabase createDatabase switch value
+         * @return this Builder to use it fluent
+         */
+        public InfluxDBConfig.Builder createDatabase(boolean createDatabase) {
+            this.createDatabase = createDatabase;
+            return this;
+        }
+
+        /**
          * Builds InfluxDBConfig.
          *
          * @return the InfluxDBConfig instance.
@@ -253,5 +269,7 @@ public class InfluxDBConfig implements Serializable {
         public boolean isEnableGzip() {
             return enableGzip;
         }
+
+        public boolean isCreateDatabase() { return createDatabase; }
     }
 }

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/898e913f/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java
 
b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java
index e7f8916..61de76a 100644
--- 
a/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java
+++ 
b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java
@@ -55,7 +55,12 @@ public class InfluxDBSink extends 
RichSinkFunction<InfluxDBPoint> {
         influxDBClient = InfluxDBFactory.connect(influxDBConfig.getUrl(), 
influxDBConfig.getUsername(), influxDBConfig.getPassword());
 
         if (!influxDBClient.databaseExists(influxDBConfig.getDatabase())) {
-            throw new RuntimeException("This " + influxDBConfig.getDatabase() 
+ " database does not exist!");
+            if(influxDBConfig.isCreateDatabase()) {
+                influxDBClient.createDatabase(influxDBConfig.getDatabase());
+            }
+            else {
+                throw new RuntimeException("This " + 
influxDBConfig.getDatabase() + " database does not exist!");
+            }
         }
 
         influxDBClient.setDatabase(influxDBConfig.getDatabase());

Reply via email to