This is an automated email from the ASF dual-hosted git repository. eskabetxe pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
commit d7d3e5d663299521276beead871d90730cb08270 Author: dave <[email protected]> AuthorDate: Thu Oct 7 09:12:23 2021 -0400 [BAHIR-284] Add option to use token authentication for influxdb2 --- .../influxdb/sink/InfluxDBSinkBuilder.java | 30 ++++++++++++++++-- .../influxdb/sink/InfluxDBSinkOptions.java | 25 +++++++++++---- .../influxdb/sink/InfluxDBSinkBuilderTest.java | 37 +++++++++++++++++----- .../influxdb/util/InfluxDBContainer.java | 1 + 4 files changed, 76 insertions(+), 17 deletions(-) diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java index 70e3bbe..6b9fb5e 100644 --- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java +++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java @@ -20,10 +20,12 @@ package org.apache.flink.streaming.connectors.influxdb.sink; import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_BUCKET; import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_ORGANIZATION; import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_PASSWORD; +import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_TOKEN; import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_URL; import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_USERNAME; import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE; import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT; +import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; import org.apache.flink.configuration.Configuration; @@ -59,6 +61,7 @@ public final class InfluxDBSinkBuilder<IN> { private String influxDBUrl; private String influxDBUsername; private String influxDBPassword; + private String influxDBToken; private String bucketName; private String organizationName; private final Configuration configuration; @@ -67,6 +70,7 @@ public final class InfluxDBSinkBuilder<IN> { this.influxDBUrl = null; this.influxDBUsername = null; this.influxDBPassword = null; + this.influxDBToken = null; this.bucketName = null; this.organizationName = null; this.influxDBSchemaSerializer = null; @@ -109,6 +113,18 @@ public final class InfluxDBSinkBuilder<IN> { return this; } + /** + * Sets the InfluxDB token. + * + * @param influxDBToken the token of the InfluxDB instance. + * @return this InfluxDBSinkBuilder. + */ + public InfluxDBSinkBuilder<IN> setInfluxDBToken(final String influxDBToken) { + this.influxDBToken = influxDBToken; + this.configuration.setString(INFLUXDB_TOKEN, checkNotNull(influxDBToken)); + return this; + } + /** * Sets the InfluxDB bucket name. * @@ -190,8 +206,18 @@ public final class InfluxDBSinkBuilder<IN> { private void sanityCheck() { // Check required settings. checkNotNull(this.influxDBUrl, "The InfluxDB URL is required but not provided."); - checkNotNull(this.influxDBUsername, "The InfluxDB username is required but not provided."); - checkNotNull(this.influxDBPassword, "The InfluxDB password is required but not provided."); + // check that either username/password or token is provided for authentication + checkArgument( + this.influxDBToken != null + || (this.influxDBUsername != null && this.influxDBPassword != null), + "Either the InfluxDB username and password or InfluxDB token are required but neither provided" + ); + // check that both username/password and token are not both provided for authentication + checkArgument( + ! (this.influxDBToken != null + && (this.influxDBUsername != null || this.influxDBPassword != null)), + "Either the InfluxDB username and password or InfluxDB token are required but both provided" + ); checkNotNull(this.bucketName, "The Bucket name is required but not provided."); checkNotNull(this.organizationName, "The Organization name is required but not provided."); checkNotNull( diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkOptions.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkOptions.java index 97ff44e..90a23f5 100644 --- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkOptions.java +++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkOptions.java @@ -60,6 +60,12 @@ public final class InfluxDBSinkOptions { .noDefaultValue() .withDescription("InfluxDB password."); + public static final ConfigOption<String> INFLUXDB_TOKEN = + ConfigOptions.key("sink.influxDB.client.token") + .stringType() + .noDefaultValue() + .withDescription("InfluxDB access token."); + public static final ConfigOption<String> INFLUXDB_BUCKET = ConfigOptions.key("sink.influxDB.client.bucket") .stringType() @@ -76,15 +82,20 @@ public final class InfluxDBSinkOptions { final String url = configuration.getString(INFLUXDB_URL); final String username = configuration.getString(INFLUXDB_USERNAME); final String password = configuration.getString(INFLUXDB_PASSWORD); + final String token = configuration.getString(INFLUXDB_TOKEN); final String bucket = configuration.getString(INFLUXDB_BUCKET); final String organization = configuration.getString(INFLUXDB_ORGANIZATION); - final InfluxDBClientOptions influxDBClientOptions = - InfluxDBClientOptions.builder() - .url(url) - .authenticate(username, password.toCharArray()) - .bucket(bucket) - .org(organization) - .build(); + InfluxDBClientOptions.Builder builder = InfluxDBClientOptions.builder(); + builder = builder + .url(url) + .bucket(bucket) + .org(organization); + if (token != null) { + builder = builder.authenticateToken(token.toCharArray()); + } else if (username != null && password != null) { + builder = builder.authenticate(username, password.toCharArray()); + } + final InfluxDBClientOptions influxDBClientOptions = builder.build(); return InfluxDBClientFactory.create(influxDBClientOptions); } } diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilderTest.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilderTest.java index 1d47bb5..f8dc31c 100644 --- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilderTest.java +++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilderTest.java @@ -42,10 +42,10 @@ class InfluxDBSinkBuilderTest { } @Test - void shouldNotBuildSinkWhenUsernameIsNotProvided() { - final NullPointerException exception = + void shouldNotBuildSinkWhenTokenNotProvidedAndUsernameIsNotProvided() { + final IllegalArgumentException exception = assertThrows( - NullPointerException.class, + IllegalArgumentException.class, () -> InfluxDBSink.builder() .setInfluxDBUrl("http://localhost:8086") @@ -54,14 +54,15 @@ class InfluxDBSinkBuilderTest { .setInfluxDBOrganization(InfluxDBContainer.organization) .setInfluxDBSchemaSerializer(new InfluxDBTestSerializer()) .build()); - assertEquals(exception.getMessage(), "The InfluxDB username is required but not provided."); + assertEquals(exception.getMessage(), + "Either the InfluxDB username and password or InfluxDB token are required but neither provided"); } @Test - void shouldNotBuildSinkWhenPasswordIsNotProvided() { - final NullPointerException exception = + void shouldNotBuildSinkWhenTokenNotProvidedAndPasswordIsNotProvided() { + final IllegalArgumentException exception = assertThrows( - NullPointerException.class, + IllegalArgumentException.class, () -> InfluxDBSink.builder() .setInfluxDBUrl("http://localhost:8086") @@ -70,7 +71,27 @@ class InfluxDBSinkBuilderTest { .setInfluxDBOrganization(InfluxDBContainer.organization) .setInfluxDBSchemaSerializer(new InfluxDBTestSerializer()) .build()); - assertEquals(exception.getMessage(), "The InfluxDB password is required but not provided."); + assertEquals(exception.getMessage(), + "Either the InfluxDB username and password or InfluxDB token are required but neither provided"); + } + + @Test + void shouldNotBuildSinkWhenTokenProvidedAndUsernamePasswordIsProvided() { + final IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> + InfluxDBSink.builder() + .setInfluxDBUrl("http://localhost:8086") + .setInfluxDBToken(InfluxDBContainer.token) + .setInfluxDBUsername(InfluxDBContainer.username) + .setInfluxDBPassword(InfluxDBContainer.password) + .setInfluxDBBucket(InfluxDBContainer.bucket) + .setInfluxDBOrganization(InfluxDBContainer.organization) + .setInfluxDBSchemaSerializer(new InfluxDBTestSerializer()) + .build()); + assertEquals(exception.getMessage(), + "Either the InfluxDB username and password or InfluxDB token are required but both provided"); } @Test diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java index 0f7347a..b8a0d69 100644 --- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java +++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java @@ -45,6 +45,7 @@ public final class InfluxDBContainer<SELF extends InfluxDBContainer<SELF>> public static final String username = "test-user"; public static final String password = "test-password"; + public static final String token = "access-token"; public static final String bucket = "test-bucket"; public static final String organization = "test-org"; private static final int retention = 0;
