Repository: falcon Updated Branches: refs/heads/master d4800401a -> bc3d23bd0
FALCON-910 Better error messages when creating cluster's directories. Contributed by karan kumar Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/bc3d23bd Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/bc3d23bd Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/bc3d23bd Branch: refs/heads/master Commit: bc3d23bd05155092838273337ab6c84d1bcfd78f Parents: d480040 Author: Suhas Vasu <[email protected]> Authored: Thu Mar 19 12:26:13 2015 +0530 Committer: Suhas Vasu <[email protected]> Committed: Thu Mar 19 12:26:13 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 3 + client/src/main/resources/cluster-0.1.xsd | 14 +- .../org/apache/falcon/entity/ClusterHelper.java | 37 ++++- .../org/apache/falcon/entity/EntityUtil.java | 3 +- .../entity/parser/ClusterEntityParser.java | 149 +++++++++++++------ .../entity/parser/ClusterEntityParserTest.java | 105 +++++++++++-- docs/src/site/twiki/EntitySpecification.twiki | 13 +- .../falcon/regression/core/bundle/Bundle.java | 3 +- .../falcon/regression/core/util/BundleUtil.java | 5 +- .../apache/falcon/oozie/OozieBundleBuilder.java | 4 +- .../OozieOrchestrationWorkflowBuilder.java | 3 +- .../service/SharedLibraryHostingService.java | 5 +- .../feed/OozieFeedWorkflowBuilderTest.java | 5 +- .../falcon/oozie/process/AbstractTestBase.java | 9 +- .../OozieProcessWorkflowBuilderTest.java | 4 +- .../entity/filesystem/embedded-cluster.xml | 6 +- .../entity/filesystem/standalone-cluster.xml | 6 +- .../filesystem/standalone-target-cluster.xml | 6 +- .../falcon/cluster/util/EmbeddedCluster.java | 5 +- .../lifecycle/FileSystemFeedReplicationIT.java | 3 +- .../TableStorageFeedReplicationIT.java | 3 +- .../org/apache/falcon/process/PigProcessIT.java | 3 +- .../falcon/process/TableStorageProcessIT.java | 3 +- .../org/apache/falcon/resource/TestContext.java | 43 +++--- .../validation/ClusterEntityValidationIT.java | 40 +++-- 25 files changed, 350 insertions(+), 130 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b7a14f2..96cca70 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -10,6 +10,9 @@ Trunk (Unreleased) FALCON-822 Add reverse look up API (Ajay Yadava via Suhas Vasu) IMPROVEMENTS + FALCON-910 Better error messages when creating cluster's directories + (karan kumar via Suhas Vasu) + FALCON-1042 Misleading mesage received while performing touch operation on scheduled entity (Suhas Vasu) http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/client/src/main/resources/cluster-0.1.xsd ---------------------------------------------------------------------- diff --git a/client/src/main/resources/cluster-0.1.xsd b/client/src/main/resources/cluster-0.1.xsd index 6fd9de7..dd2c171 100644 --- a/client/src/main/resources/cluster-0.1.xsd +++ b/client/src/main/resources/cluster-0.1.xsd @@ -128,8 +128,9 @@ <xs:annotation> <xs:documentation> Location has the name and the path. - name: is the type of locations like - staging, temp and working. + name: is the type of locations which can be + staging, temp and working only. + staging is a mandatory type. path: the hdfs path for each location. Falcon would use the location to do intermediate processing of entities in hdfs and hence Falcon @@ -137,7 +138,7 @@ locations. </xs:documentation> </xs:annotation> - <xs:attribute type="IDENTIFIER" name="name" use="required"/> + <xs:attribute type="cluster-location-type" name="name" use="required"/> <xs:attribute type="xs:string" name="path" use="required"/> </xs:complexType> <xs:complexType name="interfaces"> @@ -200,4 +201,11 @@ <xs:attribute type="xs:string" name="group"/> <xs:attribute type="xs:string" name="permission" default="*"/> </xs:complexType> + <xs:simpleType name="cluster-location-type"> + <xs:restriction base="xs:string"> + <xs:enumeration value="staging"/> + <xs:enumeration value="working"/> + <xs:enumeration value="temp"/> + </xs:restriction> + </xs:simpleType> </xs:schema> http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java index 6a363b6..49d408f 100644 --- a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java @@ -22,6 +22,7 @@ import org.apache.falcon.FalconException; import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.cluster.ClusterLocationType; import org.apache.falcon.entity.v0.cluster.Interface; import org.apache.falcon.entity.v0.cluster.Interfacetype; import org.apache.falcon.entity.v0.cluster.Location; @@ -39,6 +40,7 @@ import java.util.Map; */ public final class ClusterHelper { public static final String DEFAULT_BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory"; + public static final String WORKINGDIR = "working"; private ClusterHelper() { } @@ -120,15 +122,44 @@ public final class ClusterHelper { return normalizedPath.substring(0, normalizedPath.length() - 1); } - public static String getLocation(Cluster cluster, String locationKey) { + public static Location getLocation(Cluster cluster, ClusterLocationType clusterLocationType) { for (Location loc : cluster.getLocations().getLocations()) { - if (loc.getName().equals(locationKey)) { - return loc.getPath(); + if (loc.getName().equals(clusterLocationType)) { + return loc; + } + } + //Mocking the working location FALCON-910 + if (clusterLocationType.equals(ClusterLocationType.WORKING)) { + Location staging = getLocation(cluster, ClusterLocationType.STAGING); + if (staging != null) { + Location working = new Location(); + working.setName(ClusterLocationType.WORKING); + working.setPath(staging.getPath().charAt(staging.getPath().length() - 1) == '/' + ? + staging.getPath().concat(WORKINGDIR) + : + staging.getPath().concat("/").concat(WORKINGDIR)); + return working; } } return null; } + /** + * Parsed the cluster object and checks for the working location. + * + * @param cluster + * @return + */ + public static boolean checkWorkingLocationExists(Cluster cluster) { + for (Location loc : cluster.getLocations().getLocations()) { + if (loc.getName().equals(ClusterLocationType.WORKING)) { + return true; + } + } + return false; + } + public static String getPropertyValue(Cluster cluster, String propName) { if (cluster.getProperties() != null) { for (Property prop : cluster.getProperties().getProperties()) { http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/common/src/main/java/org/apache/falcon/entity/EntityUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java index c553d1e..3656ae4 100644 --- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java +++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java @@ -32,6 +32,7 @@ import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.Frequency; import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.cluster.ClusterLocationType; import org.apache.falcon.entity.v0.feed.ClusterType; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.*; @@ -574,7 +575,7 @@ public final class EntityUtil { //Each entity update creates a new staging path //Base staging path is the base path for all staging dirs public static Path getBaseStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity) { - return new Path(ClusterHelper.getLocation(cluster, "staging"), + return new Path(ClusterHelper.getLocation(cluster, ClusterLocationType.STAGING).getPath(), "falcon/workflows/" + entity.getEntityType().name().toLowerCase() + "/" + entity.getName()); } http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java index 87b61a4..2af8c16 100644 --- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java +++ b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java @@ -26,6 +26,7 @@ import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.ACL; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.cluster.ClusterLocationType; import org.apache.falcon.entity.v0.cluster.Interface; import org.apache.falcon.entity.v0.cluster.Interfacetype; import org.apache.falcon.entity.v0.cluster.Location; @@ -154,8 +155,8 @@ public class ClusterEntityParser extends EntityParser<Cluster> { protected void validateMessagingInterface(Cluster cluster) throws ValidationException { final String messagingUrl = ClusterHelper.getMessageBrokerUrl(cluster); - final String implementation = StartupProperties.get().getProperty( - "broker.impl.class", "org.apache.activemq.ActiveMQConnectionFactory"); + final String implementation = StartupProperties.get().getProperty("broker.impl.class", + "org.apache.activemq.ActiveMQConnectionFactory"); LOG.info("Validating messaging interface: {}, implementation: {}", messagingUrl, implementation); try { @@ -243,58 +244,120 @@ public class ClusterEntityParser extends EntityParser<Cluster> { try { fs = HadoopClientFactory.get().createProxiedFileSystem(conf); } catch (FalconException e) { - throw new ValidationException( - "Unable to get file system handle for cluster " + cluster.getName(), e); + throw new ValidationException("Unable to get file system handle for cluster " + cluster.getName(), e); } - for (Location location : cluster.getLocations().getLocations()) { - final String locationName = location.getName(); - if (locationName.equals("temp")) { - continue; - } + Location stagingLocation = ClusterHelper.getLocation(cluster, ClusterLocationType.STAGING); - try { - checkPathOwnerAndPermission(cluster.getName(), location.getPath(), fs, - "staging".equals(locationName) - ? HadoopClientFactory.ALL_PERMISSION - : HadoopClientFactory.READ_EXECUTE_PERMISSION); - } catch (IOException e) { - throw new ValidationException("Unable to validate the location of name: " - + location.getName() + " with path:" + location.getPath() - + " for cluster " + cluster.getName(), e); + if (stagingLocation == null) { + throw new ValidationException( + "Unable to find the mandatory location of name: " + ClusterLocationType.STAGING.value() + + " for cluster " + cluster.getName()); + } else { + + checkPathOwnerAndPermission(cluster.getName(), stagingLocation.getPath(), fs, + HadoopClientFactory.READ_EXECUTE_PERMISSION, false); + + if (!ClusterHelper.checkWorkingLocationExists(cluster)) { + //Creating location type of working in the sub dir of staging dir with perms 755. FALCON-910 + + Path workingDirPath = new Path(stagingLocation.getPath(), ClusterHelper.WORKINGDIR); + try { + if (!fs.exists(workingDirPath)) { //Checking if the staging dir has the working dir to be created + HadoopClientFactory.mkdirs(fs, workingDirPath, HadoopClientFactory.READ_EXECUTE_PERMISSION); + } else { + if (fs.isDirectory(workingDirPath)) { + FsPermission workingPerms = fs.getFileStatus(workingDirPath).getPermission(); + if (!workingPerms.equals(HadoopClientFactory.READ_EXECUTE_PERMISSION)) { //perms check + throw new ValidationException( + "Falcon needs subdir " + ClusterHelper.WORKINGDIR + " inside staging dir:" + + stagingLocation.getPath() + + " when staging location not specified with " + + HadoopClientFactory.READ_EXECUTE_PERMISSION.toString() + " got " + + workingPerms.toString()); + } + } else { + throw new ValidationException( + "Falcon needs subdir " + ClusterHelper.WORKINGDIR + " inside staging dir:" + + stagingLocation.getPath() + + " when staging location not specified. Got a file at " + workingDirPath + .toString()); + } + } + } catch (IOException e) { + throw new ValidationException( + "Unable to create path for " + workingDirPath.toString() + " with path: " + + workingDirPath.toString() + " for cluster " + cluster.getName(), e); + } + } else { + Location workingLocation = ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING); + if (stagingLocation.getPath().equals(workingLocation.getPath())) { + throw new ValidationException( + "Location with name: " + stagingLocation.getName().value() + " and " + workingLocation + .getName().value() + " cannot of same path: " + stagingLocation.getPath() + + " for cluster :" + cluster.getName()); + } else { + + checkPathOwnerAndPermission(cluster.getName(), workingLocation.getPath(), fs, + HadoopClientFactory.READ_EXECUTE_PERMISSION, true); + + } } + } + } private void checkPathOwnerAndPermission(String clusterName, String location, FileSystem fs, - FsPermission expectedPermission) - throws IOException, ValidationException { + FsPermission expectedPermission, Boolean exactPerms) throws ValidationException { Path locationPath = new Path(location); - if (!fs.exists(locationPath)) { - throw new ValidationException("Location " + location - + " for cluster " + clusterName + " must exist."); - } - - // falcon owns this path on each cluster - final String loginUser = UserGroupInformation.getLoginUser().getShortUserName(); - FileStatus fileStatus = fs.getFileStatus(locationPath); - final String locationOwner = fileStatus.getOwner(); - if (!locationOwner.equals(loginUser)) { - LOG.error("Location {} has owner {}, should be the process user {}", - locationPath, locationOwner, loginUser); - throw new ValidationException("Path [" + locationPath + "] has owner [" + locationOwner - + "], should be the process user " + loginUser); - } + try { + if (!fs.exists(locationPath)) { + throw new ValidationException("Location " + location + " for cluster " + clusterName + " must exist."); + } - if (fileStatus.getPermission().toShort() != expectedPermission.toShort()) { - LOG.error("Location {} has permissions {}, should be {}", - locationPath, fileStatus.getPermission(), expectedPermission); - throw new ValidationException("Path " + locationPath + " has permissions: " - + fileStatus.getPermission() + ", should be " + expectedPermission); + // falcon owns this path on each cluster + final String loginUser = UserGroupInformation.getLoginUser().getShortUserName(); + FileStatus fileStatus = fs.getFileStatus(locationPath); + final String locationOwner = fileStatus.getOwner(); + if (!locationOwner.equals(loginUser)) { + LOG.error("Location {} has owner {}, should be the process user {}", locationPath, locationOwner, + loginUser); + throw new ValidationException( + "Path [" + locationPath + "] has owner [" + locationOwner + "], should be the process user " + + loginUser); + } + String errorMessage = "Path " + locationPath + " has permissions: " + fileStatus.getPermission().toString() + + ", should be " + expectedPermission; + if (exactPerms) { + if (fileStatus.getPermission().toShort() != expectedPermission.toShort()) { + LOG.error(errorMessage); + throw new ValidationException(errorMessage); + } + } else { + if (expectedPermission.getUserAction().ordinal() > fileStatus.getPermission().getUserAction() + .ordinal()) { + LOG.error(errorMessage + " at least"); + throw new ValidationException(errorMessage + " at least"); + } + if (expectedPermission.getGroupAction().ordinal() > fileStatus.getPermission().getGroupAction() + .ordinal()) { + LOG.error(errorMessage + " at least"); + throw new ValidationException(errorMessage + " at least"); + } + if (expectedPermission.getOtherAction().ordinal() > fileStatus.getPermission().getOtherAction() + .ordinal()) { + LOG.error(errorMessage + " at least"); + throw new ValidationException(errorMessage + " at least"); + } + } + // try to list to see if the user is able to write to this folder + fs.listStatus(locationPath); + } catch (IOException e) { + throw new ValidationException( + "Unable to validate the location with path: " + location + " for cluster:" + clusterName + + " due to transient failures ", e); } - - // try to list to see if the user is able to write to this folder - fs.listStatus(locationPath); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java index a807e80..5085b24 100644 --- a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java +++ b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java @@ -25,12 +25,15 @@ import org.apache.falcon.entity.AbstractTestBase; import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.cluster.ClusterLocationType; import org.apache.falcon.entity.v0.cluster.Interface; import org.apache.falcon.entity.v0.cluster.Interfacetype; import org.apache.falcon.entity.v0.cluster.Location; import org.apache.falcon.entity.v0.cluster.Locations; import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.util.StartupProperties; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -80,7 +83,8 @@ public class ClusterEntityParserTest extends AbstractTestBase { Assert.assertEquals(workflow.getEndpoint(), "http://localhost:11000/oozie/"); Assert.assertEquals(workflow.getVersion(), "4.0"); - Assert.assertEquals(ClusterHelper.getLocation(cluster, "staging"), "/projects/falcon/staging"); + Assert.assertEquals(ClusterHelper.getLocation(cluster, ClusterLocationType.STAGING).getPath(), + "/projects/falcon/staging"); StringWriter stringWriter = new StringWriter(); Marshaller marshaller = EntityType.CLUSTER.getMarshaller(); @@ -91,7 +95,8 @@ public class ClusterEntityParserTest extends AbstractTestBase { Assert.assertEquals(catalog.getEndpoint(), "http://localhost:48080/templeton/v1"); Assert.assertEquals(catalog.getVersion(), "0.11.0"); - Assert.assertEquals(ClusterHelper.getLocation(cluster, "staging"), "/projects/falcon/staging"); + Assert.assertEquals(ClusterHelper.getLocation(cluster, ClusterLocationType.STAGING).getPath(), + "/projects/falcon/staging"); } @Test @@ -195,19 +200,24 @@ public class ClusterEntityParserTest extends AbstractTestBase { } /** - * A lightweight unit test for a cluster where location paths are not instantiated . + * A lightweight unit test for a cluster where location type staging is missing. * Extensive tests are found in ClusterEntityValidationIT. * * @throws ValidationException */ - @Test(expectedExceptions = ValidationException.class) - public void testClusterWithoutLocationsPaths() throws ValidationException { - ClusterEntityParser clusterEntityParser = Mockito.spy( - (ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER)); - Cluster cluster = this.dfsCluster.getCluster(); + @Test(expectedExceptions = ValidationException.class) public void testClusterWithoutStaging() throws Exception { + ClusterEntityParser clusterEntityParser = Mockito + .spy((ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER)); + Cluster cluster = (Cluster) this.dfsCluster.getCluster().copy(); Mockito.doNothing().when(clusterEntityParser).validateWorkflowInterface(cluster); Mockito.doNothing().when(clusterEntityParser).validateMessagingInterface(cluster); Mockito.doNothing().when(clusterEntityParser).validateRegistryInterface(cluster); + Location location = new Location(); + location.setName(ClusterLocationType.WORKING); + location.setPath("/apps/non/existent/path"); + Locations locations = new Locations(); + locations.getLocations().add(location); + cluster.setLocations(locations); clusterEntityParser.validate(cluster); Assert.fail("Should have thrown a validation exception"); } @@ -219,12 +229,12 @@ public class ClusterEntityParserTest extends AbstractTestBase { * @throws ValidationException */ @Test(expectedExceptions = ValidationException.class) - public void testClusterWithInvalidLocationsPaths() throws ValidationException { - ClusterEntityParser clusterEntityParser = Mockito.spy( - (ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER)); - Cluster cluster = this.dfsCluster.getCluster(); + public void testClusterWithInvalidLocationsPaths() throws Exception { + ClusterEntityParser clusterEntityParser = Mockito + .spy((ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER)); + Cluster cluster = (Cluster)this.dfsCluster.getCluster().copy(); Location location = new Location(); - location.setName("TestName"); + location.setName(ClusterLocationType.STAGING); location.setPath("/apps/non/existent/path"); Locations locations = new Locations(); locations.getLocations().add(location); @@ -235,14 +245,79 @@ public class ClusterEntityParserTest extends AbstractTestBase { try { clusterEntityParser.validate(cluster); } catch (ValidationException e) { - String errorMessage = "Location " + location.getPath() + " for cluster " - + cluster.getName() + " must exist."; + String errorMessage = + "Location " + location.getPath() + " for cluster " + cluster.getName() + " must exist."; Assert.assertEquals(e.getMessage(), errorMessage); throw e; } Assert.fail("Should have thrown a validation exception"); } + /** + * A lightweight unit test for a cluster where location paths are same. + * Extensive tests are found in ClusterEntityValidationIT. + * + * @throws ValidationException + */ + @Test(expectedExceptions = ValidationException.class) + public void testClusterWithSameWorkingAndStaging() throws Exception { + ClusterEntityParser clusterEntityParser = Mockito + .spy((ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER)); + Cluster cluster = (Cluster) this.dfsCluster.getCluster().copy(); + cluster.getLocations().getLocations().get(1).setPath("/projects/falcon/staging"); + Mockito.doNothing().when(clusterEntityParser).validateWorkflowInterface(cluster); + Mockito.doNothing().when(clusterEntityParser).validateMessagingInterface(cluster); + Mockito.doNothing().when(clusterEntityParser).validateRegistryInterface(cluster); + clusterEntityParser.validate(cluster); + Assert.fail("Should have thrown a validation exception"); + } + + /** + * A lightweight unit test for a cluster where location type working is missing. + * It should automatically get generated + * Extensive tests are found in ClusterEntityValidationIT. + */ + @Test public void testClusterWithOnlyStaging() throws Exception { + ClusterEntityParser clusterEntityParser = Mockito + .spy((ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER)); + Cluster cluster = (Cluster) this.dfsCluster.getCluster().copy(); + cluster.getLocations().getLocations().remove(1); + Mockito.doNothing().when(clusterEntityParser).validateWorkflowInterface(cluster); + Mockito.doNothing().when(clusterEntityParser).validateMessagingInterface(cluster); + Mockito.doNothing().when(clusterEntityParser).validateRegistryInterface(cluster); + clusterEntityParser.validate(cluster); + String workingDirPath = cluster.getLocations().getLocations().get(0).getPath() + "/working"; + Assert.assertEquals(ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath(), workingDirPath); + FileStatus workingDirStatus = this.dfsCluster.getFileSystem().getFileLinkStatus(new Path(workingDirPath)); + Assert.assertTrue(workingDirStatus.isDirectory()); + Assert.assertEquals(workingDirStatus.getPermission(), HadoopClientFactory.READ_EXECUTE_PERMISSION); + + } + + /** + * A lightweight unit test for a cluster where location working is not there and staging + * has a subdir which will be used by cluster as working. + * Checking for wrong perms of this subdir + * Extensive tests are found in ClusterEntityValidationIT. + * + * @throws ValidationException + */ + @Test(expectedExceptions = ValidationException.class) + public void testClusterWithSubdirInStaging() throws Exception { + ClusterEntityParser clusterEntityParser = Mockito + .spy((ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER)); + Cluster cluster = (Cluster) this.dfsCluster.getCluster().copy(); + cluster.getLocations().getLocations().get(1).setPath("/projects/falcon/staging"); + cluster.getLocations().getLocations().remove(1); + HadoopClientFactory.mkdirs(this.dfsCluster.getFileSystem(), + new Path(ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath()), + HadoopClientFactory.ALL_PERMISSION); + Mockito.doNothing().when(clusterEntityParser).validateWorkflowInterface(cluster); + Mockito.doNothing().when(clusterEntityParser).validateMessagingInterface(cluster); + Mockito.doNothing().when(clusterEntityParser).validateRegistryInterface(cluster); + clusterEntityParser.validate(cluster); + Assert.fail("Should have thrown a validation exception"); + } @BeforeClass public void init() throws Exception { http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/docs/src/site/twiki/EntitySpecification.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki index 66fcd2f..86df6d4 100644 --- a/docs/src/site/twiki/EntitySpecification.twiki +++ b/docs/src/site/twiki/EntitySpecification.twiki @@ -63,16 +63,17 @@ A messaging interface specifies the interface for sending feed availability mess A cluster has a list of locations defined: <verbatim> <location name="staging" path="/projects/falcon/staging" /> -<location name="working" path="/projects/falcon/working" /> +<location name="working" path="/projects/falcon/working" /> <!--optional--> </verbatim> -Location has the name and the path, name is the type of locations like staging, temp and working. -and path is the hdfs path for each location. +Location has the name and the path, name is the type of locations .Allowed values of name are staging, temp and working. +Path is the hdfs path for each location. Falcon would use the location to do intermediate processing of entities in hdfs and hence Falcon should have read/write/execute permission on these locations. These locations MUST be created prior to submitting a cluster entity to Falcon. -*staging* must have 777 permissions and the parent dirs must have execute permissions so multiple -users can write to this location -*working* must have 755 permissions and the parent dirs must have execute permissions so multiple +*staging* should have atleast 755 permissions and is a mandatory location .The parent dirs must have execute permissions so multiple +users can write to this location. *working* must have 755 permissions and is a optional location. +If *working* is not specified, falcon creates a sub directory in the *staging* location with 755 perms. +The parent dir for *working* must have execute permissions so multiple users can read from this location ---+++ ACL http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java index 0dfd895..876ffa4 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java @@ -21,6 +21,7 @@ package org.apache.falcon.regression.core.bundle; import org.apache.commons.lang.StringUtils; import org.apache.falcon.entity.v0.Frequency; import org.apache.falcon.entity.v0.Frequency.TimeUnit; +import org.apache.falcon.entity.v0.cluster.ClusterLocationType; import org.apache.falcon.entity.v0.cluster.Interface; import org.apache.falcon.entity.v0.cluster.Interfaces; import org.apache.falcon.entity.v0.cluster.Interfacetype; @@ -685,7 +686,7 @@ public class Bundle { public void setCLusterWorkingPath(String clusterData, String path) { ClusterMerlin c = new ClusterMerlin(clusterData); for (int i = 0; i < c.getLocations().getLocations().size(); i++) { - if (c.getLocations().getLocations().get(i).getName().contains("working")) { + if (c.getLocations().getLocations().get(i).getName().equals(ClusterLocationType.WORKING)) { c.getLocations().getLocations().get(i).setPath(path); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java index 4218815..c53d927 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java @@ -21,6 +21,7 @@ package org.apache.falcon.regression.core.util; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; +import org.apache.falcon.entity.v0.cluster.ClusterLocationType; import org.apache.falcon.entity.v0.cluster.Location; import org.apache.falcon.entity.v0.cluster.Property; import org.apache.falcon.regression.Entities.ClusterMerlin; @@ -115,11 +116,11 @@ public final class BundleUtil { //set staging and working locations clusterMerlin.getLocations().getLocations().clear(); final Location staging = new Location(); - staging.setName("staging"); + staging.setName(ClusterLocationType.STAGING); staging.setPath(MerlinConstants.STAGING_LOCATION); clusterMerlin.getLocations().getLocations().add(staging); final Location working = new Location(); - working.setName("working"); + working.setName(ClusterLocationType.WORKING); working.setPath(MerlinConstants.WORKING_LOCATION); clusterMerlin.getLocations().getLocations().add(working); final String protectionPropName = "hadoop.rpc.protection"; http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java index de11df6..03063f4 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java @@ -24,6 +24,7 @@ import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.cluster.ClusterLocationType; import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.oozie.bundle.BUNDLEAPP; import org.apache.falcon.oozie.bundle.CONFIGURATION; @@ -122,7 +123,8 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu properties.setProperty(OozieClient.USER_NAME, CurrentUser.getUser()); properties.setProperty(OozieClient.USE_SYSTEM_LIBPATH, "true"); - properties.setProperty("falcon.libpath", ClusterHelper.getLocation(cluster, "working") + "/lib"); + properties.setProperty("falcon.libpath", + ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath() + "/lib"); if (EntityUtil.isTableStorageType(cluster, entity)) { Tag tag = EntityUtil.getWorkflowNameTag(coordName, entity); http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java index 3186c4a..49f9e07 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java @@ -26,6 +26,7 @@ import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.cluster.ClusterLocationType; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.hadoop.HadoopClientFactory; @@ -209,7 +210,7 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend } protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, Tag tag) throws FalconException { - String libext = ClusterHelper.getLocation(cluster, "working") + "/libext"; + String libext = ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath() + "/libext"; FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( ClusterHelper.getConfiguration(cluster)); try { http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java index 9567c5f..ba87a62 100644 --- a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java +++ b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java @@ -25,6 +25,7 @@ import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.cluster.ClusterLocationType; import org.apache.falcon.entity.v0.cluster.Interfacetype; import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.util.StartupProperties; @@ -69,8 +70,8 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener }; private void addLibsTo(Cluster cluster) throws FalconException { - Path lib = new Path(ClusterHelper.getLocation(cluster, "working"), "lib"); - Path libext = new Path(ClusterHelper.getLocation(cluster, "working"), "libext"); + Path lib = new Path(ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath(), "lib"); + Path libext = new Path(ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath(), "libext"); try { FileSystem fs = HadoopClientFactory.get().createFalconFileSystem( ClusterHelper.getConfiguration(cluster)); http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java ---------------------------------------------------------------------- diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java index 723f909..48449d4 100644 --- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java +++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java @@ -31,6 +31,7 @@ import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.cluster.ClusterLocationType; import org.apache.falcon.entity.v0.cluster.Interfacetype; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.hadoop.HadoopClientFactory; @@ -714,14 +715,14 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { } private void verifyClusterLocationsUMask(Cluster aCluster, FileSystem fs) throws IOException { - String stagingLocation = ClusterHelper.getLocation(aCluster, "staging"); + String stagingLocation = ClusterHelper.getLocation(aCluster, ClusterLocationType.STAGING).getPath(); Path stagingPath = new Path(stagingLocation); if (fs.exists(stagingPath)) { FileStatus fileStatus = fs.getFileStatus(stagingPath); Assert.assertEquals(fileStatus.getPermission().toShort(), 511); } - String workingLocation = ClusterHelper.getLocation(aCluster, "working"); + String workingLocation = ClusterHelper.getLocation(aCluster, ClusterLocationType.WORKING).getPath(); Path workingPath = new Path(workingLocation); if (fs.exists(workingPath)) { FileStatus fileStatus = fs.getFileStatus(workingPath); http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java ---------------------------------------------------------------------- diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java index b549cfb..6488682 100644 --- a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java +++ b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java @@ -26,6 +26,7 @@ import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.cluster.ClusterLocationType; import org.apache.falcon.entity.v0.cluster.Interfacetype; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Process; @@ -75,10 +76,10 @@ public class AbstractTestBase { if (writeEndpoint != null) { ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(writeEndpoint); FileSystem fs = new Path(writeEndpoint).getFileSystem(EmbeddedCluster.newConfiguration()); - fs.create( - new Path(ClusterHelper.getLocation(cluster, "working"), "libext/FEED/retention/ext.jar")).close(); - fs.create( - new Path(ClusterHelper.getLocation(cluster, "working"), "libext/FEED/replication/ext.jar")).close(); + fs.create(new Path(ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath(), + "libext/FEED/retention/ext.jar")).close(); + fs.create(new Path(ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath(), + "libext/FEED/replication/ext.jar")).close(); } return cluster; http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java ---------------------------------------------------------------------- diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java index ef21f4d..545beb1 100644 --- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java +++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java @@ -32,6 +32,7 @@ import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.Frequency; import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.cluster.ClusterLocationType; import org.apache.falcon.entity.v0.cluster.Interfacetype; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.feed.LocationType; @@ -123,7 +124,8 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl); ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).setEndpoint("thrift://localhost:49083"); fs = new Path(hdfsUrl).getFileSystem(EmbeddedCluster.newConfiguration()); - fs.create(new Path(ClusterHelper.getLocation(cluster, "working"), "libext/PROCESS/ext.jar")).close(); + fs.create(new Path(ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath(), + "libext/PROCESS/ext.jar")).close(); Process process = store.get(EntityType.PROCESS, "clicksummary"); Path wfpath = new Path(process.getWorkflow().getPath()); http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/src/main/examples/entity/filesystem/embedded-cluster.xml ---------------------------------------------------------------------- diff --git a/src/main/examples/entity/filesystem/embedded-cluster.xml b/src/main/examples/entity/filesystem/embedded-cluster.xml index effcbd5..c505066 100644 --- a/src/main/examples/entity/filesystem/embedded-cluster.xml +++ b/src/main/examples/entity/filesystem/embedded-cluster.xml @@ -42,9 +42,9 @@ <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true" version="5.4.3"/> </interfaces> <locations> - <location name="staging" path="/projects/falcon/staging"/> - <location name="temp" path="/projects/falcon/tmp"/> - <location name="working" path="/projects/falcon/working"/> + <location name="staging" path="/projects/falcon/staging"/> <!--mandatory--> + <location name="temp" path="/projects/falcon/tmp"/> <!--optional--> + <location name="working" path="/projects/falcon/working"/> <!--optional--> </locations> <properties> </properties> http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/src/main/examples/entity/filesystem/standalone-cluster.xml ---------------------------------------------------------------------- diff --git a/src/main/examples/entity/filesystem/standalone-cluster.xml b/src/main/examples/entity/filesystem/standalone-cluster.xml index 6fe4df3..4f8a5fc 100644 --- a/src/main/examples/entity/filesystem/standalone-cluster.xml +++ b/src/main/examples/entity/filesystem/standalone-cluster.xml @@ -34,9 +34,9 @@ <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true" version="5.4.3"/> </interfaces> <locations> - <location name="staging" path="/projects/falcon/staging"/> - <location name="temp" path="/projects/falcon/tmp"/> - <location name="working" path="/projects/falcon/working"/> + <location name="staging" path="/projects/falcon/staging"/> <!--mandatory--> + <location name="temp" path="/projects/falcon/tmp"/> <!--optional--> + <location name="working" path="/projects/falcon/working"/> <!--optional--> </locations> <properties> </properties> http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/src/main/examples/entity/filesystem/standalone-target-cluster.xml ---------------------------------------------------------------------- diff --git a/src/main/examples/entity/filesystem/standalone-target-cluster.xml b/src/main/examples/entity/filesystem/standalone-target-cluster.xml index 442449d..cb5c4c6 100644 --- a/src/main/examples/entity/filesystem/standalone-target-cluster.xml +++ b/src/main/examples/entity/filesystem/standalone-target-cluster.xml @@ -34,9 +34,9 @@ <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true" version="5.4.3"/> </interfaces> <locations> - <location name="staging" path="/projects/falcon/staging-target"/> - <location name="temp" path="/projects/falcon/tmp-target"/> - <location name="working" path="/projects/falcon/working-target"/> + <location name="staging" path="/projects/falcon/staging-target"/> <!--mandatory--> + <location name="temp" path="/projects/falcon/tmp-target"/> <!--optional--> + <location name="working" path="/projects/falcon/working-target"/> <!--optional--> </locations> <properties> </properties> http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java ---------------------------------------------------------------------- diff --git a/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java b/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java index c67b8fc..f410b21 100644 --- a/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java +++ b/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java @@ -19,6 +19,7 @@ package org.apache.falcon.cluster.util; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.cluster.ClusterLocationType; import org.apache.falcon.entity.v0.cluster.Interface; import org.apache.falcon.entity.v0.cluster.Interfaces; import org.apache.falcon.entity.v0.cluster.Interfacetype; @@ -121,12 +122,12 @@ public class EmbeddedCluster { clusterEntity.setInterfaces(interfaces); Location location = new Location(); - location.setName("staging"); + location.setName(ClusterLocationType.STAGING); location.setPath("/projects/falcon/staging"); Locations locs = new Locations(); locs.getLocations().add(location); location = new Location(); - location.setName("working"); + location.setName(ClusterLocationType.WORKING); location.setPath("/projects/falcon/working"); locs.getLocations().add(location); clusterEntity.setLocations(locs); http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java index 9383026..d7f36da 100644 --- a/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java +++ b/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java @@ -20,6 +20,7 @@ package org.apache.falcon.lifecycle; import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.cluster.ClusterLocationType; import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.resource.TestContext; @@ -96,7 +97,7 @@ public class FileSystemFeedReplicationIT { private void copyLibsToHDFS(Cluster cluster) throws IOException { // set up kahadb to be sent as part of workflows StartupProperties.get().setProperty("libext.paths", "./target/libext"); - String libext = ClusterHelper.getLocation(cluster, "working") + "/libext"; + String libext = ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath() + "/libext"; String targetStorageUrl = ClusterHelper.getStorageUrl(cluster); FSUtils.copyOozieShareLibsToHDFS("./target/libext", targetStorageUrl + libext); } http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedReplicationIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedReplicationIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedReplicationIT.java index 55610b7..13bc480 100644 --- a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedReplicationIT.java +++ b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedReplicationIT.java @@ -20,6 +20,7 @@ package org.apache.falcon.lifecycle; import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.cluster.ClusterLocationType; import org.apache.falcon.entity.v0.cluster.Interfacetype; import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.InstancesResult; @@ -112,7 +113,7 @@ public class TableStorageFeedReplicationIT { private void copyLibsToHDFS(Cluster cluster) throws IOException { // set up kahadb to be sent as part of workflows StartupProperties.get().setProperty("libext.paths", "./target/libext"); - String libext = ClusterHelper.getLocation(cluster, "working") + "/libext"; + String libext = ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath() + "/libext"; String targetStorageUrl = ClusterHelper.getStorageUrl(cluster); FSUtils.copyOozieShareLibsToHDFS("./target/libext", targetStorageUrl + libext); } http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java b/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java index edbe32a..575b870 100644 --- a/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java +++ b/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java @@ -20,6 +20,7 @@ package org.apache.falcon.process; import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.cluster.ClusterLocationType; import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.resource.TestContext; @@ -80,7 +81,7 @@ public class PigProcessIT { private void copyLibsToHDFS(Cluster cluster, String storageUrl) throws IOException { // set up kahadb to be sent as part of workflows StartupProperties.get().setProperty("libext.paths", "./target/libext"); - String libext = ClusterHelper.getLocation(cluster, "working") + "/libext"; + String libext = ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath() + "/libext"; FSUtils.copyOozieShareLibsToHDFS("./target/libext", storageUrl + libext); } http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java b/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java index 9cb6850..2e882ea 100644 --- a/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java +++ b/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java @@ -20,6 +20,7 @@ package org.apache.falcon.process; import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.cluster.ClusterLocationType; import org.apache.falcon.entity.v0.cluster.Interfacetype; import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.InstancesResult; @@ -100,7 +101,7 @@ public class TableStorageProcessIT { private void copyLibsToHDFS(Cluster cluster, String storageUrl) throws IOException { // set up kahadb to be sent as part of workflows StartupProperties.get().setProperty("libext.paths", "./target/libext"); - String libext = ClusterHelper.getLocation(cluster, "working") + "/libext"; + String libext = ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath() + "/libext"; FSUtils.copyOozieShareLibsToHDFS("./target/libext", storageUrl + libext); } http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/webapp/src/test/java/org/apache/falcon/resource/TestContext.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java index 593079a..7b227b3 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java +++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java @@ -38,6 +38,7 @@ import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.cluster.ClusterLocationType; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.security.CurrentUser; @@ -315,34 +316,39 @@ public class TestContext { return submitFileToFalcon(entityType, tmpFile); } - public static void deleteClusterLocations(Cluster clusterEntity, - FileSystem fs) throws IOException { - String stagingLocation = ClusterHelper.getLocation(clusterEntity, "staging"); + public static void deleteClusterLocations(Cluster clusterEntity, FileSystem fs) throws IOException { + String stagingLocation = ClusterHelper.getLocation(clusterEntity, ClusterLocationType.STAGING).getPath(); Path stagingPath = new Path(stagingLocation); if (fs.exists(stagingPath)) { fs.delete(stagingPath, true); } - String workingLocation = ClusterHelper.getLocation(clusterEntity, "working"); + String workingLocation = ClusterHelper.getLocation(clusterEntity, ClusterLocationType.WORKING).getPath(); Path workingPath = new Path(workingLocation); if (fs.exists(workingPath)) { fs.delete(workingPath, true); } } - public static void createClusterLocations(Cluster clusterEntity, - FileSystem fs) throws IOException { - String stagingLocation = ClusterHelper.getLocation(clusterEntity, "staging"); + public static void createClusterLocations(Cluster clusterEntity, FileSystem fs) throws IOException { + createClusterLocations(clusterEntity, fs, true); + } + + public static void createClusterLocations(Cluster clusterEntity, FileSystem fs, boolean withWorking) + throws IOException { + String stagingLocation = ClusterHelper.getLocation(clusterEntity, ClusterLocationType.STAGING).getPath(); Path stagingPath = new Path(stagingLocation); - if (!fs.exists(stagingPath)) { - HadoopClientFactory.mkdirs(fs, stagingPath, HadoopClientFactory.ALL_PERMISSION); + if (fs.exists(stagingPath)) { + fs.delete(stagingPath, true); } - - String workingLocation = ClusterHelper.getLocation(clusterEntity, "working"); - Path workingPath = new Path(workingLocation); - if (!fs.exists(workingPath)) { - HadoopClientFactory - .mkdirs(fs, workingPath, HadoopClientFactory.READ_EXECUTE_PERMISSION); + HadoopClientFactory.mkdirs(fs, stagingPath, HadoopClientFactory.ALL_PERMISSION); + if (withWorking) { + String workingLocation = ClusterHelper.getLocation(clusterEntity, ClusterLocationType.WORKING).getPath(); + Path workingPath = new Path(workingLocation); + if (fs.exists(workingPath)) { + fs.delete(workingPath, true); + } + HadoopClientFactory.mkdirs(fs, workingPath, HadoopClientFactory.READ_EXECUTE_PERMISSION); } } @@ -503,12 +509,11 @@ public class TestContext { initClusterLocations(cluster, fs); } - private static void initClusterLocations(EmbeddedCluster cluster, - FileSystem fs) throws Exception { - String stagingPath = ClusterHelper.getLocation(cluster.getCluster(), "staging"); + private static void initClusterLocations(EmbeddedCluster cluster, FileSystem fs) throws Exception { + String stagingPath = ClusterHelper.getLocation(cluster.getCluster(), ClusterLocationType.STAGING).getPath(); mkdir(fs, new Path(stagingPath), HadoopClientFactory.ALL_PERMISSION); - String workingPath = ClusterHelper.getLocation(cluster.getCluster(), "working"); + String workingPath = ClusterHelper.getLocation(cluster.getCluster(), ClusterLocationType.WORKING).getPath(); mkdir(fs, new Path(workingPath), HadoopClientFactory.READ_EXECUTE_PERMISSION); } http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/webapp/src/test/java/org/apache/falcon/validation/ClusterEntityValidationIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/validation/ClusterEntityValidationIT.java b/webapp/src/test/java/org/apache/falcon/validation/ClusterEntityValidationIT.java index cbbf90a..431d334 100644 --- a/webapp/src/test/java/org/apache/falcon/validation/ClusterEntityValidationIT.java +++ b/webapp/src/test/java/org/apache/falcon/validation/ClusterEntityValidationIT.java @@ -26,8 +26,10 @@ import org.apache.falcon.entity.parser.ValidationException; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.ACL; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.cluster.ClusterLocationType; import org.apache.falcon.entity.v0.cluster.Interface; import org.apache.falcon.entity.v0.cluster.Interfacetype; +import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.resource.TestContext; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -156,32 +158,48 @@ public class ClusterEntityValidationIT { parser.validate(cluster); } - @Test (expectedExceptions = ValidationException.class) + @Test + public void testValidateClusterLocationsWithoutWorking() throws Exception { + overlay = context.getUniqueOverlay(); + String filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay); + InputStream stream = new FileInputStream(filePath); + Cluster clusterEntity = (Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal(stream); + clusterEntity.getLocations().getLocations().remove(2); + FileSystem clusterFileSystem = FileSystem.get(ClusterHelper.getConfiguration(cluster)); + TestContext.createClusterLocations(clusterEntity, clusterFileSystem, false); + parser.validate(clusterEntity); + String expectedPath = + ClusterHelper.getLocation(clusterEntity, ClusterLocationType.STAGING).getPath() + "/working"; + Assert.assertEquals(ClusterHelper.getLocation(clusterEntity, ClusterLocationType.WORKING).getPath(), + expectedPath); + Assert.assertTrue(clusterFileSystem.getFileLinkStatus(new Path(expectedPath)).isDirectory()); + Assert.assertEquals(clusterFileSystem.getFileLinkStatus(new Path(expectedPath)).getPermission(), + HadoopClientFactory.READ_EXECUTE_PERMISSION); + } + + @Test(expectedExceptions = ValidationException.class) public void testValidateClusterLocationsThatDontExist() throws Exception { TestContext.deleteClusterLocations(cluster, fs); parser.validate(cluster); Assert.fail("Should have thrown a validation exception"); } - @Test (expectedExceptions = ValidationException.class) + @Test(expectedExceptions = ValidationException.class) public void testValidateClusterLocationsThatExistWithBadOwner() throws Exception { - TestContext.deleteClusterLocations(cluster, fs); createClusterLocationsBadPermissions(cluster); parser.validate(cluster); Assert.fail("Should have thrown a validation exception"); } private void createClusterLocationsBadPermissions(Cluster clusterEntity) throws IOException { - String stagingLocation = ClusterHelper.getLocation(clusterEntity, "staging"); + FileSystem clusterFileSystem = FileSystem.get(ClusterHelper.getConfiguration(clusterEntity)); + TestContext.deleteClusterLocations(clusterEntity, clusterFileSystem); + String stagingLocation = ClusterHelper.getLocation(clusterEntity, ClusterLocationType.STAGING).getPath(); Path stagingPath = new Path(stagingLocation); - if (!fs.exists(stagingPath)) { - FileSystem.mkdirs(fs, stagingPath, OWNER_ONLY_PERMISSION); - } + FileSystem.mkdirs(clusterFileSystem, stagingPath, OWNER_ONLY_PERMISSION); - String workingLocation = ClusterHelper.getLocation(clusterEntity, "working"); + String workingLocation = ClusterHelper.getLocation(clusterEntity, ClusterLocationType.WORKING).getPath(); Path workingPath = new Path(workingLocation); - if (!fs.exists(workingPath)) { - FileSystem.mkdirs(fs, stagingPath, OWNER_ONLY_PERMISSION); - } + FileSystem.mkdirs(clusterFileSystem, stagingPath, OWNER_ONLY_PERMISSION); } }
