This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 21e50ca36b2 Debezium sources: Support loading config from secrets (#19205)
21e50ca36b2 is described below

commit 21e50ca36b2aadf6176214973b21463ed15808a4
Author: Alexander Preuß <alexander.pre...@streamnative.io>
AuthorDate: Sun Jan 15 07:24:25 2023 +0100

    Debezium sources: Support loading config from secrets (#19205)
---
 .../org/apache/pulsar/io/debezium/DebeziumSource.java | 16 ++++++++++++++++
 .../io/debezium/mongodb/DebeziumMongoDbSource.java    |  8 ++++++++
 2 files changed, 24 insertions(+)

diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
index 62c3984ef60..9e731fe48bb 100644
--- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
+++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
@@ -20,12 +20,14 @@ package org.apache.pulsar.io.debezium;
 
 import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
 import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.kafka.connect.KafkaConnectSource;
 import org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig;
 
+@Slf4j
 public abstract class DebeziumSource extends KafkaConnectSource {
     private static final String DEFAULT_CONVERTER = "org.apache.kafka.connect.json.JsonConverter";
     private static final String DEFAULT_HISTORY = "org.apache.pulsar.io.debezium.PulsarDatabaseHistory";
@@ -60,11 +62,25 @@ public abstract class DebeziumSource extends KafkaConnectSource {
                 + (StringUtils.isEmpty(namespace) ? TopicName.DEFAULT_NAMESPACE : namespace);
     }
 
+    public static void tryLoadingConfigSecret(String secretName, Map<String, Object> config, SourceContext context) {
+        try {
+            String secret = context.getSecret(secretName);
+            if (secret != null) {
+                config.put(secretName, secret);
+                log.info("Config key {} set from secret.", secretName);
+            }
+        } catch (Exception e) {
+            log.warn("Failed to read secret {}.", secretName, e);
+        }
+    }
+
     public abstract void setDbConnectorTask(Map<String, Object> config) throws Exception;
 
     @Override
     public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
         setDbConnectorTask(config);
+        tryLoadingConfigSecret("database.user", config, sourceContext);
+        tryLoadingConfigSecret("database.password", config, sourceContext);
 
         // key.converter
         setConfigIfNull(config, PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER);
diff --git a/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java b/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java
index 8017ffef459..19c56da54b6 100644
--- a/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java
+++ b/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.io.debezium.mongodb;
 
 import java.util.Map;
 import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.debezium.DebeziumSource;
 
 /**
@@ -32,4 +33,11 @@ public class DebeziumMongoDbSource extends DebeziumSource {
     public void setDbConnectorTask(Map<String, Object> config) throws Exception {
         throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG, DEFAULT_TASK);
     }
+
+    @Override
+    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
+        tryLoadingConfigSecret("mongodb.user", config, sourceContext);
+        tryLoadingConfigSecret("mongodb.password", config, sourceContext);
+        super.open(config, sourceContext);
+    }
 }