Repository: falcon
Updated Branches:
  refs/heads/master 0df8e7f0a -> 6e50f31ad
FALCON-1647 Unable to create feed : FilePermission error under cluster staging directory. Contributed by Balu Vellanki.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/6e50f31a
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/6e50f31a
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/6e50f31a

Branch: refs/heads/master
Commit: 6e50f31ad0c66357181f838059553527c8c7e1ab
Parents: 0df8e7f
Author: Ajay Yadava <[email protected]>
Authored: Wed Dec 9 23:39:20 2015 +0530
Committer: Ajay Yadava <[email protected]>
Committed: Wed Dec 9 23:39:20 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../entity/parser/ClusterEntityParser.java      | 84 ++++++++++++--------
 .../entity/parser/ClusterEntityParserTest.java  | 14 ++++
 3 files changed, 65 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/6e50f31a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f0aca7f..9ea8c79 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -60,6 +60,8 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+    FALCON-1647 Unable to create feed : FilePermission error under cluster staging directory(Balu Vellanki via Ajay Yadava)
+
     FALCON-1651 Falcon doesn't start (Ajay Yadava)
 
     FALCON-1598 Flaky test : EntityManagerJerseyIT.testDuplicateDeleteCommands (Narayan Periwal via Pallavi Rao)


http://git-wip-us.apache.org/repos/asf/falcon/blob/6e50f31a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
index b4f61d7..bef4b39 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
@@ -268,53 +268,22 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
         Configuration conf = ClusterHelper.getConfiguration(cluster);
         FileSystem fs;
         try {
-            fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
+            fs = HadoopClientFactory.get().createFalconFileSystem(conf);
         } catch (FalconException e) {
             throw new ValidationException("Unable to get file system handle for cluster " + cluster.getName(), e);
         }
 
         Location stagingLocation = ClusterHelper.getLocation(cluster, ClusterLocationType.STAGING);
-
         if (stagingLocation == null) {
             throw new ValidationException(
                     "Unable to find the mandatory location of name: " + ClusterLocationType.STAGING.value()
                             + " for cluster " + cluster.getName());
         } else {
-
             checkPathOwnerAndPermission(cluster.getName(), stagingLocation.getPath(), fs,
                     HadoopClientFactory.ALL_PERMISSION);
-
             if (!ClusterHelper.checkWorkingLocationExists(cluster)) {
                 //Creating location type of working in the sub dir of staging dir with perms 755. FALCON-910
-
-                Path workingDirPath = new Path(stagingLocation.getPath(), ClusterHelper.WORKINGDIR);
-                try {
-                    if (!fs.exists(workingDirPath)) { //Checking if the staging dir has the working dir to be created
-                        HadoopClientFactory.mkdirs(fs, workingDirPath, HadoopClientFactory.READ_EXECUTE_PERMISSION);
-                    } else {
-                        if (fs.isDirectory(workingDirPath)) {
-                            FsPermission workingPerms = fs.getFileStatus(workingDirPath).getPermission();
-                            if (!workingPerms.equals(HadoopClientFactory.READ_EXECUTE_PERMISSION)) { //perms check
-                                throw new ValidationException(
-                                        "Falcon needs subdir " + ClusterHelper.WORKINGDIR + " inside staging dir:"
-                                                + stagingLocation.getPath()
-                                                + " when staging location not specified with "
-                                                + HadoopClientFactory.READ_EXECUTE_PERMISSION.toString() + " got "
-                                                + workingPerms.toString());
-                            }
-                        } else {
-                            throw new ValidationException(
-                                    "Falcon needs subdir " + ClusterHelper.WORKINGDIR + " inside staging dir:"
-                                            + stagingLocation.getPath()
-                                            + " when staging location not specified. Got a file at " + workingDirPath
-                                            .toString());
-                        }
-                    }
-                } catch (IOException e) {
-                    throw new ValidationException(
-                            "Unable to create path for " + workingDirPath.toString() + " with path: "
-                                    + workingDirPath.toString() + " for cluster " + cluster.getName(), e);
-                }
+                createWorkingDirUnderStaging(fs, cluster, stagingLocation);
             } else {
                 Location workingLocation = ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING);
                 if (stagingLocation.getPath().equals(workingLocation.getPath())) {
@@ -323,15 +292,60 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
                             .getName().value() + " cannot have same path: " + stagingLocation.getPath()
                             + " for cluster :" + cluster.getName());
                 } else {
-
                     checkPathOwnerAndPermission(cluster.getName(), workingLocation.getPath(), fs,
                             HadoopClientFactory.READ_EXECUTE_PERMISSION);
-
                 }
             }
+            // Create staging subdirs falcon/workflows/feed and falcon/workflows/process : Falcon-1647
+            createStagingSubdirs(fs, cluster, stagingLocation,
+                    "falcon/workflows/feed", HadoopClientFactory.ALL_PERMISSION);
+            createStagingSubdirs(fs, cluster, stagingLocation,
+                    "falcon/workflows/process", HadoopClientFactory.ALL_PERMISSION);
+        }
+    }
 
+    private void createWorkingDirUnderStaging(FileSystem fs, Cluster cluster,
+                                              Location stagingLocation) throws ValidationException {
+        Path workingDirPath = new Path(stagingLocation.getPath(), ClusterHelper.WORKINGDIR);
+        try {
+            if (!fs.exists(workingDirPath)) { //Checking if the staging dir has the working dir to be created
+                HadoopClientFactory.mkdirs(fs, workingDirPath, HadoopClientFactory.READ_EXECUTE_PERMISSION);
+            } else {
+                if (fs.isDirectory(workingDirPath)) {
+                    FsPermission workingPerms = fs.getFileStatus(workingDirPath).getPermission();
+                    if (!workingPerms.equals(HadoopClientFactory.READ_EXECUTE_PERMISSION)) { //perms check
+                        throw new ValidationException(
+                                "Falcon needs subdir " + ClusterHelper.WORKINGDIR + " inside staging dir:"
+                                        + stagingLocation.getPath()
+                                        + " when staging location not specified with "
+                                        + HadoopClientFactory.READ_EXECUTE_PERMISSION.toString() + " got "
+                                        + workingPerms.toString());
+                    }
+                } else {
+                    throw new ValidationException(
+                            "Falcon needs subdir " + ClusterHelper.WORKINGDIR + " inside staging dir:"
+                                    + stagingLocation.getPath()
+                                    + " when staging location not specified. Got a file at " + workingDirPath
+                                    .toString());
+                }
+            }
+        } catch (IOException e) {
+            throw new ValidationException(
+                    "Unable to create path for " + workingDirPath.toString() + " with path: "
+                            + workingDirPath.toString() + " for cluster " + cluster.getName(), e);
         }
+    }
 
+    private void createStagingSubdirs(FileSystem fs, Cluster cluster, Location stagingLocation,
+                                      String path, FsPermission permission) throws ValidationException {
+        Path subdirPath = new Path(stagingLocation.getPath(), path);
+        try {
+            HadoopClientFactory.mkdirs(fs, subdirPath, permission);
+        } catch (IOException e) {
+            throw new ValidationException(
+                    "Unable to create path "
+                            + subdirPath.toString() + " for cluster " + cluster.getName(), e);
+        }
     }
 
     protected void validateProperties(Cluster cluster) throws ValidationException {


http://git-wip-us.apache.org/repos/asf/falcon/blob/6e50f31a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
index cd61a8c..f98b6e4 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
@@ -36,6 +36,7 @@ import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -354,6 +355,19 @@ public class ClusterEntityParserTest extends AbstractTestBase {
         FileStatus workingDirStatus = this.dfsCluster.getFileSystem().getFileLinkStatus(new Path(workingDirPath));
         Assert.assertTrue(workingDirStatus.isDirectory());
         Assert.assertEquals(workingDirStatus.getPermission(), HadoopClientFactory.READ_EXECUTE_PERMISSION);
+        Assert.assertEquals(workingDirStatus.getOwner(), UserGroupInformation.getLoginUser().getShortUserName());
+
+        String stagingSubdirFeed = cluster.getLocations().getLocations().get(0).getPath() + "/falcon/workflows/feed";
+        String stagingSubdirProcess =
+                cluster.getLocations().getLocations().get(0).getPath() + "/falcon/workflows/process";
+        FileStatus stagingSubdirFeedStatus =
+                this.dfsCluster.getFileSystem().getFileLinkStatus(new Path(stagingSubdirFeed));
+        FileStatus stagingSubdirProcessStatus =
+                this.dfsCluster.getFileSystem().getFileLinkStatus(new Path(stagingSubdirProcess));
+        Assert.assertTrue(stagingSubdirFeedStatus.isDirectory());
+        Assert.assertEquals(stagingSubdirFeedStatus.getPermission(), HadoopClientFactory.ALL_PERMISSION);
+        Assert.assertTrue(stagingSubdirProcessStatus.isDirectory());
+        Assert.assertEquals(stagingSubdirProcessStatus.getPermission(), HadoopClientFactory.ALL_PERMISSION);
     }
 
     /**
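[Editor's note] For readers who want to prepare or verify the same directory layout by hand (for example, before submitting the cluster entity), the plain Hadoop FileSystem API is enough. The sketch below is illustrative only and is not part of this commit: the class name StagingDirSetup and the /apps/falcon/staging path are placeholders, and "working" is assumed to be the value of ClusterHelper.WORKINGDIR. The falcon/workflows/feed and falcon/workflows/process sub-directories with 777 and the working sub-directory with 755 mirror what ClusterEntityParser now enforces.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    /** Illustrative helper only; not part of the Falcon code base. */
    public final class StagingDirSetup {

        public static void main(String[] args) throws IOException {
            // Placeholder: pass the cluster entity's STAGING location path as the first argument.
            Path staging = new Path(args.length > 0 ? args[0] : "/apps/falcon/staging");

            // Uses fs.defaultFS from the Hadoop configuration on the classpath.
            FileSystem fs = FileSystem.get(new Configuration());

            // The workflow sub-dirs are expected with rwxrwxrwx (777), matching
            // HadoopClientFactory.ALL_PERMISSION in ClusterEntityParser.
            FsPermission all = new FsPermission((short) 0777);
            for (String subdir : new String[]{"falcon/workflows/feed", "falcon/workflows/process"}) {
                Path p = new Path(staging, subdir);
                fs.mkdirs(p, all);
                // mkdirs applies the process umask to the requested mode, so set it explicitly.
                fs.setPermission(p, all);
            }

            // The working dir under staging ("working" here assumes ClusterHelper.WORKINGDIR) is
            // expected with rwxr-xr-x (755), matching HadoopClientFactory.READ_EXECUTE_PERMISSION.
            Path working = new Path(staging, "working");
            FsPermission readExecute = new FsPermission((short) 0755);
            fs.mkdirs(working, readExecute);
            fs.setPermission(working, readExecute);
        }
    }

Since the commit also switches the parser from createProxiedFileSystem to createFalconFileSystem, the directories Falcon creates end up owned by the Falcon service user, which is what the updated test asserts via UserGroupInformation.getLoginUser().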
