This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 094222260c NIFI-9402: Adding DatabaseParameterProvider
094222260c is described below

commit 094222260c7cb7dd839520abb2dcf3bcec316ea5
Author: Joe Gresock <[email protected]>
AuthorDate: Sat Sep 10 08:40:54 2022 -0400

    NIFI-9402: Adding DatabaseParameterProvider
    
    Adding provided scope to api dependency in 
nifi-standard-parameter-providers module
    
    Adding additional documentation, other minor code cleanup
    
    Correcting error handling in StandardParameterProviderNode, updating 
additional details for DatabaseParameterProvider
    
    Correcting null column value handling
    
    NIFI-9402: Fixed Checkstyle violation
    Signed-off-by: Matthew Burgess <[email protected]>
    
    This closes #6391
---
 .../parameter/StandardParameterProviderNode.java   |   4 +-
 .../nifi-standard-parameter-providers/pom.xml      |  11 +
 .../nifi/parameter/DatabaseParameterProvider.java  | 265 ++++++++++++++++++
 .../org.apache.nifi.parameter.ParameterProvider    |   1 +
 .../additionalDetails.html                         | 214 +++++++++++++++
 .../parameter/TestDatabaseParameterProvider.java   | 303 +++++++++++++++++++++
 6 files changed, 796 insertions(+), 2 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/parameter/StandardParameterProviderNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/parameter/StandardParameterProviderNode.java
