This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new f24490e59c NIFI-15216 Add support for AWS RDS IAM authentication in
DBCP Connection Pool (#10524)
f24490e59c is described below
commit f24490e59ca2aba9bac606ca79e2c60eafc98f8b
Author: Pierre Villard <[email protected]>
AuthorDate: Tue Nov 25 15:45:14 2025 +0100
NIFI-15216 Add support for AWS RDS IAM authentication in DBCP Connection
Pool (#10524)
- Added DatabasePasswordProvider interface to DBCP Service API
Signed-off-by: David Handermann <[email protected]>
---
.../nifi-aws-bundle/nifi-aws-processors/pom.xml | 9 +
.../aws/rds/AwsRdsIamDatabasePasswordProvider.java | 183 +++++++++++++++++++++
.../org.apache.nifi.controller.ControllerService | 1 +
.../additionalDetails.md | 92 +++++++++++
.../rds/AwsRdsIamDatabasePasswordProviderTest.java | 112 +++++++++++++
.../nifi/dbcp/AbstractDBCPConnectionPool.java | 56 ++++++-
.../nifi/dbcp/ProviderAwareBasicDataSource.java | 95 +++++++++++
.../org/apache/nifi/dbcp/utils/DBCPProperties.java | 46 ++++++
.../nifi/dbcp/AbstractDBCPConnectionPoolTest.java | 87 +++++++++-
.../nifi/dbcp/api/DatabasePasswordProvider.java | 33 ++++
.../dbcp/api/DatabasePasswordRequestContext.java | 93 +++++++++++
.../org/apache/nifi/dbcp/DBCPConnectionPool.java | 11 +-
12 files changed, 809 insertions(+), 9 deletions(-)
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
index b6c4f6574f..3807e9ccba 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
@@ -194,6 +194,15 @@
<groupId>software.amazon.awssdk</groupId>
<artifactId>dynamodb</artifactId>
</dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>rds</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-dbcp-service-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sdk-core</artifactId>
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/rds/AwsRdsIamDatabasePasswordProvider.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/rds/AwsRdsIamDatabasePasswordProvider.java
new file mode 100644
index 0000000000..a38f952ae8
--- /dev/null
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/rds/AwsRdsIamDatabasePasswordProvider.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.rds;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.dbcp.api.DatabasePasswordProvider;
+import org.apache.nifi.dbcp.api.DatabasePasswordRequestContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import
org.apache.nifi.processors.aws.credentials.provider.AwsCredentialsProviderService;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.rds.RdsUtilities;
+import
software.amazon.awssdk.services.rds.model.GenerateAuthenticationTokenRequest;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.nifi.processors.aws.region.RegionUtil.CUSTOM_REGION;
+import static org.apache.nifi.processors.aws.region.RegionUtil.REGION;
+import static org.apache.nifi.processors.aws.region.RegionUtil.getRegion;
+import static org.apache.nifi.processors.aws.region.RegionUtil.isDynamicRegion;
+
+/**
+ * Controller Service that produces Amazon RDS IAM authentication tokens for use as database passwords.
+ *
+ * <p>The hostname and port are parsed from the JDBC URL supplied in the
+ * {@link DatabasePasswordRequestContext}, and the database user is taken from the same context.
+ * The AWS region and credentials come from this service's own properties.</p>
+ */
+@Tags({"aws", "rds", "iam", "jdbc", "password"})
+@CapabilityDescription("""
+Generates Amazon RDS IAM authentication tokens each time a JDBC connection is requested.
+The generated token replaces the database user password so that NiFi does not need to store long-lived credentials inside DBCP services.
+""")
+public class AwsRdsIamDatabasePasswordProvider extends AbstractControllerService implements DatabasePasswordProvider {
+
+    static final PropertyDescriptor AWS_CREDENTIALS_PROVIDER_SERVICE = new PropertyDescriptor.Builder()
+            .name("AWS Credentials Provider Service")
+            .description("Controller Service that provides the AWS credentials used to sign IAM authentication requests.")
+            .identifiesControllerService(AwsCredentialsProviderService.class)
+            .required(true)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
+            AWS_CREDENTIALS_PROVIDER_SERVICE,
+            REGION,
+            CUSTOM_REGION
+    );
+
+    // Populated in onEnabled and cleared in onDisabled
+    private volatile AwsCredentialsProvider awsCredentialsProvider;
+    private volatile RdsUtilities rdsUtilities;
+    private volatile Region awsRegion;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    /**
+     * Rejects configurations for which {@code isDynamicRegion} reports a dynamic region.
+     *
+     * @param validationContext context holding the configured property values
+     * @return validation failures, empty when the region configuration is acceptable
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>();
+        if (isDynamicRegion(validationContext)) {
+            results.add(new ValidationResult.Builder()
+                    .subject(REGION.getDisplayName())
+                    .valid(false)
+                    .explanation("FlowFile or attribute-driven regions are not supported")
+                    .build());
+        }
+        return results;
+    }
+
+    /**
+     * Resolves the credentials provider and region from the configuration and builds the token generator.
+     *
+     * @param context configuration of the service being enabled
+     */
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        final AwsCredentialsProviderService credentialsService = context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE)
+                .asControllerService(AwsCredentialsProviderService.class);
+        awsCredentialsProvider = credentialsService.getAwsCredentialsProvider();
+        awsRegion = getRegion(context);
+        rdsUtilities = createRdsUtilities(awsRegion, awsCredentialsProvider);
+    }
+
+    /**
+     * Drops the references created in {@link #onEnabled(ConfigurationContext)}.
+     */
+    @OnDisabled
+    public void onDisabled() {
+        awsCredentialsProvider = null;
+        rdsUtilities = null;
+        awsRegion = null;
+    }
+
+    /**
+     * Generates an RDS IAM authentication token for the endpoint and user described by the request context.
+     *
+     * @param requestContext JDBC URL and database user supplied by the referencing service; must not be null
+     * @return the generated token as a character array
+     * @throws NullPointerException when {@code requestContext} is null
+     * @throws ProcessException when the hostname, port, or username cannot be resolved, when the service is
+     *         not enabled, or when token generation fails
+     */
+    @Override
+    public char[] getPassword(final DatabasePasswordRequestContext requestContext) {
+        Objects.requireNonNull(requestContext, "Database Password Request Context required");
+
+        final String jdbcUrl = requestContext.getJdbcUrl();
+        final ParsedEndpoint parsedEndpoint = parseEndpoint(jdbcUrl);
+        final String hostname = resolveHostname(parsedEndpoint, jdbcUrl);
+        final int port = resolvePort(parsedEndpoint, jdbcUrl);
+        final String username = resolveUsername(requestContext.getDatabaseUser());
+
+        // Read the volatile field once so that a concurrent onDisabled cannot null it between check and use
+        final RdsUtilities utilities = rdsUtilities;
+        if (utilities == null) {
+            throw new ProcessException("RDS IAM authentication token requested while the service is not enabled");
+        }
+
+        final GenerateAuthenticationTokenRequest tokenRequest = GenerateAuthenticationTokenRequest.builder()
+                .hostname(hostname)
+                .port(port)
+                .username(username)
+                .build();
+
+        final String token;
+        try {
+            token = utilities.generateAuthenticationToken(tokenRequest);
+        } catch (final RuntimeException e) {
+            throw new ProcessException("Failed to generate RDS IAM authentication token", e);
+        }
+
+        return token.toCharArray();
+    }
+
+    /**
+     * Builds the token generator; protected so that subclasses can substitute their own instance.
+     *
+     * @param region AWS region passed to the builder
+     * @param credentialsProvider credentials provider passed to the builder
+     * @return configured {@link RdsUtilities}
+     */
+    protected RdsUtilities createRdsUtilities(final Region region, final AwsCredentialsProvider credentialsProvider) {
+        return RdsUtilities.builder()
+                .region(region)
+                .credentialsProvider(credentialsProvider)
+                .build();
+    }
+
+    /**
+     * Returns the parsed hostname or fails when the JDBC URL did not yield one.
+     */
+    private String resolveHostname(final ParsedEndpoint parsedEndpoint, final String jdbcUrl) {
+        final String hostname = parsedEndpoint.hostname();
+        if (StringUtils.isBlank(hostname)) {
+            throw new ProcessException("Database Endpoint not configured and JDBC URL [%s] does not contain a hostname".formatted(jdbcUrl));
+        }
+        return hostname;
+    }
+
+    /**
+     * Returns the parsed port or fails when the JDBC URL did not yield one.
+     * Throws {@link ProcessException}, matching the hostname and username resolvers.
+     */
+    private int resolvePort(final ParsedEndpoint parsedEndpoint, final String jdbcUrl) {
+        final Integer parsedPort = parsedEndpoint.port();
+        if (parsedPort == null) {
+            throw new ProcessException("Database Endpoint not configured and JDBC URL [%s] does not contain a port number".formatted(jdbcUrl));
+        }
+        return parsedPort;
+    }
+
+    /**
+     * Returns the trimmed database user or fails when none was provided.
+     */
+    private String resolveUsername(final String contextUser) {
+        final String username = StringUtils.trimToNull(contextUser);
+        if (StringUtils.isBlank(username)) {
+            throw new ProcessException("Database Username not configured and referencing DBCP service did not provide a username");
+        }
+        return username;
+    }
+
+    /**
+     * Extracts host and port from a JDBC URL by stripping a leading {@code jdbc:} and parsing the rest as a URI.
+     * Returns {@link ParsedEndpoint#EMPTY} for a blank URL or one that {@link URI#create(String)} rejects.
+     */
+    private ParsedEndpoint parseEndpoint(final String jdbcUrl) {
+        if (StringUtils.isBlank(jdbcUrl)) {
+            return ParsedEndpoint.EMPTY;
+        }
+
+        final String normalized = jdbcUrl.startsWith("jdbc:") ? jdbcUrl.substring(5) : jdbcUrl;
+        try {
+            final URI uri = URI.create(normalized);
+            final String host = uri.getHost();
+            final int port = uri.getPort();
+            // URI reports an undefined port as -1; represent that as null
+            return new ParsedEndpoint(host, port >= 0 ? port : null);
+        } catch (final IllegalArgumentException e) {
+            getLogger().debug("Unable to parse JDBC URL [{}] for hostname and port", jdbcUrl, e);
+            return ParsedEndpoint.EMPTY;
+        }
+    }
+
+    /**
+     * Host and port parsed from a JDBC URL; either component is null when it could not be determined.
+     */
+    private record ParsedEndpoint(String hostname, Integer port) {
+        private static final ParsedEndpoint EMPTY = new ParsedEndpoint(null, null);
+    }
+}
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index ca338c3972..b2f66a8ad5 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -14,5 +14,6 @@
# limitations under the License.
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService
+org.apache.nifi.processors.aws.rds.AwsRdsIamDatabasePasswordProvider
org.apache.nifi.processors.aws.s3.encryption.StandardS3EncryptionService
org.apache.nifi.processors.aws.s3.service.S3FileResourceService
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.rds.AwsRdsIamDatabasePasswordProvider/additionalDetails.md
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.rds.AwsRdsIamDatabasePasswordProvider/additionalDetails.md
new file mode 100644
index 0000000000..1b0b7b6d72
--- /dev/null
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.rds.AwsRdsIamDatabasePasswordProvider/additionalDetails.md
@@ -0,0 +1,92 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+## Summary
+
+`AwsRdsIamDatabasePasswordProvider` generates [Amazon RDS IAM authentication
tokens](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/UsingWithRDS.IAMDBAuth.Connecting.html)
each time a JDBC connection is requested. The Controller Service implements
the NiFi `DatabasePasswordProvider` API, so it can be referenced from DBCP
controller services to avoid storing long-lived database passwords in NiFi.
+
+## Usage
+
+1. Configure an `AWSCredentialsProviderControllerService` so the password
provider can obtain AWS credentials (for example, using an IAM role or
`AssumeRoleWithWebIdentity`).
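+2. Create an `AwsRdsIamDatabasePasswordProvider` and reference the credentials provider service. Configure the AWS region. Host, port, and database user are inherited from the JDBC URL and “Database User” properties on the referencing DBCP service. The JDBC URL must include an explicit port (for example `:5432`); the provider does not assume a driver-specific default, and token generation fails when the port is missing.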
+3. Update the DBCP controller service to set the _Database Password Provider_
property to the new IAM provider. The static _Password_ property is ignored
when a provider is configured.
+4. Ensure your JDBC URL enables TLS and includes the SSL parameters
recommended by AWS (for example, `ssl=true&sslmode=verify-full` for PostgreSQL).
+
+Each time the DBCP service needs to create a physical JDBC connection, a fresh
IAM token is generated and supplied as the password. Existing pooled
connections remain valid until the database closes them, so standard NiFi
pooling properties such as “Maximum Connection Lifetime” still apply.
+
+## Example Setup
+
+### PostgreSQL role and privileges
+
+Connect to the `nifi` database as a superuser and run:
+
+```sql
+CREATE ROLE nifi_app LOGIN PASSWORD 'temporary';
+GRANT rds_iam TO nifi_app;
+
+GRANT CONNECT ON DATABASE nifi TO nifi_app;
+GRANT USAGE ON SCHEMA public TO nifi_app;
+GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO nifi_app;
+GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO nifi_app;
+
+ALTER DEFAULT PRIVILEGES IN SCHEMA public
+ GRANT ALL PRIVILEGES ON TABLES TO nifi_app;
+ALTER DEFAULT PRIVILEGES IN SCHEMA public
+ GRANT ALL PRIVILEGES ON SEQUENCES TO nifi_app;
+```
+
+### IAM permissions
+
+Attach a policy like the following to your IAM role (for example
`myAuroraPostgresRole`). Replace `<region>`, `<account-id>`, and
`<db-resource-id>` with your values:
+
+```json
+{
+ "Version": "2012-10-17",
+ "Statement": [
+ {
+ "Effect": "Allow",
+ "Action": "rds-db:connect",
+ "Resource":
"arn:aws:rds-db:<region>:<account-id>:dbuser:<db-resource-id>/nifi_app"
+ }
+ ]
+}
+```
+
+Ensure the role’s trust policy allows the NiFi host (EC2, EKS, etc.) to assume
it.
+
+### CLI verification
+
+Generate an IAM auth token and connect with `psql`:
+
+```bash
+TOKEN=$(aws rds generate-db-auth-token \
+ --hostname database-1-instance-1.ccfuwyso6lcz.us-east-1.rds.amazonaws.com \
+ --port 5432 \
+ --region us-east-1 \
+ --username nifi_app)
+
+PGPASSWORD="$TOKEN" psql \
+ "host=database-1-instance-1.ccfuwyso6lcz.us-east-1.rds.amazonaws.com \
+ port=5432 user=nifi_app dbname=nifi \
+ sslmode=verify-full sslrootcert=/path/to/rds-combined-ca-bundle.pem"
+```
+
+When that works, configure NiFi’s DBCP service with:
+
+- `Database Connection URL`:
`jdbc:postgresql://database-1-instance-1.ccfuwyso6lcz.us-east-1.rds.amazonaws.com:5432/nifi?ssl=true&sslmode=verify-full`
+- `Database User`: `nifi_app`
+- `Database Password Provider`: `AwsRdsIamDatabasePasswordProvider`
+
+NiFi will then mint IAM tokens automatically for each new JDBC connection.
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/rds/AwsRdsIamDatabasePasswordProviderTest.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/rds/AwsRdsIamDatabasePasswordProviderTest.java
new file mode 100644
index 0000000000..9df6a940d9
--- /dev/null
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/rds/AwsRdsIamDatabasePasswordProviderTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.rds;
+
+import org.apache.nifi.dbcp.api.DatabasePasswordProvider;
+import org.apache.nifi.dbcp.api.DatabasePasswordRequestContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.regions.Region;
+
+import static
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService.ACCESS_KEY_ID;
+import static
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService.SECRET_KEY;
+import static
org.apache.nifi.processors.aws.rds.AwsRdsIamDatabasePasswordProvider.AWS_CREDENTIALS_PROVIDER_SERVICE;
+import static org.apache.nifi.processors.aws.region.RegionUtil.REGION;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class AwsRdsIamDatabasePasswordProviderTest {
+
+ private TestRunner runner;
+ private AWSCredentialsProviderControllerService credentialsService;
+ private AwsRdsIamDatabasePasswordProvider passwordProvider;
+
+ private static final String POSTGRES_DRIVER_CLASS =
"org.postgresql.Driver";
+ private static final String DB_USER = "dbuser";
+ private static final String HOSTNAME =
"example.us-west-2.rds.amazonaws.com";
+ private static final String JDBC_PREFIX = "jdbc:postgresql://";
+ private static final String DATABASE = "dev";
+ private static final int PORT = 5432;
+
+ @BeforeEach
+ void setUp() throws Exception {
+ runner = TestRunners.newTestRunner(NoOpProcessor.class);
+
+ credentialsService = new AWSCredentialsProviderControllerService();
+ runner.addControllerService("awsCredentials", credentialsService);
+ runner.setProperty(credentialsService, ACCESS_KEY_ID, "accessKey");
+ runner.setProperty(credentialsService, SECRET_KEY, "secretKey");
+ runner.enableControllerService(credentialsService);
+
+ passwordProvider = new AwsRdsIamDatabasePasswordProvider();
+ runner.addControllerService("iamProvider", passwordProvider);
+ runner.setProperty(passwordProvider, AWS_CREDENTIALS_PROVIDER_SERVICE,
"awsCredentials");
+ runner.setProperty(passwordProvider, REGION, Region.US_WEST_2.id());
+ runner.enableControllerService(passwordProvider);
+ }
+
+ @Test
+ void testGeneratesTokenUsingRequestContext() {
+ final DatabasePasswordProvider service = getService();
+ final DatabasePasswordRequestContext context =
DatabasePasswordRequestContext.builder()
+ .jdbcUrl("%s%s:%d/%s".formatted(JDBC_PREFIX, HOSTNAME, PORT,
DATABASE))
+ .databaseUser(DB_USER)
+ .driverClassName(POSTGRES_DRIVER_CLASS)
+ .build();
+
+ final String token = new String(service.getPassword(context));
+ assertTrue(token.startsWith("%s:%d/".formatted(HOSTNAME, PORT)));
+ assertTrue(token.contains("DBUser=" + DB_USER));
+ }
+
+ @Test
+ void testGeneratesTokenWithDefaultPort() {
+ final DatabasePasswordProvider service = getService();
+ final DatabasePasswordRequestContext context =
DatabasePasswordRequestContext.builder()
+ .jdbcUrl("%s%s:%d/%s".formatted(JDBC_PREFIX, HOSTNAME, PORT,
DATABASE))
+ .databaseUser(DB_USER)
+ .driverClassName(POSTGRES_DRIVER_CLASS)
+ .build();
+
+ final String token = new String(service.getPassword(context));
+ assertTrue(token.startsWith("%s:%d/".formatted(HOSTNAME, PORT)));
+ assertTrue(token.contains("DBUser=" + DB_USER));
+ }
+
+ @Test
+ void testMissingHostnameThrowsProcessException() {
+ final DatabasePasswordProvider service = getService();
+ final DatabasePasswordRequestContext context =
DatabasePasswordRequestContext.builder()
+ .jdbcUrl("%s/%s".formatted(JDBC_PREFIX, DATABASE))
+ .databaseUser(DB_USER)
+ .driverClassName(POSTGRES_DRIVER_CLASS)
+ .build();
+
+ assertThrows(ProcessException.class, () ->
service.getPassword(context));
+ }
+
+ private DatabasePasswordProvider getService() {
+ return (DatabasePasswordProvider) runner.getProcessContext()
+ .getControllerServiceLookup()
+ .getControllerService("iamProvider");
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/AbstractDBCPConnectionPool.java
b/nifi-extension-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/AbstractDBCPConnectionPool.java
index babdb9b1de..ea1d29832b 100644
---
a/nifi-extension-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/AbstractDBCPConnectionPool.java
+++
b/nifi-extension-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/AbstractDBCPConnectionPool.java
@@ -35,6 +35,8 @@ import org.apache.nifi.components.resource.ResourceReferences;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.VerifiableControllerService;
+import org.apache.nifi.dbcp.api.DatabasePasswordProvider;
+import org.apache.nifi.dbcp.api.DatabasePasswordRequestContext;
import org.apache.nifi.dbcp.utils.DataSourceConfiguration;
import org.apache.nifi.dbcp.utils.DriverUtils;
import org.apache.nifi.kerberos.KerberosUserService;
@@ -49,11 +51,14 @@ import static
org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED
import static
org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL;
import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_DRIVERNAME;
import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_DRIVER_LOCATION;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_PASSWORD_PROVIDER;
import static org.apache.nifi.dbcp.utils.DBCPProperties.KERBEROS_USER_SERVICE;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.PASSWORD_SOURCE;
+import static
org.apache.nifi.dbcp.utils.DBCPProperties.PasswordSource.PASSWORD_PROVIDER;
public abstract class AbstractDBCPConnectionPool extends
AbstractControllerService implements DBCPService, VerifiableControllerService {
- protected volatile BasicDataSource dataSource;
+ protected volatile ProviderAwareBasicDataSource dataSource;
protected volatile KerberosUser kerberosUser;
@Override
@@ -80,10 +85,17 @@ public abstract class AbstractDBCPConnectionPool extends
AbstractControllerServi
.build());
}
- final BasicDataSource basicDataSource = new BasicDataSource();
+ final ProviderAwareBasicDataSource basicDataSource = new
ProviderAwareBasicDataSource();
try {
final DataSourceConfiguration configuration =
getDataSourceConfiguration(context);
- configureDataSource(context, basicDataSource, configuration);
+ final Map<String, String> connectionProperties =
getConnectionProperties(context);
+ configureDataSource(context, basicDataSource, configuration,
connectionProperties);
+
+ final DatabasePasswordProvider passwordProvider =
getDatabasePasswordProvider(context);
+ final DatabasePasswordRequestContext passwordRequestContext =
passwordProvider == null ? null :
+ buildDatabasePasswordRequestContext(configuration,
connectionProperties);
+ basicDataSource.setDatabasePasswordProvider(passwordProvider,
passwordRequestContext);
+
results.add(new ConfigVerificationResult.Builder()
.verificationStepName("Configure Data Source")
.outcome(SUCCESSFUL)
@@ -154,11 +166,17 @@ public abstract class AbstractDBCPConnectionPool extends
AbstractControllerServi
*/
@OnEnabled
public void onConfigured(final ConfigurationContext context) throws
InitializationException {
- dataSource = new BasicDataSource();
+ dataSource = new ProviderAwareBasicDataSource();
kerberosUser = getKerberosUser(context);
loginKerberos(kerberosUser);
final DataSourceConfiguration configuration =
getDataSourceConfiguration(context);
- configureDataSource(context, dataSource, configuration);
+ final Map<String, String> connectionProperties =
getConnectionProperties(context);
+ configureDataSource(context, dataSource, configuration,
connectionProperties);
+
+ final DatabasePasswordProvider passwordProvider =
getDatabasePasswordProvider(context);
+ final DatabasePasswordRequestContext passwordRequestContext =
passwordProvider == null ? null :
+ buildDatabasePasswordRequestContext(configuration,
connectionProperties);
+ dataSource.setDatabasePasswordProvider(passwordProvider,
passwordRequestContext);
}
private void loginKerberos(KerberosUser kerberosUser) throws
InitializationException {
@@ -175,7 +193,8 @@ public abstract class AbstractDBCPConnectionPool extends
AbstractControllerServi
protected abstract DataSourceConfiguration
getDataSourceConfiguration(final ConfigurationContext context);
- protected void configureDataSource(final ConfigurationContext context,
final BasicDataSource basicDataSource, final DataSourceConfiguration
configuration) {
+ protected void configureDataSource(final ConfigurationContext context,
final BasicDataSource basicDataSource,
+ final DataSourceConfiguration
configuration, final Map<String, String> connectionProperties) {
final Driver driver = getDriver(configuration.getDriverName(),
configuration.getUrl());
basicDataSource.setDriver(driver);
@@ -198,7 +217,7 @@ public abstract class AbstractDBCPConnectionPool extends
AbstractControllerServi
basicDataSource.setUsername(configuration.getUserName());
basicDataSource.setPassword(configuration.getPassword());
-
getConnectionProperties(context).forEach(basicDataSource::addConnectionProperty);
+ connectionProperties.forEach(basicDataSource::addConnectionProperty);
}
protected Map<String, String> getConnectionProperties(final
ConfigurationContext context) {
@@ -218,6 +237,29 @@ public abstract class AbstractDBCPConnectionPool extends
AbstractControllerServi
.collect(Collectors.toList());
}
+    /**
+     * Looks up the configured Database Password Provider.
+     *
+     * @param context configuration to read the Password Source and Database Password Provider properties from
+     * @return the provider service, or null when the Password Source is not the provider option
+     *         or either property is unavailable
+     */
+    protected DatabasePasswordProvider getDatabasePasswordProvider(final ConfigurationContext context) {
+        final PropertyValue sourceValue = context.getProperty(PASSWORD_SOURCE);
+        if (sourceValue == null || !PASSWORD_PROVIDER.getValue().equals(sourceValue.getValue())) {
+            // A different password source is selected, so no provider applies
+            return null;
+        }
+
+        final PropertyValue providerValue = context.getProperty(DB_PASSWORD_PROVIDER);
+        if (providerValue == null) {
+            return null;
+        }
+        return providerValue.asControllerService(DatabasePasswordProvider.class);
+    }
+
+ protected DatabasePasswordRequestContext
buildDatabasePasswordRequestContext(final DataSourceConfiguration configuration,
+
final Map<String, String> connectionProperties) {
+ return DatabasePasswordRequestContext.builder()
+ .jdbcUrl(configuration.getUrl())
+ .driverClassName(configuration.getDriverName())
+ .databaseUser(configuration.getUserName())
+ .connectionProperties(connectionProperties)
+ .build();
+ }
+
protected KerberosUser getKerberosUser(final ConfigurationContext context)
{
final KerberosUser kerberosUser;
final KerberosUserService kerberosUserService =
context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
diff --git
a/nifi-extension-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/ProviderAwareBasicDataSource.java
b/nifi-extension-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/ProviderAwareBasicDataSource.java
new file mode 100644
index 0000000000..ba9c498ba4
--- /dev/null
+++
b/nifi-extension-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/ProviderAwareBasicDataSource.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.dbcp;
+
+import org.apache.commons.dbcp2.BasicDataSource;
+import org.apache.commons.dbcp2.Constants;
+import org.apache.commons.dbcp2.ConnectionFactory;
+import org.apache.commons.dbcp2.DriverConnectionFactory;
+import org.apache.nifi.dbcp.api.DatabasePasswordProvider;
+import org.apache.nifi.dbcp.api.DatabasePasswordRequestContext;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Arrays;
+
+/**
+ * Extension of {@link BasicDataSource} that supports obtaining database passwords from a {@link DatabasePasswordProvider}.
+ *
+ * <p>When a provider is set, every physical connection is created with a password requested from the
+ * provider at that moment. Without a provider the data source behaves like a plain {@link BasicDataSource}.</p>
+ */
+public class ProviderAwareBasicDataSource extends BasicDataSource {
+    private volatile DatabasePasswordProvider databasePasswordProvider;
+    private volatile DatabasePasswordRequestContext passwordRequestContext;
+
+    /**
+     * Sets or clears the password provider used for new connections.
+     *
+     * @param passwordProvider provider to consult, or null to use the statically configured password
+     * @param requestContext context passed to the provider on each request; required when a provider is given
+     * @throws IllegalArgumentException when a provider is given without a request context
+     */
+    public void setDatabasePasswordProvider(final DatabasePasswordProvider passwordProvider,
+                                            final DatabasePasswordRequestContext requestContext) {
+        if (passwordProvider != null && requestContext == null) {
+            throw new IllegalArgumentException("Database Password Request Context required when a provider is configured");
+        }
+        this.databasePasswordProvider = passwordProvider;
+        this.passwordRequestContext = requestContext;
+    }
+
+    /**
+     * Wraps the standard connection factory so that it requests a password from the provider per connection.
+     *
+     * @return the factory from the superclass when no provider is set, otherwise a password-refreshing factory
+     * @throws SQLException when a provider is set but the superclass factory is not a {@link DriverConnectionFactory}
+     */
+    @Override
+    protected ConnectionFactory createConnectionFactory() throws SQLException {
+        final ConnectionFactory delegate = super.createConnectionFactory();
+
+        if (databasePasswordProvider == null) {
+            return delegate;
+        }
+
+        if (delegate instanceof DriverConnectionFactory driverConnectionFactory) {
+            return new PasswordRefreshingConnectionFactory(driverConnectionFactory, databasePasswordProvider, passwordRequestContext);
+        }
+
+        throw new SQLException("Database Password Provider configured but unsupported ConnectionFactory [%s]".formatted(
+                delegate.getClass().getName()));
+    }
+
+    /**
+     * Connection factory that asks the provider for a password before each driver connection attempt.
+     */
+    private static class PasswordRefreshingConnectionFactory extends DriverConnectionFactory {
+        private final DatabasePasswordProvider passwordProvider;
+        private final DatabasePasswordRequestContext passwordRequestContext;
+
+        PasswordRefreshingConnectionFactory(final DriverConnectionFactory delegate,
+                                            final DatabasePasswordProvider passwordProvider,
+                                            final DatabasePasswordRequestContext passwordRequestContext) {
+            super(delegate.getDriver(), delegate.getConnectionString(), delegate.getProperties());
+            this.passwordProvider = passwordProvider;
+            this.passwordRequestContext = passwordRequestContext;
+        }
+
+        /**
+         * Requests a password from the provider and opens a driver connection with it.
+         *
+         * @return connection returned by the driver
+         * @throws SQLException when the provider fails, returns an empty password, or the driver rejects the connection
+         */
+        @Override
+        public Connection createConnection() throws SQLException {
+            final char[] passwordCharacters;
+            try {
+                passwordCharacters = passwordProvider.getPassword(passwordRequestContext);
+            } catch (final Exception e) {
+                throw new SQLException("Failed to obtain database password from provider", e);
+            }
+
+            if (passwordCharacters == null || passwordCharacters.length == 0) {
+                throw new SQLException("Database Password Provider returned an empty password");
+            }
+
+            final String password = new String(passwordCharacters);
+            Arrays.fill(passwordCharacters, '\0');
+
+            // The driver reads the password from the connection properties. Use a per-connection copy so the
+            // secret is not retained in the shared Properties and concurrent creations cannot overwrite each other.
+            final java.util.Properties connectionProperties = (java.util.Properties) getProperties().clone();
+            connectionProperties.put(Constants.KEY_PASSWORD, password);
+            return getDriver().connect(getConnectionString(), connectionProperties);
+        }
+    }
+}
diff --git
a/nifi-extension-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/utils/DBCPProperties.java
b/nifi-extension-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/utils/DBCPProperties.java
index 211dc21e7f..5d99f13226 100644
---
a/nifi-extension-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/utils/DBCPProperties.java
+++
b/nifi-extension-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/utils/DBCPProperties.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.dbcp.utils;
+import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.resource.ResourceCardinality;
@@ -23,6 +24,7 @@ import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.dbcp.ConnectionUrlValidator;
import org.apache.nifi.dbcp.DBCPValidator;
import org.apache.nifi.dbcp.DriverClassValidator;
+import org.apache.nifi.dbcp.api.DatabasePasswordProvider;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.processor.util.StandardValidators;
@@ -64,6 +66,14 @@ public final class DBCPProperties {
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
+    // Selects between the Password property and a Database Password Provider; Password is the default
+    public static final PropertyDescriptor PASSWORD_SOURCE = new PropertyDescriptor.Builder()
+            .name("Password Source")
+            .description("Specifies whether to supply the database password directly or obtain it from a Database Password Provider.")
+            .required(true)
+            .allowableValues(PasswordSource.class)
+            .defaultValue(PasswordSource.PASSWORD)
+            .build();
+
public static final PropertyDescriptor DB_PASSWORD = new
PropertyDescriptor.Builder()
.name("Password")
.description("The password for the database user")
@@ -71,8 +81,16 @@ public final class DBCPProperties {
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .dependsOn(PASSWORD_SOURCE, PasswordSource.PASSWORD)
.build();
+    // Required only when Password Source is set to Password Provider
+    public static final PropertyDescriptor DB_PASSWORD_PROVIDER = new PropertyDescriptor.Builder()
+            .name("Database Password Provider")
+            .description("Controller Service that supplies database passwords on demand. When configured, the Password property is ignored.")
+            .identifiesControllerService(DatabasePasswordProvider.class)
+            .dependsOn(PASSWORD_SOURCE, PasswordSource.PASSWORD_PROVIDER)
+            .required(true)
+            .build();
public static final PropertyDescriptor DB_DRIVERNAME = new
PropertyDescriptor.Builder()
.name("Database Driver Class Name")
@@ -123,6 +141,34 @@ public final class DBCPProperties {
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
+    /**
+     * Allowable values of the Password Source property.
+     */
+    public enum PasswordSource implements DescribedValue {
+        PASSWORD("Password", "Use the configured Password property for database authentication."),
+        PASSWORD_PROVIDER("Password Provider", "Obtain database passwords from a configured Database Password Provider.");
+
+        private final String displayName;
+        private final String description;
+
+        PasswordSource(final String label, final String explanation) {
+            displayName = label;
+            description = explanation;
+        }
+
+        /**
+         * @return the enum constant name, used as the property value
+         */
+        @Override
+        public String getValue() {
+            return name();
+        }
+
+        /**
+         * @return label for this option
+         */
+        @Override
+        public String getDisplayName() {
+            return displayName;
+        }
+
+        /**
+         * @return explanation of this option
+         */
+        @Override
+        public String getDescription() {
+            return description;
+        }
+    }
+
public static final PropertyDescriptor MIN_IDLE = new
PropertyDescriptor.Builder()
.name("Minimum Idle Connections")
.description("The minimum number of connections that can remain
idle in the pool without extra ones being " +
diff --git
a/nifi-extension-bundles/nifi-extension-utils/nifi-dbcp-base/src/test/java/org/apache/nifi/dbcp/AbstractDBCPConnectionPoolTest.java
b/nifi-extension-bundles/nifi-extension-utils/nifi-dbcp-base/src/test/java/org/apache/nifi/dbcp/AbstractDBCPConnectionPoolTest.java
index 7c85b876b9..88879bc38d 100644
---
a/nifi-extension-bundles/nifi-extension-utils/nifi-dbcp-base/src/test/java/org/apache/nifi/dbcp/AbstractDBCPConnectionPoolTest.java
+++
b/nifi-extension-bundles/nifi-extension-utils/nifi-dbcp-base/src/test/java/org/apache/nifi/dbcp/AbstractDBCPConnectionPoolTest.java
@@ -19,12 +19,16 @@ package org.apache.nifi.dbcp;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.dbcp.api.DatabasePasswordProvider;
import org.apache.nifi.dbcp.utils.DBCPProperties;
import org.apache.nifi.dbcp.utils.DataSourceConfiguration;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.util.MockPropertyValue;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -34,14 +38,19 @@ import java.sql.SQLException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Properties;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.verify;
@ExtendWith(MockitoExtension.class)
class AbstractDBCPConnectionPoolTest {
@@ -68,18 +77,27 @@ class AbstractDBCPConnectionPoolTest {
@Mock
PropertyValue kerberosUserServiceProperty;
+ @Mock
+ PropertyValue passwordProviderProperty;
+
@Mock
KerberosUserService kerberosUserService;
+ @Mock
+ DatabasePasswordProvider databasePasswordProvider;
+
@Test
void testVerifySuccessful() throws SQLException {
final AbstractDBCPConnectionPool connectionPool = new
MockDBCPConnectionPool();
- when(dataSourceConfiguration.getMaxTotal()).thenReturn(MAX_TOTAL);
+ mockContextDatabaseProperties();
+ mockDataSourceConfigurationDefaults();
when(configurationContext.getProperty(eq(DBCPProperties.KERBEROS_USER_SERVICE))).thenReturn(kerberosUserServiceProperty);
when(kerberosUserServiceProperty.asControllerService(eq(KerberosUserService.class))).thenReturn(kerberosUserService);
when(driver.connect(any(), any())).thenReturn(connection);
when(connection.isValid(eq(TIMEOUT))).thenReturn(true);
+
when(configurationContext.getProperty(eq(DBCPProperties.PASSWORD_SOURCE))).thenReturn(propertyValue(DBCPProperties.PASSWORD_SOURCE,
+ DBCPProperties.PasswordSource.PASSWORD.getValue()));
final List<ConfigVerificationResult> results =
connectionPool.verify(configurationContext, componentLog,
Collections.emptyMap());
@@ -101,6 +119,69 @@ class AbstractDBCPConnectionPoolTest {
assertFalse(resultsFound.hasNext());
}
+    /**
+     * Verifies that the password handed to the driver comes from the Database Password Provider
+     * when the Password Source is set to Password Provider.
+     */
+    @Test
+    void testVerifyUsesDatabasePasswordProvider() throws SQLException {
+        final AbstractDBCPConnectionPool pool = new MockDBCPConnectionPool();
+        final PropertyValue providerSource = propertyValue(DBCPProperties.PASSWORD_SOURCE,
+                DBCPProperties.PasswordSource.PASSWORD_PROVIDER.getValue());
+
+        mockContextDatabaseProperties();
+        mockDataSourceConfigurationDefaults();
+
+        // No Kerberos user service in this scenario
+        when(configurationContext.getProperty(eq(DBCPProperties.KERBEROS_USER_SERVICE))).thenReturn(kerberosUserServiceProperty);
+        when(kerberosUserServiceProperty.asControllerService(eq(KerberosUserService.class))).thenReturn(null);
+
+        // Select the provider as the password source, replacing the lenient default stubbed above
+        when(configurationContext.getProperty(eq(DBCPProperties.DB_PASSWORD_PROVIDER))).thenReturn(passwordProviderProperty);
+        when(passwordProviderProperty.asControllerService(eq(DatabasePasswordProvider.class))).thenReturn(databasePasswordProvider);
+        when(configurationContext.getProperty(eq(DBCPProperties.PASSWORD_SOURCE))).thenReturn(providerSource);
+        when(connection.isValid(eq(TIMEOUT))).thenReturn(true);
+
+        // thenAnswer creates a new array on every invocation instead of sharing one instance
+        when(databasePasswordProvider.getPassword(any())).thenAnswer(invocation -> "token".toCharArray());
+
+        final ArgumentCaptor<Properties> driverPropertiesCaptor = ArgumentCaptor.forClass(Properties.class);
+        when(driver.connect(any(), driverPropertiesCaptor.capture())).thenReturn(connection);
+
+        pool.verify(configurationContext, componentLog, Collections.emptyMap());
+
+        boolean tokenPassedToDriver = false;
+        for (final Properties driverProperties : driverPropertiesCaptor.getAllValues()) {
+            if ("token".equals(driverProperties.getProperty("password"))) {
+                tokenPassedToDriver = true;
+                break;
+            }
+        }
+        assertTrue(tokenPassedToDriver);
+        verify(databasePasswordProvider, atLeastOnce()).getPassword(any());
+    }
+
+    /**
+     * Expects no provider to be resolved when the context has no stubbing for the provider property.
+     */
+    @Test
+    void testGetDatabasePasswordProviderHandlesNullProperty() {
+        final MockDBCPConnectionPool pool = new MockDBCPConnectionPool();
+
+        final DatabasePasswordProvider resolvedProvider = pool.callGetDatabasePasswordProvider(configurationContext);
+
+        assertNull(resolvedProvider);
+    }
+
+    /**
+     * Stubs every getter of the mocked DataSourceConfiguration with fixed values.
+     */
+    private void mockDataSourceConfigurationDefaults() {
+        final long oneSecondMillis = 1000L;
+
+        // Connection identity
+        when(dataSourceConfiguration.getUrl()).thenReturn("jdbc:postgresql://example");
+        when(dataSourceConfiguration.getDriverName()).thenReturn("org.postgresql.Driver");
+        when(dataSourceConfiguration.getUserName()).thenReturn("dbuser");
+        when(dataSourceConfiguration.getPassword()).thenReturn("secret");
+        when(dataSourceConfiguration.getValidationQuery()).thenReturn(null);
+
+        // Pool sizing
+        when(dataSourceConfiguration.getMaxWaitMillis()).thenReturn(oneSecondMillis);
+        when(dataSourceConfiguration.getMaxTotal()).thenReturn(MAX_TOTAL);
+        when(dataSourceConfiguration.getMinIdle()).thenReturn(0);
+        when(dataSourceConfiguration.getMaxIdle()).thenReturn(1);
+
+        // Lifetime and eviction timing
+        when(dataSourceConfiguration.getMaxConnLifetimeMillis()).thenReturn(oneSecondMillis);
+        when(dataSourceConfiguration.getTimeBetweenEvictionRunsMillis()).thenReturn(oneSecondMillis);
+        when(dataSourceConfiguration.getMinEvictableIdleTimeMillis()).thenReturn(oneSecondMillis);
+        when(dataSourceConfiguration.getSoftMinEvictableIdleTimeMillis()).thenReturn(oneSecondMillis);
+    }
+
+    /**
+     * Stubs the configuration context with an empty property map and lenient values for the
+     * database properties; Password Source defaults to the Password option.
+     */
+    private void mockContextDatabaseProperties() {
+        when(configurationContext.getProperties()).thenReturn(Collections.emptyMap());
+
+        stubLenientProperty(DBCPProperties.DATABASE_URL, "jdbc:postgresql://example");
+        stubLenientProperty(DBCPProperties.DB_DRIVERNAME, "org.postgresql.Driver");
+        stubLenientProperty(DBCPProperties.DB_DRIVER_LOCATION, "");
+        stubLenientProperty(DBCPProperties.DB_USER, "dbuser");
+        stubLenientProperty(DBCPProperties.DB_PASSWORD, "secret");
+        stubLenientProperty(DBCPProperties.PASSWORD_SOURCE, DBCPProperties.PasswordSource.PASSWORD.getValue());
+    }
+
+    // Lenient stubbing of a single property lookup on the configuration context
+    private void stubLenientProperty(final PropertyDescriptor descriptor, final String value) {
+        lenient().when(configurationContext.getProperty(eq(descriptor))).thenReturn(propertyValue(descriptor, value));
+    }
+
+    /**
+     * Creates a MockPropertyValue for the given descriptor and raw value, with a null second
+     * constructor argument and an empty map as the last one.
+     */
+    private PropertyValue propertyValue(final PropertyDescriptor descriptor, final String value) {
+        final PropertyValue mockValue = new MockPropertyValue(value, null, descriptor, Collections.emptyMap());
+        return mockValue;
+    }
+
private class MockDBCPConnectionPool extends AbstractDBCPConnectionPool {
@Override
@@ -112,5 +193,9 @@ class AbstractDBCPConnectionPoolTest {
protected DataSourceConfiguration getDataSourceConfiguration(final
ConfigurationContext context) {
return dataSourceConfiguration;
}
+
+        // Gives the enclosing test access to getDatabasePasswordProvider on the pool under test
+        private DatabasePasswordProvider callGetDatabasePasswordProvider(final ConfigurationContext context) {
+            final DatabasePasswordProvider resolvedProvider = getDatabasePasswordProvider(context);
+            return resolvedProvider;
+        }
}
}
diff --git
a/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-api/src/main/java/org/apache/nifi/dbcp/api/DatabasePasswordProvider.java
b/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-api/src/main/java/org/apache/nifi/dbcp/api/DatabasePasswordProvider.java
new file mode 100644
index 0000000000..3826b5ecc0
--- /dev/null
+++
b/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-api/src/main/java/org/apache/nifi/dbcp/api/DatabasePasswordProvider.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.dbcp.api;
+
+import org.apache.nifi.controller.ControllerService;
+
+/**
+ * Supplies database passwords on-demand for Controller Services that establish JDBC connections.
+ * <p>
+ * The DBCP connection factory that consumes this interface calls {@link #getPassword} for each new
+ * connection, wraps any exception it throws in a {@code SQLException}, treats a null or empty
+ * result as a failure, and overwrites the returned array with zero characters after copying it.
+ * Implementations should therefore return a new array on every call rather than a shared instance.
+ */
+public interface DatabasePasswordProvider extends ControllerService {
+
+ /**
+ * Returns a password to be used when establishing a database connection.
+ * The returned array may be cleared by the caller once the password has been read.
+ *
+ * @param requestContext context for the database password request
+ * @return password characters; callers reject a null or zero-length array
+ */
+ char[] getPassword(DatabasePasswordRequestContext requestContext);
+}
diff --git
a/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-api/src/main/java/org/apache/nifi/dbcp/api/DatabasePasswordRequestContext.java
b/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-api/src/main/java/org/apache/nifi/dbcp/api/DatabasePasswordRequestContext.java
new file mode 100644
index 0000000000..747576ccc8
--- /dev/null
+++
b/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-api/src/main/java/org/apache/nifi/dbcp/api/DatabasePasswordRequestContext.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.dbcp.api;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Provides contextual information to {@link DatabasePasswordProvider} implementations when a password is requested.
+ * <p>
+ * Instances are immutable. The connection properties are copied when the context is built, so changes
+ * made to the map passed to the builder afterwards are not visible through this context.
+ */
+public class DatabasePasswordRequestContext {
+    private final String jdbcUrl;
+    private final String databaseUser;
+    private final String driverClassName;
+    private final Map<String, String> connectionProperties;
+
+    private DatabasePasswordRequestContext(final Builder builder) {
+        this.jdbcUrl = builder.jdbcUrl;
+        this.databaseUser = builder.databaseUser;
+        this.driverClassName = builder.driverClassName;
+        // Defensive copy: an unmodifiable view alone would still reflect later changes to the caller's map.
+        // LinkedHashMap keeps the iteration order of the source map and accepts null values.
+        this.connectionProperties = builder.connectionProperties == null
+                ? Collections.emptyMap()
+                : Collections.unmodifiableMap(new LinkedHashMap<>(builder.connectionProperties));
+    }
+
+    /**
+     * @return JDBC URL of the database connection; never null
+     */
+    public String getJdbcUrl() {
+        return jdbcUrl;
+    }
+
+    /**
+     * @return database user name, or null when none was set on the builder
+     */
+    public String getDatabaseUser() {
+        return databaseUser;
+    }
+
+    /**
+     * @return JDBC driver class name; never null
+     */
+    public String getDriverClassName() {
+        return driverClassName;
+    }
+
+    /**
+     * @return unmodifiable snapshot of the connection properties; empty when none were set
+     */
+    public Map<String, String> getConnectionProperties() {
+        return connectionProperties;
+    }
+
+    /**
+     * @return new builder with no values set
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for {@link DatabasePasswordRequestContext}. JDBC URL and driver class name are required.
+     */
+    public static class Builder {
+        private String jdbcUrl;
+        private String databaseUser;
+        private String driverClassName;
+        private Map<String, String> connectionProperties;
+
+        public Builder jdbcUrl(final String jdbcUrl) {
+            this.jdbcUrl = jdbcUrl;
+            return this;
+        }
+
+        public Builder databaseUser(final String databaseUser) {
+            this.databaseUser = databaseUser;
+            return this;
+        }
+
+        public Builder driverClassName(final String driverClassName) {
+            this.driverClassName = driverClassName;
+            return this;
+        }
+
+        public Builder connectionProperties(final Map<String, String> connectionProperties) {
+            this.connectionProperties = connectionProperties;
+            return this;
+        }
+
+        /**
+         * Creates the context from the values set so far.
+         *
+         * @return immutable request context
+         * @throws NullPointerException when the JDBC URL or driver class name has not been set
+         */
+        public DatabasePasswordRequestContext build() {
+            Objects.requireNonNull(jdbcUrl, "JDBC URL required");
+            Objects.requireNonNull(driverClassName, "Driver Class Name required");
+            return new DatabasePasswordRequestContext(this);
+        }
+    }
+}
diff --git
a/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java
b/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java
index dd27c90ba2..ec967bd230 100644
---
a/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java
+++
b/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java
@@ -55,6 +55,7 @@ import static
org.apache.nifi.dbcp.utils.DBCPProperties.DATABASE_URL;
import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_DRIVERNAME;
import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_DRIVER_LOCATION;
import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_PASSWORD;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_PASSWORD_PROVIDER;
import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_USER;
import static org.apache.nifi.dbcp.utils.DBCPProperties.EVICTION_RUN_PERIOD;
import static org.apache.nifi.dbcp.utils.DBCPProperties.KERBEROS_USER_SERVICE;
@@ -64,6 +65,9 @@ import static
org.apache.nifi.dbcp.utils.DBCPProperties.MAX_TOTAL_CONNECTIONS;
import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_WAIT_TIME;
import static
org.apache.nifi.dbcp.utils.DBCPProperties.MIN_EVICTABLE_IDLE_TIME;
import static org.apache.nifi.dbcp.utils.DBCPProperties.MIN_IDLE;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.PASSWORD_SOURCE;
+import static
org.apache.nifi.dbcp.utils.DBCPProperties.PasswordSource.PASSWORD;
+import static
org.apache.nifi.dbcp.utils.DBCPProperties.PasswordSource.PASSWORD_PROVIDER;
import static
org.apache.nifi.dbcp.utils.DBCPProperties.SOFT_MIN_EVICTABLE_IDLE_TIME;
import static org.apache.nifi.dbcp.utils.DBCPProperties.VALIDATION_QUERY;
import static
org.apache.nifi.dbcp.utils.DBCPProperties.extractMillisWithInfinite;
@@ -105,7 +109,9 @@ public class DBCPConnectionPool extends
AbstractDBCPConnectionPool implements DB
DB_DRIVER_LOCATION,
KERBEROS_USER_SERVICE,
DB_USER,
+ PASSWORD_SOURCE,
DB_PASSWORD,
+ DB_PASSWORD_PROVIDER,
MAX_WAIT_TIME,
MAX_TOTAL_CONNECTIONS,
VALIDATION_QUERY,
@@ -149,7 +155,10 @@ public class DBCPConnectionPool extends
AbstractDBCPConnectionPool implements DB
final String url =
context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();
final String driverName =
context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue();
final String user =
context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
- final String password =
context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
+ final PropertyValue passwordSourceProperty =
context.getProperty(PASSWORD_SOURCE);
+ final String passwordSource = passwordSourceProperty == null ?
PASSWORD.getValue() : passwordSourceProperty.getValue();
+ final boolean passwordProviderSelected =
PASSWORD_PROVIDER.getValue().equals(passwordSource);
+ final String password = passwordProviderSelected ? null :
context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
final Integer maxTotal =
context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger();
final String validationQuery =
context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue();
final Long maxWaitMillis =
extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions());