Repository: nifi
Updated Branches:
  refs/heads/master 500a254e3 -> 099bfcdf3


NIFI-5121: Added DBCPService API method for passing in flow file attributes 
when available

This closes #2658

Signed-off-by: Mike Thomsen <mikerthom...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/099bfcdf
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/099bfcdf
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/099bfcdf

Branch: refs/heads/master
Commit: 099bfcdf3a5873a311312eb7e9e85b7b22ef1b98
Parents: 500a254
Author: Matthew Burgess <mattyb...@apache.org>
Authored: Wed Apr 25 14:36:16 2018 -0400
Committer: Mike Thomsen <mikerthom...@gmail.com>
Committed: Fri May 11 08:19:16 2018 -0400

----------------------------------------------------------------------
 .../processor/util/pattern/PartialFunctions.java |  2 +-
 .../apache/nifi/processor/util/pattern/Put.java  |  3 ++-
 .../processors/groovyx/ExecuteGroovyScript.java  |  2 +-
 .../resources/groovy/test_ctl_01_access.groovy   |  2 +-
 .../apache/nifi/processors/hive/PutHiveQL.java   |  4 ++--
 .../nifi/processors/hive/SelectHiveQL.java       |  2 +-
 .../standard/AbstractDatabaseFetchProcessor.java |  3 ++-
 .../processors/standard/ConvertJSONToSQL.java    |  3 ++-
 .../nifi/processors/standard/ExecuteSQL.java     |  2 +-
 .../processors/standard/GenerateTableFetch.java  |  2 +-
 .../processors/standard/ListDatabaseTables.java  |  2 +-
 .../processors/standard/PutDatabaseRecord.java   |  5 +++--
 .../apache/nifi/processors/standard/PutSQL.java  |  6 ++++--
 .../processors/standard/QueryDatabaseTable.java  |  2 +-
 .../java/org/apache/nifi/dbcp/DBCPService.java   | 19 ++++++++++++++++++-
 15 files changed, 41 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java
