This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new beb8418ba7c [improve][io] Make connectors load sensitive fields from
secrets (#21675)
beb8418ba7c is described below
commit beb8418ba7c667ab713fb480a03ebb0deb41db1d
Author: jiangpengcheng <[email protected]>
AuthorDate: Mon Dec 11 09:13:43 2023 +0800
[improve][io] Make connectors load sensitive fields from secrets (#21675)
---
pulsar-io/canal/pom.xml | 5 ++
.../pulsar/io/canal/CanalAbstractSource.java | 2 +-
.../apache/pulsar/io/canal/CanalSourceConfig.java | 7 +--
.../org/apache/pulsar/io/common/IOConfigUtils.java | 7 +--
.../apache/pulsar/io/common/IOConfigUtilsTest.java | 11 ++++
pulsar-io/dynamodb/pom.xml | 6 +++
.../apache/pulsar/io/dynamodb/DynamoDBSource.java | 2 +-
.../pulsar/io/dynamodb/DynamoDBSourceConfig.java | 8 +--
.../io/dynamodb/DynamoDBSourceConfigTests.java | 52 +++++++++++++++++--
pulsar-io/influxdb/pom.xml | 5 ++
.../io/influxdb/InfluxDBGenericRecordSink.java | 4 +-
.../io/influxdb/v1/InfluxDBAbstractSink.java | 2 +-
.../pulsar/io/influxdb/v1/InfluxDBSinkConfig.java | 11 ++--
.../apache/pulsar/io/influxdb/v2/InfluxDBSink.java | 2 +-
.../pulsar/io/influxdb/v2/InfluxDBSinkConfig.java | 14 ++---
.../io/influxdb/v1/InfluxDBSinkConfigTest.java | 56 +++++++++++++++++---
.../io/influxdb/v2/InfluxDBSinkConfigTest.java | 29 +++++++++--
pulsar-io/jdbc/core/pom.xml | 6 +++
.../apache/pulsar/io/jdbc/JdbcAbstractSink.java | 2 +-
.../org/apache/pulsar/io/jdbc/JdbcSinkConfig.java | 7 +--
pulsar-io/kafka/pom.xml | 5 ++
.../apache/pulsar/io/kafka/KafkaAbstractSink.java | 6 +--
.../pulsar/io/kafka/KafkaAbstractSource.java | 2 +-
.../apache/pulsar/io/kafka/KafkaSinkConfig.java | 11 ++--
.../apache/pulsar/io/kafka/KafkaSourceConfig.java | 9 +++-
.../io/kafka/sink/KafkaAbstractSinkTest.java | 8 +--
.../io/kafka/source/KafkaAbstractSourceTest.java | 26 ++++++++--
pulsar-io/mongo/pom.xml | 5 ++
.../pulsar/io/mongodb/MongoAbstractConfig.java | 3 +-
.../org/apache/pulsar/io/mongodb/MongoSink.java | 2 +-
.../apache/pulsar/io/mongodb/MongoSinkConfig.java | 9 ++--
.../org/apache/pulsar/io/mongodb/MongoSource.java | 2 +-
.../pulsar/io/mongodb/MongoSourceConfig.java | 10 ++--
.../pulsar/io/mongodb/MongoSinkConfigTest.java | 41 ++++++++++++---
.../pulsar/io/mongodb/MongoSourceConfigTest.java | 38 +++++++++++---
pulsar-io/rabbitmq/pom.xml | 5 ++
.../apache/pulsar/io/rabbitmq/RabbitMQSink.java | 2 +-
.../pulsar/io/rabbitmq/RabbitMQSinkConfig.java | 9 ++--
.../apache/pulsar/io/rabbitmq/RabbitMQSource.java | 2 +-
.../pulsar/io/rabbitmq/RabbitMQSourceConfig.java | 7 +--
.../io/rabbitmq/sink/RabbitMQSinkConfigTest.java | 52 +++++++++++++++++--
.../rabbitmq/source/RabbitMQSourceConfigTest.java | 60 +++++++++++++++++++---
pulsar-io/redis/pom.xml | 5 ++
.../pulsar/io/redis/RedisAbstractConfig.java | 5 +-
.../org/apache/pulsar/io/redis/sink/RedisSink.java | 2 +-
.../pulsar/io/redis/sink/RedisSinkConfig.java | 11 ++--
.../pulsar/io/redis/sink/RedisSinkConfigTest.java | 47 ++++++++++++++---
.../apache/pulsar/io/redis/sink/RedisSinkTest.java | 5 +-
pulsar-io/solr/pom.xml | 5 ++
.../apache/pulsar/io/solr/SolrAbstractSink.java | 2 +-
.../org/apache/pulsar/io/solr/SolrSinkConfig.java | 7 +--
.../apache/pulsar/io/solr/SolrSinkConfigTest.java | 47 ++++++++++++++---
52 files changed, 540 insertions(+), 148 deletions(-)
diff --git a/pulsar-io/canal/pom.xml b/pulsar-io/canal/pom.xml
index 7d4887e6db0..b15c23dd591 100644
--- a/pulsar-io/canal/pom.xml
+++ b/pulsar-io/canal/pom.xml
@@ -37,6 +37,11 @@
</properties>
<dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-core</artifactId>
diff --git
a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java
b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java
index 06c8788d5ae..7d0cd0305a4 100644
---
a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java
+++
b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java
@@ -57,7 +57,7 @@ public abstract class CanalAbstractSource<V> extends
PushSource<V> {
@Override
public void open(Map<String, Object> config, SourceContext sourceContext)
throws Exception {
- canalSourceConfig = CanalSourceConfig.load(config);
+ canalSourceConfig = CanalSourceConfig.load(config, sourceContext);
if (canalSourceConfig.getCluster()) {
connector =
CanalConnectors.newClusterConnector(canalSourceConfig.getZkServers(),
canalSourceConfig.getDestination(),
canalSourceConfig.getUsername(),
diff --git
a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java
b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java
index a0408e60e5f..5a754988ffd 100644
---
a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java
+++
b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java
@@ -26,6 +26,8 @@ import java.io.Serializable;
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
@@ -86,8 +88,7 @@ public class CanalSourceConfig implements Serializable{
}
- public static CanalSourceConfig load(Map<String, Object> map) throws
IOException {
- ObjectMapper mapper = new ObjectMapper();
- return mapper.readValue(mapper.writeValueAsString(map),
CanalSourceConfig.class);
+ public static CanalSourceConfig load(Map<String, Object> map,
SourceContext sourceContext) throws IOException {
+ return IOConfigUtils.loadWithSecrets(map, CanalSourceConfig.class,
sourceContext);
}
}
diff --git
a/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java
b/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java
index d15986a897c..69d981bf687 100644
---
a/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java
+++
b/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java
@@ -77,13 +77,14 @@ public class IOConfigUtils {
}
}
configs.computeIfAbsent(field.getName(), key -> {
- if (fieldDoc.required()) {
- throw new IllegalArgumentException(field.getName()
+ " cannot be null");
- }
+ // Use default value if it is not null before checking
required
String value = fieldDoc.defaultValue();
if (value != null && !value.isEmpty()) {
return value;
}
+ if (fieldDoc.required()) {
+ throw new IllegalArgumentException(field.getName()
+ " cannot be null");
+ }
return null;
});
}
diff --git
a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
index 52afac1a5ac..fdcd2ea7fe8 100644
---
a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
+++
b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
@@ -54,6 +54,14 @@ public class IOConfigUtilsTest {
)
protected String testRequired;
+ @FieldDoc(
+ required = true,
+ defaultValue = "defaultRequired",
+ sensitive = true,
+ help = "testRequired"
+ )
+ protected String testDefaultRequired;
+
@FieldDoc(
required = false,
defaultValue = "defaultStr",
@@ -299,6 +307,9 @@ public class IOConfigUtilsTest {
configMap.put("testRequired", "test");
TestDefaultConfig testDefaultConfig =
IOConfigUtils.loadWithSecrets(configMap,
TestDefaultConfig.class, new TestSinkContext());
+ // if there is default value for a required field and no value
provided when load config,
+ // it should not throw exception but use the default value.
+ Assert.assertEquals(testDefaultConfig.getTestDefaultRequired(),
"defaultRequired");
Assert.assertEquals(testDefaultConfig.getDefaultStr(), "defaultStr");
Assert.assertEquals(testDefaultConfig.isDefaultBool(), true);
Assert.assertEquals(testDefaultConfig.getDefaultInt(), 100);
diff --git a/pulsar-io/dynamodb/pom.xml b/pulsar-io/dynamodb/pom.xml
index 7e5fad8854f..1e5423e94cc 100644
--- a/pulsar-io/dynamodb/pom.xml
+++ b/pulsar-io/dynamodb/pom.xml
@@ -32,6 +32,12 @@
<dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-core</artifactId>
diff --git
a/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSource.java
b/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSource.java
index d67c4e21154..2193cf39c17 100644
---
a/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSource.java
+++
b/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSource.java
@@ -65,7 +65,7 @@ public class DynamoDBSource extends AbstractAwsConnector
implements Source<byte[
@Override
public void open(Map<String, Object> config, SourceContext sourceContext)
throws Exception {
- this.dynamodbSourceConfig = DynamoDBSourceConfig.load(config);
+ this.dynamodbSourceConfig = DynamoDBSourceConfig.load(config,
sourceContext);
checkArgument(isNotBlank(dynamodbSourceConfig.getAwsDynamodbStreamArn()),
"empty dynamo-stream arn");
// Even if the endpoint is set, it seems to require a region to go
with it
diff --git
a/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfig.java
b/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfig.java
index b734dd57411..0547ff8f863 100644
---
a/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfig.java
+++
b/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfig.java
@@ -35,6 +35,8 @@ import java.util.Date;
import java.util.Map;
import lombok.Data;
import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
import software.amazon.awssdk.regions.Region;
@@ -77,6 +79,7 @@ public class DynamoDBSourceConfig implements Serializable {
@FieldDoc(
required = false,
defaultValue = "",
+ sensitive = true,
help = "json-parameters to initialize
`AwsCredentialsProviderPlugin`")
private String awsCredentialPluginParam = "";
@@ -170,9 +173,8 @@ public class DynamoDBSourceConfig implements Serializable {
return mapper.readValue(new File(yamlFile),
DynamoDBSourceConfig.class);
}
- public static DynamoDBSourceConfig load(Map<String, Object> map) throws
IOException {
- ObjectMapper mapper = new ObjectMapper();
- return mapper.readValue(mapper.writeValueAsString(map),
DynamoDBSourceConfig.class);
+ public static DynamoDBSourceConfig load(Map<String, Object> map,
SourceContext sourceContext) throws IOException {
+ return IOConfigUtils.loadWithSecrets(map, DynamoDBSourceConfig.class,
sourceContext);
}
protected Region regionAsV2Region() {
diff --git
a/pulsar-io/dynamodb/src/test/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfigTests.java
b/pulsar-io/dynamodb/src/test/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfigTests.java
index f84cb785896..bdccaa2e584 100644
---
a/pulsar-io/dynamodb/src/test/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfigTests.java
+++
b/pulsar-io/dynamodb/src/test/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfigTests.java
@@ -31,6 +31,8 @@ import java.util.HashMap;
import java.util.Map;
import
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import org.apache.pulsar.io.core.SourceContext;
+import org.mockito.Mockito;
import org.testng.annotations.Test;
@@ -90,7 +92,8 @@ public class DynamoDBSourceConfigTests {
map.put("initialPositionInStream",
InitialPositionInStream.TRIM_HORIZON);
map.put("startAtTime", DAY);
- DynamoDBSourceConfig config = DynamoDBSourceConfig.load(map);
+ SourceContext sourceContext = Mockito.mock(SourceContext.class);
+ DynamoDBSourceConfig config = DynamoDBSourceConfig.load(map,
sourceContext);
assertNotNull(config);
assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws");
@@ -111,7 +114,46 @@ public class DynamoDBSourceConfigTests {
ZonedDateTime expected = ZonedDateTime.ofInstant(DAY.toInstant(),
ZoneOffset.UTC);
assertEquals(actual, expected);
}
-
+
+ @Test
+ public final void loadFromMapCredentialFromSecretTest() throws IOException
{
+ Map<String, Object> map = new HashMap<String, Object> ();
+ map.put("awsEndpoint", "https://some.endpoint.aws");
+ map.put("awsRegion", "us-east-1");
+ map.put("awsDynamodbStreamArn",
"arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291");
+ map.put("checkpointInterval", "30000");
+ map.put("backoffTime", "4000");
+ map.put("numRetries", "3");
+ map.put("receiveQueueSize", 2000);
+ map.put("applicationName", "My test application");
+ map.put("initialPositionInStream",
InitialPositionInStream.TRIM_HORIZON);
+ map.put("startAtTime", DAY);
+
+ SourceContext sourceContext = Mockito.mock(SourceContext.class);
+ Mockito.when(sourceContext.getSecret("awsCredentialPluginParam"))
+
.thenReturn("{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
+ DynamoDBSourceConfig config = DynamoDBSourceConfig.load(map,
sourceContext);
+
+ assertNotNull(config);
+ assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws");
+ assertEquals(config.getAwsRegion(), "us-east-1");
+ assertEquals(config.getAwsDynamodbStreamArn(),
"arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291");
+ assertEquals(config.getAwsCredentialPluginParam(),
+ "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
+ assertEquals(config.getApplicationName(), "My test application");
+ assertEquals(config.getCheckpointInterval(), 30000);
+ assertEquals(config.getBackoffTime(), 4000);
+ assertEquals(config.getNumRetries(), 3);
+ assertEquals(config.getReceiveQueueSize(), 2000);
+ assertEquals(config.getInitialPositionInStream(),
InitialPositionInStream.TRIM_HORIZON);
+
+ Calendar cal = Calendar.getInstance();
+ cal.setTime(config.getStartAtTime());
+ ZonedDateTime actual = ZonedDateTime.ofInstant(cal.toInstant(),
ZoneOffset.UTC);
+ ZonedDateTime expected = ZonedDateTime.ofInstant(DAY.toInstant(),
ZoneOffset.UTC);
+ assertEquals(actual, expected);
+ }
+
@Test(expectedExceptions = IllegalArgumentException.class,
expectedExceptionsMessageRegExp = "empty aws-credential param")
public final void missingCredentialsTest() throws Exception {
@@ -121,7 +163,8 @@ public class DynamoDBSourceConfigTests {
map.put("awsDynamodbStreamArn",
"arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291");
DynamoDBSource source = new DynamoDBSource();
- source.open(map, null);
+ SourceContext sourceContext = Mockito.mock(SourceContext.class);
+ source.open(map, sourceContext);
}
@Test(expectedExceptions = IllegalArgumentException.class,
@@ -136,7 +179,8 @@ public class DynamoDBSourceConfigTests {
map.put("initialPositionInStream",
InitialPositionInStream.AT_TIMESTAMP);
DynamoDBSource source = new DynamoDBSource();
- source.open(map, null);
+ SourceContext sourceContext = Mockito.mock(SourceContext.class);
+ source.open(map, sourceContext);
}
private File getFile(String name) {
diff --git a/pulsar-io/influxdb/pom.xml b/pulsar-io/influxdb/pom.xml
index 9c58cd40093..ec3d1c8d2f7 100644
--- a/pulsar-io/influxdb/pom.xml
+++ b/pulsar-io/influxdb/pom.xml
@@ -32,6 +32,11 @@
<name>Pulsar IO :: InfluxDB</name>
<dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-core</artifactId>
diff --git
a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSink.java
b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSink.java
index 5b51461fc7b..0d431f84c52 100644
---
a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSink.java
+++
b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSink.java
@@ -46,12 +46,12 @@ public class InfluxDBGenericRecordSink implements
Sink<GenericRecord> {
@Override
public void open(Map<String, Object> map, SinkContext sinkContext) throws
Exception {
try {
- val configV2 = InfluxDBSinkConfig.load(map);
+ val configV2 = InfluxDBSinkConfig.load(map, sinkContext);
configV2.validate();
sink = new InfluxDBSink();
} catch (Exception e) {
try {
- val configV1 =
org.apache.pulsar.io.influxdb.v1.InfluxDBSinkConfig.load(map);
+ val configV1 =
org.apache.pulsar.io.influxdb.v1.InfluxDBSinkConfig.load(map, sinkContext);
configV1.validate();
sink = new
org.apache.pulsar.io.influxdb.v1.InfluxDBGenericRecordSink();
} catch (Exception e1) {
diff --git
a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBAbstractSink.java
b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBAbstractSink.java
index 06856bad80e..217c5304b24 100644
---
a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBAbstractSink.java
+++
b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBAbstractSink.java
@@ -43,7 +43,7 @@ public abstract class InfluxDBAbstractSink<T> extends
BatchSink<Point, T> {
@Override
public void open(Map<String, Object> config, SinkContext sinkContext)
throws Exception {
- InfluxDBSinkConfig influxDBSinkConfig =
InfluxDBSinkConfig.load(config);
+ InfluxDBSinkConfig influxDBSinkConfig =
InfluxDBSinkConfig.load(config, sinkContext);
influxDBSinkConfig.validate();
super.init(influxDBSinkConfig.getBatchTimeMs(),
influxDBSinkConfig.getBatchSize());
diff --git
a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfig.java
b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfig.java
index 9b7d8e1ce90..4ae2cf1e4a3 100644
---
a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfig.java
+++
b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfig.java
@@ -27,6 +27,8 @@ import java.io.Serializable;
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
/**
@@ -94,7 +96,7 @@ public class InfluxDBSinkConfig implements Serializable {
@FieldDoc(
required = false,
- defaultValue = "1000L",
+ defaultValue = "1000",
help = "The InfluxDB operation time in milliseconds")
private long batchTimeMs = 1000L;
@@ -110,14 +112,11 @@ public class InfluxDBSinkConfig implements Serializable {
return mapper.readValue(new File(yamlFile), InfluxDBSinkConfig.class);
}
- public static InfluxDBSinkConfig load(Map<String, Object> map) throws
IOException {
- ObjectMapper mapper = new ObjectMapper();
- return mapper.readValue(mapper.writeValueAsString(map),
InfluxDBSinkConfig.class);
+ public static InfluxDBSinkConfig load(Map<String, Object> map, SinkContext
sinkContext) throws IOException {
+ return IOConfigUtils.loadWithSecrets(map, InfluxDBSinkConfig.class,
sinkContext);
}
public void validate() {
- Preconditions.checkNotNull(influxdbUrl, "influxdbUrl property not
set.");
- Preconditions.checkNotNull(database, "database property not set.");
Preconditions.checkArgument(batchSize > 0, "batchSize must be a
positive integer.");
Preconditions.checkArgument(batchTimeMs > 0, "batchTimeMs must be a
positive long.");
}
diff --git
a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSink.java
b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSink.java
index 08f1ab23399..0aa43570596 100644
---
a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSink.java
+++
b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSink.java
@@ -49,7 +49,7 @@ public class InfluxDBSink extends BatchSink<Point,
GenericRecord> {
@Override
public void open(Map<String, Object> config, SinkContext sinkContext)
throws Exception {
- InfluxDBSinkConfig influxDBSinkConfig =
InfluxDBSinkConfig.load(config);
+ InfluxDBSinkConfig influxDBSinkConfig =
InfluxDBSinkConfig.load(config, sinkContext);
influxDBSinkConfig.validate();
super.init(influxDBSinkConfig.getBatchTimeMs(),
influxDBSinkConfig.getBatchSize());
diff --git
a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfig.java
b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfig.java
index 899b00c0021..ea87ee66b90 100644
---
a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfig.java
+++
b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfig.java
@@ -27,6 +27,8 @@ import java.io.Serializable;
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
/**
@@ -87,7 +89,7 @@ public class InfluxDBSinkConfig implements Serializable {
@FieldDoc(
required = false,
- defaultValue = "1000L",
+ defaultValue = "1000",
help = "The InfluxDB operation time in milliseconds")
private long batchTimeMs = 1000;
@@ -103,17 +105,11 @@ public class InfluxDBSinkConfig implements Serializable {
return mapper.readValue(new File(yamlFile), InfluxDBSinkConfig.class);
}
- public static InfluxDBSinkConfig load(Map<String, Object> map) throws
IOException {
- ObjectMapper mapper = new ObjectMapper();
- return mapper.readValue(mapper.writeValueAsString(map),
InfluxDBSinkConfig.class);
+ public static InfluxDBSinkConfig load(Map<String, Object> map, SinkContext
sinkContext) throws IOException {
+ return IOConfigUtils.loadWithSecrets(map, InfluxDBSinkConfig.class,
sinkContext);
}
public void validate() {
- Preconditions.checkNotNull(influxdbUrl, "influxdbUrl property not
set.");
- Preconditions.checkNotNull(token, "token property not set.");
- Preconditions.checkNotNull(organization, "organization property not
set.");
- Preconditions.checkNotNull(bucket, "bucket property not set.");
-
Preconditions.checkArgument(batchSize > 0, "batchSize must be a
positive integer.");
Preconditions.checkArgument(batchTimeMs > 0, "batchTimeMs must be a
positive long.");
}
diff --git
a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfigTest.java
b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfigTest.java
index 4493dcfb248..10b1bfb624f 100644
---
a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfigTest.java
+++
b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfigTest.java
@@ -18,7 +18,9 @@
*/
package org.apache.pulsar.io.influxdb.v1;
+import org.apache.pulsar.io.core.SinkContext;
import org.influxdb.InfluxDB;
+import org.mockito.Mockito;
import org.testng.annotations.Test;
import java.io.File;
@@ -60,8 +62,11 @@ public class InfluxDBSinkConfigTest {
map.put("gzipEnable", "false");
map.put("batchTimeMs", "1000");
map.put("batchSize", "100");
+ map.put("username", "admin");
+ map.put("password", "admin");
- InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext);
assertNotNull(config);
assertEquals("http://localhost:8086", config.getInfluxdbUrl());
assertEquals("test_db", config.getDatabase());
@@ -71,6 +76,39 @@ public class InfluxDBSinkConfigTest {
assertEquals(Boolean.parseBoolean("false"), config.isGzipEnable());
assertEquals(Long.parseLong("1000"), config.getBatchTimeMs());
assertEquals(Integer.parseInt("100"), config.getBatchSize());
+ assertEquals("admin", config.getUsername());
+ assertEquals("admin", config.getPassword());
+ }
+
+ @Test
+ public final void loadFromMapCredentialFromSecretTest() throws IOException
{
+ Map<String, Object> map = new HashMap<>();
+ map.put("influxdbUrl", "http://localhost:8086");
+ map.put("database", "test_db");
+ map.put("consistencyLevel", "ONE");
+ map.put("logLevel", "NONE");
+ map.put("retentionPolicy", "autogen");
+ map.put("gzipEnable", "false");
+ map.put("batchTimeMs", "1000");
+ map.put("batchSize", "100");
+
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ Mockito.when(sinkContext.getSecret("username"))
+ .thenReturn("admin");
+ Mockito.when(sinkContext.getSecret("password"))
+ .thenReturn("admin");
+ InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext);
+ assertNotNull(config);
+ assertEquals("http://localhost:8086", config.getInfluxdbUrl());
+ assertEquals("test_db", config.getDatabase());
+ assertEquals("ONE", config.getConsistencyLevel());
+ assertEquals("NONE", config.getLogLevel());
+ assertEquals("autogen", config.getRetentionPolicy());
+ assertEquals(Boolean.parseBoolean("false"), config.isGzipEnable());
+ assertEquals(Long.parseLong("1000"), config.getBatchTimeMs());
+ assertEquals(Integer.parseInt("100"), config.getBatchSize());
+ assertEquals("admin", config.getUsername());
+ assertEquals("admin", config.getPassword());
}
@Test
@@ -85,12 +123,13 @@ public class InfluxDBSinkConfigTest {
map.put("batchTimeMs", "1000");
map.put("batchSize", "100");
- InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext);
config.validate();
}
- @Test(expectedExceptions = NullPointerException.class,
- expectedExceptionsMessageRegExp = "influxdbUrl property not set.")
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "influxdbUrl cannot be null")
public final void missingInfluxdbUrlValidateTest() throws IOException {
Map<String, Object> map = new HashMap<>();
map.put("database", "test_db");
@@ -101,7 +140,8 @@ public class InfluxDBSinkConfigTest {
map.put("batchTimeMs", "1000");
map.put("batchSize", "100");
- InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext);
config.validate();
}
@@ -118,7 +158,8 @@ public class InfluxDBSinkConfigTest {
map.put("batchTimeMs", "1000");
map.put("batchSize", "-100");
- InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext);
config.validate();
}
@@ -135,7 +176,8 @@ public class InfluxDBSinkConfigTest {
map.put("batchTimeMs", "1000");
map.put("batchSize", "100");
- InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext);
config.validate();
InfluxDB.ConsistencyLevel.valueOf(config.getConsistencyLevel().toUpperCase());
diff --git
a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfigTest.java
b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfigTest.java
index df1f7fd29a6..d6cee1e308d 100644
---
a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfigTest.java
+++
b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfigTest.java
@@ -24,6 +24,8 @@ import static org.testng.Assert.assertNotNull;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
+import org.apache.pulsar.io.core.SinkContext;
+import org.mockito.Mockito;
import org.testng.annotations.Test;
public class InfluxDBSinkConfigTest {
@@ -58,18 +60,34 @@ public class InfluxDBSinkConfigTest {
public final void testLoadFromMap() throws Exception {
Map<String, Object> map = buildValidConfigMap();
- InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext);
assertNotNull(config);
config.validate();
verifyValues(config);
}
- @Test(expectedExceptions = NullPointerException.class,
- expectedExceptionsMessageRegExp = "influxdbUrl property not set.")
+ @Test
+ public final void testLoadFromMapCredentialFromSecret() throws Exception {
+ Map<String, Object> map = buildValidConfigMap();
+ map.remove("token");
+
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ Mockito.when(sinkContext.getSecret("token"))
+ .thenReturn("xxxx");
+ InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext);
+ assertNotNull(config);
+ config.validate();
+ verifyValues(config);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "influxdbUrl cannot be null")
public void testRequiredConfigMissing() throws Exception {
Map<String, Object> map = buildValidConfigMap();
map.remove("influxdbUrl");
- InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext);
config.validate();
}
@@ -78,7 +96,8 @@ public class InfluxDBSinkConfigTest {
public void testBatchConfig() throws Exception {
Map<String, Object> map = buildValidConfigMap();
map.put("batchSize", -1);
- InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext);
config.validate();
}
diff --git a/pulsar-io/jdbc/core/pom.xml b/pulsar-io/jdbc/core/pom.xml
index 324aa174aca..7870eaa6454 100644
--- a/pulsar-io/jdbc/core/pom.xml
+++ b/pulsar-io/jdbc/core/pom.xml
@@ -32,6 +32,12 @@
<name>Pulsar IO :: Jdbc :: Core</name>
<dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-core</artifactId>
diff --git
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
index 4586fcebcf1..ca33b3cfdab 100644
---
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
+++
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -76,7 +76,7 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
@Override
public void open(Map<String, Object> config, SinkContext sinkContext)
throws Exception {
- jdbcSinkConfig = JdbcSinkConfig.load(config);
+ jdbcSinkConfig = JdbcSinkConfig.load(config, sinkContext);
jdbcSinkConfig.validate();
jdbcUrl = jdbcSinkConfig.getJdbcUrl();
diff --git
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
index f798d94f7c3..854d6838131 100644
---
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
+++
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
@@ -26,6 +26,8 @@ import java.io.Serializable;
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
@Data
@@ -145,9 +147,8 @@ public class JdbcSinkConfig implements Serializable {
return mapper.readValue(new File(yamlFile), JdbcSinkConfig.class);
}
- public static JdbcSinkConfig load(Map<String, Object> map) throws
IOException {
- ObjectMapper mapper = new ObjectMapper();
- return mapper.readValue(mapper.writeValueAsString(map),
JdbcSinkConfig.class);
+ public static JdbcSinkConfig load(Map<String, Object> map, SinkContext
sinkContext) throws IOException {
+ return IOConfigUtils.loadWithSecrets(map, JdbcSinkConfig.class,
sinkContext);
}
public void validate() {
diff --git a/pulsar-io/kafka/pom.xml b/pulsar-io/kafka/pom.xml
index 190e90209db..adbbce1348e 100644
--- a/pulsar-io/kafka/pom.xml
+++ b/pulsar-io/kafka/pom.xml
@@ -46,6 +46,11 @@
</dependencyManagement>
<dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
diff --git
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
index 8fbd6c81861..b1710d2c0b7 100644
---
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
+++
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.io.kafka;
import java.io.IOException;
import java.util.Map;
-import java.util.Objects;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -79,10 +78,7 @@ public abstract class KafkaAbstractSink<K, V> implements
Sink<byte[]> {
@Override
public void open(Map<String, Object> config, SinkContext sinkContext)
throws Exception {
- kafkaSinkConfig = KafkaSinkConfig.load(config);
- Objects.requireNonNull(kafkaSinkConfig.getTopic(), "Kafka topic is not
set");
- Objects.requireNonNull(kafkaSinkConfig.getBootstrapServers(), "Kafka
bootstrapServers is not set");
- Objects.requireNonNull(kafkaSinkConfig.getAcks(), "Kafka acks mode is
not set");
+ kafkaSinkConfig = KafkaSinkConfig.load(config, sinkContext);
if (kafkaSinkConfig.getBatchSize() <= 0) {
throw new IllegalArgumentException("Invalid Kafka Producer
batchSize : "
+ kafkaSinkConfig.getBatchSize());
diff --git
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index ceee8e0b04c..cd0db92bd78 100644
---
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -65,7 +65,7 @@ public abstract class KafkaAbstractSource<V> extends
KafkaPushSource<V> {
@Override
public void open(Map<String, Object> config, SourceContext sourceContext)
throws Exception {
- kafkaSourceConfig = KafkaSourceConfig.load(config);
+ kafkaSourceConfig = KafkaSourceConfig.load(config, sourceContext);
Objects.requireNonNull(kafkaSourceConfig.getTopic(), "Kafka topic is
not set");
Objects.requireNonNull(kafkaSourceConfig.getBootstrapServers(), "Kafka
bootstrapServers is not set");
Objects.requireNonNull(kafkaSourceConfig.getGroupId(), "Kafka consumer
group id is not set");
diff --git
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
index dbbf1a7b5e2..4ea5fc897d6 100644
---
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
+++
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
@@ -26,6 +26,8 @@ import java.io.Serializable;
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
@Data
@@ -91,12 +93,12 @@ public class KafkaSinkConfig implements Serializable {
+ " before considering a request complete. This controls
the durability of records that are sent.")
private String acks;
@FieldDoc(
- defaultValue = "16384L",
+ defaultValue = "16384",
help = "The batch size that Kafka producer will attempt to batch
records together"
+ " before sending them to brokers.")
private long batchSize = 16384L;
@FieldDoc(
- defaultValue = "1048576L",
+ defaultValue = "1048576",
help =
"The maximum size of a Kafka request in bytes.")
private long maxRequestSize = 1048576L;
@@ -129,8 +131,7 @@ public class KafkaSinkConfig implements Serializable {
return mapper.readValue(new File(yamlFile), KafkaSinkConfig.class);
}
- public static KafkaSinkConfig load(Map<String, Object> map) throws
IOException {
- ObjectMapper mapper = new ObjectMapper();
- return mapper.readValue(mapper.writeValueAsString(map),
KafkaSinkConfig.class);
+ public static KafkaSinkConfig load(Map<String, Object> map, SinkContext
sinkContext) throws IOException {
+ return IOConfigUtils.loadWithSecrets(map, KafkaSinkConfig.class,
sinkContext);
}
}
\ No newline at end of file
diff --git
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
index ad2e121d26a..422659cdda4 100644
---
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
+++
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
@@ -27,6 +27,7 @@ import java.io.Serializable;
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
@Data
@@ -158,8 +159,14 @@ public class KafkaSourceConfig implements Serializable {
return mapper.readValue(new File(yamlFile), KafkaSourceConfig.class);
}
- public static KafkaSourceConfig load(Map<String, Object> map) throws
IOException {
+ public static KafkaSourceConfig load(Map<String, Object> map,
SourceContext sourceContext) throws IOException {
ObjectMapper mapper = new ObjectMapper();
+ // since the KafkaSourceConfig requires the
ACCEPT_EMPTY_STRING_AS_NULL_OBJECT feature
+ // We manually set the sensitive fields here instead of calling
`IOConfigUtils.loadWithSecrets`
+ String sslTruststorePassword =
sourceContext.getSecret("sslTruststorePassword");
+ if (sslTruststorePassword != null) {
+ map.put("sslTruststorePassword", sslTruststorePassword);
+ }
mapper.enable(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT);
return mapper.readValue(mapper.writeValueAsString(map),
KafkaSourceConfig.class);
}
diff --git
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
index ec9ee4a957d..c428f38fe08 100644
---
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
+++
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
@@ -188,12 +188,12 @@ public class KafkaAbstractSinkTest {
sink.close();
}
};
- expectThrows(NullPointerException.class, "Kafka topic is not set",
openAndClose);
- config.put("topic", "topic_2");
- expectThrows(NullPointerException.class, "Kafka bootstrapServers is
not set", openAndClose);
+ expectThrows(IllegalArgumentException.class, "bootstrapServers cannot
be null", openAndClose);
config.put("bootstrapServers", "localhost:6667");
- expectThrows(NullPointerException.class, "Kafka acks mode is not set",
openAndClose);
+ expectThrows(IllegalArgumentException.class, "acks cannot be null",
openAndClose);
config.put("acks", "1");
+ expectThrows(IllegalArgumentException.class, "topic cannot be null",
openAndClose);
+ config.put("topic", "topic_2");
config.put("batchSize", "-1");
expectThrows(IllegalArgumentException.class, "Invalid Kafka Producer
batchSize : -1", openAndClose);
config.put("batchSize", "16384");
diff --git
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index 6911ec2a6bf..0b8ba4b40ee 100644
---
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -111,19 +111,39 @@ public class KafkaAbstractSourceTest {
public void loadConsumerConfigPropertiesFromMapTest() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("consumerConfigProperties", "");
- KafkaSourceConfig kafkaSourceConfig = KafkaSourceConfig.load(config);
+ config.put("bootstrapServers", "localhost:8080");
+ config.put("groupId", "test-group");
+ config.put("topic", "test-topic");
+ SourceContext sourceContext = Mockito.mock(SourceContext.class);
+ KafkaSourceConfig kafkaSourceConfig = KafkaSourceConfig.load(config,
sourceContext);
assertNotNull(kafkaSourceConfig);
assertNull(kafkaSourceConfig.getConsumerConfigProperties());
config.put("consumerConfigProperties", null);
- kafkaSourceConfig = KafkaSourceConfig.load(config);
+ kafkaSourceConfig = KafkaSourceConfig.load(config, sourceContext);
assertNull(kafkaSourceConfig.getConsumerConfigProperties());
config.put("consumerConfigProperties", ImmutableMap.of("foo", "bar"));
- kafkaSourceConfig = KafkaSourceConfig.load(config);
+ kafkaSourceConfig = KafkaSourceConfig.load(config, sourceContext);
assertEquals(kafkaSourceConfig.getConsumerConfigProperties(),
ImmutableMap.of("foo", "bar"));
}
+ @Test
+ public void loadSensitiveFieldsFromSecretTest() throws Exception {
+ Map<String, Object> config = new HashMap<>();
+ config.put("consumerConfigProperties", "");
+ config.put("bootstrapServers", "localhost:8080");
+ config.put("groupId", "test-group");
+ config.put("topic", "test-topic");
+ SourceContext sourceContext = Mockito.mock(SourceContext.class);
+ Mockito.when(sourceContext.getSecret("sslTruststorePassword"))
+ .thenReturn("xxxx");
+ KafkaSourceConfig kafkaSourceConfig = KafkaSourceConfig.load(config,
sourceContext);
+ assertNotNull(kafkaSourceConfig);
+ assertNull(kafkaSourceConfig.getConsumerConfigProperties());
+ assertEquals("xxxx", kafkaSourceConfig.getSslTruststorePassword());
+ }
+
@Test
public final void loadFromYamlFileTest() throws IOException {
File yamlFile = getFile("kafkaSourceConfig.yaml");
diff --git a/pulsar-io/mongo/pom.xml b/pulsar-io/mongo/pom.xml
index 9dc72861a75..25c6c59d7a1 100644
--- a/pulsar-io/mongo/pom.xml
+++ b/pulsar-io/mongo/pom.xml
@@ -37,6 +37,11 @@
</properties>
<dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>${project.parent.groupId}</groupId>
<artifactId>pulsar-io-core</artifactId>
diff --git
a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoAbstractConfig.java
b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoAbstractConfig.java
index 35c327ed82b..74f077da620 100644
---
a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoAbstractConfig.java
+++
b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoAbstractConfig.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.Serializable;
import lombok.Data;
import lombok.experimental.Accessors;
-import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.io.core.annotations.FieldDoc;
/**
@@ -42,6 +41,7 @@ public abstract class MongoAbstractConfig implements
Serializable {
@FieldDoc(
required = true,
+ sensitive = true, // it may contain password
defaultValue = "",
help = "The URI of MongoDB that the connector connects to "
+ "(see:
https://docs.mongodb.com/manual/reference/connection-string/)"
@@ -95,7 +95,6 @@ public abstract class MongoAbstractConfig implements
Serializable {
}
public void validate() {
- checkArgument(!StringUtils.isEmpty(getMongoUri()), "Required MongoDB
URI is not set.");
checkArgument(getBatchSize() > 0, "batchSize must be a positive
integer.");
checkArgument(getBatchTimeMs() > 0, "batchTimeMs must be a positive
long.");
}
diff --git
a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
index 2206d232eaf..61d5aeb697e 100644
--- a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
+++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
@@ -86,7 +86,7 @@ public class MongoSink implements Sink<byte[]> {
public void open(Map<String, Object> config, SinkContext sinkContext)
throws Exception {
log.info("Open MongoDB Sink");
- mongoSinkConfig = MongoSinkConfig.load(config);
+ mongoSinkConfig = MongoSinkConfig.load(config, sinkContext);
mongoSinkConfig.validate();
if (clientProvider != null) {
diff --git
a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSinkConfig.java
b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSinkConfig.java
index 285f3c97bef..9431fe49108 100644
---
a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSinkConfig.java
+++
b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSinkConfig.java
@@ -30,6 +30,8 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
/**
* Configuration class for the MongoDB Sink Connectors.
@@ -59,11 +61,8 @@ public class MongoSinkConfig extends MongoAbstractConfig {
return cfg;
}
- public static MongoSinkConfig load(Map<String, Object> map) throws
IOException {
- final ObjectMapper mapper = new ObjectMapper();
- final MongoSinkConfig cfg =
mapper.readValue(mapper.writeValueAsString(map), MongoSinkConfig.class);
-
- return cfg;
+ public static MongoSinkConfig load(Map<String, Object> map, SinkContext
sinkContext) throws IOException {
+ return IOConfigUtils.loadWithSecrets(map, MongoSinkConfig.class,
sinkContext);
}
@Override
diff --git
a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java
b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java
index 6ee95fc4cd4..68a31b461a5 100644
---
a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java
+++
b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java
@@ -79,7 +79,7 @@ public class MongoSource extends PushSource<byte[]> {
public void open(Map<String, Object> config, SourceContext sourceContext)
throws Exception {
log.info("Open MongoDB Source");
- mongoSourceConfig = MongoSourceConfig.load(config);
+ mongoSourceConfig = MongoSourceConfig.load(config, sourceContext);
mongoSourceConfig.validate();
if (clientProvider != null) {
diff --git
a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSourceConfig.java
b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSourceConfig.java
index cf887a93bf3..1c0c7f4b365 100644
---
a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSourceConfig.java
+++
b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSourceConfig.java
@@ -29,6 +29,8 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
/**
@@ -75,12 +77,8 @@ public class MongoSourceConfig extends MongoAbstractConfig {
return cfg;
}
- public static MongoSourceConfig load(Map<String, Object> map) throws
IOException {
- final ObjectMapper mapper = new ObjectMapper();
- final MongoSourceConfig cfg =
- mapper.readValue(mapper.writeValueAsString(map),
MongoSourceConfig.class);
-
- return cfg;
+ public static MongoSourceConfig load(Map<String, Object> map,
SourceContext sourceContext) throws IOException {
+ return IOConfigUtils.loadWithSecrets(map, MongoSourceConfig.class,
sourceContext);
}
/**
diff --git
a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkConfigTest.java
b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkConfigTest.java
index b1166eac572..c86e45feb23 100644
---
a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkConfigTest.java
+++
b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkConfigTest.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.io.mongodb;
import java.util.Map;
+import org.apache.pulsar.io.core.SinkContext;
+import org.mockito.Mockito;
import org.testng.annotations.Test;
import java.io.File;
@@ -34,7 +36,27 @@ public class MongoSinkConfigTest {
commonConfigMap.put("batchSize", TestHelper.BATCH_SIZE);
commonConfigMap.put("batchTimeMs", TestHelper.BATCH_TIME);
- final MongoSinkConfig cfg = MongoSinkConfig.load(commonConfigMap);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ final MongoSinkConfig cfg = MongoSinkConfig.load(commonConfigMap,
sinkContext);
+
+ assertEquals(cfg.getMongoUri(), TestHelper.URI);
+ assertEquals(cfg.getDatabase(), TestHelper.DB);
+ assertEquals(cfg.getCollection(), TestHelper.COLL);
+ assertEquals(cfg.getBatchSize(), TestHelper.BATCH_SIZE);
+ assertEquals(cfg.getBatchTimeMs(), TestHelper.BATCH_TIME);
+ }
+
+ @Test
+ public void testLoadMapConfigUrlFromSecret() throws IOException {
+ final Map<String, Object> commonConfigMap =
TestHelper.createCommonConfigMap();
+ commonConfigMap.put("batchSize", TestHelper.BATCH_SIZE);
+ commonConfigMap.put("batchTimeMs", TestHelper.BATCH_TIME);
+ commonConfigMap.remove("mongoUri");
+
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ Mockito.when(sinkContext.getSecret("mongoUri"))
+ .thenReturn(TestHelper.URI);
+ final MongoSinkConfig cfg = MongoSinkConfig.load(commonConfigMap,
sinkContext);
assertEquals(cfg.getMongoUri(), TestHelper.URI);
assertEquals(cfg.getDatabase(), TestHelper.DB);
@@ -44,12 +66,13 @@ public class MongoSinkConfigTest {
}
@Test(expectedExceptions = IllegalArgumentException.class,
- expectedExceptionsMessageRegExp = "Required MongoDB URI is not
set.")
+ expectedExceptionsMessageRegExp = "mongoUri cannot be null")
public void testBadMongoUri() throws IOException {
final Map<String, Object> configMap =
TestHelper.createCommonConfigMap();
TestHelper.removeMongoUri(configMap);
- final MongoSinkConfig cfg = MongoSinkConfig.load(configMap);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ final MongoSinkConfig cfg = MongoSinkConfig.load(configMap,
sinkContext);
cfg.validate();
}
@@ -60,7 +83,8 @@ public class MongoSinkConfigTest {
final Map<String, Object> configMap =
TestHelper.createCommonConfigMap();
TestHelper.removeDatabase(configMap);
- final MongoSinkConfig cfg = MongoSinkConfig.load(configMap);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ final MongoSinkConfig cfg = MongoSinkConfig.load(configMap,
sinkContext);
cfg.validate();
}
@@ -71,7 +95,8 @@ public class MongoSinkConfigTest {
final Map<String, Object> configMap =
TestHelper.createCommonConfigMap();
TestHelper.removeCollection(configMap);
- final MongoSinkConfig cfg = MongoSinkConfig.load(configMap);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ final MongoSinkConfig cfg = MongoSinkConfig.load(configMap,
sinkContext);
cfg.validate();
}
@@ -82,7 +107,8 @@ public class MongoSinkConfigTest {
final Map<String, Object> configMap =
TestHelper.createCommonConfigMap();
TestHelper.putBatchSize(configMap, 0);
- final MongoSinkConfig cfg = MongoSinkConfig.load(configMap);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ final MongoSinkConfig cfg = MongoSinkConfig.load(configMap,
sinkContext);
cfg.validate();
}
@@ -93,7 +119,8 @@ public class MongoSinkConfigTest {
final Map<String, Object> configMap =
TestHelper.createCommonConfigMap();
TestHelper.putBatchTime(configMap, 0L);
- final MongoSinkConfig cfg = MongoSinkConfig.load(configMap);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ final MongoSinkConfig cfg = MongoSinkConfig.load(configMap,
sinkContext);
cfg.validate();
}
diff --git
a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceConfigTest.java
b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceConfigTest.java
index e7fd01549b0..528cd0237ef 100644
---
a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceConfigTest.java
+++
b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceConfigTest.java
@@ -23,6 +23,8 @@ import com.fasterxml.jackson.databind.JsonMappingException;
import java.io.File;
import java.io.IOException;
import java.util.Map;
+import org.apache.pulsar.io.core.SourceContext;
+import org.mockito.Mockito;
import org.testng.annotations.Test;
public class MongoSourceConfigTest {
@@ -32,7 +34,27 @@ public class MongoSourceConfigTest {
final Map<String, Object> configMap =
TestHelper.createCommonConfigMap();
TestHelper.putSyncType(configMap, TestHelper.SYNC_TYPE);
- final MongoSourceConfig cfg = MongoSourceConfig.load(configMap);
+ SourceContext sourceContext = Mockito.mock(SourceContext.class);
+ final MongoSourceConfig cfg = MongoSourceConfig.load(configMap,
sourceContext);
+
+ assertEquals(cfg.getMongoUri(), TestHelper.URI);
+ assertEquals(cfg.getDatabase(), TestHelper.DB);
+ assertEquals(cfg.getCollection(), TestHelper.COLL);
+ assertEquals(cfg.getSyncType(), TestHelper.SYNC_TYPE);
+ assertEquals(cfg.getBatchSize(), TestHelper.BATCH_SIZE);
+ assertEquals(cfg.getBatchTimeMs(), TestHelper.BATCH_TIME);
+ }
+
+ @Test
+ public void testLoadMapConfigUriFromSecret() throws IOException {
+ final Map<String, Object> configMap =
TestHelper.createCommonConfigMap();
+ TestHelper.putSyncType(configMap, TestHelper.SYNC_TYPE);
+ configMap.remove("mongoUri");
+
+ SourceContext sourceContext = Mockito.mock(SourceContext.class);
+ Mockito.when(sourceContext.getSecret("mongoUri"))
+ .thenReturn(TestHelper.URI);
+ final MongoSourceConfig cfg = MongoSourceConfig.load(configMap,
sourceContext);
assertEquals(cfg.getMongoUri(), TestHelper.URI);
assertEquals(cfg.getDatabase(), TestHelper.DB);
@@ -43,12 +65,13 @@ public class MongoSourceConfigTest {
}
@Test(expectedExceptions = IllegalArgumentException.class,
- expectedExceptionsMessageRegExp = "Required MongoDB URI is not
set.")
+ expectedExceptionsMessageRegExp = "mongoUri cannot be null")
public void testBadMongoUri() throws IOException {
final Map<String, Object> configMap =
TestHelper.createCommonConfigMap();
TestHelper.removeMongoUri(configMap);
- final MongoSourceConfig cfg = MongoSourceConfig.load(configMap);
+ SourceContext sourceContext = Mockito.mock(SourceContext.class);
+ final MongoSourceConfig cfg = MongoSourceConfig.load(configMap,
sourceContext);
cfg.validate();
}
@@ -61,7 +84,8 @@ public class MongoSourceConfigTest {
final Map<String, Object> configMap =
TestHelper.createCommonConfigMap();
TestHelper.putSyncType(configMap, "wrong_sync_type_str");
- final MongoSourceConfig cfg = MongoSourceConfig.load(configMap);
+ SourceContext sourceContext = Mockito.mock(SourceContext.class);
+ final MongoSourceConfig cfg = MongoSourceConfig.load(configMap,
sourceContext);
cfg.validate();
}
@@ -72,7 +96,8 @@ public class MongoSourceConfigTest {
final Map<String, Object> configMap =
TestHelper.createCommonConfigMap();
TestHelper.putBatchSize(configMap, 0);
- final MongoSourceConfig cfg = MongoSourceConfig.load(configMap);
+ SourceContext sourceContext = Mockito.mock(SourceContext.class);
+ final MongoSourceConfig cfg = MongoSourceConfig.load(configMap,
sourceContext);
cfg.validate();
}
@@ -83,7 +108,8 @@ public class MongoSourceConfigTest {
final Map<String, Object> configMap =
TestHelper.createCommonConfigMap();
TestHelper.putBatchTime(configMap, 0L);
- final MongoSourceConfig cfg = MongoSourceConfig.load(configMap);
+ SourceContext sourceContext = Mockito.mock(SourceContext.class);
+ final MongoSourceConfig cfg = MongoSourceConfig.load(configMap,
sourceContext);
cfg.validate();
}
diff --git a/pulsar-io/rabbitmq/pom.xml b/pulsar-io/rabbitmq/pom.xml
index 85e0452df1b..e35b5516858 100644
--- a/pulsar-io/rabbitmq/pom.xml
+++ b/pulsar-io/rabbitmq/pom.xml
@@ -32,6 +32,11 @@
<dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-core</artifactId>
diff --git
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
index f317a35734e..89192c42346 100644
---
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
+++
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
@@ -53,7 +53,7 @@ public class RabbitMQSink implements Sink<byte[]> {
@Override
public void open(Map<String, Object> config, SinkContext sinkContext)
throws Exception {
- rabbitMQSinkConfig = RabbitMQSinkConfig.load(config);
+ rabbitMQSinkConfig = RabbitMQSinkConfig.load(config, sinkContext);
rabbitMQSinkConfig.validate();
ConnectionFactory connectionFactory =
rabbitMQSinkConfig.createConnectionFactory();
diff --git
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java
index c1f8d6b8ad3..39f97e5e460 100644
---
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java
+++
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.io.rabbitmq;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
-import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
@@ -28,6 +27,8 @@ import java.util.Map;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
@Data
@@ -60,14 +61,12 @@ public class RabbitMQSinkConfig extends
RabbitMQAbstractConfig implements Serial
return mapper.readValue(new File(yamlFile), RabbitMQSinkConfig.class);
}
- public static RabbitMQSinkConfig load(Map<String, Object> map) throws
IOException {
- ObjectMapper mapper = new ObjectMapper();
- return mapper.readValue(mapper.writeValueAsString(map),
RabbitMQSinkConfig.class);
+ public static RabbitMQSinkConfig load(Map<String, Object> map, SinkContext
sinkContext) throws IOException {
+ return IOConfigUtils.loadWithSecrets(map, RabbitMQSinkConfig.class,
sinkContext);
}
@Override
public void validate() {
super.validate();
- Preconditions.checkNotNull(exchangeName, "exchangeName property not
set.");
}
}
diff --git
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
index d15108c4d82..b0b7ef31b08 100644
---
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
+++
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
@@ -54,7 +54,7 @@ public class RabbitMQSource extends PushSource<byte[]> {
@Override
public void open(Map<String, Object> config, SourceContext sourceContext)
throws Exception {
- rabbitMQSourceConfig = RabbitMQSourceConfig.load(config);
+ rabbitMQSourceConfig = RabbitMQSourceConfig.load(config,
sourceContext);
rabbitMQSourceConfig.validate();
ConnectionFactory connectionFactory =
rabbitMQSourceConfig.createConnectionFactory();
diff --git
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSourceConfig.java
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSourceConfig.java
index f24018e70da..01e23a71460 100644
---
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSourceConfig.java
+++
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSourceConfig.java
@@ -28,6 +28,8 @@ import java.util.Map;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
@Data
@@ -66,9 +68,8 @@ public class RabbitMQSourceConfig extends
RabbitMQAbstractConfig implements Seri
return mapper.readValue(new File(yamlFile),
RabbitMQSourceConfig.class);
}
- public static RabbitMQSourceConfig load(Map<String, Object> map) throws
IOException {
- ObjectMapper mapper = new ObjectMapper();
- return mapper.readValue(mapper.writeValueAsString(map),
RabbitMQSourceConfig.class);
+ public static RabbitMQSourceConfig load(Map<String, Object> map,
SourceContext sourceContext) throws IOException {
+ return IOConfigUtils.loadWithSecrets(map, RabbitMQSourceConfig.class,
sourceContext);
}
@Override
diff --git
a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java
b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java
index 3d4fd6f46e1..8706cb56752 100644
---
a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java
+++
b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java
@@ -18,7 +18,9 @@
*/
package org.apache.pulsar.io.rabbitmq.sink;
+import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.rabbitmq.RabbitMQSinkConfig;
+import org.mockito.Mockito;
import org.testng.annotations.Test;
import java.io.File;
@@ -71,7 +73,45 @@ public class RabbitMQSinkConfigTest {
map.put("exchangeName", "test-exchange");
map.put("exchangeType", "test-exchange-type");
- RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map, sinkContext);
+ assertNotNull(config);
+ assertEquals(config.getHost(), "localhost");
+ assertEquals(config.getPort(), Integer.parseInt("5673"));
+ assertEquals(config.getVirtualHost(), "/");
+ assertEquals(config.getUsername(), "guest");
+ assertEquals(config.getPassword(), "guest");
+ assertEquals(config.getConnectionName(), "test-connection");
+ assertEquals(config.getRequestedChannelMax(), Integer.parseInt("0"));
+ assertEquals(config.getRequestedFrameMax(), Integer.parseInt("0"));
+ assertEquals(config.getConnectionTimeout(), Integer.parseInt("60000"));
+ assertEquals(config.getHandshakeTimeout(), Integer.parseInt("10000"));
+ assertEquals(config.getRequestedHeartbeat(), Integer.parseInt("60"));
+ assertEquals(config.getExchangeName(), "test-exchange");
+ assertEquals(config.getExchangeType(), "test-exchange-type");
+ }
+
+ @Test
+ public final void loadFromMapCredentialsFromSecretTest() throws
IOException {
+ Map<String, Object> map = new HashMap<>();
+ map.put("host", "localhost");
+ map.put("port", "5673");
+ map.put("virtualHost", "/");
+ map.put("connectionName", "test-connection");
+ map.put("requestedChannelMax", "0");
+ map.put("requestedFrameMax", "0");
+ map.put("connectionTimeout", "60000");
+ map.put("handshakeTimeout", "10000");
+ map.put("requestedHeartbeat", "60");
+ map.put("exchangeName", "test-exchange");
+ map.put("exchangeType", "test-exchange-type");
+
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ Mockito.when(sinkContext.getSecret("username"))
+ .thenReturn("guest");
+ Mockito.when(sinkContext.getSecret("password"))
+ .thenReturn("guest");
+ RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map, sinkContext);
assertNotNull(config);
assertEquals(config.getHost(), "localhost");
assertEquals(config.getPort(), Integer.parseInt("5673"));
@@ -105,12 +145,13 @@ public class RabbitMQSinkConfigTest {
map.put("exchangeName", "test-exchange");
map.put("exchangeType", "test-exchange-type");
- RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map, sinkContext);
config.validate();
}
- @Test(expectedExceptions = NullPointerException.class,
- expectedExceptionsMessageRegExp = "exchangeName property not set.")
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "exchangeName cannot be null")
public final void missingExchangeValidateTest() throws IOException {
Map<String, Object> map = new HashMap<>();
map.put("host", "localhost");
@@ -126,7 +167,8 @@ public class RabbitMQSinkConfigTest {
map.put("requestedHeartbeat", "60");
map.put("exchangeType", "test-exchange-type");
- RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map, sinkContext);
config.validate();
}
diff --git
a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceConfigTest.java
b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceConfigTest.java
index c33e0070c6f..43a90062fa4 100644
---
a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceConfigTest.java
+++
b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceConfigTest.java
@@ -18,7 +18,9 @@
*/
package org.apache.pulsar.io.rabbitmq.source;
+import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.rabbitmq.RabbitMQSourceConfig;
+import org.mockito.Mockito;
import org.testng.annotations.Test;
import java.io.File;
@@ -76,7 +78,50 @@ public class RabbitMQSourceConfigTest {
map.put("prefetchGlobal", "false");
map.put("passive", "true");
- RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map);
+ SourceContext sourceContext = Mockito.mock(SourceContext.class);
+ RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map,
sourceContext);
+ assertNotNull(config);
+ assertEquals("localhost", config.getHost());
+ assertEquals(Integer.parseInt("5672"), config.getPort());
+ assertEquals("/", config.getVirtualHost());
+ assertEquals("guest", config.getUsername());
+ assertEquals("guest", config.getPassword());
+ assertEquals("test-queue", config.getQueueName());
+ assertEquals("test-connection", config.getConnectionName());
+ assertEquals(Integer.parseInt("0"), config.getRequestedChannelMax());
+ assertEquals(Integer.parseInt("0"), config.getRequestedFrameMax());
+ assertEquals(Integer.parseInt("60000"), config.getConnectionTimeout());
+ assertEquals(Integer.parseInt("10000"), config.getHandshakeTimeout());
+ assertEquals(Integer.parseInt("60"), config.getRequestedHeartbeat());
+ assertEquals(Integer.parseInt("0"), config.getPrefetchCount());
+ assertEquals(false, config.isPrefetchGlobal());
+ assertEquals(false, config.isPrefetchGlobal());
+ assertEquals(true, config.isPassive());
+ }
+
+ @Test
+ public final void loadFromMapCredentialsFromSecretTest() throws
IOException {
+ Map<String, Object> map = new HashMap<>();
+ map.put("host", "localhost");
+ map.put("port", "5672");
+ map.put("virtualHost", "/");
+ map.put("queueName", "test-queue");
+ map.put("connectionName", "test-connection");
+ map.put("requestedChannelMax", "0");
+ map.put("requestedFrameMax", "0");
+ map.put("connectionTimeout", "60000");
+ map.put("handshakeTimeout", "10000");
+ map.put("requestedHeartbeat", "60");
+ map.put("prefetchCount", "0");
+ map.put("prefetchGlobal", "false");
+ map.put("passive", "true");
+
+ SourceContext sourceContext = Mockito.mock(SourceContext.class);
+ Mockito.when(sourceContext.getSecret("username"))
+ .thenReturn("guest");
+ Mockito.when(sourceContext.getSecret("password"))
+ .thenReturn("guest");
+ RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map,
sourceContext);
assertNotNull(config);
assertEquals("localhost", config.getHost());
assertEquals(Integer.parseInt("5672"), config.getPort());
@@ -115,12 +160,13 @@ public class RabbitMQSourceConfigTest {
map.put("prefetchGlobal", "false");
map.put("passive", "false");
- RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map);
+ SourceContext sourceContext = Mockito.mock(SourceContext.class);
+ RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map,
sourceContext);
config.validate();
}
- @Test(expectedExceptions = NullPointerException.class,
- expectedExceptionsMessageRegExp = "host property not set.")
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "host cannot be null")
public final void missingHostValidateTest() throws IOException {
Map<String, Object> map = new HashMap<>();
map.put("port", "5672");
@@ -138,7 +184,8 @@ public class RabbitMQSourceConfigTest {
map.put("prefetchGlobal", "false");
map.put("passive", "false");
- RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map);
+ SourceContext sourceContext = Mockito.mock(SourceContext.class);
+ RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map,
sourceContext);
config.validate();
}
@@ -162,7 +209,8 @@ public class RabbitMQSourceConfigTest {
map.put("prefetchGlobal", "false");
map.put("passive", "false");
- RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map);
+ SourceContext sourceContext = Mockito.mock(SourceContext.class);
+ RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map,
sourceContext);
config.validate();
}
diff --git a/pulsar-io/redis/pom.xml b/pulsar-io/redis/pom.xml
index 1b9d2d76d3f..ef66a3f37c6 100644
--- a/pulsar-io/redis/pom.xml
+++ b/pulsar-io/redis/pom.xml
@@ -32,6 +32,11 @@
<name>Pulsar IO :: Redis</name>
<dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-core</artifactId>
diff --git
a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisAbstractConfig.java
b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisAbstractConfig.java
index 978e7de31a5..89ec684dded 100644
---
a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisAbstractConfig.java
+++
b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisAbstractConfig.java
@@ -88,13 +88,11 @@ public class RedisAbstractConfig implements Serializable {
@FieldDoc(
required = false,
- defaultValue = "10000L",
+ defaultValue = "10000",
help = "The amount of time in milliseconds to wait before timing out
when connecting")
private long connectTimeout = 10000L;
public void validate() {
- Preconditions.checkNotNull(redisHosts, "redisHosts property not set.");
- Preconditions.checkNotNull(redisDatabase, "redisDatabase property not
set.");
Preconditions.checkNotNull(clientMode, "clientMode property not set.");
}
@@ -105,7 +103,6 @@ public class RedisAbstractConfig implements Serializable {
public List<HostAndPort> getHostAndPorts() {
List<HostAndPort> hostAndPorts = Lists.newArrayList();
- Preconditions.checkNotNull(redisHosts, "redisHosts property not set.");
String[] hosts = StringUtils.split(redisHosts, ",");
for (String host : hosts) {
HostAndPort hostAndPort = HostAndPort.fromString(host);
diff --git
a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java
b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java
index bff0a5c2da5..ebd6e9dbab2 100644
---
a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java
+++
b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java
@@ -68,7 +68,7 @@ public class RedisSink implements Sink<byte[]> {
public void open(Map<String, Object> config, SinkContext sinkContext)
throws Exception {
log.info("Open Redis Sink");
- redisSinkConfig = RedisSinkConfig.load(config);
+ redisSinkConfig = RedisSinkConfig.load(config, sinkContext);
redisSinkConfig.validate();
redisSession = RedisSession.create(redisSinkConfig);
diff --git
a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java
b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java
index a9db66812a4..f7a70cb65a8 100644
---
a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java
+++
b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java
@@ -28,6 +28,8 @@ import java.util.Map;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
import org.apache.pulsar.io.redis.RedisAbstractConfig;
@@ -40,13 +42,13 @@ public class RedisSinkConfig extends RedisAbstractConfig
implements Serializable
@FieldDoc(
required = false,
- defaultValue = "10000L",
+ defaultValue = "10000",
help = "The amount of time in milliseconds before an operation is
marked as timed out")
private long operationTimeout = 10000L;
@FieldDoc(
required = false,
- defaultValue = "1000L",
+ defaultValue = "1000",
help = "The Redis operation time in milliseconds")
private long batchTimeMs = 1000L;
@@ -62,9 +64,8 @@ public class RedisSinkConfig extends RedisAbstractConfig
implements Serializable
return mapper.readValue(new File(yamlFile), RedisSinkConfig.class);
}
- public static RedisSinkConfig load(Map<String, Object> map) throws
IOException {
- ObjectMapper mapper = new ObjectMapper();
- return mapper.readValue(mapper.writeValueAsString(map),
RedisSinkConfig.class);
+ public static RedisSinkConfig load(Map<String, Object> map, SinkContext
sinkContext) throws IOException {
+ return IOConfigUtils.loadWithSecrets(map, RedisSinkConfig.class,
sinkContext);
}
@Override
diff --git
a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkConfigTest.java
b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkConfigTest.java
index 1316d0994a1..39fc6e540c2 100644
---
a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkConfigTest.java
+++
b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkConfigTest.java
@@ -18,7 +18,9 @@
*/
package org.apache.pulsar.io.redis.sink;
+import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.redis.RedisAbstractConfig;
+import org.mockito.Mockito;
import org.testng.annotations.Test;
import java.io.File;
@@ -62,7 +64,34 @@ public class RedisSinkConfigTest {
map.put("batchTimeMs", "1000");
map.put("connectTimeout", "3000");
- RedisSinkConfig config = RedisSinkConfig.load(map);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ RedisSinkConfig config = RedisSinkConfig.load(map, sinkContext);
+ assertNotNull(config);
+ assertEquals(config.getRedisHosts(), "localhost:6379");
+ assertEquals(config.getRedisPassword(), "fake@123");
+ assertEquals(config.getRedisDatabase(), Integer.parseInt("1"));
+ assertEquals(config.getClientMode(), "Standalone");
+ assertEquals(config.getOperationTimeout(), Long.parseLong("2000"));
+ assertEquals(config.getBatchSize(), Integer.parseInt("100"));
+ assertEquals(config.getBatchTimeMs(), Long.parseLong("1000"));
+ assertEquals(config.getConnectTimeout(), Long.parseLong("3000"));
+ }
+
+ @Test
+ public final void loadFromMapCredentialsFromSecretTest() throws
IOException {
+ Map<String, Object> map = new HashMap<String, Object>();
+ map.put("redisHosts", "localhost:6379");
+ map.put("redisDatabase", "1");
+ map.put("clientMode", "Standalone");
+ map.put("operationTimeout", "2000");
+ map.put("batchSize", "100");
+ map.put("batchTimeMs", "1000");
+ map.put("connectTimeout", "3000");
+
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ Mockito.when(sinkContext.getSecret("redisPassword"))
+ .thenReturn("fake@123");
+ RedisSinkConfig config = RedisSinkConfig.load(map, sinkContext);
assertNotNull(config);
assertEquals(config.getRedisHosts(), "localhost:6379");
assertEquals(config.getRedisPassword(), "fake@123");
@@ -86,12 +115,13 @@ public class RedisSinkConfigTest {
map.put("batchTimeMs", "1000");
map.put("connectTimeout", "3000");
- RedisSinkConfig config = RedisSinkConfig.load(map);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ RedisSinkConfig config = RedisSinkConfig.load(map, sinkContext);
config.validate();
}
- @Test(expectedExceptions = NullPointerException.class,
- expectedExceptionsMessageRegExp = "redisHosts property not set.")
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "redisHosts cannot be null")
public final void missingValidValidateTableNameTest() throws IOException {
Map<String, Object> map = new HashMap<String, Object>();
map.put("redisPassword", "fake@123");
@@ -102,7 +132,8 @@ public class RedisSinkConfigTest {
map.put("batchTimeMs", "1000");
map.put("connectTimeout", "3000");
- RedisSinkConfig config = RedisSinkConfig.load(map);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ RedisSinkConfig config = RedisSinkConfig.load(map, sinkContext);
config.validate();
}
@@ -119,7 +150,8 @@ public class RedisSinkConfigTest {
map.put("batchTimeMs", "-100");
map.put("connectTimeout", "3000");
- RedisSinkConfig config = RedisSinkConfig.load(map);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ RedisSinkConfig config = RedisSinkConfig.load(map, sinkContext);
config.validate();
}
@@ -136,7 +168,8 @@ public class RedisSinkConfigTest {
map.put("batchTimeMs", "1000");
map.put("connectTimeout", "3000");
- RedisSinkConfig config = RedisSinkConfig.load(map);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ RedisSinkConfig config = RedisSinkConfig.load(map, sinkContext);
config.validate();
RedisAbstractConfig.ClientMode.valueOf(config.getClientMode().toUpperCase());
diff --git
a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
index 214151345b4..2b407fafa5e 100644
---
a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
+++
b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
@@ -21,7 +21,9 @@ package org.apache.pulsar.io.redis.sink;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.SinkRecord;
+import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.redis.EmbeddedRedisUtils;
+import org.mockito.Mockito;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -66,7 +68,8 @@ public class RedisSinkTest {
Record<byte[]> record = build("fakeTopic", "fakeKey", "fakeValue");
// open should success
- sink.open(configs, null);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ sink.open(configs, sinkContext);
// write should success.
sink.write(record);
diff --git a/pulsar-io/solr/pom.xml b/pulsar-io/solr/pom.xml
index deb0e70e2e0..ffd1340f5f3 100644
--- a/pulsar-io/solr/pom.xml
+++ b/pulsar-io/solr/pom.xml
@@ -36,6 +36,11 @@
<name>Pulsar IO :: Solr</name>
<dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>${project.parent.groupId}</groupId>
<artifactId>pulsar-io-core</artifactId>
diff --git
a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java
b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java
index de9cdb4a9d8..202c782c14c 100644
---
a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java
+++
b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java
@@ -48,7 +48,7 @@ public abstract class SolrAbstractSink<T> implements Sink<T> {
@Override
public void open(Map<String, Object> config, SinkContext sinkContext)
throws Exception {
- solrSinkConfig = SolrSinkConfig.load(config);
+ solrSinkConfig = SolrSinkConfig.load(config, sinkContext);
solrSinkConfig.validate();
enableBasicAuth = !Strings.isNullOrEmpty(solrSinkConfig.getUsername());
diff --git
a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java
b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java
index 02733d230bd..daa93a366b1 100644
--- a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java
+++ b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java
@@ -27,6 +27,8 @@ import java.io.Serializable;
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
/**
@@ -84,9 +86,8 @@ public class SolrSinkConfig implements Serializable {
return mapper.readValue(new File(yamlFile), SolrSinkConfig.class);
}
- public static SolrSinkConfig load(Map<String, Object> map) throws
IOException {
- ObjectMapper mapper = new ObjectMapper();
- return mapper.readValue(mapper.writeValueAsString(map),
SolrSinkConfig.class);
+ public static SolrSinkConfig load(Map<String, Object> map, SinkContext
sinkContext) throws IOException {
+ return IOConfigUtils.loadWithSecrets(map, SolrSinkConfig.class,
sinkContext);
}
public void validate() {
diff --git
a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java
b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java
index 42d2121dbfc..2c2447a637d 100644
---
a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java
+++
b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.io.solr;
import com.google.common.collect.Lists;
+import org.apache.pulsar.io.core.SinkContext;
+import org.mockito.Mockito;
import org.testng.annotations.Test;
import java.io.File;
@@ -61,7 +63,31 @@ public class SolrSinkConfigTest {
map.put("username", "fakeuser");
map.put("password", "fake@123");
- SolrSinkConfig config = SolrSinkConfig.load(map);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ SolrSinkConfig config = SolrSinkConfig.load(map, sinkContext);
+ assertNotNull(config);
+ assertEquals(config.getSolrUrl(),
"localhost:2181,localhost:2182/chroot");
+ assertEquals(config.getSolrMode(), "SolrCloud");
+ assertEquals(config.getSolrCollection(), "techproducts");
+ assertEquals(config.getSolrCommitWithinMs(), Integer.parseInt("100"));
+ assertEquals(config.getUsername(), "fakeuser");
+ assertEquals(config.getPassword(), "fake@123");
+ }
+
+ @Test
+ public final void loadFromMapCredentialsFromSecretTest() throws
IOException {
+ Map<String, Object> map = new HashMap<>();
+ map.put("solrUrl", "localhost:2181,localhost:2182/chroot");
+ map.put("solrMode", "SolrCloud");
+ map.put("solrCollection", "techproducts");
+ map.put("solrCommitWithinMs", "100");
+
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ Mockito.when(sinkContext.getSecret("username"))
+ .thenReturn("fakeuser");
+ Mockito.when(sinkContext.getSecret("password"))
+ .thenReturn("fake@123");
+ SolrSinkConfig config = SolrSinkConfig.load(map, sinkContext);
assertNotNull(config);
assertEquals(config.getSolrUrl(),
"localhost:2181,localhost:2182/chroot");
assertEquals(config.getSolrMode(), "SolrCloud");
@@ -81,12 +107,13 @@ public class SolrSinkConfigTest {
map.put("username", "fakeuser");
map.put("password", "fake@123");
- SolrSinkConfig config = SolrSinkConfig.load(map);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ SolrSinkConfig config = SolrSinkConfig.load(map, sinkContext);
config.validate();
}
- @Test(expectedExceptions = NullPointerException.class,
- expectedExceptionsMessageRegExp = "solrUrl property not set.")
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "solrUrl cannot be null")
public final void missingValidValidateSolrModeTest() throws IOException {
Map<String, Object> map = new HashMap<>();
map.put("solrMode", "SolrCloud");
@@ -95,7 +122,8 @@ public class SolrSinkConfigTest {
map.put("username", "fakeuser");
map.put("password", "fake@123");
- SolrSinkConfig config = SolrSinkConfig.load(map);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ SolrSinkConfig config = SolrSinkConfig.load(map, sinkContext);
config.validate();
}
@@ -110,7 +138,8 @@ public class SolrSinkConfigTest {
map.put("username", "fakeuser");
map.put("password", "fake@123");
- SolrSinkConfig config = SolrSinkConfig.load(map);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ SolrSinkConfig config = SolrSinkConfig.load(map, sinkContext);
config.validate();
}
@@ -125,7 +154,8 @@ public class SolrSinkConfigTest {
map.put("username", "fakeuser");
map.put("password", "fake@123");
- SolrSinkConfig config = SolrSinkConfig.load(map);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ SolrSinkConfig config = SolrSinkConfig.load(map, sinkContext);
config.validate();
SolrAbstractSink.SolrMode.valueOf(config.getSolrMode().toUpperCase());
@@ -141,7 +171,8 @@ public class SolrSinkConfigTest {
map.put("username", "fakeuser");
map.put("password", "fake@123");
- SolrSinkConfig config = SolrSinkConfig.load(map);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ SolrSinkConfig config = SolrSinkConfig.load(map, sinkContext);
config.validate();
String url = config.getSolrUrl();