index a1651e6376..a35fbea3da 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/parameter/StandardParameterProviderNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/parameter/StandardParameterProviderNode.java
@@ -277,8 +277,8 @@ public class StandardParameterProviderNode extends 
AbstractComponentNode impleme
         List<ParameterGroup> fetchedParameterGroups;
         try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(getExtensionManager(), 
parameterProvider.getClass(), parameterProvider.getIdentifier())) {
             fetchedParameterGroups = 
parameterProvider.fetchParameters(configurationContext);
-        } catch (final IOException e) {
-            throw new RuntimeException(String.format("Error fetching 
parameters for %s", this), e);
+        } catch (final IOException | RuntimeException e) {
+            throw new IllegalStateException(String.format("Error fetching 
parameters for %s: %s", this, e.getMessage()), e);
         }
 
         if (fetchedParameterGroups == null || 
fetchedParameterGroups.isEmpty()) {
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/pom.xml
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/pom.xml
index cc5d40a7da..16111bda50 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/pom.xml
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/pom.xml
@@ -30,6 +30,17 @@
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-dbcp-service-api</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-processors</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-utils</artifactId>
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/java/org/apache/nifi/parameter/DatabaseParameterProvider.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/java/org/apache/nifi/parameter/DatabaseParameterProvider.java
new file mode 100644
index 0000000000..0bb924350a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/java/org/apache/nifi/parameter/DatabaseParameterProvider.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.parameter;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.apache.nifi.util.StringUtils;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+
+@Tags({"database", "dbcp", "sql"})
+@CapabilityDescription("Fetches parameters from database tables")
+
+public class DatabaseParameterProvider extends AbstractParameterProvider 
implements VerifiableParameterProvider {
+
+    protected final static Map<String, DatabaseAdapter> dbAdapters = new 
HashMap<>();
+
+    public static final PropertyDescriptor DB_TYPE;
+
+    static {
+        // Load the DatabaseAdapters
+        ArrayList<AllowableValue> dbAdapterValues = new ArrayList<>();
+        ServiceLoader<DatabaseAdapter> dbAdapterLoader = 
ServiceLoader.load(DatabaseAdapter.class);
+        dbAdapterLoader.forEach(it -> {
+            dbAdapters.put(it.getName(), it);
+            dbAdapterValues.add(new AllowableValue(it.getName(), it.getName(), 
it.getDescription()));
+        });
+
+        DB_TYPE = new PropertyDescriptor.Builder()
+                .name("db-type")
+                .displayName("Database Type")
+                .description("The type/flavor of database, used for generating 
database-specific code. In many cases the Generic type "
+                        + "should suffice, but some databases (such as Oracle) 
require custom SQL clauses. ")
+                .allowableValues(dbAdapterValues.toArray(new 
AllowableValue[dbAdapterValues.size()]))
+                .defaultValue("Generic")
+                .required(true)
+                .build();
+    }
+
+    static AllowableValue GROUPING_BY_COLUMN = new 
AllowableValue("grouping-by-column", "Column",
+            "A single table is partitioned by the 'Parameter Group Name 
Column'.  All rows with the same value in this column will " +
+                    "map to a group of the same name.");
+    static AllowableValue GROUPING_BY_TABLE_NAME = new 
AllowableValue("grouping-by-table-name", "Table Name",
+            "An entire table maps to a Parameter Group.  The group name will 
be the table name.");
+
+    /** The connection pool from which database connections are obtained. */
+    public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
+            .name("dbcp-service")
+            .displayName("Database Connection Pooling Service")
+            .description("The Controller Service that is used to obtain a connection to the database.")
+            .required(true)
+            .identifiesControllerService(DBCPService.class)
+            .build();
+
+    /** Chooses between grouping by a column value and grouping by table name; defaults to grouping by column. */
+    public static final PropertyDescriptor PARAMETER_GROUPING_STRATEGY = new PropertyDescriptor.Builder()
+            .name("parameter-grouping-strategy")
+            .displayName("Parameter Grouping Strategy")
+            .description("The strategy used to group parameters.")
+            .required(true)
+            .allowableValues(GROUPING_BY_COLUMN, GROUPING_BY_TABLE_NAME)
+            .defaultValue(GROUPING_BY_COLUMN.getValue())
+            .build();
+
+    /** Comma-separated tables to read; only applicable when grouping by table name. */
+    public static final PropertyDescriptor TABLE_NAMES = new PropertyDescriptor.Builder()
+            .name("table-names")
+            .displayName("Table Names")
+            .description("A comma-separated list of names of the database tables containing the parameters.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dependsOn(PARAMETER_GROUPING_STRATEGY, GROUPING_BY_TABLE_NAME)
+            .build();
+
+    /** The single table to read; only applicable when grouping by column. */
+    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("table-name")
+            .displayName("Table Name")
+            .description("The name of the database table containing the parameters.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dependsOn(PARAMETER_GROUPING_STRATEGY, GROUPING_BY_COLUMN)
+            .build();
+
+    /** Column from which each parameter's name is read. */
+    public static final PropertyDescriptor PARAMETER_NAME_COLUMN = new PropertyDescriptor.Builder()
+            .name("parameter-name-column")
+            .displayName("Parameter Name Column")
+            .description("The name of a column containing the parameter name.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /** Column from which each parameter's value is read. */
+    public static final PropertyDescriptor PARAMETER_VALUE_COLUMN = new PropertyDescriptor.Builder()
+            .name("parameter-value-column")
+            .displayName("Parameter Value Column")
+            .description("The name of a column containing the parameter value.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /** Column from which each parameter's group name is read; only applicable when grouping by column. */
+    public static final PropertyDescriptor PARAMETER_GROUP_NAME_COLUMN = new PropertyDescriptor.Builder()
+            .name("parameter-group-name-column")
+            .displayName("Parameter Group Name Column")
+            .description("The name of a column containing the name of the parameter group into which the parameter should be mapped.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dependsOn(PARAMETER_GROUPING_STRATEGY, GROUPING_BY_COLUMN)
+            .build();
+
+    /** Optional filter appended to every generated query; the value is passed to the adapter without the 'WHERE' keyword. */
+    public static final PropertyDescriptor SQL_WHERE_CLAUSE = new PropertyDescriptor.Builder()
+            .name("sql-where-clause")
+            .displayName("SQL WHERE clause")
+            .description("An optional SQL query 'WHERE' clause by which to filter all results.  The 'WHERE' keyword should not be included.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    private List<PropertyDescriptor> properties;
+
+    /**
+     * Builds the unmodifiable list of supported property descriptors once, at initialization time.
+     *
+     * @param config the initialization context (not consulted here)
+     */
+    @Override
+    protected void init(final ParameterProviderInitializationContext config) {
+        this.properties = Collections.unmodifiableList(Arrays.asList(
+                DB_TYPE,
+                DBCP_SERVICE,
+                PARAMETER_GROUPING_STRATEGY,
+                TABLE_NAME,
+                TABLE_NAMES,
+                PARAMETER_NAME_COLUMN,
+                PARAMETER_VALUE_COLUMN,
+                PARAMETER_GROUP_NAME_COLUMN,
+                SQL_WHERE_CLAUSE));
+    }
+
+    /**
+     * @return the unmodifiable descriptor list assembled by {@code init}
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return this.properties;
+    }
+
+    @Override
+    public List<ParameterGroup> fetchParameters(final ConfigurationContext 
context) {
+        final boolean groupByColumn = 
GROUPING_BY_COLUMN.getValue().equals(context.getProperty(PARAMETER_GROUPING_STRATEGY).getValue());
+
+        final DBCPService dbcpService = 
context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+        final String whereClause = 
context.getProperty(SQL_WHERE_CLAUSE).getValue();
+        final String parameterNameColumn = 
context.getProperty(PARAMETER_NAME_COLUMN).getValue();
+        final String parameterValueColumn = 
context.getProperty(PARAMETER_VALUE_COLUMN).getValue();
+        final String parameterGroupNameColumn = 
context.getProperty(PARAMETER_GROUP_NAME_COLUMN).getValue();
+
+        final List<String> tableNames = groupByColumn
+                ? 
Collections.singletonList(context.getProperty(TABLE_NAME).getValue())
+                : 
Arrays.stream(context.getProperty(TABLE_NAMES).getValue().split(",")).map(String::trim).collect(Collectors.toList());
+
+        final Map<String, List<Parameter>> parameterMap = new HashMap<>();
+        for (final String tableName : tableNames) {
+            try (final Connection con = 
dbcpService.getConnection(Collections.emptyMap()); final Statement st = 
con.createStatement()) {
+                final List<String> columns = new ArrayList<>();
+                columns.add(parameterNameColumn);
+                columns.add(parameterValueColumn);
+                if (groupByColumn) {
+                    columns.add(parameterGroupNameColumn);
+                }
+                final String query = getQuery(context, tableName, columns, 
whereClause);
+
+                getLogger().info("Fetching parameters with query: " + query);
+                try (final ResultSet rs = st.executeQuery(query)) {
+                    while (rs.next()) {
+                        final String parameterName = 
rs.getString(parameterNameColumn);
+                        final String parameterValue = 
rs.getString(parameterValueColumn);
+
+                        validateValueNotNull(parameterName, 
parameterNameColumn);
+                        validateValueNotNull(parameterValue, 
parameterValueColumn);
+                        final String parameterGroupName;
+                        if (groupByColumn) {
+                            parameterGroupName = parameterGroupNameColumn == 
null ? null : rs.getString(parameterGroupNameColumn);
+                            validateValueNotNull(parameterGroupName, 
parameterGroupNameColumn);
+                        } else {
+                            parameterGroupName = tableName;
+                        }
+
+                        final ParameterDescriptor parameterDescriptor = new 
ParameterDescriptor.Builder()
+                                .name(parameterName)
+                                .build();
+                        final Parameter parameter = new 
Parameter(parameterDescriptor, parameterValue);
+
+                        parameterMap.computeIfAbsent(parameterGroupName, key 
-> new ArrayList<>()).add(parameter);
+                    }
+                }
+            } catch (final SQLException e) {
+                getLogger().error("Encountered a database error when fetching 
parameters: {}", e.getMessage(), e);
+                throw new RuntimeException("Encountered a database error when 
fetching parameters: " + e.getMessage(), e);
+            }
+        }
+
+        return parameterMap.entrySet().stream()
+                .map(entry -> new ParameterGroup(entry.getKey(), 
entry.getValue()))
+                .collect(Collectors.toList());
+    }
+
+    private void validateValueNotNull(final String value, final String 
columnName) {
+        if (value == null) {
+            throw new IllegalStateException(String.format("Expected %s column 
to be non-null", columnName));
+        }
+    }
+
+    /**
+     * Asks the {@link DatabaseAdapter} selected by the Database Type property for a SELECT statement.
+     *
+     * @param context the configuration from which the Database Type is read
+     * @param tableName the table to select from
+     * @param columns the columns to select; joined with ", "
+     * @param whereClause the filter to hand to the adapter, possibly null
+     * @return the statement text produced by the adapter
+     */
+    String getQuery(final ConfigurationContext context, final String tableName, final List<String> columns, final String whereClause) {
+        final String databaseType = context.getProperty(DB_TYPE).getValue();
+        final DatabaseAdapter adapter = dbAdapters.get(databaseType);
+        final String columnList = StringUtils.join(columns, ", ");
+        // NOTE(review): the three trailing nulls are presumably order-by, limit and offset - confirm against DatabaseAdapter.
+        return adapter.getSelectStatement(tableName, columnList, whereClause, null, null, null);
+    }
+
+    /**
+     * Verifies the configuration by performing a real fetch and reporting how many groups and parameters came back.
+     *
+     * @param context the configuration to verify
+     * @param verificationLogger receives the stack trace if the fetch fails
+     * @return a single-element list holding either a SUCCESSFUL or a FAILED "Fetch Parameters" result
+     */
+    @Override
+    public List<ConfigVerificationResult> verify(final ConfigurationContext context, final ComponentLog verificationLogger) {
+        final List<ConfigVerificationResult> results = new ArrayList<>();
+        try {
+            final List<ParameterGroup> parameterGroups = fetchParameters(context);
+            final long parameterCount = parameterGroups.stream()
+                    .flatMap(group -> group.getParameters().stream())
+                    .count();
+            results.add(new ConfigVerificationResult.Builder()
+                    .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
+                    .verificationStepName("Fetch Parameters")
+                    .explanation(String.format("Successfully fetched %s Parameter Groups containing %s Parameters matching the filter.", parameterGroups.size(),
+                            parameterCount))
+                    .build());
+        } catch (final Exception e) {
+            verificationLogger.error("Failed to fetch Parameter Groups", e);
+            results.add(new ConfigVerificationResult.Builder()
+                    .outcome(ConfigVerificationResult.Outcome.FAILED)
+                    .verificationStepName("Fetch Parameters")
+                    // Plain concatenation, not String.format: the exception message is arbitrary text (e.g. an SQL
+                    // error quoting "LIKE 'a%'") and a '%' in it would make String.format throw from this handler.
+                    .explanation("Failed to fetch parameters: " + e.getMessage())
+                    .build());
+        }
+
+        return results;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/resources/META-INF/services/org.apache.nifi.parameter.ParameterProvider
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/resources/META-INF/services/org.apache.nifi.parameter.ParameterProvider
index 77ce897ba2..3b17ef9236 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/resources/META-INF/services/org.apache.nifi.parameter.ParameterProvider
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/resources/META-INF/services/org.apache.nifi.parameter.ParameterProvider
@@ -14,3 +14,4 @@
 # limitations under the License.
 org.apache.nifi.parameter.EnvironmentVariableParameterProvider
 org.apache.nifi.parameter.FileParameterProvider
+org.apache.nifi.parameter.DatabaseParameterProvider
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/resources/docs/org.apache.nifi.parameter.DatabaseParameterProvider/additionalDetails.html
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/resources/docs/org.apache.nifi.parameter.DatabaseParameterProvider/additionalDetails.html
new file mode 100644
index 0000000000..b349f8aa7e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/main/resources/docs/org.apache.nifi.parameter.DatabaseParameterProvider/additionalDetails.html
@@ -0,0 +1,214 @@
+<!DOCTYPE html>
+<html lang="en" xmlns="http://www.w3.org/1999/html">
+<!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+
+<head>
+    <meta charset="utf-8"/>
+    <title>DatabaseParameterProvider</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" 
type="text/css"/>
+</head>
+<body>
+
+<h1>Providing Parameters from a Database</h1>
+
+<p>
+    The DatabaseParameterProvider at its core maps database rows to 
Parameters, specified by a
+    Parameter Name Column and Parameter Value Column.  The Parameter Group 
name must also be accounted for, and may
+    be specified in different ways using the Parameter Grouping Strategy.
+</p>
+
+<p>
+    Before discussing the actual configuration, note that in some databases, 
the words 'PARAMETER', 'PARAMETERS', 'GROUP',
+    and even 'VALUE' are reserved words.  If you choose a column name that is 
a reserved word in the database you are using,
+    make sure to quote it per the database documentation.
+</p>
+
+<p>
+    Also note that you should use the preferred table name and column name 
case for your database.  For example, Postgres
+    prefers lowercase table and column names, while Oracle prefers capitalized 
ones.  Choosing the appropriate case can
+    avoid unexpected issues in configuring your DatabaseParameterProvider.
+</p>
+
+<p>
+    The default configuration uses a fully column-based approach, with the Parameter Group Name
+    also specified by a column in the same table.  An example of a table using this configuration would be:
+</p>
+<table>
+    <thead>
+        <tr>
+            <th colspan="3" style="text-align: center">PARAMETER_CONTEXTS</th>
+        </tr>
+        <tr>
+            
<th>PARAMETER_NAME</th><th>PARAMETER_VALUE</th><th>PARAMETER_GROUP</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>param.foo</td><td>value-foo</td><td>group_1</td>
+        </tr>
+        <tr>
+            <td>param.bar</td><td>value-bar</td><td>group_1</td>
+        </tr>
+        <tr>
+            <td>param.one</td><td>value-one</td><td>group_2</td>
+        </tr>
+        <tr>
+            <td>param.two</td><td>value-two</td><td>group_2</td>
+        </tr>
+    </tbody>
+    <caption>Table 1: Database table example with Grouping Strategy = 
Column</caption>
+</table>
+
+<p>
+    In order to use the data from this table, set the following Properties:
+</p>
+
+<ul>
+    <li><b>Parameter Grouping Strategy</b> - Column</li>
+    <li><b>Table Name</b> - PARAMETER_CONTEXTS</li>
+    <li><b>Parameter Name Column</b> - PARAMETER_NAME</li>
+    <li><b>Parameter Value Column</b> - PARAMETER_VALUE</li>
+    <li><b>Parameter Group Name Column</b> - PARAMETER_GROUP</li>
+</ul>
+
+<p>
+    Once fetched, the parameters in this example will look like this:
+</p>
+
+<p>
+    Parameter Group <b>group_1</b>:
+</p>
+<ul>
+    <li>param.foo - value-foo</li>
+    <li>param.bar - value-bar</li>
+</ul>
+
+<p>
+    Parameter Group <b>group_2</b>:
+</p>
+<ul>
+    <li>param.one - value-one</li>
+    <li>param.two - value-two</li>
+</ul>
+
+<h3>Grouping Strategy</h3>
+
+<p>
+    The default Grouping Strategy is by Column, which allows you to specify the Parameter Group name explicitly in the Parameter Group Name Column.
+    Note that if the value in this column is NULL, an exception will be thrown.
+</p>
+
+<p>
+    The other Grouping Strategy is by Table Name, which maps each table to a Parameter Group and sets the Parameter Group Name to the table name.
+    In this Grouping Strategy, the Parameter Group Name Column is not used.  An example configuration using this strategy would be:
+</p>
+
+<ul>
+    <li><b>Parameter Grouping Strategy</b> - Table Name</li>
+    <li><b>Table Names</b> - KAFKA, S3</li>
+    <li><b>Parameter Name Column</b> - PARAMETER_NAME</li>
+    <li><b>Parameter Value Column</b> - PARAMETER_VALUE</li>
+</ul>
+
+<p>
+    An example of some tables that may be used with this strategy:
+</p>
+
+<table>
+    <thead>
+        <tr>
+            <th colspan="2" style="text-align: center">KAFKA</th>
+        </tr>
+        <tr>
+            <th>PARAMETER_NAME</th><th>PARAMETER_VALUE</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>brokers</td><td>http://localhost:9092</td>
+        </tr>
+        <tr>
+            <td>topic</td><td>my-topic</td>
+        </tr>
+        <tr>
+            <td>password</td><td>my-password</td>
+        </tr>
+    </tbody>
+    <caption>Table 2: 'KAFKA' Database table example with Grouping Strategy = Table Name</caption>
+</table>
+
+<table>
+    <thead>
+        <tr>
+            <th colspan="2" style="text-align: center">S3</th>
+        </tr>
+        <tr>
+            <th>PARAMETER_NAME</th><th>PARAMETER_VALUE</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>bucket</td><td>my-bucket</td>
+        </tr>
+        <tr>
+            <td>secret.access.key</td><td>my-key</td>
+        </tr>
+    </tbody>
+    <caption>Table 3: 'S3' Database table example with Grouping Strategy = Table Name</caption>
+</table>
+
+<p>
+    Once fetched, the parameters in this example will look like this:
+</p>
+
+<p>
+    Parameter Group <b>KAFKA</b>:
+</p>
+<ul>
+    <li>brokers - http://localhost:9092</li>
+    <li>topic - my-topic</li>
+    <li>password - my-password</li>
+</ul>
+
+<p>
+    Parameter Group <b>S3</b>:
+</p>
+<ul>
+    <li>bucket - my-bucket</li>
+    <li>secret.access.key - my-key</li>
+</ul>
+
+<h3>Filtering rows</h3>
+
+<p>
+    If you need to include only some rows in a table as parameters, you can 
use the 'SQL WHERE clause' property.  An example of this is as follows:
+</p>
+
+<ul>
+    <li><b>Parameter Grouping Strategy</b> - Table Name</li>
+    <li><b>Table Names</b> - KAFKA, S3</li>
+    <li><b>Parameter Name Column</b> - PARAMETER_NAME</li>
+    <li><b>Parameter Value Column</b> - PARAMETER_VALUE</li>
+    <li><b>SQL WHERE clause</b> - OTHER_COLUMN = 'my-parameters'</li>
+</ul>
+
+<p>
+    Here we are assuming there is another column, 'OTHER_COLUMN' in both the 
KAFKA and S3 tables.  Only rows whose 'OTHER_COLUMN' value is 'my-parameters'
+    will then be fetched from these tables.
+</p>
+
+</body>
+</html>
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/test/java/org/apache/nifi/parameter/TestDatabaseParameterProvider.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/test/java/org/apache/nifi/parameter/TestDatabaseParameterProvider.java
new file mode 100644
index 0000000000..9e6aae4acd
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-parameter-providers/src/test/java/org/apache/nifi/parameter/TestDatabaseParameterProvider.java
@@ -0,0 +1,303 @@
+package org.apache.nifi.parameter;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.apache.nifi.util.MockParameterProviderInitializationContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mockito.stubbing.OngoingStubbing;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public class TestDatabaseParameterProvider {
+
+    public static final String DBCP_SERVICE = "dbcp-service";
+    public static final String TABLE_NAME = "myTable";
+    private DBCPService dbcpService;
+
+    private DatabaseParameterProvider parameterProvider;
+
+    private MockParameterProviderInitializationContext initializationContext;
+
+    private Map<PropertyDescriptor, String> columnBasedProperties;
+
+    private Map<PropertyDescriptor, String> nonColumnBasedProperties;
+
+    /**
+     * Creates a spied {@link DatabaseParameterProvider} wired to a mocked DBCP service and
+     * prepares the two property sets used by the tests: one grouping by column, one grouping
+     * by table name.
+     */
+    @Before
+    public void init() throws InitializationException {
+        dbcpService = mock(DBCPService.class);
+
+        final DatabaseParameterProvider provider = new DatabaseParameterProvider();
+        initializationContext = new MockParameterProviderInitializationContext("id", "name", mock(ComponentLog.class));
+        initializationContext.addControllerService(dbcpService, DBCP_SERVICE);
+        provider.initialize(initializationContext);
+        parameterProvider = spy(provider);
+        // Return the table name (second argument) in place of a real query
+        doAnswer(invocation -> invocation.getArgument(1)).when(parameterProvider).getQuery(any(), any(), any(), any());
+
+        // Settings common to both grouping strategies
+        final Map<PropertyDescriptor, String> sharedProperties = new HashMap<>();
+        sharedProperties.put(DatabaseParameterProvider.DBCP_SERVICE, DBCP_SERVICE);
+        sharedProperties.put(DatabaseParameterProvider.DB_TYPE, "Generic");
+        sharedProperties.put(DatabaseParameterProvider.PARAMETER_NAME_COLUMN, "name");
+        sharedProperties.put(DatabaseParameterProvider.PARAMETER_VALUE_COLUMN, "value");
+
+        columnBasedProperties = new HashMap<>(sharedProperties);
+        columnBasedProperties.put(DatabaseParameterProvider.PARAMETER_GROUPING_STRATEGY, DatabaseParameterProvider.GROUPING_BY_COLUMN.getValue());
+        columnBasedProperties.put(DatabaseParameterProvider.PARAMETER_GROUP_NAME_COLUMN, "group");
+        columnBasedProperties.put(DatabaseParameterProvider.TABLE_NAME, TABLE_NAME);
+
+        nonColumnBasedProperties = new HashMap<>(sharedProperties);
+        nonColumnBasedProperties.put(DatabaseParameterProvider.PARAMETER_GROUPING_STRATEGY, DatabaseParameterProvider.GROUPING_BY_TABLE_NAME.getValue());
+        nonColumnBasedProperties.put(DatabaseParameterProvider.TABLE_NAMES, "KAFKA, S3");
+    }
+
+    /**
+     * Runs the column-grouping scenario with the column-based property set as configured in init().
+     */
+    @Test
+    public void testColumnStrategies() throws SQLException {
+        runColumnStrategiesTest(columnBasedProperties);
+    }
+
+    /**
+     * Runs the column-grouping scenario with the Table Names property additionally set.
+     */
+    @Test
+    public void testColumnStrategiesWithExtraProperties() throws SQLException {
+        // Ensure that setting the unrelated properties doesn't break anything
+        columnBasedProperties.put(DatabaseParameterProvider.TABLE_NAMES, 
"a,b");
+        runColumnStrategiesTest(columnBasedProperties);
+    }
+
+    /**
+     * Mocks a single table whose rows carry a "group" column, fetches parameters using the
+     * given properties, and checks that two groups are returned and that the first parameter
+     * of the "S3" group is the non-sensitive "bucket" parameter.
+     *
+     * @param properties provider properties to run the fetch with
+     * @throws SQLException declared by the JDBC mocks
+     */
+    private void runColumnStrategiesTest(final Map<PropertyDescriptor, String> properties) throws SQLException {
+        final List<Map<String, String>> tableRows = Arrays.asList(
+                groupedRow("Kafka", "brokers", "my-brokers"),
+                groupedRow("Kafka", "topic", "my-topic"),
+                groupedRow("Kafka", "password", "my-password"),
+                groupedRow("S3", "bucket", "my-bucket"),
+                groupedRow("S3", "s3-password", "my-s3-password")
+        );
+        mockTableResults(new MockTable(TABLE_NAME, tableRows));
+
+        final ConfigurationContext context = new MockConfigurationContext(properties, initializationContext);
+        final List<ParameterGroup> fetchedGroups = parameterProvider.fetchParameters(context);
+        assertEquals(2, fetchedGroups.size());
+
+        for (final ParameterGroup fetchedGroup : fetchedGroups) {
+            if (!fetchedGroup.getGroupName().equals("S3")) {
+                continue;
+            }
+            final Parameter firstParameter = fetchedGroup.getParameters().iterator().next();
+            assertEquals("bucket", firstParameter.getDescriptor().getName());
+            assertEquals("my-bucket", firstParameter.getValue());
+            assertFalse(firstParameter.getDescriptor().isSensitive());
+        }
+    }
+
+    /**
+     * Builds one mock row for the column-grouping scenario, including a column the provider is not configured to read.
+     */
+    private static Map<String, String> groupedRow(final String group, final String name, final String value) {
+        final Map<String, String> row = new HashMap<>();
+        row.put("group", group);
+        row.put("name", name);
+        row.put("value", value);
+        row.put("unrelated_column", "unrelated_value");
+        return row;
+    }
+
+    /**
+     * Runs the table-name-grouping scenario with the non-column property set as configured in init().
+     */
+    @Test
+    public void testNonColumnStrategies() throws SQLException {
+        runNonColumnStrategyTest(nonColumnBasedProperties);
+    }
+
+    /**
+     * Runs the table-name-grouping scenario with the Table Name and Parameter Group Name Column
+     * properties additionally set.
+     */
+    @Test
+    public void testNonColumnStrategiesWithExtraProperties() throws 
SQLException {
+        // Properties that the column-based set configures; set here on top of the table-name grouping set
+        nonColumnBasedProperties.put(DatabaseParameterProvider.TABLE_NAME, 
TABLE_NAME);
+        
nonColumnBasedProperties.put(DatabaseParameterProvider.PARAMETER_GROUP_NAME_COLUMN,
 "group");
+        runNonColumnStrategyTest(nonColumnBasedProperties);
+    }
+
+    /**
+     * Mocks the "KAFKA" and "S3" tables, fetches parameters using the given properties, and
+     * checks that two groups are returned, that each group contains its expected sample
+     * parameter, and that the union of all parameter names matches the mocked rows.
+     *
+     * @param properties provider properties to run the fetch with
+     * @throws SQLException declared by the JDBC mocks
+     */
+    private void runNonColumnStrategyTest(final Map<PropertyDescriptor, String> properties) throws SQLException {
+        final List<Map<String, String>> kafkaRows = Arrays.asList(
+                nameValueRow("nifi_brokers", "my-brokers"),
+                nameValueRow("nifi_topic", "my-topic"),
+                nameValueRow("unrelated_field", "my-value"),
+                nameValueRow("kafka_password", "my-password"),
+                nameValueRow("nifi_password", "my-nifi-password")
+        );
+        final List<Map<String, String>> s3Rows = Arrays.asList(
+                nameValueRow("nifi_s3_bucket", "my-bucket"),
+                nameValueRow("s3_password", "my-password"),
+                nameValueRow("nifi_other_field", "my-field"),
+                nameValueRow("other_password", "my-password")
+        );
+        mockTableResults(new MockTable("KAFKA", kafkaRows), new MockTable("S3", s3Rows));
+
+        final ConfigurationContext context = new MockConfigurationContext(properties, initializationContext);
+        final List<ParameterGroup> fetchedGroups = parameterProvider.fetchParameters(context);
+        assertEquals(2, fetchedGroups.size());
+
+        for (final ParameterGroup fetchedGroup : fetchedGroups) {
+            // Any group that is not KAFKA is checked against the S3 expectations
+            final boolean kafkaGroup = fetchedGroup.getGroupName().equals("KAFKA");
+            final String expectedName = kafkaGroup ? "nifi_brokers" : "nifi_s3_bucket";
+            final String expectedValue = kafkaGroup ? "my-brokers" : "my-bucket";
+            assertTrue(fetchedGroup.getParameters().stream()
+                    .anyMatch(parameter -> parameter.getDescriptor().getName().equals(expectedName)
+                            && parameter.getValue().equals(expectedValue)));
+        }
+
+        final Set<String> expectedParameterNames = new HashSet<>(Arrays.asList("nifi_brokers", "nifi_topic", "kafka_password", "nifi_password",
+                "s3_password", "nifi_s3_bucket", "unrelated_field", "nifi_other_field", "other_password"));
+        final Set<String> actualParameterNames = fetchedGroups.stream()
+                .flatMap(fetchedGroup -> fetchedGroup.getParameters().stream())
+                .map(parameter -> parameter.getDescriptor().getName())
+                .collect(Collectors.toSet());
+        assertEquals(expectedParameterNames, actualParameterNames);
+    }
+
+    /**
+     * Builds one mock row holding only the "name" and "value" columns.
+     */
+    private static Map<String, String> nameValueRow(final String name, final String value) {
+        final Map<String, String> row = new HashMap<>();
+        row.put("name", name);
+        row.put("value", value);
+        return row;
+    }
+
+    /**
+     * Verifies that fetching fails when a row's parameter name is null.
+     * The group and value columns are populated so that the null name is the only missing
+     * value in the row and therefore the only possible cause of the expected failure.
+     */
+    @Test
+    public void testNullNameColumn() throws SQLException {
+        final Map<String, String> row = new HashMap<>();
+        row.put("group", "Kafka");
+        row.put("name", null);
+        row.put("value", "value");
+        mockTableResults(new MockTable(TABLE_NAME, Arrays.asList(row)));
+        runTestWithExpectedFailure(columnBasedProperties);
+    }
+
+    /**
+     * Verifies that fetching fails when a row's group name is null while its name and value are set.
+     */
+    @Test
+    public void testNullGroupNameColumn() throws SQLException {
+        final Map<String, String> row = new HashMap<>();
+        row.put("name", "param");
+        row.put("value", "value");
+        row.put("group", null);
+        mockTableResults(new MockTable(TABLE_NAME, Arrays.asList(row)));
+        runTestWithExpectedFailure(columnBasedProperties);
+    }
+
+    /**
+     * Verifies that fetching fails when a row's parameter value is null.
+     * The group and name columns are populated so that the null value is the only missing
+     * value in the row and therefore the only possible cause of the expected failure.
+     */
+    @Test
+    public void testNullValueColumn() throws SQLException {
+        final Map<String, String> row = new HashMap<>();
+        row.put("group", "Kafka");
+        row.put("name", "param");
+        row.put("value", null);
+        mockTableResults(new MockTable(TABLE_NAME, Arrays.asList(row)));
+        runTestWithExpectedFailure(columnBasedProperties);
+    }
+
+    /**
+     * Fetches parameters with the given properties and asserts that the fetch throws an
+     * {@link IllegalStateException}. Table results must already have been mocked by the caller.
+     *
+     * @param properties provider properties to run the fetch with
+     */
+    public void runTestWithExpectedFailure(final Map<PropertyDescriptor, 
String> properties) {
+        final ConfigurationContext context = new 
MockConfigurationContext(properties, initializationContext);
+        assertThrows(IllegalStateException.class, () -> 
parameterProvider.fetchParameters(context));
+    }
+
+    /**
+     * Stubs the mocked DBCP service so that it returns a connection whose statements serve the
+     * given tables' rows.
+     * Each table gets its own mocked Statement and ResultSet. The statements are stubbed onto
+     * connection.createStatement() as consecutive return values, in the order the tables are passed.
+     *
+     * @param mockTables tables to serve, in the order their statements should be handed out
+     * @throws SQLException declared by the JDBC methods being stubbed
+     */
+    private void mockTableResults(final MockTable... mockTables) throws 
SQLException {
+        final Connection connection = mock(Connection.class);
+        when(dbcpService.getConnection(any(Map.class))).thenReturn(connection);
+
+        // Holds the createStatement() stubbing so each further table can chain another return value onto it
+        OngoingStubbing<Statement> statementStubbing = null;
+        for (final MockTable mockTable : mockTables) {
+            final ResultSet resultSet = mock(ResultSet.class);
+            // The answer advances through the table's rows on each next() call and remembers the current row
+            final ResultSetAnswer resultSetAnswer = new 
ResultSetAnswer(mockTable.rows);
+            when(resultSet.next()).thenAnswer(resultSetAnswer);
+
+            // Column lookups read from whichever row the last next() call selected
+            when(resultSet.getString(anyString())).thenAnswer(invocationOnMock 
-> resultSetAnswer.getValue(invocationOnMock.getArgument(0)));
+
+            final Statement statement = mock(Statement.class);
+            // This statement only yields the result set for SQL text containing this table's name
+            
when(statement.executeQuery(ArgumentMatchers.contains(mockTable.tableName))).thenReturn(resultSet);
+
+            if (statementStubbing == null) {
+                statementStubbing = 
when(connection.createStatement()).thenReturn(statement);
+            } else {
+                statementStubbing = statementStubbing.thenReturn(statement);
+            }
+        }
+    }
+
+    /**
+     * Pairs a table name with the rows a mocked query against that table should return.
+     * Declared static because it uses nothing from the enclosing test instance.
+     */
+    private static class MockTable {
+        // Name matched against the SQL text passed to the mocked statement
+        private final String tableName;
+
+        // Rows to serve, each keyed by column name
+        private final List<Map<String, String>> rows;
+
+        private MockTable(final String tableName, final List<Map<String, String>> rows) {
+            this.tableName = tableName;
+            this.rows = rows;
+        }
+    }
+
+    /**
+     * Mockito answer that backs a mocked ResultSet with an in-memory list of rows.
+     * Used as the answer for next(), it advances to the following row and reports whether one
+     * existed; {@link #getValue(String)} then reads a column from the row selected by the most
+     * recent call. Declared static because it uses nothing from the enclosing test instance.
+     */
+    private static class ResultSetAnswer implements Answer<Boolean> {
+        private final Iterator<Map<String, String>> rowIterator;
+
+        // Row selected by the most recent answer() call; null before the first call and once the rows are exhausted
+        private Map<String, String> currentRow;
+
+        private ResultSetAnswer(final List<Map<String, String>> rows) {
+            this.rowIterator = rows.iterator();
+        }
+
+        /**
+         * Advances to the next row, if any.
+         *
+         * @param invocationOnMock the intercepted invocation (unused)
+         * @return true if a row was selected, false if the rows are exhausted
+         */
+        @Override
+        public Boolean answer(final InvocationOnMock invocationOnMock) {
+            final boolean hasNext = rowIterator.hasNext();
+            currentRow = hasNext ? rowIterator.next() : null;
+            return hasNext;
+        }
+
+        /**
+         * Returns the value stored under the given column in the current row, or null if the row has no such column.
+         * Must only be called after answer() has selected a row; otherwise there is no current row to read from.
+         */
+        String getValue(final String column) {
+            return currentRow.get(column);
+        }
+    }
+}


Reply via email to