FALCON-753 Change the ownership for staging dir to user submitting the feed. 
Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/15b89bc3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/15b89bc3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/15b89bc3

Branch: refs/heads/master
Commit: 15b89bc31438fd76071585aacefd9a549d99ac38
Parents: 1caadaf
Author: Venkatesh Seetharam <venkat...@apache.org>
Authored: Thu Oct 16 16:27:23 2014 -0700
Committer: Venkatesh Seetharam <venkat...@apache.org>
Committed: Thu Oct 16 16:27:23 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../falcon/cleanup/AbstractCleanupHandler.java  |  53 ++++---
 .../org/apache/falcon/entity/EntityUtil.java    |  34 +++--
 .../entity/parser/ClusterEntityParser.java      |  98 +++++++------
 .../falcon/entity/parser/EntityParser.java      |   2 +-
 .../falcon/entity/store/ConfigurationStore.java |  28 ++--
 .../falcon/hadoop/HadoopClientFactory.java      |  76 +++++++---
 .../org/apache/falcon/security/CurrentUser.java |  46 +++++-
 .../security/DefaultAuthorizationProvider.java  |  21 +--
 .../workflow/WorkflowExecutionContext.java      |  12 +-
 .../falcon/cleanup/LogCleanupServiceTest.java   |   3 +-
 .../apache/falcon/entity/AbstractTestBase.java  |  49 ++++++-
 .../falcon/hadoop/HadoopClientFactoryTest.java  |   4 +-
 .../apache/falcon/security/CurrentUserTest.java |   2 +-
 docs/src/site/twiki/EntitySpecification.twiki   |   5 +
 docs/src/site/twiki/InstallationSteps.twiki     |   3 +
 docs/src/site/twiki/OnBoarding.twiki            |   4 +
 docs/src/site/twiki/Security.twiki              |  16 ++-
 .../apache/falcon/hadoop/JailedFileSystem.java  |   5 +
 .../falcon/messaging/JMSMessageConsumer.java    |   4 +
 .../falcon/messaging/JMSMessageProducer.java    |   9 +-
 .../org/apache/falcon/logging/JobLogMover.java  |   3 +-
 .../org/apache/falcon/logging/LogProvider.java  |   2 +-
 .../apache/falcon/oozie/OozieBundleBuilder.java |  34 +----
 .../falcon/oozie/OozieCoordinatorBuilder.java   |   3 +-
 .../apache/falcon/oozie/OozieEntityBuilder.java |  22 ++-
 .../OozieOrchestrationWorkflowBuilder.java      |  12 +-
 .../feed/FeedReplicationCoordinatorBuilder.java |   7 +-
 .../oozie/process/ProcessBundleBuilder.java     |   3 +-
 .../ProcessExecutionWorkflowBuilder.java        |   4 +-
 .../service/SharedLibraryHostingService.java    |  60 ++++----
 .../engine/OozieHouseKeepingService.java        |   2 +-
 .../workflow/engine/OozieWorkflowEngine.java    |  31 ++++-
 .../feed/OozieFeedWorkflowBuilderTest.java      | 139 ++++++++++++++++---
 .../falcon/resource/AbstractEntityManager.java  |  54 ++++++-
 .../security/FalconAuthorizationFilter.java     |   2 +-
 .../falcon/replication/FeedReplicator.java      |   7 +-
 .../apache/falcon/latedata/LateDataHandler.java |  17 +--
 .../rerun/handler/AbstractRerunConsumer.java    |   1 +
 .../falcon/rerun/handler/LateRerunConsumer.java |   2 +-
 .../falcon/rerun/handler/LateRerunHandler.java  |   4 +-
 .../apache/falcon/retention/FeedEvictor.java    |  14 +-
 .../falcon/cluster/util/EmbeddedCluster.java    |   2 +-
 .../java/org/apache/falcon/cli/FalconCLIIT.java |   1 +
 .../apache/falcon/late/LateDataHandlerIT.java   |   2 +
 .../falcon/resource/EntityManagerJerseyIT.java  |   4 +-
 .../org/apache/falcon/resource/TestContext.java |  54 +++++++
 .../validation/ClusterEntityValidationIT.java   |  81 +++++++++--
 webapp/src/test/resources/cluster-template.xml  |   2 +-
 webapp/src/test/resources/process-template.xml  |   2 +-
 50 files changed, 744 insertions(+), 303 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d53017f..7af3263 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,8 @@ Apache Falcon (incubating) Change log
 Trunk (Unreleased)
 
   INCOMPATIBLE CHANGES
+   FALCON-753 Change the ownership for staging dir to user submitting the feed
+   (Venkatesh Seetharam)
 
   NEW FEATURES
    FALCON-687 Add hooks for extensions in Audit (Venkatesh Seetharam)

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
index c315c25..cd088b2 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
@@ -22,6 +22,7 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.AccessControlList;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
@@ -29,6 +30,7 @@ import org.apache.falcon.entity.v0.Frequency.TimeUnit;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.util.DeploymentUtil;
 import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.util.StartupProperties;
@@ -72,9 +74,8 @@ public abstract class AbstractCleanupHandler {
                 "log.cleanup.frequency." + timeunit + ".retention", "days(1)");
     }
 
-    protected FileStatus[] getAllLogs(org.apache.falcon.entity.v0.cluster.Cluster cluster,
+    protected FileStatus[] getAllLogs(FileSystem fs, Cluster cluster,
                                       Entity entity) throws FalconException {
-        FileSystem fs = getFileSystem(cluster);
         FileStatus[] paths;
         try {
             Path logPath = getLogPath(cluster, entity);
@@ -91,27 +92,42 @@
         return new Path(EntityUtil.getLogPath(cluster, entity), getRelativeLogPath());
     }
 
-    protected FileSystem getFileSystem(org.apache.falcon.entity.v0.cluster.Cluster cluster)
-        throws FalconException {
+    private FileSystem getFileSystemAsEntityOwner(Cluster cluster,
+                                                  Entity entity) throws FalconException {
+        try {
+            final AccessControlList acl = EntityUtil.getACL(entity);
+            if (acl == null) {
+                throw new FalconException("ACL for entity " + entity.getName() + " is empty");
+            }
 
-        return HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
+            final String proxyUser = acl.getOwner();
+            // user for proxying
+            CurrentUser.authenticate(proxyUser);
+            return HadoopClientFactory.get().createProxiedFileSystem(
+                    ClusterHelper.getConfiguration(cluster));
+        } catch (Exception e) {
+            throw new FalconException(e);
+        }
     }
 
     protected void delete(String clusterName, Entity entity, long retention) throws FalconException {
         Cluster currentCluster = STORE.get(EntityType.CLUSTER, clusterName);
-        if (isClusterInCurrentColo(currentCluster.getColo())) {
-            LOG.info("Cleaning up logs for {}: {} in cluster: {} with retention: {}",
-                    entity.getEntityType(), entity.getName(), clusterName, retention);
-            FileStatus[] logs = getAllLogs(currentCluster, entity);
-            deleteInternal(currentCluster, entity, retention, logs);
-        } else {
+        if (!isClusterInCurrentColo(currentCluster.getColo())) {
             LOG.info("Ignoring cleanup for {}: {} in cluster: {} as this does not belong to current colo",
                     entity.getEntityType(), entity.getName(), clusterName);
+            return;
         }
+
+        LOG.info("Cleaning up logs for {}: {} in cluster: {} with retention: {}",
+                entity.getEntityType(), entity.getName(), clusterName, retention);
+
+        FileSystem fs = getFileSystemAsEntityOwner(currentCluster, entity);
+        FileStatus[] logs = getAllLogs(fs, currentCluster, entity);
+        deleteInternal(fs, currentCluster, entity, retention, logs);
     }
 
-    protected void deleteInternal(Cluster cluster, Entity entity,
-                                  long retention, FileStatus[] logs) throws FalconException {
+    private void deleteInternal(FileSystem fs, Cluster cluster, Entity entity,
+                                long retention, FileStatus[] logs) throws FalconException {
         if (logs == null || logs.length == 0) {
             LOG.info("Nothing to delete for cluster: {}, entity: {}", cluster.getName(),
                     entity.getName());
@@ -123,13 +139,10 @@ public abstract class AbstractCleanupHandler {
         for (FileStatus log : logs) {
             if (now - log.getModificationTime() > retention) {
                 try {
-                    boolean isDeleted = getFileSystem(cluster).delete(log.getPath(), true);
-                    if (!isDeleted) {
-                        LOG.error("Unable to delete path: {}", log.getPath());
-                    } else {
-                        LOG.info("Deleted path: {}", log.getPath());
-                    }
-                    deleteParentIfEmpty(getFileSystem(cluster), log.getPath().getParent());
+                    boolean isDeleted = fs.delete(log.getPath(), true);
+                    LOG.error(isDeleted ? "Deleted path: {}" : "Unable to delete path: {}",
+                            log.getPath());
+                    deleteParentIfEmpty(fs, log.getPath().getParent());
+                    deleteParentIfEmpty(fs, log.getPath().getParent());
                 } catch (IOException e) {
                     throw new FalconException(" Unable to delete log file : "
                             + log.getPath() + " for entity " + entity.getName()

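In short, the cleanup handler now resolves the owner from the entity ACL and performs all
deletes through a proxied FileSystem. A minimal sketch of that flow, using only names that
appear in the patch (the surrounding handler class and error handling are elided):

import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.AccessControlList;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.security.CurrentUser;
import org.apache.hadoop.fs.FileSystem;

final class OwnerScopedCleanupSketch {
    // Returns a FileSystem that acts as the user who submitted the entity.
    static FileSystem fileSystemAsEntityOwner(Cluster cluster, Entity entity) throws Exception {
        AccessControlList acl = EntityUtil.getACL(entity); // owner recorded at submission time
        CurrentUser.authenticate(acl.getOwner());          // proxy as the submitting user
        return HadoopClientFactory.get().createProxiedFileSystem(
                ClusterHelper.getConfiguration(cluster));
    }
}
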
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index b8f2d7d..1a10986 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -27,6 +27,7 @@ import org.apache.falcon.Pair;
 import org.apache.falcon.Tag;
 import org.apache.falcon.entity.WorkflowNameBuilder.WorkflowName;
 import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.AccessControlList;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
@@ -197,6 +198,7 @@ public final class EntityUtil {
     }
 
     public static int getParallel(Feed feed) {
+        // todo - how is this supposed to work?
         return 1;
     }
 
@@ -442,7 +444,7 @@ public final class EntityUtil {
         return builder.getWorkflowTag(workflowName);
     }
 
-    public static List<String> getWorkflowNames(Entity entity, String cluster) {
+    public static List<String> getWorkflowNames(Entity entity) {
         switch(entity.getEntityType()) {
         case FEED:
             return Arrays.asList(getWorkflowName(Tag.RETENTION, entity).toString(),
@@ -581,20 +583,16 @@ public final class EntityUtil {
         Entity entity)
         throws FalconException {
         Path basePath = getBaseStagingPath(cluster, entity);
-        FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                ClusterHelper.getConfiguration(cluster));
         try {
-            FileStatus[] filesArray = fs.listStatus(basePath, new PathFilter() {
+            return fs.listStatus(basePath, new PathFilter() {
                 @Override
                 public boolean accept(Path path) {
-                    if (path.getName().equals("logs")) {
-                        return false;
-                    }
-                    return true;
+                    return !path.getName().equals("logs");
                 }
             });
 
-            return filesArray;
-
         } catch (FileNotFoundException e) {
             LOG.info("Staging path " + basePath + " doesn't exist, entity is 
not scheduled");
             //Staging path doesn't exist if entity is not scheduled
@@ -755,4 +753,22 @@ public final class EntityUtil {
         return new Pair<Date, Date>(clusterMinStartDate.first, clusterMaxEndDate.first);
     }
 
+    public static AccessControlList getACL(Entity entity) {
+        switch (entity.getEntityType()) {
+        case CLUSTER:
+            return ((org.apache.falcon.entity.v0.cluster.Cluster) entity).getACL();
+
+        case FEED:
+            return ((org.apache.falcon.entity.v0.feed.Feed) entity).getACL();
+
+        case PROCESS:
+            return ((org.apache.falcon.entity.v0.process.Process) entity).getACL();
+
+        default:
+            break;
+        }
+
+        throw new IllegalArgumentException("Unknown entity type: " + entity.getEntityType()
+                + " for: " + entity.getName());
+    }
 }

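EntityUtil.getACL(Entity) is now the single dispatch point for entity ACLs (the equivalent
switch is deleted from DefaultAuthorizationProvider further down). A short usage sketch;
ownerOf is a hypothetical helper, not part of the patch:

import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.AccessControlList;
import org.apache.falcon.entity.v0.Entity;

final class AclLookupSketch {
    // Hypothetical helper: null-safe owner lookup for cluster/feed/process entities.
    static String ownerOf(Entity entity) {
        AccessControlList acl = EntityUtil.getACL(entity); // IllegalArgumentException for other types
        return acl == null ? null : acl.getOwner();
    }
}
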
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
index fbbdbcb..7c4b99d 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.slf4j.Logger;
@@ -52,7 +52,7 @@ import java.io.IOException;
  */
 public class ClusterEntityParser extends EntityParser<Cluster> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(ProcessEntityParser.class);
+    private static final Logger LOG = LoggerFactory.getLogger(ClusterEntityParser.class);
 
     public ClusterEntityParser() {
         super(EntityType.CLUSTER);
@@ -122,9 +122,7 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
                 conf.set(SecurityUtil.NN_PRINCIPAL, nameNodePrincipal);
             }
 
-            // todo: ideally check if the end user has access using createProxiedFileSystem
-            // hftp won't work and bug is logged at HADOOP-10215
-            HadoopClientFactory.get().createFileSystem(conf);
+            HadoopClientFactory.get().createProxiedFileSystem(conf);
         } catch (FalconException e) {
             throw new ValidationException("Invalid storage server or port: " + 
storageUrl, e);
         }
@@ -135,7 +133,7 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
         LOG.info("Validating execute interface: {}", executeUrl);
 
         try {
-            HadoopClientFactory.validateJobClient(executeUrl);
+            HadoopClientFactory.get().validateJobClient(executeUrl);
         } catch (IOException e) {
             throw new ValidationException("Invalid Execute server or port: " + 
executeUrl, e);
         }
@@ -231,55 +229,69 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
     }
 
     /**
-     * Validate the locations on the cluster is owned by falcon.
+     * Validate that the locations on the cluster exist with appropriate permissions
+     * for the user to write to this directory.
      *
      * @param cluster cluster entity
      * @throws ValidationException
      */
     private void validateLocations(Cluster cluster) throws ValidationException {
+        Configuration conf = ClusterHelper.getConfiguration(cluster);
+        FileSystem fs;
         try {
-            Configuration conf = ClusterHelper.getConfiguration(cluster);
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
-            for (Location location : cluster.getLocations().getLocations()) {
-                if (location.getName().equals("temp")) {
-                    continue;
-                }
-
-                try {
-                    Path locationPath = new Path(location.getPath());
-                    if (fs.exists(locationPath)) {
-                        FileStatus fileStatus = fs.getFileStatus(locationPath);
-                        checkPathPermissions(locationPath, fileStatus);
-                        checkPathOwner(locationPath, fileStatus);
-                    }
-                } catch (IOException e) {
-                    throw new ValidationException("Unable to validate the 
location " + location
-                            + "for cluster.", e);
-                }
-            }
+            fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
         } catch (FalconException e) {
-            throw new ValidationException("Unable to validate the locations 
for cluster.", e);
+            throw new ValidationException(
+                    "Unable to get file system handle for cluster " + 
cluster.getName(), e);
         }
-    }
 
-    private void checkPathPermissions(Path locationPath,
-                                      FileStatus fileStatus) throws ValidationException {
-        if (fileStatus.getPermission().getUserAction() != FsAction.ALL) {
-            LOG.error("Path {} doesn't have rwx permissions {}",
-                    locationPath, fileStatus.getPermission());
-            throw new ValidationException("Path " + locationPath
-                    + " doesn't have rwx permissions: " + fileStatus.getPermission());
+        for (Location location : cluster.getLocations().getLocations()) {
+            final String locationName = location.getName();
+            if (locationName.equals("temp")) {
+                continue;
+            }
+
+            try {
+                checkPathOwnerAndPermission(cluster.getName(), location.getPath(), fs,
+                        "staging".equals(locationName)
+                                ? HadoopClientFactory.ALL_PERMISSION
+                                : HadoopClientFactory.READ_EXECUTE_PERMISSION);
+            } catch (IOException e) {
+                throw new ValidationException("Unable to validate the location " + location
+                        + " for cluster " + cluster.getName(), e);
+            }
         }
     }
 
-    private void checkPathOwner(Path locationPath,
-                                FileStatus fileStatus) throws IOException, ValidationException {
-        final String owner = UserGroupInformation.getLoginUser().getShortUserName();
-        if (!fileStatus.getOwner().equals(owner)) {
-            LOG.error("Path {} with owner {} doesn't match the actual path owner {}",
-                    locationPath, owner, fileStatus.getOwner());
-            throw new ValidationException("Path [" + locationPath + "] with owner [" + owner
-                    + "] doesn't match the actual path  owner " + fileStatus.getOwner());
+    private void checkPathOwnerAndPermission(String clusterName, String location, FileSystem fs,
+                                             FsPermission expectedPermission)
+        throws IOException, ValidationException {
+
+        Path locationPath = new Path(location);
+        FileStatus fileStatus = fs.getFileStatus(locationPath);
+        if (!fs.exists(locationPath)) {
+            throw new ValidationException("Location " + location
+                    + " for cluster " + clusterName + " must exist.");
         }
+
+        // falcon owns this path on each cluster
+        final String loginUser = UserGroupInformation.getLoginUser().getShortUserName();
+        final String locationOwner = fileStatus.getOwner();
+        if (!locationOwner.equals(loginUser)) {
+            LOG.error("Location {} has owner {}, should be the process user {}",
+                    locationPath, locationOwner, loginUser);
+            throw new ValidationException("Path [" + locationPath + "] has owner [" + locationOwner
+                    + "], should be the process user " + loginUser);
+        }
+
+        if (fileStatus.getPermission().toShort() != expectedPermission.toShort()) {
+            LOG.error("Location {} has permissions {}, should be {}",
+                    locationPath, fileStatus.getPermission(), expectedPermission);
+            throw new ValidationException("Path " + locationPath + " has permissions: "
+                    + fileStatus.getPermission() + ", should be " + expectedPermission);
+        }
+
+        // try to list to see if the user is able to write to this folder
+        fs.listStatus(locationPath);
     }
 }

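Because the parser now insists that staging is 777 and working is 755, both owned by the
falcon login user, the locations have to be prepared before the cluster entity is submitted.
A sketch of a one-off setup step an admin could run as the falcon user; the /apps/falcon/*
paths are illustrative, not mandated by the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

final class ClusterDirSetup {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // staging: rwxrwxrwx so each entity owner can write its own staged artifacts
        FileSystem.mkdirs(fs, new Path("/apps/falcon/staging"), new FsPermission((short) 0777));
        // working: rwxr-xr-x so entity owners can read shared libs but not modify them
        FileSystem.mkdirs(fs, new Path("/apps/falcon/working"), new FsPermission((short) 0755));
    }
}
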
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
index 8a3f669..ac58280 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
@@ -119,7 +119,7 @@ public abstract class EntityParser<T extends Entity> {
                              AccessControlList acl) throws AuthorizationException {
         try {
             SecurityUtil.getAuthorizationProvider().authorizeEntity(entityName,
-                    getEntityType().name(), acl, "validate", CurrentUser.getProxyUgi());
+                    getEntityType().name(), acl, "validate", CurrentUser.getProxyUGI());
         } catch (FalconException e) {
             throw new AuthorizationException(e);
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
index 25fc21b..62a3e5b 100644
--- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
+++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
@@ -58,22 +58,16 @@ public final class ConfigurationStore implements FalconService {
     private static final Logger AUDIT = LoggerFactory.getLogger("AUDIT");
     private static final String UTF_8 = CharEncoding.UTF_8;
 
-    private static final ConfigurationStore STORE = new ConfigurationStore();
+    private static final FsPermission STORE_PERMISSION =
+            new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
 
     private Set<ConfigurationChangeListener> listeners = new LinkedHashSet<ConfigurationChangeListener>();
 
     private ThreadLocal<Entity> updatesInProgress = new ThreadLocal<Entity>();
 
-    public static ConfigurationStore get() {
-        return STORE;
-    }
-
     private final Map<EntityType, ConcurrentHashMap<String, Entity>> dictionary
         = new HashMap<EntityType, ConcurrentHashMap<String, Entity>>();
 
-    private final FileSystem fs;
-    private final Path storePath;
-
     private static final Entity NULL = new Entity() {
         @Override
         public String getName() {
@@ -81,6 +75,15 @@ public final class ConfigurationStore implements FalconService {
         }
     };
 
+    private static final ConfigurationStore STORE = new ConfigurationStore();
+
+    public static ConfigurationStore get() {
+        return STORE;
+    }
+
+    private final FileSystem fs;
+    private final Path storePath;
+
     private ConfigurationStore() {
         for (EntityType type : EntityType.values()) {
             dictionary.put(type, new ConcurrentHashMap<String, Entity>());
@@ -98,13 +101,12 @@ public final class ConfigurationStore implements FalconService {
      */
     private FileSystem initializeFileSystem() {
         try {
-            FileSystem fileSystem = HadoopClientFactory.get().createFileSystem(storePath.toUri());
+            FileSystem fileSystem =
+                    HadoopClientFactory.get().createFalconFileSystem(storePath.toUri());
             if (!fileSystem.exists(storePath)) {
                 LOG.info("Creating configuration store directory: {}", storePath);
-                fileSystem.mkdirs(storePath);
                 // set permissions so config store dir is owned by falcon alone
-                FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
-                fileSystem.setPermission(storePath, permission);
+                HadoopClientFactory.mkdirs(fileSystem, storePath, STORE_PERMISSION);
             }
 
             return fileSystem;
@@ -331,7 +333,7 @@ public final class ConfigurationStore implements FalconService {
      */
     private void archive(EntityType type, String name) throws IOException {
         Path archivePath = new Path(storePath, "archive" + Path.SEPARATOR + type);
-        fs.mkdirs(archivePath);
+        HadoopClientFactory.mkdirs(fs, archivePath, STORE_PERMISSION);
         fs.rename(new Path(storePath, type + Path.SEPARATOR + URLEncoder.encode(name, UTF_8) + ".xml"),
                 new Path(archivePath, URLEncoder.encode(name, UTF_8) + "." + System.currentTimeMillis()));
         LOG.info("Archived configuration {}/{}", type, name);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
index ecdbf14..3011b65 100644
--- a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
+++ b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
@@ -26,6 +26,9 @@ import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -44,6 +47,11 @@ public final class HadoopClientFactory {
     public static final String MR_JT_ADDRESS_KEY = "mapreduce.jobtracker.address";
     public static final String YARN_RM_ADDRESS_KEY = "yarn.resourcemanager.address";
 
+    public static final FsPermission READ_EXECUTE_PERMISSION =
+            new FsPermission(FsAction.ALL, FsAction.READ_EXECUTE, FsAction.READ_EXECUTE);
+    public static final FsPermission ALL_PERMISSION =
+            new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
+
     private static final HadoopClientFactory INSTANCE = new HadoopClientFactory();
 
     private HadoopClientFactory() {
@@ -61,7 +69,7 @@ public final class HadoopClientFactory {
      * @throws org.apache.falcon.FalconException
      *          if the filesystem could not be created.
      */
-    public FileSystem createFileSystem(final URI uri) throws FalconException {
+    public FileSystem createFalconFileSystem(final URI uri) throws FalconException {
         Validate.notNull(uri, "uri cannot be null");
 
         try {
@@ -76,7 +84,7 @@ public final class HadoopClientFactory {
         }
     }
 
-    public FileSystem createFileSystem(final Configuration conf)
+    public FileSystem createFalconFileSystem(final Configuration conf)
         throws FalconException {
         Validate.notNull(conf, "configuration cannot be null");
 
@@ -90,17 +98,6 @@ public final class HadoopClientFactory {
         }
     }
 
-    public FileSystem createFileSystem(final URI uri, final Configuration conf)
-        throws FalconException {
-        Validate.notNull(uri, "uri cannot be null");
-
-        try {
-            return createFileSystem(UserGroupInformation.getLoginUser(), uri, conf);
-        } catch (IOException e) {
-            throw new FalconException("Exception while getting FileSystem for: " + uri, e);
-        }
-    }
-
     /**
      * Return a FileSystem created with the authenticated proxy user for the specified conf.
      *
@@ -115,7 +112,7 @@ public final class HadoopClientFactory {
 
         String nameNode = conf.get(FS_DEFAULT_NAME_KEY);
         try {
-            return createFileSystem(CurrentUser.getProxyUgi(), new URI(nameNode), conf);
+            return createFileSystem(getProxyUGI(), new URI(nameNode), conf);
         } catch (URISyntaxException e) {
             throw new FalconException("Exception while getting FileSystem for proxy: "
                     + CurrentUser.getUser(), e);
@@ -125,6 +122,32 @@ public final class HadoopClientFactory {
         }
     }
 
+    public FileSystem createProxiedFileSystem(final URI uri) throws FalconException {
+        return createProxiedFileSystem(uri, new Configuration());
+    }
+
+    public FileSystem createProxiedFileSystem(final URI uri,
+                                              final Configuration conf) throws FalconException {
+        Validate.notNull(uri, "uri cannot be null");
+
+        try {
+            return createFileSystem(getProxyUGI(), uri, conf);
+        } catch (IOException e) {
+            throw new FalconException("Exception while getting FileSystem for 
proxy: "
+                    + CurrentUser.getUser(), e);
+        }
+    }
+
+    private UserGroupInformation getProxyUGI() throws IOException {
+        try { // get the authenticated user
+            return CurrentUser.getProxyUGI();
+        } catch (Exception ignore) {
+            // ignore since the user authentication might have failed or in oozie
+        }
+
+        return UserGroupInformation.getCurrentUser();
+    }
+
     /**
      * Return a FileSystem created with the provided user for the specified URI.
      *
@@ -136,8 +159,8 @@ public final class HadoopClientFactory {
      *          if the filesystem could not be created.
      */
     @SuppressWarnings("ResultOfMethodCallIgnored")
-    public FileSystem createFileSystem(UserGroupInformation ugi, final URI uri, final Configuration conf)
-        throws FalconException {
+    public FileSystem createFileSystem(UserGroupInformation ugi, final URI uri,
+                                       final Configuration conf) throws FalconException {
         Validate.notNull(ugi, "ugi cannot be null");
         Validate.notNull(conf, "configuration cannot be null");
 
@@ -172,7 +195,7 @@ public final class HadoopClientFactory {
      * @param executeUrl jt url or RM url
      * @throws IOException
      */
-    public static void validateJobClient(String executeUrl) throws IOException {
+    public void validateJobClient(String executeUrl) throws IOException {
         final JobConf jobConf = new JobConf();
         jobConf.set(MR_JT_ADDRESS_KEY, executeUrl);
         jobConf.set(YARN_RM_ADDRESS_KEY, executeUrl);
@@ -190,4 +213,23 @@ public final class HadoopClientFactory {
             throw new IOException("Exception creating job client:" + 
e.getMessage(), e);
         }
     }
+
+    public static FsPermission getDirDefaultPermission(Configuration conf) {
+        return FsPermission.getDirDefault().applyUMask(FsPermission.getUMask(conf));
+    }
+
+    public static FsPermission getFileDefaultPermission(Configuration conf) {
+        return FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf));
+    }
+
+    public static void mkdirsWithDefaultPerms(FileSystem fs, Path path) throws IOException {
+        mkdirs(fs, path, getDirDefaultPermission(fs.getConf()));
+    }
+
+    public static void mkdirs(FileSystem fs, Path path,
+                              FsPermission permission) throws IOException {
+        if (!FileSystem.mkdirs(fs, path, permission)) {
+            throw new IOException("mkdir failed for " + path);
+        }
+    }
 }

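The factory now has two flavors: createFalconFileSystem acts as the service login user (used
for the config store and shared libs), while createProxiedFileSystem acts as whoever
CurrentUser has authenticated. A sketch of the intended split; the conf and user name are
placeholders:

import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.security.CurrentUser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

final class FactoryUsageSketch {
    static void sketch(Configuration conf) throws Exception {
        // falcon-owned state (config store, shared libs): act as the service login user
        FileSystem asFalcon = HadoopClientFactory.get().createFalconFileSystem(conf);

        // entity data under staging: act as the authenticated end user
        CurrentUser.authenticate("some-end-user"); // placeholder user name
        FileSystem asOwner = HadoopClientFactory.get().createProxiedFileSystem(conf);
    }
}
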
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/main/java/org/apache/falcon/security/CurrentUser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/CurrentUser.java b/common/src/main/java/org/apache/falcon/security/CurrentUser.java
index b7d2c66..cfea143 100644
--- a/common/src/main/java/org/apache/falcon/security/CurrentUser.java
+++ b/common/src/main/java/org/apache/falcon/security/CurrentUser.java
@@ -49,7 +49,7 @@ public final class CurrentUser {
 
     private final ThreadLocal<Subject> currentSubject = new ThreadLocal<Subject>();
 
-    public static void authenticate(String user) {
+    public static void authenticate(final String user) {
         if (user == null || user.isEmpty()) {
             throw new IllegalStateException("Bad user name sent for 
authentication");
         }
@@ -59,6 +59,13 @@ public final class CurrentUser {
 
         Subject subject = new Subject();
         subject.getPrincipals().add(new FalconPrincipal(user));
+
+        try {  // initialize proxy user
+            createProxyUGI(user);
+        } catch (IOException e) {
+            throw new IllegalStateException("Unable to create a proxy user");
+        }
+
         LOG.info("Logging in {}", user);
         INSTANCE.currentSubject.set(subject);
     }
@@ -93,19 +100,38 @@ public final class CurrentUser {
             new ConcurrentHashMap<String, UserGroupInformation>();
 
     /**
+     * Create a proxy UGI object for the current authenticated user.
+     *
+     * @param proxyUser logged in user
+     * @return UGI object
+     * @throws IOException
+     */
+    public static UserGroupInformation createProxyUGI(String proxyUser) throws IOException {
+        UserGroupInformation proxyUgi = userUgiMap.get(proxyUser);
+        if (proxyUgi == null) {
+            // taking care of a race condition, the latest UGI will be discarded
+            proxyUgi = UserGroupInformation.createProxyUser(
+                    proxyUser, UserGroupInformation.getLoginUser());
+            userUgiMap.putIfAbsent(proxyUser, proxyUgi);
+        }
+
+        return proxyUgi;
+    }
+
+    /**
      * Dole out a proxy UGI object for the current authenticated user.
      *
      * @return UGI object
      * @throws java.io.IOException
      */
-    public static UserGroupInformation getProxyUgi() throws IOException {
+    public static UserGroupInformation getProxyUGI() throws IOException {
         String proxyUser = getUser();
 
         UserGroupInformation proxyUgi = userUgiMap.get(proxyUser);
         if (proxyUgi == null) {
             // taking care of a race condition, the latest UGI will be discarded
-            proxyUgi = UserGroupInformation
-                    .createProxyUser(proxyUser, UserGroupInformation.getLoginUser());
+            proxyUgi = UserGroupInformation.createProxyUser(
+                    proxyUser, UserGroupInformation.getLoginUser());
             userUgiMap.putIfAbsent(proxyUser, proxyUgi);
         }
 
@@ -113,7 +139,17 @@ public final class CurrentUser {
     }
 
     public static Set<String> getGroupNames() throws IOException {
-        HashSet<String> s = new HashSet<String>(Arrays.asList(getProxyUgi().getGroupNames()));
+        HashSet<String> s = new HashSet<String>(Arrays.asList(getProxyUGI().getGroupNames()));
         return Collections.unmodifiableSet(s);
     }
+
+    public static String getPrimaryGroupName() {
+        try {
+            return getProxyUGI().getPrimaryGroupName();
+        } catch (IOException ignore) {
+            // ignored
+        }
+
+        return "unknown"; // this can only happen in tests
+    }
 }

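getProxyUgi is renamed to getProxyUGI, and authenticate now eagerly builds and caches the
proxy UGI. A sketch of running privileged work as the authenticated user; "feed-owner" is a
hypothetical name:

import java.security.PrivilegedExceptionAction;
import org.apache.falcon.security.CurrentUser;
import org.apache.hadoop.security.UserGroupInformation;

final class ProxyUgiSketch {
    static void runAsCurrentUser() throws Exception {
        CurrentUser.authenticate("feed-owner"); // also primes the cached proxy UGI
        UserGroupInformation ugi = CurrentUser.getProxyUGI();
        ugi.doAs(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() {
                // privileged HDFS/Oozie work runs here as "feed-owner"
                return null;
            }
        });
    }
}
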
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java b/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
index e90518d..4b7c4a9 100644
--- a/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
+++ b/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
@@ -25,7 +25,6 @@ import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.AccessControlList;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
@@ -284,7 +283,7 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider {
         if (entityName != null) { // lifecycle actions
             Entity entity = getEntity(entityName, entityType);
             authorizeEntity(entity.getName(), entity.getEntityType().name(),
-                    getACL(entity), action, proxyUgi);
+                    EntityUtil.getACL(entity), action, proxyUgi);
         } else {
             // non lifecycle actions, lifecycle actions with null entity will validate later
             LOG.info("Authorization for action={} will be done in the API", action);
@@ -300,24 +299,6 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider {
         }
     }
 
-    protected AccessControlList getACL(Entity entity) throws AuthorizationException {
-        switch (entity.getEntityType()) {
-        case CLUSTER:
-            return ((Cluster) entity).getACL();
-
-        case FEED:
-            return ((org.apache.falcon.entity.v0.feed.Feed) entity).getACL();
-
-        case PROCESS:
-            return ((org.apache.falcon.entity.v0.process.Process) entity).getACL();
-
-        default:
-            break;
-        }
-
-        throw new AuthorizationException("Cannot get owner for entity: " + 
entity.getName());
-    }
-
     protected void authorizeLineageResource(String authenticatedUser, String action) {
         LOG.debug("User {} authorized for action {} ", authenticatedUser, action);
         // todo - do nothing for now, read-only for all

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index f3e643e..ebd4dd4 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -30,8 +30,6 @@ import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.json.simple.JSONValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -285,13 +283,8 @@ public class WorkflowExecutionContext {
         OutputStream out = null;
         Path file = new Path(contextFile);
         try {
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(file.toUri());
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(file.toUri());
             out = fs.create(file);
-
-            // making sure falcon can read this file
-            FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
-            fs.setPermission(file, permission);
-
             out.write(JSONValue.toJSONString(context).getBytes());
         } catch (IOException e) {
             throw new FalconException("Error serializing context to: " + contextFile,  e);
@@ -315,7 +308,8 @@ public class WorkflowExecutionContext {
     public static WorkflowExecutionContext deSerialize(String contextFile) throws FalconException {
         try {
             Path lineageDataPath = new Path(contextFile); // file has 777 permissions
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(lineageDataPath.toUri());
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                    lineageDataPath.toUri());
 
             BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(lineageDataPath)));
             return new WorkflowExecutionContext((Map<WorkflowExecutionArgs, String>) JSONValue.parse(in));

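serialize no longer chmods the context file to 777; the file is simply created through the
proxied FileSystem, so ownership follows the workflow user and permissions follow the cluster
umask. A condensed sketch of the new write path (error handling elided):

import java.io.OutputStream;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class ContextWriteSketch {
    static void write(String contextFile, byte[] payload) throws Exception {
        Path file = new Path(contextFile);
        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(file.toUri());
        OutputStream out = fs.create(file); // created as the proxied user, default umask applies
        try {
            out.write(payload);
        } finally {
            out.close();
        }
    }
}
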
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java b/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
index 6ad742e..8e2e544 100644
--- a/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
@@ -25,6 +25,7 @@ import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.security.CurrentUser;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
@@ -69,7 +70,7 @@ public class LogCleanupServiceTest extends AbstractTestBase {
     @Override
     @BeforeClass
     public void setup() throws Exception {
-        this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
+        this.dfsCluster = EmbeddedCluster.newCluster("testCluster", 
CurrentUser.getUser());
         conf = dfsCluster.getConf();
         fs = dfsCluster.getFileSystem();
         fs.delete(new Path("/"), true);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
index 2d41661..a1319b8 100644
--- a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
+++ b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
@@ -91,6 +91,9 @@ public class AbstractTestBase {
     }
 
     protected void storeEntity(EntityType type, String name) throws Exception {
+        final String proxyUser = CurrentUser.getUser();
+        final String defaultGroupName = CurrentUser.getPrimaryGroupName();
+
         Unmarshaller unmarshaller = type.getUnmarshaller();
         store = ConfigurationStore.get();
         store.remove(type, name);
@@ -100,12 +103,16 @@ public class AbstractTestBase {
             cluster.setName(name);
             ClusterHelper.getInterface(cluster, Interfacetype.WRITE)
                     .setEndpoint(conf.get(HadoopClientFactory.FS_DEFAULT_NAME_KEY));
+            decorateACL(proxyUser, defaultGroupName, cluster);
+
             store.publish(type, cluster);
             break;
 
         case FEED:
             Feed feed = (Feed) unmarshaller.unmarshal(this.getClass().getResource(FEED_XML));
             feed.setName(name);
+            decorateACL(proxyUser, defaultGroupName, feed);
+
             store.publish(type, feed);
             break;
 
@@ -117,12 +124,52 @@ public class AbstractTestBase {
             if (!fs.exists(new Path(process.getWorkflow() + "/lib"))) {
                 fs.mkdirs(new Path(process.getWorkflow() + "/lib"));
             }
+
+            decorateACL(proxyUser, defaultGroupName, process);
+
             store.publish(type, process);
             break;
         default:
         }
     }
 
+    private void decorateACL(String proxyUser, String defaultGroupName, Cluster cluster) {
+        if (cluster.getACL() != null) {
+            return;
+        }
+
+        org.apache.falcon.entity.v0.cluster.ACL clusterACL =
+                new org.apache.falcon.entity.v0.cluster.ACL();
+        clusterACL.setOwner(proxyUser);
+        clusterACL.setGroup(defaultGroupName);
+        cluster.setACL(clusterACL);
+    }
+
+    private void decorateACL(String proxyUser, String defaultGroupName, Feed feed) {
+        if (feed.getACL() != null) {
+            return;
+        }
+
+        org.apache.falcon.entity.v0.feed.ACL feedACL =
+                new org.apache.falcon.entity.v0.feed.ACL();
+        feedACL.setOwner(proxyUser);
+        feedACL.setGroup(defaultGroupName);
+        feed.setACL(feedACL);
+    }
+
+    private void decorateACL(String proxyUser, String defaultGroupName,
+                             Process process) {
+        if (process.getACL() != null) {
+            return;
+        }
+
+        org.apache.falcon.entity.v0.process.ACL processACL =
+                new org.apache.falcon.entity.v0.process.ACL();
+        processACL.setOwner(proxyUser);
+        processACL.setGroup(defaultGroupName);
+        process.setACL(processACL);
+    }
+
     public void setup() throws Exception {
         store = ConfigurationStore.get();
         for (EntityType type : EntityType.values()) {
@@ -147,7 +194,7 @@ public class AbstractTestBase {
 
     // assumes there will always be at least one group for a logged in user
     protected String getGroupName() throws IOException {
-        String[] groupNames = CurrentUser.getProxyUgi().getGroupNames();
+        String[] groupNames = CurrentUser.getProxyUGI().getGroupNames();
         System.out.println("groupNames = " + Arrays.asList(groupNames));
         return groupNames[0];
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java b/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java
index 8f25f57..1cb08ac 100644
--- a/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java
+++ b/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java
@@ -64,7 +64,7 @@ public class HadoopClientFactoryTest {
             Configuration conf = embeddedCluster.getConf();
             URI uri = new URI(conf.get(HadoopClientFactory.FS_DEFAULT_NAME_KEY));
             Assert.assertNotNull(uri);
-            HadoopClientFactory.get().createFileSystem(CurrentUser.getProxyUgi(), uri, conf);
+            HadoopClientFactory.get().createFileSystem(CurrentUser.getProxyUGI(), uri, conf);
             Assert.fail("Impersonation should have failed.");
         } catch (Exception e) {
             Assert.assertEquals(e.getCause().getClass(), RemoteException.class);
@@ -99,7 +99,7 @@ public class HadoopClientFactoryTest {
         Assert.assertNotNull(uri);
 
         CurrentUser.authenticate(System.getProperty("user.name"));
-        FileSystem fs = HadoopClientFactory.get().createFileSystem(CurrentUser.getProxyUgi(), uri, conf);
+        FileSystem fs = HadoopClientFactory.get().createFileSystem(CurrentUser.getProxyUGI(), uri, conf);
         Assert.assertNotNull(fs);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java b/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java
index a1861e1..187d85e 100644
--- a/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java
+++ b/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java
@@ -37,7 +37,7 @@ public class CurrentUserTest {
     @Test
     public void testGetProxyUser() throws Exception {
         CurrentUser.authenticate("proxy");
-        UserGroupInformation proxyUgi = CurrentUser.getProxyUgi();
+        UserGroupInformation proxyUgi = CurrentUser.getProxyUGI();
         Assert.assertNotNull(proxyUgi);
         Assert.assertEquals(proxyUgi.getUserName(), "proxy");
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/docs/src/site/twiki/EntitySpecification.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki
index a710dfd..b0dfb7f 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
@@ -69,6 +69,11 @@ Location has the name and the path, name is the type of locations like staging,
 and path is the hdfs path for each location.
 Falcon would use the location to do intermediate processing of entities in hdfs and hence Falcon
 should have read/write/execute permission on these locations.
+These locations MUST be created prior to submitting a cluster entity to Falcon.
+*staging* must have 777 permissions and the parent dirs must have execute permissions so multiple
+users can write to this location.
+*working* must have 755 permissions and the parent dirs must have execute permissions so multiple
+users can read from this location.
 
 ---+++ ACL
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/docs/src/site/twiki/InstallationSteps.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/InstallationSteps.twiki b/docs/src/site/twiki/InstallationSteps.twiki
index 9cddcbb..88f3226 100644
--- a/docs/src/site/twiki/InstallationSteps.twiki
+++ b/docs/src/site/twiki/InstallationSteps.twiki
@@ -261,6 +261,9 @@ src/bin/package.sh <<hadoop-version>> <<oozie-version>>
 bin/falcon-start
 </verbatim>
 Make sure the hadoop and oozie endpoints are according to your setup in examples/entity/filesystem/standalone-cluster.xml
+The cluster locations, staging and working dirs, MUST be created prior to submitting a cluster entity to Falcon.
+*staging* must have 777 permissions and the parent dirs must have execute permissions
+*working* must have 755 permissions and the parent dirs must have execute permissions
 <verbatim>
 bin/falcon entity -submit -type cluster -file examples/entity/filesystem/standalone-cluster.xml
 </verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/docs/src/site/twiki/OnBoarding.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/OnBoarding.twiki b/docs/src/site/twiki/OnBoarding.twiki
index 75d0d7e..4f49c5a 100644
--- a/docs/src/site/twiki/OnBoarding.twiki
+++ b/docs/src/site/twiki/OnBoarding.twiki
@@ -16,6 +16,10 @@
 ---+++ Sample Pipeline
 ---++++ Cluster   
 Cluster definition that contains end points for name node, job tracker, oozie and jms server:
+The cluster locations MUST be created prior to submitting a cluster entity to Falcon.
+*staging* must have 777 permissions and the parent dirs must have execute permissions
+*working* must have 755 permissions and the parent dirs must have execute permissions
+
 <verbatim>
 <?xml version="1.0"?>
 <!--

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/docs/src/site/twiki/Security.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Security.twiki b/docs/src/site/twiki/Security.twiki
index 4e33182..f4db28b 100644
--- a/docs/src/site/twiki/Security.twiki
+++ b/docs/src/site/twiki/Security.twiki
@@ -301,11 +301,17 @@ Falcon should be configured to communicate with Prism over TLS in secure mode. I
 
 ---++ Changes to ownership and permissions of directories managed by Falcon
 
-| *Directory*             | *Location*                                                        | *Owner* | *Permissions* |
-| Configuration Store     | ${config.store.uri}                                               | falcon  | 750           |
-| Oozie coord/bundle XMLs | ${cluster.staging-location}/workflows/{entity}/{entity-name}      | falcon  | 644           |
-| Shared libs             | {cluster.working}/{lib,libext}                                    | falcon  | 755           |
-| App logs                | ${cluster.staging-location}/workflows/{entity}/{entity-name}/logs | falcon  | 777           |
+| *Directory*              | *Location*                                                        | *Owner* | *Permissions* |
+| Configuration Store      | ${config.store.uri}                                               | falcon  | 700           |
+| Cluster Staging Location | ${cluster.staging-location}                                       | falcon  | 777           |
+| Cluster Working Location | ${cluster.working-location}                                       | falcon  | 755           |
+| Shared libs              | {cluster.working}/{lib,libext}                                    | falcon  | 755           |
+| Oozie coord/bundle XMLs  | ${cluster.staging-location}/workflows/{entity}/{entity-name}      | $user   | cluster umask |
+| App logs                 | ${cluster.staging-location}/workflows/{entity}/{entity-name}/logs | $user   | cluster umask |
+
+*Note:* Please note that the cluster staging and working locations MUST be created prior to
+submitting a cluster entity to Falcon. Also, note that the parent dirs must have execute
+permissions.
 
 
 ---++ Backwards compatibility

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java b/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java
index 72e390e..7156bbd 100644
--- a/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java
+++ b/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java
@@ -142,6 +142,11 @@ public class JailedFileSystem extends FileSystem {
     }
 
     @Override
+    public void setPermission(Path p, FsPermission permission) throws IOException {
+        localFS.setPermission(toLocalPath(p), permission);
+    }
+
+    @Override
     public FileChecksum getFileChecksum(Path f) throws IOException {
         final byte[] md5 = DigestUtils.md5(FileUtils.readFileToByteArray(new File(toLocalPath(f).toString())));
         return new FileChecksum() {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
index 4a0bc2a..300fecf 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
@@ -21,6 +21,7 @@ package org.apache.falcon.messaging;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.aspect.GenericAlert;
+import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
@@ -99,6 +100,9 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
             WorkflowExecutionContext context = createContext(mapMessage);
             LOG.info("Created context from JMS message {}", context);
 
+            // Login the user so listeners can access FS and WfEngine as this user
+            CurrentUser.authenticate(context.getWorkflowUser());
+
             if (context.hasWorkflowFailed()) {
                 onFailure(context);
             } else if (context.hasWorkflowSucceeded()) {
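
The authenticate call above establishes the workflow user before any listener
touches HDFS or the workflow engine. A hedged sketch of the same pattern in
isolation; the message key, namenode URI and path are illustrative, while
CurrentUser.authenticate and createProxiedFileSystem are the calls visible in
this patch:

    // Sketch only: log the workflow user in, then act on HDFS as that user.
    // The "workflowUser" key, URI and path are made up for the example.
    public void onMessage(Message message) {
        try {
            MapMessage mapMessage = (MapMessage) message;
            String workflowUser = mapMessage.getString("workflowUser");
            // establish the user before touching HDFS so access checks apply to them
            CurrentUser.authenticate(workflowUser);
            FileSystem fs = HadoopClientFactory.get()
                    .createProxiedFileSystem(URI.create("hdfs://namenode:8020"));
            fs.exists(new Path("/falcon/staging"));  // runs as workflowUser
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }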

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
index 629e6a5..dece932 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
@@ -18,10 +18,10 @@
 
 package org.apache.falcon.messaging;
 
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.retention.EvictedInstanceSerDe;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
@@ -35,7 +35,6 @@ import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.Session;
 import javax.jms.Topic;
-import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -184,7 +183,7 @@ public class JMSMessageProducer {
         String[] feedPaths;
         try {
             feedPaths = getFeedPaths();
-        } catch (IOException e) {
+        } catch (Exception e) {
             LOG.error("Error getting instance paths: ", e);
             throw new RuntimeException(e);
         }
@@ -248,7 +247,7 @@ public class JMSMessageProducer {
         message.put(key.getName(), value);
     }
 
-    private String[] getFeedPaths() throws IOException {
+    private String[] getFeedPaths() throws Exception {
         WorkflowExecutionContext.EntityOperations operation = context.getOperation();
         if (operation == WorkflowExecutionContext.EntityOperations.GENERATE
                 || operation == WorkflowExecutionContext.EntityOperations.REPLICATE) {
@@ -258,7 +257,7 @@ public class JMSMessageProducer {
 
         // else case of feed retention
         Path logFile = new Path(context.getLogFile());
-        FileSystem fs = FileSystem.get(logFile.toUri(), new Configuration());
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(logFile.toUri());
 
         if (!fs.exists(logFile)) {
             // Evictor Failed without deleting a single path
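
A rule of thumb that recurs through this patch: createProxiedFileSystem
returns a FileSystem acting as the user on whose behalf Falcon is working,
while createFalconFileSystem (used for shared libs further down) acts as the
falcon service user. A hedged sketch contrasting the two, assuming only the
factory signatures that appear in this diff and a Cluster entity in scope:

    // Sketch only: the two HadoopClientFactory entry points used in this
    // patch, given a Configuration that points at the target cluster.
    Configuration conf = ClusterHelper.getConfiguration(cluster);

    // acts with the identity of the currently authenticated end user
    FileSystem userFs = HadoopClientFactory.get().createProxiedFileSystem(conf);

    // acts as the falcon service user, e.g. for world-readable shared libs
    FileSystem falconFs = HadoopClientFactory.get().createFalconFileSystem(conf);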

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
index 1a08ada..4a22ff2 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
@@ -20,6 +20,7 @@ package org.apache.falcon.logging;
 
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.process.EngineType;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -71,7 +72,7 @@ public class JobLogMover {
 
             Path path = new Path(context.getLogDir() + "/"
                     + String.format("%03d", context.getWorkflowRunId()));
-            FileSystem fs = path.getFileSystem(getConf());
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(path.toUri());
 
             if (EntityType.FEED.name().equalsIgnoreCase(context.getEntityType())
                     || notUserWorkflowEngineIsOozie(context.getUserWorkflowEngine())) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java b/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
index 4ed8f52..2e5dffb 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
@@ -53,7 +53,7 @@ public final class LogProvider {
         try {
             Configuration conf = ClusterHelper.getConfiguration(clusterObj);
             // fs on behalf of the end user.
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
             String resolvedRunId = getResolvedRunId(fs, clusterObj, entity, instance, runId);
             // if runId param is not resolved, i.e job is killed or not started or running
             if (resolvedRunId.equals("-")

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
index 82f7251..957300a 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
@@ -18,7 +18,6 @@
 
 package org.apache.falcon.oozie;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
@@ -34,8 +33,6 @@ import org.apache.falcon.util.OozieUtils;
 import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.oozie.client.OozieClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,7 +42,6 @@ import javax.xml.bind.JAXBException;
 import javax.xml.bind.Unmarshaller;
 import javax.xml.transform.stream.StreamSource;
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Properties;
@@ -77,8 +73,6 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
         bundle.setName(EntityUtil.getWorkflowName(entity).toString());
         // all the properties are set prior to bundle and coordinators creation
 
-        createLogsDir(cluster, buildPath); //create logs dir
-
         for (Properties coordProps : coords) {
             // add the coordinator to the bundle
             COORDINATOR coord = new COORDINATOR();
@@ -133,23 +127,6 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
         return properties;
     }
 
-    private void createLogsDir(Cluster cluster, Path buildPath) throws FalconException {
-        try {
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(buildPath.toUri(),
-                ClusterHelper.getConfiguration(cluster));
-            Path logsDir = new Path(buildPath.getParent(), "logs");
-            if (!fs.mkdirs(logsDir)) {
-                throw new FalconException("Failed to create " + logsDir);
-            }
-
-            // logs are copied with in oozie as the user in Post Processing and hence 777 permissions
-            FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
-            fs.setPermission(logsDir, permission);
-        } catch (IOException e) {
-            throw new FalconException(e);
-        }
-    }
-
     protected Path marshal(Cluster cluster, BUNDLEAPP bundle, Path outPath) throws FalconException {
         return marshal(cluster, new org.apache.falcon.oozie.bundle.ObjectFactory().createBundleApp(bundle),
             OozieUtils.BUNDLE_JAXB_CONTEXT, new Path(outPath, "bundle.xml"));
@@ -160,20 +137,17 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
     protected abstract List<Properties> buildCoords(Cluster cluster, Path bundlePath) throws FalconException;
 
     public static BUNDLEAPP unmarshal(Cluster cluster, Path path) throws FalconException {
-        InputStream resourceAsStream = null;
         try {
-            FileSystem fs =
-                HadoopClientFactory.get().createFileSystem(path.toUri(), ClusterHelper.getConfiguration(cluster));
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                        path.toUri(), ClusterHelper.getConfiguration(cluster));
             Unmarshaller unmarshaller = OozieUtils.BUNDLE_JAXB_CONTEXT.createUnmarshaller();
-            @SuppressWarnings("unchecked") JAXBElement<BUNDLEAPP> jaxbElement = (JAXBElement<BUNDLEAPP>)
-                unmarshaller.unmarshal(new StreamSource(fs.open(path)), BUNDLEAPP.class);
+            @SuppressWarnings("unchecked") JAXBElement<BUNDLEAPP> jaxbElement =
+                    unmarshaller.unmarshal(new StreamSource(fs.open(path)), BUNDLEAPP.class);
             return jaxbElement.getValue();
         } catch (JAXBException e) {
             throw new FalconException(e);
         } catch (IOException e) {
             throw new FalconException(e);
-        } finally {
-            IOUtils.closeQuietly(resourceAsStream);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
index fe2136b..2ceb91e 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
@@ -120,7 +120,8 @@ public abstract class OozieCoordinatorBuilder<T extends 
Entity> extends OozieEnt
         props.put(WorkflowExecutionArgs.TIMESTAMP.getName(), ACTUAL_TIME_EL);
         props.put("falconDataOperation", getOperation().name());
 
-        props.put(WorkflowExecutionArgs.LOG_DIR.getName(), getLogDirectory(cluster));
+        props.put(WorkflowExecutionArgs.LOG_DIR.getName(),
+                getStoragePath(EntityUtil.getLogPath(cluster, entity)));
         props.put(OozieClient.EXTERNAL_ID,
             new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity),
                 "${coord:nominalTime()}").getId());

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
index 1c3085c..e341fb8 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
@@ -23,7 +23,6 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.CatalogStorage;
 import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.ProcessHelper;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.cluster.Cluster;
@@ -127,8 +126,8 @@ public abstract class OozieEntityBuilder<T extends Entity> {
         throw new IllegalArgumentException("Unhandled type: " + 
entity.getEntityType());
     }
 
-    protected Path marshal(Cluster cluster, JAXBElement<?> jaxbElement, 
JAXBContext jaxbContext, Path outPath)
-        throws FalconException {
+    protected Path marshal(Cluster cluster, JAXBElement<?> jaxbElement,
+                           JAXBContext jaxbContext, Path outPath) throws 
FalconException {
         try {
             Marshaller marshaller = jaxbContext.createMarshaller();
             marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
@@ -140,8 +139,8 @@ public abstract class OozieEntityBuilder<T extends Entity> {
                 LOG.debug(writer.getBuffer().toString());
             }
 
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(
-                outPath.toUri(), ClusterHelper.getConfiguration(cluster));
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                    outPath.toUri(), ClusterHelper.getConfiguration(cluster));
             OutputStream out = fs.create(outPath);
             try {
                 marshaller.marshal(jaxbElement, out);
@@ -261,8 +260,11 @@ public abstract class OozieEntityBuilder<T extends Entity> {
 
     protected void copySharedLibs(Cluster cluster, Path libPath) throws FalconException {
         try {
-            SharedLibraryHostingService.pushLibsToHDFS(StartupProperties.get().getProperty("system.lib.location"),
-                libPath, cluster, FALCON_JAR_FILTER);
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                    libPath.toUri(), ClusterHelper.getConfiguration(cluster));
+            SharedLibraryHostingService.pushLibsToHDFS(
+                    fs, StartupProperties.get().getProperty("system.lib.location"),
+                    libPath, FALCON_JAR_FILTER);
         } catch (IOException e) {
             throw new FalconException("Failed to copy shared libs on cluster " + cluster.getName(), e);
         }
@@ -279,16 +281,11 @@ public abstract class OozieEntityBuilder<T extends Entity> {
         return prop;
     }
 
-    protected String getLogDirectory(Cluster cluster) {
-        return getStoragePath(new Path(EntityUtil.getBaseStagingPath(cluster, entity), "logs"));
-    }
-
     protected <T> T unmarshal(String template, JAXBContext context, Class<T> cls) throws FalconException {
         InputStream resourceAsStream = null;
         try {
             resourceAsStream = OozieEntityBuilder.class.getResourceAsStream(template);
             Unmarshaller unmarshaller = context.createUnmarshaller();
-            @SuppressWarnings("unchecked")
             JAXBElement<T> jaxbElement = unmarshaller.unmarshal(new StreamSource(resourceAsStream), cls);
             return jaxbElement.getValue();
         } catch (JAXBException e) {
@@ -310,5 +307,4 @@ public abstract class OozieEntityBuilder<T extends Entity> {
         }
         throw new IllegalArgumentException("Unhandled type " + 
entity.getEntityType());
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index 2339284..3a3e26e 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -210,7 +210,8 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
 
     protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, Tag tag) throws FalconException {
         String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
-        FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                ClusterHelper.getConfiguration(cluster));
         try {
             addExtensionJars(fs, new Path(libext), wf);
             addExtensionJars(fs, new Path(libext, entity.getEntityType().name()), wf);
@@ -261,12 +262,13 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
     }
 
     // creates hive-site.xml configuration in conf dir for the given cluster on the same cluster.
-    protected void createHiveConfiguration(Cluster cluster, Path workflowPath, String prefix) throws FalconException {
+    protected void createHiveConfiguration(Cluster cluster, Path workflowPath,
+                                           String prefix) throws FalconException {
         Configuration hiveConf = getHiveCredentialsAsConf(cluster);
 
         try {
             Configuration conf = ClusterHelper.getConfiguration(cluster);
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
 
             // create hive conf to stagingDir
             Path confPath = new Path(workflowPath + "/conf");
@@ -277,8 +279,8 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
         }
     }
 
-    private void persistHiveConfiguration(FileSystem fs, Path confPath, Configuration hiveConf, String prefix)
-        throws IOException {
+    private void persistHiveConfiguration(FileSystem fs, Path confPath, Configuration hiveConf,
+                                          String prefix) throws IOException {
         OutputStream out = null;
         try {
             out = fs.create(new Path(confPath, prefix + "hive-site.xml"));
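
persistHiveConfiguration writes the cluster's hive credentials into the
workflow's conf dir through the user-proxied file system. A self-contained
sketch of the same kind of write; the metastore URI and target path are
placeholders, not values from this patch:

    // Sketch only: persist a Configuration as hive-site.xml on HDFS via the
    // user-proxied file system; the URI and path below are placeholders.
    Configuration hiveConf = new Configuration(false);
    hiveConf.set("hive.metastore.uris", "thrift://metastore.example.com:9083");
    FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(new Configuration());
    OutputStream out = fs.create(new Path("/falcon/staging/workflows/conf/hive-site.xml"));
    try {
        hiveConf.writeXml(out);  // serializes all set properties as <property> XML
    } finally {
        out.close();
    }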

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index 801d733..c5366dc 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -281,7 +281,7 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
     private void setupHiveConfiguration(Cluster srcCluster, Cluster trgCluster,
                                         Path buildPath) throws FalconException {
         Configuration conf = ClusterHelper.getConfiguration(trgCluster);
-        FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
 
         try {
             // copy import export scripts to stagingDir
@@ -298,7 +298,8 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
         }
     }
 
-    private void copyHiveScript(FileSystem fs, Path scriptPath, String resource) throws IOException {
+    private void copyHiveScript(FileSystem fs, Path scriptPath,
+                                String resource) throws IOException {
         OutputStream out = null;
         InputStream in = null;
         try {
@@ -312,7 +313,7 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
     }
 
     protected void persistHiveConfiguration(FileSystem fs, Path confPath,
-        Cluster cluster, String prefix) throws IOException {
+                                            Cluster cluster, String prefix) throws IOException {
         Configuration hiveConf = getHiveCredentialsAsConf(cluster);
         OutputStream out = null;
         try {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
index 8f97ffa..a38fdf6 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
@@ -120,7 +120,8 @@ public class ProcessBundleBuilder extends OozieBundleBuilder<Process> {
 
     private void copyUserWorkflow(Cluster cluster, Path buildPath) throws FalconException {
         try {
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                    ClusterHelper.getConfiguration(cluster));
 
             //Copy user workflow and lib to staging dir
             Map<String, String> checksums = UpdateHelper.checksumAndCopy(fs, new Path(entity.getWorkflow().getPath()),

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
index 865beaf..2700802 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
@@ -32,6 +32,7 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
 import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.CONFIGURATION;
@@ -221,7 +222,8 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration
         }
 
         try {
-            final FileSystem fs = libPath.getFileSystem(ClusterHelper.getConfiguration(cluster));
+            final FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                    ClusterHelper.getConfiguration(cluster));
             if (fs.isFile(libPath)) {  // File, not a Dir
                 archiveList.add(libPath.toString());
                 return;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
index d273d61..9567c5f 100644
--- a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
+++ b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
@@ -28,7 +28,6 @@ import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -73,40 +72,35 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener
         Path lib = new Path(ClusterHelper.getLocation(cluster, "working"), "lib");
         Path libext = new Path(ClusterHelper.getLocation(cluster, "working"), "libext");
         try {
+            FileSystem fs = HadoopClientFactory.get().createFalconFileSystem(
+                    ClusterHelper.getConfiguration(cluster));
+
             Properties properties = StartupProperties.get();
-            pushLibsToHDFS(properties.getProperty("system.lib.location"), lib, 
cluster, NON_FALCON_JAR_FILTER);
-            pushLibsToHDFS(properties.getProperty("libext.paths"), libext, 
cluster, null);
-            pushLibsToHDFS(properties.getProperty("libext.feed.paths"),
-                    new Path(libext, EntityType.FEED.name()) , cluster, null);
-            
pushLibsToHDFS(properties.getProperty("libext.feed.replication.paths"),
-                    new Path(libext, EntityType.FEED.name() + "/replication"), 
cluster, null);
-            
pushLibsToHDFS(properties.getProperty("libext.feed.retention.paths"),
-                    new Path(libext, EntityType.FEED.name() + "/retention"), 
cluster, null);
-            pushLibsToHDFS(properties.getProperty("libext.process.paths"),
-                    new Path(libext, EntityType.PROCESS.name()) , cluster, 
null);
+            pushLibsToHDFS(fs, properties.getProperty("system.lib.location"), 
lib,
+                    NON_FALCON_JAR_FILTER);
+            pushLibsToHDFS(fs, properties.getProperty("libext.paths"), libext, 
null);
+            pushLibsToHDFS(fs, properties.getProperty("libext.feed.paths"),
+                    new Path(libext, EntityType.FEED.name()) , null);
+            pushLibsToHDFS(fs, 
properties.getProperty("libext.feed.replication.paths"),
+                    new Path(libext, EntityType.FEED.name() + "/replication"), 
null);
+            pushLibsToHDFS(fs, 
properties.getProperty("libext.feed.retention.paths"),
+                    new Path(libext, EntityType.FEED.name() + "/retention"), 
null);
+            pushLibsToHDFS(fs, properties.getProperty("libext.process.paths"),
+                    new Path(libext, EntityType.PROCESS.name()) , null);
         } catch (IOException e) {
             throw new FalconException("Failed to copy shared libs to cluster" 
+ cluster.getName(), e);
         }
     }
 
-    public static void pushLibsToHDFS(String src, Path target, Cluster cluster, FalconPathFilter pathFilter)
-        throws IOException, FalconException {
+    @SuppressWarnings("ConstantConditions")
+    public static void pushLibsToHDFS(FileSystem fs, String src, Path target,
+                                      FalconPathFilter pathFilter) throws IOException, FalconException {
         if (StringUtils.isEmpty(src)) {
             return;
         }
 
         LOG.debug("Copying libs from {}", src);
-        FileSystem fs;
-        try {
-            fs = getFileSystem(cluster);
-            fs.getConf().set("dfs.umaskmode", "022");  // drwxr-xr-x
-        } catch (Exception e) {
-            throw new FalconException("Unable to connect to HDFS: "
-                    + ClusterHelper.getStorageUrl(cluster), e);
-        }
-        if (!fs.exists(target) && !fs.mkdirs(target)) {
-            throw new FalconException("mkdir " + target + " failed");
-        }
+        createTargetPath(fs, target);
 
         for(String srcPaths : src.split(",")) {
             File srcFile = new File(srcPaths);
@@ -133,18 +127,20 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener
                     }
                 }
                 fs.copyFromLocalFile(false, true, new Path(file.getAbsolutePath()), targetFile);
-                LOG.info("Copied {} to {} in {}", file.getAbsolutePath(), targetFile.toString(), fs.getUri());
+                fs.setPermission(targetFile, HadoopClientFactory.READ_EXECUTE_PERMISSION);
+                LOG.info("Copied {} to {} in {}",
+                        file.getAbsolutePath(), targetFile.toString(), fs.getUri());
             }
         }
     }
 
-    // the dir is owned by Falcon but world-readable
-    private static FileSystem getFileSystem(Cluster cluster)
-        throws FalconException, IOException {
-        Configuration conf = ClusterHelper.getConfiguration(cluster);
-        conf.setInt("ipc.client.connect.max.retries", 10);
-
-        return HadoopClientFactory.get().createFileSystem(conf);
+    private static void createTargetPath(FileSystem fs,
+                                         Path target) throws IOException, FalconException {
+        // the dir and files MUST be readable by all users
+        if (!fs.exists(target)
+                && !FileSystem.mkdirs(fs, target, HadoopClientFactory.READ_EXECUTE_PERMISSION)) {
+            throw new FalconException("mkdir " + target + " failed");
+        }
     }
 
     @Override
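
After this refactor the caller owns the FileSystem, and with it the identity
used for the copy, while pushLibsToHDFS only copies files and sets
permissions. A hedged usage sketch mirroring the calls in addLibsTo above,
assuming a Cluster entity in scope; the libext path is a placeholder:

    // Sketch only: drive the refactored pushLibsToHDFS as the falcon user.
    FileSystem fs = HadoopClientFactory.get().createFalconFileSystem(
            ClusterHelper.getConfiguration(cluster));
    Path libext = new Path("/falcon/working/libext");  // placeholder location
    SharedLibraryHostingService.pushLibsToHDFS(
            fs, StartupProperties.get().getProperty("libext.paths"), libext, null);
    // copied files end up with READ_EXECUTE_PERMISSION (r-x for all),
    // matching the setPermission call above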

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
index bbed949..54cab51 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
@@ -58,7 +58,7 @@ public class OozieHouseKeepingService implements WorkflowEngineActionListener {
             LOG.info("Deleting entity path {} on cluster {}", entityPath, 
clusterName);
 
             Configuration conf = ClusterHelper.getConfiguration(cluster);
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
             if (fs.exists(entityPath) && !fs.delete(entityPath, true)) {
                 throw new FalconException("Unable to cleanup entity path: " + 
entityPath);
             }
