This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 21e50ca36b2 Debezium sources: Support loading config from secrets 
(#19205)
21e50ca36b2 is described below

commit 21e50ca36b2aadf6176214973b21463ed15808a4
Author: Alexander Preuß <alexander.pre...@streamnative.io>
AuthorDate: Sun Jan 15 07:24:25 2023 +0100

    Debezium sources: Support loading config from secrets (#19205)
---
 .../org/apache/pulsar/io/debezium/DebeziumSource.java    | 16 ++++++++++++++++
 .../io/debezium/mongodb/DebeziumMongoDbSource.java       |  8 ++++++++
 2 files changed, 24 insertions(+)

diff --git 
a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
 
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
index 62c3984ef60..9e731fe48bb 100644
--- 
a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
+++ 
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
@@ -20,12 +20,14 @@ package org.apache.pulsar.io.debezium;
 
 import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
 import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.kafka.connect.KafkaConnectSource;
 import org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig;
 
+@Slf4j
 public abstract class DebeziumSource extends KafkaConnectSource {
     private static final String DEFAULT_CONVERTER = 
"org.apache.kafka.connect.json.JsonConverter";
     private static final String DEFAULT_HISTORY = 
"org.apache.pulsar.io.debezium.PulsarDatabaseHistory";
@@ -60,11 +62,25 @@ public abstract class DebeziumSource extends 
KafkaConnectSource {
                 + (StringUtils.isEmpty(namespace) ? 
TopicName.DEFAULT_NAMESPACE : namespace);
     }
 
+    public static void tryLoadingConfigSecret(String secretName, Map<String, 
Object> config, SourceContext context) {
+        try {
+            String secret = context.getSecret(secretName);
+            if (secret != null) {
+                config.put(secretName, secret);
+                log.info("Config key {} set from secret.", secretName);
+            }
+        } catch (Exception e) {
+            log.warn("Failed to read secret {}.", secretName, e);
+        }
+    }
+
     public abstract void setDbConnectorTask(Map<String, Object> config) throws 
Exception;
 
     @Override
     public void open(Map<String, Object> config, SourceContext sourceContext) 
throws Exception {
         setDbConnectorTask(config);
+        tryLoadingConfigSecret("database.user", config, sourceContext);
+        tryLoadingConfigSecret("database.password", config, sourceContext);
 
         // key.converter
         setConfigIfNull(config, 
PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER);
diff --git 
a/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java
 
b/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java
index 8017ffef459..19c56da54b6 100644
--- 
a/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java
+++ 
b/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.io.debezium.mongodb;
 
 import java.util.Map;
 import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.debezium.DebeziumSource;
 
 /**
@@ -32,4 +33,11 @@ public class DebeziumMongoDbSource extends DebeziumSource {
     public void setDbConnectorTask(Map<String, Object> config) throws 
Exception {
         throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG, 
DEFAULT_TASK);
     }
+
+    @Override
+    public void open(Map<String, Object> config, SourceContext sourceContext) 
throws Exception {
+        tryLoadingConfigSecret("mongodb.user", config, sourceContext);
+        tryLoadingConfigSecret("mongodb.password", config, sourceContext);
+        super.open(config, sourceContext);
+    }
 }

Reply via email to