shibd commented on code in PR #20903:
URL: https://github.com/apache/pulsar/pull/20903#discussion_r1284943597


##########
pip/pip-289.md:
##########
@@ -0,0 +1,256 @@
+# PIP 289: Secure Pulsar Connector Configuration
+# Background knowledge
+
+Pulsar Sinks and Sources (a.k.a. Connectors) allow you to move data between a 
remote system and a Pulsar cluster. These remote systems often 
require authentication, which requires secret management. 
+
+The current state of Pulsar Connector secret management is fragmented, is not 
documented in the "Pulsar IO" docs, and is not possible in certain cases. This 
PIP aims to address these issues through several changes.
+
+The easiest way to show the current shortcomings is by way of example.
+
+## Elasticsearch Example
+Here is the current way to deploy an Elasticsearch Sink without the use of 
plaintext secrets:
+
+```shell
+$ bin/pulsar-admin sinks create \
+    --tenant public \
+    --namespace default \
+    --sink-type elastic_search \
+    --name elasticsearch-test-sink \
+    --sink-config '{"elasticSearchUrl":"http://localhost:9200","indexName": 
"my_index"}' \
+    --secrets '{"username": {"MY-K8S-SECRET-USERNAME": 
"secret-name"},"password": {"MY-K8S-SECRET-PASSWORD": "password123"}}' \
+    --inputs elasticsearch_test
+```
+
+When run targeting Kubernetes, the above works by mounting secrets 
`MY-K8S-SECRET-USERNAME` and `MY-K8S-SECRET-PASSWORD` into the sink pod 
container as [environment 
variables](https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java#L85-L99):
+
+```shell
+username=secret-name
+password=password123
+```
+
+Those environment variables are then 
[injected](https://github.com/apache/pulsar/blob/674655347da95305cf671f0696f113dcca88b44d/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java#L67-L78)
 into the config when it is loaded at runtime based on 
[annotations](https://github.com/apache/pulsar/blob/b7eab9469177eda2c56e36bb9871aab48a17d4ec/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java#L99-L113)
 on the `ElasticSearchConfig`.
+
+### Problem
+
+The annotation approach, which is the only way to inject secrets into 
connectors, requires that all secret fields are annotated with `sensitive = 
true` and that all secret fields are at the top level of their configuration 
class. However, the Elasticsearch config contains an `ssl` field that has 
nested secrets. See:
+
+```json
+{
+      "elasticSearchUrl": "http://localhost:9200",
+      "indexName": "my_index",
+      "username": "username",
+      "password": "password",
+      "ssl": {
+        "enabled": true,
+        "truststorePath": "/pulsar/security/truststore.jks",
+        "truststorePassword": "truststorepass",
+        "keystorePath": "/pulsar/security/keystore.jks",
+        "keystorePassword": "keystorepass"
+      }
+}
+```
+
+Because `truststorePassword` and `keystorePassword` are not at the top level, 
we do not currently have a secure way (i.e. non-plaintext) to configure those 
settings.
+
+## RabbitMQ Example
+
+Another relevant example shows how the Pulsar code base has not consistently 
implemented secret management for connectors. For the RabbitMQ Sink, the 
sensitive fields are [annotated 
correctly](https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQAbstractConfig.java#L61-L73),
 but the configuration is not loaded via the `IOConfigUtils#loadWithSecrets` 
method, which means the only way to load rabbit secrets is as plaintext values 
in the config.
+
+## Kafka Connect Adapter Example
+
+The final relevant example is the Kafka Connect Adapter. This adapter allows 
you to run Kafka Connectors in Pulsar Connectors. Because of the recursive 
nature of these connectors, the configuration for the wrapped connector is 
stored in a map named 
[kafkaConnectorConfigProperties](https://github.com/apache/pulsar/blob/55523ac8f31fd6d54aacba326edef1f53028877e/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java#L59-L62).
 Because this field is an arbitrary map, we cannot rely on the Pulsar 
`sensitive` annotation flag to determine whether to load the secret when 
building the config class.
+
+# Motivation
+
+Increase Pulsar Function security by giving users a way to configure Pulsar 
Connectors with non-plaintext secrets.
+
+The recent 
[CVE-2023-37579](https://github.com/apache/pulsar/wiki/CVE%E2%80%902023%E2%80%9037579)
 resulted in the potential to leak connector configurations. Because we do not 
always provide a way to supply sensitive connector settings through the 
connector's secrets map, leaking the configuration meant leaking secrets.
+
+# Goals
+
+## In Scope
+
+* Provide users with a secure way to configure official Pulsar Connectors as 
well as third party connectors.
+* Improve documentation to reflect the current state of secrets management in 
Pulsar Connectors.
+* Only sinks and sources will benefit from this change.
+* Only the `JavaInstanceRunnable` class will benefit from this change.
+
+## Out of Scope
+
+* This PIP will not prevent users from configuring secrets via insecure 
methods, such as plaintext configuration.
+* Functions are out of scope because they do not need arbitrary secret 
injection. Functions can already access secrets through the `Context#getSecret` 
method.
+* Python and Go Function Runtimes--sinks and sources are not typically written 
in these languages.
+
+# High Level Design
+
+* Add a new secrets injection mechanism which allows for arbitrary secret 
injection into the connector configuration at runtime.
+* Update existing, official connectors to properly use the already available 
secret injection mechanism.
+* Fix the documentation for the existing secrets management methods.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+In order to add a new way to inject, or interpolate, secrets, we need to add a 
new method to the `SecretsProvider` interface, which can be implemented by 
users, but is not exposed to functions/connectors. This new method will be used 
to first determine if a secret should be interpolated for a given value, and if 
so, return the interpolated value. If the value is not a secret, or the secret 
does not exist, the method will return null. The notable difference for this 
method is that it does not have a "path" to the secret. Therefore, the existing 
`secrets` map might not apply for certain use cases. In the environment 
variable scenario, this is a natural fit because the `value` can be interpreted 
as the name of the environment variable. For usage of the new configuration 
mechanism, see the [cli](#cli) section.
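
As a rough illustration of the paragraph above, here is a minimal sketch of how a caller might apply such a method to a connector's configuration map. Everything in it is an assumption made for illustration: the class `ConfigInterpolationSketch`, its method names, and the `resolver` parameter (which stands in for `SecretsProvider#interpolateSecretForValue`) are hypothetical and are not taken from the PR. The only behavior borrowed from the text is that a `null` result means "leave the value as it is".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical helper, not part of Pulsar: walks a config map and swaps in secrets.
final class ConfigInterpolationSketch {

    /**
     * Returns a copy of the config in which every String value that the
     * resolver recognizes as a secret reference is replaced by the secret.
     * The input map is not modified.
     */
    static Map<String, Object> interpolate(Map<String, Object> config,
                                           UnaryOperator<String> resolver) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, Object> entry : config.entrySet()) {
            result.put(entry.getKey(), interpolateValue(entry.getValue(), resolver));
        }
        return result;
    }

    @SuppressWarnings("unchecked")
    private static Object interpolateValue(Object value, UnaryOperator<String> resolver) {
        if (value instanceof String) {
            // A null result means "not a secret reference" or "secret not found":
            // in both cases the original value is kept.
            String secret = resolver.apply((String) value);
            return secret != null ? secret : value;
        }
        if (value instanceof Map) {
            // Nested objects such as the Elasticsearch "ssl" block are walked too.
            return interpolate((Map<String, Object>) value, resolver);
        }
        if (value instanceof List) {
            List<Object> copy = new ArrayList<>();
            for (Object item : (List<Object>) value) {
                copy.add(interpolateValue(item, resolver));
            }
            return copy;
        }
        // Numbers, booleans, and nulls pass through unchanged.
        return value;
    }
}
```

With a walk like this, a nested field such as `ssl.truststorePassword` could hold a reference instead of a plaintext password, which is the case the annotation approach cannot cover.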
+
+In order to add support for the existing `sensitive` annotation, I propose 
fixing all the connectors that have explicit secrets in their configurations.
+
+Fixing the documentation will be a matter of updating the existing 
documentation to reflect the current state of the code.
+
+## Public-facing Changes
+
+### Public API
+
+#### Add new method to SecretsProvider Interface
+
+Add the following method to the `SecretsProvider` interface:
+
+```java
+interface SecretsProvider {
+    /**
+     * If the passed value is formatted as a reference to a secret, as defined 
by the implementation, return the
+     * referenced secret. If the value is not formatted as a secret reference 
or the referenced secret does not exist,
+     * return null.
+     *
+     * @param value a config value that may be formatted as a reference to a 
secret
+     * @return the materialized secret. Otherwise, null.
+     */
+    default String interpolateSecretForValue(String value) {

Review Comment:
   Sorry, I'm not clear about this method.
   
   Who will call it? Can you provide examples?



##########
pip/pip-289.md:
##########
@@ -0,0 +1,256 @@
+# PIP 289: Secure Pulsar Connector Configuration
+# Background knowledge
+
+Pulsar Sinks and Sources (a.k.a. Connectors) allow you to move data between a 
remote system and a Pulsar cluster. These remote systems often 
require authentication, which requires secret management. 
+
+The current state of Pulsar Connector secret management is fragmented, is not 
documented in the "Pulsar IO" docs, and is not possible in certain cases. This 
PIP aims to address these issues through several changes.
+
+The easiest way to show the current shortcomings is by way of example.
+
+## Elasticsearch Example
+Here is the current way to deploy an Elasticsearch Sink without the use of 
plaintext secrets:
+
+```shell
+$ bin/pulsar-admin sinks create \
+    --tenant public \
+    --namespace default \
+    --sink-type elastic_search \
+    --name elasticsearch-test-sink \
+    --sink-config '{"elasticSearchUrl":"http://localhost:9200","indexName": 
"my_index"}' \
+    --secrets '{"username": {"MY-K8S-SECRET-USERNAME": 
"secret-name"},"password": {"MY-K8S-SECRET-PASSWORD": "password123"}}' \
+    --inputs elasticsearch_test
+```
+
+When run targeting Kubernetes, the above works by mounting secrets 
`MY-K8S-SECRET-USERNAME` and `MY-K8S-SECRET-PASSWORD` into the sink pod 
container as [environment 
variables](https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java#L85-L99):
+
+```shell
+username=secret-name
+password=password123
+```
+
+Those environment variables are then 
[injected](https://github.com/apache/pulsar/blob/674655347da95305cf671f0696f113dcca88b44d/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java#L67-L78)
 into the config when it is loaded at runtime based on 
[annotations](https://github.com/apache/pulsar/blob/b7eab9469177eda2c56e36bb9871aab48a17d4ec/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java#L99-L113)
 on the `ElasticSearchConfig`.
+
+### Problem
+
+The annotation approach, which is the only way to inject secrets into 
connectors, requires that all secret fields are annotated with `sensitive = 
true` and that all secret fields are at the top level of their configuration 
class. However, the Elasticsearch config contains an `ssl` field that has 
nested secrets. See:
+
+```json
+{
+      "elasticSearchUrl": "http://localhost:9200",
+      "indexName": "my_index",
+      "username": "username",
+      "password": "password",
+      "ssl": {
+        "enabled": true,
+        "truststorePath": "/pulsar/security/truststore.jks",
+        "truststorePassword": "truststorepass",
+        "keystorePath": "/pulsar/security/keystore.jks",
+        "keystorePassword": "keystorepass"
+      }
+}
+```
+
+Because `truststorePassword` and `keystorePassword` are not at the top level, 
we do not currently have a secure way (i.e. non-plaintext) to configure those 
settings.
+
+## RabbitMQ Example
+
+Another relevant example shows how the Pulsar code base has not consistently 
implemented secret management for connectors. For the RabbitMQ Sink, the 
sensitive fields are [annotated 
correctly](https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQAbstractConfig.java#L61-L73),
 but the configuration is not loaded via the `IOConfigUtils#loadWithSecrets` 
method, which means the only way to load rabbit secrets is as plaintext values 
in the config.
+
+## Kafka Connect Adapter Example
+
+The final relevant example is the Kafka Connect Adapter. This adapter allows 
you to run Kafka Connectors in Pulsar Connectors. Because of the recursive 
nature of these connectors, the configuration for the wrapped connector is 
stored in a map named 
[kafkaConnectorConfigProperties](https://github.com/apache/pulsar/blob/55523ac8f31fd6d54aacba326edef1f53028877e/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java#L59-L62).
 Because this field is an arbitrary map, we cannot rely on the Pulsar 
`sensitive` annotation flag to determine whether to load the secret when 
building the config class.
+
+# Motivation
+
+Increase Pulsar Function security by giving users a way to configure Pulsar 
Connectors with non-plaintext secrets.
+
+The recent 
[CVE-2023-37579](https://github.com/apache/pulsar/wiki/CVE%E2%80%902023%E2%80%9037579)
 resulted in the potential to leak connector configurations. Because we do not 
always provide a way to supply sensitive connector settings through the 
connector's secrets map, leaking the configuration meant leaking secrets.
+
+# Goals
+
+## In Scope
+
+* Provide users with a secure way to configure official Pulsar Connectors as 
well as third party connectors.
+* Improve documentation to reflect the current state of secrets management in 
Pulsar Connectors.
+* Only sinks and sources will benefit from this change.
+* Only the `JavaInstanceRunnable` class will benefit from this change.
+
+## Out of Scope
+
+* This PIP will not prevent users from configuring secrets via insecure 
methods, such as plaintext configuration.
+* Functions are out of scope because they do not need arbitrary secret 
injection. Functions can already access secrets through the `Context#getSecret` 
method.
+* Python and Go Function Runtimes--sinks and sources are not typically written 
in these languages.
+
+# High Level Design
+
+* Add a new secrets injection mechanism which allows for arbitrary secret 
injection into the connector configuration at runtime.
+* Update existing, official connectors to properly use the already available 
secret injection mechanism.
+* Fix the documentation for the existing secrets management methods.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+In order to add a new way to inject, or interpolate, secrets, we need to add a 
new method to the `SecretsProvider` interface, which can be implemented by 
users, but is not exposed to functions/connectors. This new method will be used 
to first determine if a secret should be interpolated for a given value, and if 
so, return the interpolated value. If the value is not a secret, or the secret 
does not exist, the method will return null. The notable difference for this 
method is that it does not have a "path" to the secret. Therefore, the existing 
`secrets` map might not apply for certain use cases. In the environment 
variable scenario, this is a natural fit because the `value` can be interpreted 
as the name of the environment variable. For usage of the new configuration 
mechanism, see the [cli](#cli) section.
+
+In order to add support for the existing `sensitive` annotation, I propose 
fixing all the connectors that have explicit secrets in their configurations.
+
+Fixing the documentation will be a matter of updating the existing 
documentation to reflect the current state of the code.
+
+## Public-facing Changes
+
+### Public API
+
+#### Add new method to SecretsProvider Interface
+
+Add the following method to the `SecretsProvider` interface:
+
+```java
+interface SecretsProvider {
+    /**
+     * If the passed value is formatted as a reference to a secret, as defined 
by the implementation, return the
+     * referenced secret. If the value is not formatted as a secret reference 
or the referenced secret does not exist,
+     * return null.
+     *
+     * @param value a config value that may be formatted as a reference to a 
secret
+     * @return the materialized secret. Otherwise, null.
+     */
+    default String interpolateSecretForValue(String value) {
+        return null;
+    }
+}
+```
+
+There are only two official implementations of the `SecretsProvider` 
interface: the `ClearTextSecretsProvider` and the 
`EnvironmentBasedSecretsProvider`. Given that the `ClearTextSecretsProvider` 
is only plaintext, it will not override the new method. Here is the proposed 
implementation for the `EnvironmentBasedSecretsProvider`:
+
+```java
+public class EnvironmentBasedSecretsProvider implements SecretsProvider {
+    /**
+     * Pattern to match ${secretName} in the value.
+     */
+    private static final Pattern interpolationPattern = 
Pattern.compile("\\$\\{(.+?)}");
+
+    @Override
+    public String interpolateSecretForValue(String value) {
+        Matcher m = interpolationPattern.matcher(value);
+        if (m.matches()) {
+            String secretName = m.group(1);
+            // If the secret doesn't exist, we return null and don't override 
the current value.
+            return provideSecret(secretName, null);
+        }
+        return null;
+    }
+}
+```
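
To make the matching rule in the proposed implementation concrete, the following sketch isolates the same regular expression and reports which secret name, if any, a value refers to. The class `InterpolationPatternSketch` and the method `secretNameOrNull` are hypothetical names used only here. Because the proposal calls `Matcher#matches()`, the whole value has to be a reference; a reference embedded in a longer string is not recognized.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical demo class: shows which values the proposed pattern accepts.
final class InterpolationPatternSketch {

    // Same expression as in the proposed EnvironmentBasedSecretsProvider.
    private static final Pattern INTERPOLATION_PATTERN = Pattern.compile("\\$\\{(.+?)}");

    /**
     * Returns the secret name referenced by the value, or null when the
     * value as a whole is not of the form ${name}.
     */
    static String secretNameOrNull(String value) {
        Matcher m = INTERPOLATION_PATTERN.matcher(value);
        // matches() succeeds only if the pattern covers the entire value.
        return m.matches() ? m.group(1) : null;
    }
}
```

Under this rule `${ES_PASSWORD}` yields `ES_PASSWORD`, while `prefix-${ES_PASSWORD}`, `${}`, and `plaintext` yield `null`. One edge case may be worth a decision in the PIP: since `matches()` forces the group to stretch to the final `}`, a value such as `${a}${b}` is treated as a single reference named `a}${b` rather than as two references.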
+
+### Binary protocol
+
+No change.
+
+### Configuration
+
+Add `interpolateSecretsIntoConfigMap` to the `WorkerConfig`. This 
configuration will allow operators to determine whether users can interpolate 
secrets into configurations. Here is the potential documentation for this flag:
+
+```java
+class WorkerConfig {
+    @FieldContext(
+            category = CATEGORY_CONNECTORS,
+            doc = "Whether to interpolate secrets into the connector's 
configuration. Only affects Sinks and Sources."
+                    + " Applies to all connectors deployed by this function 
worker."
+                    + " When true, the SecretsProvider will recursively 
interpolate secrets into the connector's "

Review Comment:
   What would we do with this configuration? Can you show an overview of 
the implementation code?
   
   > When true, the SecretsProvider will recursively interpolate secrets into 
the connector's .
   
   Is the `SecretsProvider` implemented by users themselves?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
