This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8e83fe5  [Feature][Connectors] Support auth in InfluxDB Flink Source plugin (#1216)
8e83fe5 is described below

commit 8e83fe5959a6346568571d032f4d486ed7ae7cc7
Author: Benedict Jin <[email protected]>
AuthorDate: Fri Feb 11 10:09:36 2022 +0800

    [Feature][Connectors] Support auth in InfluxDB Flink Source plugin (#1216)
---
 .../flink/configuration/source-plugins/InfluxDb.md | 26 ++++++++++++++++++++++
 .../flink/source/InfluxDbInputFormat.java          | 18 ++++++++++++++-
 .../seatunnel/flink/source/InfluxDbSource.java     |  6 +++++
 3 files changed, 49 insertions(+), 1 deletion(-)

diff --git a/docs/en/flink/configuration/source-plugins/InfluxDb.md b/docs/en/flink/configuration/source-plugins/InfluxDb.md
index b7dae20..fb6c5c9 100644
--- a/docs/en/flink/configuration/source-plugins/InfluxDb.md
+++ b/docs/en/flink/configuration/source-plugins/InfluxDb.md
@@ -9,6 +9,8 @@ Read data from InfluxDB.
 | name        | type           | required | default value |
 | ----------- | -------------- | -------- | ------------- |
 | server_url  | `String`       | yes      | -             |
+| username    | `String`       | no       | -             |
+| password    | `String`       | no       | -             |
 | database    | `String`       | yes      | -             |
 | measurement | `String`       | yes      | -             |
 | fields      | `List<String>` | yes      | -             |
@@ -18,6 +20,14 @@ Read data from InfluxDB.
 
 The URL of InfluxDB Server.
 
+### username [`String`]
+
+The username of InfluxDB Server.
+
+### password [`String`]
+
+The password of InfluxDB Server.
+
 ### datasource [`String`]
 
 The DataSource name in InfluxDB.
@@ -36,9 +46,25 @@ The list of Field Types in InfluxDB.
 
 ## Example
 
+### Simple
+
+```hocon
+InfluxDbSource {
+  server_url = "http://127.0.0.1:8086/";
+  database = "influxdb"
+  measurement = "m"
+  fields = ["time", "temperature"]
+  field_types = ["STRING", "DOUBLE"]
+}
+```
+
+### Auth
+
 ```hocon
 InfluxDbSource {
   server_url = "http://127.0.0.1:8086/";
+  username = "admin"
+  password = "password"
   database = "influxdb"
   measurement = "m"
   fields = ["time", "temperature"]
diff --git a/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbInputFormat.java b/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbInputFormat.java
index 0cd06f4..b1e3ef7 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbInputFormat.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbInputFormat.java
@@ -44,6 +44,8 @@ public class InfluxDbInputFormat extends RichInputFormat<Row, InputSplit> implem
     private static final Logger LOG = LoggerFactory.getLogger(InfluxDbInputFormat.class);
 
     private String serverURL;
+    private String username;
+    private String password;
     private String database;
     private String query;
     private List<String> fields;
@@ -69,7 +71,11 @@ public class InfluxDbInputFormat extends RichInputFormat<Row, InputSplit> implem
     @Override
     public void openInputFormat() {
         try {
-            conn = InfluxDBFactory.connect(serverURL);
+            if (username == null || password == null) {
+                conn = InfluxDBFactory.connect(serverURL);
+            } else {
+                conn = InfluxDBFactory.connect(serverURL, username, password);
+            }
             conn.setDatabase(database);
             offset = 0;
             hasNext = true;
@@ -163,6 +169,16 @@ public class InfluxDbInputFormat extends RichInputFormat<Row, InputSplit> implem
             return this;
         }
 
+        public InfluxDbInputFormatBuilder setUsername(String username) {
+            format.username = username;
+            return this;
+        }
+
+        public InfluxDbInputFormatBuilder setPassword(String password) {
+            format.password = password;
+            return this;
+        }
+
         public InfluxDbInputFormatBuilder setDatabase(String database) {
             format.database = database;
             return this;
diff --git a/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbSource.java b/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbSource.java
index 018f509..c045765 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbSource.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbSource.java
@@ -47,6 +47,8 @@ public class InfluxDbSource implements FlinkBatchSource<Row> {
     private InfluxDbInputFormat influxDbInputFormat;
 
     private static final String SERVER_URL = "server_url";
+    private static final String USER_NAME = "username";
+    private static final String PASSWORD = "password";
     private static final String DATABASE = "database";
     private static final String MEASUREMENT = "measurement";
     private static final String FIELDS = "fields";
@@ -91,6 +93,8 @@ public class InfluxDbSource implements FlinkBatchSource<Row> {
     @Override
     public void prepare(FlinkEnvironment env) {
         String serverURL = config.getString(SERVER_URL);
+        String username = config.hasPath(USER_NAME) ? config.getString(USER_NAME) : null;
+        String password = config.hasPath(PASSWORD) ? config.getString(PASSWORD) : null;
         String database = config.getString(DATABASE);
         String measurement = config.getString(MEASUREMENT);
         List<String> fields = config.getStringList(FIELDS);
@@ -108,6 +112,8 @@ public class InfluxDbSource implements FlinkBatchSource<Row> {
 
         this.influxDbInputFormat = InfluxDbInputFormat.buildInfluxDbInputFormat()
                 .setServerURL(serverURL)
+                .setUsername(username)
+                .setPassword(password)
                 .setDatabase(database)
                 .setQuery(sql)
                 .setFields(fields)

Reply via email to