Repository: falcon Updated Branches: refs/heads/master bb6032b2c -> f3ff8b27f
FALCON-1085 Support Cluster entity updates in Falcon Server Added basic documentation, https://issues.apache.org/jira/browse/FALCON-1937 will contain detailed documentation. Author: bvellanki <[email protected]> Reviewers: "Venkat Ranganathan <[email protected]>, yzheng-hortonworks <[email protected]>, peeyush b <[email protected]>" Closes #127 from bvellanki/FALCON-1085 Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/f3ff8b27 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/f3ff8b27 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/f3ff8b27 Branch: refs/heads/master Commit: f3ff8b27f0a77d802306f0fc9ffdff51ae6c7486 Parents: bb6032b Author: bvellanki <[email protected]> Authored: Tue May 10 13:27:33 2016 -0700 Committer: bvellanki <[email protected]> Committed: Tue May 10 13:27:33 2016 -0700 ---------------------------------------------------------------------- .../org/apache/falcon/cli/FalconEntityCLI.java | 9 +- .../org/apache/falcon/FalconCLIConstants.java | 1 + .../org/apache/falcon/client/FalconClient.java | 9 ++ client/src/main/resources/cluster-0.1.xsd | 1 + client/src/main/resources/datasource-0.1.xsd | 3 +- client/src/main/resources/feed-0.1.xsd | 2 + client/src/main/resources/process-0.1.xsd | 2 + .../org/apache/falcon/entity/ClusterHelper.java | 50 ++++++- .../falcon/entity/ColoClusterRelation.java | 3 +- .../org/apache/falcon/entity/EntityUtil.java | 69 +++++++++ .../falcon/entity/parser/FeedEntityParser.java | 8 ++ .../entity/parser/ProcessEntityParser.java | 8 ++ .../falcon/entity/store/ConfigurationStore.java | 18 ++- .../EntityRelationshipGraphBuilder.java | 28 +++- .../org/apache/falcon/update/UpdateHelper.java | 37 +++++ .../falcon/entity/ColoClusterRelationTest.java | 20 +++ .../apache/falcon/entity/EntityUtilTest.java | 21 +++ .../entity/parser/ClusterEntityParserTest.java | 3 + .../parser/DatasourceEntityParserTest.java | 1 + 
.../entity/parser/FeedEntityParserTest.java | 10 +- .../entity/parser/ProcessEntityParserTest.java | 12 ++ .../entity/store/ConfigurationStoreTest.java | 31 +++++ .../metadata/MetadataMappingServiceTest.java | 23 +++ .../apache/falcon/update/UpdateHelperTest.java | 64 +++++++++ .../resources/config/process/process-0.1.xml | 2 +- .../src/site/twiki/falconcli/UpdateEntity.twiki | 7 +- docs/src/site/twiki/restapi/EntityUpdate.twiki | 4 +- .../falcon/resource/AbstractEntityManager.java | 95 ++++++++++++- .../proxy/SchedulableEntityManagerProxy.java | 86 +++++++++--- .../org/apache/falcon/unit/TestFalconUnit.java | 6 +- .../falcon/resource/ConfigSyncService.java | 16 +++ .../resource/SchedulableEntityManager.java | 33 ++++- .../falcon/cli/FalconClusterUpdateCLIIT.java | 139 +++++++++++++++++++ .../apache/falcon/cli/FalconSafemodeCLIIT.java | 3 - .../org/apache/falcon/resource/TestContext.java | 1 + .../test/resources/cluster-updated-template.xml | 42 ++++++ 36 files changed, 817 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java ---------------------------------------------------------------------- diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java index 37a6992..78b2225 100644 --- a/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java +++ b/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java @@ -66,6 +66,8 @@ public class FalconEntityCLI extends FalconCLI { "Submits an entity xml to Falcon"); Option update = new Option(FalconCLIConstants.UPDATE_OPT, false, "Updates an existing entity xml"); + Option updateClusterDependents = new Option(FalconCLIConstants.UPDATE_CLUSTER_DEPENDENTS_OPT, false, + "Updates dependent entities of a cluster in workflow engine"); Option schedule = new 
Option(FalconCLIConstants.SCHEDULE_OPT, false, "Schedules a submited entity in Falcon"); Option suspend = new Option(FalconCLIConstants.SUSPEND_OPT, false, @@ -96,6 +98,7 @@ public class FalconEntityCLI extends FalconCLI { OptionGroup group = new OptionGroup(); group.addOption(submit); group.addOption(update); + group.addOption(updateClusterDependents); group.addOption(schedule); group.addOption(suspend); group.addOption(resume); @@ -217,7 +220,8 @@ public class FalconEntityCLI extends FalconCLI { } EntityType entityTypeEnum = null; - if (optionsList.contains(FalconCLIConstants.LIST_OPT)) { + if (optionsList.contains(FalconCLIConstants.LIST_OPT) + || optionsList.contains(FalconCLIConstants.UPDATE_CLUSTER_DEPENDENTS_OPT)) { if (entityType == null) { entityType = ""; } @@ -255,6 +259,9 @@ public class FalconEntityCLI extends FalconCLI { validateColo(optionsList); validateNotEmpty(entityName, FalconCLIConstants.ENTITY_NAME_OPT); result = client.update(entityType, entityName, filePath, skipDryRun, doAsUser).getMessage(); + } else if (optionsList.contains(FalconCLIConstants.UPDATE_CLUSTER_DEPENDENTS_OPT)) { + validateNotEmpty(cluster, FalconCLIConstants.CLUSTER_OPT); + result = client.updateClusterDependents(cluster, skipDryRun, doAsUser).getMessage(); } else if (optionsList.contains(FalconCLIConstants.SUBMIT_AND_SCHEDULE_OPT)) { validateNotEmpty(filePath, "file"); validateColo(optionsList); http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/client/src/main/java/org/apache/falcon/FalconCLIConstants.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/FalconCLIConstants.java b/client/src/main/java/org/apache/falcon/FalconCLIConstants.java index 1db5cfe..31ead63 100644 --- a/client/src/main/java/org/apache/falcon/FalconCLIConstants.java +++ b/client/src/main/java/org/apache/falcon/FalconCLIConstants.java @@ -47,6 +47,7 @@ public final class FalconCLIConstants { public static final String 
VERSION_OPT = "version"; public static final String SUBMIT_OPT = "submit"; public static final String UPDATE_OPT = "update"; + public static final String UPDATE_CLUSTER_DEPENDENTS_OPT = "updateClusterDependents"; public static final String DELETE_OPT = "delete"; public static final String SUBMIT_AND_SCHEDULE_OPT = "submitAndSchedule"; public static final String VALIDATE_OPT = "validate"; http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index 7a48973..1014d64 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -236,6 +236,7 @@ public class FalconClient extends AbstractFalconClient { VALIDATE("api/entities/validate/", HttpMethod.POST, MediaType.TEXT_XML), SUBMIT("api/entities/submit/", HttpMethod.POST, MediaType.TEXT_XML), UPDATE("api/entities/update/", HttpMethod.POST, MediaType.TEXT_XML), + UPDATEDEPENDENTS("api/entities/updateClusterDependents/", HttpMethod.POST, MediaType.TEXT_XML), SUBMITANDSCHEDULE("api/entities/submitAndSchedule/", HttpMethod.POST, MediaType.TEXT_XML), SCHEDULE("api/entities/schedule/", HttpMethod.POST, MediaType.TEXT_XML), SUSPEND("api/entities/suspend/", HttpMethod.POST, MediaType.TEXT_XML), @@ -430,6 +431,14 @@ public class FalconClient extends AbstractFalconClient { return getResponse(APIResult.class, clientResponse); } + public APIResult updateClusterDependents(String clusterName, Boolean skipDryRun, + String doAsUser) throws FalconCLIException { + ClientResponse clientResponse = new ResourceBuilder().path(Entities.UPDATEDEPENDENTS.path, clusterName) + .addQueryParam(SKIP_DRYRUN, skipDryRun).addQueryParam(DO_AS_OPT, doAsUser) + 
.call(Entities.UPDATEDEPENDENTS); + return getResponse(APIResult.class, clientResponse); + } + @Override public APIResult submitAndSchedule(String entityType, String filePath, Boolean skipDryRun, String doAsUser, String properties) throws FalconCLIException { http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/client/src/main/resources/cluster-0.1.xsd ---------------------------------------------------------------------- diff --git a/client/src/main/resources/cluster-0.1.xsd b/client/src/main/resources/cluster-0.1.xsd index 0e0ada8..03e9f84 100644 --- a/client/src/main/resources/cluster-0.1.xsd +++ b/client/src/main/resources/cluster-0.1.xsd @@ -75,6 +75,7 @@ <xs:attribute type="IDENTIFIER" name="name" use="required"/> <xs:attribute type="xs:string" name="description"/> <xs:attribute type="xs:string" name="colo" use="required"/> + <xs:attribute type="xs:int" name="version" use="optional" default="0"/> </xs:complexType> <xs:complexType name="locations"> <xs:annotation> http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/client/src/main/resources/datasource-0.1.xsd ---------------------------------------------------------------------- diff --git a/client/src/main/resources/datasource-0.1.xsd b/client/src/main/resources/datasource-0.1.xsd index 1202ba1..ef78239 100644 --- a/client/src/main/resources/datasource-0.1.xsd +++ b/client/src/main/resources/datasource-0.1.xsd @@ -76,6 +76,7 @@ <xs:attribute type="IDENTIFIER" name="name" use="required"/> <xs:attribute type="xs:string" name="colo" use="required"/> <xs:attribute type="xs:string" name="description"/> + <xs:attribute type="xs:int" name="version" use="optional" default="0"/> <xs:attribute type="datasource-type" name="type" use="required"> <xs:annotation> <xs:documentation> @@ -263,7 +264,7 @@ <xs:complexType name="ACL"> <xs:annotation> <xs:documentation> - Access control list for this cluster. + Access control list for this Entity. owner is the Owner of this entity. 
group is the one which has access to read - not used at this time. permission is not enforced at this time http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/client/src/main/resources/feed-0.1.xsd ---------------------------------------------------------------------- diff --git a/client/src/main/resources/feed-0.1.xsd b/client/src/main/resources/feed-0.1.xsd index 77e8663..3488233 100644 --- a/client/src/main/resources/feed-0.1.xsd +++ b/client/src/main/resources/feed-0.1.xsd @@ -129,6 +129,7 @@ </xs:sequence> <xs:attribute type="IDENTIFIER" name="name" use="required"/> <xs:attribute type="xs:string" name="description"/> + <xs:attribute type="xs:int" name="version" use="optional" default="0"/> </xs:complexType> <xs:complexType name="cluster"> <xs:annotation> @@ -168,6 +169,7 @@ <xs:attribute type="cluster-type" name="type" use="optional"/> <xs:attribute type="xs:string" name="partition" use="optional"/> <xs:attribute type="frequency-type" name="delay" use="optional" /> + <xs:attribute type="xs:int" name="version" use="optional" default="0"/> </xs:complexType> <xs:complexType name="partitions"> <xs:annotation> http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/client/src/main/resources/process-0.1.xsd ---------------------------------------------------------------------- diff --git a/client/src/main/resources/process-0.1.xsd b/client/src/main/resources/process-0.1.xsd index 456ebf9..0d01e33 100644 --- a/client/src/main/resources/process-0.1.xsd +++ b/client/src/main/resources/process-0.1.xsd @@ -188,6 +188,7 @@ <xs:element type="ACL" name="ACL" minOccurs="0"/> </xs:sequence> <xs:attribute type="IDENTIFIER" name="name" use="required"/> + <xs:attribute type="xs:int" name="version" use="optional" default="0"/> </xs:complexType> <xs:simpleType name="IDENTIFIER"> @@ -219,6 +220,7 @@ <xs:element type="validity" name="validity"/> </xs:sequence> <xs:attribute type="IDENTIFIER" name="name" use="required"/> + <xs:attribute type="xs:int" name="version" 
use="optional" default="0"/> </xs:complexType> <xs:complexType name="validity"> http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java index 9d79742..aff4405 100644 --- a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java @@ -32,6 +32,8 @@ import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.security.SecurityUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.List; @@ -45,7 +47,7 @@ public final class ClusterHelper { public static final String WORKINGDIR = "working"; public static final String NO_USER_BROKER_URL = "NA"; public static final String EMPTY_DIR_NAME = "EMPTY_DIR_DONT_DELETE"; - + private static final Logger LOG = LoggerFactory.getLogger(ClusterHelper.class); private ClusterHelper() { } @@ -123,6 +125,9 @@ public final class ClusterHelper { } public static Interface getInterface(Cluster cluster, Interfacetype type) { + if (cluster.getInterfaces() == null) { + return null; + } for (Interface interf : cluster.getInterfaces().getInterfaces()) { if (interf.getType() == type) { return interf; @@ -143,6 +148,9 @@ public final class ClusterHelper { public static Location getLocation(Cluster cluster, ClusterLocationType clusterLocationType) { + if (cluster.getLocations() == null) { + return null; + } for (Location loc : cluster.getLocations().getLocations()) { if (loc.getName().equals(clusterLocationType)) { return loc; @@ -211,4 +219,44 @@ public final class ClusterHelper { return getStorageUrl(cluster) + getLocation(cluster, 
ClusterLocationType.STAGING).getPath() + "/" + EMPTY_DIR_NAME; } + + public static boolean matchInterface(final Cluster oldEntity, final Cluster newEntity, + final Interfacetype interfaceType) { + Interface oldInterface = getInterface(oldEntity, interfaceType); + Interface newInterface = getInterface(newEntity, interfaceType); + String oldEndpoint = (oldInterface == null) ? null : oldInterface.getEndpoint(); + String newEndpoint = (newInterface == null) ? null : newInterface.getEndpoint(); + LOG.debug("Verifying if Interfaces match for cluster {} : Old - {}, New - {}", + interfaceType.name(), oldEndpoint, newEndpoint); + return StringUtils.isBlank(oldEndpoint) && StringUtils.isBlank(newEndpoint) + || StringUtils.isNotBlank(oldEndpoint) && oldEndpoint.equalsIgnoreCase(newEndpoint); + } + + public static boolean matchLocations(final Cluster oldEntity, final Cluster newEntity, + final ClusterLocationType locationType) { + Location oldLocation = getLocation(oldEntity, locationType); + Location newLocation = getLocation(newEntity, locationType); + String oldLocationPath = (oldLocation == null) ? null : oldLocation.getPath(); + String newLocationPath = (newLocation == null) ? 
null : newLocation.getPath(); + LOG.debug("Verifying if Locations match {} : Old - {}, New - {}", + locationType.name(), oldLocationPath, newLocationPath); + return StringUtils.isBlank(oldLocationPath) && StringUtils.isBlank(newLocationPath) + || StringUtils.isNotBlank(oldLocationPath) && oldLocationPath.equalsIgnoreCase(newLocationPath); + } + + public static boolean matchProperties(final Cluster oldEntity, final Cluster newEntity) { + Map<String, String> oldProps = getClusterProperties(oldEntity); + Map<String, String> newProps = getClusterProperties(newEntity); + return oldProps.equals(newProps); + } + + private static Map<String, String> getClusterProperties(final Cluster cluster) { + Map<String, String> returnProps = new HashMap<String, String>(); + if (cluster.getProperties() != null) { + for (Property prop : cluster.getProperties().getProperties()) { + returnProps.put(prop.getName(), prop.getValue()); + } + } + return returnProps; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/main/java/org/apache/falcon/entity/ColoClusterRelation.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/ColoClusterRelation.java b/common/src/main/java/org/apache/falcon/entity/ColoClusterRelation.java index e4ca91b..a141e43 100644 --- a/common/src/main/java/org/apache/falcon/entity/ColoClusterRelation.java +++ b/common/src/main/java/org/apache/falcon/entity/ColoClusterRelation.java @@ -79,7 +79,8 @@ public final class ColoClusterRelation implements ConfigurationChangeListener { if (oldEntity.getEntityType() != EntityType.CLUSTER) { return; } - throw new FalconException("change shouldn't be supported on cluster!"); + onRemove(oldEntity); + onAdd(newEntity); } @Override http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/main/java/org/apache/falcon/entity/EntityUtil.java ---------------------------------------------------------------------- diff --git 
a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java index b181ece..51172f2 100644 --- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java +++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java @@ -35,6 +35,7 @@ import org.apache.falcon.entity.v0.Frequency; import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.cluster.ClusterLocationType; +import org.apache.falcon.entity.v0.datasource.Datasource; import org.apache.falcon.entity.v0.datasource.DatasourceType; import org.apache.falcon.entity.v0.cluster.Property; import org.apache.falcon.entity.v0.feed.ClusterType; @@ -130,6 +131,7 @@ public final class EntityUtil { public enum ENTITY_OPERATION { SUBMIT, UPDATE, + UPDATE_CLUSTER_DEPENDENTS, SCHEDULE, SUBMIT_AND_SCHEDULE, DELETE, @@ -706,6 +708,40 @@ public final class EntityUtil { } } + public static Integer getVersion(final Entity entity) throws FalconException { + switch (entity.getEntityType()) { + case FEED: + return ((Feed)entity).getVersion(); + case PROCESS: + return ((Process)entity).getVersion(); + case CLUSTER: + return ((Cluster)entity).getVersion(); + case DATASOURCE: + return ((Datasource)entity).getVersion(); + default: + throw new FalconException("Invalid entity type:" + entity.getEntityType()); + } + } + + public static void setVersion(Entity entity, final Integer version) throws FalconException { + switch (entity.getEntityType()) { + case FEED: + ((Feed)entity).setVersion(version); + break; + case PROCESS: + ((Process)entity).setVersion(version); + break; + case CLUSTER: + ((Cluster)entity).setVersion(version); + break; + case DATASOURCE: + ((Datasource)entity).setVersion(version); + break; + default: + throw new FalconException("Invalid entity type:" + entity.getEntityType()); + } + } + //Staging path that stores scheduler configs like oozie coord/bundle xmls, parent 
workflow xml //Each entity update creates a new staging path //Base staging path is the base path for all staging dirs @@ -1123,4 +1159,37 @@ public final class EntityUtil { return instancePath; } + /** + * Returns true if entity is dependent on cluster, else returns false. + * @param entity + * @param clusterName + * @return + */ + public static boolean isEntityDependentOnCluster(Entity entity, String clusterName) { + switch (entity.getEntityType()) { + case CLUSTER: + return entity.getName().equalsIgnoreCase(clusterName); + + case FEED: + Feed feed = (Feed) entity; + for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) { + if (cluster.getName().equalsIgnoreCase(clusterName)) { + return true; + } + } + break; + + case PROCESS: + Process process = (Process) entity; + for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) { + if (cluster.getName().equalsIgnoreCase(clusterName)) { + return true; + } + } + break; + default: + } + return false; + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java index 2a9a852..28fdaf8 100644 --- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java +++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java @@ -97,6 +97,14 @@ public class FeedEntityParser extends EntityParser<Feed> { cluster.getValidity().setEnd(DateUtil.NEVER); } + // set Cluster version + int clusterVersion = ClusterHelper.getCluster(cluster.getName()).getVersion(); + if (cluster.getVersion() > 0 && cluster.getVersion() > clusterVersion) { + throw new ValidationException("Feed should not set cluster to a version that does not 
exist"); + } else { + cluster.setVersion(clusterVersion); + } + validateClusterValidity(cluster.getValidity().getStart(), cluster.getValidity().getEnd(), cluster.getName()); validateClusterHasRegistry(feed, cluster); http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java index 16fd8b3..8edec5b 100644 --- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java +++ b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java @@ -91,6 +91,14 @@ public class ProcessEntityParser extends EntityParser<Process> { cluster.getValidity().setEnd(DateUtil.NEVER); } + // set Cluster version + int clusterVersion = ClusterHelper.getCluster(cluster.getName()).getVersion(); + if (cluster.getVersion() > 0 && cluster.getVersion() > clusterVersion) { + throw new ValidationException("Process should not set cluster to a version that does not exist"); + } else { + cluster.setVersion(clusterVersion); + } + validateProcessValidity(cluster.getValidity().getStart(), cluster.getValidity().getEnd()); validateHDFSPaths(process, clusterName); validateProperties(process); http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java index bdcd1af..7f2b172 100644 --- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java +++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java @@ 
-20,12 +20,15 @@ package org.apache.falcon.entity.store; import org.apache.commons.codec.CharEncoding; import org.apache.falcon.FalconException; +import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.v0.AccessControlList; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.service.ConfigurationChangeListener; import org.apache.falcon.service.FalconService; +import org.apache.falcon.update.UpdateHelper; import org.apache.falcon.util.ReflectionUtils; import org.apache.falcon.util.StartupProperties; import org.apache.hadoop.fs.FileStatus; @@ -242,9 +245,10 @@ public final class ConfigurationStore implements FalconService { private synchronized void updateInternal(EntityType type, Entity entity) throws FalconException { try { if (get(type, entity.getName()) != null) { - persist(type, entity); ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type); Entity oldEntity = entityMap.get(entity.getName()); + updateVersion(oldEntity, entity); + persist(type, entity); onChange(oldEntity, entity); entityMap.put(entity.getName(), entity); } else { @@ -256,6 +260,18 @@ public final class ConfigurationStore implements FalconService { AUDIT.info(type + "/" + entity.getName() + " is replaced into config store"); } + private void updateVersion(Entity oldentity, Entity newEntity) throws FalconException { + // increase version number for cluster only if dependent feeds/process needs to be updated. + if (oldentity.getEntityType().equals(EntityType.CLUSTER)) { + if (UpdateHelper.isClusterEntityUpdated((Cluster) oldentity, (Cluster) newEntity)) { + EntityUtil.setVersion(newEntity, EntityUtil.getVersion(oldentity) + 1); + } + } else if (!EntityUtil.equals(oldentity, newEntity)) { + // Increase version for other entities if they actually changed. 
+ EntityUtil.setVersion(newEntity, EntityUtil.getVersion(oldentity)); + } + } + public synchronized void update(EntityType type, Entity entity) throws FalconException { if (updatesInProgress.get() == entity) { try { http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java index 25bbf0c..e6851df 100644 --- a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java +++ b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java @@ -120,7 +120,7 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder { EntityType entityType = oldEntity.getEntityType(); switch (entityType) { case CLUSTER: - // a cluster cannot be updated + updateClusterEntity((Cluster) oldEntity, (Cluster) newEntity); break; case PROCESS: updateProcessEntity((Process) oldEntity, (Process) newEntity); @@ -133,7 +133,33 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder { } } + private void updateClusterEntity(Cluster oldCluster, Cluster newCluster) { + LOG.info("Updating Cluster entity: {}", newCluster.getName()); + Vertex clusterEntityVertex = findVertex(oldCluster.getName(), RelationshipType.CLUSTER_ENTITY); + if (clusterEntityVertex == null) { + LOG.error("Illegal State: Cluster entity vertex must exist for {}", oldCluster.getName()); + throw new IllegalStateException(oldCluster.getName() + " entity vertex must exist."); + } + updateColoEdge(oldCluster.getColo(), newCluster.getColo(), clusterEntityVertex); + updateDataClassification(oldCluster.getTags(), newCluster.getTags(), clusterEntityVertex); + } + + private void updateColoEdge(String oldColo, String 
newColo, Vertex clusterEntityVertex) { + if (areSame(oldColo, newColo)) { + return; + } + Vertex oldColoVertex = findVertex(oldColo, RelationshipType.COLO); + if (oldColoVertex != null) { + removeEdge(clusterEntityVertex, oldColoVertex, RelationshipLabel.CLUSTER_COLO.getName()); + } + Vertex newColoVertex = findVertex(newColo, RelationshipType.COLO); + if (newColoVertex == null) { + newColoVertex = addVertex(newColo, RelationshipType.COLO); + } + + addEdge(clusterEntityVertex, newColoVertex, RelationshipLabel.CLUSTER_COLO.getName()); + } public void updateFeedEntity(Feed oldFeed, Feed newFeed) { LOG.info("Updating feed entity: {}", newFeed.getName()); http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/main/java/org/apache/falcon/update/UpdateHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java index 6603bc6..ae88a01 100644 --- a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java +++ b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java @@ -20,12 +20,15 @@ package org.apache.falcon.update; import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; +import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.FeedHelper; import org.apache.falcon.entity.ProcessHelper; import org.apache.falcon.entity.Storage; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.cluster.ClusterLocationType; +import org.apache.falcon.entity.v0.cluster.Interfacetype; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Cluster; import org.apache.falcon.entity.v0.process.Process; @@ -71,6 +74,10 @@ public final class UpdateHelper { case PROCESS: return !EntityUtil.equals(oldView, 
newView, PROCESS_FIELDS); + case CLUSTER: + return isClusterEntityUpdated((org.apache.falcon.entity.v0.cluster.Cluster) oldEntity, + (org.apache.falcon.entity.v0.cluster.Cluster) newEntity); + default: } throw new IllegalArgumentException("Unhandled entity type " + oldEntity.getEntityType()); @@ -129,4 +136,34 @@ public final class UpdateHelper { throw new FalconException("Don't know what to do. Unexpected scenario"); } } + + public static boolean isClusterEntityUpdated(final org.apache.falcon.entity.v0.cluster.Cluster oldEntity, + final org.apache.falcon.entity.v0.cluster.Cluster newEntity) { + /* + * Name should not be updated. + * interface, locations, properties, colo : Update bundle/coord for dependent entities. + * Description, tags, ACL : no need to update bundle/coord for dependent entities. + */ + if (!oldEntity.getColo().equals(newEntity.getColo())) { + return true; + } + + for(Interfacetype interfacetype : Interfacetype.values()) { + if (!ClusterHelper.matchInterface(oldEntity, newEntity, interfacetype)) { + return true; + } + } + + for(ClusterLocationType locationType : ClusterLocationType.values()) { + if (!ClusterHelper.matchLocations(oldEntity, newEntity, locationType)) { + return true; + } + } + + if (!ClusterHelper.matchProperties(oldEntity, newEntity)) { + return true; + } + + return false; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/java/org/apache/falcon/entity/ColoClusterRelationTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/ColoClusterRelationTest.java b/common/src/test/java/org/apache/falcon/entity/ColoClusterRelationTest.java index 0d6e754..2abcece 100644 --- a/common/src/test/java/org/apache/falcon/entity/ColoClusterRelationTest.java +++ b/common/src/test/java/org/apache/falcon/entity/ColoClusterRelationTest.java @@ -69,5 +69,25 @@ public class ColoClusterRelationTest extends AbstractTestBase { clusters = 
relation.getClusters("colo1"); Assert.assertNotNull(clusters); Assert.assertEquals(0, clusters.size()); + + Cluster updatedCluster3 = new Cluster(); + updatedCluster3.setName(cluster3.getName()); + updatedCluster3.setColo("colo3"); + try { + getStore().initiateUpdate(updatedCluster3); + getStore().update(EntityType.CLUSTER, updatedCluster3); + } finally { + getStore().cleanupUpdateInit(); + } + + relation = ColoClusterRelation.get(); + clusters = relation.getClusters("colo3"); + Assert.assertNotNull(clusters); + Assert.assertEquals(1, clusters.size()); + Assert.assertTrue(clusters.contains(updatedCluster3.getName())); + + clusters = relation.getClusters("colo2"); + Assert.assertNotNull(clusters); + Assert.assertEquals(0, clusters.size()); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java b/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java index c87449c..766b2fa 100644 --- a/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java +++ b/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java @@ -450,4 +450,25 @@ public class EntityUtilTest extends AbstractTestBase { // Ensure latest is returned. 
Assert.assertEquals(EntityUtil.getLatestStagingPath(cluster, process).getName(), md5 + "_1436357052992"); } + + @Test + public void testIsClusterUsedByEntity() throws Exception { + Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal( + getClass().getResourceAsStream(PROCESS_XML)); + Feed feed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal( + getClass().getResourceAsStream(FEED_XML)); + org.apache.falcon.entity.v0.cluster.Cluster cluster = + (org.apache.falcon.entity.v0.cluster.Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal( + getClass().getResourceAsStream(CLUSTER_XML)); + + Assert.assertTrue(EntityUtil.isEntityDependentOnCluster(cluster, "testCluster")); + Assert.assertTrue(EntityUtil.isEntityDependentOnCluster(feed, "testCluster")); + Assert.assertTrue(EntityUtil.isEntityDependentOnCluster(feed, "backupCluster")); + Assert.assertTrue(EntityUtil.isEntityDependentOnCluster(process, "testCluster")); + + Assert.assertFalse(EntityUtil.isEntityDependentOnCluster(cluster, "fakeCluster")); + Assert.assertFalse(EntityUtil.isEntityDependentOnCluster(feed, "fakeCluster")); + Assert.assertFalse(EntityUtil.isEntityDependentOnCluster(process, "fakeCluster")); + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java index c45909f..4b4b657 100644 --- a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java +++ b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java @@ -175,6 +175,9 @@ public class ClusterEntityParserTest extends AbstractTestBase { // Good set of properties, should work clusterEntityParser.validateProperties(cluster); + // validate 
version + Assert.assertEquals(cluster.getVersion(), 0); + // add duplicate property, should throw validation exception. Property property1 = new Property(); property1.setName("field1"); http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/java/org/apache/falcon/entity/parser/DatasourceEntityParserTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/parser/DatasourceEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/DatasourceEntityParserTest.java index 3893917..6ade9c9 100644 --- a/common/src/test/java/org/apache/falcon/entity/parser/DatasourceEntityParserTest.java +++ b/common/src/test/java/org/apache/falcon/entity/parser/DatasourceEntityParserTest.java @@ -76,6 +76,7 @@ public class DatasourceEntityParserTest extends AbstractTestBase { Assert.assertEquals("test-hsql-db", databaseEntity.getName()); Assert.assertEquals("hsql", databaseEntity.getType().value()); Assert.assertEquals("org.hsqldb.jdbcDriver", databaseEntity.getDriver().getClazz()); + Assert.assertEquals(datasource.getVersion(), 0); } @Test http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java index ceec3c4..c642fb8 100644 --- a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java +++ b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java @@ -35,11 +35,11 @@ import org.apache.falcon.entity.v0.feed.ActionType; import org.apache.falcon.entity.v0.feed.Argument; import org.apache.falcon.entity.v0.feed.ClusterType; import org.apache.falcon.entity.v0.feed.ExtractMethod; +import 
org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.feed.Location; -import org.apache.falcon.entity.v0.feed.Locations; import org.apache.falcon.entity.v0.feed.LocationType; +import org.apache.falcon.entity.v0.feed.Locations; import org.apache.falcon.entity.v0.feed.MergeType; -import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.feed.Partition; import org.apache.falcon.entity.v0.feed.Partitions; import org.apache.falcon.entity.v0.feed.Property; @@ -87,11 +87,13 @@ public class FeedEntityParserTest extends AbstractTestBase { Cluster cluster = (Cluster) unmarshaller.unmarshal(this.getClass() .getResourceAsStream(CLUSTER_XML)); cluster.setName("testCluster"); + cluster.setVersion(0); store.publish(EntityType.CLUSTER, cluster); cluster = (Cluster) unmarshaller.unmarshal(this.getClass() .getResourceAsStream(CLUSTER_XML)); cluster.setName("backupCluster"); + cluster.setVersion(1); store.publish(EntityType.CLUSTER, cluster); LifecyclePolicyMap.get().init(); @@ -123,11 +125,14 @@ public class FeedEntityParserTest extends AbstractTestBase { assertEquals(feed.getSla().getSlaHigh().toString(), "hours(3)"); assertEquals(feed.getSla().getSlaLow().toString(), "hours(2)"); assertEquals(feed.getGroups(), "online,bi"); + Assert.assertEquals(feed.getVersion(), 0); assertEquals(feed.getClusters().getClusters().get(0).getName(), "testCluster"); assertEquals(feed.getClusters().getClusters().get(0).getSla().getSlaLow().toString(), "hours(3)"); assertEquals(feed.getClusters().getClusters().get(0).getSla().getSlaHigh().toString(), "hours(4)"); + assertEquals(feed.getClusters().getClusters().get(0).getVersion(), 0); + assertEquals(feed.getClusters().getClusters().get(1).getVersion(), 1); assertEquals(feed.getClusters().getClusters().get(0).getType(), ClusterType.SOURCE); @@ -633,6 +638,7 @@ public class FeedEntityParserTest extends AbstractTestBase { Cluster cluster = (Cluster) unmarshaller.unmarshal(this.getClass() 
.getResourceAsStream(("/config/cluster/cluster-no-registry.xml"))); cluster.setName("badTestCluster"); + cluster.setVersion(0); ConfigurationStore.get().publish(EntityType.CLUSTER, cluster); http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java index 7159966..64f62a5 100644 --- a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java +++ b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java @@ -121,6 +121,7 @@ public class ProcessEntityParserTest extends AbstractTestBase { Assert.assertEquals(SchemaHelper.formatDateUTC(processCluster.getValidity().getStart()), "2011-11-02T00:00Z"); Assert.assertEquals(SchemaHelper.formatDateUTC(processCluster.getValidity().getEnd()), "2091-12-30T00:00Z"); Assert.assertEquals(process.getTimezone().getID(), "UTC"); + Assert.assertEquals(processCluster.getVersion(), 0); Assert.assertEquals(process.getSla().getShouldStartIn().toString(), "hours(2)"); Assert.assertEquals(process.getSla().getShouldEndIn().toString(), "hours(4)"); @@ -386,6 +387,17 @@ public class ProcessEntityParserTest extends AbstractTestBase { } @Test + public void testValidateVersion() throws Exception { + InputStream stream = this.getClass().getResourceAsStream(PROCESS_XML); + + Process process = parser.parse(stream); + Assert.assertEquals(process.getVersion(), 0); + process.setVersion(10); + parser.validate(process); + Assert.assertEquals(process.getVersion(), 10); + } + + @Test public void testValidateACLWithACLAndAuthorizationDisabled() throws Exception { InputStream stream = this.getClass().getResourceAsStream("/config/process/process-table.xml"); 
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java b/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java index fa3d3f4..8056e80 100644 --- a/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java +++ b/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java @@ -19,8 +19,10 @@ package org.apache.falcon.entity.store; import org.apache.falcon.FalconException; +import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.service.ConfigurationChangeListener; import org.apache.falcon.util.StartupProperties; @@ -96,6 +98,7 @@ public class ConfigurationStoreTest { store.publish(EntityType.PROCESS, process); Process p = store.get(EntityType.PROCESS, "hello"); Assert.assertEquals(p, process); + Assert.assertEquals(p.getVersion(), 0); store.registerListener(listener); process.setName("world"); @@ -109,6 +112,34 @@ public class ConfigurationStoreTest { } @Test + public void testUpdate() throws Exception { + Cluster cluster1 = createClusterObj(); + store.publish(EntityType.CLUSTER, cluster1); + Assert.assertEquals(EntityUtil.getVersion(store.get(EntityType.CLUSTER, "cluster1")).intValue(), 0); + + Cluster cluster2 = createClusterObj(); + cluster2.setDescription("new Desc"); + store.initiateUpdate(cluster2); + store.update(EntityType.CLUSTER, cluster2); + store.cleanupUpdateInit(); + Assert.assertEquals(EntityUtil.getVersion(store.get(EntityType.CLUSTER, "cluster1")).intValue(), 0); + + Cluster cluster3 = createClusterObj(); + cluster3.setColo("newColo"); 
+ store.initiateUpdate(cluster3); + store.update(EntityType.CLUSTER, cluster3); + store.cleanupUpdateInit(); + Assert.assertEquals(EntityUtil.getVersion(store.get(EntityType.CLUSTER, "cluster1")).intValue(), 1); + } + + private Cluster createClusterObj() { + Cluster cluster = new Cluster(); + cluster.setName("cluster1"); + cluster.setColo("colo1"); + return cluster; + } + + @Test public void testGet() throws Exception { Process p = store.get(EntityType.PROCESS, "notfound"); Assert.assertNull(p); http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java index 29f933d..228f522 100644 --- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java +++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java @@ -606,6 +606,29 @@ public class MetadataMappingServiceTest { verifyLineageGraphForJobCounters(context); } + @Test(dependsOnMethods = "testOnFeedEntityChange") + public void testOnClusterEntityChange() throws Exception { + long beforeVerticesCount = getVerticesCount(service.getGraph()); + long beforeEdgesCount = getEdgesCount(service.getGraph()); + + Cluster oldCluster = clusterEntity; + Cluster newCluster = EntityBuilderTestUtil.buildCluster(oldCluster.getName(), + "clusterUpdateColo", oldCluster.getTags() + ",clusterUpdateTagKey=clusterUpdateTagVal"); + + try { + configStore.initiateUpdate(newCluster); + configStore.update(EntityType.CLUSTER, newCluster); + } finally { + configStore.cleanupUpdateInit(); + } + + Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 2); // +1 new tag +1 new colo + Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 
1); // +1 new tag edge + Vertex newClusterVertex = getEntityVertex(newCluster.getName(), RelationshipType.CLUSTER_ENTITY); + verifyVertexForEdge(newClusterVertex, Direction.OUT, RelationshipLabel.CLUSTER_COLO.getName(), + "clusterUpdateColo", RelationshipType.COLO.getName()); + } + private void verifyUpdatedEdges(Process newProcess) { Vertex processVertex = getEntityVertex(newProcess.getName(), RelationshipType.PROCESS_ENTITY); http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java index 3e48e26..52b7103 100644 --- a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java +++ b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java @@ -21,6 +21,7 @@ package org.apache.falcon.update; import org.apache.falcon.FalconException; import org.apache.falcon.cluster.util.EmbeddedCluster; import org.apache.falcon.entity.AbstractTestBase; +import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.FeedHelper; import org.apache.falcon.entity.parser.EntityParserFactory; @@ -30,7 +31,11 @@ import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.Frequency; import org.apache.falcon.entity.v0.SchemaHelper; +import org.apache.falcon.entity.v0.cluster.ACL; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.cluster.ClusterLocationType; +import org.apache.falcon.entity.v0.cluster.Interface; +import org.apache.falcon.entity.v0.cluster.Interfacetype; import org.apache.falcon.entity.v0.feed.CatalogTable; import org.apache.falcon.entity.v0.feed.Feed; import 
org.apache.falcon.entity.v0.feed.Location; @@ -50,6 +55,7 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import javax.xml.bind.Unmarshaller; import java.io.IOException; import java.io.InputStream; @@ -301,6 +307,64 @@ public class UpdateHelperTest extends AbstractTestBase { Assert.assertTrue(UpdateHelper.isEntityUpdated(newProcess, newerProcess, cluster, procPath)); } + @Test + public void testIsClusterEntityUpdated() throws Exception { + Unmarshaller unmarshaller = EntityType.CLUSTER.getUnmarshaller(); + + String cluster = "testCluster"; + Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster); + Cluster newClusterEntity = (Cluster) unmarshaller.unmarshal(this.getClass().getResource(CLUSTER_XML)); + newClusterEntity.setName(cluster); + Assert.assertNotNull(newClusterEntity); + + // Tags, ACL, description update should not update bundle/workflow for dependent entities + ACL acl = new ACL(); + acl.setOwner("Test"); + acl.setGroup("testGroup"); + acl.setPermission("*"); + newClusterEntity.setACL(acl); + newClusterEntity.setDescription("New Description"); + newClusterEntity.setTags("test=val,test2=val2"); + Assert.assertFalse(UpdateHelper.isClusterEntityUpdated(clusterEntity, newClusterEntity)); + + // Changing colo should trigger update + newClusterEntity.setColo("NewColoValue"); + Assert.assertTrue(UpdateHelper.isClusterEntityUpdated(clusterEntity, newClusterEntity)); + + + // Updating an interface should trigger update bundle/workflow for dependent entities + Interface writeInterface = ClusterHelper.getInterface(newClusterEntity, Interfacetype.WRITE); + newClusterEntity.getInterfaces().getInterfaces().remove(writeInterface); + Assert.assertNotNull(writeInterface); + writeInterface.setEndpoint("hdfs://test.host.name:8020"); + writeInterface.setType(Interfacetype.WRITE); + writeInterface.setVersion("2.2.0"); + 
newClusterEntity.getInterfaces().getInterfaces().add(writeInterface); + Assert.assertTrue(UpdateHelper.isClusterEntityUpdated(clusterEntity, newClusterEntity)); + + // Updating a property should trigger update bundle/workflow for dependent entities + newClusterEntity = (Cluster) unmarshaller.unmarshal(this.getClass().getResource(CLUSTER_XML)); + newClusterEntity.setName(cluster); + Assert.assertNotNull(newClusterEntity); + org.apache.falcon.entity.v0.cluster.Property property = new org.apache.falcon.entity.v0.cluster.Property(); + property.setName("testName"); + property.setValue("testValue"); + newClusterEntity.getProperties().getProperties().add(property); + Assert.assertTrue(UpdateHelper.isClusterEntityUpdated(clusterEntity, newClusterEntity)); + + // Updating a location should trigger update bundle/workflow for dependent entities + newClusterEntity = (Cluster) unmarshaller.unmarshal(this.getClass().getResource(CLUSTER_XML)); + newClusterEntity.setName(cluster); + Assert.assertNotNull(newClusterEntity); + org.apache.falcon.entity.v0.cluster.Location stagingLocation = + ClusterHelper.getLocation(newClusterEntity, ClusterLocationType.STAGING); + Assert.assertNotNull(stagingLocation); + newClusterEntity.getInterfaces().getInterfaces().remove(stagingLocation); + stagingLocation.setPath("/test/path/here"); + newClusterEntity.getLocations().getLocations().add(stagingLocation); + Assert.assertTrue(UpdateHelper.isClusterEntityUpdated(clusterEntity, newClusterEntity)); + } + private static Location getLocation(Feed feed, LocationType type, String cluster) { org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster); if (feedCluster.getLocations() != null) { http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/resources/config/process/process-0.1.xml ---------------------------------------------------------------------- diff --git a/common/src/test/resources/config/process/process-0.1.xml 
b/common/src/test/resources/config/process/process-0.1.xml index 039208c..4ce7ad1 100644 --- a/common/src/test/resources/config/process/process-0.1.xml +++ b/common/src/test/resources/config/process/process-0.1.xml @@ -16,7 +16,7 @@ See the License for the specific language governing permissions and limitations under the License. --> -<process name="sample" xmlns="uri:falcon:process:0.1"> +<process name="sample" version="0" xmlns="uri:falcon:process:0.1"> <tags>[email protected], [email protected], _department_type=forecasting</tags> <pipelines>testPipeline,dataReplication_Pipeline</pipelines> <clusters> http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/docs/src/site/twiki/falconcli/UpdateEntity.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/falconcli/UpdateEntity.twiki b/docs/src/site/twiki/falconcli/UpdateEntity.twiki index 5d49a76..146a60f 100644 --- a/docs/src/site/twiki/falconcli/UpdateEntity.twiki +++ b/docs/src/site/twiki/falconcli/UpdateEntity.twiki @@ -2,12 +2,15 @@ [[CommonCLI][Common CLI Options]] -Update operation allows an already submitted/scheduled entity to be updated and put it into the archive.Archive path is defined in startup.properties in variable "config.store.uri". Cluster and datasource updates are currently not allowed. +Update operation allows an already submitted/scheduled entity to be updated and put into the archive. Archive path is defined in startup.properties in the variable "config.store.uri". Datasource updates are currently not allowed. Usage: -$FALCON_HOME/bin/falcon entity -type [feed|process] -name <<name>> -update -file <<path_to_file>> +$FALCON_HOME/bin/falcon entity -type [cluster|feed|process] -name <<name>> -update -file <<path_to_file>> Optional Arg : -skipDryRun. When this argument is specified, Falcon skips oozie dryrun. 
Example: $FALCON_HOME/bin/falcon entity -type process -name hourly-reports-generator -update -file /process/definition.xml + +Note: When a cluster entity is updated, the dependent feed and process bundle+coordinators are updated in the +workflow engine. Hence, only a Falcon superuser who has the ability to impersonate other users can update a cluster entity. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/docs/src/site/twiki/restapi/EntityUpdate.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/restapi/EntityUpdate.twiki b/docs/src/site/twiki/restapi/EntityUpdate.twiki index 46b01fc..cbf33db 100644 --- a/docs/src/site/twiki/restapi/EntityUpdate.twiki +++ b/docs/src/site/twiki/restapi/EntityUpdate.twiki @@ -8,8 +8,8 @@ Updates the submitted entity. ---++ Parameters - * :entity-type can be feed or process. - * :entity-name is name of the feed or process. + * :entity-type can be cluster, feed or process. + * :entity-name is the name of the cluster, feed or process. * skipDryRun : Optional query param, Falcon skips oozie dryrun when value is set to true. * doAs <optional query param> allows the current user to impersonate the user passed in doAs when interacting with the Falcon system. 
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java index b319dd1..1f6be41 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java @@ -39,10 +39,14 @@ import org.apache.falcon.entity.v0.EntityGraph; import org.apache.falcon.entity.v0.EntityIntegrityChecker; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.feed.Clusters; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.resource.APIResult.Status; import org.apache.falcon.resource.EntityList.EntityElement; import org.apache.falcon.resource.metadata.AbstractMetadataResource; import org.apache.falcon.security.CurrentUser; +import org.apache.falcon.security.DefaultAuthorizationProvider; import org.apache.falcon.security.SecurityUtil; import org.apache.falcon.util.DeploymentUtil; import org.apache.falcon.util.RuntimeProperties; @@ -50,6 +54,7 @@ import org.apache.falcon.util.StartupProperties; import org.apache.falcon.workflow.WorkflowEngineFactory; import org.apache.falcon.workflow.engine.AbstractWorkflowEngine; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -333,8 +338,8 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource { obtainEntityLocks(oldEntity, "update", tokenList); StringBuilder result = new StringBuilder("Updated successfully"); - //Update in workflow engine - if (!DeploymentUtil.isPrism()) { + 
//Update in workflow engine if entity is not a cluster (cluster entity is not scheduled) + if (!DeploymentUtil.isPrism() && !entityType.equals(EntityType.CLUSTER)) { Set<String> oldClusters = EntityUtil.getClustersDefinedInColos(oldEntity); Set<String> newClusters = EntityUtil.getClustersDefinedInColos(newEntity); newClusters.retainAll(oldClusters); //common clusters for update @@ -359,6 +364,80 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource { } } + /** + * Updates scheduled dependent entities of a cluster. + * + * @param clusterName Name of cluster + * @param colo colo + * @param skipDryRun Skip dry run during update if set to true + * @return APIResult + */ + public APIResult updateClusterDependents(String clusterName, String colo, Boolean skipDryRun) { + checkColo(colo); + try { + Cluster cluster = EntityUtil.getEntity(EntityType.CLUSTER, clusterName); + verifySafemodeOperation(cluster, EntityUtil.ENTITY_OPERATION.UPDATE_CLUSTER_DEPENDENTS); + int clusterVersion = cluster.getVersion(); + StringBuilder result = new StringBuilder("Updating entities dependent on cluster \n"); + // get dependent entities. check if cluster version changed. 
if yes, update dependent entities + Pair<String, EntityType>[] dependentEntities = EntityIntegrityChecker.referencedBy(cluster); + if (dependentEntities == null) { + // nothing to update + return new APIResult(APIResult.Status.SUCCEEDED, "Cluster " + + clusterName + " has no dependent entities"); + } + for (Pair<String, EntityType> depEntity : dependentEntities) { + Entity entity = EntityUtil.getEntity(depEntity.second, depEntity.first); + switch (entity.getEntityType()) { + case FEED: + Clusters feedClusters = ((Feed)entity).getClusters(); + List<org.apache.falcon.entity.v0.feed.Cluster> updatedFeedClusters = + new ArrayList<org.apache.falcon.entity.v0.feed.Cluster>(); + if (feedClusters != null) { + for(org.apache.falcon.entity.v0.feed.Cluster feedCluster : feedClusters.getClusters()) { + if (feedCluster.getName().equals(clusterName) + && feedCluster.getVersion() != clusterVersion) { + // update feed cluster entity + feedCluster.setVersion(clusterVersion); + } + updatedFeedClusters.add(feedCluster); + } + ((Feed)entity).getClusters().getClusters().clear(); + ((Feed)entity).getClusters().getClusters().addAll(updatedFeedClusters); + result.append(update(entity, entity.getEntityType().name(), + entity.getName(), skipDryRun).getMessage()); + } + break; + case PROCESS: + org.apache.falcon.entity.v0.process.Clusters processClusters = ((Process)entity).getClusters(); + List<org.apache.falcon.entity.v0.process.Cluster> updatedProcClusters = + new ArrayList<org.apache.falcon.entity.v0.process.Cluster>(); + if (processClusters != null) { + for(org.apache.falcon.entity.v0.process.Cluster procCluster : processClusters.getClusters()) { + if (procCluster.getName().equals(clusterName) + && procCluster.getVersion() != clusterVersion) { + // update process cluster entity + procCluster.setVersion(clusterVersion); + } + updatedProcClusters.add(procCluster); + } + ((Process)entity).getClusters().getClusters().clear(); + 
((Process)entity).getClusters().getClusters().addAll(updatedProcClusters); + result.append(update(entity, entity.getEntityType().name(), + entity.getName(), skipDryRun).getMessage()); + } + break; + default: + break; + } + } + return new APIResult(APIResult.Status.SUCCEEDED, result.toString()); + } catch (FalconException e) { + LOG.error("Update failed", e); + throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + } + private void obtainEntityLocks(Entity entity, String command, List<Entity> tokenList) throws FalconException { //first obtain lock for the entity for which update is issued. @@ -397,14 +476,19 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource { } - private void validateUpdate(Entity oldEntity, Entity newEntity) throws FalconException { + private void validateUpdate(Entity oldEntity, Entity newEntity) throws FalconException, IOException { if (oldEntity.getEntityType() != newEntity.getEntityType() || !oldEntity.equals(newEntity)) { throw new FalconException( oldEntity.toShortString() + " can't be updated with " + newEntity.toShortString()); } if (oldEntity.getEntityType() == EntityType.CLUSTER) { - throw new FalconException("Update not supported for clusters"); + final UserGroupInformation authenticatedUGI = CurrentUser.getAuthenticatedUGI(); + DefaultAuthorizationProvider authorizationProvider = new DefaultAuthorizationProvider(); + if (!authorizationProvider.isSuperUser(authenticatedUGI)) { + throw new FalconException("Permission denied : " + + "Cluster entity update can only be performed by superuser."); + } } String[] props = oldEntity.getEntityType().getImmutableProperties(); @@ -455,7 +539,7 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource { if (entity.getEntityType().equals(EntityType.CLUSTER)) { return; } else { - LOG.error("Entity operation {} is not allowed on non-cluster entities during safemode", + LOG.error("Entity operation {} is only allowed on 
cluster entities during safemode", operation.name()); throw FalconWebException.newAPIException("Entity operation " + operation.name() + " is only allowed on cluster entities during safemode"); @@ -470,6 +554,7 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource { return; } case SCHEDULE: + case UPDATE_CLUSTER_DEPENDENTS: case SUBMIT_AND_SCHEDULE: case DELETE: case RESUME: http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java index 168f18e..53a9de1 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java @@ -18,25 +18,6 @@ package org.apache.falcon.resource.proxy; -import java.lang.reflect.Constructor; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; - import org.apache.commons.lang.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.FalconRuntimException; @@ -58,6 +39,24 @@ import org.apache.falcon.resource.channel.Channel; import org.apache.falcon.resource.channel.ChannelFactory; import org.apache.falcon.util.DeploymentUtil; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; 
+import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import java.lang.reflect.Constructor; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + /** * A proxy implementation of the schedulable entity operations. */ @@ -380,6 +379,55 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana } /** + * Updates the dependent entities of a cluster in the workflow engine. + * @param clusterName Name of cluster. + * @param ignore colo query param; not used by the proxy, which applies the update to all applicable colos. + * @param skipDryRun Optional query param, Falcon skips oozie dryrun when value is set to true. + * @return Result of the update. + */ + @POST + @Path("updateClusterDependents/{clusterName}") + @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) + @Monitored(event = "updateClusterDependents") + @Override + public APIResult updateClusterDependents( + @Dimension("entityName") @PathParam("clusterName") final String clusterName, + @Dimension("colo") @QueryParam("colo") String ignore, + @QueryParam("skipDryRun") final Boolean skipDryRun) { + + final Set<String> allColos = getApplicableColos("cluster", clusterName); + Map<String, APIResult> results = new HashMap<String, APIResult>(); + boolean result = true; + + if (!allColos.isEmpty()) { + results.put(FALCON_TAG + "/updateClusterDependents", new EntityProxy("cluster", clusterName) { + @Override + protected Set<String> getColosToApply() { + return allColos; + } + + @Override + protected APIResult doExecute(String colo) throws FalconException { + return getConfigSyncChannel(colo).invoke("updateClusterDependents", clusterName, + colo, skipDryRun); + } + }.execute()); + } + + for (APIResult apiResult : results.values()) { + if (apiResult.getStatus() 
!= APIResult.Status.SUCCEEDED) { + result = false; + } + } + // update only if all are updated + if (!embeddedMode && result) { + results.put(PRISM_TAG, super.updateClusterDependents(clusterName, currentColo, skipDryRun)); + } + + return consolidateResult(results, APIResult.class); + } + + /** * Force updates the entity. * @param type Valid options are feed or process. * @param entityName Name of the entity. http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java index 9b1ff2a..56dcf87 100644 --- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java +++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java @@ -195,7 +195,6 @@ public class TestFalconUnit extends FalconUnitTestBase { Process process = getEntity(EntityType.PROCESS, PROCESS_NAME); setDummyProperty(process); String processXml = process.toString(); - File file = new File("target/newprocess.xml"); file.createNewFile(); FileWriter fw = new FileWriter(file.getAbsoluteFile()); @@ -208,9 +207,8 @@ public class TestFalconUnit extends FalconUnitTestBase { result = falconUnitClient.touch(EntityType.PROCESS.name(), PROCESS_NAME, null, true, null); assertStatus(result); - process = getEntity(EntityType.PROCESS, - PROCESS_NAME); - Assert.assertEquals(process.toString(), processXml); + Process process2 = getEntity(EntityType.PROCESS, PROCESS_NAME); + Assert.assertEquals(process2.toString(), process.toString()); file.delete(); } http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java 
b/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java index aa15dcc..7b32bd5 100644 --- a/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java +++ b/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java @@ -87,4 +87,20 @@ public class ConfigSyncService extends AbstractEntityManager { throw FalconWebException.newAPIException(throwable); } } + + @POST + @Path("updateClusterDependents/{clusterName}") + @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) + @Monitored(event = "updateClusterDependents") + @Override + public APIResult updateClusterDependents( + @Dimension("entityName") @PathParam("clusterName") final String clusterName, + @Dimension("colo") @QueryParam("colo") String colo, + @QueryParam("skipDryRun") final Boolean skipDryRun) { + try { + return super.updateClusterDependents(clusterName, colo, skipDryRun); + } catch (Throwable throwable) { + throw FalconWebException.newAPIException(throwable); + } + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java index e97adff..657ef9e 100644 --- a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java +++ b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java @@ -18,8 +18,14 @@ package org.apache.falcon.resource; +import org.apache.commons.lang.StringUtils; +import org.apache.falcon.FalconWebException; +import org.apache.falcon.monitors.Dimension; +import org.apache.falcon.monitors.Monitored; + import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.POST; @@ -27,16 
+33,10 @@ import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; -import javax.ws.rs.DELETE; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import org.apache.commons.lang.StringUtils; -import org.apache.falcon.FalconWebException; -import org.apache.falcon.monitors.Dimension; -import org.apache.falcon.monitors.Monitored; - /** * Entity management operations as REST API for feed and process. */ @@ -79,6 +79,7 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager { throw FalconWebException.newAPIException("delete on server is not" + " supported.Please run your operation on Prism.", Response.Status.FORBIDDEN); } + /** * Updates the submitted entity. * @param request Servlet Request @@ -103,6 +104,26 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager { } /** + * Rejects updating the dependent entities of a cluster on the server; the operation must be run on Prism. + * @param clusterName Name of cluster. + * @param ignore colo query param; unused. + * @param skipDryRun Optional query param, Falcon skips oozie dryrun when value is set to true. + * @return Never returns normally; this method always throws a FORBIDDEN API exception. + */ + @POST + @Path("updateClusterDependents/{clusterName}") + @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) + @Monitored(event = "updateClusterDependents") + @Override + public APIResult updateClusterDependents( + @Dimension("entityName") @PathParam("clusterName") final String clusterName, + @Dimension("colo") @QueryParam("colo") String ignore, + @QueryParam("skipDryRun") final Boolean skipDryRun) { + throw FalconWebException.newAPIException("update on server is not" + + " supported.Please run your operation on Prism.", Response.Status.FORBIDDEN); + } + + /** * Submits and schedules an entity. * @param request Servlet Request * @param type Valid options are feed or process.
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/webapp/src/test/java/org/apache/falcon/cli/FalconClusterUpdateCLIIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconClusterUpdateCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconClusterUpdateCLIIT.java new file mode 100644 index 0000000..f5efa37 --- /dev/null +++ b/webapp/src/test/java/org/apache/falcon/cli/FalconClusterUpdateCLIIT.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.cli; + +import org.apache.falcon.resource.TestContext; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.OutputStream; +import java.io.PrintStream; +import java.util.Map; + +/** + * Test for the Falcon CLI cluster update and updateClusterDependents commands.
+ */ +@Test(groups = {"exhaustive"}) +public class FalconClusterUpdateCLIIT { + private InMemoryWriter stream = new InMemoryWriter(System.out); + private TestContext context = new TestContext(); + private Map<String, String> overlay; + + @BeforeClass + public void prepare() throws Exception { + context.prepare(); + FalconCLI.OUT.set(stream); + } + + @AfterClass + public void tearDown() throws Exception { + clearSafemode(); + context.deleteEntitiesFromStore(); + } + + + public void testUpdateClusterCommands() throws Exception { + + FalconCLI.OUT.set(stream); + + String filePath; + overlay = context.getUniqueOverlay(); + + filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay); + Assert.assertEquals(executeWithURL("entity -submit -type cluster -file " + filePath), 0); + context.setCluster(overlay.get("cluster")); + Assert.assertEquals(stream.buffer.toString().trim(), + "falcon/default/Submit successful (cluster) " + context.getClusterName()); + + filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay); + Assert.assertEquals(executeWithURL("entity -submit -type feed -file " + filePath), 0); + + filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay); + Assert.assertEquals(executeWithURL("entity -submit -type feed -file " + filePath), 0); + + filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay); + Assert.assertEquals(executeWithURL("entity -submit -type process -file " + filePath), 0); + + + // Update cluster here and test that it works + + initSafemode(); + filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_UPDATED_TEMPLATE, overlay); + Assert.assertEquals(executeWithURL("entity -update -type cluster -file " + + filePath + " -name " + overlay.get("cluster")), 0); + clearSafemode(); + + // Try to update dependent entities + Assert.assertEquals(executeWithURL("entity -updateClusterDependents -cluster " + + 
overlay.get("cluster") + " -skipDryRun "), 0); + + // try to update cluster with wrong name, it should fail. + initSafemode(); + overlay = context.getUniqueOverlay(); + filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_UPDATED_TEMPLATE, overlay); + Assert.assertEquals(executeWithURL("entity -update -type cluster -file " + + filePath + " -name " + overlay.get("cluster")), -1); + clearSafemode(); + } + + + private void initSafemode() throws Exception { + // Set safemode + Assert.assertEquals(new FalconCLI().run(("admin -setsafemode true -url " + + TestContext.BASE_URL).split("\\s")), 0); + } + + private void clearSafemode() throws Exception { + Assert.assertEquals(new FalconCLI().run(("admin -setsafemode false -url " + + TestContext.BASE_URL).split("\\s")), 0); + } + + private int executeWithURL(String command) throws Exception { + FalconCLI.OUT.get().print("COMMAND IS "+command + " -url " + TestContext.BASE_URL + "\n"); + return new FalconCLI() + .run((command + " -url " + TestContext.BASE_URL).split("\\s+")); + } + + private static class InMemoryWriter extends PrintStream { + + private StringBuffer buffer = new StringBuffer(); + + public InMemoryWriter(OutputStream out) { + super(out); + } + + @Override + public void println(String x) { + clear(); + buffer.append(x); + super.println(x); + } + + @SuppressWarnings("UnusedDeclaration") + public String getBuffer() { + return buffer.toString(); + } + + public void clear() { + buffer.delete(0, buffer.length()); + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/webapp/src/test/java/org/apache/falcon/cli/FalconSafemodeCLIIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconSafemodeCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconSafemodeCLIIT.java index f640a69..d2b62b2 100644 --- a/webapp/src/test/java/org/apache/falcon/cli/FalconSafemodeCLIIT.java +++ 
b/webapp/src/test/java/org/apache/falcon/cli/FalconSafemodeCLIIT.java @@ -21,7 +21,6 @@ package org.apache.falcon.cli; import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.resource.TestContext; import org.apache.falcon.util.FalconTestUtil; -import org.apache.falcon.util.StartupProperties; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -84,8 +83,6 @@ public class FalconSafemodeCLIIT { private void clearSafemode() throws Exception { Assert.assertEquals(new FalconCLI().run(("admin -setsafemode false -url " + TestContext.BASE_URL).split("\\s")), 0); - Assert.assertEquals(StartupProperties.get().getProperty(StartupProperties.SAFEMODE_PROPERTY, "false"), - "false"); } public void testEntityCommandsNotAllowedInSafeMode() throws Exception { http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/webapp/src/test/java/org/apache/falcon/resource/TestContext.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java index f84559f..3cf5c18 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java +++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java @@ -95,6 +95,7 @@ public class TestContext extends AbstractTestContext { public static final String DATASOURCE_TEMPLATE3 = "/datasource-template3.xml"; public static final String DATASOURCE_TEMPLATE4 = "/datasource-template4.xml"; public static final String CLUSTER_TEMPLATE = "/cluster-template.xml"; + public static final String CLUSTER_UPDATED_TEMPLATE = "/cluster-updated-template.xml"; public static final String PIG_PROCESS_TEMPLATE = "/pig-process-template.xml"; public static final String BASE_URL = "https://localhost:41443/falcon-webapp"; 
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/webapp/src/test/resources/cluster-updated-template.xml ---------------------------------------------------------------------- diff --git a/webapp/src/test/resources/cluster-updated-template.xml b/webapp/src/test/resources/cluster-updated-template.xml new file mode 100644 index 0000000..f94e897 --- /dev/null +++ b/webapp/src/test/resources/cluster-updated-template.xml @@ -0,0 +1,42 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + +<cluster colo="##colo##" description="updated cluster" name="##cluster##" xmlns="uri:falcon:cluster:0.1"> + <interfaces> + <interface type="readonly" endpoint="jail://global:00" + version="0.20.2"/> + <interface type="write" endpoint="jail://global:00" + version="0.20.2"/> + <interface type="execute" endpoint="localhost:41021" version="0.20.2"/> + <interface type="workflow" endpoint="http://localhost:41000/oozie/" + version="3.1"/> + <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true" + version="5.4.3"/> + <interface type="registry" endpoint="thrift://localhost:49083" + version="0.11.0"/> + </interfaces> + <locations> + <location name="staging" path="/projects/falcon/staging"/> + <location name="temp" path="/tmp"/> + <location name="working" path="/projects/falcon/working"/> + </locations> + <properties> + <property name="test1" value="value1"/> + </properties> +</cluster>
