Repository: nifi Updated Branches: refs/heads/master 83fccc4be -> a1794b101
NIFI-5041: Adds SPNEGO authentication to LivySessionController NIFI-5041: fixes http client version issue Change-Id: I1b87ec4752ff6e1603025883a72113919aba5dd4 NIFI-5041: fixes Kerberos configuration Change-Id: I868fdf3ea7cfd28cf415164e420f23bf3f6eefeb NIFI-5041: adds new NOTICE entries NIFI-5041: yields processor if no session is available, fixes error handling in session manager thread, fixes error returned in KerberosKeytabSPNegoScheme on authentication failure Change-Id: I443e063ae21c446980087e5464a4b70373d730f6 NIFI-5041: makes the session manager thread exceptions visible to the users Change-Id: I33fde5df6933cec2a87a4d82e681d4464f21b459 NIFI-5041: adds special SessionManagerException to identify error occurred on session manager thread Change-Id: I25a52c025376a0cd238f14bda533d6f5f3e5fb4a This closes #2630 Signed-off-by: Matthew Burgess <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a1794b10 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a1794b10 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a1794b10 Branch: refs/heads/master Commit: a1794b101ea843410606b65bbb75fd8bc87ccd39 Parents: 83fccc4 Author: Peter Toth <[email protected]> Authored: Tue Apr 10 14:27:51 2018 +0200 Committer: Matthew Burgess <[email protected]> Committed: Thu May 31 11:07:11 2018 -0400 ---------------------------------------------------------------------- NOTICE | 6 + nifi-assembly/NOTICE | 6 + .../nifi-hadoop-utils/pom.xml | 12 ++ .../nifi/hadoop/KerberosConfiguration.java | 52 ++++++++ .../nifi/hadoop/KerberosKeytabCredentials.java | 51 ++++++++ .../KerberosKeytabSPNegoAuthSchemeProvider.java | 32 +++++ .../nifi/hadoop/KerberosKeytabSPNegoScheme.java | 83 ++++++++++++ .../src/main/resources/META-INF/NOTICE | 11 ++ .../nifi-livy-controller-service-api/pom.xml | 5 + .../controller/api/livy/LivySessionService.java | 9 +- .../livy/exception/SessionManagerException.java | 27 ++++ 
.../nifi-livy-controller-service/pom.xml | 24 ++++ .../controller/livy/LivySessionController.java | 131 ++++++++++++++----- .../nifi-livy-processors/pom.xml | 5 + .../livy/ExecuteSparkInteractive.java | 78 ++++++----- 15 files changed, 461 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/a1794b10/NOTICE ---------------------------------------------------------------------- diff --git a/NOTICE b/NOTICE index a4f0bb7..e0bb225 100644 --- a/NOTICE +++ b/NOTICE @@ -20,6 +20,12 @@ This includes derived works from the Apache Storm (ASLv2 licensed) project (http org/apache/storm/hive/common/HiveOptions.java and can be found in the org.apache.nifi.util.hive package +This includes derived works from the Apache Hadoop (ASLv2 licensed) project (https://github.com/apache/hadoop): + Copyright 2014 The Apache Software Foundation + The derived work is adapted from + org/apache/hadoop/security/authentication/client/KerberosAuthenticator.java + and can be found in the org.apache.nifi.hadoop package + This includes derived works from the Apache Hive (ASLv2 licensed) project (https://github.com/apache/hive): Copyright 2008-2016 The Apache Software Foundation The derived work is adapted from http://git-wip-us.apache.org/repos/asf/nifi/blob/a1794b10/nifi-assembly/NOTICE ---------------------------------------------------------------------- diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE index e27e234..9f413d6 100644 --- a/nifi-assembly/NOTICE +++ b/nifi-assembly/NOTICE @@ -20,6 +20,12 @@ This includes derived works from the Apache Storm (ASLv2 licensed) project (http org/apache/storm/hive/common/HiveOptions.java and can be found in the org.apache.nifi.util.hive package +This includes derived works from the Apache Hadoop (ASLv2 licensed) project (https://github.com/apache/hadoop): + Copyright 2014 The Apache Software Foundation + The derived work is adapted from + 
org/apache/hadoop/security/authentication/client/KerberosAuthenticator.java + and can be found in the org.apache.nifi.hadoop package + This includes derived works from the Apache Hive (ASLv2 licensed) project (https://github.com/apache/hive): Copyright 2008-2016 The Apache Software Foundation The derived work is adapted from http://git-wip-us.apache.org/repos/asf/nifi/blob/a1794b10/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/pom.xml index 12da3f7..7d9b681 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/pom.xml +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/pom.xml @@ -52,6 +52,18 @@ <version>${hadoop.version}</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + <version>${hadoop.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.5.5</version> + <scope>provided</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/nifi/blob/a1794b10/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosConfiguration.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosConfiguration.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosConfiguration.java new file mode 100644 index 0000000..ade65fe --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosConfiguration.java @@ -0,0 +1,52 @@ 
+/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hadoop; + +import org.apache.hadoop.security.authentication.util.KerberosUtil; + +import javax.security.auth.login.AppConfigurationEntry; +import java.util.HashMap; +import java.util.Map; + +/** + * Modified Kerberos configuration class from {@link org.apache.hadoop.security.authentication.client.KerberosAuthenticator.KerberosConfiguration} + * that requires authentication from a keytab. 
+ */ +public class KerberosConfiguration extends javax.security.auth.login.Configuration { + + private static final Map<String, String> USER_KERBEROS_OPTIONS = new HashMap<>(); + private static final AppConfigurationEntry USER_KERBEROS_LOGIN; + private static final AppConfigurationEntry[] USER_KERBEROS_CONF; + + KerberosConfiguration(String principal, String keytab) { + USER_KERBEROS_OPTIONS.put("principal", principal); + USER_KERBEROS_OPTIONS.put("keyTab", keytab); + } + + public AppConfigurationEntry[] getAppConfigurationEntry(String appName) { + return USER_KERBEROS_CONF; + } + + static { + USER_KERBEROS_OPTIONS.put("doNotPrompt", "true"); + USER_KERBEROS_OPTIONS.put("useKeyTab", "true"); + USER_KERBEROS_OPTIONS.put("refreshKrb5Config", "true"); + USER_KERBEROS_LOGIN = new AppConfigurationEntry(KerberosUtil.getKrb5LoginModuleName(), AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, USER_KERBEROS_OPTIONS); + USER_KERBEROS_CONF = new AppConfigurationEntry[]{USER_KERBEROS_LOGIN}; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a1794b10/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosKeytabCredentials.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosKeytabCredentials.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosKeytabCredentials.java new file mode 100644 index 0000000..24a4354 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosKeytabCredentials.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hadoop; + +import org.apache.http.auth.Credentials; + +import javax.security.auth.kerberos.KerberosPrincipal; +import java.security.Principal; + +/** + * Credentials that incorporate a user principal and a keytab file. + */ +public class KerberosKeytabCredentials implements Credentials { + + private final KerberosPrincipal userPrincipal; + private final String keytab; + + public KerberosKeytabCredentials(String principalName, String keytab) { + this.userPrincipal = new KerberosPrincipal(principalName); + this.keytab = keytab; + } + + @Override + public Principal getUserPrincipal() { + return userPrincipal; + } + + @Override + public String getPassword() { + return null; + } + + public String getKeytab() { + return keytab; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a1794b10/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosKeytabSPNegoAuthSchemeProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosKeytabSPNegoAuthSchemeProvider.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosKeytabSPNegoAuthSchemeProvider.java new file mode 100644 index 0000000..295b765 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosKeytabSPNegoAuthSchemeProvider.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hadoop; + +import org.apache.http.auth.AuthScheme; +import org.apache.http.auth.AuthSchemeProvider; +import org.apache.http.protocol.HttpContext; + +/** + * Provider class for {@link KerberosKeytabSPNegoScheme}. 
+ */ +public class KerberosKeytabSPNegoAuthSchemeProvider implements AuthSchemeProvider { + + public AuthScheme create(HttpContext context) { + return new KerberosKeytabSPNegoScheme(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a1794b10/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosKeytabSPNegoScheme.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosKeytabSPNegoScheme.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosKeytabSPNegoScheme.java new file mode 100644 index 0000000..fd7171c --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosKeytabSPNegoScheme.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hadoop; + +import org.apache.hadoop.security.authentication.util.KerberosUtil; +import org.apache.http.auth.Credentials; +import org.apache.http.impl.auth.SPNegoScheme; +import org.ietf.jgss.GSSContext; +import org.ietf.jgss.GSSException; +import org.ietf.jgss.GSSManager; +import org.ietf.jgss.GSSName; +import org.ietf.jgss.Oid; + +import javax.security.auth.Subject; +import javax.security.auth.login.LoginContext; +import javax.security.auth.login.LoginException; +import java.net.UnknownHostException; +import java.security.Principal; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; +import java.util.HashSet; +import java.util.Set; + +/** + * This class provides a very similar authentication scheme and token generation as {@link SPNegoScheme} does. + * The token generation is based on a keytab file coming from {@link KerberosKeytabCredentials} and the process + * uses hadoop-auth tools. + */ +public class KerberosKeytabSPNegoScheme extends SPNegoScheme { + + public KerberosKeytabSPNegoScheme() { + super(true, false); + } + + @Override + public byte[] generateToken(byte[] input, String authServer, Credentials credentials) { + Set<Principal> principals = new HashSet<>(); + principals.add(credentials.getUserPrincipal()); + Subject subject = new Subject(false, principals, new HashSet<>(), new HashSet<>()); + + try { + LoginContext loginContext = new LoginContext("", subject, null, + new KerberosConfiguration(credentials.getUserPrincipal().getName(), + ((KerberosKeytabCredentials) credentials).getKeytab())); + loginContext.login(); + Subject loggedInSubject = loginContext.getSubject(); + + return Subject.doAs(loggedInSubject, new PrivilegedExceptionAction<byte[]>() { + + public byte[] run() throws UnknownHostException, ClassNotFoundException, GSSException, + IllegalAccessException, NoSuchFieldException { + GSSManager gssManager = GSSManager.getInstance(); + String servicePrincipal = 
KerberosUtil.getServicePrincipal("HTTP", authServer); + Oid serviceOid = KerberosUtil.getOidInstance("NT_GSS_KRB5_PRINCIPAL"); + GSSName serviceName = gssManager.createName(servicePrincipal, serviceOid); + Oid mechOid = KerberosUtil.getOidInstance("GSS_KRB5_MECH_OID"); + GSSContext gssContext = gssManager.createContext(serviceName, mechOid, null, 0); + gssContext.requestCredDeleg(true); + gssContext.requestMutualAuth(true); + return gssContext.initSecContext(input, 0, input.length); + } + + }); + } catch (PrivilegedActionException | LoginException e) { + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a1794b10/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000..823ddc6 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/resources/META-INF/NOTICE @@ -0,0 +1,11 @@ +nifi-hadoop-utils +Copyright 2014-2018 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). 
+ +This includes derived works from the Apache Hadoop (ASLv2 licensed) project (https://github.com/apache/hadoop): + Copyright 2014 The Apache Software Foundation + The derived work is adapted from + org/apache/hadoop/security/authentication/client/KerberosAuthenticator.java + and can be found in the org.apache.nifi.hadoop package http://git-wip-us.apache.org/repos/asf/nifi/blob/a1794b10/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/pom.xml b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/pom.xml index 7765065..c954e2d 100644 --- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/pom.xml +++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/pom.xml @@ -28,5 +28,10 @@ <artifactId>nifi-api</artifactId> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.5.5</version> + </dependency> </dependencies> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/a1794b10/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/src/main/java/org/apache/nifi/controller/api/livy/LivySessionService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/src/main/java/org/apache/nifi/controller/api/livy/LivySessionService.java b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/src/main/java/org/apache/nifi/controller/api/livy/LivySessionService.java index 7627aa4..f75170f 100644 --- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/src/main/java/org/apache/nifi/controller/api/livy/LivySessionService.java +++ 
b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/src/main/java/org/apache/nifi/controller/api/livy/LivySessionService.java @@ -17,18 +17,17 @@ package org.apache.nifi.controller.api.livy; import java.io.IOException; -import java.net.HttpURLConnection; import java.util.Map; +import org.apache.http.client.HttpClient; import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.controller.api.livy.exception.SessionManagerException; public interface LivySessionService extends ControllerService { String APPLICATION_JSON = "application/json"; String USER = "nifi"; - String GET = "GET"; - String POST = "POST"; - Map<String, String> getSession(); + Map<String, String> getSession() throws SessionManagerException; - HttpURLConnection getConnection(String urlString) throws IOException; + HttpClient getConnection() throws IOException, SessionManagerException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/a1794b10/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/src/main/java/org/apache/nifi/controller/api/livy/exception/SessionManagerException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/src/main/java/org/apache/nifi/controller/api/livy/exception/SessionManagerException.java b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/src/main/java/org/apache/nifi/controller/api/livy/exception/SessionManagerException.java new file mode 100644 index 0000000..77b63f0 --- /dev/null +++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/src/main/java/org/apache/nifi/controller/api/livy/exception/SessionManagerException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.api.livy.exception; + +public class SessionManagerException extends Exception { + + private static final long serialVersionUID = 1L; + + public SessionManagerException(final Throwable t) { + super(t); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a1794b10/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/pom.xml b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/pom.xml index 84eedf9..56aa4f6 100644 --- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/pom.xml +++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/pom.xml @@ -60,5 +60,29 @@ <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-mapper-asl</artifactId> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-kerberos-credentials-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-hadoop-utils</artifactId> + <version>1.7.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + <version>2.7.3</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + 
<artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + </exclusions> + </dependency> </dependencies> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/a1794b10/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/src/main/java/org/apache/nifi/controller/livy/LivySessionController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/src/main/java/org/apache/nifi/controller/livy/LivySessionController.java b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/src/main/java/org/apache/nifi/controller/livy/LivySessionController.java index f9ded28..77b146a 100644 --- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/src/main/java/org/apache/nifi/controller/livy/LivySessionController.java +++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/src/main/java/org/apache/nifi/controller/livy/LivySessionController.java @@ -21,11 +21,8 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import java.io.OutputStream; import java.net.ConnectException; -import java.net.HttpURLConnection; import java.net.SocketTimeoutException; -import java.net.URL; import java.nio.charset.StandardCharsets; import java.security.KeyManagementException; import java.security.KeyStore; @@ -45,6 +42,22 @@ import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.auth.AuthSchemeProvider; +import org.apache.http.auth.AuthScope; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.client.HttpClient; +import 
org.apache.http.client.config.AuthSchemes; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.config.Lookup; +import org.apache.http.config.RegistryBuilder; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.client.HttpClientBuilder; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnDisabled; @@ -53,8 +66,11 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ControllerServiceInitializationContext; +import org.apache.nifi.controller.api.livy.exception.SessionManagerException; +import org.apache.nifi.hadoop.KerberosKeytabCredentials; +import org.apache.nifi.hadoop.KerberosKeytabSPNegoAuthSchemeProvider; +import org.apache.nifi.kerberos.KerberosCredentialsService; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.ssl.SSLContextService; import org.codehaus.jackson.map.ObjectMapper; @@ -64,10 +80,8 @@ import org.codehaus.jettison.json.JSONObject; import org.apache.nifi.controller.api.livy.LivySessionService; import org.apache.nifi.expression.ExpressionLanguageScope; -import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.TrustManagerFactory; @Tags({"Livy", "REST", "Spark", "http"}) @@ -157,6 +171,14 @@ public class LivySessionController extends AbstractControllerService implements .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) 
.build(); + static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder() + .name("kerberos-credentials-service") + .displayName("Kerberos Credentials Service") + .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos") + .identifiesControllerService(KerberosCredentialsService.class) + .required(false) + .build(); + private volatile String livyUrl; private volatile int sessionPoolSize; private volatile String controllerKind; @@ -168,6 +190,8 @@ public class LivySessionController extends AbstractControllerService implements private volatile int connectTimeout; private volatile Thread livySessionManagerThread = null; private volatile boolean enabled = true; + private volatile KerberosCredentialsService credentialsService; + private volatile SessionManagerException sessionManagerException; private List<PropertyDescriptor> properties; @@ -183,6 +207,7 @@ public class LivySessionController extends AbstractControllerService implements props.add(CONNECT_TIMEOUT); props.add(JARS); props.add(FILES); + props.add(KERBEROS_CREDENTIALS_SERVICE); properties = Collections.unmodifiableList(props); } @@ -204,6 +229,7 @@ public class LivySessionController extends AbstractControllerService implements sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); sslContext = sslContextService == null ? null : sslContextService.createSSLContext(SSLContextService.ClientAuth.NONE); connectTimeout = Math.toIntExact(context.getProperty(CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS)); + credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); this.livyUrl = "http" + (sslContextService != null ? 
"s" : "") + "://" + livyHost + ":" + livyPort; this.controllerKind = sessionKind; @@ -216,12 +242,16 @@ public class LivySessionController extends AbstractControllerService implements while (enabled) { try { manageSessions(); + sessionManagerException = null; + } catch (Exception e) { + getLogger().error("Livy Session Manager Thread run into an error, but continues to run", e); + sessionManagerException = new SessionManagerException(e); + } + try { Thread.sleep(sessionManagerStatusInterval); } catch (InterruptedException e) { Thread.currentThread().interrupt(); enabled = false; - } catch (IOException ioe) { - throw new ProcessException(ioe); } } }); @@ -243,7 +273,9 @@ public class LivySessionController extends AbstractControllerService implements } @Override - public Map<String, String> getSession() { + public Map<String, String> getSession() throws SessionManagerException { + checkSessionManagerException(); + Map<String, String> sessionMap = new HashMap<>(); try { final Map<Integer, JSONObject> sessionsCopy = sessions; @@ -254,6 +286,7 @@ public class LivySessionController extends AbstractControllerService implements if (state.equalsIgnoreCase("idle") && sessionKind.equalsIgnoreCase(controllerKind)) { sessionMap.put("sessionId", String.valueOf(sessionId)); sessionMap.put("livyUrl", livyUrl); + break; } } } catch (JSONException e) { @@ -263,18 +296,41 @@ public class LivySessionController extends AbstractControllerService implements } @Override - public HttpURLConnection getConnection(String urlString) throws IOException { - URL url = new URL(urlString); - HttpURLConnection connection = (HttpURLConnection) url.openConnection(); - connection.setConnectTimeout(connectTimeout); + public HttpClient getConnection() throws IOException, SessionManagerException { + checkSessionManagerException(); + + return openConnection(); + } + + private HttpClient openConnection() throws IOException { + HttpClientBuilder httpClientBuilder = HttpClientBuilder.create(); + if 
(sslContextService != null) { try { - setSslSocketFactory((HttpsURLConnection) connection, sslContextService, sslContext); + SSLContext sslContext = getSslSocketFactory(sslContextService); + httpClientBuilder.setSSLContext(sslContext); } catch (KeyStoreException | CertificateException | NoSuchAlgorithmException | UnrecoverableKeyException | KeyManagementException e) { throw new IOException(e); } } - return connection; + + if (credentialsService != null) { + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(new AuthScope(null, -1, null), + new KerberosKeytabCredentials(credentialsService.getPrincipal(), credentialsService.getKeytab())); + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + Lookup<AuthSchemeProvider> authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider> create() + .register(AuthSchemes.SPNEGO, new KerberosKeytabSPNegoAuthSchemeProvider()).build(); + httpClientBuilder.setDefaultAuthSchemeRegistry(authSchemeRegistry); + } + + RequestConfig.Builder requestConfigBuilder = RequestConfig.custom(); + requestConfigBuilder.setConnectTimeout(connectTimeout); + requestConfigBuilder.setConnectionRequestTimeout(connectTimeout); + requestConfigBuilder.setSocketTimeout(connectTimeout); + httpClientBuilder.setDefaultRequestConfig(requestConfigBuilder.build()); + + return httpClientBuilder.build(); } private void manageSessions() throws InterruptedException, IOException { @@ -439,36 +495,34 @@ public class LivySessionController extends AbstractControllerService implements } private JSONObject readJSONObjectFromUrlPOST(String urlString, Map<String, String> headers, String payload) throws IOException, JSONException { - HttpURLConnection connection = getConnection(urlString); - - connection.setRequestMethod(POST); - connection.setDoOutput(true); + HttpClient httpClient = openConnection(); + HttpPost request = new HttpPost(urlString); for (Map.Entry<String, String> entry : 
headers.entrySet()) { - connection.setRequestProperty(entry.getKey(), entry.getValue()); + request.addHeader(entry.getKey(), entry.getValue()); } + HttpEntity httpEntity = new StringEntity(payload); + request.setEntity(httpEntity); + HttpResponse response = httpClient.execute(request); - OutputStream os = connection.getOutputStream(); - os.write(payload.getBytes()); - os.flush(); - - if (connection.getResponseCode() != HttpURLConnection.HTTP_OK && connection.getResponseCode() != HttpURLConnection.HTTP_CREATED) { - throw new RuntimeException("Failed : HTTP error code : " + connection.getResponseCode() + " : " + connection.getResponseMessage()); + if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK && response.getStatusLine().getStatusCode() != HttpStatus.SC_CREATED) { + throw new RuntimeException("Failed : HTTP error code : " + response.getStatusLine().getStatusCode() + " : " + response.getStatusLine().getReasonPhrase()); } - InputStream content = connection.getInputStream(); + InputStream content = response.getEntity().getContent(); return readAllIntoJSONObject(content); } private JSONObject readJSONFromUrl(String urlString, Map<String, String> headers) throws IOException, JSONException { + HttpClient httpClient = openConnection(); - HttpURLConnection connection = getConnection(urlString); + HttpGet request = new HttpGet(urlString); for (Map.Entry<String, String> entry : headers.entrySet()) { - connection.setRequestProperty(entry.getKey(), entry.getValue()); + request.addHeader(entry.getKey(), entry.getValue()); } - connection.setRequestMethod(GET); - connection.setDoOutput(true); - InputStream content = connection.getInputStream(); + HttpResponse response = httpClient.execute(request); + + InputStream content = response.getEntity().getContent(); return readAllIntoJSONObject(content); } @@ -478,7 +532,7 @@ public class LivySessionController extends AbstractControllerService implements return new JSONObject(jsonText); } - private void 
setSslSocketFactory(HttpsURLConnection httpsURLConnection, SSLContextService sslService, SSLContext sslContext) + private SSLContext getSslSocketFactory(SSLContextService sslService) throws IOException, KeyStoreException, CertificateException, NoSuchAlgorithmException, UnrecoverableKeyException, KeyManagementException { final String keystoreLocation = sslService.getKeyStoreFile(); final String keystorePass = sslService.getKeyStorePassword(); @@ -506,7 +560,14 @@ public class LivySessionController extends AbstractControllerService implements sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), null); - final SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory(); - httpsURLConnection.setSSLSocketFactory(sslSocketFactory); + return sslContext; } + + private void checkSessionManagerException() throws SessionManagerException { + SessionManagerException exception = sessionManagerException; + if (exception != null) { + throw sessionManagerException; + } + } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/a1794b10/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/pom.xml b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/pom.xml index 7b021c3..56e115d 100644 --- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/pom.xml +++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/pom.xml @@ -86,5 +86,10 @@ <artifactId>jetty-server</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-kerberos-credentials-service-api</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/nifi/blob/a1794b10/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java index dcb6b82..4a87842 100644 --- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java +++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java @@ -20,8 +20,6 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import java.io.OutputStream; -import java.net.HttpURLConnection; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; @@ -37,10 +35,18 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringEscapeUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.api.livy.exception.SessionManagerException; import org.apache.nifi.flowfile.FlowFile; import 
org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; @@ -162,10 +168,19 @@ public class ExecuteSparkInteractive extends AbstractProcessor { final ComponentLog log = getLogger(); final LivySessionService livySessionService = context.getProperty(LIVY_CONTROLLER_SERVICE).asControllerService(LivySessionService.class); - final Map<String, String> livyController = livySessionService.getSession(); - if (livyController == null || livyController.isEmpty()) { - log.debug("No Spark session available (yet), routing flowfile to wait"); + final Map<String, String> livyController; + try { + livyController = livySessionService.getSession(); + if (livyController == null || livyController.isEmpty()) { + log.debug("No Spark session available (yet), routing flowfile to wait"); + session.transfer(flowFile, REL_WAIT); + context.yield(); + return; + } + } catch (SessionManagerException sme) { + log.error("Error opening spark session, routing flowfile to wait", sme); session.transfer(flowFile, REL_WAIT); + context.yield(); return; } final long statusCheckInterval = context.getProperty(STATUS_CHECK_INTERVAL).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.MILLISECONDS); @@ -214,14 +229,15 @@ public class ExecuteSparkInteractive extends AbstractProcessor { session.transfer(flowFile, REL_FAILURE); } } - } catch (IOException ioe) { - log.error("Failure processing flowfile {} due to {}, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); + } catch (IOException | SessionManagerException e) { + log.error("Failure processing flowfile {} due to {}, penalizing and routing to failure", new Object[]{flowFile, e.getMessage()}, e); flowFile = session.penalize(flowFile); session.transfer(flowFile, REL_FAILURE); } } - private JSONObject submitAndHandleJob(String livyUrl, LivySessionService livySessionService, String sessionId, String payload, long statusCheckInterval) throws IOException { + private JSONObject 
submitAndHandleJob(String livyUrl, LivySessionService livySessionService, String sessionId, String payload, long statusCheckInterval) + throws IOException, SessionManagerException { ComponentLog log = getLogger(); String statementUrl = livyUrl + "/sessions/" + sessionId + "/statements"; JSONObject output = null; @@ -265,42 +281,42 @@ public class ExecuteSparkInteractive extends AbstractProcessor { } private JSONObject readJSONObjectFromUrlPOST(String urlString, LivySessionService livySessionService, Map<String, String> headers, String payload) - throws IOException, JSONException { - - HttpURLConnection connection = livySessionService.getConnection(urlString); - connection.setRequestMethod("POST"); - connection.setDoOutput(true); + throws IOException, JSONException, SessionManagerException { + HttpClient httpClient = livySessionService.getConnection(); + HttpPost request = new HttpPost(urlString); for (Map.Entry<String, String> entry : headers.entrySet()) { - connection.setRequestProperty(entry.getKey(), entry.getValue()); + request.addHeader(entry.getKey(), entry.getValue()); } + HttpEntity httpEntity = new StringEntity(payload); + request.setEntity(httpEntity); + HttpResponse response = httpClient.execute(request); - OutputStream os = connection.getOutputStream(); - os.write(payload.getBytes()); - os.flush(); - - if (connection.getResponseCode() != HttpURLConnection.HTTP_OK && connection.getResponseCode() != HttpURLConnection.HTTP_CREATED) { - throw new RuntimeException("Failed : HTTP error code : " + connection.getResponseCode() + " : " + connection.getResponseMessage()); + if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK && response.getStatusLine().getStatusCode() != HttpStatus.SC_CREATED) { + throw new RuntimeException("Failed : HTTP error code : " + response.getStatusLine().getStatusCode() + " : " + response.getStatusLine().getReasonPhrase()); } - InputStream content = connection.getInputStream(); - BufferedReader rd = new BufferedReader(new 
InputStreamReader(content, StandardCharsets.UTF_8)); - String jsonText = IOUtils.toString(rd); - return new JSONObject(jsonText); + InputStream content = response.getEntity().getContent(); + return readAllIntoJSONObject(content); } - private JSONObject readJSONObjectFromUrl(final String urlString, LivySessionService livySessionService, final Map<String, String> headers) - throws IOException, JSONException { + private JSONObject readJSONObjectFromUrl(String urlString, LivySessionService livySessionService, Map<String, String> headers) throws IOException, JSONException, SessionManagerException { + HttpClient httpClient = livySessionService.getConnection(); - HttpURLConnection connection = livySessionService.getConnection(urlString); + HttpGet request = new HttpGet(urlString); for (Map.Entry<String, String> entry : headers.entrySet()) { - connection.setRequestProperty(entry.getKey(), entry.getValue()); + request.addHeader(entry.getKey(), entry.getValue()); } - connection.setRequestMethod("GET"); - connection.setDoOutput(true); - InputStream content = connection.getInputStream(); + HttpResponse response = httpClient.execute(request); + + InputStream content = response.getEntity().getContent(); + return readAllIntoJSONObject(content); + } + + private JSONObject readAllIntoJSONObject(InputStream content) throws IOException, JSONException { BufferedReader rd = new BufferedReader(new InputStreamReader(content, StandardCharsets.UTF_8)); String jsonText = IOUtils.toString(rd); return new JSONObject(jsonText); } + }
