Repository: nifi Updated Branches: refs/heads/master 500a254e3 -> 099bfcdf3
NIFI-5121: Added DBCPService API method for passing in flow file attributes when available This closes #2658 Signed-off-by: Mike Thomsen <mikerthom...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/099bfcdf Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/099bfcdf Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/099bfcdf Branch: refs/heads/master Commit: 099bfcdf3a5873a311312eb7e9e85b7b22ef1b98 Parents: 500a254 Author: Matthew Burgess <mattyb...@apache.org> Authored: Wed Apr 25 14:36:16 2018 -0400 Committer: Mike Thomsen <mikerthom...@gmail.com> Committed: Fri May 11 08:19:16 2018 -0400 ---------------------------------------------------------------------- .../processor/util/pattern/PartialFunctions.java | 2 +- .../apache/nifi/processor/util/pattern/Put.java | 3 ++- .../processors/groovyx/ExecuteGroovyScript.java | 2 +- .../resources/groovy/test_ctl_01_access.groovy | 2 +- .../apache/nifi/processors/hive/PutHiveQL.java | 4 ++-- .../nifi/processors/hive/SelectHiveQL.java | 2 +- .../standard/AbstractDatabaseFetchProcessor.java | 3 ++- .../processors/standard/ConvertJSONToSQL.java | 3 ++- .../nifi/processors/standard/ExecuteSQL.java | 2 +- .../processors/standard/GenerateTableFetch.java | 2 +- .../processors/standard/ListDatabaseTables.java | 2 +- .../processors/standard/PutDatabaseRecord.java | 5 +++-- .../apache/nifi/processors/standard/PutSQL.java | 6 ++++-- .../processors/standard/QueryDatabaseTable.java | 2 +- .../java/org/apache/nifi/dbcp/DBCPService.java | 19 ++++++++++++++++++- 15 files changed, 41 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java index 8332289..7b969b0 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java @@ -32,7 +32,7 @@ public class PartialFunctions { @FunctionalInterface public interface InitConnection<FC, C> { - C apply(ProcessContext context, ProcessSession session, FC functionContext) throws ProcessException; + C apply(ProcessContext context, ProcessSession session, FC functionContext, FlowFile flowFile) throws ProcessException; } @FunctionalInterface http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/Put.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/Put.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/Put.java index 790f48a..80b8088 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/Put.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/Put.java @@ -93,7 +93,8 @@ public class Put<FC, C extends AutoCloseable> { return; } - try (C connection = initConnection.apply(context, session, functionContext)) { + // Only pass in a flow file if there is a single one present + try (C connection = 
initConnection.apply(context, session, functionContext, flowFiles.size() == 1 ? flowFiles.get(0) : null)) { try { // Execute the core function. http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java index ce89bdd..c35ab39 100644 --- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java +++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java @@ -336,7 +336,7 @@ public class ExecuteGroovyScript extends AbstractProcessor { private void onInitSQL(HashMap SQL) throws SQLException { for (Map.Entry e : (Set<Map.Entry>) SQL.entrySet()) { DBCPService s = (DBCPService) e.getValue(); - OSql sql = new OSql(s.getConnection()); + OSql sql = new OSql(s.getConnection(Collections.emptyMap())); //try to set autocommit to false try { if (sql.getConnection().getAutoCommit()) { http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_ctl_01_access.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_ctl_01_access.groovy b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_ctl_01_access.groovy index f788fbb..858b8c4 100644 --- 
a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_ctl_01_access.groovy +++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_ctl_01_access.groovy @@ -17,7 +17,7 @@ //just check that it's possible to access controller services def ff=session.create() -def con=CTL.mydbcp.getConnection() +def con=CTL.mydbcp.getConnection([:]) assert con instanceof java.sql.Connection con.close(); ff.write('UTF-8', 'OK') http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java index d066967..e053a9a 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java @@ -201,9 +201,9 @@ public class PutHiveQL extends AbstractHiveQLProcessor { } } - private InitConnection<FunctionContext, Connection> initConnection = (context, session, fc) -> { + private InitConnection<FunctionContext, Connection> initConnection = (context, session, fc, ff) -> { final HiveDBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class); - final Connection connection = dbcpService.getConnection(); + final Connection connection = dbcpService.getConnection(ff == null ? 
Collections.emptyMap() : ff.getAttributes()); fc.connectionUrl = dbcpService.getConnectionURL(); return connection; }; http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java index 2526ef5..74855a1 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java @@ -308,7 +308,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { final boolean escape = context.getProperty(HIVEQL_CSV_HEADER).asBoolean(); final String fragmentIdentifier = UUID.randomUUID().toString(); - try (final Connection con = dbcpService.getConnection(); + try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes()); final Statement st = (flowbased ? 
con.prepareStatement(selectQuery) : con.createStatement()) ) { http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java index ef8dd0a..0210739 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java @@ -46,6 +46,7 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -260,7 +261,7 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue(); final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue()); - try (final Connection con = dbcpService.getConnection(); + try (final Connection con = dbcpService.getConnection(flowFile == null ? Collections.emptyMap() : flowFile.getAttributes()); final Statement st = con.createStatement()) { // Try a query that returns no rows, for the purposes of getting metadata about the columns. 
It is possible http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java index 272a3ff..9ac6b18 100755 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java @@ -28,6 +28,7 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Types; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -315,7 +316,7 @@ public class ConvertJSONToSQL extends AbstractProcessor { if (schema == null) { // No schema exists for this table yet. Query the database to determine the schema and put it into the cache. final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class); - try (final Connection conn = dbcpService.getConnection()) { + try (final Connection conn = dbcpService.getConnection(flowFile == null ? 
Collections.emptyMap() : flowFile.getAttributes())) { schema = TableSchema.from(conn, catalog, schemaName, tableName, translateFieldNames, includePrimaryKeys); schemaCache.put(schemaKey, schema); } catch (final SQLException e) { http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java index 13be8d5..ac93feb 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java @@ -216,7 +216,7 @@ public class ExecuteSQL extends AbstractProcessor { } int resultCount=0; - try (final Connection con = dbcpService.getConnection(); + try (final Connection con = dbcpService.getConnection(fileToProcess == null ? 
Collections.emptyMap() : fileToProcess.getAttributes()); final PreparedStatement st = con.prepareStatement(selectQuery)) { st.setQueryTimeout(queryTimeout); // timeout in seconds http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java index 52a815d..a2d01e9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java @@ -277,7 +277,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { final String selectQuery = dbAdapter.getSelectStatement(tableName, columnsClause, whereClause, null, null, null); long rowCount = 0; - try (final Connection con = dbcpService.getConnection(); + try (final Connection con = dbcpService.getConnection(finalFileToProcess == null ? 
Collections.emptyMap() : finalFileToProcess.getAttributes()); final Statement st = con.createStatement()) { final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue(); http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java index d594913..f9116c5 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java @@ -225,7 +225,7 @@ public class ListDatabaseTables extends AbstractProcessor { throw new ProcessException(ioe); } - try (final Connection con = dbcpService.getConnection()) { + try (final Connection con = dbcpService.getConnection(Collections.emptyMap())) { DatabaseMetaData dbMetaData = con.getMetaData(); ResultSet rs = dbMetaData.getTables(catalog, schemaPattern, tableNamePattern, tableTypes); http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java index 9b236fe..ca79d9b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java @@ -329,8 +329,9 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor { .build(); } - private final PartialFunctions.InitConnection<FunctionContext, Connection> initConnection = (c, s, fc) -> { - final Connection connection = c.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class).getConnection(); + private final PartialFunctions.InitConnection<FunctionContext, Connection> initConnection = (c, s, fc, ff) -> { + final Connection connection = c.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class) + .getConnection(ff == null ? 
Collections.emptyMap() : ff.getAttributes()); try { fc.originalAutoCommit = connection.getAutoCommit(); connection.setAutoCommit(false); http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java index 55e4c5f..9957c2e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java @@ -64,6 +64,7 @@ import java.sql.SQLNonTransientException; import java.sql.Statement; import java.util.ArrayList; import java.util.BitSet; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -233,8 +234,9 @@ public class PutSQL extends AbstractSessionFactoryProcessor { return poll.getFlowFiles(); }; - private final PartialFunctions.InitConnection<FunctionContext, Connection> initConnection = (c, s, fc) -> { - final Connection connection = c.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class).getConnection(); + private final PartialFunctions.InitConnection<FunctionContext, Connection> initConnection = (c, s, fc, ff) -> { + final Connection connection = c.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class) + .getConnection(ff == null ? 
Collections.emptyMap() : ff.getAttributes()); try { fc.originalAutoCommit = connection.getAutoCommit(); connection.setAutoCommit(false); http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java index 17e47f5..1e8750f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java @@ -286,7 +286,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { final StopWatch stopWatch = new StopWatch(true); final String fragmentIdentifier = UUID.randomUUID().toString(); - try (final Connection con = dbcpService.getConnection(); + try (final Connection con = dbcpService.getConnection(Collections.emptyMap()); final Statement st = con.createStatement()) { if (fetchSize != null && fetchSize > 0) { http://git-wip-us.apache.org/repos/asf/nifi/blob/099bfcdf/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-api/src/main/java/org/apache/nifi/dbcp/DBCPService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-api/src/main/java/org/apache/nifi/dbcp/DBCPService.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-api/src/main/java/org/apache/nifi/dbcp/DBCPService.java index 9cfe096..ffc9b3a 100644 --- 
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-api/src/main/java/org/apache/nifi/dbcp/DBCPService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-api/src/main/java/org/apache/nifi/dbcp/DBCPService.java @@ -17,6 +17,7 @@ package org.apache.nifi.dbcp; import java.sql.Connection; +import java.util.Map; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; @@ -30,5 +31,21 @@ import org.apache.nifi.processor.exception.ProcessException; @Tags({"dbcp", "jdbc", "database", "connection", "pooling", "store"}) @CapabilityDescription("Provides Database Connection Pooling Service. Connections can be asked from pool and returned after usage.") public interface DBCPService extends ControllerService { - public Connection getConnection() throws ProcessException; + Connection getConnection() throws ProcessException; + + /** + * Allows a Map of attributes to be passed to the DBCPService for use in configuration, etc. + * An implementation will want to override getConnection() to return getConnection(Collections.emptyMap()), + * and override this method (possibly with its existing getConnection() implementation). + * @param attributes a Map of attributes to be passed to the DBCPService. The use of these + * attributes is implementation-specific, and the source of the attributes + * is processor-specific + * @return a Connection from the specified/configured connection pool(s) + * @throws ProcessException if an error occurs while getting a connection + */ + default Connection getConnection(Map<String,String> attributes) throws ProcessException { + // default implementation (for backwards compatibility) is to call getConnection() + // without attributes + return getConnection(); + } }