shibd commented on code in PR #20903:
URL: https://github.com/apache/pulsar/pull/20903#discussion_r1284943597
##########
pip/pip-289.md:
##########
@@ -0,0 +1,256 @@
+# PIP 289: Secure Pulsar Connector Configuration
+# Background knowledge
+
+Pulsar Sinks and Sources (a.k.a. Connectors) allow you to move data from a
remote system into and out of a Pulsar cluster. These remote systems often
require authentication, which requires secret management.
+
+The current state of Pulsar Connector secret management is fragmented, is not
documented in the "Pulsar IO" docs, and is not possible in certain cases. This
PIP aims to address these issues through several changes.
+
+The easiest way to show the current short comings is by way of example.
+
+## Elasticsearch Example
+Here is the current way to deploy an Elasticsearch Sink without the use of
plaintext secrets:
+
+```shell
+$ bin/pulsar-admin sinks create \
+ --tenant public \
+ --namespace default \
+ --sink-type elastic_search \
+ --name elasticsearch-test-sink \
+ --sink-config '{"elasticSearchUrl":"http://localhost:9200","indexName":
"my_index"}' \
+ --secrets '{"username": {"MY-K8S-SECRET-USERNAME":
"secret-name"},"password": {"MY-K8S-SECRET-PASSWORD": "password123"}}'
+ --inputs elasticsearch_test
+```
+
+When run targetting Kubernetes, the above works by mounting secrets
`MY-K8S-SECRET-USERNAME` and `MY-K8S-SECRET-PASSWORD` into the sink pod
container as [environment
variables](https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java#L85-L99):
+
+```shell
+username=secret-name
+password=password123
+```
+
+Those environment variables are then
[injected](https://github.com/apache/pulsar/blob/674655347da95305cf671f0696f113dcca88b44d/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java#L67-L78)
into the config when it is loaded at runtime based on
[annotations](https://github.com/apache/pulsar/blob/b7eab9469177eda2c56e36bb9871aab48a17d4ec/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java#L99-L113)
on the `ElasticSearchConfig`.
+
+### Problem
+
+The annotation approach, which is the only way to inject secrets into
connectors, requires that all secret fields are annotated with `sensitive =
true` and that all secret fields are at the top level of their configuration
class. However, the Elasticsearch config contains an `ssl` field that has
nested secrets. See:
+
+```json
+{
+ "elasticSearchUrl": "http://localhost:9200",
+ "indexName": "my_index",
+ "username": "username",
+ "password": "password",
+ "ssl": {
+ "enabled": true,
+ "truststorePath": "/pulsar/security/truststore.jks",
+ "truststorePassword": "truststorepass",
+ "keystorePath": "/pulsar/security/keystore.jks",
+ "keystorePassword": "keystorepass"
+ }
+}
+```
+
+Because `truststorePassword` and `keystorePassword` are not at the top level,
we do not currently have a secure way (i.e. non-plaintext) to configure those
settings.
+
+## RabbitMQ Example
+
+Another relevant example shows how the Pulsar code base has not consistently
implemented secret management for connectors. For the RabbitMQ Sink, the
sensitive fields are [annotated
correctly](https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQAbstractConfig.java#L61-L73),
but the configuration is not loaded via the `IOConfigUtils#loadWithSecrets`
method, which means the only way to load rabbit secrets is as plaintext values
in the config.
+
+## Kafka Connect Adapter Example
+
+The final relevant example is the Kafka Connect Adapter. This adapter allows
you to run Kafka Connectors in Pulsar Connectors. Because of the recursive
nature of these connectors, the configuration for the wrapped connector is
stored in a map named
[kafkaConnectorConfigProperties](https://github.com/apache/pulsar/blob/55523ac8f31fd6d54aacba326edef1f53028877e/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java#L59-L62).
Because this field is an arbitrary map, we cannot rely on the Pulsar
`sensitive` annotation flag to determine whether to load the secret when
building the config class.
+
+# Motivation
+
+Increase Pulsar Function security by giving users a way to configure Pulsar
Connectors with non-plaintext secrets.
+
+The recent
[CVE-2023-37579](https://github.com/apache/pulsar/wiki/CVE%E2%80%902023%E2%80%9037579)
resulted in the potential to leak connector configurations. Because we do not
always provide a way to configure connector configuration in the connector's
secrets map, leaking the configuration meant leaking secrets.
+
+# Goals
+
+## In Scope
+
+* Provide users with a secure way to configure official Pulsar Connectors as
well as third party connectors.
+* Improve documentation to reflect the current state of secrets management in
Pulsar Connectors.
+* Only sinks and sources will benefit from this change.
+* Only the `JavaInstanceRunnable` class will benefit from this change.
+
+## Out of Scope
+
+* This PIP will not prevent users from configuring secrets via insecure
methods, such as plaintext configuration.
+* Functions are out of scope because they do not need arbitrary secret
injection. Functions can already access secrets through the `Context#getSecret`
method.
+* Python and Go Function Runtimes--sinks and sources are not typically written
in these languages.
+
+# High Level Design
+
+* Add a new secrets injection mechanism which allows for arbitrary secret
injection into the connector configuration at runtime.
+* Update existing, official connectors to properly use the already available
secret injection mechanism.
+* Fix the documentation for the existing secrets management methods.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+In order to add a new way to inject, or interpolate, secrets, we need to add a
new method to the `SecretsProvider` interface, which can be implemented by
users, but is not exposed to functions/connectors. This new method will be used
to first determine if a secret should be interpolated for a given value, and if
so, return the interpolated value. If the value is not a secret, or the secret
does not exist, the method will return null. The notable difference for this
method is that it does not have a "path" to the secret. Therefore, the existing
`secrets` map might not apply for certain use cases. In the environment
variable scenario, this is a natural fit because the `value` can be interpreted
as the name of the environment variable. For usage of the new configuration
mechanism, see the [cli](#cli) section.
+
+In order to add support for the existing `sensitive` annotation, I propose
fixing all the connectors that have explicit secrets in their configurations.
+
+Fixing the documentation will be a matter of updating the existing
documentation to reflect the current state of the code.
+
+## Public-facing Changes
+
+### Public API
+
+#### Add new method to SecretsProvider Interface
+
+Add the following method to the `SecretsProvider` interface:
+
+```java
+interface SecretsProvider {
+ /**
+ * If the passed value is formatted as a reference to a secret, as defined
by the implementation, return the
+ * referenced secret. If the value is not formatted as a secret reference
or the referenced secret does not exist,
+ * return null.
+ *
+ * @param value a config value that may be formatted as a reference to a
secret
+ * @return the materialized secret. Otherwise, null.
+ */
+ default String interpolateSecretForValue(String value) {
Review Comment:
Sorry, I'm not clear about this method.
Who will call it? Can you provide examples?
##########
pip/pip-289.md:
##########
@@ -0,0 +1,256 @@
+# PIP 289: Secure Pulsar Connector Configuration
+# Background knowledge
+
+Pulsar Sinks and Sources (a.k.a. Connectors) allow you to move data from a
remote system into and out of a Pulsar cluster. These remote systems often
require authentication, which requires secret management.
+
+The current state of Pulsar Connector secret management is fragmented, is not
documented in the "Pulsar IO" docs, and is not possible in certain cases. This
PIP aims to address these issues through several changes.
+
+The easiest way to show the current short comings is by way of example.
+
+## Elasticsearch Example
+Here is the current way to deploy an Elasticsearch Sink without the use of
plaintext secrets:
+
+```shell
+$ bin/pulsar-admin sinks create \
+ --tenant public \
+ --namespace default \
+ --sink-type elastic_search \
+ --name elasticsearch-test-sink \
+ --sink-config '{"elasticSearchUrl":"http://localhost:9200","indexName":
"my_index"}' \
+ --secrets '{"username": {"MY-K8S-SECRET-USERNAME":
"secret-name"},"password": {"MY-K8S-SECRET-PASSWORD": "password123"}}'
+ --inputs elasticsearch_test
+```
+
+When run targetting Kubernetes, the above works by mounting secrets
`MY-K8S-SECRET-USERNAME` and `MY-K8S-SECRET-PASSWORD` into the sink pod
container as [environment
variables](https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java#L85-L99):
+
+```shell
+username=secret-name
+password=password123
+```
+
+Those environment variables are then
[injected](https://github.com/apache/pulsar/blob/674655347da95305cf671f0696f113dcca88b44d/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java#L67-L78)
into the config when it is loaded at runtime based on
[annotations](https://github.com/apache/pulsar/blob/b7eab9469177eda2c56e36bb9871aab48a17d4ec/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java#L99-L113)
on the `ElasticSearchConfig`.
+
+### Problem
+
+The annotation approach, which is the only way to inject secrets into
connectors, requires that all secret fields are annotated with `sensitive =
true` and that all secret fields are at the top level of their configuration
class. However, the Elasticsearch config contains an `ssl` field that has
nested secrets. See:
+
+```json
+{
+ "elasticSearchUrl": "http://localhost:9200",
+ "indexName": "my_index",
+ "username": "username",
+ "password": "password",
+ "ssl": {
+ "enabled": true,
+ "truststorePath": "/pulsar/security/truststore.jks",
+ "truststorePassword": "truststorepass",
+ "keystorePath": "/pulsar/security/keystore.jks",
+ "keystorePassword": "keystorepass"
+ }
+}
+```
+
+Because `truststorePassword` and `keystorePassword` are not at the top level,
we do not currently have a secure way (i.e. non-plaintext) to configure those
settings.
+
+## RabbitMQ Example
+
+Another relevant example shows how the Pulsar code base has not consistently
implemented secret management for connectors. For the RabbitMQ Sink, the
sensitive fields are [annotated
correctly](https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQAbstractConfig.java#L61-L73),
but the configuration is not loaded via the `IOConfigUtils#loadWithSecrets`
method, which means the only way to load rabbit secrets is as plaintext values
in the config.
+
+## Kafka Connect Adapter Example
+
+The final relevant example is the Kafka Connect Adapter. This adapter allows
you to run Kafka Connectors in Pulsar Connectors. Because of the recursive
nature of these connectors, the configuration for the wrapped connector is
stored in a map named
[kafkaConnectorConfigProperties](https://github.com/apache/pulsar/blob/55523ac8f31fd6d54aacba326edef1f53028877e/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java#L59-L62).
Because this field is an arbitrary map, we cannot rely on the Pulsar
`sensitive` annotation flag to determine whether to load the secret when
building the config class.
+
+# Motivation
+
+Increase Pulsar Function security by giving users a way to configure Pulsar
Connectors with non-plaintext secrets.
+
+The recent
[CVE-2023-37579](https://github.com/apache/pulsar/wiki/CVE%E2%80%902023%E2%80%9037579)
resulted in the potential to leak connector configurations. Because we do not
always provide a way to configure connector configuration in the connector's
secrets map, leaking the configuration meant leaking secrets.
+
+# Goals
+
+## In Scope
+
+* Provide users with a secure way to configure official Pulsar Connectors as
well as third party connectors.
+* Improve documentation to reflect the current state of secrets management in
Pulsar Connectors.
+* Only sinks and sources will benefit from this change.
+* Only the `JavaInstanceRunnable` class will benefit from this change.
+
+## Out of Scope
+
+* This PIP will not prevent users from configuring secrets via insecure
methods, such as plaintext configuration.
+* Functions are out of scope because they do not need arbitrary secret
injection. Functions can already access secrets through the `Context#getSecret`
method.
+* Python and Go Function Runtimes--sinks and sources are not typically written
in these languages.
+
+# High Level Design
+
+* Add a new secrets injection mechanism which allows for arbitrary secret
injection into the connector configuration at runtime.
+* Update existing, official connectors to properly use the already available
secret injection mechanism.
+* Fix the documentation for the existing secrets management methods.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+In order to add a new way to inject, or interpolate, secrets, we need to add a
new method to the `SecretsProvider` interface, which can be implemented by
users, but is not exposed to functions/connectors. This new method will be used
to first determine if a secret should be interpolated for a given value, and if
so, return the interpolated value. If the value is not a secret, or the secret
does not exist, the method will return null. The notable difference for this
method is that it does not have a "path" to the secret. Therefore, the existing
`secrets` map might not apply for certain use cases. In the environment
variable scenario, this is a natural fit because the `value` can be interpreted
as the name of the environment variable. For usage of the new configuration
mechanism, see the [cli](#cli) section.
+
+In order to add support for the existing `sensitive` annotation, I propose
fixing all the connectors that have explicit secrets in their configurations.
+
+Fixing the documentation will be a matter of updating the existing
documentation to reflect the current state of the code.
+
+## Public-facing Changes
+
+### Public API
+
+#### Add new method to SecretsProvider Interface
+
+Add the following method to the `SecretsProvider` interface:
+
+```java
+interface SecretsProvider {
+ /**
+ * If the passed value is formatted as a reference to a secret, as defined
by the implementation, return the
+ * referenced secret. If the value is not formatted as a secret reference
or the referenced secret does not exist,
+ * return null.
+ *
+ * @param value a config value that may be formatted as a reference to a
secret
+ * @return the materialized secret. Otherwise, null.
+ */
+ default String interpolateSecretForValue(String value) {
+ return null;
+ }
+}
+```
+
+There are only two official implementations of the `SecretProvider` interface.
The `ClearTextSecretsProvider` and the `EnvironmentBasedSecretsProvider`. Given
that the `ClearTextSecretsProvider` is only plaintext, it will not override the
new method. Here is the proposed implementation for the
`EnvironmentBasedSecretsProvider`:
+
+```java
+public class EnvironmentBasedSecretsProvider implements SecretsProvider {
+ /**
+ * Pattern to match ${secretName} in the value.
+ */
+ private static final Pattern interpolationPattern =
Pattern.compile("\\$\\{(.+?)}");
+
+ @Override
+ public String interpolateSecretForValue(String value) {
+ Matcher m = interpolationPattern.matcher(value);
+ if (m.matches()) {
+ String secretName = m.group(1);
+ // If the secret doesn't exist, we return null and don't override
the current value.
+ return provideSecret(secretName, null);
+ }
+ return null;
+ }
+}
+```
+
+### Binary protocol
+
+No change.
+
+### Configuration
+
+Add `interpolateSecretsIntoConfigMap` to the `WorkerConfig`. This
configuration will allow operators to determine whether users can interpolate
secrets into configurations. Here is the potential documentation for this flag:
+
+```java
+class WorkerConfig {
+ @FieldContext(
+ category = CATEGORY_CONNECTORS,
+ doc = "Whether to interpolate secrets into the connector's
configuration. Only affects Sinks and Sources."
+ + " Applies to all connectors deployed by this function
worker."
+ + " When true, the SecretsProvider will recursively
interpolate secrets into the connector's "
Review Comment:
What would we do with this configuration? Can you show you an overview of
the implementation code?
> When true, the SecretsProvider will recursively interpolate secrets into
the connector's .
Is the `SecretsProvider` implemented by the user himself?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]