This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 094222260c NIFI-9402: Adding DatabaseParameterProvider
094222260c is described below
commit 094222260c7cb7dd839520abb2dcf3bcec316ea5
Author: Joe Gresock <[email protected]>
AuthorDate: Sat Sep 10 08:40:54 2022 -0400
NIFI-9402: Adding DatabaseParameterProvider
Adding provided scope to api dependency in
nifi-standard-parameter-providers module
Adding additional documentation, other minor code cleanup
Correcting error handling in StandardParameterProviderNode, updating
additional details for DatabaseParameterProvider
Correcting null column value handling
NIFI-9402: Fixed Checkstyle violation
Signed-off-by: Matthew Burgess <[email protected]>
This closes #6391
---
.../parameter/StandardParameterProviderNode.java | 4 +-
.../nifi-standard-parameter-providers/pom.xml | 11 +
.../nifi/parameter/DatabaseParameterProvider.java | 265 ++++++++++++++++++
.../org.apache.nifi.parameter.ParameterProvider | 1 +
.../additionalDetails.html | 214 +++++++++++++++
.../parameter/TestDatabaseParameterProvider.java | 303 +++++++++++++++++++++
6 files changed, 796 insertions(+), 2 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/parameter/StandardParameterProviderNode.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/parameter/StandardParameterProviderNode.java
index a1651e6376..a35fbea3da 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/parameter/StandardParameterProviderNode.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/parameter/StandardParameterProviderNode.java
@@ -277,8 +277,8 @@ public class StandardParameterProviderNode extends
AbstractComponentNode impleme
List<ParameterGroup> fetchedParameterGroups;
try (final NarCloseable narCloseable =
NarCloseable.withComponentNarLoader(getExtensionManager(),
parameterProvider.getClass(), parameterProvider.getIdentifier())) {
fetchedParameterGroups =
parameterProvider.fetchParameters(configurationContext);
- } catch (final IOException e) {
- throw new RuntimeException(String.format("Error fetching
parameters for %s", this), e);
+ } catch (final IOException | RuntimeException e) {
+ throw new IllegalStateException(String.format("Error fetching
parameters for %s: %s", this, e.getMessage()), e);
}
if (fetchedParameterGroups == null ||
fetchedParameterGroups.isEmpty()) {
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/pom.xml
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/pom.xml
index cc5d40a7da..16111bda50 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/pom.xml
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/pom.xml
@@ -30,6 +30,17 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-dbcp-service-api</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-standard-processors</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/java/org/apache/nifi/parameter/DatabaseParameterProvider.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/java/org/apache/nifi/parameter/DatabaseParameterProvider.java
new file mode 100644
index 0000000000..0bb924350a
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/java/org/apache/nifi/parameter/DatabaseParameterProvider.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.parameter;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.apache.nifi.util.StringUtils;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+
+@Tags({"database", "dbcp", "sql"})
+@CapabilityDescription("Fetches parameters from database tables")
+
+public class DatabaseParameterProvider extends AbstractParameterProvider
implements VerifiableParameterProvider {
+
+ protected final static Map<String, DatabaseAdapter> dbAdapters = new
HashMap<>();
+
+ public static final PropertyDescriptor DB_TYPE;
+
+ static {
+ // Load the DatabaseAdapters
+ ArrayList<AllowableValue> dbAdapterValues = new ArrayList<>();
+ ServiceLoader<DatabaseAdapter> dbAdapterLoader =
ServiceLoader.load(DatabaseAdapter.class);
+ dbAdapterLoader.forEach(it -> {
+ dbAdapters.put(it.getName(), it);
+ dbAdapterValues.add(new AllowableValue(it.getName(), it.getName(),
it.getDescription()));
+ });
+
+ DB_TYPE = new PropertyDescriptor.Builder()
+ .name("db-type")
+ .displayName("Database Type")
+ .description("The type/flavor of database, used for generating
database-specific code. In many cases the Generic type "
+ + "should suffice, but some databases (such as Oracle)
require custom SQL clauses. ")
+ .allowableValues(dbAdapterValues.toArray(new
AllowableValue[dbAdapterValues.size()]))
+ .defaultValue("Generic")
+ .required(true)
+ .build();
+ }
+
+ static AllowableValue GROUPING_BY_COLUMN = new
AllowableValue("grouping-by-column", "Column",
+ "A single table is partitioned by the 'Parameter Group Name
Column'. All rows with the same value in this column will " +
+ "map to a group of the same name.");
+ static AllowableValue GROUPING_BY_TABLE_NAME = new
AllowableValue("grouping-by-table-name", "Table Name",
+ "An entire table maps to a Parameter Group. The group name will
be the table name.");
+
+    /** Controller Service from which database connections are obtained. */
+    public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
+            .name("dbcp-service")
+            .displayName("Database Connection Pooling Service")
+            .description("The Controller Service that is used to obtain a connection to the database.")
+            .identifiesControllerService(DBCPService.class)
+            .required(true)
+            .build();
+
+    /** Chooses between grouping by column value and grouping by table name; defaults to grouping by column. */
+    public static final PropertyDescriptor PARAMETER_GROUPING_STRATEGY = new PropertyDescriptor.Builder()
+            .name("parameter-grouping-strategy")
+            .displayName("Parameter Grouping Strategy")
+            .description("The strategy used to group parameters.")
+            .required(true)
+            .allowableValues(GROUPING_BY_COLUMN, GROUPING_BY_TABLE_NAME)
+            .defaultValue(GROUPING_BY_COLUMN.getValue())
+            .build();
+
+    /** Comma-separated table list; depends on the Table Name grouping strategy. */
+    public static final PropertyDescriptor TABLE_NAMES = new PropertyDescriptor.Builder()
+            .name("table-names")
+            .displayName("Table Names")
+            .description("A comma-separated list of names of the database tables containing the parameters.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dependsOn(PARAMETER_GROUPING_STRATEGY, GROUPING_BY_TABLE_NAME)
+            .build();
+
+    /** Single table to read; depends on the Column grouping strategy. */
+    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("table-name")
+            .displayName("Table Name")
+            .description("The name of the database table containing the parameters.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dependsOn(PARAMETER_GROUPING_STRATEGY, GROUPING_BY_COLUMN)
+            .build();
+
+    /** Column from which each parameter's name is read. */
+    public static final PropertyDescriptor PARAMETER_NAME_COLUMN = new PropertyDescriptor.Builder()
+            .name("parameter-name-column")
+            .displayName("Parameter Name Column")
+            .description("The name of a column containing the parameter name.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /** Column from which each parameter's value is read. */
+    public static final PropertyDescriptor PARAMETER_VALUE_COLUMN = new PropertyDescriptor.Builder()
+            .name("parameter-value-column")
+            .displayName("Parameter Value Column")
+            .description("The name of a column containing the parameter value.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /** Column from which the group name is read; depends on the Column grouping strategy. */
+    public static final PropertyDescriptor PARAMETER_GROUP_NAME_COLUMN = new PropertyDescriptor.Builder()
+            .name("parameter-group-name-column")
+            .displayName("Parameter Group Name Column")
+            .description("The name of a column containing the name of the parameter group into which the parameter should be mapped.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dependsOn(PARAMETER_GROUPING_STRATEGY, GROUPING_BY_COLUMN)
+            .build();
+
+    /** Filter text passed as the WHERE clause when the SELECT statement is generated for each table. */
+    public static final PropertyDescriptor SQL_WHERE_CLAUSE = new PropertyDescriptor.Builder()
+            .name("sql-where-clause")
+            .displayName("SQL WHERE clause")
+            .description("An optional SQL query 'WHERE' clause by which to filter all results. The 'WHERE' keyword should not be included.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /** Supported property descriptors in registration order; assigned by {@code init}. */
+    private List<PropertyDescriptor> properties;
+
+    /**
+     * Builds the unmodifiable list of property descriptors this provider supports.
+     *
+     * @param config initialization context (not read by this method)
+     */
+    @Override
+    protected void init(final ParameterProviderInitializationContext config) {
+        this.properties = Collections.unmodifiableList(Arrays.asList(
+                DB_TYPE,
+                DBCP_SERVICE,
+                PARAMETER_GROUPING_STRATEGY,
+                TABLE_NAME,
+                TABLE_NAMES,
+                PARAMETER_NAME_COLUMN,
+                PARAMETER_VALUE_COLUMN,
+                PARAMETER_GROUP_NAME_COLUMN,
+                SQL_WHERE_CLAUSE
+        ));
+    }
+
+    /**
+     * @return the descriptor list prepared by {@code init}
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+ @Override
+ public List<ParameterGroup> fetchParameters(final ConfigurationContext
context) {
+ final boolean groupByColumn =
GROUPING_BY_COLUMN.getValue().equals(context.getProperty(PARAMETER_GROUPING_STRATEGY).getValue());
+
+ final DBCPService dbcpService =
context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+ final String whereClause =
context.getProperty(SQL_WHERE_CLAUSE).getValue();
+ final String parameterNameColumn =
context.getProperty(PARAMETER_NAME_COLUMN).getValue();
+ final String parameterValueColumn =
context.getProperty(PARAMETER_VALUE_COLUMN).getValue();
+ final String parameterGroupNameColumn =
context.getProperty(PARAMETER_GROUP_NAME_COLUMN).getValue();
+
+ final List<String> tableNames = groupByColumn
+ ?
Collections.singletonList(context.getProperty(TABLE_NAME).getValue())
+ :
Arrays.stream(context.getProperty(TABLE_NAMES).getValue().split(",")).map(String::trim).collect(Collectors.toList());
+
+ final Map<String, List<Parameter>> parameterMap = new HashMap<>();
+ for (final String tableName : tableNames) {
+ try (final Connection con =
dbcpService.getConnection(Collections.emptyMap()); final Statement st =
con.createStatement()) {
+ final List<String> columns = new ArrayList<>();
+ columns.add(parameterNameColumn);
+ columns.add(parameterValueColumn);
+ if (groupByColumn) {
+ columns.add(parameterGroupNameColumn);
+ }
+ final String query = getQuery(context, tableName, columns,
whereClause);
+
+ getLogger().info("Fetching parameters with query: " + query);
+ try (final ResultSet rs = st.executeQuery(query)) {
+ while (rs.next()) {
+ final String parameterName =
rs.getString(parameterNameColumn);
+ final String parameterValue =
rs.getString(parameterValueColumn);
+
+ validateValueNotNull(parameterName,
parameterNameColumn);
+ validateValueNotNull(parameterValue,
parameterValueColumn);
+ final String parameterGroupName;
+ if (groupByColumn) {
+ parameterGroupName = parameterGroupNameColumn ==
null ? null : rs.getString(parameterGroupNameColumn);
+ validateValueNotNull(parameterGroupName,
parameterGroupNameColumn);
+ } else {
+ parameterGroupName = tableName;
+ }
+
+ final ParameterDescriptor parameterDescriptor = new
ParameterDescriptor.Builder()
+ .name(parameterName)
+ .build();
+ final Parameter parameter = new
Parameter(parameterDescriptor, parameterValue);
+
+ parameterMap.computeIfAbsent(parameterGroupName, key
-> new ArrayList<>()).add(parameter);
+ }
+ }
+ } catch (final SQLException e) {
+ getLogger().error("Encountered a database error when fetching
parameters: {}", e.getMessage(), e);
+ throw new RuntimeException("Encountered a database error when
fetching parameters: " + e.getMessage(), e);
+ }
+ }
+
+ return parameterMap.entrySet().stream()
+ .map(entry -> new ParameterGroup(entry.getKey(),
entry.getValue()))
+ .collect(Collectors.toList());
+ }
+
+    /**
+     * Rejects a NULL value read from the named column.
+     *
+     * @param value the value read from the result set
+     * @param columnName the column the value came from; used only in the error message
+     * @throws IllegalStateException if {@code value} is null
+     */
+    private void validateValueNotNull(final String value, final String columnName) {
+        if (value != null) {
+            return;
+        }
+        throw new IllegalStateException(String.format("Expected %s column to be non-null", columnName));
+    }
+
+    /**
+     * Builds the SELECT statement for one table using the adapter chosen by the Database Type property.
+     *
+     * @param context configuration from which the Database Type is read
+     * @param tableName table to select from
+     * @param columns column names to select; joined with ", "
+     * @param whereClause filter text handed to the adapter as the WHERE clause; may be null
+     * @return the SQL text produced by the adapter
+     * @throws IllegalStateException if no adapter is registered under the configured Database Type
+     */
+    String getQuery(final ConfigurationContext context, final String tableName, final List<String> columns, final String whereClause) {
+        final String dbType = context.getProperty(DB_TYPE).getValue();
+        final DatabaseAdapter dbAdapter = dbAdapters.get(dbType);
+        if (dbAdapter == null) {
+            // Fail with a clear message instead of a bare NullPointerException on the next line
+            throw new IllegalStateException(String.format("No Database Adapter is available for Database Type '%s'", dbType));
+        }
+        return dbAdapter.getSelectStatement(tableName, StringUtils.join(columns, ", "), whereClause, null, null, null);
+    }
+
+    /**
+     * Verifies the configuration by performing a real parameter fetch.
+     *
+     * @param context configuration to verify
+     * @param verificationLogger logger that receives the failure details
+     * @return a single "Fetch Parameters" result: SUCCESSFUL with group and parameter counts, or FAILED
+     *         with the exception message
+     */
+    @Override
+    public List<ConfigVerificationResult> verify(final ConfigurationContext context, final ComponentLog verificationLogger) {
+        final List<ConfigVerificationResult> results = new ArrayList<>();
+        try {
+            final List<ParameterGroup> parameterGroups = fetchParameters(context);
+            final long parameterCount = parameterGroups.stream()
+                    .flatMap(group -> group.getParameters().stream())
+                    .count();
+            results.add(new ConfigVerificationResult.Builder()
+                    .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
+                    .verificationStepName("Fetch Parameters")
+                    .explanation(String.format("Successfully fetched %s Parameter Groups containing %s Parameters matching the filter.",
+                            parameterGroups.size(), parameterCount))
+                    .build());
+        } catch (final Exception e) {
+            verificationLogger.error("Failed to fetch Parameter Groups", e);
+            // Plain concatenation: the exception message must not be used as a format string,
+            // because a '%' in it (common in SQL errors) makes String.format throw
+            results.add(new ConfigVerificationResult.Builder()
+                    .outcome(ConfigVerificationResult.Outcome.FAILED)
+                    .verificationStepName("Fetch Parameters")
+                    .explanation("Failed to fetch parameters: " + e.getMessage())
+                    .build());
+        }
+
+        return results;
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/resources/META-INF/services/org.apache.nifi.parameter.ParameterProvider
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/resources/META-INF/services/org.apache.nifi.parameter.ParameterProvider
index 77ce897ba2..3b17ef9236 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/resources/META-INF/services/org.apache.nifi.parameter.ParameterProvider
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/resources/META-INF/services/org.apache.nifi.parameter.ParameterProvider
@@ -14,3 +14,4 @@
# limitations under the License.
org.apache.nifi.parameter.EnvironmentVariableParameterProvider
org.apache.nifi.parameter.FileParameterProvider
+org.apache.nifi.parameter.DatabaseParameterProvider
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/resources/docs/org.apache.nifi.parameter.DatabaseParameterProvider/additionalDetails.html
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/resources/docs/org.apache.nifi.parameter.DatabaseParameterProvider/additionalDetails.html
new file mode 100644
index 0000000000..b349f8aa7e
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/resources/docs/org.apache.nifi.parameter.DatabaseParameterProvider/additionalDetails.html
@@ -0,0 +1,214 @@
+<!DOCTYPE html>
+<html lang="en" xmlns="http://www.w3.org/1999/html">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<head>
+ <meta charset="utf-8"/>
+ <title>DatabaseParameterProvider</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css"
type="text/css"/>
+</head>
+<body>
+
+<h1>Providing Parameters from a Database</h1>
+
+<p>
+ The DatabaseParameterProvider at its core maps database rows to
Parameters, specified by a
+ Parameter Name Column and Parameter Value Column. The Parameter Group
name must also be accounted for, and may
+ be specified in different ways using the Parameter Grouping Strategy.
+</p>
+
+<p>
+ Before discussing the actual configuration, note that in some databases,
the words 'PARAMETER', 'PARAMETERS', 'GROUP',
+ and even 'VALUE' are reserved words. If you choose a column name that is
a reserved word in the database you are using,
+ make sure to quote it per the database documentation.
+</p>
+
+<p>
+ Also note that you should use the preferred table name and column name
case for your database. For example, Postgres
+ prefers lowercase table and column names, while Oracle prefers capitalized
ones. Choosing the appropriate case can
+ avoid unexpected issues in configuring your DatabaseParameterProvider.
+</p>
+
+<p>
+ The default configuration uses a fully column-based approach, with the
Parameter Group Name
+ also specified by columns in the same table. An example of a table using
this configuration would be:
+</p>
+<table>
+ <thead>
+ <tr>
+ <th colspan="3" style="text-align: center">PARAMETER_CONTEXTS</th>
+ </tr>
+ <tr>
+
<th>PARAMETER_NAME</th><th>PARAMETER_VALUE</th><th>PARAMETER_GROUP</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>param.foo</td><td>value-foo</td><td>group_1</td>
+ </tr>
+ <tr>
+ <td>param.bar</td><td>value-bar</td><td>group_1</td>
+ </tr>
+ <tr>
+ <td>param.one</td><td>value-one</td><td>group_2</td>
+ </tr>
+ <tr>
+ <td>param.two</td><td>value-two</td><td>group_2</td>
+ </tr>
+ </tbody>
+ <caption>Table 1: Database table example with Grouping Strategy =
Column</caption>
+</table>
+
+<p>
+ In order to use the data from this table, set the following Properties:
+</p>
+
+<ul>
+ <li><b>Parameter Grouping Strategy</b> - Column</li>
+ <li><b>Table Name</b> - PARAMETER_CONTEXTS</li>
+ <li><b>Parameter Name Column</b> - PARAMETER_NAME</li>
+ <li><b>Parameter Value Column</b> - PARAMETER_VALUE</li>
+ <li><b>Parameter Group Name Column</b> - PARAMETER_GROUP</li>
+</ul>
+
+<p>
+ Once fetched, the parameters in this example will look like this:
+</p>
+
+<p>
+ Parameter Group <b>group_1</b>:
+ <ul>
+ <li>param.foo - value-foo</li>
+ <li>param.bar - value-bar</li>
+ </ul>
+</p>
+
+<p>
+ Parameter Group <b>group_2</b>:
+ <ul>
+ <li>param.one - value-one</li>
+ <li>param.two - value-two</li>
+ </ul>
+</p>
+
+<h3>Grouping Strategy</h3>
+
+<p>
+ The default Grouping Strategy is by Column, which allows you to specify
the Parameter Group name explicitly in the Parameter Group Name Column.
+ Note that if the value in this column is NULL, an exception will be thrown.
+</p>
+
+<p>
+ The other Grouping Strategy is by Table Name, which maps each table to a
Parameter Group and sets the Parameter Group Name to the table name.
+ In this Grouping Strategy, the Parameter Group Name Column is not used. An
example configuration using this strategy would be:
+</p>
+
+<ul>
+ <li><b>Parameter Grouping Strategy</b> - Table Name</li>
+ <li><b>Table Names</b> - KAFKA, S3</li>
+ <li><b>Parameter Name Column</b> - PARAMETER_NAME</li>
+ <li><b>Parameter Value Column</b> - PARAMETER_VALUE</li>
+</ul>
+
+<p>
+ An example of some tables that may be used with this strategy:
+</p>
+
+<table>
+ <thead>
+ <tr>
+ <th colspan="2" style="text-align: center">KAFKA</th>
+ </tr>
+ <tr>
+ <th>PARAMETER_NAME</th><th>PARAMETER_VALUE</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>brokers</td><td>http://localhost:9092</td>
+ </tr>
+ <tr>
+ <td>topic</td><td>my-topic</td>
+ </tr>
+ <tr>
+ <td>password</td><td>my-password</td>
+ </tr>
+ </tbody>
+ <caption>Table 2: 'KAFKA' Database table example with Grouping Strategy =
Table Name</caption>
+</table>
+
+<table>
+ <thead>
+ <tr>
+ <th colspan="2" style="text-align: center">S3</th>
+ </tr>
+ <tr>
+ <th>PARAMETER_NAME</th><th>PARAMETER_VALUE</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>bucket</td><td>my-bucket</td>
+ </tr>
+ <tr>
+ <td>secret.access.key</td><td>my-key</td>
+ </tr>
+ </tbody>
+ <caption>Table 3: 'S3' Database table example with Grouping Strategy =
Table Name</caption>
+</table>
+
+<p>
+ Once fetched, the parameters in this example will look like this:
+</p>
+
+<p>
+ Parameter Group <b>KAFKA</b>:
+ <ul>
+ <li>brokers - http://localhost:9092</li>
+ <li>topic - my-topic</li>
+ <li>password - my-password</li>
+ </ul>
+</p>
+
+<p>
+ Parameter Group <b>S3</b>:
+ <ul>
+ <li>bucket - my-bucket</li>
+ <li>secret.access.key - my-key</li>
+ </ul>
+</p>
+
+<h3>Filtering rows</h3>
+
+<p>
+ If you need to include only some rows in a table as parameters, you can
use the 'SQL WHERE clause' property. An example of this is as follows:
+</p>
+
+<ul>
+ <li><b>Parameter Grouping Strategy</b> - Table Name</li>
+ <li><b>Table Names</b> - KAFKA, S3</li>
+ <li><b>Parameter Name Column</b> - PARAMETER_NAME</li>
+ <li><b>Parameter Value Column</b> - PARAMETER_VALUE</li>
+ <li><b>SQL WHERE clause</b> - OTHER_COLUMN = 'my-parameters'</li>
+</ul>
+
+<p>
+ Here we are assuming there is another column, 'OTHER_COLUMN' in both the
KAFKA and S3 tables. Only rows whose 'OTHER_COLUMN' value is 'my-parameters'
+ will then be fetched from these tables.
+</p>
+
+</body>
+</html>
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/test/java/org/apache/nifi/parameter/TestDatabaseParameterProvider.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/test/java/org/apache/nifi/parameter/TestDatabaseParameterProvider.java
new file mode 100644
index 0000000000..9e6aae4acd
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/test/java/org/apache/nifi/parameter/TestDatabaseParameterProvider.java
@@ -0,0 +1,303 @@
+package org.apache.nifi.parameter;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.apache.nifi.util.MockParameterProviderInitializationContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mockito.stubbing.OngoingStubbing;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public class TestDatabaseParameterProvider {
+
+ public static final String DBCP_SERVICE = "dbcp-service";
+ public static final String TABLE_NAME = "myTable";
+ private DBCPService dbcpService;
+
+ private DatabaseParameterProvider parameterProvider;
+
+ private MockParameterProviderInitializationContext initializationContext;
+
+ private Map<PropertyDescriptor, String> columnBasedProperties;
+
+ private Map<PropertyDescriptor, String> nonColumnBasedProperties;
+
+ @Before
+ public void init() throws InitializationException {
+ dbcpService = mock(DBCPService.class);
+
+ final DatabaseParameterProvider rawProvider = new
DatabaseParameterProvider();
+ initializationContext = new
MockParameterProviderInitializationContext("id", "name",
mock(ComponentLog.class));
+ initializationContext.addControllerService(dbcpService, DBCP_SERVICE);
+ rawProvider.initialize(initializationContext);
+ parameterProvider = spy(rawProvider);
+ // Return the table name
+ doAnswer(invocationOnMock ->
invocationOnMock.getArgument(1)).when(parameterProvider).getQuery(any(), any(),
any(), any());
+
+ columnBasedProperties = new HashMap<>();
+
+ columnBasedProperties.put(DatabaseParameterProvider.DBCP_SERVICE,
DBCP_SERVICE);
+ columnBasedProperties.put(DatabaseParameterProvider.DB_TYPE,
"Generic");
+
columnBasedProperties.put(DatabaseParameterProvider.PARAMETER_GROUPING_STRATEGY,
DatabaseParameterProvider.GROUPING_BY_COLUMN.getValue());
+
columnBasedProperties.put(DatabaseParameterProvider.PARAMETER_GROUP_NAME_COLUMN,
"group");
+
columnBasedProperties.put(DatabaseParameterProvider.PARAMETER_NAME_COLUMN,
"name");
+
columnBasedProperties.put(DatabaseParameterProvider.PARAMETER_VALUE_COLUMN,
"value");
+ columnBasedProperties.put(DatabaseParameterProvider.TABLE_NAME,
TABLE_NAME);
+
+ nonColumnBasedProperties = new HashMap<>();
+ nonColumnBasedProperties.put(DatabaseParameterProvider.DBCP_SERVICE,
DBCP_SERVICE);
+ nonColumnBasedProperties.put(DatabaseParameterProvider.DB_TYPE,
"Generic");
+
nonColumnBasedProperties.put(DatabaseParameterProvider.PARAMETER_GROUPING_STRATEGY,
DatabaseParameterProvider.GROUPING_BY_TABLE_NAME.getValue());
+
nonColumnBasedProperties.put(DatabaseParameterProvider.PARAMETER_NAME_COLUMN,
"name");
+
nonColumnBasedProperties.put(DatabaseParameterProvider.PARAMETER_VALUE_COLUMN,
"value");
+ nonColumnBasedProperties.put(DatabaseParameterProvider.TABLE_NAMES,
"KAFKA, S3");
+ }
+
+ @Test
+ public void testColumnStrategies() throws SQLException {
+ runColumnStrategiesTest(columnBasedProperties);
+ }
+
+ @Test
+ public void testColumnStrategiesWithExtraProperties() throws SQLException {
+ // Ensure setting the unrelated properties don't break anything
+ columnBasedProperties.put(DatabaseParameterProvider.TABLE_NAMES,
"a,b");
+ runColumnStrategiesTest(columnBasedProperties);
+ }
+
+ private void runColumnStrategiesTest(final Map<PropertyDescriptor, String>
properties) throws SQLException {
+ final List<Map<String, String>> rows = Arrays.asList(
+ new HashMap<String, String>() { {
+ put("group", "Kafka"); put("name", "brokers");
put("value", "my-brokers"); put("unrelated_column", "unrelated_value");
+ } },
+ new HashMap<String, String>() { {
+ put("group", "Kafka"); put("name", "topic"); put("value",
"my-topic"); put("unrelated_column", "unrelated_value");
+ } },
+ new HashMap<String, String>() { {
+ put("group", "Kafka"); put("name", "password");
put("value", "my-password"); put("unrelated_column", "unrelated_value");
+ } },
+ new HashMap<String, String>() { {
+ put("group", "S3"); put("name", "bucket"); put("value",
"my-bucket"); put("unrelated_column", "unrelated_value");
+ } },
+ new HashMap<String, String>() { {
+ put("group", "S3"); put("name", "s3-password");
put("value", "my-s3-password"); put("unrelated_column", "unrelated_value");
+ } }
+ );
+ mockTableResults(new MockTable(TABLE_NAME, rows));
+
+ final ConfigurationContext context = new
MockConfigurationContext(properties, initializationContext);
+ final List<ParameterGroup> groups =
parameterProvider.fetchParameters(context);
+ assertEquals(2, groups.size());
+
+ for (final ParameterGroup group : groups) {
+ final String groupName = group.getGroupName();
+ if (groupName.equals("S3")) {
+ final Parameter parameter =
group.getParameters().iterator().next();
+ assertEquals("bucket", parameter.getDescriptor().getName());
+ assertEquals("my-bucket", parameter.getValue());
+ assertFalse(parameter.getDescriptor().isSensitive());
+ }
+ }
+ }
+
+ @Test
+ public void testNonColumnStrategies() throws SQLException {
+ runNonColumnStrategyTest(nonColumnBasedProperties);
+ }
+
+ @Test
+ public void testNonColumnStrategiesWithExtraProperties() throws
SQLException {
+ nonColumnBasedProperties.put(DatabaseParameterProvider.TABLE_NAME,
TABLE_NAME);
+
nonColumnBasedProperties.put(DatabaseParameterProvider.PARAMETER_GROUP_NAME_COLUMN,
"group");
+ runNonColumnStrategyTest(nonColumnBasedProperties);
+ }
+
+ /**
+  * Shared scenario for the non-column strategies: mocks a KAFKA table and an S3 table whose rows only
+  * carry "name" and "value" columns, fetches parameters with the supplied properties, and asserts that
+  * <ul>
+  *   <li>exactly two groups are returned,</li>
+  *   <li>the group named KAFKA contains nifi_brokers = my-brokers,</li>
+  *   <li>the other group contains nifi_s3_bucket = my-bucket, and</li>
+  *   <li>the parameter names across both groups are exactly the nine names mocked below.</li>
+  * </ul>
+  *
+  * @param properties provider properties used to build the configuration context
+  * @throws SQLException declared by the JDBC methods stubbed while mocking the tables
+  */
+ private void runNonColumnStrategyTest(final Map<PropertyDescriptor, String> properties) throws SQLException {
+     final List<Map<String, String>> kafkaRows = Arrays.asList(
+             nameValueRow("nifi_brokers", "my-brokers"),
+             nameValueRow("nifi_topic", "my-topic"),
+             nameValueRow("unrelated_field", "my-value"),
+             nameValueRow("kafka_password", "my-password"),
+             nameValueRow("nifi_password", "my-nifi-password")
+     );
+     final List<Map<String, String>> s3Rows = Arrays.asList(
+             nameValueRow("nifi_s3_bucket", "my-bucket"),
+             nameValueRow("s3_password", "my-password"),
+             nameValueRow("nifi_other_field", "my-field"),
+             nameValueRow("other_password", "my-password")
+     );
+     mockTableResults(new MockTable("KAFKA", kafkaRows), new MockTable("S3", s3Rows));
+
+     final ConfigurationContext context = new MockConfigurationContext(properties, initializationContext);
+     final List<ParameterGroup> groups = parameterProvider.fetchParameters(context);
+     assertEquals(2, groups.size());
+
+     for (final ParameterGroup group : groups) {
+         // Any group that is not KAFKA is checked against the S3 expectations
+         final boolean kafkaGroup = group.getGroupName().equals("KAFKA");
+         final String expectedName = kafkaGroup ? "nifi_brokers" : "nifi_s3_bucket";
+         final String expectedValue = kafkaGroup ? "my-brokers" : "my-bucket";
+         assertTrue(group.getParameters().stream()
+                 .filter(parameter -> parameter.getDescriptor().getName().equals(expectedName))
+                 .anyMatch(parameter -> parameter.getValue().equals(expectedValue)));
+     }
+
+     final Set<String> allParameterNames = groups.stream()
+             .flatMap(group -> group.getParameters().stream())
+             .map(parameter -> parameter.getDescriptor().getName())
+             .collect(Collectors.toSet());
+     assertEquals(new HashSet<>(Arrays.asList("nifi_brokers", "nifi_topic", "kafka_password", "nifi_password",
+             "s3_password", "nifi_s3_bucket", "unrelated_field", "nifi_other_field", "other_password")), allParameterNames);
+ }
+
+ /**
+  * Builds one mocked table row that holds only a "name" and a "value" column. A plain HashMap is used
+  * instead of double-brace initialization so that no anonymous subclass per row is created.
+  */
+ private static Map<String, String> nameValueRow(final String name, final String value) {
+     final Map<String, String> row = new HashMap<>();
+     row.put("name", name);
+     row.put("value", value);
+     return row;
+ }
+
+ /**
+  * A row whose "name" column is null must make fetching parameters fail with an IllegalStateException.
+  */
+ @Test
+ public void testNullNameColumn() throws SQLException {
+     // Plain HashMap (null values allowed) rather than a double-brace anonymous subclass
+     final Map<String, String> row = new HashMap<>();
+     row.put("name", null);
+     mockTableResults(new MockTable(TABLE_NAME, Arrays.asList(row)));
+     runTestWithExpectedFailure(columnBasedProperties);
+ }
+
+ /**
+  * A row with a valid "name" and "value" but a null "group" column must make fetching parameters
+  * fail with an IllegalStateException.
+  */
+ @Test
+ public void testNullGroupNameColumn() throws SQLException {
+     // Plain HashMap (null values allowed) rather than a double-brace anonymous subclass
+     final Map<String, String> row = new HashMap<>();
+     row.put("name", "param");
+     row.put("value", "value");
+     row.put("group", null);
+     mockTableResults(new MockTable(TABLE_NAME, Arrays.asList(row)));
+     runTestWithExpectedFailure(columnBasedProperties);
+ }
+
+ /**
+  * A row with a valid "name" but a null "value" column must make fetching parameters fail with an
+  * IllegalStateException.
+  */
+ @Test
+ public void testNullValueColumn() throws SQLException {
+     // Plain HashMap (null values allowed) rather than a double-brace anonymous subclass
+     final Map<String, String> row = new HashMap<>();
+     row.put("name", "param");
+     row.put("value", null);
+     mockTableResults(new MockTable(TABLE_NAME, Arrays.asList(row)));
+     runTestWithExpectedFailure(columnBasedProperties);
+ }
+
+ /**
+  * Builds a configuration context from the given properties and asserts that fetching parameters
+  * throws an {@link IllegalStateException}. Table results must be mocked by the caller beforehand.
+  *
+  * @param properties provider properties used to build the configuration context
+  */
+ public void runTestWithExpectedFailure(final Map<PropertyDescriptor, String> properties) {
+     final ConfigurationContext failingContext = new MockConfigurationContext(properties, initializationContext);
+     assertThrows(IllegalStateException.class, () -> parameterProvider.fetchParameters(failingContext));
+ }
+
+ /**
+  * Stubs the DBCP service to hand out a mocked connection whose successive createStatement() calls
+  * return one mocked statement per given table, in the order the tables are passed.
+  *
+  * @param mockTables tables (name plus rows) the mocked database should serve
+  * @throws SQLException declared by the JDBC methods being stubbed
+  */
+ private void mockTableResults(final MockTable... mockTables) throws SQLException {
+     final Connection mockConnection = mock(Connection.class);
+     when(dbcpService.getConnection(any(Map.class))).thenReturn(mockConnection);
+
+     // The first table starts the stubbing; each later table appends another consecutive return value
+     OngoingStubbing<Statement> createStatementStubbing = null;
+     for (final MockTable table : mockTables) {
+         final Statement tableStatement = mockStatementFor(table);
+         createStatementStubbing = createStatementStubbing == null
+                 ? when(mockConnection.createStatement()).thenReturn(tableStatement)
+                 : createStatementStubbing.thenReturn(tableStatement);
+     }
+ }
+
+ /**
+  * Creates a mocked statement that returns the table's rows for any query containing the table name.
+  * The result set's next() and getString(column) are both driven by one shared row cursor.
+  */
+ private Statement mockStatementFor(final MockTable table) throws SQLException {
+     final ResultSetAnswer rowCursor = new ResultSetAnswer(table.rows);
+     final ResultSet tableResults = mock(ResultSet.class);
+     when(tableResults.next()).thenAnswer(rowCursor);
+     when(tableResults.getString(anyString())).thenAnswer(invocation -> rowCursor.getValue(invocation.getArgument(0)));
+
+     final Statement tableStatement = mock(Statement.class);
+     when(tableStatement.executeQuery(ArgumentMatchers.contains(table.tableName))).thenReturn(tableResults);
+     return tableStatement;
+ }
+
+ /**
+  * Pairs a table name with the rows the mocked database should return for that table.
+  * Declared static because it never uses the enclosing test instance.
+  */
+ private static class MockTable {
+     private final String tableName;
+
+     private final List<Map<String, String>> rows;
+
+     private MockTable(final String tableName, final List<Map<String, String>> rows) {
+         this.tableName = tableName;
+         this.rows = rows;
+     }
+ }
+
+ /**
+  * Mockito answer that acts as a cursor over an in-memory list of rows. Each invocation of
+  * {@link #answer(InvocationOnMock)} advances to the next row and reports whether one was available;
+  * {@link #getValue(String)} then reads a column from the row the cursor currently points at.
+  */
+ private static class ResultSetAnswer implements Answer<Boolean> {
+     private final Iterator<Map<String, String>> rowIterator;
+
+     // Row selected by the latest answer() call; null before the first call and after the rows run out
+     private Map<String, String> currentRow;
+
+     private ResultSetAnswer(final List<Map<String, String>> rows) {
+         this.rowIterator = rows.iterator();
+     }
+
+     /**
+      * Advances the cursor.
+      *
+      * @return true if a further row was available and is now current, false once the rows are exhausted
+      */
+     @Override
+     public Boolean answer(final InvocationOnMock invocationOnMock) {
+         final boolean hasNext = rowIterator.hasNext();
+         currentRow = hasNext ? rowIterator.next() : null;
+         return hasNext;
+     }
+
+     /**
+      * Reads a column from the current row.
+      *
+      * @param column column name to look up
+      * @return the column value, or null if the row has no such column or maps it to null
+      * @throws SQLException if there is no current row; a SQLException is used rather than an
+      *                      IllegalStateException so that misuse of the mock cannot satisfy tests that
+      *                      expect an IllegalStateException from the provider
+      */
+     String getValue(final String column) throws SQLException {
+         if (currentRow == null) {
+             throw new SQLException("No current row: next() has not been called or all rows have been read");
+         }
+         return currentRow.get(column);
+     }
+ }
+}