FALCON-864 Falcon superuser is unable to delete scheduled feed. Contributed by Venkatesh Seetharam
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/7cb0666b Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/7cb0666b Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/7cb0666b Branch: refs/heads/master Commit: 7cb0666b2f05118e3c86d70a71f7e27e872703e1 Parents: e9b12f4 Author: Venkatesh Seetharam <venkat...@apache.org> Authored: Fri Nov 7 11:31:16 2014 -0800 Committer: Venkatesh Seetharam <venkat...@apache.org> Committed: Fri Nov 7 11:31:16 2014 -0800 ---------------------------------------------------------------------- CHANGES.txt | 3 + .../falcon/cleanup/AbstractCleanupHandler.java | 4 +- .../org/apache/falcon/entity/EntityUtil.java | 5 +- .../apache/falcon/entity/FileSystemStorage.java | 5 +- .../org/apache/falcon/entity/ProcessHelper.java | 4 +- .../entity/parser/ClusterEntityParser.java | 5 +- .../falcon/entity/parser/EntityParser.java | 2 +- .../falcon/entity/parser/FeedEntityParser.java | 3 +- .../entity/parser/ProcessEntityParser.java | 5 +- .../falcon/hadoop/HadoopClientFactory.java | 55 +++---- .../falcon/security/AuthorizationProvider.java | 27 +++- .../org/apache/falcon/security/CurrentUser.java | 153 ++++++++++++------- .../security/DefaultAuthorizationProvider.java | 151 +++++++++++------- .../apache/falcon/security/FalconPrincipal.java | 38 ----- .../org/apache/falcon/update/UpdateHelper.java | 3 +- .../apache/falcon/security/CurrentUserTest.java | 75 ++++++++- .../DefaultAuthorizationProviderTest.java | 3 +- .../org/apache/falcon/logging/LogProvider.java | 3 +- .../apache/falcon/oozie/OozieEntityBuilder.java | 4 +- .../OozieOrchestrationWorkflowBuilder.java | 5 +- .../feed/FeedReplicationCoordinatorBuilder.java | 2 +- .../oozie/process/ProcessBundleBuilder.java | 2 +- .../ProcessExecutionWorkflowBuilder.java | 2 +- .../workflow/engine/OozieClientFactory.java | 13 +- .../engine/OozieHouseKeepingService.java | 3 +- .../workflow/engine/OozieWorkflowEngine.java | 2 +- .../apache/oozie/client/ProxyOozieClient.java | 5 +- .../falcon/resource/AbstractEntityManager.java | 34 +++-- .../falcon/security/FalconAuditFilter.java | 16 +- .../security/FalconAuthenticationFilter.java | 2 +- .../security/FalconAuthorizationFilter.java | 84 ++++++++-- .../security/FalconAuthorizationFilterTest.java | 14 +- .../falcon/rerun/handler/LateRerunHandler.java | 3 +- 33 files changed, 459 insertions(+), 276 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e021bec..3f63b2a 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -129,6 +129,9 @@ Trunk (Unreleased) OPTIMIZATIONS BUG FIXES + FALCON-864 Falcon superuser is unable to delete scheduled feed + (Venkatesh Seetharam) + FALCON-862 Falcon entity Rest API - filter by tags also returns entities that do not have tags (Balu Vellanki via Venkatesh Seetharam) http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java index be300d7..6fa6c73 100644 --- a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java +++ b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java @@ -30,6 +30,7 @@ import org.apache.falcon.entity.v0.Frequency.TimeUnit; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.expression.ExpressionHelper; import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.falcon.security.CurrentUser; import org.apache.falcon.util.DeploymentUtil; import org.apache.falcon.util.RuntimeProperties; import org.apache.falcon.util.StartupProperties; @@ -99,8 +100,9 @@ public abstract class AbstractCleanupHandler { throw new FalconException("ACL for entity " + entity.getName() + " is empty"); } + CurrentUser.authenticate(acl.getOwner()); // proxy user return HadoopClientFactory.get().createProxiedFileSystem( - ClusterHelper.getConfiguration(cluster), acl); + ClusterHelper.getConfiguration(cluster)); } catch (Exception e) { throw new FalconException(e); } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java index bcebb94..59e43fb 100644 --- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java +++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java @@ -579,11 +579,10 @@ public final class EntityUtil { //Returns all staging paths for the entity public static FileStatus[] getAllStagingPaths(org.apache.falcon.entity.v0.cluster.Cluster cluster, - Entity entity) - throws FalconException { + Entity entity) throws FalconException { Path basePath = getBaseStagingPath(cluster, entity); FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( - ClusterHelper.getConfiguration(cluster), entity.getACL()); + ClusterHelper.getConfiguration(cluster)); try { return fs.listStatus(basePath, new PathFilter() { @Override http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java index 953c19e..76222fc 100644 --- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java +++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java @@ -216,7 +216,7 @@ public class FileSystemStorage implements Storage { } public Path getWorkingDir() { - return new Path(CurrentUser.getSubject() == null ? "/" : "/user/" + CurrentUser.getUser()); + return new Path(CurrentUser.isAuthenticated() ? "/user/" + CurrentUser.getUser() : "/"); } @Override @@ -300,8 +300,7 @@ public class FileSystemStorage implements Storage { getLocations(FeedHelper.getCluster(feed, clusterName), feed); Location location = getLocation(clusterSpecificLocation, locationType); try { - FileSystem fileSystem = HadoopClientFactory.get().createProxiedFileSystem( - getConf(), feed.getACL()); + FileSystem fileSystem = HadoopClientFactory.get().createProxiedFileSystem(getConf()); Cluster cluster = ClusterHelper.getCluster(clusterName); Properties baseProperties = FeedHelper.getClusterProperties(cluster); baseProperties.putAll(FeedHelper.getFeedProperties(feed)); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java index 8073229..174f8f6 100644 --- a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java @@ -87,7 +87,7 @@ public final class ProcessHelper { Path buildPath) throws FalconException { try { FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( - ClusterHelper.getConfiguration(cluster), process.getACL()); + ClusterHelper.getConfiguration(cluster)); Path wfPath = new Path(process.getWorkflow().getPath()); if (fs.isFile(wfPath)) { return new Path(buildPath.getParent(), EntityUtil.PROCESS_USER_DIR + "/" + wfPath.getName()); @@ -109,7 +109,7 @@ public final class ProcessHelper { Path libPath = new Path(userLibPath); FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( - ClusterHelper.getConfiguration(cluster), process.getACL()); + ClusterHelper.getConfiguration(cluster)); if (fs.isFile(libPath)) { return new Path(buildPath, EntityUtil.PROCESS_USERLIB_DIR + "/" + libPath.getName()); } else { http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java index 5a7ec17..b801dec 100644 --- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java +++ b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java @@ -70,6 +70,8 @@ public class ClusterEntityParser extends EntityParser<Cluster> { validateScheme(cluster, Interfacetype.REGISTRY); } + validateACL(cluster); + if (!EntityUtil.responsibleFor(cluster.getColo())) { return; } @@ -81,7 +83,6 @@ public class ClusterEntityParser extends EntityParser<Cluster> { validateMessagingInterface(cluster); validateRegistryInterface(cluster); - validateACL(cluster); validateLocations(cluster); } @@ -241,7 +242,7 @@ public class ClusterEntityParser extends EntityParser<Cluster> { Configuration conf = ClusterHelper.getConfiguration(cluster); FileSystem fs; try { - fs = HadoopClientFactory.get().createProxiedFileSystem(conf, cluster.getACL()); + fs = HadoopClientFactory.get().createProxiedFileSystem(conf); } catch (FalconException e) { throw new ValidationException( "Unable to get file system handle for cluster " + cluster.getName(), e); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java index e2742a1..05b204d 100644 --- a/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java +++ b/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java @@ -149,7 +149,7 @@ public abstract class EntityParser<T extends Entity> { AccessControlList acl) throws AuthorizationException { try { SecurityUtil.getAuthorizationProvider().authorizeEntity(entityName, - getEntityType().name(), acl, "validate", CurrentUser.getProxyUGI()); + getEntityType().name(), acl, "submit", CurrentUser.getAuthenticatedUGI()); } catch (FalconException e) { throw new AuthorizationException(e); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java index b4d29ee..63f9202 100644 --- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java +++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java @@ -72,6 +72,7 @@ public class FeedEntityParser extends EntityParser<Feed> { throw new ValidationException("Feed should have at least one cluster"); } + validateACL(feed); for (Cluster cluster : feed.getClusters().getClusters()) { validateEntityExists(EntityType.CLUSTER, cluster.getName()); validateClusterValidity(cluster.getValidity().getStart(), cluster.getValidity().getEnd(), @@ -487,7 +488,7 @@ public class FeedEntityParser extends EntityParser<Feed> { if (dataLocation == null) { throw new ValidationException(feed.getName() + " is a FileSystem based feed " - + "but it doesn't contain location type - data in cluster " + cluster.getName().toString()); + + "but it doesn't contain location type - data in cluster " + cluster.getName()); } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java index aaaa229..55887ac 100644 --- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java +++ b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java @@ -64,6 +64,7 @@ public class ProcessEntityParser extends EntityParser<Process> { process.setTimezone(TimeZone.getTimeZone("UTC")); } + validateACL(process); // check if dependent entities exists Set<String> clusters = new HashSet<String>(); for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) { @@ -99,7 +100,6 @@ public class ProcessEntityParser extends EntityParser<Process> { } validateDatasetName(process.getInputs(), process.getOutputs()); validateLateInputs(process); - validateACL(process); } /** @@ -122,8 +122,7 @@ public class ProcessEntityParser extends EntityParser<Process> { String nameNode = getNameNode(cluster); try { Configuration configuration = ClusterHelper.getConfiguration(cluster); - FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( - configuration, process.getACL()); + FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(configuration); if (!fs.exists(new Path(workflowPath))) { throw new ValidationException( "Workflow path: " + workflowPath + " does not exists in HDFS: " + nameNode); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java index 1496268..fb954ec 100644 --- a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java +++ b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java @@ -20,7 +20,6 @@ package org.apache.falcon.hadoop; import org.apache.commons.lang.Validate; import org.apache.falcon.FalconException; -import org.apache.falcon.entity.v0.AccessControlList; import org.apache.falcon.security.CurrentUser; import org.apache.falcon.security.SecurityUtil; import org.apache.falcon.util.StartupProperties; @@ -33,6 +32,8 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; @@ -44,6 +45,8 @@ import java.security.PrivilegedExceptionAction; */ public final class HadoopClientFactory { + private static final Logger LOG = LoggerFactory.getLogger(HadoopClientFactory.class); + public static final String FS_DEFAULT_NAME_KEY = CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY; public static final String MR_JT_ADDRESS_KEY = "mapreduce.jobtracker.address"; public static final String YARN_RM_ADDRESS_KEY = "yarn.resourcemanager.address"; @@ -119,10 +122,15 @@ public final class HadoopClientFactory { throws FalconException { Validate.notNull(conf, "configuration cannot be null"); - return createProxiedFileSystem(conf, null); + String nameNode = getNameNode(conf); + try { + return createProxiedFileSystem(new URI(nameNode), conf); + } catch (URISyntaxException e) { + throw new FalconException("Exception while getting FileSystem for: " + nameNode, e); + } } - private String getNameNode(Configuration conf) { + private static String getNameNode(Configuration conf) { return conf.get(FS_DEFAULT_NAME_KEY); } @@ -141,47 +149,14 @@ public final class HadoopClientFactory { final Configuration conf) throws FalconException { Validate.notNull(uri, "uri cannot be null"); - return createProxiedFileSystem(uri, conf, null); - } - - public FileSystem createProxiedFileSystem(final Configuration conf, - final AccessControlList acl) throws FalconException { - Validate.notNull(conf, "configuration cannot be null"); - try { - return createProxiedFileSystem(new URI(getNameNode(conf)), conf, acl); - } catch (URISyntaxException e) { - throw new FalconException("Exception while getting FileSystem for proxy: " - + CurrentUser.getUser(), e); - } - } - - // getFileSystemAsEntityOwner - public FileSystem createProxiedFileSystem(final URI uri, - final Configuration conf, - final AccessControlList acl) throws FalconException { - Validate.notNull(uri, "uri cannot be null"); - - try { - UserGroupInformation proxyUGI = getProxyUGI(acl); - - return createFileSystem(proxyUGI, uri, conf); + return createFileSystem(CurrentUser.getProxyUGI(), uri, conf); } catch (IOException e) { throw new FalconException("Exception while getting FileSystem for proxy: " + CurrentUser.getUser(), e); } } - private UserGroupInformation getProxyUGI(AccessControlList acl) - throws FalconException, IOException { - - return CurrentUser.isAuthenticated() - ? acl != null - && SecurityUtil.getAuthorizationProvider().isSuperUser(CurrentUser.getProxyUGI()) - ? CurrentUser.createProxyUGI(acl.getOwner()) : CurrentUser.getProxyUGI() - : UserGroupInformation.getCurrentUser(); - } - /** * Return a FileSystem created with the provided user for the specified URI. * @@ -212,10 +187,14 @@ public final class HadoopClientFactory { try { // prevent falcon impersonating falcon, no need to use doas - if (ugi.equals(UserGroupInformation.getLoginUser())) { + final String proxyUserName = ugi.getShortUserName(); + if (proxyUserName.equals(UserGroupInformation.getLoginUser().getShortUserName())) { + LOG.info("Creating FS for the login user {}, impersonation not required", + proxyUserName); return FileSystem.get(uri, conf); } + LOG.info("Creating FS impersonating user {}", proxyUserName); return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() { public FileSystem run() throws Exception { return FileSystem.get(uri, conf); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java b/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java index 1b36c4e..a6f2564 100644 --- a/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java +++ b/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java @@ -18,10 +18,13 @@ package org.apache.falcon.security; +import org.apache.falcon.entity.EntityNotRegisteredException; import org.apache.falcon.entity.v0.AccessControlList; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AuthorizationException; +import java.io.IOException; + /** * An interface for authorizing user against an entity operation. */ @@ -30,10 +33,21 @@ public interface AuthorizationProvider { /** * Check if the authenticated user is a super user. * - * @param proxyUgi proxy ugi for the authenticated user + * @param authenticatedUGI proxy ugi for the authenticated user * @return true if sure user, else false */ - boolean isSuperUser(UserGroupInformation proxyUgi); + boolean isSuperUser(UserGroupInformation authenticatedUGI); + + /** + * Checks if authenticated user can proxy the entity acl owner. + * + * @param authenticatedUGI proxy ugi for the authenticated user. + * @param aclOwner entity ACL Owner. + * @param aclGroup entity ACL group. + * @throws IOException + */ + boolean shouldProxy(UserGroupInformation authenticatedUGI, + String aclOwner, String aclGroup) throws IOException; /** * Determines if the authenticated user is authorized to execute the action on the resource, @@ -44,14 +58,15 @@ public interface AuthorizationProvider { * @param action action being authorized on resource and entity if applicable * @param entityType entity type in question, not for admin resource * @param entityName entity name in question, not for admin resource - * @param proxyUgi proxy ugi for the authenticated user + * @param authenticatedUGI proxy ugi for the authenticated user * @throws AuthorizationException */ void authorizeResource(String resource, String action, String entityType, String entityName, - UserGroupInformation proxyUgi) throws AuthorizationException; + UserGroupInformation authenticatedUGI) + throws AuthorizationException, EntityNotRegisteredException; /** * Determines if the authenticated user is authorized to execute the action on the entity. @@ -61,10 +76,10 @@ public interface AuthorizationProvider { * @param entityType entity in question, applicable for entities and instance resource * @param acl entity ACL * @param action action being authorized on resource and entity if applicable - * @param proxyUgi proxy ugi for the authenticated user + * @param authenticatedUGI proxy ugi for the authenticated user * @throws AuthorizationException */ void authorizeEntity(String entityName, String entityType, AccessControlList acl, String action, - UserGroupInformation proxyUgi) throws AuthorizationException; + UserGroupInformation authenticatedUGI) throws AuthorizationException; } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/main/java/org/apache/falcon/security/CurrentUser.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/security/CurrentUser.java b/common/src/main/java/org/apache/falcon/security/CurrentUser.java index 3d35630..11adccd 100644 --- a/common/src/main/java/org/apache/falcon/security/CurrentUser.java +++ b/common/src/main/java/org/apache/falcon/security/CurrentUser.java @@ -18,11 +18,11 @@ package org.apache.falcon.security; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.security.auth.Subject; import java.io.IOException; import java.util.Arrays; import java.util.Collections; @@ -32,71 +32,114 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; /** - * Current authenticated user via REST. - * Also doles out proxied UserGroupInformation. Caches proxied users. + * Current authenticated user via REST. Also captures the proxy user from authorized entity + * and doles out proxied UserGroupInformation. Caches proxied users. */ public final class CurrentUser { private static final Logger LOG = LoggerFactory.getLogger(CurrentUser.class); + private static final Logger AUDIT = LoggerFactory.getLogger("AUDIT"); - private static final CurrentUser INSTANCE = new CurrentUser(); + private final String authenticatedUser; + private String proxyUser; - private CurrentUser() {} - - public static CurrentUser get() { - return INSTANCE; + private CurrentUser(String authenticatedUser) { + this.authenticatedUser = authenticatedUser; + this.proxyUser = authenticatedUser; } - private final ThreadLocal<Subject> currentSubject = new ThreadLocal<Subject>(); + private static final ThreadLocal<CurrentUser> CURRENT_USER = new ThreadLocal<CurrentUser>(); + /** + * Captures the authenticated user. + * + * @param user authenticated user + */ public static void authenticate(final String user) { - if (user == null || user.isEmpty()) { + if (StringUtils.isEmpty(user)) { throw new IllegalStateException("Bad user name sent for authentication"); } - if (user.equals(getUserInternal())) { - return; - } - Subject subject = new Subject(); - subject.getPrincipals().add(new FalconPrincipal(user)); + LOG.info("Logging in {}", user); + CurrentUser currentUser = new CurrentUser(user); + CURRENT_USER.set(currentUser); + } - try { // initialize proxy user - createProxyUGI(user); - } catch (IOException e) { - throw new IllegalStateException("Unable to create a proxy user"); + /** + * Captures the entity owner if authenticated user is a super user. + * + * @param aclOwner entity acl owner + * @param aclGroup entity acl group + * @throws IOException + */ + public static void proxy(final String aclOwner, + final String aclGroup) throws IOException { + if (!isAuthenticated() || StringUtils.isEmpty(aclOwner)) { + throw new IllegalStateException("Authentication not done or Bad user name"); } - LOG.info("Logging in {}", user); - INSTANCE.currentSubject.set(subject); + CurrentUser user = CURRENT_USER.get(); + LOG.info("Authenticated user {} is proxying entity owner {}/{}", + user.authenticatedUser, aclOwner, aclGroup); + AUDIT.info("Authenticated user {} is proxying entity owner {}/{}", + user.authenticatedUser, aclOwner, aclGroup); + user.proxyUser = aclOwner; } - public static boolean isAuthenticated() { - return getSubject() != null; + /** + * Clears the context. + */ + public static void clear() { + CURRENT_USER.remove(); } - public static Subject getSubject() { - return INSTANCE.currentSubject.get(); + /** + * Checks if the authenticate method is already called. + * + * @return true if authenticated user is set else false + */ + public static boolean isAuthenticated() { + CurrentUser user = CURRENT_USER.get(); + return user != null && user.authenticatedUser != null; } - public static String getUser() { - String user = getUserInternal(); - if (user == null) { + /** + * Returns authenticated user. + * + * @return logged in authenticated user. + */ + public static String getAuthenticatedUser() { + CurrentUser user = CURRENT_USER.get(); + if (user == null || user.authenticatedUser == null) { throw new IllegalStateException("No user logged into the system"); } else { - return user; + return user.authenticatedUser; } } - private static String getUserInternal() { - Subject subject = getSubject(); - if (subject == null) { - return null; + /** + * Dole out a UGI object for the current authenticated user if authenticated + * else return current user. + * + * @return UGI object + * @throws java.io.IOException + */ + public static UserGroupInformation getAuthenticatedUGI() throws IOException { + return CurrentUser.isAuthenticated() + ? createProxyUGI(getAuthenticatedUser()) : UserGroupInformation.getCurrentUser(); + } + + /** + * Returns the proxy user. + * + * @return proxy user + */ + public static String getUser() { + CurrentUser user = CURRENT_USER.get(); + if (user == null || user.proxyUser == null) { + throw new IllegalStateException("No user logged into the system"); } else { - for (FalconPrincipal principal : subject. - getPrincipals(FalconPrincipal.class)) { - return principal.getName(); - } - return null; + return user.proxyUser; } } @@ -104,7 +147,7 @@ public final class CurrentUser { new ConcurrentHashMap<String, UserGroupInformation>(); /** - * Create a proxy UGI object for the current authenticated user. + * Create a proxy UGI object for the proxy user. * * @param proxyUser logged in user * @return UGI object @@ -123,33 +166,39 @@ public final class CurrentUser { } /** - * Dole out a proxy UGI object for the current authenticated user. + * Dole out a proxy UGI object for the current authenticated user if authenticated + * else return current user. * * @return UGI object * @throws java.io.IOException */ public static UserGroupInformation getProxyUGI() throws IOException { - String proxyUser = getUser(); - - UserGroupInformation proxyUgi = userUgiMap.get(proxyUser); - if (proxyUgi == null) { - // taking care of a race condition, the latest UGI will be discarded - proxyUgi = UserGroupInformation.createProxyUser( - proxyUser, UserGroupInformation.getLoginUser()); - userUgiMap.putIfAbsent(proxyUser, proxyUgi); - } - - return proxyUgi; + return CurrentUser.isAuthenticated() + ? createProxyUGI(getUser()) : UserGroupInformation.getCurrentUser(); } + /** + * Gets a collection of group names the proxy user belongs to. + * + * @return group names + * @throws IOException + */ public static Set<String> getGroupNames() throws IOException { HashSet<String> s = new HashSet<String>(Arrays.asList(getProxyUGI().getGroupNames())); return Collections.unmodifiableSet(s); } + /** + * Returns the primary group name for the proxy user. + * + * @return primary group name for the proxy user + */ public static String getPrimaryGroupName() { try { - return getProxyUGI().getPrimaryGroupName(); + String[] groups = getProxyUGI().getGroupNames(); + if (groups.length > 0) { + return groups[0]; + } } catch (IOException ignore) { // ignored } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java b/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java index e7895f8..b59718c 100644 --- a/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java +++ b/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java @@ -21,6 +21,7 @@ package org.apache.falcon.security; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.Validate; import org.apache.falcon.FalconException; +import org.apache.falcon.entity.EntityNotRegisteredException; import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.v0.AccessControlList; import org.apache.falcon.entity.v0.Entity; @@ -31,6 +32,7 @@ import org.apache.hadoop.security.authorize.AuthorizationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -104,6 +106,40 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider { } /** + * Determines if the authenticated user is the user who started this process + * or belongs to the super user group. + * + * @param authenticatedUGI UGI + * @return true if super user else false. + */ + public boolean isSuperUser(UserGroupInformation authenticatedUGI) { + return SUPER_USER.equals(authenticatedUGI.getShortUserName()) + || (!StringUtils.isEmpty(superUserGroup) + && isUserInGroup(superUserGroup, authenticatedUGI)); + } + + /** + * Checks if authenticated user should proxy the entity acl owner. + * + * @param authenticatedUGI proxy ugi for the authenticated user. + * @param aclOwner entity ACL Owner. + * @param aclGroup entity ACL group. + * @throws IOException + */ + @Override + public boolean shouldProxy(UserGroupInformation authenticatedUGI, + final String aclOwner, + final String aclGroup) throws IOException { + Validate.notNull(authenticatedUGI, "User cannot be empty or null"); + Validate.notEmpty(aclOwner, "User cannot be empty or null"); + Validate.notEmpty(aclGroup, "Group cannot be empty or null"); + + return isSuperUser(authenticatedUGI) + || (!isUserACLOwner(authenticatedUGI.getShortUserName(), aclOwner) + && isUserInGroup(aclGroup, authenticatedUGI)); + } + + /** * Determines if the authenticated user is authorized to execute the action on the resource. * Throws an exception if not authorized. * @@ -111,45 +147,38 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider { * @param action action being authorized on resource and entity if applicable * @param entityType entity type in question, not for admin resource * @param entityName entity name in question, not for admin resource - * @param proxyUgi proxy ugi for the authenticated user + * @param authenticatedUGI proxy ugi for the authenticated user * @throws org.apache.hadoop.security.authorize.AuthorizationException */ @Override public void authorizeResource(String resource, String action, String entityType, String entityName, - UserGroupInformation proxyUgi) throws AuthorizationException { + UserGroupInformation authenticatedUGI) + throws AuthorizationException, EntityNotRegisteredException { + Validate.notEmpty(resource, "Resource cannot be empty or null"); Validate.isTrue(RESOURCES.contains(resource), "Illegal resource: " + resource); Validate.notEmpty(action, "Action cannot be empty or null"); - if (isSuperUser(proxyUgi)) { - return; - } + try { + if (isSuperUser(authenticatedUGI)) { + return; + } - if ("admin".equals(resource)) { - if (!"version".equals(action)) { - authorizeAdminResource(proxyUgi, action); + if ("admin".equals(resource)) { + if (!"version".equals(action)) { + authorizeAdminResource(authenticatedUGI, action); + } + } else if ("entities".equals(resource) || "instance".equals(resource)) { + authorizeEntityResource(authenticatedUGI, entityName, entityType, action); + } else if ("metadata".equals(resource)) { + authorizeMetadataResource(authenticatedUGI, action); } - } else if ("entities".equals(resource) || "instance".equals(resource)) { - authorizeEntityResource(proxyUgi, entityName, entityType, action); - } else if ("metadata".equals(resource)) { - authorizeMetadataResource(proxyUgi.getShortUserName(), action); + } catch (IOException e) { + throw new AuthorizationException(e); } } - /** - * Determines if the authenticated user is the user who started this process - * or belongs to the super user group. - * - * @param authenticatedUGI UGI - * @return true if super user else false. - */ - public boolean isSuperUser(UserGroupInformation authenticatedUGI) { - return SUPER_USER.equals(authenticatedUGI.getShortUserName()) - || (!StringUtils.isEmpty(superUserGroup) - && isUserInGroup(superUserGroup, authenticatedUGI)); - } - protected Set<String> getGroupNames(UserGroupInformation proxyUgi) { HashSet<String> s = new HashSet<String>(Arrays.asList(proxyUgi.getGroupNames())); return Collections.unmodifiableSet(s); @@ -163,22 +192,26 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider { * @param entityType entity in question, applicable for entities and instance resource * @param acl entity ACL * @param action action being authorized on resource and entity if applicable - * @param proxyUgi proxy ugi for the authenticated user + * @param authenticatedUGI proxy ugi for the authenticated user * @throws org.apache.hadoop.security.authorize.AuthorizationException */ @Override - public void authorizeEntity(String entityName, String entityType, - AccessControlList acl, String action, - UserGroupInformation proxyUgi) throws AuthorizationException { - String authenticatedUser = proxyUgi.getShortUserName(); - LOG.info("Authorizing authenticatedUser={}, action={}, entity={}, type{}", - authenticatedUser, action, entityName, entityType); - - if (isSuperUser(proxyUgi)) { - return; - } + public void authorizeEntity(String entityName, String entityType, AccessControlList acl, + String action, UserGroupInformation authenticatedUGI) + throws AuthorizationException { + + try { + LOG.info("Authorizing authenticatedUser={}, action={}, entity={}, type{}", + authenticatedUGI.getShortUserName(), action, entityName, entityType); - checkUser(entityName, acl.getOwner(), acl.getGroup(), action, authenticatedUser, proxyUgi); + if (isSuperUser(authenticatedUGI)) { + return; + } + + checkUser(entityName, acl.getOwner(), acl.getGroup(), action, authenticatedUGI); + } catch (IOException e) { + throw new AuthorizationException(e); + } } /** @@ -188,15 +221,14 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider { * @param aclOwner entity ACL Owner. * @param aclGroup entity ACL group. * @param action action being authorized on resource and entity if applicable. - * @param authenticatedUser authenticated user name. - * @param proxyUgi proxy ugi for the authenticated user. + * @param authenticatedUGI proxy ugi for the authenticated user. * @throws AuthorizationException */ - protected void checkUser(String entityName, String aclOwner, String aclGroup, - String action, String authenticatedUser, - UserGroupInformation proxyUgi) throws AuthorizationException { + protected void checkUser(String entityName, String aclOwner, String aclGroup, String action, + UserGroupInformation authenticatedUGI) throws AuthorizationException { + final String authenticatedUser = authenticatedUGI.getShortUserName(); if (isUserACLOwner(authenticatedUser, aclOwner) - || isUserInGroup(aclGroup, proxyUgi)) { + || isUserInGroup(aclGroup, authenticatedUGI)) { return; } @@ -237,15 +269,15 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider { /** * Check if the user has admin privileges. * - * @param proxyUgi proxy ugi for the authenticated user. + * @param authenticatedUGI proxy ugi for the authenticated user. * @param action admin action on the resource. * @throws AuthorizationException if the user does not have admin privileges. */ - protected void authorizeAdminResource(UserGroupInformation proxyUgi, + protected void authorizeAdminResource(UserGroupInformation authenticatedUGI, String action) throws AuthorizationException { - final String authenticatedUser = proxyUgi.getShortUserName(); + final String authenticatedUser = authenticatedUGI.getShortUserName(); LOG.debug("Authorizing user={} for admin, action={}", authenticatedUser, action); - if (adminUsers.contains(authenticatedUser) || isUserInAdminGroups(proxyUgi)) { + if (adminUsers.contains(authenticatedUser) || isUserInAdminGroups(authenticatedUGI)) { return; } @@ -268,35 +300,44 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider { return isUserGroupInAdmin; } - protected void authorizeEntityResource(UserGroupInformation proxyUgi, + protected void authorizeEntityResource(UserGroupInformation authenticatedUGI, String entityName, String entityType, - String action) throws AuthorizationException { + String action) + throws AuthorizationException, EntityNotRegisteredException { + Validate.notEmpty(entityType, "Entity type cannot be empty or null"); LOG.debug("Authorizing authenticatedUser={} against entity/instance action={}, " + "entity name={}, entity type={}", - proxyUgi.getShortUserName(), action, entityName, entityType); + authenticatedUGI.getShortUserName(), action, entityName, entityType); if (entityName != null) { // lifecycle actions Entity entity = getEntity(entityName, entityType); authorizeEntity(entity.getName(), entity.getEntityType().name(), - entity.getACL(), action, proxyUgi); + entity.getACL(), action, authenticatedUGI); } else { // non lifecycle actions, lifecycle actions with null entity will validate later LOG.info("Authorization for action={} will be done in the API", action); } } - private Entity getEntity(String entityName, String entityType) throws AuthorizationException { + private Entity getEntity(String entityName, String entityType) + throws EntityNotRegisteredException, AuthorizationException { + try { EntityType type = EntityType.valueOf(entityType.toUpperCase()); return EntityUtil.getEntity(type, entityName); } catch (FalconException e) { - throw new AuthorizationException(e); + if (e instanceof EntityNotRegisteredException) { + throw (EntityNotRegisteredException) e; + } else { + throw new AuthorizationException(e); + } } } - protected void authorizeMetadataResource(String authenticatedUser, String action) { - LOG.debug("User {} authorized for action {} ", authenticatedUser, action); + protected void authorizeMetadataResource(UserGroupInformation authenticatedUGI, + String action) throws AuthorizationException { + LOG.debug("User {} authorized for action {} ", authenticatedUGI.getShortUserName(), action); // todo - read-only for all metadata but needs to be implemented } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/main/java/org/apache/falcon/security/FalconPrincipal.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/security/FalconPrincipal.java b/common/src/main/java/org/apache/falcon/security/FalconPrincipal.java deleted file mode 100644 index ab93e1a..0000000 --- a/common/src/main/java/org/apache/falcon/security/FalconPrincipal.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.security; - -import java.security.Principal; - -/** - * Falcon JAAS principal object. - */ -public class FalconPrincipal implements Principal { - - private final String user; - - public FalconPrincipal(String user) { - this.user = user; - } - - @Override - public String getName() { - return user; - } -} http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java index 7782c71..5a86ae3 100644 --- a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java +++ b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java @@ -124,8 +124,7 @@ public final class UpdateHelper { ConfigurationStore.get().get(EntityType.CLUSTER, cluster); Path checksum = new Path(bundleAppPath, EntityUtil.PROCESS_CHECKSUM_FILE); Configuration conf = ClusterHelper.getConfiguration(clusterEntity); - FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( - conf, process.getACL()); + FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf); if (!fs.exists(checksum)) { //Update if there is no checksum file(for backward compatibility) return true; http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java b/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java index 187d85e..9a3f365 100644 --- a/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java +++ b/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java @@ -18,8 +18,10 @@ package org.apache.falcon.security; +import org.apache.falcon.cluster.util.EntityBuilderTestUtil; import org.apache.hadoop.security.UserGroupInformation; import org.testng.Assert; +import org.testng.annotations.AfterMethod; import org.testng.annotations.Test; /** @@ -27,18 +29,89 @@ import org.testng.annotations.Test; */ public class CurrentUserTest { + @AfterMethod + public void cleanUp() { + CurrentUser.clear(); + } + @Test(threadPoolSize = 10, invocationCount = 10, timeOut = 10000) public void testGetUser() throws Exception { String id = Long.toString(System.nanoTime()); CurrentUser.authenticate(id); + Assert.assertEquals(CurrentUser.getAuthenticatedUser(), id); Assert.assertEquals(CurrentUser.getUser(), id); } + @Test (expectedExceptions = IllegalStateException.class) + public void testAuthenticateBadUser() throws Exception { + CurrentUser.authenticate(""); + } + + @Test (expectedExceptions = IllegalStateException.class) + public void testGetAuthenticatedUserInvalid() throws Exception { + CurrentUser.getAuthenticatedUser(); + } + + @Test (expectedExceptions = IllegalStateException.class) + public void testGetUserInvalid() throws Exception { + CurrentUser.getUser(); + } + + @Test (expectedExceptions = IllegalStateException.class) + public void testProxyBadUser() throws Exception { + CurrentUser.authenticate("falcon"); + CurrentUser.proxy("", ""); + } + + @Test (expectedExceptions = IllegalStateException.class) + public void testProxyWithNoAuth() throws Exception { + CurrentUser.proxy("falcon", "falcon"); + } + @Test - public void testGetProxyUser() throws Exception { + public void testGetProxyUserForAuthenticatedUser() throws Exception { CurrentUser.authenticate("proxy"); UserGroupInformation proxyUgi = CurrentUser.getProxyUGI(); Assert.assertNotNull(proxyUgi); Assert.assertEquals(proxyUgi.getUserName(), "proxy"); } + + @Test + public void testProxy() throws Exception { + CurrentUser.authenticate("real"); + + CurrentUser.proxy(EntityBuilderTestUtil.USER, "users"); + UserGroupInformation proxyUgi = CurrentUser.getProxyUGI(); + Assert.assertNotNull(proxyUgi); + Assert.assertEquals(proxyUgi.getUserName(), EntityBuilderTestUtil.USER); + + Assert.assertEquals(CurrentUser.getAuthenticatedUser(), "real"); + Assert.assertEquals(CurrentUser.getUser(), EntityBuilderTestUtil.USER); + } + + @Test + public void testProxySameUser() throws Exception { + CurrentUser.authenticate("falcon"); + + CurrentUser.proxy("falcon", "users"); + UserGroupInformation proxyUgi = CurrentUser.getProxyUGI(); + Assert.assertNotNull(proxyUgi); + Assert.assertEquals(proxyUgi.getUserName(), "falcon"); + + Assert.assertEquals(CurrentUser.getAuthenticatedUser(), "falcon"); + Assert.assertEquals(CurrentUser.getUser(), "falcon"); + } + + @Test + public void testSuperUser() throws Exception { + CurrentUser.authenticate(EntityBuilderTestUtil.USER); + CurrentUser.proxy("proxy", "users"); + + UserGroupInformation proxyUgi = CurrentUser.getProxyUGI(); + Assert.assertNotNull(proxyUgi); + Assert.assertEquals(proxyUgi.getUserName(), "proxy"); + + Assert.assertEquals(CurrentUser.getAuthenticatedUser(), EntityBuilderTestUtil.USER); + Assert.assertEquals(CurrentUser.getUser(), "proxy"); + } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/test/java/org/apache/falcon/security/DefaultAuthorizationProviderTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/security/DefaultAuthorizationProviderTest.java b/common/src/test/java/org/apache/falcon/security/DefaultAuthorizationProviderTest.java index 0a40359..162be12 100644 --- a/common/src/test/java/org/apache/falcon/security/DefaultAuthorizationProviderTest.java +++ b/common/src/test/java/org/apache/falcon/security/DefaultAuthorizationProviderTest.java @@ -20,6 +20,7 @@ package org.apache.falcon.security; import org.apache.falcon.FalconException; import org.apache.falcon.cluster.util.EntityBuilderTestUtil; +import org.apache.falcon.entity.EntityNotRegisteredException; import org.apache.falcon.entity.Storage; import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.EntityType; @@ -344,7 +345,7 @@ public class DefaultAuthorizationProviderTest { processEntity.getACL(), "submit", proxyUgi); } - @Test (expectedExceptions = AuthorizationException.class) + @Test (expectedExceptions = EntityNotRegisteredException.class) public void testAuthorizeResourceOperationsBadEntity() throws Exception { StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin"); StartupProperties.get().setProperty("falcon.security.authorization.admin.groups", "admin"); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java b/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java index 6844f31..2e5dffb 100644 --- a/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java +++ b/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java @@ -53,8 +53,7 @@ public final class LogProvider { try { Configuration conf = ClusterHelper.getConfiguration(clusterObj); // fs on behalf of the end user. - FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( - conf, entity.getACL()); + FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf); String resolvedRunId = getResolvedRunId(fs, clusterObj, entity, instance, runId); // if runId param is not resolved, i.e job is killed or not started or running if (resolvedRunId.equals("-") http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java index 4108839..e341fb8 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java @@ -140,7 +140,7 @@ public abstract class OozieEntityBuilder<T extends Entity> { } FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( - outPath.toUri(), ClusterHelper.getConfiguration(cluster), entity.getACL()); + outPath.toUri(), ClusterHelper.getConfiguration(cluster)); OutputStream out = fs.create(outPath); try { marshaller.marshal(jaxbElement, out); @@ -261,7 +261,7 @@ public abstract class OozieEntityBuilder<T extends Entity> { protected void copySharedLibs(Cluster cluster, Path libPath) throws FalconException { try { FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( - libPath.toUri(), ClusterHelper.getConfiguration(cluster), entity.getACL()); + libPath.toUri(), ClusterHelper.getConfiguration(cluster)); SharedLibraryHostingService.pushLibsToHDFS( fs, StartupProperties.get().getProperty("system.lib.location"), libPath, FALCON_JAR_FILTER); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java index f7fed45..771295f 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java @@ -211,7 +211,7 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, Tag tag) throws FalconException { String libext = ClusterHelper.getLocation(cluster, "working") + "/libext"; FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( - ClusterHelper.getConfiguration(cluster), entity.getACL()); + ClusterHelper.getConfiguration(cluster)); try { addExtensionJars(fs, new Path(libext), wf); addExtensionJars(fs, new Path(libext, entity.getEntityType().name()), wf); @@ -268,8 +268,7 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend try { Configuration conf = ClusterHelper.getConfiguration(cluster); - FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf, - entity.getACL()); + FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf); // create hive conf to stagingDir Path confPath = new Path(workflowPath + "/conf"); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java index c578005..c5366dc 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java @@ -281,7 +281,7 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F private void setupHiveConfiguration(Cluster srcCluster, Cluster trgCluster, Path buildPath) throws FalconException { Configuration conf = ClusterHelper.getConfiguration(trgCluster); - FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf, entity.getACL()); + FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf); try { // copy import export scripts to stagingDir http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java index 3e54bd2..8691ee5 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java @@ -121,7 +121,7 @@ public class ProcessBundleBuilder extends OozieBundleBuilder<Process> { private void copyUserWorkflow(Cluster cluster, Path buildPath) throws FalconException { try { FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( - ClusterHelper.getConfiguration(cluster), entity.getACL()); + ClusterHelper.getConfiguration(cluster)); //Copy user workflow and lib to staging dir Map<String, String> checksums = UpdateHelper.checksumAndCopy(fs, new Path(entity.getWorkflow().getPath()), http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java index 24437fc..d271695 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java @@ -223,7 +223,7 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration try { final FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( - ClusterHelper.getConfiguration(cluster), entity.getACL()); + ClusterHelper.getConfiguration(cluster)); if (fs.isFile(libPath)) { // File, not a Dir archiveList.add(libPath.toString()); return; http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java index d598097..622238a 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java @@ -27,8 +27,6 @@ import org.apache.oozie.client.ProxyOozieClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.ConcurrentHashMap; - /** * Factory for providing appropriate oozie client. */ @@ -37,8 +35,6 @@ public final class OozieClientFactory { private static final Logger LOG = LoggerFactory.getLogger(OozieClientFactory.class); private static final String LOCAL_OOZIE = "local"; - private static final ConcurrentHashMap<String, ProxyOozieClient> CACHE = - new ConcurrentHashMap<String, ProxyOozieClient>(); private static volatile boolean localInitialized = false; private OozieClientFactory() {} @@ -48,13 +44,8 @@ public final class OozieClientFactory { assert cluster != null : "Cluster cant be null"; String oozieUrl = ClusterHelper.getOozieUrl(cluster); - if (!CACHE.containsKey(oozieUrl)) { - ProxyOozieClient ref = getClientRef(oozieUrl); - LOG.info("Caching Oozie client object for {}", oozieUrl); - CACHE.putIfAbsent(oozieUrl, ref); - } - - return CACHE.get(oozieUrl); + LOG.info("Creating Oozie client object for {}", oozieUrl); + return getClientRef(oozieUrl); } public static ProxyOozieClient get(String clusterName) throws FalconException { http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java index d9fe8c1..54cab51 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java @@ -58,8 +58,7 @@ public class OozieHouseKeepingService implements WorkflowEngineActionListener { LOG.info("Deleting entity path {} on cluster {}", entityPath, clusterName); Configuration conf = ClusterHelper.getConfiguration(cluster); - FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf, - entity.getACL()); + FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf); if (fs.exists(entityPath) && !fs.delete(entityPath, true)) { throw new FalconException("Unable to cleanup entity path: " + entityPath); } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java index 89bebe7..771839a 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java @@ -178,7 +178,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { try { FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( - ClusterHelper.getConfiguration(cluster), entity.getACL()); + ClusterHelper.getConfiguration(cluster)); HadoopClientFactory.mkdirsWithDefaultPerms(fs, stagingPath); HadoopClientFactory.mkdirsWithDefaultPerms(fs, logPath); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java index a7c6960..fcd8ca7 100644 --- a/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java +++ b/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java @@ -73,9 +73,10 @@ public class ProxyOozieClient extends AuthOozieClient { final URL decoratedUrl = decorateUrlWithUser(url); LOG.debug("ProxyOozieClient.createConnection: u={}, m={}", url, method); - UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + // Login User "falcon" has the kerberos credentials + UserGroupInformation loginUserUGI = UserGroupInformation.getLoginUser(); try { - return currentUser.doAs(new PrivilegedExceptionAction<HttpURLConnection>() { + return loginUserUGI.doAs(new PrivilegedExceptionAction<HttpURLConnection>() { public HttpURLConnection run() throws Exception { HttpURLConnection conn = ProxyOozieClient.super.createConnection(decoratedUrl, method); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java index 3308d72..ceacb4e 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java @@ -342,12 +342,23 @@ public abstract class AbstractEntityManager { + "Can't be submitted again. Try removing before submitting."); } + tryProxy(entity); // proxy before validating since FS/Oozie needs to be proxied validate(entity); configStore.publish(entityType, entity); LOG.info("Submit successful: ({}): {}", type, entity.getName()); return entity; } + private void tryProxy(Entity entity) throws IOException, FalconException { + final String aclOwner = entity.getACL().getOwner(); + final String aclGroup = entity.getACL().getGroup(); + if (SecurityUtil.isAuthorizationEnabled() + && SecurityUtil.getAuthorizationProvider().shouldProxy( + CurrentUser.getAuthenticatedUGI(), aclOwner, aclGroup)) { + CurrentUser.proxy(aclOwner, aclGroup); + } + } + /** * KLUDGE - Until ACL is mandated entity passed should be decorated for equals check to pass. * existingEntity in config store will have teh decoration and equals check fails @@ -541,13 +552,13 @@ public abstract class AbstractEntityManager { protected List<Entity> getEntities(String type, String startDate, String endDate, String cluster, String filterBy, String filterTags, String orderBy, String sortOrder, - int offset, int resultsPerPage) throws FalconException { + int offset, int resultsPerPage) throws FalconException, IOException { final Map<String, String> filterByFieldsValues = getFilterByFieldsValues(filterBy); final List<String> filterByTags = getFilterByTags(filterTags); EntityType entityType = EntityType.valueOf(type.toUpperCase()); Collection<String> entityNames = configStore.getEntities(entityType); - if (entityNames == null || entityNames.isEmpty()) { + if (entityNames.isEmpty()) { return Collections.emptyList(); } @@ -564,9 +575,12 @@ public abstract class AbstractEntityManager { throw FalconWebException.newException(e1, Response.Status.BAD_REQUEST); } - if (filterEntityByDatesAndCluster(entity, startDate, endDate, cluster)) { + if (SecurityUtil.isAuthorizationEnabled() && !isEntityAuthorized(entity) + || filterEntityByDatesAndCluster(entity, startDate, endDate, cluster)) { + // the user who requested list query has no permission to access this entity. Skip this entity continue; } + tryProxy(entity); List<String> tags = EntityUtil.getTags(entity); List<String> pipelines = EntityUtil.getPipelines(entity); @@ -658,11 +672,6 @@ public abstract class AbstractEntityManager { private boolean filterEntity(Entity entity, String entityStatus, Map<String, String> filterByFieldsValues, List<String> filterByTags, List<String> tags, List<String> pipelines) { - if (SecurityUtil.isAuthorizationEnabled() && !isEntityAuthorized(entity)) { - // the user who requested list query has no permission to access this entity. Skip this entity - return true; - } - return !((filterByTags.isEmpty() || !filterEntityByTags(filterByTags, tags)) && (filterByFieldsValues.isEmpty() || !filterEntityByFields(entity, filterByFieldsValues, entityStatus, pipelines))); @@ -671,11 +680,12 @@ public abstract class AbstractEntityManager { protected boolean isEntityAuthorized(Entity entity) { try { - SecurityUtil.getAuthorizationProvider().authorizeResource("entities", "list", - entity.getEntityType().toString(), entity.getName(), CurrentUser.getProxyUGI()); + SecurityUtil.getAuthorizationProvider().authorizeEntity(entity.getName(), + entity.getEntityType().toString(), entity.getACL(), + "list", CurrentUser.getAuthenticatedUGI()); } catch (Exception e) { - LOG.error("Authorization failed for entity=" + entity.getName() - + " for user=" + CurrentUser.getUser(), e); + LOG.info("Authorization failed for entity=" + entity.getName() + + " for user=" + CurrentUser.getUser(), e); return false; } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/prism/src/main/java/org/apache/falcon/security/FalconAuditFilter.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/security/FalconAuditFilter.java b/prism/src/main/java/org/apache/falcon/security/FalconAuditFilter.java index 9a9b400..c1e54b8 100644 --- a/prism/src/main/java/org/apache/falcon/security/FalconAuditFilter.java +++ b/prism/src/main/java/org/apache/falcon/security/FalconAuditFilter.java @@ -69,6 +69,7 @@ public class FalconAuditFilter implements Filter { // put the request id into the response so users can trace logs for this request ((HttpServletResponse) response).setHeader(REQUEST_ID, requestId); NDC.pop(); + CurrentUser.clear(); } } @@ -85,16 +86,13 @@ public class FalconAuditFilter implements Filter { } private String getUserFromRequest(HttpServletRequest httpRequest) { - try { - // get the authenticated user - return CurrentUser.getUser(); - } catch (IllegalStateException ignore) { - // ignore since the user authentication might have failed + if (CurrentUser.isAuthenticated()) { + return CurrentUser.getAuthenticatedUser(); + } else { + // look for the user in the request + final String userFromRequest = Servlets.getUserFromRequest(httpRequest); + return userFromRequest == null ? "UNKNOWN" : userFromRequest; } - - // look for the user in the request - final String userFromRequest = Servlets.getUserFromRequest(httpRequest); - return userFromRequest == null ? "UNKNOWN" : userFromRequest; } @Override http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/prism/src/main/java/org/apache/falcon/security/FalconAuthenticationFilter.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/security/FalconAuthenticationFilter.java b/prism/src/main/java/org/apache/falcon/security/FalconAuthenticationFilter.java index fa30f0e..fa13a7d 100644 --- a/prism/src/main/java/org/apache/falcon/security/FalconAuthenticationFilter.java +++ b/prism/src/main/java/org/apache/falcon/security/FalconAuthenticationFilter.java @@ -178,7 +178,7 @@ public class FalconAuthenticationFilter try { NDC.push(user + ":" + httpRequest.getMethod() + "/" + httpRequest.getPathInfo()); CurrentUser.authenticate(user); - LOG.info("Request from user: {}, URL={}", user, + LOG.info("Request from authenticated user: {}, URL={}", user, Servlets.getRequestURI(httpRequest)); filterChain.doFilter(servletRequest, servletResponse); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java b/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java index 3cdb749..a3b7b9c 100644 --- a/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java +++ b/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java @@ -19,7 +19,14 @@ package org.apache.falcon.security; import org.apache.falcon.FalconException; +import org.apache.falcon.entity.EntityNotRegisteredException; +import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AuthorizationException; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,6 +38,7 @@ import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.core.MediaType; import java.io.IOException; /** @@ -61,34 +69,37 @@ public class FalconAuthorizationFilter implements Filter { public void doFilter(ServletRequest request, ServletResponse response, FilterChain filterChain) throws IOException, ServletException { - int errorCode = HttpServletResponse.SC_FORBIDDEN; - String errorMessage = null; - HttpServletResponse httpResponse = (HttpServletResponse) response; - if (isAuthorizationEnabled) { HttpServletRequest httpRequest = (HttpServletRequest) request; RequestParts requestParts = getUserRequest(httpRequest); LOG.info("Authorizing user={} against request={}", CurrentUser.getUser(), requestParts); try { + final UserGroupInformation authenticatedUGI = CurrentUser.getAuthenticatedUGI(); authorizationProvider.authorizeResource(requestParts.getResource(), requestParts.getAction(), requestParts.getEntityType(), - requestParts.getEntityName(), CurrentUser.getProxyUGI()); + requestParts.getEntityName(), authenticatedUGI); + tryProxy(authenticatedUGI, + requestParts.getEntityType(), requestParts.getEntityName()); + LOG.info("Authorization succeeded for user={}, proxy={}", + authenticatedUGI.getShortUserName(), CurrentUser.getUser()); } catch (AuthorizationException e) { - errorMessage = e.getMessage(); + sendError((HttpServletResponse) response, + HttpServletResponse.SC_FORBIDDEN, e.getMessage()); + return; // do not continue processing + } catch (EntityNotRegisteredException e) { + sendError((HttpServletResponse) response, + HttpServletResponse.SC_BAD_REQUEST, e.getMessage()); + return; // do not continue processing } catch (IllegalArgumentException e) { - errorMessage = e.getMessage(); - errorCode = HttpServletResponse.SC_BAD_REQUEST; + sendError((HttpServletResponse) response, + HttpServletResponse.SC_BAD_REQUEST, e.getMessage()); + return; // do not continue processing } } - if (errorMessage == null) { // continue processing if there was no exception - filterChain.doFilter(request, response); - } else { - if (!httpResponse.isCommitted()) { - httpResponse.sendError(errorCode, errorMessage); - } - } + // continue processing if there was no authorization error + filterChain.doFilter(request, response); } @Override @@ -117,6 +128,49 @@ public class FalconAuthorizationFilter implements Filter { } } + private void tryProxy(UserGroupInformation authenticatedUGI, + String entityType, String entityName) throws IOException { + if (entityType == null || entityName == null) { + return; + } + + try { + EntityType type = EntityType.valueOf(entityType.toUpperCase()); + Entity entity = EntityUtil.getEntity(type, entityName); + if (entity != null && entity.getACL() != null) { + final String aclOwner = entity.getACL().getOwner(); + final String aclGroup = entity.getACL().getGroup(); + if (authorizationProvider.shouldProxy( + authenticatedUGI, aclOwner, aclGroup)) { + CurrentUser.proxy(aclOwner, aclGroup); + } + } + } catch (FalconException ignore) { + // do nothing + } + } + + private void sendError(HttpServletResponse httpResponse, + int errorCode, String errorMessage) throws IOException { + LOG.error("Authorization failed : {}/{}", errorCode, errorMessage); + if (!httpResponse.isCommitted()) { // handle authorization error + httpResponse.setStatus(errorCode); + httpResponse.setContentType(MediaType.APPLICATION_JSON); + httpResponse.getOutputStream().print(getJsonResponse(errorCode, errorMessage)); + } + } + + private String getJsonResponse(int errorCode, String errorMessage) throws IOException { + try { + JSONObject response = new JSONObject(); + response.put("errorCode", errorCode); + response.put("errorMessage", errorMessage); + return response.toString(); + } catch (JSONException e) { + throw new IOException(e); + } + } + private static class RequestParts { private final String resource; private final String action; http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/prism/src/test/java/org/apache/falcon/security/FalconAuthorizationFilterTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/security/FalconAuthorizationFilterTest.java b/prism/src/test/java/org/apache/falcon/security/FalconAuthorizationFilterTest.java index 03dc792..54b5859 100644 --- a/prism/src/test/java/org/apache/falcon/security/FalconAuthorizationFilterTest.java +++ b/prism/src/test/java/org/apache/falcon/security/FalconAuthorizationFilterTest.java @@ -35,8 +35,10 @@ import org.testng.annotations.Test; import javax.servlet.Filter; import javax.servlet.FilterChain; import javax.servlet.FilterConfig; +import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import java.io.IOException; /** * Test for FalconAuthorizationFilter using mock objects. @@ -89,8 +91,8 @@ public class FalconAuthorizationFilterTest { {"/metadata/lineage/vertices/all"}, {"/metadata/lineage/vertices/_1"}, {"/metadata/lineage/vertices/properties/_1"}, - {"metadata/discovery/process_entity/sample-process/relations"}, - {"metadata/discovery/process_entity/list?cluster=primary-cluster"}, + {"/metadata/discovery/process_entity/sample-process/relations"}, + {"/metadata/discovery/process_entity/list?cluster=primary-cluster"}, }; } @@ -146,6 +148,14 @@ public class FalconAuthorizationFilterTest { Mockito.when(mockRequest.getRequestURI()).thenReturn("/api" + resource); Mockito.when(mockRequest.getPathInfo()).thenReturn(resource); + Mockito.when(mockResponse.getOutputStream()).thenReturn( + new ServletOutputStream() { + @Override + public void write(int b) throws IOException { + System.out.print(b); + } + }); + try { filter.doFilter(mockRequest, mockResponse, mockChain); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java index 6a8017e..c2cb09e 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java @@ -75,8 +75,7 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends LOG.info("Going to delete path: {}", lateLogPath); final String storageEndpoint = properties.getProperty(AbstractWorkflowEngine.NAME_NODE); Configuration conf = getConfiguration(storageEndpoint); - FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( - conf, entity.getACL()); + FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf); if (fs.exists(lateLogPath)) { boolean deleted = fs.delete(lateLogPath, true); if (deleted) {