This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 8c8a9b4 NIFI-5985: Added capability for DBCPConnectionPool to use
KerberosCredentialsService. Refactored KerberosAction to return a result from
execute() Removed usage of ProcessContext.yield() from KerberosAction, which
should instead be handled the component using the KerberosCredentialsService.
Updated SolrProcessor to yield a flowfile on error, rather than the
KerberosAction invoking the yield.
8c8a9b4 is described below
commit 8c8a9b4d53584913881d16771c8ace05a6cf6819
Author: Jeff Storck <[email protected]>
AuthorDate: Wed Jan 30 19:08:29 2019 -0500
NIFI-5985: Added capability for DBCPConnectionPool to use
KerberosCredentialsService.
Refactored KerberosAction to return a result from execute()
Removed usage of ProcessContext.yield() from KerberosAction, which should
instead be handled the component using the KerberosCredentialsService.
Updated SolrProcessor to yield a flowfile on error, rather than the
KerberosAction invoking the yield.
NIFI-5985: Updated TestPutSolrContentStream.testUpdateWithKerberosAuth test
case to match on PrivilegedExceptionAction instead of PrivilegedAction doAs
arguments.
NIFI-5985: Moved kerberosUser logout after closing the datasource in the
shutdown method.
NIFI-5985: Removed catching exceptions in DBCPConnectionPool.shutdown
Exception when closing the datasource is prioritized over an exception when
logging out the kerberos principal
Added GroovyDBCPServiceTest tests to verify prioritizing datasource.close()
exception over kerberosUser.logout() exception
This closes #3288.
Signed-off-by: Bryan Bende <[email protected]>
---
.../apache/nifi/security/krb/KerberosAction.java | 31 +++++-------
.../apache/nifi/security/krb/KerberosUserIT.java | 6 +--
.../apache/nifi/processors/solr/SolrProcessor.java | 13 +++--
.../processors/solr/TestPutSolrContentStream.java | 11 ++--
.../nifi-dbcp-service/pom.xml | 8 ++-
.../org/apache/nifi/dbcp/DBCPConnectionPool.java | 55 ++++++++++++++++++--
.../apache/nifi/dbcp/GroovyDBCPServiceTest.groovy | 58 ++++++++++++++++++++++
7 files changed, 146 insertions(+), 36 deletions(-)
diff --git
a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/KerberosAction.java
b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/KerberosAction.java
index bd3e1f9..18c752b 100644
---
a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/KerberosAction.java
+++
b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/KerberosAction.java
@@ -18,45 +18,40 @@ package org.apache.nifi.security.krb;
import org.apache.commons.lang3.Validate;
import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import javax.security.auth.login.LoginException;
-import java.security.PrivilegedAction;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
/**
* Helper class for processors to perform an action as a KerberosUser.
*/
-public class KerberosAction {
+public class KerberosAction<T> {
private final KerberosUser kerberosUser;
- private final PrivilegedAction action;
- private final ProcessContext context;
+ private final PrivilegedExceptionAction<T> action;
private final ComponentLog logger;
public KerberosAction(final KerberosUser kerberosUser,
- final PrivilegedAction action,
- final ProcessContext context,
+ final PrivilegedExceptionAction<T> action,
final ComponentLog logger) {
this.kerberosUser = kerberosUser;
this.action = action;
- this.context = context;
this.logger = logger;
Validate.notNull(this.kerberosUser);
Validate.notNull(this.action);
- Validate.notNull(this.context);
Validate.notNull(this.logger);
}
- public void execute() {
+ public T execute() {
+ T result;
// lazily login the first time the processor executes
if (!kerberosUser.isLoggedIn()) {
try {
kerberosUser.login();
logger.info("Successful login for {}", new
Object[]{kerberosUser.getPrincipal()});
} catch (LoginException e) {
- // make sure to yield so the processor doesn't keep retrying
the rolled back flow files immediately
- context.yield();
throw new ProcessException("Login failed due to: " +
e.getMessage(), e);
}
}
@@ -65,14 +60,12 @@ public class KerberosAction {
try {
kerberosUser.checkTGTAndRelogin();
} catch (LoginException e) {
- // make sure to yield so the processor doesn't keep retrying the
rolled back flow files immediately
- context.yield();
throw new ProcessException("Relogin check failed due to: " +
e.getMessage(), e);
}
// attempt to execute the action, if an exception is caught attempt to
logout/login and retry
try {
- kerberosUser.doAs(action);
+ result = kerberosUser.doAs(action);
} catch (SecurityException se) {
logger.info("Privileged action failed, attempting relogin and
retrying...");
logger.debug("", se);
@@ -80,13 +73,15 @@ public class KerberosAction {
try {
kerberosUser.logout();
kerberosUser.login();
- kerberosUser.doAs(action);
+ result = kerberosUser.doAs(action);
} catch (Exception e) {
- // make sure to yield so the processor doesn't keep retrying
the rolled back flow files immediately
- context.yield();
throw new ProcessException("Retrying privileged action failed
due to: " + e.getMessage(), e);
}
+ } catch (PrivilegedActionException e) {
+ throw new ProcessException("Privileged action failed due to: " +
e.getMessage(), e.getException());
}
+
+ return result;
}
}
diff --git
a/nifi-commons/nifi-security-utils/src/test/java/org/apache/nifi/security/krb/KerberosUserIT.java
b/nifi-commons/nifi-security-utils/src/test/java/org/apache/nifi/security/krb/KerberosUserIT.java
index e636008..5a390d6 100644
---
a/nifi-commons/nifi-security-utils/src/test/java/org/apache/nifi/security/krb/KerberosUserIT.java
+++
b/nifi-commons/nifi-security-utils/src/test/java/org/apache/nifi/security/krb/KerberosUserIT.java
@@ -29,7 +29,7 @@ import javax.security.auth.kerberos.KerberosPrincipal;
import javax.security.auth.login.LoginException;
import java.io.File;
import java.security.Principal;
-import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
@@ -174,7 +174,7 @@ public class KerberosUserIT {
final KerberosUser user1 = new
KerberosKeytabUser(principal1.getName(),
principal1KeytabFile.getAbsolutePath());
final AtomicReference<String> resultHolder = new
AtomicReference<>(null);
- final PrivilegedAction privilegedAction = () -> {
+ final PrivilegedExceptionAction<Void> privilegedAction = () -> {
resultHolder.set("SUCCESS");
return null;
};
@@ -183,7 +183,7 @@ public class KerberosUserIT {
final ComponentLog logger = Mockito.mock(ComponentLog.class);
// create the action to test and execute it
- final KerberosAction kerberosAction = new KerberosAction(user1,
privilegedAction, context, logger);
+ final KerberosAction kerberosAction = new KerberosAction<>(user1,
privilegedAction, logger);
kerberosAction.execute();
// if the result holder has the string success then we know the action
executed
diff --git
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
index 4a193b6..79f397f 100755
---
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
+++
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
@@ -36,7 +36,7 @@ import org.apache.solr.client.solrj.SolrClient;
import javax.security.auth.login.LoginException;
import java.io.IOException;
-import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -113,14 +113,19 @@ public abstract class SolrProcessor extends
AbstractProcessor {
doOnTrigger(context, session);
} else {
// wrap doOnTrigger in a privileged action
- final PrivilegedAction action = () -> {
+ final PrivilegedExceptionAction<Void> action = () -> {
doOnTrigger(context, session);
return null;
};
// execute the privileged action as the given keytab user
- final KerberosAction kerberosAction = new
KerberosAction(kerberosUser, action, context, getLogger());
- kerberosAction.execute();
+ final KerberosAction kerberosAction = new
KerberosAction<>(kerberosUser, action, getLogger());
+ try {
+ kerberosAction.execute();
+ } catch (ProcessException e) {
+ context.yield();
+ throw e;
+ }
}
}
diff --git
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
index e3c3b79..d41f89a 100755
---
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
+++
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
@@ -44,7 +44,8 @@ import javax.net.ssl.SSLContext;
import javax.security.auth.login.LoginException;
import java.io.FileInputStream;
import java.io.IOException;
-import java.security.PrivilegedAction;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
@@ -459,7 +460,7 @@ public class TestPutSolrContentStream {
}
@Test
- public void testUpdateWithKerberosAuth() throws IOException,
InitializationException, LoginException {
+ public void testUpdateWithKerberosAuth() throws IOException,
InitializationException, LoginException, PrivilegedActionException {
final String principal = "[email protected]";
final String keytab = "src/test/resources/foo.keytab";
@@ -467,8 +468,8 @@ public class TestPutSolrContentStream {
final KerberosKeytabUser kerberosUser =
Mockito.mock(KerberosKeytabUser.class);
when(kerberosUser.getPrincipal()).thenReturn(principal);
when(kerberosUser.getKeytabFile()).thenReturn(keytab);
-
when(kerberosUser.doAs(any(PrivilegedAction.class))).thenAnswer((invocation -> {
- final PrivilegedAction action = (PrivilegedAction)
invocation.getArguments()[0];
+
when(kerberosUser.doAs(any(PrivilegedExceptionAction.class))).thenAnswer((invocation
-> {
+ final PrivilegedExceptionAction action =
(PrivilegedExceptionAction) invocation.getArguments()[0];
action.run();
return null;
})
@@ -502,7 +503,7 @@ public class TestPutSolrContentStream {
// Verify that during the update the user was logged in, TGT was
checked, and the action was executed
verify(kerberosUser, times(1)).login();
verify(kerberosUser, times(1)).checkTGTAndRelogin();
- verify(kerberosUser, times(1)).doAs(any(PrivilegedAction.class));
+ verify(kerberosUser,
times(1)).doAs(any(PrivilegedExceptionAction.class));
}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/pom.xml
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/pom.xml
index 82d5b8d..0640582 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/pom.xml
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/pom.xml
@@ -44,6 +44,12 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-kerberos-credentials-service-api</artifactId>
+ <version>1.9.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.9.0-SNAPSHOT</version>
<scope>test</scope>
@@ -57,7 +63,7 @@
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>10.11.1.1</version>
- </dependency>
+ </dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derbynet</artifactId>
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java
index d6f83f2..6dd24d1 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java
@@ -32,12 +32,16 @@ import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.security.krb.KerberosAction;
+import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
+import javax.security.auth.login.LoginException;
import java.net.MalformedURLException;
import java.sql.Connection;
import java.sql.Driver;
@@ -263,6 +267,14 @@ public class DBCPConnectionPool extends
AbstractControllerService implements DBC
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
+ public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new
PropertyDescriptor.Builder()
+ .name("kerberos-credentials-service")
+ .displayName("Kerberos Credentials Service")
+ .description("Specifies the Kerberos Credentials Controller
Service that should be used for authenticating with Kerberos")
+ .identifiesControllerService(KerberosCredentialsService.class)
+ .required(false)
+ .build();
+
private static final List<PropertyDescriptor> properties;
static {
@@ -270,6 +282,7 @@ public class DBCPConnectionPool extends
AbstractControllerService implements DBC
props.add(DATABASE_URL);
props.add(DB_DRIVERNAME);
props.add(DB_DRIVER_LOCATION);
+ props.add(KERBEROS_CREDENTIALS_SERVICE);
props.add(DB_USER);
props.add(DB_PASSWORD);
props.add(MAX_WAIT_TIME);
@@ -286,6 +299,7 @@ public class DBCPConnectionPool extends
AbstractControllerService implements DBC
}
private volatile BasicDataSource dataSource;
+ private volatile KerberosKeytabUser kerberosUser;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -333,6 +347,16 @@ public class DBCPConnectionPool extends
AbstractControllerService implements DBC
final Long timeBetweenEvictionRunsMillis =
extractMillisWithInfinite(context.getProperty(EVICTION_RUN_PERIOD));
final Long minEvictableIdleTimeMillis =
extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME));
final Long softMinEvictableIdleTimeMillis =
extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME));
+ final KerberosCredentialsService kerberosCredentialsService =
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+ if (kerberosCredentialsService != null) {
+ kerberosUser = new
KerberosKeytabUser(kerberosCredentialsService.getPrincipal(),
kerberosCredentialsService.getKeytab());
+ try {
+ kerberosUser.login();
+ } catch (LoginException e) {
+ throw new InitializationException("Unable to authenticate
Kerberos principal", e);
+ }
+ }
dataSource = new BasicDataSource();
dataSource.setDriverClassName(drv);
@@ -410,20 +434,41 @@ public class DBCPConnectionPool extends
AbstractControllerService implements DBC
/**
* Shutdown pool, close all open connections.
+ * If a principal is authenticated with a KDC, that principal is logged
out.
+ *
+ * If a @{@link LoginException} occurs while attempting to log out the
@{@link org.apache.nifi.security.krb.KerberosUser},
+ * an attempt will still be made to shut down the pool and close open
connections.
+ *
+ * @throws SQLException if there is an error while closing open connections
+ * @throws LoginException if there is an error during the principal log
out, and will only be thrown if there was
+ * no exception while closing open connections
*/
@OnDisabled
- public void shutdown() {
+ public void shutdown() throws SQLException, LoginException {
try {
- dataSource.close();
- } catch (final SQLException e) {
- throw new ProcessException(e);
+ if (kerberosUser != null) {
+ kerberosUser.logout();
+ }
+ } finally {
+ kerberosUser = null;
+ try {
+ dataSource.close();
+ } finally {
+ dataSource = null;
+ }
}
}
@Override
public Connection getConnection() throws ProcessException {
try {
- final Connection con = dataSource.getConnection();
+ final Connection con;
+ if (kerberosUser != null) {
+ KerberosAction<Connection> kerberosAction = new
KerberosAction<>(kerberosUser, () -> dataSource.getConnection(), getLogger());
+ con = kerberosAction.execute();
+ } else {
+ con = dataSource.getConnection();
+ }
return con;
} catch (final SQLException e) {
throw new ProcessException(e);
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/groovy/org/apache/nifi/dbcp/GroovyDBCPServiceTest.groovy
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/groovy/org/apache/nifi/dbcp/GroovyDBCPServiceTest.groovy
index 9da4b22..cb267db 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/groovy/org/apache/nifi/dbcp/GroovyDBCPServiceTest.groovy
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/groovy/org/apache/nifi/dbcp/GroovyDBCPServiceTest.groovy
@@ -16,13 +16,16 @@
*/
package org.apache.nifi.dbcp
+import org.apache.commons.dbcp2.BasicDataSource
import org.apache.nifi.reporting.InitializationException
+import org.apache.nifi.security.krb.KerberosKeytabUser
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.junit.Assert
import org.junit.BeforeClass
import org.junit.Test
+import javax.security.auth.login.LoginException
import java.sql.Connection
import java.sql.SQLException
@@ -72,4 +75,59 @@ class GroovyDBCPServiceTest {
connection.close() // will return connection to pool
}
}
+
+ @Test(expected = LoginException)
+ void testDatasourceCloseSuccessWithKerberosUserLogoutException() {
+ final DBCPConnectionPool dbcpConnectionPoolService = new
DBCPConnectionPool()
+
+ def basicDataSource = [close: { -> }] as BasicDataSource
+ dbcpConnectionPoolService.dataSource = basicDataSource
+ def kerberosKeytabUser = new KerberosKeytabUser("[email protected]",
"fake.keytab") {
+ @Override
+ void logout() throws LoginException {
+ throw new LoginException("fake logout exception")
+ }
+ }
+ dbcpConnectionPoolService.kerberosUser = kerberosKeytabUser
+
+ dbcpConnectionPoolService.shutdown()
+
+ }
+
+ @Test(expected = SQLException)
+ void testDatasourceCloseExceptionWithKerberosUserLogoutSuccess() {
+ final DBCPConnectionPool dbcpConnectionPoolService = new
DBCPConnectionPool()
+
+ def basicDataSource = [
+ close: { -> throw new SQLException("fake sql exception")
+ }] as BasicDataSource
+ dbcpConnectionPoolService.dataSource = basicDataSource
+ def kerberosKeytabUser = new KerberosKeytabUser("[email protected]",
"fake.keytab") {
+ @Override
+ void logout() throws LoginException {
+ }
+ }
+ dbcpConnectionPoolService.kerberosUser = kerberosKeytabUser
+
+ dbcpConnectionPoolService.shutdown()
+ }
+
+ @Test(expected = SQLException)
+ void testDatasourceCloseExceptionWithKerberosUserLogoutException() {
+ final DBCPConnectionPool dbcpConnectionPoolService = new
DBCPConnectionPool()
+
+ def basicDataSource = [
+ close: { -> throw new SQLException("fake sql exception")
+ }] as BasicDataSource
+ dbcpConnectionPoolService.dataSource = basicDataSource
+ def kerberosKeytabUser = new KerberosKeytabUser("[email protected]",
"fake.keytab") {
+ @Override
+ void logout() throws LoginException {
+ throw new LoginException("fake logout exception")
+ }
+ }
+ dbcpConnectionPoolService.kerberosUser = kerberosKeytabUser
+
+ dbcpConnectionPoolService.shutdown()
+ }
}