index 8332289..7b969b0 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java
@@ -32,7 +32,7 @@ public class PartialFunctions {
 
     @FunctionalInterface
     public interface InitConnection<FC, C> {
-        C apply(ProcessContext context, ProcessSession session, FC 
functionContext) throws ProcessException;
+        C apply(ProcessContext context, ProcessSession session, FC 
functionContext, FlowFile flowFile) throws ProcessException;
     }
 
     @FunctionalInterface

http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/Put.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/Put.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/Put.java
index 790f48a..80b8088 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/Put.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/Put.java
@@ -93,7 +93,8 @@ public class Put<FC, C extends AutoCloseable> {
             return;
         }
 
-        try (C connection = initConnection.apply(context, session, 
functionContext)) {
+        // Only pass in a flow file if there is a single one present
+        try (C connection = initConnection.apply(context, session, 
functionContext, flowFiles.size() == 1 ? flowFiles.get(0) : null)) {
 
             try {
                 // Execute the core function.

http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java
 
b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java
index ce89bdd..c35ab39 100644
--- 
a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java
+++ 
b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java
@@ -336,7 +336,7 @@ public class ExecuteGroovyScript extends AbstractProcessor {
     private void onInitSQL(HashMap SQL) throws SQLException {
         for (Map.Entry e : (Set<Map.Entry>) SQL.entrySet()) {
             DBCPService s = (DBCPService) e.getValue();
-            OSql sql = new OSql(s.getConnection());
+            OSql sql = new OSql(s.getConnection(Collections.emptyMap()));
             //try to set autocommit to false
             try {
                 if (sql.getConnection().getAutoCommit()) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_ctl_01_access.groovy
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_ctl_01_access.groovy
 
b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_ctl_01_access.groovy
index f788fbb..858b8c4 100644
--- 
a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_ctl_01_access.groovy
+++ 
b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_ctl_01_access.groovy
@@ -17,7 +17,7 @@
 
 //just check that it's possible to access controller services
 def ff=session.create()
-def con=CTL.mydbcp.getConnection()
+def con=CTL.mydbcp.getConnection([:])
 assert con instanceof java.sql.Connection
 con.close();
 ff.write('UTF-8', 'OK')

http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java
index d066967..e053a9a 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java
@@ -201,9 +201,9 @@ public class PutHiveQL extends AbstractHiveQLProcessor {
         }
     }
 
-    private InitConnection<FunctionContext, Connection> initConnection = 
(context, session, fc) -> {
+    private InitConnection<FunctionContext, Connection> initConnection = 
(context, session, fc, ff) -> {
         final HiveDBCPService dbcpService = 
context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class);
-        final Connection connection = dbcpService.getConnection();
+        final Connection connection = dbcpService.getConnection(ff == null ? 
Collections.emptyMap() : ff.getAttributes());
         fc.connectionUrl = dbcpService.getConnectionURL();
         return connection;
     };

http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
index 2526ef5..74855a1 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
@@ -308,7 +308,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
         final boolean escape = 
context.getProperty(HIVEQL_CSV_HEADER).asBoolean();
         final String fragmentIdentifier = UUID.randomUUID().toString();
 
-        try (final Connection con = dbcpService.getConnection();
+        try (final Connection con = dbcpService.getConnection(fileToProcess == 
null ? Collections.emptyMap() : fileToProcess.getAttributes());
              final Statement st = (flowbased ? 
con.prepareStatement(selectQuery) : con.createStatement())
         ) {
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
index ef8dd0a..0210739 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
@@ -46,6 +46,7 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -260,7 +261,7 @@ public abstract class AbstractDatabaseFetchProcessor 
extends AbstractSessionFact
             final String sqlQuery = 
context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue();
 
             final DatabaseAdapter dbAdapter = 
dbAdapters.get(context.getProperty(DB_TYPE).getValue());
-            try (final Connection con = dbcpService.getConnection();
+            try (final Connection con = dbcpService.getConnection(flowFile == 
null ? Collections.emptyMap() : flowFile.getAttributes());
                  final Statement st = con.createStatement()) {
 
                 // Try a query that returns no rows, for the purposes of 
getting metadata about the columns. It is possible

http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
index 272a3ff..9ac6b18 100755
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
@@ -28,6 +28,7 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Types;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -315,7 +316,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
             if (schema == null) {
                 // No schema exists for this table yet. Query the database to 
determine the schema and put it into the cache.
                 final DBCPService dbcpService = 
context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
-                try (final Connection conn = dbcpService.getConnection()) {
+                try (final Connection conn = 
dbcpService.getConnection(flowFile == null ? Collections.emptyMap() : 
flowFile.getAttributes())) {
                     schema = TableSchema.from(conn, catalog, schemaName, 
tableName, translateFieldNames, includePrimaryKeys);
                     schemaCache.put(schemaKey, schema);
                 } catch (final SQLException e) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index 13be8d5..ac93feb 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -216,7 +216,7 @@ public class ExecuteSQL extends AbstractProcessor {
         }
 
         int resultCount=0;
-        try (final Connection con = dbcpService.getConnection();
+        try (final Connection con = dbcpService.getConnection(fileToProcess == 
null ? Collections.emptyMap() : fileToProcess.getAttributes());
             final PreparedStatement st = con.prepareStatement(selectQuery)) {
             st.setQueryTimeout(queryTimeout); // timeout in seconds
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index 52a815d..a2d01e9 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -277,7 +277,7 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor {
             final String selectQuery = dbAdapter.getSelectStatement(tableName, 
columnsClause, whereClause, null, null, null);
             long rowCount = 0;
 
-            try (final Connection con = dbcpService.getConnection();
+            try (final Connection con = 
dbcpService.getConnection(finalFileToProcess == null ? Collections.emptyMap() : 
finalFileToProcess.getAttributes());
                  final Statement st = con.createStatement()) {
 
                 final Integer queryTimeout = 
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue();

http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java
index d594913..f9116c5 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java
@@ -225,7 +225,7 @@ public class ListDatabaseTables extends AbstractProcessor {
             throw new ProcessException(ioe);
         }
 
-        try (final Connection con = dbcpService.getConnection()) {
+        try (final Connection con = 
dbcpService.getConnection(Collections.emptyMap())) {
 
             DatabaseMetaData dbMetaData = con.getMetaData();
             ResultSet rs = dbMetaData.getTables(catalog, schemaPattern, 
tableNamePattern, tableTypes);

http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 9b236fe..ca79d9b 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -329,8 +329,9 @@ public class PutDatabaseRecord extends 
AbstractSessionFactoryProcessor {
                 .build();
     }
 
-    private final PartialFunctions.InitConnection<FunctionContext, Connection> 
initConnection = (c, s, fc) -> {
-        final Connection connection = 
c.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class).getConnection();
+    private final PartialFunctions.InitConnection<FunctionContext, Connection> 
initConnection = (c, s, fc, ff) -> {
+        final Connection connection = 
c.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class)
+                .getConnection(ff == null ? Collections.emptyMap() : 
ff.getAttributes());
         try {
             fc.originalAutoCommit = connection.getAutoCommit();
             connection.setAutoCommit(false);

http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
index 55e4c5f..9957c2e 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
@@ -64,6 +64,7 @@ import java.sql.SQLNonTransientException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -233,8 +234,9 @@ public class PutSQL extends AbstractSessionFactoryProcessor 
{
         return poll.getFlowFiles();
     };
 
-    private final PartialFunctions.InitConnection<FunctionContext, Connection> 
initConnection = (c, s, fc) -> {
-        final Connection connection = 
c.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class).getConnection();
+    private final PartialFunctions.InitConnection<FunctionContext, Connection> 
initConnection = (c, s, fc, ff) -> {
+        final Connection connection = 
c.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class)
+                .getConnection(ff == null ? Collections.emptyMap() : 
ff.getAttributes());
         try {
             fc.originalAutoCommit = connection.getAutoCommit();
             connection.setAutoCommit(false);

http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 17e47f5..1e8750f 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -286,7 +286,7 @@ public class QueryDatabaseTable extends 
AbstractDatabaseFetchProcessor {
         final StopWatch stopWatch = new StopWatch(true);
         final String fragmentIdentifier = UUID.randomUUID().toString();
 
-        try (final Connection con = dbcpService.getConnection();
+        try (final Connection con = 
dbcpService.getConnection(Collections.emptyMap());
              final Statement st = con.createStatement()) {
 
             if (fetchSize != null && fetchSize > 0) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-api/src/main/java/org/apache/nifi/dbcp/DBCPService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-api/src/main/java/org/apache/nifi/dbcp/DBCPService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-api/src/main/java/org/apache/nifi/dbcp/DBCPService.java
index 9cfe096..ffc9b3a 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-api/src/main/java/org/apache/nifi/dbcp/DBCPService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-api/src/main/java/org/apache/nifi/dbcp/DBCPService.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.dbcp;
 
 import java.sql.Connection;
+import java.util.Map;
 
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -30,5 +31,21 @@ import org.apache.nifi.processor.exception.ProcessException;
 @Tags({"dbcp", "jdbc", "database", "connection", "pooling", "store"})
 @CapabilityDescription("Provides Database Connection Pooling Service. 
Connections can be asked from pool and returned after usage.")
 public interface DBCPService extends ControllerService {
-    public Connection getConnection()  throws ProcessException;
+    Connection getConnection() throws ProcessException;
+
+    /**
+     * Allows a Map of attributes to be passed to the DBCPService for use in 
configuration, etc.
+     * An implementation will want to override getConnection() to return 
getConnection(Collections.emptyMap()),
+     * and override this method (possibly with its existing getConnection() 
implementation).
+     * @param attributes a Map of attributes to be passed to the DBCPService. 
The use of these
+     *                   attributes is implementation-specific, and the source 
of the attributes
+     *                   is processor-specific
+     * @return a Connection from the specifed/configured connection pool(s)
+     * @throws ProcessException if an error occurs while getting a connection
+     */
+    default Connection getConnection(Map<String,String> attributes) throws 
ProcessException {
+        // default implementation (for backwards compatibility) is to call 
getConnection()
+        // without attributes
+        return getConnection();
+    }
 }

Reply via email to