Repository: nifi Updated Branches: refs/heads/master 63f55d05b -> a628aced6
NIFI-5790: Exposes 6 commons-dbcp options in DBCPConnectionPool Signed-off-by: Peter Wicks <[email protected]> This Closes #3133 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a628aced Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a628aced Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a628aced Branch: refs/heads/master Commit: a628aced6b32b16eb4509094d214266938a915b9 Parents: 63f55d0 Author: Colin Dean <[email protected]> Authored: Mon Nov 5 16:18:21 2018 -0500 Committer: Peter Wicks <[email protected]> Committed: Fri Nov 9 11:29:11 2018 -0700 ---------------------------------------------------------------------- .../apache/nifi/dbcp/DBCPConnectionPool.java | 126 +++++++++++++++- .../org/apache/nifi/dbcp/DBCPServiceTest.java | 150 +++++++++++++++++++ 2 files changed, 275 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/a628aced/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java index 01205e2..5228496 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java @@ -17,12 +17,14 @@ package org.apache.nifi.dbcp; import org.apache.commons.dbcp2.BasicDataSource; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; @@ -59,6 +61,32 @@ import java.util.regex.Pattern; + "Note that no flow file input (attributes, e.g.) is available for use in Expression Language constructs for these properties.") public class DBCPConnectionPool extends AbstractControllerService implements DBCPService { + /** + * Copied from {@link GenericObjectPoolConfig.DEFAULT_MIN_IDLE} in Commons-DBCP 2.5.0 + */ + private static final String DEFAULT_MIN_IDLE = "0"; + /** + * Copied from {@link GenericObjectPoolConfig.DEFAULT_MAX_IDLE} in Commons-DBCP 2.5.0 + */ + private static final String DEFAULT_MAX_IDLE = "8"; + /** + * Copied from private variable {@link BasicDataSource.maxConnLifetimeMillis} in Commons-DBCP 2.5.0 + */ + private static final String DEFAULT_MAX_CONN_LIFETIME = "-1"; + /** + * Copied from {@link GenericObjectPoolConfig.DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS} in Commons-DBCP 2.5.0 + */ + private static final String DEFAULT_EVICTION_RUN_PERIOD = String.valueOf(-1L); + /** + * Copied from {@link GenericObjectPoolConfig.DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS} in Commons-DBCP 2.5.0 + * and converted from 1800000L to "1800000 millis" to "30 mins" + */ + private static final String DEFAULT_MIN_EVICTABLE_IDLE_TIME = "30 mins"; + /** + * Copied from {@link GenericObjectPoolConfig.DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME_MILLIS} in Commons-DBCP 2.5.0 + */ + private static final String DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME = String.valueOf(-1L); + private static final Validator CUSTOM_TIME_PERIOD_VALIDATOR = new Validator() { private final Pattern TIME_DURATION_PATTERN = Pattern.compile(FormatUtils.TIME_DURATION_REGEX); @@ -164,6 +192,77 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); + public static final PropertyDescriptor MIN_IDLE = new PropertyDescriptor.Builder() + .displayName("Minimum Idle Connections") + .name("dbcp-mim-idle-conns") + .description("The minimum number of connections that can remain idle in the pool, without extra ones being " + + "created, or zero to create none.") + .defaultValue(DEFAULT_MIN_IDLE) + .required(false) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor MAX_IDLE = new PropertyDescriptor.Builder() + .displayName("Max Idle Connections") + .name("dbcp-max-idle-conns") + .description("The maximum number of connections that can remain idle in the pool, without extra ones being " + + "released, or negative for no limit.") + .defaultValue(DEFAULT_MAX_IDLE) + .required(false) + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor MAX_CONN_LIFETIME = new PropertyDescriptor.Builder() + .displayName("Max Connection Lifetime") + .name("dbcp-max-conn-lifetime") + .description("The maximum lifetime in milliseconds of a connection. After this time is exceeded the " + + "connection will fail the next activation, passivation or validation test. A value of zero or less " + + "means the connection has an infinite lifetime.") + .defaultValue(DEFAULT_MAX_CONN_LIFETIME) + .required(false) + .addValidator(CUSTOM_TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor EVICTION_RUN_PERIOD = new PropertyDescriptor.Builder() + .displayName("Time Between Eviction Runs") + .name("dbcp-time-between-eviction-runs") + .description("The number of milliseconds to sleep between runs of the idle connection evictor thread. When " + + "non-positive, no idle connection evictor thread will be run.") + .defaultValue(DEFAULT_EVICTION_RUN_PERIOD) + .required(false) + .addValidator(CUSTOM_TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder() + .displayName("Minimum Evictable Idle Time") + .name("dbcp-min-evictable-idle-time") + .description("The minimum amount of time a connection may sit idle in the pool before it is eligible for eviction.") + .defaultValue(DEFAULT_MIN_EVICTABLE_IDLE_TIME) + .required(false) + .addValidator(CUSTOM_TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor SOFT_MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder() + .displayName("Soft Minimum Evictable Idle Time") + .name("dbcp-soft-min-evictable-idle-time") + .description("The minimum amount of time a connection may sit idle in the pool before it is eligible for " + + "eviction by the idle connection evictor, with the extra condition that at least a minimum number of" + + " idle connections remain in the pool. When the not-soft version of this option is set to a positive" + + " value, it is examined first by the idle connection evictor: when idle connections are visited by " + + "the evictor, idle time is first compared against it (without considering the number of idle " + + "connections in the pool) and then against this soft option, including the minimum idle connections " + + "constraint.") + .defaultValue(DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME) + .required(false) + .addValidator(CUSTOM_TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + private static final List<PropertyDescriptor> properties; static { @@ -176,6 +275,12 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC props.add(MAX_WAIT_TIME); props.add(MAX_TOTAL_CONNECTIONS); props.add(VALIDATION_QUERY); + props.add(MIN_IDLE); + props.add(MAX_IDLE); + props.add(MAX_CONN_LIFETIME); + props.add(EVICTION_RUN_PERIOD); + props.add(MIN_EVICTABLE_IDLE_TIME); + props.add(SOFT_MIN_EVICTABLE_IDLE_TIME); properties = Collections.unmodifiableList(props); } @@ -221,7 +326,13 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue(); final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).asInteger(); final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue(); - final Long maxWaitMillis = "-1".equals(context.getProperty(MAX_WAIT_TIME).getValue()) ? -1 : context.getProperty(MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS); + final Long maxWaitMillis = extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME)); + final Integer minIdle = context.getProperty(MIN_IDLE).asInteger(); + final Integer maxIdle = context.getProperty(MAX_IDLE).asInteger(); + final Long maxConnLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME)); + final Long timeBetweenEvictionRunsMillis = extractMillisWithInfinite(context.getProperty(EVICTION_RUN_PERIOD)); + final Long minEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME)); + final Long softMinEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME)); dataSource = new BasicDataSource(); dataSource.setDriverClassName(drv); @@ -234,6 +345,12 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC dataSource.setMaxWaitMillis(maxWaitMillis); dataSource.setMaxTotal(maxTotal); + dataSource.setMinIdle(minIdle); + dataSource.setMaxIdle(maxIdle); + dataSource.setMaxConnLifetimeMillis(maxConnLifetimeMillis); + dataSource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis); + dataSource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis); + dataSource.setSoftMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis); if (validationQuery!=null && !validationQuery.isEmpty()) { dataSource.setValidationQuery(validationQuery); @@ -250,6 +367,10 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC } + private Long extractMillisWithInfinite(PropertyValue prop) { + return "-1".equals(prop.getValue()) ? -1 : prop.asTimePeriod(TimeUnit.MILLISECONDS); + } + /** * using Thread.currentThread().getContextClassLoader(); will ensure that you are using the ClassLoader for you NAR. * @@ -314,4 +435,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC return "DBCPConnectionPool[id=" + getIdentifier() + "]"; } + BasicDataSource getDataSource() { + return dataSource; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/a628aced/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java index 1c84a7f..4d32b33 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java @@ -41,6 +41,7 @@ import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.ArrayList; import java.util.HashMap; import java.util.Map; @@ -94,6 +95,155 @@ public class DBCPServiceTest { } /** + * Checks validity of idle limit and time settings including a default + */ + @Test + public void testIdleConnectionsSettings() throws InitializationException { + final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); + final DBCPConnectionPool service = new DBCPConnectionPool(); + runner.addControllerService("test-good1", service); + + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // set embedded Derby database connection url + runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:derby:" + DB_LOCATION + ";create=true"); + runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester"); + runner.setProperty(service, DBCPConnectionPool.DB_PASSWORD, "testerp"); + runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver"); + runner.setProperty(service, DBCPConnectionPool.MAX_WAIT_TIME, "-1"); + runner.setProperty(service, DBCPConnectionPool.MAX_IDLE, "2"); + runner.setProperty(service, DBCPConnectionPool.MAX_CONN_LIFETIME, "1 secs"); + runner.setProperty(service, DBCPConnectionPool.EVICTION_RUN_PERIOD, "1 secs"); + runner.setProperty(service, DBCPConnectionPool.MIN_EVICTABLE_IDLE_TIME, "1 secs"); + runner.setProperty(service, DBCPConnectionPool.SOFT_MIN_EVICTABLE_IDLE_TIME, "1 secs"); + + runner.enableControllerService(service); + runner.assertValid(service); + } + + @Test + public void testMinIdleCannotBeNegative() throws InitializationException { + final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); + final DBCPConnectionPool service = new DBCPConnectionPool(); + runner.addControllerService("test-good1", service); + + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // set embedded Derby database connection url + runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:derby:" + DB_LOCATION + ";create=true"); + runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester"); + runner.setProperty(service, DBCPConnectionPool.DB_PASSWORD, "testerp"); + runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver"); + runner.setProperty(service, DBCPConnectionPool.MAX_WAIT_TIME, "-1"); + runner.setProperty(service, DBCPConnectionPool.MIN_IDLE, "-1"); + + runner.assertNotValid(service); + } + + /** + * Checks to ensure that settings have been passed down into the DBCP + */ + @Test + public void testIdleSettingsAreSet() throws InitializationException, SQLException { + final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); + final DBCPConnectionPool service = new DBCPConnectionPool(); + runner.addControllerService("test-good1", service); + + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // set embedded Derby database connection url + runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:derby:" + DB_LOCATION + ";create=true"); + runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester"); + runner.setProperty(service, DBCPConnectionPool.DB_PASSWORD, "testerp"); + runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver"); + runner.setProperty(service, DBCPConnectionPool.MAX_WAIT_TIME, "-1"); + runner.setProperty(service, DBCPConnectionPool.MAX_IDLE, "6"); + runner.setProperty(service, DBCPConnectionPool.MIN_IDLE, "4"); + runner.setProperty(service, DBCPConnectionPool.MAX_CONN_LIFETIME, "1 secs"); + runner.setProperty(service, DBCPConnectionPool.EVICTION_RUN_PERIOD, "1 secs"); + runner.setProperty(service, DBCPConnectionPool.MIN_EVICTABLE_IDLE_TIME, "1 secs"); + runner.setProperty(service, DBCPConnectionPool.SOFT_MIN_EVICTABLE_IDLE_TIME, "1 secs"); + + runner.enableControllerService(service); + + Assert.assertEquals(6, service.getDataSource().getMaxIdle()); + Assert.assertEquals(4, service.getDataSource().getMinIdle()); + Assert.assertEquals(1000, service.getDataSource().getMaxConnLifetimeMillis()); + Assert.assertEquals(1000, service.getDataSource().getTimeBetweenEvictionRunsMillis()); + Assert.assertEquals(1000, service.getDataSource().getMinEvictableIdleTimeMillis()); + Assert.assertEquals(1000, service.getDataSource().getSoftMinEvictableIdleTimeMillis()); + + service.getDataSource().close(); + } + + /** + * Creates a few connections and step closes them to see what happens + */ + @Test + public void testIdle() throws InitializationException, SQLException, InterruptedException { + final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); + final DBCPConnectionPool service = new DBCPConnectionPool(); + runner.addControllerService("test-good1", service); + + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // set embedded Derby database connection url + runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:derby:" + DB_LOCATION + ";create=true"); + runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester"); + runner.setProperty(service, DBCPConnectionPool.DB_PASSWORD, "testerp"); + runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver"); + runner.setProperty(service, DBCPConnectionPool.MAX_WAIT_TIME, "-1"); + runner.setProperty(service, DBCPConnectionPool.MAX_IDLE, "4"); + runner.setProperty(service, DBCPConnectionPool.MIN_IDLE, "1"); + runner.setProperty(service, DBCPConnectionPool.MAX_CONN_LIFETIME, "1000 millis"); + runner.setProperty(service, DBCPConnectionPool.EVICTION_RUN_PERIOD, "100 millis"); + runner.setProperty(service, DBCPConnectionPool.MIN_EVICTABLE_IDLE_TIME, "100 millis"); + runner.setProperty(service, DBCPConnectionPool.SOFT_MIN_EVICTABLE_IDLE_TIME, "100 millis"); + + runner.enableControllerService(service); + + ArrayList<Connection> connections = new ArrayList<>(); + for (int i = 0; i < 6; i++) { + connections.add(service.getConnection()); + } + + Assert.assertEquals(6, service.getDataSource().getNumActive()); + + connections.get(0).close(); + Assert.assertEquals(5, service.getDataSource().getNumActive()); + Assert.assertEquals(1, service.getDataSource().getNumIdle()); + + connections.get(1).close(); + connections.get(2).close(); + connections.get(3).close(); + //now at max idle + Assert.assertEquals(2, service.getDataSource().getNumActive()); + Assert.assertEquals(4, service.getDataSource().getNumIdle()); + + //now a connection should get closed for real so that numIdle does not exceed maxIdle + connections.get(4).close(); + Assert.assertEquals(4, service.getDataSource().getNumIdle()); + Assert.assertEquals(1, service.getDataSource().getNumActive()); + + connections.get(5).close(); + Assert.assertEquals(4, service.getDataSource().getNumIdle()); + Assert.assertEquals(0, service.getDataSource().getNumActive()); + + Thread.sleep(500); + Assert.assertEquals(1, service.getDataSource().getNumIdle()); + + service.getDataSource().close(); + } + + /** * Test database connection using Derby. Connect, create table, insert, select, drop table. * */
