This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new beb8418ba7c [improve][io] Make connectors load sensitive fields from 
secrets (#21675)
beb8418ba7c is described below

commit beb8418ba7c667ab713fb480a03ebb0deb41db1d
Author: jiangpengcheng <[email protected]>
AuthorDate: Mon Dec 11 09:13:43 2023 +0800

    [improve][io] Make connectors load sensitive fields from secrets (#21675)
---
 pulsar-io/canal/pom.xml                            |  5 ++
 .../pulsar/io/canal/CanalAbstractSource.java       |  2 +-
 .../apache/pulsar/io/canal/CanalSourceConfig.java  |  7 +--
 .../org/apache/pulsar/io/common/IOConfigUtils.java |  7 +--
 .../apache/pulsar/io/common/IOConfigUtilsTest.java | 11 ++++
 pulsar-io/dynamodb/pom.xml                         |  6 +++
 .../apache/pulsar/io/dynamodb/DynamoDBSource.java  |  2 +-
 .../pulsar/io/dynamodb/DynamoDBSourceConfig.java   |  8 +--
 .../io/dynamodb/DynamoDBSourceConfigTests.java     | 52 +++++++++++++++++--
 pulsar-io/influxdb/pom.xml                         |  5 ++
 .../io/influxdb/InfluxDBGenericRecordSink.java     |  4 +-
 .../io/influxdb/v1/InfluxDBAbstractSink.java       |  2 +-
 .../pulsar/io/influxdb/v1/InfluxDBSinkConfig.java  | 11 ++--
 .../apache/pulsar/io/influxdb/v2/InfluxDBSink.java |  2 +-
 .../pulsar/io/influxdb/v2/InfluxDBSinkConfig.java  | 14 ++---
 .../io/influxdb/v1/InfluxDBSinkConfigTest.java     | 56 +++++++++++++++++---
 .../io/influxdb/v2/InfluxDBSinkConfigTest.java     | 29 +++++++++--
 pulsar-io/jdbc/core/pom.xml                        |  6 +++
 .../apache/pulsar/io/jdbc/JdbcAbstractSink.java    |  2 +-
 .../org/apache/pulsar/io/jdbc/JdbcSinkConfig.java  |  7 +--
 pulsar-io/kafka/pom.xml                            |  5 ++
 .../apache/pulsar/io/kafka/KafkaAbstractSink.java  |  6 +--
 .../pulsar/io/kafka/KafkaAbstractSource.java       |  2 +-
 .../apache/pulsar/io/kafka/KafkaSinkConfig.java    | 11 ++--
 .../apache/pulsar/io/kafka/KafkaSourceConfig.java  |  9 +++-
 .../io/kafka/sink/KafkaAbstractSinkTest.java       |  8 +--
 .../io/kafka/source/KafkaAbstractSourceTest.java   | 26 ++++++++--
 pulsar-io/mongo/pom.xml                            |  5 ++
 .../pulsar/io/mongodb/MongoAbstractConfig.java     |  3 +-
 .../org/apache/pulsar/io/mongodb/MongoSink.java    |  2 +-
 .../apache/pulsar/io/mongodb/MongoSinkConfig.java  |  9 ++--
 .../org/apache/pulsar/io/mongodb/MongoSource.java  |  2 +-
 .../pulsar/io/mongodb/MongoSourceConfig.java       | 10 ++--
 .../pulsar/io/mongodb/MongoSinkConfigTest.java     | 41 ++++++++++++---
 .../pulsar/io/mongodb/MongoSourceConfigTest.java   | 38 +++++++++++---
 pulsar-io/rabbitmq/pom.xml                         |  5 ++
 .../apache/pulsar/io/rabbitmq/RabbitMQSink.java    |  2 +-
 .../pulsar/io/rabbitmq/RabbitMQSinkConfig.java     |  9 ++--
 .../apache/pulsar/io/rabbitmq/RabbitMQSource.java  |  2 +-
 .../pulsar/io/rabbitmq/RabbitMQSourceConfig.java   |  7 +--
 .../io/rabbitmq/sink/RabbitMQSinkConfigTest.java   | 52 +++++++++++++++++--
 .../rabbitmq/source/RabbitMQSourceConfigTest.java  | 60 +++++++++++++++++++---
 pulsar-io/redis/pom.xml                            |  5 ++
 .../pulsar/io/redis/RedisAbstractConfig.java       |  5 +-
 .../org/apache/pulsar/io/redis/sink/RedisSink.java |  2 +-
 .../pulsar/io/redis/sink/RedisSinkConfig.java      | 11 ++--
 .../pulsar/io/redis/sink/RedisSinkConfigTest.java  | 47 ++++++++++++++---
 .../apache/pulsar/io/redis/sink/RedisSinkTest.java |  5 +-
 pulsar-io/solr/pom.xml                             |  5 ++
 .../apache/pulsar/io/solr/SolrAbstractSink.java    |  2 +-
 .../org/apache/pulsar/io/solr/SolrSinkConfig.java  |  7 +--
 .../apache/pulsar/io/solr/SolrSinkConfigTest.java  | 47 ++++++++++++++---
 52 files changed, 540 insertions(+), 148 deletions(-)

diff --git a/pulsar-io/canal/pom.xml b/pulsar-io/canal/pom.xml
index 7d4887e6db0..b15c23dd591 100644
--- a/pulsar-io/canal/pom.xml
+++ b/pulsar-io/canal/pom.xml
@@ -37,6 +37,11 @@
     </properties>
 
     <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>pulsar-io-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>${project.groupId}</groupId>
             <artifactId>pulsar-io-core</artifactId>
diff --git 
a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java
 
b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java
index 06c8788d5ae..7d0cd0305a4 100644
--- 
a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java
+++ 
b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java
@@ -57,7 +57,7 @@ public abstract class CanalAbstractSource<V> extends 
PushSource<V> {
 
     @Override
     public void open(Map<String, Object> config, SourceContext sourceContext) 
throws Exception {
-        canalSourceConfig = CanalSourceConfig.load(config);
+        canalSourceConfig = CanalSourceConfig.load(config, sourceContext);
         if (canalSourceConfig.getCluster()) {
             connector = 
CanalConnectors.newClusterConnector(canalSourceConfig.getZkServers(),
                     canalSourceConfig.getDestination(), 
canalSourceConfig.getUsername(),
diff --git 
a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java
 
b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java
index a0408e60e5f..5a754988ffd 100644
--- 
a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java
+++ 
b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java
@@ -26,6 +26,8 @@ import java.io.Serializable;
 import java.util.Map;
 import lombok.Data;
 import lombok.experimental.Accessors;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 
@@ -86,8 +88,7 @@ public class CanalSourceConfig implements Serializable{
     }
 
 
-    public static CanalSourceConfig load(Map<String, Object> map) throws 
IOException {
-        ObjectMapper mapper = new ObjectMapper();
-        return mapper.readValue(mapper.writeValueAsString(map), 
CanalSourceConfig.class);
+    public static CanalSourceConfig load(Map<String, Object> map, 
SourceContext sourceContext) throws IOException {
+        return IOConfigUtils.loadWithSecrets(map, CanalSourceConfig.class, 
sourceContext);
     }
 }
diff --git 
a/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java 
b/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java
index d15986a897c..69d981bf687 100644
--- 
a/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java
+++ 
b/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java
@@ -77,13 +77,14 @@ public class IOConfigUtils {
                         }
                     }
                     configs.computeIfAbsent(field.getName(), key -> {
-                        if (fieldDoc.required()) {
-                            throw new IllegalArgumentException(field.getName() 
+ " cannot be null");
-                        }
+                        // Use default value if it is not null before checking 
required
                         String value = fieldDoc.defaultValue();
                         if (value != null && !value.isEmpty()) {
                             return value;
                         }
+                        if (fieldDoc.required()) {
+                            throw new IllegalArgumentException(field.getName() 
+ " cannot be null");
+                        }
                         return null;
                     });
                 }
diff --git 
a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
 
b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
index 52afac1a5ac..fdcd2ea7fe8 100644
--- 
a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
+++ 
b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
@@ -54,6 +54,14 @@ public class IOConfigUtilsTest {
         )
         protected String testRequired;
 
+        @FieldDoc(
+                required = true,
+                defaultValue = "defaultRequired",
+                sensitive = true,
+                help = "testRequired"
+        )
+        protected String testDefaultRequired;
+
         @FieldDoc(
                 required = false,
                 defaultValue = "defaultStr",
@@ -299,6 +307,9 @@ public class IOConfigUtilsTest {
         configMap.put("testRequired", "test");
         TestDefaultConfig testDefaultConfig =
                 IOConfigUtils.loadWithSecrets(configMap, 
TestDefaultConfig.class, new TestSinkContext());
+        // if there is default value for a required field and no value 
provided when load config,
+        // it should not throw exception but use the default value.
+        Assert.assertEquals(testDefaultConfig.getTestDefaultRequired(), 
"defaultRequired");
         Assert.assertEquals(testDefaultConfig.getDefaultStr(), "defaultStr");
         Assert.assertEquals(testDefaultConfig.isDefaultBool(), true);
         Assert.assertEquals(testDefaultConfig.getDefaultInt(), 100);
diff --git a/pulsar-io/dynamodb/pom.xml b/pulsar-io/dynamodb/pom.xml
index 7e5fad8854f..1e5423e94cc 100644
--- a/pulsar-io/dynamodb/pom.xml
+++ b/pulsar-io/dynamodb/pom.xml
@@ -32,6 +32,12 @@
 
   <dependencies>
 
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-io-core</artifactId>
diff --git 
a/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSource.java
 
b/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSource.java
index d67c4e21154..2193cf39c17 100644
--- 
a/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSource.java
+++ 
b/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSource.java
@@ -65,7 +65,7 @@ public class DynamoDBSource extends AbstractAwsConnector 
implements Source<byte[
 
     @Override
     public void open(Map<String, Object> config, SourceContext sourceContext) 
throws Exception {
-        this.dynamodbSourceConfig = DynamoDBSourceConfig.load(config);
+        this.dynamodbSourceConfig = DynamoDBSourceConfig.load(config, 
sourceContext);
         
checkArgument(isNotBlank(dynamodbSourceConfig.getAwsDynamodbStreamArn()),
                 "empty dynamo-stream arn");
         // Even if the endpoint is set, it seems to require a region to go 
with it
diff --git 
a/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfig.java
 
b/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfig.java
index b734dd57411..0547ff8f863 100644
--- 
a/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfig.java
+++ 
b/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfig.java
@@ -35,6 +35,8 @@ import java.util.Date;
 import java.util.Map;
 import lombok.Data;
 import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.core.annotations.FieldDoc;
 import software.amazon.awssdk.regions.Region;
 
@@ -77,6 +79,7 @@ public class DynamoDBSourceConfig implements Serializable {
     @FieldDoc(
             required = false,
             defaultValue = "",
+            sensitive = true,
             help = "json-parameters to initialize 
`AwsCredentialsProviderPlugin`")
     private String awsCredentialPluginParam = "";
 
@@ -170,9 +173,8 @@ public class DynamoDBSourceConfig implements Serializable {
         return mapper.readValue(new File(yamlFile), 
DynamoDBSourceConfig.class);
     }
 
-    public static DynamoDBSourceConfig load(Map<String, Object> map) throws 
IOException {
-        ObjectMapper mapper = new ObjectMapper();
-        return mapper.readValue(mapper.writeValueAsString(map), 
DynamoDBSourceConfig.class);
+    public static DynamoDBSourceConfig load(Map<String, Object> map, 
SourceContext sourceContext) throws IOException {
+        return IOConfigUtils.loadWithSecrets(map, DynamoDBSourceConfig.class, 
sourceContext);
     }
 
     protected Region regionAsV2Region() {
diff --git 
a/pulsar-io/dynamodb/src/test/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfigTests.java
 
b/pulsar-io/dynamodb/src/test/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfigTests.java
index f84cb785896..bdccaa2e584 100644
--- 
a/pulsar-io/dynamodb/src/test/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfigTests.java
+++ 
b/pulsar-io/dynamodb/src/test/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfigTests.java
@@ -31,6 +31,8 @@ import java.util.HashMap;
 import java.util.Map;
 
 import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import org.apache.pulsar.io.core.SourceContext;
+import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
 
@@ -90,7 +92,8 @@ public class DynamoDBSourceConfigTests {
         map.put("initialPositionInStream", 
InitialPositionInStream.TRIM_HORIZON);
         map.put("startAtTime", DAY);
 
-        DynamoDBSourceConfig config = DynamoDBSourceConfig.load(map);
+        SourceContext sourceContext = Mockito.mock(SourceContext.class);
+        DynamoDBSourceConfig config = DynamoDBSourceConfig.load(map, 
sourceContext);
         
         assertNotNull(config);
         assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws";);
@@ -111,7 +114,46 @@ public class DynamoDBSourceConfigTests {
         ZonedDateTime expected = ZonedDateTime.ofInstant(DAY.toInstant(), 
ZoneOffset.UTC);
         assertEquals(actual, expected);
     }
-    
+
+    @Test
+    public final void loadFromMapCredentialFromSecretTest() throws IOException 
{
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("awsEndpoint", "https://some.endpoint.aws";);
+        map.put("awsRegion", "us-east-1");
+        map.put("awsDynamodbStreamArn", 
"arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291");
+        map.put("checkpointInterval", "30000");
+        map.put("backoffTime", "4000");
+        map.put("numRetries", "3");
+        map.put("receiveQueueSize", 2000);
+        map.put("applicationName", "My test application");
+        map.put("initialPositionInStream", 
InitialPositionInStream.TRIM_HORIZON);
+        map.put("startAtTime", DAY);
+
+        SourceContext sourceContext = Mockito.mock(SourceContext.class);
+        Mockito.when(sourceContext.getSecret("awsCredentialPluginParam"))
+                
.thenReturn("{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
+        DynamoDBSourceConfig config = DynamoDBSourceConfig.load(map, 
sourceContext);
+
+        assertNotNull(config);
+        assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws";);
+        assertEquals(config.getAwsRegion(), "us-east-1");
+        assertEquals(config.getAwsDynamodbStreamArn(), 
"arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291");
+        assertEquals(config.getAwsCredentialPluginParam(),
+                "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
+        assertEquals(config.getApplicationName(), "My test application");
+        assertEquals(config.getCheckpointInterval(), 30000);
+        assertEquals(config.getBackoffTime(), 4000);
+        assertEquals(config.getNumRetries(), 3);
+        assertEquals(config.getReceiveQueueSize(), 2000);
+        assertEquals(config.getInitialPositionInStream(), 
InitialPositionInStream.TRIM_HORIZON);
+
+        Calendar cal = Calendar.getInstance();
+        cal.setTime(config.getStartAtTime());
+        ZonedDateTime actual = ZonedDateTime.ofInstant(cal.toInstant(), 
ZoneOffset.UTC);
+        ZonedDateTime expected = ZonedDateTime.ofInstant(DAY.toInstant(), 
ZoneOffset.UTC);
+        assertEquals(actual, expected);
+    }
+
     @Test(expectedExceptions = IllegalArgumentException.class, 
             expectedExceptionsMessageRegExp = "empty aws-credential param")
     public final void missingCredentialsTest() throws Exception {
@@ -121,7 +163,8 @@ public class DynamoDBSourceConfigTests {
         map.put("awsDynamodbStreamArn", 
"arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291");
      
         DynamoDBSource source = new DynamoDBSource();
-        source.open(map, null);
+        SourceContext sourceContext = Mockito.mock(SourceContext.class);
+        source.open(map, sourceContext);
     }
     
     @Test(expectedExceptions = IllegalArgumentException.class, 
@@ -136,7 +179,8 @@ public class DynamoDBSourceConfigTests {
         map.put("initialPositionInStream", 
InitialPositionInStream.AT_TIMESTAMP);
 
         DynamoDBSource source = new DynamoDBSource();
-        source.open(map, null);
+        SourceContext sourceContext = Mockito.mock(SourceContext.class);
+        source.open(map, sourceContext);
     }
     
     private File getFile(String name) {
diff --git a/pulsar-io/influxdb/pom.xml b/pulsar-io/influxdb/pom.xml
index 9c58cd40093..ec3d1c8d2f7 100644
--- a/pulsar-io/influxdb/pom.xml
+++ b/pulsar-io/influxdb/pom.xml
@@ -32,6 +32,11 @@
     <name>Pulsar IO :: InfluxDB</name>
 
     <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>pulsar-io-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>${project.groupId}</groupId>
             <artifactId>pulsar-io-core</artifactId>
diff --git 
a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSink.java
 
b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSink.java
index 5b51461fc7b..0d431f84c52 100644
--- 
a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSink.java
+++ 
b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSink.java
@@ -46,12 +46,12 @@ public class InfluxDBGenericRecordSink implements 
Sink<GenericRecord> {
     @Override
     public void open(Map<String, Object> map, SinkContext sinkContext) throws 
Exception {
         try {
-            val configV2 = InfluxDBSinkConfig.load(map);
+            val configV2 = InfluxDBSinkConfig.load(map, sinkContext);
             configV2.validate();
             sink = new InfluxDBSink();
         } catch (Exception e) {
             try {
-                val configV1 = 
org.apache.pulsar.io.influxdb.v1.InfluxDBSinkConfig.load(map);
+                val configV1 = 
org.apache.pulsar.io.influxdb.v1.InfluxDBSinkConfig.load(map, sinkContext);
                 configV1.validate();
                 sink = new 
org.apache.pulsar.io.influxdb.v1.InfluxDBGenericRecordSink();
             } catch (Exception e1) {
diff --git 
a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBAbstractSink.java
 
b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBAbstractSink.java
index 06856bad80e..217c5304b24 100644
--- 
a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBAbstractSink.java
+++ 
b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBAbstractSink.java
@@ -43,7 +43,7 @@ public abstract class InfluxDBAbstractSink<T> extends 
BatchSink<Point, T> {
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
-        InfluxDBSinkConfig influxDBSinkConfig = 
InfluxDBSinkConfig.load(config);
+        InfluxDBSinkConfig influxDBSinkConfig = 
InfluxDBSinkConfig.load(config, sinkContext);
         influxDBSinkConfig.validate();
 
         super.init(influxDBSinkConfig.getBatchTimeMs(), 
influxDBSinkConfig.getBatchSize());
diff --git 
a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfig.java
 
b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfig.java
index 9b7d8e1ce90..4ae2cf1e4a3 100644
--- 
a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfig.java
+++ 
b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfig.java
@@ -27,6 +27,8 @@ import java.io.Serializable;
 import java.util.Map;
 import lombok.Data;
 import lombok.experimental.Accessors;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 /**
@@ -94,7 +96,7 @@ public class InfluxDBSinkConfig implements Serializable {
 
     @FieldDoc(
         required = false,
-        defaultValue = "1000L",
+        defaultValue = "1000",
         help = "The InfluxDB operation time in milliseconds")
     private long batchTimeMs = 1000L;
 
@@ -110,14 +112,11 @@ public class InfluxDBSinkConfig implements Serializable {
         return mapper.readValue(new File(yamlFile), InfluxDBSinkConfig.class);
     }
 
-    public static InfluxDBSinkConfig load(Map<String, Object> map) throws 
IOException {
-        ObjectMapper mapper = new ObjectMapper();
-        return mapper.readValue(mapper.writeValueAsString(map), 
InfluxDBSinkConfig.class);
+    public static InfluxDBSinkConfig load(Map<String, Object> map, SinkContext 
sinkContext) throws IOException {
+        return IOConfigUtils.loadWithSecrets(map, InfluxDBSinkConfig.class, 
sinkContext);
     }
 
     public void validate() {
-        Preconditions.checkNotNull(influxdbUrl, "influxdbUrl property not 
set.");
-        Preconditions.checkNotNull(database, "database property not set.");
         Preconditions.checkArgument(batchSize > 0, "batchSize must be a 
positive integer.");
         Preconditions.checkArgument(batchTimeMs > 0, "batchTimeMs must be a 
positive long.");
     }
diff --git 
a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSink.java
 
b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSink.java
index 08f1ab23399..0aa43570596 100644
--- 
a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSink.java
+++ 
b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSink.java
@@ -49,7 +49,7 @@ public class InfluxDBSink extends BatchSink<Point, 
GenericRecord> {
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
-        InfluxDBSinkConfig influxDBSinkConfig = 
InfluxDBSinkConfig.load(config);
+        InfluxDBSinkConfig influxDBSinkConfig = 
InfluxDBSinkConfig.load(config, sinkContext);
         influxDBSinkConfig.validate();
         super.init(influxDBSinkConfig.getBatchTimeMs(), 
influxDBSinkConfig.getBatchSize());
 
diff --git 
a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfig.java
 
b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfig.java
index 899b00c0021..ea87ee66b90 100644
--- 
a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfig.java
+++ 
b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfig.java
@@ -27,6 +27,8 @@ import java.io.Serializable;
 import java.util.Map;
 import lombok.Data;
 import lombok.experimental.Accessors;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 /**
@@ -87,7 +89,7 @@ public class InfluxDBSinkConfig implements Serializable {
 
     @FieldDoc(
             required = false,
-            defaultValue = "1000L",
+            defaultValue = "1000",
             help = "The InfluxDB operation time in milliseconds")
     private long batchTimeMs = 1000;
 
@@ -103,17 +105,11 @@ public class InfluxDBSinkConfig implements Serializable {
         return mapper.readValue(new File(yamlFile), InfluxDBSinkConfig.class);
     }
 
-    public static InfluxDBSinkConfig load(Map<String, Object> map) throws 
IOException {
-        ObjectMapper mapper = new ObjectMapper();
-        return mapper.readValue(mapper.writeValueAsString(map), 
InfluxDBSinkConfig.class);
+    public static InfluxDBSinkConfig load(Map<String, Object> map, SinkContext 
sinkContext) throws IOException {
+        return IOConfigUtils.loadWithSecrets(map, InfluxDBSinkConfig.class, 
sinkContext);
     }
 
     public void validate() {
-        Preconditions.checkNotNull(influxdbUrl, "influxdbUrl property not 
set.");
-        Preconditions.checkNotNull(token, "token property not set.");
-        Preconditions.checkNotNull(organization, "organization property not 
set.");
-        Preconditions.checkNotNull(bucket, "bucket property not set.");
-
         Preconditions.checkArgument(batchSize > 0, "batchSize must be a 
positive integer.");
         Preconditions.checkArgument(batchTimeMs > 0, "batchTimeMs must be a 
positive long.");
     }
diff --git 
a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfigTest.java
 
b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfigTest.java
index 4493dcfb248..10b1bfb624f 100644
--- 
a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfigTest.java
+++ 
b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfigTest.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.io.influxdb.v1;
 
+import org.apache.pulsar.io.core.SinkContext;
 import org.influxdb.InfluxDB;
+import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
 import java.io.File;
@@ -60,8 +62,11 @@ public class InfluxDBSinkConfigTest {
         map.put("gzipEnable", "false");
         map.put("batchTimeMs", "1000");
         map.put("batchSize", "100");
+        map.put("username", "admin");
+        map.put("password", "admin");
 
-        InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext);
         assertNotNull(config);
         assertEquals("http://localhost:8086";, config.getInfluxdbUrl());
         assertEquals("test_db", config.getDatabase());
@@ -71,6 +76,39 @@ public class InfluxDBSinkConfigTest {
         assertEquals(Boolean.parseBoolean("false"), config.isGzipEnable());
         assertEquals(Long.parseLong("1000"), config.getBatchTimeMs());
         assertEquals(Integer.parseInt("100"), config.getBatchSize());
+        assertEquals("admin", config.getUsername());
+        assertEquals("admin", config.getPassword());
+    }
+
+    @Test
+    public final void loadFromMapCredentialFromSecretTest() throws IOException 
{
+        Map<String, Object> map = new HashMap<>();
+        map.put("influxdbUrl", "http://localhost:8086";);
+        map.put("database", "test_db");
+        map.put("consistencyLevel", "ONE");
+        map.put("logLevel", "NONE");
+        map.put("retentionPolicy", "autogen");
+        map.put("gzipEnable", "false");
+        map.put("batchTimeMs", "1000");
+        map.put("batchSize", "100");
+
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        Mockito.when(sinkContext.getSecret("username"))
+                .thenReturn("admin");
+        Mockito.when(sinkContext.getSecret("password"))
+                .thenReturn("admin");
+        InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext);
+        assertNotNull(config);
+        assertEquals("http://localhost:8086";, config.getInfluxdbUrl());
+        assertEquals("test_db", config.getDatabase());
+        assertEquals("ONE", config.getConsistencyLevel());
+        assertEquals("NONE", config.getLogLevel());
+        assertEquals("autogen", config.getRetentionPolicy());
+        assertEquals(Boolean.parseBoolean("false"), config.isGzipEnable());
+        assertEquals(Long.parseLong("1000"), config.getBatchTimeMs());
+        assertEquals(Integer.parseInt("100"), config.getBatchSize());
+        assertEquals("admin", config.getUsername());
+        assertEquals("admin", config.getPassword());
     }
 
     @Test
@@ -85,12 +123,13 @@ public class InfluxDBSinkConfigTest {
         map.put("batchTimeMs", "1000");
         map.put("batchSize", "100");
 
-        InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext);
         config.validate();
     }
 
-    @Test(expectedExceptions = NullPointerException.class,
-        expectedExceptionsMessageRegExp = "influxdbUrl property not set.")
+    @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "influxdbUrl cannot be null")
     public final void missingInfluxdbUrlValidateTest() throws IOException {
         Map<String, Object> map = new HashMap<>();
         map.put("database", "test_db");
@@ -101,7 +140,8 @@ public class InfluxDBSinkConfigTest {
         map.put("batchTimeMs", "1000");
         map.put("batchSize", "100");
 
-        InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext);
         config.validate();
     }
 
@@ -118,7 +158,8 @@ public class InfluxDBSinkConfigTest {
         map.put("batchTimeMs", "1000");
         map.put("batchSize", "-100");
 
-        InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext);
         config.validate();
     }
 
@@ -135,7 +176,8 @@ public class InfluxDBSinkConfigTest {
         map.put("batchTimeMs", "1000");
         map.put("batchSize", "100");
 
-        InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext);
         config.validate();
 
         
InfluxDB.ConsistencyLevel.valueOf(config.getConsistencyLevel().toUpperCase());
diff --git 
a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfigTest.java
 
b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfigTest.java
index df1f7fd29a6..d6cee1e308d 100644
--- 
a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfigTest.java
+++ 
b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfigTest.java
@@ -24,6 +24,8 @@ import static org.testng.Assert.assertNotNull;
 import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.pulsar.io.core.SinkContext;
+import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
 public class InfluxDBSinkConfigTest {
@@ -58,18 +60,34 @@ public class InfluxDBSinkConfigTest {
     public final void testLoadFromMap() throws Exception {
         Map<String, Object> map = buildValidConfigMap();
 
-        InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext);
         assertNotNull(config);
         config.validate();
         verifyValues(config);
     }
 
-    @Test(expectedExceptions = NullPointerException.class,
-            expectedExceptionsMessageRegExp = "influxdbUrl property not set.")
+    @Test
+    public final void testLoadFromMapCredentialFromSecret() throws Exception {
+        Map<String, Object> map = buildValidConfigMap();
+        map.remove("token");
+
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        Mockito.when(sinkContext.getSecret("token"))
+                .thenReturn("xxxx");
+        InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext);
+        assertNotNull(config);
+        config.validate();
+        verifyValues(config);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "influxdbUrl cannot be null")
     public void testRequiredConfigMissing() throws Exception {
         Map<String, Object> map = buildValidConfigMap();
         map.remove("influxdbUrl");
-        InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext);
         config.validate();
     }
 
@@ -78,7 +96,8 @@ public class InfluxDBSinkConfigTest {
     public void testBatchConfig() throws Exception {
         Map<String, Object> map = buildValidConfigMap();
         map.put("batchSize", -1);
-        InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext);
         config.validate();
     }
 
diff --git a/pulsar-io/jdbc/core/pom.xml b/pulsar-io/jdbc/core/pom.xml
index 324aa174aca..7870eaa6454 100644
--- a/pulsar-io/jdbc/core/pom.xml
+++ b/pulsar-io/jdbc/core/pom.xml
@@ -32,6 +32,12 @@
   <name>Pulsar IO :: Jdbc :: Core</name>
 
   <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-io-core</artifactId>
diff --git 
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
 
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
index 4586fcebcf1..ca33b3cfdab 100644
--- 
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
+++ 
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -76,7 +76,7 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
-        jdbcSinkConfig = JdbcSinkConfig.load(config);
+        jdbcSinkConfig = JdbcSinkConfig.load(config, sinkContext);
         jdbcSinkConfig.validate();
 
         jdbcUrl = jdbcSinkConfig.getJdbcUrl();
diff --git 
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
 
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
index f798d94f7c3..854d6838131 100644
--- 
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
+++ 
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
@@ -26,6 +26,8 @@ import java.io.Serializable;
 import java.util.Map;
 import lombok.Data;
 import lombok.experimental.Accessors;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 @Data
@@ -145,9 +147,8 @@ public class JdbcSinkConfig implements Serializable {
         return mapper.readValue(new File(yamlFile), JdbcSinkConfig.class);
     }
 
-    public static JdbcSinkConfig load(Map<String, Object> map) throws 
IOException {
-        ObjectMapper mapper = new ObjectMapper();
-        return mapper.readValue(mapper.writeValueAsString(map), 
JdbcSinkConfig.class);
+    public static JdbcSinkConfig load(Map<String, Object> map, SinkContext 
sinkContext) throws IOException {
+        return IOConfigUtils.loadWithSecrets(map, JdbcSinkConfig.class, 
sinkContext);
     }
 
     public void validate() {
diff --git a/pulsar-io/kafka/pom.xml b/pulsar-io/kafka/pom.xml
index 190e90209db..adbbce1348e 100644
--- a/pulsar-io/kafka/pom.xml
+++ b/pulsar-io/kafka/pom.xml
@@ -46,6 +46,11 @@
   </dependencyManagement>
 
   <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
 
     <dependency>
       <groupId>${project.groupId}</groupId>
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
index 8fbd6c81861..b1710d2c0b7 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.io.kafka;
 
 import java.io.IOException;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Properties;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -79,10 +78,7 @@ public abstract class KafkaAbstractSink<K, V> implements 
Sink<byte[]> {
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
-        kafkaSinkConfig = KafkaSinkConfig.load(config);
-        Objects.requireNonNull(kafkaSinkConfig.getTopic(), "Kafka topic is not 
set");
-        Objects.requireNonNull(kafkaSinkConfig.getBootstrapServers(), "Kafka 
bootstrapServers is not set");
-        Objects.requireNonNull(kafkaSinkConfig.getAcks(), "Kafka acks mode is 
not set");
+        kafkaSinkConfig = KafkaSinkConfig.load(config, sinkContext);
         if (kafkaSinkConfig.getBatchSize() <= 0) {
             throw new IllegalArgumentException("Invalid Kafka Producer 
batchSize : "
                 + kafkaSinkConfig.getBatchSize());
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index ceee8e0b04c..cd0db92bd78 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -65,7 +65,7 @@ public abstract class KafkaAbstractSource<V> extends 
KafkaPushSource<V> {
 
     @Override
     public void open(Map<String, Object> config, SourceContext sourceContext) 
throws Exception {
-        kafkaSourceConfig = KafkaSourceConfig.load(config);
+        kafkaSourceConfig = KafkaSourceConfig.load(config, sourceContext);
         Objects.requireNonNull(kafkaSourceConfig.getTopic(), "Kafka topic is 
not set");
         Objects.requireNonNull(kafkaSourceConfig.getBootstrapServers(), "Kafka 
bootstrapServers is not set");
         Objects.requireNonNull(kafkaSourceConfig.getGroupId(), "Kafka consumer 
group id is not set");
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
index dbbf1a7b5e2..4ea5fc897d6 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
@@ -26,6 +26,8 @@ import java.io.Serializable;
 import java.util.Map;
 import lombok.Data;
 import lombok.experimental.Accessors;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 @Data
@@ -91,12 +93,12 @@ public class KafkaSinkConfig implements Serializable {
                     + " before considering a request complete. This controls 
the durability of records that are sent.")
     private String acks;
     @FieldDoc(
-            defaultValue = "16384L",
+            defaultValue = "16384",
             help = "The batch size that Kafka producer will attempt to batch 
records together"
                     + " before sending them to brokers.")
     private long batchSize = 16384L;
     @FieldDoc(
-            defaultValue = "1048576L",
+            defaultValue = "1048576",
             help =
                     "The maximum size of a Kafka request in bytes.")
     private long maxRequestSize = 1048576L;
@@ -129,8 +131,7 @@ public class KafkaSinkConfig implements Serializable {
         return mapper.readValue(new File(yamlFile), KafkaSinkConfig.class);
     }
 
-    public static KafkaSinkConfig load(Map<String, Object> map) throws 
IOException {
-        ObjectMapper mapper = new ObjectMapper();
-        return mapper.readValue(mapper.writeValueAsString(map), 
KafkaSinkConfig.class);
+    public static KafkaSinkConfig load(Map<String, Object> map, SinkContext 
sinkContext) throws IOException {
+        return IOConfigUtils.loadWithSecrets(map, KafkaSinkConfig.class, 
sinkContext);
     }
 }
\ No newline at end of file
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
index ad2e121d26a..422659cdda4 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
@@ -27,6 +27,7 @@ import java.io.Serializable;
 import java.util.Map;
 import lombok.Data;
 import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 @Data
@@ -158,8 +159,14 @@ public class KafkaSourceConfig implements Serializable {
         return mapper.readValue(new File(yamlFile), KafkaSourceConfig.class);
     }
 
-    public static KafkaSourceConfig load(Map<String, Object> map) throws 
IOException {
+    public static KafkaSourceConfig load(Map<String, Object> map, 
SourceContext sourceContext) throws IOException {
         ObjectMapper mapper = new ObjectMapper();
+        // since the KafkaSourceConfig requires the 
ACCEPT_EMPTY_STRING_AS_NULL_OBJECT feature
+        // We manually set the sensitive fields here instead of calling 
`IOConfigUtils.loadWithSecrets`
+        String sslTruststorePassword = 
sourceContext.getSecret("sslTruststorePassword");
+        if (sslTruststorePassword != null) {
+            map.put("sslTruststorePassword", sslTruststorePassword);
+        }
         
mapper.enable(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT);
         return mapper.readValue(mapper.writeValueAsString(map), 
KafkaSourceConfig.class);
     }
diff --git 
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
 
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
index ec9ee4a957d..c428f38fe08 100644
--- 
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
+++ 
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
@@ -188,12 +188,12 @@ public class KafkaAbstractSinkTest {
                 sink.close();
             }
         };
-        expectThrows(NullPointerException.class, "Kafka topic is not set", 
openAndClose);
-        config.put("topic", "topic_2");
-        expectThrows(NullPointerException.class, "Kafka bootstrapServers is 
not set", openAndClose);
+        expectThrows(IllegalArgumentException.class, "bootstrapServers cannot 
be null", openAndClose);
         config.put("bootstrapServers", "localhost:6667");
-        expectThrows(NullPointerException.class, "Kafka acks mode is not set", 
openAndClose);
+        expectThrows(IllegalArgumentException.class, "acks cannot be null", 
openAndClose);
         config.put("acks", "1");
+        expectThrows(IllegalArgumentException.class, "topic cannot be null", 
openAndClose);
+        config.put("topic", "topic_2");
         config.put("batchSize", "-1");
         expectThrows(IllegalArgumentException.class, "Invalid Kafka Producer 
batchSize : -1", openAndClose);
         config.put("batchSize", "16384");
diff --git 
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
 
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index 6911ec2a6bf..0b8ba4b40ee 100644
--- 
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++ 
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -111,19 +111,39 @@ public class KafkaAbstractSourceTest {
     public void loadConsumerConfigPropertiesFromMapTest() throws Exception {
         Map<String, Object> config = new HashMap<>();
         config.put("consumerConfigProperties", "");
-        KafkaSourceConfig kafkaSourceConfig = KafkaSourceConfig.load(config);
+        config.put("bootstrapServers", "localhost:8080");
+        config.put("groupId", "test-group");
+        config.put("topic", "test-topic");
+        SourceContext sourceContext = Mockito.mock(SourceContext.class);
+        KafkaSourceConfig kafkaSourceConfig = KafkaSourceConfig.load(config, 
sourceContext);
         assertNotNull(kafkaSourceConfig);
         assertNull(kafkaSourceConfig.getConsumerConfigProperties());
 
         config.put("consumerConfigProperties", null);
-        kafkaSourceConfig = KafkaSourceConfig.load(config);
+        kafkaSourceConfig = KafkaSourceConfig.load(config, sourceContext);
         assertNull(kafkaSourceConfig.getConsumerConfigProperties());
 
         config.put("consumerConfigProperties", ImmutableMap.of("foo", "bar"));
-        kafkaSourceConfig = KafkaSourceConfig.load(config);
+        kafkaSourceConfig = KafkaSourceConfig.load(config, sourceContext);
         assertEquals(kafkaSourceConfig.getConsumerConfigProperties(), 
ImmutableMap.of("foo", "bar"));
     }
 
+    @Test
+    public void loadSensitiveFieldsFromSecretTest() throws Exception {
+        Map<String, Object> config = new HashMap<>();
+        config.put("consumerConfigProperties", "");
+        config.put("bootstrapServers", "localhost:8080");
+        config.put("groupId", "test-group");
+        config.put("topic", "test-topic");
+        SourceContext sourceContext = Mockito.mock(SourceContext.class);
+        Mockito.when(sourceContext.getSecret("sslTruststorePassword"))
+                .thenReturn("xxxx");
+        KafkaSourceConfig kafkaSourceConfig = KafkaSourceConfig.load(config, 
sourceContext);
+        assertNotNull(kafkaSourceConfig);
+        assertNull(kafkaSourceConfig.getConsumerConfigProperties());
+        assertEquals("xxxx", kafkaSourceConfig.getSslTruststorePassword());
+    }
+
     @Test
     public final void loadFromYamlFileTest() throws IOException {
         File yamlFile = getFile("kafkaSourceConfig.yaml");
diff --git a/pulsar-io/mongo/pom.xml b/pulsar-io/mongo/pom.xml
index 9dc72861a75..25c6c59d7a1 100644
--- a/pulsar-io/mongo/pom.xml
+++ b/pulsar-io/mongo/pom.xml
@@ -37,6 +37,11 @@
   </properties>
 
   <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>${project.parent.groupId}</groupId>
       <artifactId>pulsar-io-core</artifactId>
diff --git 
a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoAbstractConfig.java
 
b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoAbstractConfig.java
index 35c327ed82b..74f077da620 100644
--- 
a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoAbstractConfig.java
+++ 
b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoAbstractConfig.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import java.io.Serializable;
 import lombok.Data;
 import lombok.experimental.Accessors;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 /**
@@ -42,6 +41,7 @@ public abstract class MongoAbstractConfig implements 
Serializable {
 
     @FieldDoc(
             required = true,
+            sensitive = true, // it may contain password
             defaultValue = "",
             help = "The URI of MongoDB that the connector connects to "
                     + "(see: 
https://docs.mongodb.com/manual/reference/connection-string/)"
@@ -95,7 +95,6 @@ public abstract class MongoAbstractConfig implements 
Serializable {
     }
 
     public void validate() {
-        checkArgument(!StringUtils.isEmpty(getMongoUri()), "Required MongoDB 
URI is not set.");
         checkArgument(getBatchSize() > 0, "batchSize must be a positive 
integer.");
         checkArgument(getBatchTimeMs() > 0, "batchTimeMs must be a positive 
long.");
     }
diff --git 
a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java 
b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
index 2206d232eaf..61d5aeb697e 100644
--- a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
+++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
@@ -86,7 +86,7 @@ public class MongoSink implements Sink<byte[]> {
     public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
         log.info("Open MongoDB Sink");
 
-        mongoSinkConfig = MongoSinkConfig.load(config);
+        mongoSinkConfig = MongoSinkConfig.load(config, sinkContext);
         mongoSinkConfig.validate();
 
         if (clientProvider != null) {
diff --git 
a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSinkConfig.java
 
b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSinkConfig.java
index 285f3c97bef..9431fe49108 100644
--- 
a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSinkConfig.java
+++ 
b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSinkConfig.java
@@ -30,6 +30,8 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.experimental.Accessors;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
 
 /**
  * Configuration class for the MongoDB Sink Connectors.
@@ -59,11 +61,8 @@ public class MongoSinkConfig extends MongoAbstractConfig {
         return cfg;
     }
 
-    public static MongoSinkConfig load(Map<String, Object> map) throws 
IOException {
-        final ObjectMapper mapper = new ObjectMapper();
-        final MongoSinkConfig cfg = 
mapper.readValue(mapper.writeValueAsString(map), MongoSinkConfig.class);
-
-        return cfg;
+    public static MongoSinkConfig load(Map<String, Object> map, SinkContext 
sinkContext) throws IOException {
+        return IOConfigUtils.loadWithSecrets(map, MongoSinkConfig.class, 
sinkContext);
     }
 
     @Override
diff --git 
a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java 
b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java
index 6ee95fc4cd4..68a31b461a5 100644
--- 
a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java
+++ 
b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java
@@ -79,7 +79,7 @@ public class MongoSource extends PushSource<byte[]> {
     public void open(Map<String, Object> config, SourceContext sourceContext) 
throws Exception {
         log.info("Open MongoDB Source");
 
-        mongoSourceConfig = MongoSourceConfig.load(config);
+        mongoSourceConfig = MongoSourceConfig.load(config, sourceContext);
         mongoSourceConfig.validate();
 
         if (clientProvider != null) {
diff --git 
a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSourceConfig.java
 
b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSourceConfig.java
index cf887a93bf3..1c0c7f4b365 100644
--- 
a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSourceConfig.java
+++ 
b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSourceConfig.java
@@ -29,6 +29,8 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.experimental.Accessors;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 /**
@@ -75,12 +77,8 @@ public class MongoSourceConfig extends MongoAbstractConfig {
         return cfg;
     }
 
-    public static MongoSourceConfig load(Map<String, Object> map) throws 
IOException {
-        final ObjectMapper mapper = new ObjectMapper();
-        final MongoSourceConfig cfg =
-                mapper.readValue(mapper.writeValueAsString(map), 
MongoSourceConfig.class);
-
-        return cfg;
+    public static MongoSourceConfig load(Map<String, Object> map, 
SourceContext sourceContext) throws IOException {
+        return IOConfigUtils.loadWithSecrets(map, MongoSourceConfig.class, 
sourceContext);
     }
 
     /**
diff --git 
a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkConfigTest.java
 
b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkConfigTest.java
index b1166eac572..c86e45feb23 100644
--- 
a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkConfigTest.java
+++ 
b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkConfigTest.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.io.mongodb;
 
 import java.util.Map;
+import org.apache.pulsar.io.core.SinkContext;
+import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
 import java.io.File;
@@ -34,7 +36,27 @@ public class MongoSinkConfigTest {
         commonConfigMap.put("batchSize", TestHelper.BATCH_SIZE);
         commonConfigMap.put("batchTimeMs", TestHelper.BATCH_TIME);
 
-        final MongoSinkConfig cfg = MongoSinkConfig.load(commonConfigMap);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        final MongoSinkConfig cfg = MongoSinkConfig.load(commonConfigMap, 
sinkContext);
+
+        assertEquals(cfg.getMongoUri(), TestHelper.URI);
+        assertEquals(cfg.getDatabase(), TestHelper.DB);
+        assertEquals(cfg.getCollection(), TestHelper.COLL);
+        assertEquals(cfg.getBatchSize(), TestHelper.BATCH_SIZE);
+        assertEquals(cfg.getBatchTimeMs(), TestHelper.BATCH_TIME);
+    }
+
+    @Test
+    public void testLoadMapConfigUrlFromSecret() throws IOException {
+        final Map<String, Object> commonConfigMap = 
TestHelper.createCommonConfigMap();
+        commonConfigMap.put("batchSize", TestHelper.BATCH_SIZE);
+        commonConfigMap.put("batchTimeMs", TestHelper.BATCH_TIME);
+        commonConfigMap.remove("mongoUri");
+
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        Mockito.when(sinkContext.getSecret("mongoUri"))
+                .thenReturn(TestHelper.URI);
+        final MongoSinkConfig cfg = MongoSinkConfig.load(commonConfigMap, 
sinkContext);
 
         assertEquals(cfg.getMongoUri(), TestHelper.URI);
         assertEquals(cfg.getDatabase(), TestHelper.DB);
@@ -44,12 +66,13 @@ public class MongoSinkConfigTest {
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class,
-            expectedExceptionsMessageRegExp = "Required MongoDB URI is not 
set.")
+            expectedExceptionsMessageRegExp = "mongoUri cannot be null")
     public void testBadMongoUri() throws IOException {
         final Map<String, Object> configMap = 
TestHelper.createCommonConfigMap();
         TestHelper.removeMongoUri(configMap);
 
-        final MongoSinkConfig cfg = MongoSinkConfig.load(configMap);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        final MongoSinkConfig cfg = MongoSinkConfig.load(configMap, 
sinkContext);
 
         cfg.validate();
     }
@@ -60,7 +83,8 @@ public class MongoSinkConfigTest {
         final Map<String, Object> configMap = 
TestHelper.createCommonConfigMap();
         TestHelper.removeDatabase(configMap);
 
-        final MongoSinkConfig cfg = MongoSinkConfig.load(configMap);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        final MongoSinkConfig cfg = MongoSinkConfig.load(configMap, 
sinkContext);
 
         cfg.validate();
     }
@@ -71,7 +95,8 @@ public class MongoSinkConfigTest {
         final Map<String, Object> configMap = 
TestHelper.createCommonConfigMap();
         TestHelper.removeCollection(configMap);
 
-        final MongoSinkConfig cfg = MongoSinkConfig.load(configMap);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        final MongoSinkConfig cfg = MongoSinkConfig.load(configMap, 
sinkContext);
 
         cfg.validate();
     }
@@ -82,7 +107,8 @@ public class MongoSinkConfigTest {
         final Map<String, Object> configMap = 
TestHelper.createCommonConfigMap();
         TestHelper.putBatchSize(configMap, 0);
 
-        final MongoSinkConfig cfg = MongoSinkConfig.load(configMap);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        final MongoSinkConfig cfg = MongoSinkConfig.load(configMap, 
sinkContext);
 
         cfg.validate();
     }
@@ -93,7 +119,8 @@ public class MongoSinkConfigTest {
         final Map<String, Object> configMap = 
TestHelper.createCommonConfigMap();
         TestHelper.putBatchTime(configMap, 0L);
 
-        final MongoSinkConfig cfg = MongoSinkConfig.load(configMap);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        final MongoSinkConfig cfg = MongoSinkConfig.load(configMap, 
sinkContext);
 
         cfg.validate();
     }
diff --git 
a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceConfigTest.java
 
b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceConfigTest.java
index e7fd01549b0..528cd0237ef 100644
--- 
a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceConfigTest.java
+++ 
b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceConfigTest.java
@@ -23,6 +23,8 @@ import com.fasterxml.jackson.databind.JsonMappingException;
 import java.io.File;
 import java.io.IOException;
 import java.util.Map;
+import org.apache.pulsar.io.core.SourceContext;
+import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
 public class MongoSourceConfigTest {
@@ -32,7 +34,27 @@ public class MongoSourceConfigTest {
         final Map<String, Object> configMap = 
TestHelper.createCommonConfigMap();
         TestHelper.putSyncType(configMap, TestHelper.SYNC_TYPE);
 
-        final MongoSourceConfig cfg = MongoSourceConfig.load(configMap);
+        SourceContext sourceContext = Mockito.mock(SourceContext.class);
+        final MongoSourceConfig cfg = MongoSourceConfig.load(configMap, 
sourceContext);
+
+        assertEquals(cfg.getMongoUri(), TestHelper.URI);
+        assertEquals(cfg.getDatabase(), TestHelper.DB);
+        assertEquals(cfg.getCollection(), TestHelper.COLL);
+        assertEquals(cfg.getSyncType(), TestHelper.SYNC_TYPE);
+        assertEquals(cfg.getBatchSize(), TestHelper.BATCH_SIZE);
+        assertEquals(cfg.getBatchTimeMs(), TestHelper.BATCH_TIME);
+    }
+
+    @Test
+    public void testLoadMapConfigUriFromSecret() throws IOException {
+        final Map<String, Object> configMap = 
TestHelper.createCommonConfigMap();
+        TestHelper.putSyncType(configMap, TestHelper.SYNC_TYPE);
+        configMap.remove("mongoUri");
+
+        SourceContext sourceContext = Mockito.mock(SourceContext.class);
+        Mockito.when(sourceContext.getSecret("mongoUri"))
+                .thenReturn(TestHelper.URI);
+        final MongoSourceConfig cfg = MongoSourceConfig.load(configMap, 
sourceContext);
 
         assertEquals(cfg.getMongoUri(), TestHelper.URI);
         assertEquals(cfg.getDatabase(), TestHelper.DB);
@@ -43,12 +65,13 @@ public class MongoSourceConfigTest {
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class,
-            expectedExceptionsMessageRegExp = "Required MongoDB URI is not 
set.")
+            expectedExceptionsMessageRegExp = "mongoUri cannot be null")
     public void testBadMongoUri() throws IOException {
         final Map<String, Object> configMap = 
TestHelper.createCommonConfigMap();
         TestHelper.removeMongoUri(configMap);
 
-        final MongoSourceConfig cfg = MongoSourceConfig.load(configMap);
+        SourceContext sourceContext = Mockito.mock(SourceContext.class);
+        final MongoSourceConfig cfg = MongoSourceConfig.load(configMap, 
sourceContext);
 
         cfg.validate();
     }
@@ -61,7 +84,8 @@ public class MongoSourceConfigTest {
         final Map<String, Object> configMap = 
TestHelper.createCommonConfigMap();
         TestHelper.putSyncType(configMap, "wrong_sync_type_str");
 
-        final MongoSourceConfig cfg = MongoSourceConfig.load(configMap);
+        SourceContext sourceContext = Mockito.mock(SourceContext.class);
+        final MongoSourceConfig cfg = MongoSourceConfig.load(configMap, 
sourceContext);
 
         cfg.validate();
     }
@@ -72,7 +96,8 @@ public class MongoSourceConfigTest {
         final Map<String, Object> configMap = 
TestHelper.createCommonConfigMap();
         TestHelper.putBatchSize(configMap, 0);
 
-        final MongoSourceConfig cfg = MongoSourceConfig.load(configMap);
+        SourceContext sourceContext = Mockito.mock(SourceContext.class);
+        final MongoSourceConfig cfg = MongoSourceConfig.load(configMap, 
sourceContext);
 
         cfg.validate();
     }
@@ -83,7 +108,8 @@ public class MongoSourceConfigTest {
         final Map<String, Object> configMap = 
TestHelper.createCommonConfigMap();
         TestHelper.putBatchTime(configMap, 0L);
 
-        final MongoSourceConfig cfg = MongoSourceConfig.load(configMap);
+        SourceContext sourceContext = Mockito.mock(SourceContext.class);
+        final MongoSourceConfig cfg = MongoSourceConfig.load(configMap, 
sourceContext);
 
         cfg.validate();
     }
diff --git a/pulsar-io/rabbitmq/pom.xml b/pulsar-io/rabbitmq/pom.xml
index 85e0452df1b..e35b5516858 100644
--- a/pulsar-io/rabbitmq/pom.xml
+++ b/pulsar-io/rabbitmq/pom.xml
@@ -32,6 +32,11 @@
 
   <dependencies>
 
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-io-core</artifactId>
diff --git 
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
 
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
index f317a35734e..89192c42346 100644
--- 
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
+++ 
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
@@ -53,7 +53,7 @@ public class RabbitMQSink implements Sink<byte[]> {
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
-        rabbitMQSinkConfig = RabbitMQSinkConfig.load(config);
+        rabbitMQSinkConfig = RabbitMQSinkConfig.load(config, sinkContext);
         rabbitMQSinkConfig.validate();
 
         ConnectionFactory connectionFactory = 
rabbitMQSinkConfig.createConnectionFactory();
diff --git 
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java
 
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java
index c1f8d6b8ad3..39f97e5e460 100644
--- 
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java
+++ 
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.io.rabbitmq;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
-import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
@@ -28,6 +27,8 @@ import java.util.Map;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.experimental.Accessors;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 @Data
@@ -60,14 +61,12 @@ public class RabbitMQSinkConfig extends 
RabbitMQAbstractConfig implements Serial
         return mapper.readValue(new File(yamlFile), RabbitMQSinkConfig.class);
     }
 
-    public static RabbitMQSinkConfig load(Map<String, Object> map) throws 
IOException {
-        ObjectMapper mapper = new ObjectMapper();
-        return mapper.readValue(mapper.writeValueAsString(map), 
RabbitMQSinkConfig.class);
+    public static RabbitMQSinkConfig load(Map<String, Object> map, SinkContext 
sinkContext) throws IOException {
+        return IOConfigUtils.loadWithSecrets(map, RabbitMQSinkConfig.class, 
sinkContext);
     }
 
     @Override
     public void validate() {
         super.validate();
-        Preconditions.checkNotNull(exchangeName, "exchangeName property not 
set.");
     }
 }
diff --git 
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
 
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
index d15108c4d82..b0b7ef31b08 100644
--- 
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
+++ 
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
@@ -54,7 +54,7 @@ public class RabbitMQSource extends PushSource<byte[]> {
 
     @Override
     public void open(Map<String, Object> config, SourceContext sourceContext) 
throws Exception {
-        rabbitMQSourceConfig = RabbitMQSourceConfig.load(config);
+        rabbitMQSourceConfig = RabbitMQSourceConfig.load(config, 
sourceContext);
         rabbitMQSourceConfig.validate();
 
         ConnectionFactory connectionFactory = 
rabbitMQSourceConfig.createConnectionFactory();
diff --git 
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSourceConfig.java
 
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSourceConfig.java
index f24018e70da..01e23a71460 100644
--- 
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSourceConfig.java
+++ 
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSourceConfig.java
@@ -28,6 +28,8 @@ import java.util.Map;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.experimental.Accessors;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 @Data
@@ -66,9 +68,8 @@ public class RabbitMQSourceConfig extends 
RabbitMQAbstractConfig implements Seri
         return mapper.readValue(new File(yamlFile), 
RabbitMQSourceConfig.class);
     }
 
-    public static RabbitMQSourceConfig load(Map<String, Object> map) throws 
IOException {
-        ObjectMapper mapper = new ObjectMapper();
-        return mapper.readValue(mapper.writeValueAsString(map), 
RabbitMQSourceConfig.class);
+    public static RabbitMQSourceConfig load(Map<String, Object> map, 
SourceContext sourceContext) throws IOException {
+        return IOConfigUtils.loadWithSecrets(map, RabbitMQSourceConfig.class, 
sourceContext);
     }
 
     @Override
diff --git 
a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java
 
b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java
index 3d4fd6f46e1..8706cb56752 100644
--- 
a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java
+++ 
b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.io.rabbitmq.sink;
 
+import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.rabbitmq.RabbitMQSinkConfig;
+import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
 import java.io.File;
@@ -71,7 +73,45 @@ public class RabbitMQSinkConfigTest {
         map.put("exchangeName", "test-exchange");
         map.put("exchangeType", "test-exchange-type");
 
-        RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map, sinkContext);
+        assertNotNull(config);
+        assertEquals(config.getHost(), "localhost");
+        assertEquals(config.getPort(), Integer.parseInt("5673"));
+        assertEquals(config.getVirtualHost(), "/");
+        assertEquals(config.getUsername(), "guest");
+        assertEquals(config.getPassword(), "guest");
+        assertEquals(config.getConnectionName(), "test-connection");
+        assertEquals(config.getRequestedChannelMax(), Integer.parseInt("0"));
+        assertEquals(config.getRequestedFrameMax(), Integer.parseInt("0"));
+        assertEquals(config.getConnectionTimeout(), Integer.parseInt("60000"));
+        assertEquals(config.getHandshakeTimeout(), Integer.parseInt("10000"));
+        assertEquals(config.getRequestedHeartbeat(), Integer.parseInt("60"));
+        assertEquals(config.getExchangeName(), "test-exchange");
+        assertEquals(config.getExchangeType(), "test-exchange-type");
+    }
+
+    @Test
+    public final void loadFromMapCredentialsFromSecretTest() throws 
IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("host", "localhost");
+        map.put("port", "5673");
+        map.put("virtualHost", "/");
+        map.put("connectionName", "test-connection");
+        map.put("requestedChannelMax", "0");
+        map.put("requestedFrameMax", "0");
+        map.put("connectionTimeout", "60000");
+        map.put("handshakeTimeout", "10000");
+        map.put("requestedHeartbeat", "60");
+        map.put("exchangeName", "test-exchange");
+        map.put("exchangeType", "test-exchange-type");
+
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        Mockito.when(sinkContext.getSecret("username"))
+                .thenReturn("guest");
+        Mockito.when(sinkContext.getSecret("password"))
+                .thenReturn("guest");
+        RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map, sinkContext);
         assertNotNull(config);
         assertEquals(config.getHost(), "localhost");
         assertEquals(config.getPort(), Integer.parseInt("5673"));
@@ -105,12 +145,13 @@ public class RabbitMQSinkConfigTest {
         map.put("exchangeName", "test-exchange");
         map.put("exchangeType", "test-exchange-type");
 
-        RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map, sinkContext);
         config.validate();
     }
 
-    @Test(expectedExceptions = NullPointerException.class,
-        expectedExceptionsMessageRegExp = "exchangeName property not set.")
+    @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "exchangeName cannot be null")
     public final void missingExchangeValidateTest() throws IOException {
         Map<String, Object> map = new HashMap<>();
         map.put("host", "localhost");
@@ -126,7 +167,8 @@ public class RabbitMQSinkConfigTest {
         map.put("requestedHeartbeat", "60");
         map.put("exchangeType", "test-exchange-type");
 
-        RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map, sinkContext);
         config.validate();
     }
 
diff --git 
a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceConfigTest.java
 
b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceConfigTest.java
index c33e0070c6f..43a90062fa4 100644
--- 
a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceConfigTest.java
+++ 
b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceConfigTest.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.io.rabbitmq.source;
 
+import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.rabbitmq.RabbitMQSourceConfig;
+import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
 import java.io.File;
@@ -76,7 +78,50 @@ public class RabbitMQSourceConfigTest {
         map.put("prefetchGlobal", "false");
         map.put("passive", "true");
 
-        RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map);
+        SourceContext sourceContext = Mockito.mock(SourceContext.class);
+        RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map, 
sourceContext);
+        assertNotNull(config);
+        assertEquals("localhost", config.getHost());
+        assertEquals(Integer.parseInt("5672"), config.getPort());
+        assertEquals("/", config.getVirtualHost());
+        assertEquals("guest", config.getUsername());
+        assertEquals("guest", config.getPassword());
+        assertEquals("test-queue", config.getQueueName());
+        assertEquals("test-connection", config.getConnectionName());
+        assertEquals(Integer.parseInt("0"), config.getRequestedChannelMax());
+        assertEquals(Integer.parseInt("0"), config.getRequestedFrameMax());
+        assertEquals(Integer.parseInt("60000"), config.getConnectionTimeout());
+        assertEquals(Integer.parseInt("10000"), config.getHandshakeTimeout());
+        assertEquals(Integer.parseInt("60"), config.getRequestedHeartbeat());
+        assertEquals(Integer.parseInt("0"), config.getPrefetchCount());
+        assertEquals(false, config.isPrefetchGlobal());
+        assertEquals(false, config.isPrefetchGlobal());
+        assertEquals(true, config.isPassive());
+    }
+
+    @Test
+    public final void loadFromMapCredentialsFromSecretTest() throws 
IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("host", "localhost");
+        map.put("port", "5672");
+        map.put("virtualHost", "/");
+        map.put("queueName", "test-queue");
+        map.put("connectionName", "test-connection");
+        map.put("requestedChannelMax", "0");
+        map.put("requestedFrameMax", "0");
+        map.put("connectionTimeout", "60000");
+        map.put("handshakeTimeout", "10000");
+        map.put("requestedHeartbeat", "60");
+        map.put("prefetchCount", "0");
+        map.put("prefetchGlobal", "false");
+        map.put("passive", "true");
+
+        SourceContext sourceContext = Mockito.mock(SourceContext.class);
+        Mockito.when(sourceContext.getSecret("username"))
+                .thenReturn("guest");
+        Mockito.when(sourceContext.getSecret("password"))
+                .thenReturn("guest");
+        RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map, 
sourceContext);
         assertNotNull(config);
         assertEquals("localhost", config.getHost());
         assertEquals(Integer.parseInt("5672"), config.getPort());
@@ -115,12 +160,13 @@ public class RabbitMQSourceConfigTest {
         map.put("prefetchGlobal", "false");
         map.put("passive", "false");
 
-        RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map);
+        SourceContext sourceContext = Mockito.mock(SourceContext.class);
+        RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map, 
sourceContext);
         config.validate();
     }
 
-    @Test(expectedExceptions = NullPointerException.class,
-        expectedExceptionsMessageRegExp = "host property not set.")
+    @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "host cannot be null")
     public final void missingHostValidateTest() throws IOException {
         Map<String, Object> map = new HashMap<>();
         map.put("port", "5672");
@@ -138,7 +184,8 @@ public class RabbitMQSourceConfigTest {
         map.put("prefetchGlobal", "false");
         map.put("passive", "false");
 
-        RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map);
+        SourceContext sourceContext = Mockito.mock(SourceContext.class);
+        RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map, 
sourceContext);
         config.validate();
     }
 
@@ -162,7 +209,8 @@ public class RabbitMQSourceConfigTest {
         map.put("prefetchGlobal", "false");
         map.put("passive", "false");
 
-        RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map);
+        SourceContext sourceContext = Mockito.mock(SourceContext.class);
+        RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map, 
sourceContext);
         config.validate();
     }
 
diff --git a/pulsar-io/redis/pom.xml b/pulsar-io/redis/pom.xml
index 1b9d2d76d3f..ef66a3f37c6 100644
--- a/pulsar-io/redis/pom.xml
+++ b/pulsar-io/redis/pom.xml
@@ -32,6 +32,11 @@
     <name>Pulsar IO :: Redis</name>
 
     <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>pulsar-io-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>${project.groupId}</groupId>
             <artifactId>pulsar-io-core</artifactId>
diff --git 
a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisAbstractConfig.java
 
b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisAbstractConfig.java
index 978e7de31a5..89ec684dded 100644
--- 
a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisAbstractConfig.java
+++ 
b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisAbstractConfig.java
@@ -88,13 +88,11 @@ public class RedisAbstractConfig implements Serializable {
 
     @FieldDoc(
         required = false,
-        defaultValue = "10000L",
+        defaultValue = "10000",
         help = "The amount of time in milliseconds to wait before timing out 
when connecting")
     private long connectTimeout = 10000L;
 
     public void validate() {
-        Preconditions.checkNotNull(redisHosts, "redisHosts property not set.");
-        Preconditions.checkNotNull(redisDatabase, "redisDatabase property not 
set.");
         Preconditions.checkNotNull(clientMode, "clientMode property not set.");
     }
 
@@ -105,7 +103,6 @@ public class RedisAbstractConfig implements Serializable {
 
     public List<HostAndPort> getHostAndPorts() {
         List<HostAndPort> hostAndPorts = Lists.newArrayList();
-        Preconditions.checkNotNull(redisHosts, "redisHosts property not set.");
         String[] hosts = StringUtils.split(redisHosts, ",");
         for (String host : hosts) {
             HostAndPort hostAndPort = HostAndPort.fromString(host);
diff --git 
a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java 
b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java
index bff0a5c2da5..ebd6e9dbab2 100644
--- 
a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java
+++ 
b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java
@@ -68,7 +68,7 @@ public class RedisSink implements Sink<byte[]> {
     public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
         log.info("Open Redis Sink");
 
-        redisSinkConfig = RedisSinkConfig.load(config);
+        redisSinkConfig = RedisSinkConfig.load(config, sinkContext);
         redisSinkConfig.validate();
 
         redisSession = RedisSession.create(redisSinkConfig);
diff --git 
a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java
 
b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java
index a9db66812a4..f7a70cb65a8 100644
--- 
a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java
+++ 
b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java
@@ -28,6 +28,8 @@ import java.util.Map;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.experimental.Accessors;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.FieldDoc;
 import org.apache.pulsar.io.redis.RedisAbstractConfig;
 
@@ -40,13 +42,13 @@ public class RedisSinkConfig extends RedisAbstractConfig 
implements Serializable
 
     @FieldDoc(
         required = false,
-        defaultValue = "10000L",
+        defaultValue = "10000",
         help = "The amount of time in milliseconds before an operation is 
marked as timed out")
     private long operationTimeout = 10000L;
 
     @FieldDoc(
         required = false,
-        defaultValue = "1000L",
+        defaultValue = "1000",
         help = "The Redis operation time in milliseconds")
     private long batchTimeMs = 1000L;
 
@@ -62,9 +64,8 @@ public class RedisSinkConfig extends RedisAbstractConfig 
implements Serializable
         return mapper.readValue(new File(yamlFile), RedisSinkConfig.class);
     }
 
-    public static RedisSinkConfig load(Map<String, Object> map) throws 
IOException {
-        ObjectMapper mapper = new ObjectMapper();
-        return mapper.readValue(mapper.writeValueAsString(map), 
RedisSinkConfig.class);
+    public static RedisSinkConfig load(Map<String, Object> map, SinkContext 
sinkContext) throws IOException {
+        return IOConfigUtils.loadWithSecrets(map, RedisSinkConfig.class, 
sinkContext);
     }
 
     @Override
diff --git 
a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkConfigTest.java
 
b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkConfigTest.java
index 1316d0994a1..39fc6e540c2 100644
--- 
a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkConfigTest.java
+++ 
b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkConfigTest.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.io.redis.sink;
 
+import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.redis.RedisAbstractConfig;
+import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
 import java.io.File;
@@ -62,7 +64,34 @@ public class RedisSinkConfigTest {
         map.put("batchTimeMs", "1000");
         map.put("connectTimeout", "3000");
 
-        RedisSinkConfig config = RedisSinkConfig.load(map);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        RedisSinkConfig config = RedisSinkConfig.load(map, sinkContext);
+        assertNotNull(config);
+        assertEquals(config.getRedisHosts(), "localhost:6379");
+        assertEquals(config.getRedisPassword(), "fake@123");
+        assertEquals(config.getRedisDatabase(), Integer.parseInt("1"));
+        assertEquals(config.getClientMode(), "Standalone");
+        assertEquals(config.getOperationTimeout(), Long.parseLong("2000"));
+        assertEquals(config.getBatchSize(), Integer.parseInt("100"));
+        assertEquals(config.getBatchTimeMs(), Long.parseLong("1000"));
+        assertEquals(config.getConnectTimeout(), Long.parseLong("3000"));
+    }
+
+    @Test
+    public final void loadFromMapCredentialsFromSecretTest() throws 
IOException {
+        Map<String, Object> map = new HashMap<String, Object>();
+        map.put("redisHosts", "localhost:6379");
+        map.put("redisDatabase", "1");
+        map.put("clientMode", "Standalone");
+        map.put("operationTimeout", "2000");
+        map.put("batchSize", "100");
+        map.put("batchTimeMs", "1000");
+        map.put("connectTimeout", "3000");
+
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        Mockito.when(sinkContext.getSecret("redisPassword"))
+                .thenReturn("fake@123");
+        RedisSinkConfig config = RedisSinkConfig.load(map, sinkContext);
         assertNotNull(config);
         assertEquals(config.getRedisHosts(), "localhost:6379");
         assertEquals(config.getRedisPassword(), "fake@123");
@@ -86,12 +115,13 @@ public class RedisSinkConfigTest {
         map.put("batchTimeMs", "1000");
         map.put("connectTimeout", "3000");
 
-        RedisSinkConfig config = RedisSinkConfig.load(map);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        RedisSinkConfig config = RedisSinkConfig.load(map, sinkContext);
         config.validate();
     }
 
-    @Test(expectedExceptions = NullPointerException.class,
-        expectedExceptionsMessageRegExp = "redisHosts property not set.")
+    @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "redisHosts cannot be null")
     public final void missingValidValidateTableNameTest() throws IOException {
         Map<String, Object> map = new HashMap<String, Object>();
         map.put("redisPassword", "fake@123");
@@ -102,7 +132,8 @@ public class RedisSinkConfigTest {
         map.put("batchTimeMs", "1000");
         map.put("connectTimeout", "3000");
 
-        RedisSinkConfig config = RedisSinkConfig.load(map);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        RedisSinkConfig config = RedisSinkConfig.load(map, sinkContext);
         config.validate();
     }
 
@@ -119,7 +150,8 @@ public class RedisSinkConfigTest {
         map.put("batchTimeMs", "-100");
         map.put("connectTimeout", "3000");
 
-        RedisSinkConfig config = RedisSinkConfig.load(map);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        RedisSinkConfig config = RedisSinkConfig.load(map, sinkContext);
         config.validate();
     }
 
@@ -136,7 +168,8 @@ public class RedisSinkConfigTest {
         map.put("batchTimeMs", "1000");
         map.put("connectTimeout", "3000");
 
-        RedisSinkConfig config = RedisSinkConfig.load(map);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        RedisSinkConfig config = RedisSinkConfig.load(map, sinkContext);
         config.validate();
 
         
RedisAbstractConfig.ClientMode.valueOf(config.getClientMode().toUpperCase());
diff --git 
a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
 
b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
index 214151345b4..2b407fafa5e 100644
--- 
a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
+++ 
b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
@@ -21,7 +21,9 @@ package org.apache.pulsar.io.redis.sink;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.instance.SinkRecord;
+import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.redis.EmbeddedRedisUtils;
+import org.mockito.Mockito;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -66,7 +68,8 @@ public class RedisSinkTest {
         Record<byte[]> record = build("fakeTopic", "fakeKey", "fakeValue");
 
         // open should success
-        sink.open(configs, null);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        sink.open(configs, sinkContext);
 
         // write should success.
         sink.write(record);
diff --git a/pulsar-io/solr/pom.xml b/pulsar-io/solr/pom.xml
index deb0e70e2e0..ffd1340f5f3 100644
--- a/pulsar-io/solr/pom.xml
+++ b/pulsar-io/solr/pom.xml
@@ -36,6 +36,11 @@
     <name>Pulsar IO :: Solr</name>
 
     <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>pulsar-io-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>${project.parent.groupId}</groupId>
             <artifactId>pulsar-io-core</artifactId>
diff --git 
a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java 
b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java
index de9cdb4a9d8..202c782c14c 100644
--- 
a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java
+++ 
b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java
@@ -48,7 +48,7 @@ public abstract class SolrAbstractSink<T> implements Sink<T> {
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
-        solrSinkConfig = SolrSinkConfig.load(config);
+        solrSinkConfig = SolrSinkConfig.load(config, sinkContext);
         solrSinkConfig.validate();
 
         enableBasicAuth = !Strings.isNullOrEmpty(solrSinkConfig.getUsername());
diff --git 
a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java 
b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java
index 02733d230bd..daa93a366b1 100644
--- a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java
+++ b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java
@@ -27,6 +27,8 @@ import java.io.Serializable;
 import java.util.Map;
 import lombok.Data;
 import lombok.experimental.Accessors;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 /**
@@ -84,9 +86,8 @@ public class SolrSinkConfig implements Serializable {
         return mapper.readValue(new File(yamlFile), SolrSinkConfig.class);
     }
 
-    public static SolrSinkConfig load(Map<String, Object> map) throws 
IOException {
-        ObjectMapper mapper = new ObjectMapper();
-        return mapper.readValue(mapper.writeValueAsString(map), 
SolrSinkConfig.class);
+    public static SolrSinkConfig load(Map<String, Object> map, SinkContext 
sinkContext) throws IOException {
+        return IOConfigUtils.loadWithSecrets(map, SolrSinkConfig.class, 
sinkContext);
     }
 
     public void validate() {
diff --git 
a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java
 
b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java
index 42d2121dbfc..2c2447a637d 100644
--- 
a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java
+++ 
b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.io.solr;
 
 import com.google.common.collect.Lists;
+import org.apache.pulsar.io.core.SinkContext;
+import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
 import java.io.File;
@@ -61,7 +63,31 @@ public class SolrSinkConfigTest {
         map.put("username", "fakeuser");
         map.put("password", "fake@123");
 
-        SolrSinkConfig config = SolrSinkConfig.load(map);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        SolrSinkConfig config = SolrSinkConfig.load(map, sinkContext);
+        assertNotNull(config);
+        assertEquals(config.getSolrUrl(), 
"localhost:2181,localhost:2182/chroot");
+        assertEquals(config.getSolrMode(), "SolrCloud");
+        assertEquals(config.getSolrCollection(), "techproducts");
+        assertEquals(config.getSolrCommitWithinMs(), Integer.parseInt("100"));
+        assertEquals(config.getUsername(), "fakeuser");
+        assertEquals(config.getPassword(), "fake@123");
+    }
+
+    @Test
+    public final void loadFromMapCredentialsFromSecretTest() throws 
IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("solrUrl", "localhost:2181,localhost:2182/chroot");
+        map.put("solrMode", "SolrCloud");
+        map.put("solrCollection", "techproducts");
+        map.put("solrCommitWithinMs", "100");
+
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        Mockito.when(sinkContext.getSecret("username"))
+                .thenReturn("fakeuser");
+        Mockito.when(sinkContext.getSecret("password"))
+                .thenReturn("fake@123");
+        SolrSinkConfig config = SolrSinkConfig.load(map, sinkContext);
         assertNotNull(config);
         assertEquals(config.getSolrUrl(), 
"localhost:2181,localhost:2182/chroot");
         assertEquals(config.getSolrMode(), "SolrCloud");
@@ -81,12 +107,13 @@ public class SolrSinkConfigTest {
         map.put("username", "fakeuser");
         map.put("password", "fake@123");
 
-        SolrSinkConfig config = SolrSinkConfig.load(map);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        SolrSinkConfig config = SolrSinkConfig.load(map, sinkContext);
         config.validate();
     }
 
-    @Test(expectedExceptions = NullPointerException.class,
-        expectedExceptionsMessageRegExp = "solrUrl property not set.")
+    @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "solrUrl cannot be null")
     public final void missingValidValidateSolrModeTest() throws IOException {
         Map<String, Object> map = new HashMap<>();
         map.put("solrMode", "SolrCloud");
@@ -95,7 +122,8 @@ public class SolrSinkConfigTest {
         map.put("username", "fakeuser");
         map.put("password", "fake@123");
 
-        SolrSinkConfig config = SolrSinkConfig.load(map);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        SolrSinkConfig config = SolrSinkConfig.load(map, sinkContext);
         config.validate();
     }
 
@@ -110,7 +138,8 @@ public class SolrSinkConfigTest {
         map.put("username", "fakeuser");
         map.put("password", "fake@123");
 
-        SolrSinkConfig config = SolrSinkConfig.load(map);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        SolrSinkConfig config = SolrSinkConfig.load(map, sinkContext);
         config.validate();
     }
 
@@ -125,7 +154,8 @@ public class SolrSinkConfigTest {
         map.put("username", "fakeuser");
         map.put("password", "fake@123");
 
-        SolrSinkConfig config = SolrSinkConfig.load(map);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        SolrSinkConfig config = SolrSinkConfig.load(map, sinkContext);
         config.validate();
 
         SolrAbstractSink.SolrMode.valueOf(config.getSolrMode().toUpperCase());
@@ -141,7 +171,8 @@ public class SolrSinkConfigTest {
         map.put("username", "fakeuser");
         map.put("password", "fake@123");
 
-        SolrSinkConfig config = SolrSinkConfig.load(map);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        SolrSinkConfig config = SolrSinkConfig.load(map, sinkContext);
         config.validate();
 
         String url = config.getSolrUrl();


Reply via email to