This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new fd8acd57b8 NIFI-13555 Added Verification to HikariDBCPConnectionPool
(#9085)
fd8acd57b8 is described below
commit fd8acd57b8172417e382fd004a7a7480792d2da7
Author: Matt Burgess <[email protected]>
AuthorDate: Tue Aug 6 14:26:19 2024 -0400
NIFI-13555 Added Verification to HikariDBCPConnectionPool (#9085)
Signed-off-by: David Handermann <[email protected]>
---
.../apache/nifi/dbcp/HikariCPConnectionPool.java | 201 +++++++++++++++++----
.../nifi/dbcp/HikariCPConnectionPoolTest.java | 51 +++++-
.../nifi/util/MockControllerServiceLookup.java | 2 +-
3 files changed, 211 insertions(+), 43 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/main/java/org/apache/nifi/dbcp/HikariCPConnectionPool.java
b/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/main/java/org/apache/nifi/dbcp/HikariCPConnectionPool.java
index 230df5a6f3..59413b71e4 100644
---
a/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/main/java/org/apache/nifi/dbcp/HikariCPConnectionPool.java
+++
b/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/main/java/org/apache/nifi/dbcp/HikariCPConnectionPool.java
@@ -27,6 +27,7 @@ import
org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.RequiredPermission;
@@ -34,12 +35,15 @@ import
org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.VerifiableControllerService;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.kerberos.KerberosUserService;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.krb.KerberosAction;
+import org.apache.nifi.security.krb.KerberosLoginException;
import org.apache.nifi.security.krb.KerberosUser;
import javax.security.auth.login.LoginException;
@@ -48,10 +52,14 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import static
org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED;
+import static
org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL;
+
/**
* Implementation of Database Connection Pooling Service. HikariCP is used for
connection pooling functionality.
*/
@@ -71,7 +79,7 @@ import java.util.stream.Collectors;
)
}
)
-public class HikariCPConnectionPool extends AbstractControllerService
implements DBCPService {
+public class HikariCPConnectionPool extends AbstractControllerService
implements DBCPService, VerifiableControllerService {
/**
* Property Name Prefix for Sensitive Dynamic Properties
*/
@@ -81,6 +89,8 @@ public class HikariCPConnectionPool extends
AbstractControllerService implements
private static final String DEFAULT_TOTAL_CONNECTIONS = "10";
private static final String DEFAULT_MAX_CONN_LIFETIME = "-1";
+ private static final int DEFAULT_MIN_VALIDATION_TIMEOUT = 250;
+
public static final PropertyDescriptor DATABASE_URL = new
PropertyDescriptor.Builder()
.name("hikaricp-connection-url")
.displayName("Database Connection URL")
@@ -254,7 +264,132 @@ public class HikariCPConnectionPool extends
AbstractControllerService implements
*/
@OnEnabled
public void onConfigured(final ConfigurationContext context) {
+ dataSource = new HikariDataSource();
+ configureDataSource(context, dataSource);
+ }
+
+    /**
+     * Converts a time-period property to milliseconds, treating the literal value "-1" as "no limit".
+     *
+     * @param prop property holding either "-1" or a time period
+     * @return {@code INFINITE_MILLISECONDS} when the raw value is "-1", otherwise the period in milliseconds
+     */
+    private long extractMillisWithInfinite(PropertyValue prop) {
+        if ("-1".equals(prop.getValue())) {
+            return INFINITE_MILLISECONDS;
+        }
+        return prop.asTimePeriod(TimeUnit.MILLISECONDS);
+    }
+
+ /**
+ * Shuts down the connection pool and closes all open connections.
+ * If a Kerberos user is present, it is logged out first.
+ * <p>
+ * If a {@link LoginException} (or any other exception) occurs while logging out the
+ * {@link org.apache.nifi.security.krb.KerberosUser}, the pool is still closed because
+ * the close runs in a {@code finally} block. Both the Kerberos user and the data source
+ * references are cleared regardless of whether logout or close succeeded.
+ */
+ @OnDisabled
+ public void shutdown() {
+ try {
+ if (kerberosUser != null) {
+ kerberosUser.logout();
+ }
+ } finally {
+ // Always drop the Kerberos user reference, even when logout failed
+ kerberosUser = null;
+ try {
+ if (dataSource != null) {
+ dataSource.close();
+ }
+ } finally {
+ // Always drop the pool reference, even when close failed
+ dataSource = null;
+ }
+ }
+ }
+
+ /**
+ * Obtains a connection from the service's data source.
+ * When a Kerberos user is present, the connection is requested through a {@link KerberosAction};
+ * otherwise it is requested from the data source directly.
+ *
+ * @return a connection from the pool
+ * @throws ProcessException wrapping the {@link SQLException} when the connection cannot be retrieved
+ */
+ @Override
+ public Connection getConnection() throws ProcessException {
+ try {
+ final Connection con;
+ if (kerberosUser != null) {
+ KerberosAction<Connection> kerberosAction = new
KerberosAction<>(kerberosUser, dataSource::getConnection, getLogger());
+ con = kerberosAction.execute();
+ } else {
+ con = dataSource.getConnection();
+ }
+ return con;
+ } catch (final SQLException e) {
+ // If using Kerberos, attempt to re-login
+ // NOTE(review): unlike the two-argument getConnection helper in this class, an exception
+ // thrown by login() is not caught here and would replace the SQLException - confirm this is intended
+ if (kerberosUser != null) {
+ getLogger().info("Error getting connection, performing
Kerberos re-login");
+ kerberosUser.login();
+ }
+ // The original failure is always reported, whether or not a re-login was attempted
+ throw new ProcessException("Connection retrieval failed", e);
+ }
+ }
+
+ @Override
+ public List<ConfigVerificationResult> verify(final ConfigurationContext
context, final ComponentLog verificationLogger, final Map<String, String>
variables) {
+ List<ConfigVerificationResult> results = new ArrayList<>();
+ final KerberosUserService kerberosUserService =
context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
+ KerberosUser kerberosUser = null;
+ try {
+ if (kerberosUserService != null) {
+ kerberosUser = kerberosUserService.createKerberosUser();
+ if (kerberosUser != null) {
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Configure Kerberos User")
+ .outcome(SUCCESSFUL)
+ .explanation("Successfully configured Kerberos
user")
+ .build());
+ }
+ }
+ } catch (final Exception e) {
+ verificationLogger.error("Failed to configure Kerberos user", e);
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Configure Kerberos User")
+ .outcome(FAILED)
+ .explanation("Failed to configure Kerberos user: " +
e.getMessage())
+ .build());
+ }
+ final HikariDataSource hikariDataSource = new HikariDataSource();
+ try {
+ configureDataSource(context, hikariDataSource);
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Configure Data Source")
+ .outcome(SUCCESSFUL)
+ .explanation("Successfully configured data source")
+ .build());
+
+ try (final Connection conn = getConnection(hikariDataSource,
kerberosUser)) {
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Establish Connection")
+ .outcome(SUCCESSFUL)
+ .explanation("Successfully established Database
Connection")
+ .build());
+ } catch (final Exception e) {
+ verificationLogger.error("Failed to establish Database
Connection", e);
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Establish Connection")
+ .outcome(FAILED)
+ .explanation("Failed to establish Database Connection:
" + e.getMessage())
+ .build());
+ }
+ } catch (final Exception e) {
+ String message = "Failed to configure Data Source.";
+ if (e.getCause() instanceof ClassNotFoundException) {
+ message += String.format(" Ensure changes to the '%s'
property are applied before verifying",
+ DB_DRIVER_LOCATION.getDisplayName());
+ }
+ verificationLogger.error(message, e);
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Configure Data Source")
+ .outcome(FAILED)
+ .explanation(message + ": " + e.getMessage())
+ .build());
+ } finally {
+ try {
+ shutdown(dataSource, kerberosUser);
+ } catch (final SQLException e) {
+ verificationLogger.error("Failed to shut down data source", e);
+ }
+ }
+ return results;
+ }
+
+ protected void configureDataSource(final ConfigurationContext context,
final HikariDataSource dataSource) {
final String driverName =
context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue();
final String user =
context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
final String passw =
context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
@@ -264,6 +399,7 @@ public class HikariCPConnectionPool extends
AbstractControllerService implements
final long maxWaitMillis =
extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions());
final int minIdle =
context.getProperty(MIN_IDLE).evaluateAttributeExpressions().asInteger();
final long maxConnLifetimeMillis =
extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions());
+
final KerberosUserService kerberosUserService =
context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
if (kerberosUserService != null) {
@@ -273,9 +409,8 @@ public class HikariCPConnectionPool extends
AbstractControllerService implements
}
}
- dataSource = new HikariDataSource();
- dataSource.setDriverClassName(driverName);
dataSource.setConnectionTimeout(maxWaitMillis);
+ dataSource.setValidationTimeout(Math.max(maxWaitMillis,
DEFAULT_MIN_VALIDATION_TIMEOUT));
dataSource.setMaximumPoolSize(maxTotal);
dataSource.setMinimumIdle(minIdle);
dataSource.setMaxLifetime(maxConnLifetimeMillis);
@@ -284,6 +419,7 @@ public class HikariCPConnectionPool extends
AbstractControllerService implements
dataSource.setConnectionTestQuery(validationQuery);
}
+ dataSource.setDriverClassName(driverName);
dataSource.setJdbcUrl(dburl);
dataSource.setUsername(user);
dataSource.setPassword(passw);
@@ -308,42 +444,11 @@ public class HikariCPConnectionPool extends
AbstractControllerService implements
dataSource.setPoolName(toString());
}
- private long extractMillisWithInfinite(PropertyValue prop) {
- return "-1".equals(prop.getValue()) ? INFINITE_MILLISECONDS :
prop.asTimePeriod(TimeUnit.MILLISECONDS);
- }
-
- /**
- * Shutdown pool, close all open connections.
- * If a principal is authenticated with a KDC, that principal is logged
out.
- * <p>
- * If a @{@link LoginException} occurs while attempting to log out the
@{@link org.apache.nifi.security.krb.KerberosUser},
- * an attempt will still be made to shut down the pool and close open
connections.
- *
- */
- @OnDisabled
- public void shutdown() {
- try {
- if (kerberosUser != null) {
- kerberosUser.logout();
- }
- } finally {
- kerberosUser = null;
- try {
- if (dataSource != null) {
- dataSource.close();
- }
- } finally {
- dataSource = null;
- }
- }
- }
-
- @Override
- public Connection getConnection() throws ProcessException {
+ private Connection getConnection(final HikariDataSource dataSource, final
KerberosUser kerberosUser) {
try {
final Connection con;
if (kerberosUser != null) {
- KerberosAction<Connection> kerberosAction = new
KerberosAction<>(kerberosUser, () -> dataSource.getConnection(), getLogger());
+ KerberosAction<Connection> kerberosAction = new
KerberosAction<>(kerberosUser, dataSource::getConnection, getLogger());
con = kerberosAction.execute();
} else {
con = dataSource.getConnection();
@@ -352,14 +457,30 @@ public class HikariCPConnectionPool extends
AbstractControllerService implements
} catch (final SQLException e) {
// If using Kerberos, attempt to re-login
if (kerberosUser != null) {
- getLogger().info("Error getting connection, performing
Kerberos re-login");
- kerberosUser.login();
+ try {
+ getLogger().info("Error getting connection, performing
Kerberos re-login", e);
+ kerberosUser.login();
+ } catch (KerberosLoginException le) {
+ throw new ProcessException("Unable to authenticate
Kerberos principal", le);
+ }
}
- throw new ProcessException("Connection retrieval failed", e);
+ throw new ProcessException(e);
}
}
- @Override
+ /**
+ * Releases the given Kerberos user and data source.
+ * The Kerberos user, when present, is logged out first; the data source, when present,
+ * is closed in a {@code finally} block so it is closed even if the logout throws.
+ * Unlike the no-argument {@code shutdown()}, this method operates only on its arguments
+ * and does not clear any instance fields.
+ *
+ * @param dataSource data source to close, may be {@code null}
+ * @param kerberosUser Kerberos user to log out, may be {@code null}
+ * @throws SQLException declared for callers to handle
+ */
+ private void shutdown(final HikariDataSource dataSource, final
KerberosUser kerberosUser) throws SQLException {
+ try {
+ if (kerberosUser != null) {
+ kerberosUser.logout();
+ }
+ } finally {
+ if (dataSource != null) {
+ dataSource.close();
+ }
+ }
+ }
+
+ @Override
public String toString() {
return String.format("%s[id=%s]", getClass().getSimpleName(),
getIdentifier());
}
diff --git
a/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/test/java/org/apache/nifi/dbcp/HikariCPConnectionPoolTest.java
b/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/test/java/org/apache/nifi/dbcp/HikariCPConnectionPoolTest.java
index 3abc75cb19..5d2f925a0a 100644
---
a/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/test/java/org/apache/nifi/dbcp/HikariCPConnectionPoolTest.java
+++
b/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/test/java/org/apache/nifi/dbcp/HikariCPConnectionPoolTest.java
@@ -16,7 +16,11 @@
*/
package org.apache.nifi.dbcp;
+import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.ControllerServiceConfiguration;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -26,8 +30,14 @@ import org.junit.jupiter.api.Test;
import java.sql.Connection;
import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
public class HikariCPConnectionPoolTest {
@@ -35,6 +45,10 @@ public class HikariCPConnectionPoolTest {
private static final String INVALID_CONNECTION_URL = "jdbc:h2";
+ private static final String DB_DRIVERNAME_VALUE = "jdbc:mock";
+
+ private static final String MAX_WAIT_TIME_VALUE = "5 s";
+
private TestRunner runner;
@BeforeEach
@@ -134,11 +148,44 @@ public class HikariCPConnectionPoolTest {
}
}
+ // Verifies that verify() reports only successful steps for a valid configuration backed by the mock driver
+ @Test
+ void testVerifySuccessful() throws Exception {
+ final HikariCPConnectionPool service = new HikariCPConnectionPool();
+ runner.addControllerService(SERVICE_ID, service);
+ // Have the mock driver hand out a mocked connection
+ final Connection mockConnection = mock(Connection.class);
+ MockDriver.setConnection(mockConnection);
+ setDatabaseProperties(service);
+ runner.setProperty(service,
HikariCPConnectionPool.MAX_TOTAL_CONNECTIONS, "2");
+ runner.enableControllerService(service);
+ runner.assertValid(service);
+ // Build a configuration context from the properties registered with the runner for this service
+ MockProcessContext processContext = (MockProcessContext)
runner.getProcessContext();
+ final ControllerServiceConfiguration configuration =
processContext.getConfiguration(service.getIdentifier());
+ final MockConfigurationContext configContext = new
MockConfigurationContext(service, configuration.getProperties(),
processContext, Collections.emptyMap());
+ final List<ConfigVerificationResult> results =
service.verify(configContext, runner.getLogger(),
configContext.getAllProperties());
+
+ assertOutcomeSuccessful(results);
+ }
+
private void setDatabaseProperties(final HikariCPConnectionPool service) {
- runner.setProperty(service, HikariCPConnectionPool.DATABASE_URL,
"jdbc:mock");
+ runner.setProperty(service, HikariCPConnectionPool.DATABASE_URL,
DB_DRIVERNAME_VALUE);
runner.setProperty(service, HikariCPConnectionPool.DB_DRIVERNAME,
MockDriver.class.getName());
- runner.setProperty(service, HikariCPConnectionPool.MAX_WAIT_TIME, "5
s");
+ runner.setProperty(service, HikariCPConnectionPool.MAX_WAIT_TIME,
MAX_WAIT_TIME_VALUE);
runner.setProperty(service, HikariCPConnectionPool.DB_USER,
String.class.getSimpleName());
runner.setProperty(service, HikariCPConnectionPool.DB_PASSWORD,
String.class.getName());
}
+
+    /**
+     * Asserts that the verification produced exactly two results and that both are successful.
+     * A failing outcome is reported with the result's explanation as the assertion message.
+     */
+    private void assertOutcomeSuccessful(final List<ConfigVerificationResult> results) {
+        assertNotNull(results);
+        final Iterator<ConfigVerificationResult> remaining = results.iterator();
+
+        // Exactly two steps are expected; check each in order
+        for (int step = 0; step < 2; step++) {
+            assertTrue(remaining.hasNext());
+            final ConfigVerificationResult result = remaining.next();
+            assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, result.getOutcome(), result.getExplanation());
+        }
+
+        assertFalse(remaining.hasNext());
+    }
}
\ No newline at end of file
diff --git
a/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java
b/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java
index 5bec0ce078..9d5de109d6 100644
---
a/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java
+++
b/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java
@@ -55,7 +55,7 @@ public abstract class MockControllerServiceLookup implements
ControllerServiceLo
this.controllerServiceMap.putAll(other.controllerServiceMap);
}
- protected ControllerServiceConfiguration getConfiguration(final String
identifier) {
+ public ControllerServiceConfiguration getConfiguration(final String
identifier) {
return controllerServiceMap.get(identifier);
}