This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new b8af44d NIFI-6871: Added HikariCPConnectionPool controller service
b8af44d is described below
commit b8af44d81bff403654289e7837237088e4dc7a40
Author: Matthew Burgess <[email protected]>
AuthorDate: Tue Nov 23 11:11:14 2021 -0500
NIFI-6871: Added HikariCPConnectionPool controller service
This closes #3890
Signed-off-by: David Handermann <[email protected]>
---
.../nifi-dbcp-service-nar/pom.xml | 5 +
.../org.apache.nifi.controller.ControllerService | 2 +-
.../nifi-hikari-dbcp-service/pom.xml | 109 +++++++
.../apache/nifi/dbcp/HikariCPConnectionPool.java | 362 +++++++++++++++++++++
.../org.apache.nifi.controller.ControllerService | 4 +-
.../nifi/dbcp/HikariCPConnectionPoolTest.java | 257 +++++++++++++++
.../nifi-dbcp-service-bundle/pom.xml | 1 +
7 files changed, 736 insertions(+), 4 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service-nar/pom.xml
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service-nar/pom.xml
index 18380c4..37cbe63 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service-nar/pom.xml
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service-nar/pom.xml
@@ -38,5 +38,10 @@
<artifactId>nifi-dbcp-service</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-hikari-dbcp-service</artifactId>
+ <version>1.16.0-SNAPSHOT</version>
+ </dependency>
</dependencies>
</project>
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index f877996..57bc3c6 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -14,4 +14,4 @@
# limitations under the License.
org.apache.nifi.dbcp.DBCPConnectionPool
org.apache.nifi.dbcp.DBCPConnectionPoolLookup
-org.apache.nifi.record.sink.db.DatabaseRecordSink
\ No newline at end of file
+org.apache.nifi.record.sink.db.DatabaseRecordSink
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/pom.xml
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/pom.xml
new file mode 100644
index 0000000..7c9c85e
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/pom.xml
@@ -0,0 +1,109 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-dbcp-service-bundle</artifactId>
+ <version>1.16.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>nifi-hikari-dbcp-service</artifactId>
+ <packaging>jar</packaging>
+ <properties>
+ <derby.version>10.14.2.0</derby.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-dbcp-service-api</artifactId>
+ <version>1.16.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-utils</artifactId>
+ <version>1.16.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-service-utils</artifactId>
+ <version>1.16.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-security-kerberos</artifactId>
+ <version>1.16.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-kerberos-user-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.zaxxer</groupId>
+ <artifactId>HikariCP</artifactId>
+ <version>4.0.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock</artifactId>
+ <version>1.16.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ <version>${derby.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derbynet</artifactId>
+ <version>${derby.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derbytools</artifactId>
+ <version>${derby.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derbyclient</artifactId>
+ <version>${derby.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ <excludes combine.children="append">
+ <exclude>src/test/resources/fake.keytab</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/main/java/org/apache/nifi/dbcp/HikariCPConnectionPool.java
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/main/java/org/apache/nifi/dbcp/HikariCPConnectionPool.java
new file mode 100644
index 0000000..040c6cb
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/main/java/org/apache/nifi/dbcp/HikariCPConnectionPool.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.dbcp;
+
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.resource.ResourceCardinality;
+import org.apache.nifi.components.resource.ResourceType;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.kerberos.KerberosUserService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.security.krb.KerberosAction;
+import org.apache.nifi.security.krb.KerberosUser;
+
+import javax.security.auth.login.LoginException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of Database Connection Pooling Service. HikariCP is used for
connection pooling functionality.
+ */
+@RequiresInstanceClassLoading
+@Tags({"dbcp", "hikari", "jdbc", "database", "connection", "pooling", "store"})
+@CapabilityDescription("Provides Database Connection Pooling Service based on
HikariCP. Connections can be asked from pool and returned after usage.")
+@DynamicProperty(name = "JDBC property name", value = "JDBC property value",
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY,
+ description = "Specifies a property name and value to be set on the
JDBC connection(s). "
+ + "If Expression Language is used, evaluation will be
performed upon the controller service being enabled. "
+ + "Note that no flow file input (attributes, e.g.) is
available for use in Expression Language constructs for these properties.")
+public class HikariCPConnectionPool extends AbstractControllerService
implements DBCPService {
+ /**
+ * Property Name Prefix for Sensitive Dynamic Properties
+ */
+ protected static final String SENSITIVE_PROPERTY_PREFIX = "SENSITIVE.";
+ protected static final long INFINITE_MILLISECONDS = -1L;
+
+ private static final String DEFAULT_TOTAL_CONNECTIONS = "10";
+ private static final String DEFAULT_MAX_CONN_LIFETIME = "-1";
+
+ public static final PropertyDescriptor DATABASE_URL = new
PropertyDescriptor.Builder()
+ .name("hikaricp-connection-url")
+ .displayName("Database Connection URL")
+ .description("A database connection URL used to connect to a
database. May contain database system name, host, port, database name and some
parameters."
+ + " The exact syntax of a database connection URL is
specified by your DBMS.")
+ .defaultValue(null)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor DB_DRIVERNAME = new
PropertyDescriptor.Builder()
+ .name("hikaricp-driver-classname")
+ .displayName("Database Driver Class Name")
+ .description("The fully-qualified class name of the JDBC driver.
Example: com.mysql.jdbc.Driver")
+ .defaultValue(null)
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor DB_DRIVER_LOCATION = new
PropertyDescriptor.Builder()
+ .name("hikaricp-driver-locations")
+ .displayName("Database Driver Location(s)")
+ .description("Comma-separated list of files/folders and/or URLs
containing the driver JAR and its dependencies (if any). For example
'/var/tmp/mariadb-java-client-1.1.7.jar'")
+ .defaultValue(null)
+ .required(false)
+ .identifiesExternalResource(ResourceCardinality.MULTIPLE,
ResourceType.FILE, ResourceType.DIRECTORY, ResourceType.URL)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .dynamicallyModifiesClasspath(true)
+ .build();
+
+ public static final PropertyDescriptor DB_USER = new
PropertyDescriptor.Builder()
+ .name("hikaricp-username")
+ .displayName("Database User")
+ .description("Database user name")
+ .defaultValue(null)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor DB_PASSWORD = new
PropertyDescriptor.Builder()
+ .name("hikaricp-password")
+ .displayName("Password")
+ .description("The password for the database user")
+ .defaultValue(null)
+ .required(false)
+ .sensitive(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor MAX_WAIT_TIME = new
PropertyDescriptor.Builder()
+ .name("hikaricp-max-wait-time")
+ .displayName("Max Wait Time")
+ .description("The maximum amount of time that the pool will wait
(when there are no available connections) "
+ + " for a connection to be returned before failing, or 0
<time units> to wait indefinitely. ")
+ .defaultValue("500 millis")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .sensitive(false)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor MAX_TOTAL_CONNECTIONS = new
PropertyDescriptor.Builder()
+ .name("hikaricp-max-total-conns")
+ .displayName("Max Total Connections")
+ .description("This property controls the maximum size that the
pool is allowed to reach, including both idle and in-use connections. Basically
this value will determine the "
+ + "maximum number of actual connections to the database
backend. A reasonable value for this is best determined by your execution
environment. When the pool reaches "
+ + "this size, and no idle connections are available, the
service will block for up to connectionTimeout milliseconds before timing out.")
+ .defaultValue(DEFAULT_TOTAL_CONNECTIONS)
+ .required(true)
+ .addValidator(StandardValidators.INTEGER_VALIDATOR)
+ .sensitive(false)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor VALIDATION_QUERY = new
PropertyDescriptor.Builder()
+ .name("hikaricp-validation-query")
+ .displayName("Validation Query")
+ .description("Validation Query used to validate connections before
returning them. "
+ + "When connection is invalid, it gets dropped and new
valid connection will be returned. "
+ + "NOTE: Using validation might have some performance
penalty.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor MIN_IDLE = new
PropertyDescriptor.Builder()
+ .name("hikaricp-min-idle-conns")
+ .displayName("Minimum Idle Connections")
+ .description("This property controls the minimum number of idle
connections that HikariCP tries to maintain in the pool. If the idle
connections dip below this value and total "
+ + "connections in the pool are less than 'Max Total
Connections', HikariCP will make a best effort to add additional connections
quickly and efficiently. It is recommended "
+ + "that this property to be set equal to 'Max Total
Connections'.")
+ .defaultValue(DEFAULT_TOTAL_CONNECTIONS)
+ .required(true)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor MAX_CONN_LIFETIME = new
PropertyDescriptor.Builder()
+ .name("hikaricp-max-conn-lifetime")
+ .displayName("Max Connection Lifetime")
+ .description("The maximum lifetime in milliseconds of a
connection. After this time is exceeded the " +
+ "connection will fail the next activation, passivation or
validation test. A value of zero or less " +
+ "means the connection has an infinite lifetime.")
+ .defaultValue(DEFAULT_MAX_CONN_LIFETIME)
+ .required(false)
+ .addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor KERBEROS_USER_SERVICE = new
PropertyDescriptor.Builder()
+ .name("hikaricp-kerberos-user-service")
+ .displayName("Kerberos User Service")
+ .description("Specifies the Kerberos User Controller Service that
should be used for authenticating with Kerberos")
+ .identifiesControllerService(KerberosUserService.class)
+ .required(false)
+ .build();
+
+ private static final List<PropertyDescriptor> properties;
+
+ static {
+ final List<PropertyDescriptor> props = new ArrayList<>();
+ props.add(DATABASE_URL);
+ props.add(DB_DRIVERNAME);
+ props.add(DB_DRIVER_LOCATION);
+ props.add(KERBEROS_USER_SERVICE);
+ props.add(DB_USER);
+ props.add(DB_PASSWORD);
+ props.add(MAX_WAIT_TIME);
+ props.add(MAX_TOTAL_CONNECTIONS);
+ props.add(VALIDATION_QUERY);
+ props.add(MIN_IDLE);
+ props.add(MAX_CONN_LIFETIME);
+
+ properties = Collections.unmodifiableList(props);
+ }
+
+ private volatile HikariDataSource dataSource;
+ private volatile KerberosUser kerberosUser;
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final
String propertyDescriptorName) {
+ final PropertyDescriptor.Builder builder = new
PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .required(false)
+ .dynamic(true)
+
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING,
true))
+
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR);
+
+ if (propertyDescriptorName.startsWith(SENSITIVE_PROPERTY_PREFIX)) {
+
builder.sensitive(true).expressionLanguageSupported(ExpressionLanguageScope.NONE);
+ } else {
+
builder.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY);
+ }
+
+ return builder.build();
+ }
+
+ /**
+ * Configures connection pool by creating an instance of the
+ * {@link HikariDataSource} based on configuration provided with
+ * {@link ConfigurationContext}.
+ * <p>
+ * This operation makes no guarantees that the actual connection could be
+ * made since the underlying system may still go off-line during normal
+ * operation of the connection pool.
+ *
+ * @param context the configuration context
+ */
+ @OnEnabled
+ public void onConfigured(final ConfigurationContext context) {
+
+ final String driverName =
context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue();
+ final String user =
context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
+ final String passw =
context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
+ final String dburl =
context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();
+ final Integer maxTotal =
context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger();
+ final String validationQuery =
context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue();
+ final long maxWaitMillis =
extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions());
+ final int minIdle =
context.getProperty(MIN_IDLE).evaluateAttributeExpressions().asInteger();
+ final long maxConnLifetimeMillis =
extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions());
+ final KerberosUserService kerberosUserService =
context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
+
+ if (kerberosUserService != null) {
+ kerberosUser = kerberosUserService.createKerberosUser();
+ if (kerberosUser != null) {
+ kerberosUser.login();
+ }
+ }
+
+ dataSource = new HikariDataSource();
+ dataSource.setDriverClassName(driverName);
+ dataSource.setConnectionTimeout(maxWaitMillis);
+ dataSource.setMaximumPoolSize(maxTotal);
+ dataSource.setMinimumIdle(minIdle);
+ dataSource.setMaxLifetime(maxConnLifetimeMillis);
+
+ if (validationQuery != null && !validationQuery.isEmpty()) {
+ dataSource.setConnectionTestQuery(validationQuery);
+ }
+
+ dataSource.setJdbcUrl(dburl);
+ dataSource.setUsername(user);
+ dataSource.setPassword(passw);
+
+ final List<PropertyDescriptor> dynamicProperties =
context.getProperties()
+ .keySet()
+ .stream()
+ .filter(PropertyDescriptor::isDynamic)
+ .collect(Collectors.toList());
+
+ Properties properties = dataSource.getDataSourceProperties();
+ dynamicProperties.forEach((descriptor) -> {
+ final PropertyValue propertyValue =
context.getProperty(descriptor);
+ if (descriptor.isSensitive()) {
+ final String propertyName =
StringUtils.substringAfter(descriptor.getName(), SENSITIVE_PROPERTY_PREFIX);
+ properties.setProperty(propertyName, propertyValue.getValue());
+ } else {
+ properties.setProperty(descriptor.getName(),
propertyValue.evaluateAttributeExpressions().getValue());
+ }
+ });
+ dataSource.setDataSourceProperties(properties);
+
+ }
+
+ private long extractMillisWithInfinite(PropertyValue prop) {
+ return "-1".equals(prop.getValue()) ? INFINITE_MILLISECONDS :
prop.asTimePeriod(TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Shutdown pool, close all open connections.
+ * If a principal is authenticated with a KDC, that principal is logged
out.
+ * <p>
+ * If a @{@link LoginException} occurs while attempting to log out the
@{@link org.apache.nifi.security.krb.KerberosUser},
+ * an attempt will still be made to shut down the pool and close open
connections.
+ *
+ */
+ @OnDisabled
+ public void shutdown() {
+ try {
+ if (kerberosUser != null) {
+ kerberosUser.logout();
+ }
+ } finally {
+ kerberosUser = null;
+ try {
+ if (dataSource != null) {
+ dataSource.close();
+ }
+ } finally {
+ dataSource = null;
+ }
+ }
+ }
+
+ @Override
+ public Connection getConnection() throws ProcessException {
+ try {
+ final Connection con;
+ if (kerberosUser != null) {
+ KerberosAction<Connection> kerberosAction = new
KerberosAction<>(kerberosUser, () -> dataSource.getConnection(), getLogger());
+ con = kerberosAction.execute();
+ } else {
+ con = dataSource.getConnection();
+ }
+ return con;
+ } catch (final SQLException e) {
+ // If using Kerberos, attempt to re-login
+ if (kerberosUser != null) {
+ getLogger().info("Error getting connection, performing
Kerberos re-login");
+ kerberosUser.login();
+ }
+ throw new ProcessException("Connection retrieval failed", e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s[id=%s]", getClass().getSimpleName(),
getIdentifier());
+ }
+
+ HikariDataSource getDataSource() {
+ return dataSource;
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
similarity index 85%
copy from
nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
copy to
nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index f877996..1d4a1ee 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -12,6 +12,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.nifi.dbcp.DBCPConnectionPool
-org.apache.nifi.dbcp.DBCPConnectionPoolLookup
-org.apache.nifi.record.sink.db.DatabaseRecordSink
\ No newline at end of file
+org.apache.nifi.dbcp.HikariCPConnectionPool
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/test/java/org/apache/nifi/dbcp/HikariCPConnectionPoolTest.java
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/test/java/org/apache/nifi/dbcp/HikariCPConnectionPoolTest.java
new file mode 100644
index 0000000..2896efb
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/test/java/org/apache/nifi/dbcp/HikariCPConnectionPoolTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.dbcp;
+
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+
+public class HikariCPConnectionPoolTest {
+ private final static String DB_LOCATION = "target/db";
+ private final static String GOOD_CS_NAME = "test-good1";
+ private final static String BAD_CS_NAME = "test-bad1";
+ private final static String EXHAUST_CS_NAME = "test-exhaust";
+
+ private static String originalDerbyStreamErrorFile;
+
+ private TestRunner runner;
+
+ @BeforeAll
+ public static void setupBeforeClass() {
+ originalDerbyStreamErrorFile =
System.getProperty("derby.stream.error.file");
+ System.setProperty("derby.stream.error.file", "target/derby.log");
+ }
+
+ @AfterAll
+ public static void shutdownAfterClass() {
+ if (originalDerbyStreamErrorFile != null) {
+ System.setProperty("derby.stream.error.file",
originalDerbyStreamErrorFile);
+ }
+ }
+
+ @BeforeEach
+ public void setup() {
+ // remove previous test database, if any
+ final File dbLocation = new File(DB_LOCATION);
+ dbLocation.delete();
+
+ runner = TestRunners.newTestRunner(NoOpProcessor.class);
+ }
+
+ /**
+ * Missing property values.
+ */
+ @Test
+ public void testMissingPropertyValues() throws InitializationException {
+ final HikariCPConnectionPool service = new HikariCPConnectionPool();
+ final Map<String, String> properties = new HashMap<>();
+ runner.addControllerService(BAD_CS_NAME, service, properties);
+ runner.assertNotValid(service);
+ }
+
+ /**
+ * Max wait set to -1
+ */
+ @Test
+ public void testMaxWait() throws InitializationException {
+ final HikariCPConnectionPool service = new HikariCPConnectionPool();
+ runner.addControllerService(GOOD_CS_NAME, service);
+
+ setDerbyProperties(service);
+ runner.setProperty(service, HikariCPConnectionPool.MAX_WAIT_TIME, "0
millis");
+
+ runner.enableControllerService(service);
+ runner.assertValid(service);
+ }
+
+ /**
+ * Checks validity of idle limit and time settings including a default
+ */
+ @Test
+ public void testIdleConnectionsSettings() throws InitializationException {
+ final HikariCPConnectionPool service = new HikariCPConnectionPool();
+ runner.addControllerService(GOOD_CS_NAME, service);
+
+ setDerbyProperties(service);
+ runner.setProperty(service, HikariCPConnectionPool.MAX_WAIT_TIME, "0
millis");
+ runner.setProperty(service, HikariCPConnectionPool.MAX_CONN_LIFETIME,
"1 secs");
+
+ runner.enableControllerService(service);
+ runner.assertValid(service);
+ }
+
+ @Test
+ public void testMinIdleCannotBeNegative() throws InitializationException {
+ final HikariCPConnectionPool service = new HikariCPConnectionPool();
+ runner.addControllerService(GOOD_CS_NAME, service);
+
+ setDerbyProperties(service);
+ runner.setProperty(service, HikariCPConnectionPool.MAX_WAIT_TIME, "0
millis");
+ runner.setProperty(service, HikariCPConnectionPool.MIN_IDLE, "-1");
+
+ runner.assertNotValid(service);
+ }
+
+ /**
+ * Checks to ensure that settings have been passed down into the HikariCP
+ */
+ @Test
+ public void testIdleSettingsAreSet() throws InitializationException {
+ final HikariCPConnectionPool service = new HikariCPConnectionPool();
+ runner.addControllerService(GOOD_CS_NAME, service);
+
+ setDerbyProperties(service);
+ runner.setProperty(service, HikariCPConnectionPool.MAX_WAIT_TIME, "0
millis");
+ runner.setProperty(service, HikariCPConnectionPool.MIN_IDLE, "4");
+ runner.setProperty(service, HikariCPConnectionPool.MAX_CONN_LIFETIME,
"1 secs");
+
+ runner.enableControllerService(service);
+
+ Assertions.assertEquals(4, service.getDataSource().getMinimumIdle());
+ Assertions.assertEquals(1000,
service.getDataSource().getMaxLifetime());
+
+ service.getDataSource().close();
+ }
+
+ /**
+ * Test database connection using Derby. Connect, create table, insert,
select, drop table.
+ */
+ @Test
+ public void testCreateInsertSelect() throws InitializationException,
SQLException {
+ final HikariCPConnectionPool service = new HikariCPConnectionPool();
+ runner.addControllerService(GOOD_CS_NAME, service);
+
+ setDerbyProperties(service);
+
+ runner.enableControllerService(service);
+
+ runner.assertValid(service);
+ final DBCPService dbcpService = (DBCPService)
runner.getProcessContext().getControllerServiceLookup().getControllerService(GOOD_CS_NAME);
+ Assertions.assertNotNull(dbcpService);
+ final Connection connection = dbcpService.getConnection();
+ Assertions.assertNotNull(connection);
+
+ createInsertSelectDrop(connection);
+
+ connection.close(); // return to pool
+ }
+
+ /**
+ * Test get database connection using Derby. Get many times, after a while
pool should not contain any available connection and getConnection should fail.
+ */
+ @Test
+ public void testExhaustPool() throws InitializationException {
+ final HikariCPConnectionPool service = new HikariCPConnectionPool();
+ runner.addControllerService(EXHAUST_CS_NAME, service);
+
+ setDerbyProperties(service);
+
+ runner.enableControllerService(service);
+
+ runner.assertValid(service);
+ Assertions.assertDoesNotThrow(() -> {
+
runner.getProcessContext().getControllerServiceLookup().getControllerService(EXHAUST_CS_NAME);
+ });
+ final DBCPService dbcpService = (DBCPService)
runner.getProcessContext().getControllerServiceLookup().getControllerService(EXHAUST_CS_NAME);
+ Assertions.assertNotNull(dbcpService);
+
+ try {
+ for (int i = 0; i < 100; i++) {
+ final Connection connection = dbcpService.getConnection();
+ Assertions.assertNotNull(connection);
+ }
+ Assertions.fail("Should have exhausted the pool and thrown a
ProcessException");
+ } catch (ProcessException pe) {
+ // Do nothing, this is expected
+ } catch (Throwable t) {
+ Assertions.fail("Should have exhausted the pool and thrown a
ProcessException but threw " + t);
+ }
+ }
+
+ /**
+ * Test get database connection using Derby. Get many times, release
immediately and getConnection should not fail.
+ */
+ @Test
+ public void testGetManyNormal() throws InitializationException,
SQLException {
+ final HikariCPConnectionPool service = new HikariCPConnectionPool();
+ runner.addControllerService(EXHAUST_CS_NAME, service);
+
+ setDerbyProperties(service);
+
+ runner.enableControllerService(service);
+
+ runner.assertValid(service);
+ final DBCPService dbcpService = (DBCPService)
runner.getProcessContext().getControllerServiceLookup().getControllerService(EXHAUST_CS_NAME);
+ Assertions.assertNotNull(dbcpService);
+
+ for (int i = 0; i < 100; i++) {
+ final Connection connection = dbcpService.getConnection();
+ Assertions.assertNotNull(connection);
+ connection.close(); // will return connection to pool
+ }
+ }
+
+ private void createInsertSelectDrop(Connection con) throws SQLException {
+
+ final Statement st = con.createStatement();
+
+ try {
+ String dropTable = "drop table restaurants";
+ st.executeUpdate(dropTable);
+ } catch (final Exception e) {
+ // table may not exist, this is not serious problem.
+ }
+
+ String createTable = "create table restaurants(id integer, name
varchar(20), city varchar(50))";
+ st.executeUpdate(createTable);
+
+ st.executeUpdate("insert into restaurants values (1, 'Irifunes', 'San
Mateo')");
+ st.executeUpdate("insert into restaurants values (2, 'Estradas', 'Daly
City')");
+ st.executeUpdate("insert into restaurants values (3, 'Prime Rib
House', 'San Francisco')");
+
+ int nrOfRows = 0;
+ final ResultSet resultSet = st.executeQuery("select * from
restaurants");
+ while (resultSet.next()) {
+ nrOfRows++;
+ }
+ Assertions.assertEquals(3, nrOfRows);
+
+ st.close();
+ }
+
+ private void setDerbyProperties(final HikariCPConnectionPool service) {
+ runner.setProperty(service, HikariCPConnectionPool.DATABASE_URL,
"jdbc:derby:" + DB_LOCATION + ";create=true");
+ runner.setProperty(service, HikariCPConnectionPool.DB_DRIVERNAME,
"org.apache.derby.jdbc.EmbeddedDriver");
+ runner.setProperty(service, HikariCPConnectionPool.DB_USER, "tester");
+ runner.setProperty(service, HikariCPConnectionPool.DB_PASSWORD,
"testerp");
+ }
+}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/pom.xml
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/pom.xml
index 17bd1b3..f3cbdcb 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/pom.xml
@@ -24,6 +24,7 @@
<packaging>pom</packaging>
<modules>
<module>nifi-dbcp-service</module>
+ <module>nifi-hikari-dbcp-service</module>
<module>nifi-dbcp-service-nar</module>
</modules>
</project>