This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 8e83fe5 [Feature][Connectors] Support auth in InfluxDB Flink Source
plugin (#1216)
8e83fe5 is described below
commit 8e83fe5959a6346568571d032f4d486ed7ae7cc7
Author: Benedict Jin <[email protected]>
AuthorDate: Fri Feb 11 10:09:36 2022 +0800
[Feature][Connectors] Support auth in InfluxDB Flink Source plugin (#1216)
---
.../flink/configuration/source-plugins/InfluxDb.md | 26 ++++++++++++++++++++++
.../flink/source/InfluxDbInputFormat.java | 18 ++++++++++++++-
.../seatunnel/flink/source/InfluxDbSource.java | 6 +++++
3 files changed, 49 insertions(+), 1 deletion(-)
diff --git a/docs/en/flink/configuration/source-plugins/InfluxDb.md
b/docs/en/flink/configuration/source-plugins/InfluxDb.md
index b7dae20..fb6c5c9 100644
--- a/docs/en/flink/configuration/source-plugins/InfluxDb.md
+++ b/docs/en/flink/configuration/source-plugins/InfluxDb.md
@@ -9,6 +9,8 @@ Read data from InfluxDB.
| name | type | required | default value |
| ----------- | -------------- | -------- | ------------- |
| server_url | `String` | yes | - |
+| username | `String` | no | - |
+| password | `String` | no | - |
| database | `String` | yes | - |
| measurement | `String` | yes | - |
| fields | `List<String>` | yes | - |
@@ -18,6 +20,14 @@ Read data from InfluxDB.
The URL of InfluxDB Server.
+### username [`String`]
+
+The username of InfluxDB Server.
+
+### password [`String`]
+
+The password of InfluxDB Server.
+
### datasource [`String`]
The DataSource name in InfluxDB.
@@ -36,9 +46,25 @@ The list of Field Types in InfluxDB.
## Example
+### Simple
+
+```hocon
+InfluxDbSource {
+ server_url = "http://127.0.0.1:8086/"
+ database = "influxdb"
+ measurement = "m"
+ fields = ["time", "temperature"]
+ field_types = ["STRING", "DOUBLE"]
+}
+```
+
+### Auth
+
```hocon
InfluxDbSource {
server_url = "http://127.0.0.1:8086/"
+ username = "admin"
+ password = "password"
database = "influxdb"
measurement = "m"
fields = ["time", "temperature"]
diff --git
a/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbInputFormat.java
b/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbInputFormat.java
index 0cd06f4..b1e3ef7 100644
---
a/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbInputFormat.java
+++
b/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbInputFormat.java
@@ -44,6 +44,8 @@ public class InfluxDbInputFormat extends RichInputFormat<Row,
InputSplit> implem
private static final Logger LOG =
LoggerFactory.getLogger(InfluxDbInputFormat.class);
private String serverURL;
+ private String username;
+ private String password;
private String database;
private String query;
private List<String> fields;
@@ -69,7 +71,11 @@ public class InfluxDbInputFormat extends
RichInputFormat<Row, InputSplit> implem
@Override
public void openInputFormat() {
try {
- conn = InfluxDBFactory.connect(serverURL);
+ if (username == null || password == null) {
+ conn = InfluxDBFactory.connect(serverURL);
+ } else {
+ conn = InfluxDBFactory.connect(serverURL, username, password);
+ }
conn.setDatabase(database);
offset = 0;
hasNext = true;
@@ -163,6 +169,16 @@ public class InfluxDbInputFormat extends
RichInputFormat<Row, InputSplit> implem
return this;
}
+ public InfluxDbInputFormatBuilder setUsername(String username) {
+ format.username = username;
+ return this;
+ }
+
+ public InfluxDbInputFormatBuilder setPassword(String password) {
+ format.password = password;
+ return this;
+ }
+
public InfluxDbInputFormatBuilder setDatabase(String database) {
format.database = database;
return this;
diff --git
a/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbSource.java
b/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbSource.java
index 018f509..c045765 100644
---
a/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbSource.java
+++
b/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbSource.java
@@ -47,6 +47,8 @@ public class InfluxDbSource implements FlinkBatchSource<Row> {
private InfluxDbInputFormat influxDbInputFormat;
private static final String SERVER_URL = "server_url";
+ private static final String USER_NAME = "username";
+ private static final String PASSWORD = "password";
private static final String DATABASE = "database";
private static final String MEASUREMENT = "measurement";
private static final String FIELDS = "fields";
@@ -91,6 +93,8 @@ public class InfluxDbSource implements FlinkBatchSource<Row> {
@Override
public void prepare(FlinkEnvironment env) {
String serverURL = config.getString(SERVER_URL);
+ String username = config.hasPath(USER_NAME) ?
config.getString(USER_NAME) : null;
+ String password = config.hasPath(PASSWORD) ?
config.getString(PASSWORD) : null;
String database = config.getString(DATABASE);
String measurement = config.getString(MEASUREMENT);
List<String> fields = config.getStringList(FIELDS);
@@ -108,6 +112,8 @@ public class InfluxDbSource implements
FlinkBatchSource<Row> {
this.influxDbInputFormat =
InfluxDbInputFormat.buildInfluxDbInputFormat()
.setServerURL(serverURL)
+ .setUsername(username)
+ .setPassword(password)
.setDatabase(database)
.setQuery(sql)
.setFields(fields)