Repository: falcon Updated Branches: refs/heads/master dc470882f -> d7a4eeb4e
FALCON-2324 Retry, Latererun and JobLog mover services not working with kerberos enabled Author: sandeep <[email protected]> Author: sandeep.samudrala <[email protected]> Author: Sandeep Samudrala <[email protected]> Reviewers: @pallavi-rao Closes #399 from sandeepSamudrala/master and squashes the following commits: 974dbbe80 [Sandeep Samudrala] FALCON-2324 Retry, Latererun and JobLog mover services not working with kerberos enabled 678df54d1 [Sandeep Samudrala] Merge branch 'master' of https://github.com/apache/falcon 9c907efdb [sandeep.samudrala] FALCON-2319. Falcon Build failure fix for enunciate b5fb5786c [sandeep.samudrala] git applyMerge branch 'master' of https://github.com/apache/falcon 575e76866 [sandeep.samudrala] Merge branch 'master' of https://github.com/apache/falcon e0ad35884 [sandeep] Merge branch 'master' of https://github.com/apache/falcon f96a084f6 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 9cf36e93f [sandeep] Merge branch 'master' of https://github.com/apache/falcon bbca081ff [sandeep] Merge branch 'master' of https://github.com/apache/falcon 48f6afaca [sandeep] Merge branch 'master' of https://github.com/apache/falcon 250cc4609 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d0393e98d [sandeep] Merge branch 'master' of https://github.com/apache/falcon a17880526 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d6dc8bfaa [sandeep] Merge branch 'master' of https://github.com/apache/falcon 1bb8d3c72 [sandeep] Merge branch 'master' of https://github.com/apache/falcon c06556623 [sandeep] reverting last line changes made 1a4dcd234 [sandeep] rebased and resolved the conflicts from master 271318b9c [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay. a94d4fe07 [sandeep] rebasing from master 9e68a5783 [sandeep] FALCON-298. Feed update with replication delay creates holes Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/d7a4eeb4 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/d7a4eeb4 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/d7a4eeb4 Branch: refs/heads/master Commit: d7a4eeb4ea19f731a984f40a80cb081995e57521 Parents: dc47088 Author: sandeep <[email protected]> Authored: Mon Feb 26 11:10:38 2018 +0530 Committer: pallavi-rao <[email protected]> Committed: Mon Feb 26 11:10:38 2018 +0530 ---------------------------------------------------------------------- .../org/apache/falcon/security/CurrentUser.java | 8 +- .../apache/falcon/security/HostnameFilter.java | 105 +++++++++++++++++++ .../org/apache/falcon/logging/JobLogMover.java | 10 +- .../workflow/engine/OozieClientFactory.java | 2 +- .../apache/oozie/client/ProxyOozieClient.java | 20 ++++ .../apache/falcon/security/HostnameFilter.java | 105 ------------------- .../rerun/handler/AbstractRerunConsumer.java | 2 +- .../rerun/handler/AbstractRerunHandler.java | 9 +- .../falcon/rerun/handler/LateRerunConsumer.java | 5 +- .../falcon/rerun/handler/LateRerunHandler.java | 2 +- .../falcon/rerun/handler/RetryConsumer.java | 4 +- 11 files changed, 148 insertions(+), 124 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/common/src/main/java/org/apache/falcon/security/CurrentUser.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/security/CurrentUser.java b/common/src/main/java/org/apache/falcon/security/CurrentUser.java index e7c1594..63e599a 100644 --- a/common/src/main/java/org/apache/falcon/security/CurrentUser.java +++ b/common/src/main/java/org/apache/falcon/security/CurrentUser.java @@ -72,10 +72,8 @@ public final class CurrentUser { * * @param doAsUser doAs user * @param proxyHost proxy host - * @throws IOException */ - public static void proxyDoAsUser(final String doAsUser, - final String proxyHost) throws IOException { + public static void proxyDoAsUser(final String doAsUser, final String proxyHost) { if (!isAuthenticated()) { throw new IllegalStateException("Authentication not done"); } @@ -106,10 +104,8 @@ public final class CurrentUser { * * @param aclOwner entity acl owner * @param aclGroup entity acl group - * @throws IOException */ - public static void proxy(final String aclOwner, - final String aclGroup) throws IOException { + public static void proxy(final String aclOwner, final String aclGroup) { if (!isAuthenticated() || StringUtils.isEmpty(aclOwner)) { throw new IllegalStateException("Authentication not done or Bad user name"); } http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/common/src/main/java/org/apache/falcon/security/HostnameFilter.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/security/HostnameFilter.java b/common/src/main/java/org/apache/falcon/security/HostnameFilter.java new file mode 100644 index 0000000..19e7bf4 --- /dev/null +++ b/common/src/main/java/org/apache/falcon/security/HostnameFilter.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.security; + +import javax.servlet.Filter; +import javax.servlet.FilterChain; +import javax.servlet.FilterConfig; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Filter that resolves the requester hostname. + */ +public class HostnameFilter implements Filter { + private static final Logger LOG = LoggerFactory.getLogger(HostnameFilter.class); + + static final ThreadLocal<String> HOSTNAME_TL = new ThreadLocal<>(); + + /** + * Initializes the filter. + * + * @param config filter configuration. + * + * @throws javax.servlet.ServletException thrown if the filter could not be initialized. + */ + @Override + public void init(FilterConfig config) throws ServletException { + } + + /** + * Resolves the requester hostname and delegates the request to the chain. + * <p> + * The requester hostname is available via the {@link #get} method. + * + * @param request servlet request. + * @param response servlet response. + * @param chain filter chain. + * + * @throws java.io.IOException thrown if an IO error occurs. + * @throws ServletException thrown if a servlet error occurs. + */ + @Override + public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) + throws IOException, ServletException { + try { + String hostname; + try { + String address = request.getRemoteAddr(); + if (address != null) { + hostname = InetAddress.getByName(address).getCanonicalHostName(); + } else { + LOG.warn("Request remote address is NULL"); + hostname = "???"; + } + } catch (UnknownHostException ex) { + LOG.warn("Request remote address could not be resolved, {}", ex.toString(), ex); + hostname = "???"; + } + HOSTNAME_TL.set(hostname); + chain.doFilter(request, response); + } finally { + HOSTNAME_TL.remove(); + } + } + + /** + * Returns the requester hostname. + * + * @return the requester hostname. + */ + public static String get() { + return HOSTNAME_TL.get(); + } + + /** + * Destroys the filter. + * <p> + * This implementation is a NOP. + */ + @Override + public void destroy() { + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java index 5023db3..e80967f 100644 --- a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java +++ b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java @@ -23,13 +23,14 @@ import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.process.EngineType; import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.security.CurrentUser; +import org.apache.falcon.security.HostnameFilter; import org.apache.falcon.workflow.WorkflowExecutionContext; +import org.apache.falcon.workflow.engine.OozieClientFactory; import org.apache.falcon.workflow.util.OozieActionConfigurationHelper; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.OozieClientException; @@ -72,10 +73,6 @@ public class JobLogMover { } public void moveLog(WorkflowExecutionContext context){ - if (UserGroupInformation.isSecurityEnabled()) { - LOG.info("Unable to move logs as security is enabled."); - return; - } try { run(context); } catch (Exception ignored) { @@ -95,10 +92,11 @@ public class JobLogMover { String instanceOwner = context.getWorkflowUser(); if (StringUtils.isNotBlank(instanceOwner)) { CurrentUser.authenticate(instanceOwner); + CurrentUser.proxyDoAsUser(instanceOwner, HostnameFilter.get()); } else { CurrentUser.authenticate(System.getProperty("user.name")); } - OozieClient client = new OozieClient(engineUrl); + OozieClient client = OozieClientFactory.getClientRef(engineUrl); WorkflowJob jobInfo; try { jobInfo = client.getJobInfo(context.getWorkflowId()); http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java index 3380b1a..278b0c4 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java @@ -55,7 +55,7 @@ public final class OozieClientFactory { return get((Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, clusterName)); } - private static OozieClient getClientRef(String oozieUrl) + public static OozieClient getClientRef(String oozieUrl) throws FalconException { if (OozieConstants.LOCAL_OOZIE.equals(oozieUrl)) { http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java index eaf3ede..fca6137 100644 --- a/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java +++ b/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java @@ -489,6 +489,26 @@ public class ProxyOozieClient extends AuthOozieClient { } @Override + public List<CoordinatorAction> reRunCoord(final String jobId, final String rerunType, final String scope, + final boolean refresh, final boolean noCleanup, + final boolean failed, final Properties props) + throws OozieClientException { + try { + return doAs(CurrentUser.getUser(), new Callable<List<CoordinatorAction>>() { + + public List<CoordinatorAction> call() throws Exception { + return ProxyOozieClient.super.reRunCoord(jobId, rerunType, scope, refresh, noCleanup, failed, + props); + } + }); + } catch (OozieClientException e) { + throw e; + } catch (Exception e) { + throw new OozieClientException(e.toString(), e); + } + } + + @Override public Void reRunBundle(final String jobId, final String coordScope, final String dateScope, final boolean refresh, final boolean noCleanup) throws OozieClientException { http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/prism/src/main/java/org/apache/falcon/security/HostnameFilter.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/security/HostnameFilter.java b/prism/src/main/java/org/apache/falcon/security/HostnameFilter.java deleted file mode 100644 index 19e7bf4..0000000 --- a/prism/src/main/java/org/apache/falcon/security/HostnameFilter.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.security; - -import javax.servlet.Filter; -import javax.servlet.FilterChain; -import javax.servlet.FilterConfig; -import javax.servlet.ServletException; -import javax.servlet.ServletRequest; -import javax.servlet.ServletResponse; -import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Filter that resolves the requester hostname. - */ -public class HostnameFilter implements Filter { - private static final Logger LOG = LoggerFactory.getLogger(HostnameFilter.class); - - static final ThreadLocal<String> HOSTNAME_TL = new ThreadLocal<>(); - - /** - * Initializes the filter. - * - * @param config filter configuration. - * - * @throws javax.servlet.ServletException thrown if the filter could not be initialized. - */ - @Override - public void init(FilterConfig config) throws ServletException { - } - - /** - * Resolves the requester hostname and delegates the request to the chain. - * <p> - * The requester hostname is available via the {@link #get} method. - * - * @param request servlet request. - * @param response servlet response. - * @param chain filter chain. - * - * @throws java.io.IOException thrown if an IO error occurs. - * @throws ServletException thrown if a servlet error occurs. - */ - @Override - public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) - throws IOException, ServletException { - try { - String hostname; - try { - String address = request.getRemoteAddr(); - if (address != null) { - hostname = InetAddress.getByName(address).getCanonicalHostName(); - } else { - LOG.warn("Request remote address is NULL"); - hostname = "???"; - } - } catch (UnknownHostException ex) { - LOG.warn("Request remote address could not be resolved, {}", ex.toString(), ex); - hostname = "???"; - } - HOSTNAME_TL.set(hostname); - chain.doFilter(request, response); - } finally { - HOSTNAME_TL.remove(); - } - } - - /** - * Returns the requester hostname. - * - * @return the requester hostname. - */ - public static String get() { - return HOSTNAME_TL.get(); - } - - /** - * Destroys the filter. - * <p> - * This implementation is a NOP. - */ - @Override - public void destroy() { - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java index 000fd55..8a75754 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java @@ -77,7 +77,7 @@ public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends Abst // Login the user to access WfEngine as this user CurrentUser.authenticate(message.getWorkflowUser()); AbstractWorkflowEngine wfEngine = handler.getWfEngine(message.getEntityType(), - message.getEntityName()); + message.getEntityName(), message.getWorkflowUser()); String jobStatus = wfEngine.getWorkflowStatus( message.getClusterName(), message.getWfId()); handleRerun(message.getClusterName(), jobStatus, message, http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java index 700095e..bfeb6c3 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java @@ -25,6 +25,8 @@ import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.process.Retry; import org.apache.falcon.rerun.event.RerunEvent; import org.apache.falcon.rerun.queue.DelayedQueue; +import org.apache.falcon.security.CurrentUser; +import org.apache.falcon.security.HostnameFilter; import org.apache.falcon.workflow.WorkflowEngineFactory; import org.apache.falcon.workflow.WorkflowExecutionListener; import org.apache.falcon.workflow.engine.AbstractWorkflowEngine; @@ -60,7 +62,12 @@ public abstract class AbstractRerunHandler<T extends RerunEvent, M extends Delay String wfId, String parentId, String workflowUser, long msgReceivedTime); //RESUME CHECKSTYLE CHECK ParameterNumberCheck - public AbstractWorkflowEngine getWfEngine(String entityType, String entityName) throws FalconException { + public AbstractWorkflowEngine getWfEngine(String entityType, String entityName, String doAsUser) + throws FalconException { + if (StringUtils.isNotBlank(doAsUser)) { + CurrentUser.authenticate(doAsUser); + CurrentUser.proxyDoAsUser(doAsUser, HostnameFilter.get()); + } if (StringUtils.isBlank(entityType) || StringUtils.isBlank(entityName)) { return wfEngine; } http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java index 98db379..1ba5a65 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java @@ -85,7 +85,8 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv if (StringUtils.isBlank(id)) { id = message.getWfId(); } - handler.getWfEngine(entityType, entityName).reRun(message.getClusterName(), id, null, true); + handler.getWfEngine(entityType, entityName, message.getWorkflowUser()) + .reRun(message.getClusterName(), id, null, true); LOG.info("Scheduled late rerun for wf-id: {} on cluster: {}", message.getWfId(), message.getClusterName()); } catch (Exception e) { @@ -106,7 +107,7 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv public String detectLate(LaterunEvent message) throws Exception { LateDataHandler late = new LateDataHandler(); AbstractWorkflowEngine wfEngine = handler.getWfEngine(message.getEntityType(), - message.getEntityName()); + message.getEntityName(), message.getWorkflowUser()); Properties properties = wfEngine.getWorkflowProperties(message.getClusterName(), message.getWfId()); String falconInputs = properties.getProperty(WorkflowExecutionArgs.INPUT_NAMES.getName()); String falconInPaths = properties.getProperty(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName()); http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java index 02ab792..0be0b25 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java @@ -66,7 +66,7 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends Long wait = getEventDelay(entity, nominalTime); if (wait == -1) { LOG.info("Late rerun expired for entity: {} ({})", entityType, entityName); - AbstractWorkflowEngine wfEngine = this.getWfEngine(entityType, entityName); + AbstractWorkflowEngine wfEngine = this.getWfEngine(entityType, entityName, entity.getACL().getOwner()); java.util.Properties properties = wfEngine.getWorkflowProperties(cluster, wfId); String logDir = properties.getProperty("logDir"); String srcClusterName = properties.getProperty("srcClusterName"); http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java index 3cad362..7ee2337 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java @@ -63,7 +63,9 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>> if (!id.contains("-C@") && StringUtils.isNotBlank(message.getParentId())) { id = message.getParentId(); } - handler.getWfEngine(entityType, entityName).reRun(message.getClusterName(), id, null, false); + + handler.getWfEngine(entityType, entityName, message.getWorkflowUser()) + .reRun(message.getClusterName(), id, null, false); } catch (Exception e) { if (e instanceof EntityNotRegisteredException) { LOG.warn("Entity {} of type {} doesn't exist in config store. So retry "
