This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 419a9cc73b NIFI-12890: Refactor HadoopDBCPConnectionPool to extend
AbstractDBCPConnectionPool
419a9cc73b is described below
commit 419a9cc73bfe5410c0ff399f4feb8e6350b7609d
Author: lehelb <[email protected]>
AuthorDate: Wed Mar 13 14:22:19 2024 -0500
NIFI-12890: Refactor HadoopDBCPConnectionPool to extend
AbstractDBCPConnectionPool
This closes #8619.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../nifi-hadoop-dbcp-service/pom.xml | 5 +
.../apache/nifi/dbcp/HadoopDBCPConnectionPool.java | 374 +++++++--------------
.../nifi/dbcp/HadoopDBCPConnectionPoolTest.java | 15 +-
3 files changed, 135 insertions(+), 259 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/pom.xml
b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/pom.xml
index 37a0370672..396c82bd1a 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/pom.xml
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/pom.xml
@@ -28,6 +28,11 @@
<version>2.0.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-dbcp-base</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/main/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPool.java
b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/main/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPool.java
index 5d75daf2ca..09c6302990 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/main/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPool.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/main/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPool.java
@@ -16,21 +16,6 @@
*/
package org.apache.nifi.dbcp;
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.security.PrivilegedExceptionAction;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.security.auth.login.LoginException;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -45,15 +30,15 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
-import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.dbcp.utils.DBCPProperties;
+import org.apache.nifi.dbcp.utils.DataSourceConfiguration;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hadoop.SecurityUtil;
@@ -64,15 +49,45 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosLoginException;
import org.apache.nifi.security.krb.KerberosPasswordUser;
-import org.apache.nifi.security.krb.KerberosUser;
+
+import javax.security.auth.login.LoginException;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.nifi.dbcp.utils.DBCPProperties.DATABASE_URL;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_DRIVERNAME;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_PASSWORD;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_USER;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.EVICTION_RUN_PERIOD;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.KERBEROS_USER_SERVICE;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_CONN_LIFETIME;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_IDLE;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_TOTAL_CONNECTIONS;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_WAIT_TIME;
+import static
org.apache.nifi.dbcp.utils.DBCPProperties.MIN_EVICTABLE_IDLE_TIME;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.MIN_IDLE;
+import static
org.apache.nifi.dbcp.utils.DBCPProperties.SOFT_MIN_EVICTABLE_IDLE_TIME;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.VALIDATION_QUERY;
+import static
org.apache.nifi.dbcp.utils.DBCPProperties.extractMillisWithInfinite;
/**
* Implementation of Database Connection Pooling Service for Hadoop related
JDBC Service.
* Apache DBCP is used for connection pooling functionality.
- *
*/
@RequiresInstanceClassLoading
-@Tags({ "dbcp", "jdbc", "database", "connection", "pooling", "store", "hadoop"
})
+@Tags({"dbcp", "jdbc", "database", "connection", "pooling", "store", "hadoop"})
@CapabilityDescription("Provides a Database Connection Pooling Service for
Hadoop related JDBC services. This service requires that " +
"the Database Driver Location(s) contains some version of a
hadoop-common JAR, or a shaded JAR that shades hadoop-common.")
@DynamicProperty(name = "The name of a Hadoop configuration property.", value
= "The value of the given Hadoop configuration property.",
@@ -86,47 +101,19 @@ import org.apache.nifi.security.krb.KerberosUser;
)
}
)
-public class HadoopDBCPConnectionPool extends AbstractControllerService
implements DBCPService {
+public class HadoopDBCPConnectionPool extends AbstractDBCPConnectionPool {
private static final String ALLOW_EXPLICIT_KEYTAB =
"NIFI_ALLOW_EXPLICIT_KEYTAB";
private static final String HADOOP_CONFIGURATION_CLASS =
"org.apache.hadoop.conf.Configuration";
private static final String HADOOP_UGI_CLASS =
"org.apache.hadoop.security.UserGroupInformation";
- private static final String DEFAULT_MIN_IDLE = "0";
- private static final String DEFAULT_MAX_IDLE = "8";
- private static final String DEFAULT_MAX_CONN_LIFETIME = "-1";
- private static final String DEFAULT_EVICTION_RUN_PERIOD =
String.valueOf(-1L);
- private static final String DEFAULT_MIN_EVICTABLE_IDLE_TIME = "30 mins";
- private static final String DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME =
String.valueOf(-1L);
-
- public static final PropertyDescriptor DATABASE_URL = new
PropertyDescriptor.Builder()
- .name("Database Connection URL")
- .description("A database connection URL used to connect to a
database. May contain database system name, host, port, database name and some
parameters."
- + " The exact syntax of a database connection URL is
specified by your DBMS.")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .build();
-
- public static final PropertyDescriptor DB_DRIVERNAME = new
PropertyDescriptor.Builder()
- .name("Database Driver Class Name")
- .description("Database driver class name")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .build();
-
public static final PropertyDescriptor DB_DRIVER_LOCATION = new
PropertyDescriptor.Builder()
- .name("database-driver-locations")
- .displayName("Database Driver Location(s)")
- .description("Comma-separated list of files/folders and/or URLs
containing the driver JAR and its dependencies (if any). " +
+ .fromPropertyDescriptor(DBCPProperties.DB_DRIVER_LOCATION)
+ .description("Comma-separated list of files/folders and/or URLs
containing the driver JAR and its dependencies. " +
"For example '/var/tmp/phoenix-client.jar'. NOTE: It is
required that the resources specified by this property provide " +
"the classes from hadoop-common, such as Configuration and
UserGroupInformation.")
.required(true)
- .identifiesExternalResource(ResourceCardinality.MULTIPLE,
ResourceType.FILE, ResourceType.DIRECTORY, ResourceType.URL)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .dynamicallyModifiesClasspath(true)
.build();
static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new
PropertyDescriptor.Builder()
@@ -141,126 +128,6 @@ public class HadoopDBCPConnectionPool extends
AbstractControllerService implemen
.dynamicallyModifiesClasspath(true)
.build();
- public static final PropertyDescriptor DB_USER = new
PropertyDescriptor.Builder()
- .name("Database User")
- .description("The user for the database")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .build();
-
- public static final PropertyDescriptor DB_PASSWORD = new
PropertyDescriptor.Builder()
- .name("Password")
- .description("The password for the database user")
- .required(false)
- .sensitive(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .build();
-
- public static final PropertyDescriptor MAX_WAIT_TIME = new
PropertyDescriptor.Builder()
- .name("Max Wait Time")
- .description("The maximum amount of time that the pool will wait
(when there are no available connections) "
- + " for a connection to be returned before failing, or -1
to wait indefinitely. ")
- .defaultValue("500 millis")
- .required(true)
- .addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .sensitive(false)
- .build();
-
- public static final PropertyDescriptor MAX_TOTAL_CONNECTIONS = new
PropertyDescriptor.Builder()
- .name("Max Total Connections")
- .description("The maximum number of active connections that can be
allocated from this pool at the same time, "
- + " or negative for no limit.")
- .defaultValue("8")
- .required(true)
- .addValidator(StandardValidators.INTEGER_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .sensitive(false)
- .build();
-
- public static final PropertyDescriptor VALIDATION_QUERY = new
PropertyDescriptor.Builder()
- .name("Validation-query")
- .displayName("Validation query")
- .description("Validation query used to validate connections before
returning them. "
- + "When connection is invalid, it get's dropped and new
valid connection will be returned. "
- + "Note!! Using validation might have some performance
penalty.")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .build();
-
- public static final PropertyDescriptor MIN_IDLE = new
PropertyDescriptor.Builder()
- .displayName("Minimum Idle Connections")
- .name("dbcp-min-idle-conns")
- .description("The minimum number of connections that can remain
idle in the pool, without extra ones being " +
- "created, or zero to create none.")
- .defaultValue(DEFAULT_MIN_IDLE)
- .required(false)
- .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .build();
-
- public static final PropertyDescriptor MAX_IDLE = new
PropertyDescriptor.Builder()
- .displayName("Max Idle Connections")
- .name("dbcp-max-idle-conns")
- .description("The maximum number of connections that can remain
idle in the pool, without extra ones being " +
- "released, or negative for no limit.")
- .defaultValue(DEFAULT_MAX_IDLE)
- .required(false)
- .addValidator(StandardValidators.INTEGER_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .build();
-
- public static final PropertyDescriptor MAX_CONN_LIFETIME = new
PropertyDescriptor.Builder()
- .displayName("Max Connection Lifetime")
- .name("dbcp-max-conn-lifetime")
- .description("The maximum lifetime of a connection. After this
time is exceeded the " +
- "connection will fail the next activation, passivation or
validation test. A value of zero or less " +
- "means the connection has an infinite lifetime.")
- .defaultValue(DEFAULT_MAX_CONN_LIFETIME)
- .required(false)
- .addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .build();
-
- public static final PropertyDescriptor EVICTION_RUN_PERIOD = new
PropertyDescriptor.Builder()
- .displayName("Time Between Eviction Runs")
- .name("dbcp-time-between-eviction-runs")
- .description("The time period to sleep between runs of the idle
connection evictor thread. When " +
- "non-positive, no idle connection evictor thread will be
run.")
- .defaultValue(DEFAULT_EVICTION_RUN_PERIOD)
- .required(false)
- .addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .build();
-
- public static final PropertyDescriptor MIN_EVICTABLE_IDLE_TIME = new
PropertyDescriptor.Builder()
- .displayName("Minimum Evictable Idle Time")
- .name("dbcp-min-evictable-idle-time")
- .description("The minimum amount of time a connection may sit idle
in the pool before it is eligible for eviction.")
- .defaultValue(DEFAULT_MIN_EVICTABLE_IDLE_TIME)
- .required(false)
- .addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .build();
-
- public static final PropertyDescriptor SOFT_MIN_EVICTABLE_IDLE_TIME = new
PropertyDescriptor.Builder()
- .displayName("Soft Minimum Evictable Idle Time")
- .name("dbcp-soft-min-evictable-idle-time")
- .description("The minimum amount of time a connection may sit idle
in the pool before it is eligible for " +
- "eviction by the idle connection evictor, with the extra
condition that at least a minimum number of" +
- " idle connections remain in the pool. When the not-soft
version of this option is set to a positive" +
- " value, it is examined first by the idle connection
evictor: when idle connections are visited by " +
- "the evictor, idle time is first compared against it
(without considering the number of idle " +
- "connections in the pool) and then against this soft
option, including the minimum idle connections " +
- "constraint.")
- .defaultValue(DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME)
- .required(false)
- .addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .build();
-
public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new
PropertyDescriptor.Builder()
.name("kerberos-credentials-service")
.displayName("Kerberos Credentials Service")
@@ -269,21 +136,10 @@ public class HadoopDBCPConnectionPool extends
AbstractControllerService implemen
.required(false)
.build();
- public static final PropertyDescriptor KERBEROS_USER_SERVICE = new
PropertyDescriptor.Builder()
- .name("kerberos-user-service")
- .displayName("Kerberos User Service")
- .description("Specifies the Kerberos User Controller Service that
should be used for authenticating with Kerberos")
- .identifiesControllerService(KerberosUserService.class)
- .required(false)
- .build();
-
-
private KerberosProperties kerberosProperties;
private List<PropertyDescriptor> properties;
- private volatile BasicDataSource dataSource;
private volatile UserGroupInformation ugi;
- private volatile KerberosUser kerberosUser;
private volatile Boolean foundHadoopDependencies;
// Holder of cached Configuration information so validation does not
reload the same config over and over
@@ -294,29 +150,28 @@ public class HadoopDBCPConnectionPool extends
AbstractControllerService implemen
File kerberosConfigFile = context.getKerberosConfigurationFile();
kerberosProperties = getKerberosProperties(kerberosConfigFile);
- final List<PropertyDescriptor> props = new ArrayList<>();
- props.add(DATABASE_URL);
- props.add(DB_DRIVERNAME);
- props.add(DB_DRIVER_LOCATION);
- props.add(HADOOP_CONFIGURATION_RESOURCES);
- props.add(KERBEROS_USER_SERVICE);
- props.add(KERBEROS_CREDENTIALS_SERVICE);
- props.add(kerberosProperties.getKerberosPrincipal());
- props.add(kerberosProperties.getKerberosKeytab());
- props.add(kerberosProperties.getKerberosPassword());
- props.add(DB_USER);
- props.add(DB_PASSWORD);
- props.add(MAX_WAIT_TIME);
- props.add(MAX_TOTAL_CONNECTIONS);
- props.add(VALIDATION_QUERY);
- props.add(MIN_IDLE);
- props.add(MAX_IDLE);
- props.add(MAX_CONN_LIFETIME);
- props.add(EVICTION_RUN_PERIOD);
- props.add(MIN_EVICTABLE_IDLE_TIME);
- props.add(SOFT_MIN_EVICTABLE_IDLE_TIME);
-
- properties = Collections.unmodifiableList(props);
+ properties = Arrays.asList(
+ DATABASE_URL,
+ DB_DRIVERNAME,
+ DB_DRIVER_LOCATION,
+ HADOOP_CONFIGURATION_RESOURCES,
+ KERBEROS_USER_SERVICE,
+ KERBEROS_CREDENTIALS_SERVICE,
+ kerberosProperties.getKerberosPrincipal(),
+ kerberosProperties.getKerberosKeytab(),
+ kerberosProperties.getKerberosPassword(),
+ DB_USER,
+ DB_PASSWORD,
+ MAX_WAIT_TIME,
+ MAX_TOTAL_CONNECTIONS,
+ VALIDATION_QUERY,
+ MIN_IDLE,
+ MAX_IDLE,
+ MAX_CONN_LIFETIME,
+ EVICTION_RUN_PERIOD,
+ MIN_EVICTABLE_IDLE_TIME,
+ SOFT_MIN_EVICTABLE_IDLE_TIME
+ );
}
protected KerberosProperties getKerberosProperties(File
kerberosConfigFile) {
@@ -458,7 +313,7 @@ public class HadoopDBCPConnectionPool extends
AbstractControllerService implemen
* Configures connection pool by creating an instance of the
* {@link BasicDataSource} based on configuration provided with
* {@link ConfigurationContext}.
- *
+ * <p>
* This operation makes no guarantees that the actual connection could be
* made since the underlying system may still go off-line during normal
* operation of the connection pool.
@@ -498,64 +353,25 @@ public class HadoopDBCPConnectionPool extends
AbstractControllerService implemen
if (resolvedKeytab != null) {
kerberosUser = new KerberosKeytabUser(resolvedPrincipal,
resolvedKeytab);
- getLogger().info("Security Enabled, logging in as principal {}
with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
+ getLogger().info("Security Enabled, logging in as principal {}
with keytab {}", resolvedPrincipal, resolvedKeytab);
} else if (explicitPassword != null) {
kerberosUser = new KerberosPasswordUser(resolvedPrincipal,
explicitPassword);
- getLogger().info("Security Enabled, logging in as principal {}
with password", new Object[] {resolvedPrincipal});
+ getLogger().info("Security Enabled, logging in as principal {}
with password", resolvedPrincipal);
} else {
throw new IOException("Unable to authenticate with Kerberos,
no keytab or password was provided");
}
ugi = SecurityUtil.getUgiForKerberosUser(hadoopConfig,
kerberosUser);
- getLogger().info("Successfully logged in as principal " +
resolvedPrincipal);
+ getLogger().info("Successfully logged in as principal {}",
resolvedPrincipal);
} else {
getLogger().info("Simple Authentication");
}
-
- // Initialize the DataSource...
- final String dbUrl =
context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();
- final String driverName =
context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue();
- final String user =
context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
- final String passw =
context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
- final Integer maxTotal =
context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger();
- final String validationQuery =
context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue();
- final Long maxWaitMillis =
extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions());
- final Integer minIdle =
context.getProperty(MIN_IDLE).evaluateAttributeExpressions().asInteger();
- final Integer maxIdle =
context.getProperty(MAX_IDLE).evaluateAttributeExpressions().asInteger();
- final Long maxConnLifetimeMillis =
extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions());
- final Long timeBetweenEvictionRunsMillis =
extractMillisWithInfinite(context.getProperty(EVICTION_RUN_PERIOD).evaluateAttributeExpressions());
- final Long minEvictableIdleTimeMillis =
extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
- final Long softMinEvictableIdleTimeMillis =
extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
-
- dataSource = new BasicDataSource();
- dataSource.setDriverClassName(driverName);
- dataSource.setDriverClassLoader(this.getClass().getClassLoader());
- dataSource.setUrl(dbUrl);
- dataSource.setUsername(user);
- dataSource.setPassword(passw);
- dataSource.setMaxWait(Duration.ofMillis(maxWaitMillis));
- dataSource.setMaxTotal(maxTotal);
- dataSource.setMinIdle(minIdle);
- dataSource.setMaxIdle(maxIdle);
- dataSource.setMaxConn(Duration.ofMillis(maxConnLifetimeMillis));
-
dataSource.setDurationBetweenEvictionRuns(Duration.ofMillis(timeBetweenEvictionRunsMillis));
-
dataSource.setMinEvictableIdle(Duration.ofMillis(minEvictableIdleTimeMillis));
-
dataSource.setSoftMinEvictableIdle(Duration.ofMillis(softMinEvictableIdleTimeMillis));
-
- if (StringUtils.isEmpty(validationQuery)) {
- dataSource.setValidationQuery(validationQuery);
- dataSource.setTestOnBorrow(true);
- }
- }
-
- private Long extractMillisWithInfinite(PropertyValue prop) {
- return "-1".equals(prop.getValue()) ? -1 :
prop.asTimePeriod(TimeUnit.MILLISECONDS);
}
/**
* Shutdown pool, close all open connections.
* If a principal is authenticated with a KDC, that principal is logged
out.
- *
+ * <p>
* If a @{@link LoginException} occurs while attempting to log out the
@{@link org.apache.nifi.security.krb.KerberosUser},
* an attempt will still be made to shut down the pool and close open
connections.
*
@@ -582,6 +398,61 @@ public class HadoopDBCPConnectionPool extends
AbstractControllerService implemen
}
}
+ @Override
+ protected Driver getDriver(String driverName, String url) {
+ final Class<?> clazz;
+
+ try {
+ clazz = Class.forName(driverName);
+ } catch (final ClassNotFoundException e) {
+ throw new ProcessException("Driver class " + driverName + " is not
found", e);
+ }
+
+ try {
+ return DriverManager.getDriver(url);
+ } catch (final SQLException e) {
+ // In case the driver is not registered by the implementation, we
explicitly try to register it.
+ try {
+ final Driver driver = (Driver)
clazz.getDeclaredConstructor().newInstance();
+ DriverManager.registerDriver(driver);
+ return DriverManager.getDriver(url);
+ } catch (final SQLException e2) {
+ throw new ProcessException("No suitable driver for the given
Database Connection URL", e2);
+ } catch (final Exception e2) {
+ throw new ProcessException("Creating driver instance is
failed", e2);
+ }
+ }
+ }
+
+ @Override
+ protected DataSourceConfiguration
getDataSourceConfiguration(ConfigurationContext context) {
+ final String url =
context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();
+ final String driverName =
context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue();
+ final String user =
context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
+ final String password =
context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
+ final Integer maxTotal =
context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger();
+ final String validationQuery =
context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue();
+ final Long maxWaitMillis =
extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions());
+ final Integer minIdle =
context.getProperty(MIN_IDLE).evaluateAttributeExpressions().asInteger();
+ final Integer maxIdle =
context.getProperty(MAX_IDLE).evaluateAttributeExpressions().asInteger();
+ final Long maxConnLifetimeMillis =
extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions());
+ final Long timeBetweenEvictionRunsMillis =
extractMillisWithInfinite(context.getProperty(EVICTION_RUN_PERIOD).evaluateAttributeExpressions());
+ final Long minEvictableIdleTimeMillis =
extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
+ final Long softMinEvictableIdleTimeMillis =
extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
+
+ return new DataSourceConfiguration.Builder(url, driverName, user,
password)
+ .validationQuery(validationQuery)
+ .maxWaitMillis(maxWaitMillis)
+ .maxTotal(maxTotal)
+ .minIdle(minIdle)
+ .maxIdle(maxIdle)
+ .maxConnLifetimeMillis(maxConnLifetimeMillis)
+ .timeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis)
+ .minEvictableIdleTimeMillis(minEvictableIdleTimeMillis)
+ .softMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis)
+ .build();
+ }
+
@Override
public Connection getConnection() throws ProcessException {
try {
@@ -591,9 +462,9 @@ public class HadoopDBCPConnectionPool extends
AbstractControllerService implemen
getLogger().trace("getting UGI instance");
if (kerberosUser != null) {
// if there's a KerberosUser associated with this UGI,
check the TGT and relogin if it is close to expiring
- getLogger().debug("kerberosUser is " + kerberosUser);
+ getLogger().debug("kerberosUser is {}", kerberosUser);
try {
- getLogger().debug("checking TGT on kerberosUser " +
kerberosUser);
+ getLogger().debug("checking TGT on kerberosUser {}",
kerberosUser);
kerberosUser.checkTGTAndRelogin();
} catch (final KerberosLoginException e) {
throw new ProcessException("Unable to relogin with
kerberos credentials for " + kerberosUser.getPrincipal(), e);
@@ -619,7 +490,6 @@ public class HadoopDBCPConnectionPool extends
AbstractControllerService implemen
return dataSource.getConnection();
}
} catch (SQLException | IOException | InterruptedException e) {
- getLogger().error("Error getting Connection: " + e.getMessage(),
e);
throw new ProcessException(e);
}
}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPoolTest.java
b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPoolTest.java
index a6d3b2c259..b1ab4be992 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPoolTest.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPoolTest.java
@@ -17,6 +17,7 @@
package org.apache.nifi.dbcp;
import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.utils.DBCPProperties;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.kerberos.KerberosContext;
import org.apache.nifi.kerberos.KerberosCredentialsService;
@@ -55,9 +56,9 @@ public class HadoopDBCPConnectionPoolTest {
// Configure minimum required properties..
final HadoopDBCPConnectionPool hadoopDBCPService = new
TestableHadoopDBCPConnectionPool(true);
runner.addControllerService("hadoop-dbcp-service", hadoopDBCPService);
- runner.setProperty(hadoopDBCPService,
HadoopDBCPConnectionPool.DATABASE_URL,
"jdbc:phoenix:zk-host1,zk-host2:2181:/hbase");
- runner.setProperty(hadoopDBCPService,
HadoopDBCPConnectionPool.DB_DRIVERNAME,
"org.apache.phoenix.jdbc.PhoenixDriver");
- runner.setProperty(hadoopDBCPService,
HadoopDBCPConnectionPool.DB_DRIVER_LOCATION, "target");
+ runner.setProperty(hadoopDBCPService, DBCPProperties.DATABASE_URL,
"jdbc:phoenix:zk-host1,zk-host2:2181:/hbase");
+ runner.setProperty(hadoopDBCPService, DBCPProperties.DB_DRIVERNAME,
"org.apache.phoenix.jdbc.PhoenixDriver");
+ runner.setProperty(hadoopDBCPService,
DBCPProperties.DB_DRIVER_LOCATION, "target");
// Security is not enabled yet since no conf files provided, so should
be valid
runner.assertValid(hadoopDBCPService);
@@ -100,7 +101,7 @@ public class HadoopDBCPConnectionPoolTest {
when(kerberosUserService.getIdentifier()).thenReturn("userService1");
runner.addControllerService(kerberosUserService.getIdentifier(),
kerberosUserService);
runner.enableControllerService(kerberosUserService);
- runner.setProperty(hadoopDBCPService,
HadoopDBCPConnectionPool.KERBEROS_USER_SERVICE,
kerberosUserService.getIdentifier());
+ runner.setProperty(hadoopDBCPService,
DBCPProperties.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
runner.assertNotValid(hadoopDBCPService);
// Remove KerberosCredentialService, should be valid with only
KerberosUserService
@@ -118,7 +119,7 @@ public class HadoopDBCPConnectionPoolTest {
runner.assertNotValid(hadoopDBCPService);
// Remove kerberos user service, should be valid
- runner.removeProperty(hadoopDBCPService,
HadoopDBCPConnectionPool.KERBEROS_USER_SERVICE);
+ runner.removeProperty(hadoopDBCPService,
DBCPProperties.KERBEROS_USER_SERVICE);
runner.assertValid(hadoopDBCPService);
}
@@ -130,8 +131,8 @@ public class HadoopDBCPConnectionPoolTest {
// Configure minimum required properties..
final HadoopDBCPConnectionPool hadoopDBCPService = new
TestableHadoopDBCPConnectionPool(false);
runner.addControllerService("hadoop-dbcp-service", hadoopDBCPService);
- runner.setProperty(hadoopDBCPService,
HadoopDBCPConnectionPool.DATABASE_URL,
"jdbc:phoenix:zk-host1,zk-host2:2181:/hbase");
- runner.setProperty(hadoopDBCPService,
HadoopDBCPConnectionPool.DB_DRIVERNAME,
"org.apache.phoenix.jdbc.PhoenixDriver");
+ runner.setProperty(hadoopDBCPService, DBCPProperties.DATABASE_URL,
"jdbc:phoenix:zk-host1,zk-host2:2181:/hbase");
+ runner.setProperty(hadoopDBCPService, DBCPProperties.DB_DRIVERNAME,
"org.apache.phoenix.jdbc.PhoenixDriver");
runner.setProperty(hadoopDBCPService,
HadoopDBCPConnectionPool.DB_DRIVER_LOCATION, "target");
// Security is not enabled yet since no conf files provided, so should
be valid