FALCON-1039 Add instance dependency API in Falcon. Contributed by Ajay Yadava
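Usage sketch for the API added in this patch, assuming a Falcon server reachable at http://localhost:15000 and a single-argument FalconClient(String) constructor; the entity name, instance time and colo value are illustrative only, while FalconClient#getInstanceDependencies and ResponseHelper#getString(InstanceDependencyResult) are the methods introduced by the diff below:

    import org.apache.falcon.ResponseHelper;
    import org.apache.falcon.client.FalconClient;
    import org.apache.falcon.resource.InstanceDependencyResult;

    public class InstanceDependencyExample {
        public static void main(String[] args) throws Exception {
            // Assumed: a running Falcon server at the default embedded HTTP port.
            FalconClient client = new FalconClient("http://localhost:15000");

            // New in this patch: for a process instance, list the feed instances it
            // consumes (tagged Input) and produces (tagged Output); for a feed
            // instance, the producer and consumer process instances.
            InstanceDependencyResult result = client.getInstanceDependencies(
                    "process",            // entity type (feed or process)
                    "sample-process",     // entity name - illustrative
                    "2015-06-04T11:00Z",  // nominal instance time - illustrative
                    "*");                 // colo
            System.out.println(ResponseHelper.getString(result));
        }
    }

The CLI form added here is along the lines of falcon instance -type process -name sample-process -dependency -instanceTime 2015-06-04T11:00Z (exact flag spellings per the DEPENDENCY_OPT and new INSTANCE_TIME_OPT options), which proxies to GET api/instance/dependencies/{type}/{name}?instanceTime=...&colo=... as wired up in FalconClient below.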
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/9fd86b78 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/9fd86b78 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/9fd86b78 Branch: refs/heads/master Commit: 9fd86b786195ac03fc25d31c6f35062c4014f10a Parents: 42f175a Author: Ajay Yadava <[email protected]> Authored: Thu Jun 4 11:42:59 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Thu Jun 4 11:43:32 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../java/org/apache/falcon/ResponseHelper.java | 14 + .../java/org/apache/falcon/cli/FalconCLI.java | 21 +- .../org/apache/falcon/client/FalconClient.java | 18 + .../org/apache/falcon/resource/EntityList.java | 6 +- .../resource/InstanceDependencyResult.java | 86 +++++ .../resource/SchedulableEntityInstance.java | 155 ++++++++ .../org/apache/falcon/entity/EntityUtil.java | 142 ++++++- .../org/apache/falcon/entity/FeedHelper.java | 287 +++++++++++++- .../org/apache/falcon/entity/ProcessHelper.java | 108 ++++++ .../apache/falcon/entity/FeedHelperTest.java | 370 ++++++++++++++++++- .../apache/falcon/entity/ProcessHelperTest.java | 207 +++++++++++ docs/src/site/twiki/FalconCLI.twiki | 35 ++ .../site/twiki/restapi/InstanceDependency.twiki | 49 +++ docs/src/site/twiki/restapi/ResourceList.twiki | 1 + .../falcon/resource/AbstractEntityManager.java | 4 +- .../resource/AbstractInstanceManager.java | 79 +++- .../resource/proxy/InstanceManagerProxy.java | 37 +- .../apache/falcon/resource/InstanceManager.java | 20 +- .../resource/SchedulableEntityManager.java | 9 +- .../java/org/apache/falcon/cli/FalconCLIIT.java | 4 + 21 files changed, 1624 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9c84f85..7ce2ba3 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -4,6 +4,7 @@ Trunk (Unreleased) INCOMPATIBLE CHANGES NEW FEATURES + FALCON-1039 Add instance dependency API in falcon(Ajay Yadava) IMPROVEMENTS FALCON-1060 Handle transaction failures in Lineage(Pavan Kumar Kolamuri via Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/client/src/main/java/org/apache/falcon/ResponseHelper.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/ResponseHelper.java b/client/src/main/java/org/apache/falcon/ResponseHelper.java index 2261ceb..78598ba 100644 --- a/client/src/main/java/org/apache/falcon/ResponseHelper.java +++ b/client/src/main/java/org/apache/falcon/ResponseHelper.java @@ -24,6 +24,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.resource.FeedInstanceResult; import org.apache.falcon.resource.FeedLookupResult; +import org.apache.falcon.resource.InstanceDependencyResult; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.resource.InstancesSummaryResult; import org.apache.falcon.resource.EntitySummaryResult; @@ -277,4 +278,17 @@ public final class ResponseHelper { sb.append("\nRequest Id: ").append(feedLookupResult.getRequestId()); return sb.toString(); } + + public static String getString(InstanceDependencyResult dependencyResult) { + StringBuilder sb = new StringBuilder(); + String 
results = dependencyResult.toString(); + if (StringUtils.isEmpty(results)) { + sb.append("No dependencies found!"); + } else { + sb.append(results); + } + sb.append("\n\nResponse: ").append(dependencyResult.getMessage()); + sb.append("\nRequest Id: ").append(dependencyResult.getRequestId()); + return sb.toString(); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/client/src/main/java/org/apache/falcon/cli/FalconCLI.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java index a5e3728..f169917 100644 --- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java +++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java @@ -18,7 +18,6 @@ package org.apache.falcon.cli; -import org.apache.falcon.ResponseHelper; import com.sun.jersey.api.client.ClientHandlerException; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; @@ -28,12 +27,14 @@ import org.apache.commons.cli.ParseException; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.falcon.LifeCycle; +import org.apache.falcon.ResponseHelper; import org.apache.falcon.client.FalconCLIException; import org.apache.falcon.client.FalconClient; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.resource.EntityList; import org.apache.falcon.resource.FeedLookupResult; +import org.apache.falcon.resource.InstanceDependencyResult; import org.apache.falcon.resource.InstancesResult; import java.io.IOException; @@ -102,6 +103,7 @@ public class FalconCLI { public static final String FORCE_RERUN_FLAG = "force"; public static final String INSTANCE_CMD = "instance"; + public static final String INSTANCE_TIME_OPT = "instanceTime"; public static final String START_OPT = "start"; public static final String END_OPT = "end"; public static final String RUNNING_OPT = "running"; @@ -229,6 +231,7 @@ public class FalconCLI { String result; String type = commandLine.getOptionValue(ENTITY_TYPE_OPT); String entity = commandLine.getOptionValue(ENTITY_NAME_OPT); + String instanceTime = commandLine.getOptionValue(INSTANCE_TIME_OPT); String start = commandLine.getOptionValue(START_OPT); String end = commandLine.getOptionValue(END_OPT); String filePath = commandLine.getOptionValue(FILE_PATH_OPT); @@ -250,7 +253,12 @@ public class FalconCLI { validateInstanceCommands(optionsList, entity, type, colo); - if (optionsList.contains(RUNNING_OPT)) { + if (optionsList.contains(DEPENDENCY_OPT)) { + validateNotEmpty(instanceTime, INSTANCE_TIME_OPT); + InstanceDependencyResult response = client.getInstanceDependencies(type, entity, instanceTime, colo); + result = ResponseHelper.getString(response); + + } else if (optionsList.contains(RUNNING_OPT)) { validateOrderBy(orderBy, instanceAction); validateFilterBy(filterBy, instanceAction); result = @@ -785,6 +793,11 @@ public class FalconCLI { false, "Displays feed listing and their status between a start and end time range."); + Option dependency = new Option( + DEPENDENCY_OPT, + false, + "Displays dependent instances for a specified instance."); + OptionGroup group = new OptionGroup(); group.addOption(running); group.addOption(list); @@ -798,6 +811,7 @@ public class FalconCLI { group.addOption(logs); group.addOption(params); group.addOption(listing); + group.addOption(dependency); Option url = new Option(URL_OPTION, true, 
"Falcon URL"); Option start = new Option(START_OPT, true, @@ -843,6 +857,8 @@ public class FalconCLI { Option forceRerun = new Option(FORCE_RERUN_FLAG, false, "Flag to forcefully rerun entire workflow of an instance"); + Option instanceTime = new Option(INSTANCE_TIME_OPT, true, "Time for an instance"); + instanceOptions.addOption(url); instanceOptions.addOptionGroup(group); instanceOptions.addOption(start); @@ -861,6 +877,7 @@ public class FalconCLI { instanceOptions.addOption(sortOrder); instanceOptions.addOption(numResults); instanceOptions.addOption(forceRerun); + instanceOptions.addOption(instanceTime); return instanceOptions; } http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index 786e0a0..20c32e4 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -37,6 +37,7 @@ import org.apache.falcon.resource.EntityList; import org.apache.falcon.resource.EntitySummaryResult; import org.apache.falcon.resource.FeedInstanceResult; import org.apache.falcon.resource.FeedLookupResult; +import org.apache.falcon.resource.InstanceDependencyResult; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.resource.InstancesSummaryResult; import org.apache.falcon.resource.LineageGraphResult; @@ -242,6 +243,7 @@ public class FalconClient { LOG("api/instance/logs/", HttpMethod.GET, MediaType.APPLICATION_JSON), SUMMARY("api/instance/summary/", HttpMethod.GET, MediaType.APPLICATION_JSON), PARAMS("api/instance/params/", HttpMethod.GET, MediaType.APPLICATION_JSON), + DEPENDENCY("api/instance/dependencies/", HttpMethod.GET, MediaType.APPLICATION_JSON), LISTING("api/instance/listing/", HttpMethod.GET, MediaType.APPLICATION_JSON); private String path; @@ -805,6 +807,22 @@ public class FalconClient { return clientResponse; } + public InstanceDependencyResult getInstanceDependencies(String entityType, String entityName, String instanceTime, + String colo) throws FalconCLIException { + checkType(entityType); + Instances api = Instances.DEPENDENCY; + + WebResource resource = service.path(api.path).path(entityType).path(entityName); + resource = resource.queryParam("instanceTime", instanceTime); + resource = resource.queryParam("colo", colo); + ClientResponse clientResponse = resource + .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) + .accept(api.mimeType) + .method(api.method, ClientResponse.class); + checkIfSuccessful(clientResponse); + return clientResponse.getEntity(InstanceDependencyResult.class); + } + //RESUME CHECKSTYLE CHECK VisibilityModifierCheck private void checkLifeCycleOption(List<LifeCycle> lifeCycles, String type) throws FalconCLIException { http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/client/src/main/java/org/apache/falcon/resource/EntityList.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/resource/EntityList.java b/client/src/main/java/org/apache/falcon/resource/EntityList.java index ee33234..6e132f0 100644 --- a/client/src/main/java/org/apache/falcon/resource/EntityList.java +++ b/client/src/main/java/org/apache/falcon/resource/EntityList.java @@ -36,6 +36,8 @@ import java.util.List; 
@XmlAccessorType(XmlAccessType.FIELD) @edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"}) public class EntityList { + public static final String INPUT_TAG = "Input"; + public static final String OUTPUT_TAG = "Output"; @XmlElement private int totalResults; @@ -184,14 +186,14 @@ public class EntityList { if (process.getInputs() != null) { for (Input i : process.getInputs().getInputs()) { if (i.getFeed().equals(entityNameToMatch)) { - tagList.add("Input"); + tagList.add(INPUT_TAG); } } } if (process.getOutputs() != null) { for (Output o : process.getOutputs().getOutputs()) { if (o.getFeed().equals(entityNameToMatch)) { - tagList.add("Output"); + tagList.add(OUTPUT_TAG); } } } http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/client/src/main/java/org/apache/falcon/resource/InstanceDependencyResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/resource/InstanceDependencyResult.java b/client/src/main/java/org/apache/falcon/resource/InstanceDependencyResult.java new file mode 100644 index 0000000..0751f12 --- /dev/null +++ b/client/src/main/java/org/apache/falcon/resource/InstanceDependencyResult.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.resource; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; + +/** + * Entity list used for marshalling / unmarshalling with REST calls. 
+ */ +@XmlRootElement(name = "dependents") +@XmlAccessorType(XmlAccessType.FIELD) [email protected]({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"}) +public class InstanceDependencyResult extends APIResult { + + @XmlElement(name = "dependencies") + private SchedulableEntityInstance[] dependencies; + + //For JAXB + private InstanceDependencyResult() { + super(); + } + + public InstanceDependencyResult(Status status, String message) { + super(status, message); + } + + public SchedulableEntityInstance[] getDependencies() { + return dependencies; + } + + public void setDependencies(SchedulableEntityInstance[] dependencies) { + this.dependencies = dependencies; + } + + + @Override + public Object[] getCollection() { + return getDependencies(); + } + + @Override + public void setCollection(Object[] items) { + if (items == null) { + setDependencies(new SchedulableEntityInstance[0]); + } else { + SchedulableEntityInstance[] newInstances = new SchedulableEntityInstance[items.length]; + for (int index = 0; index < items.length; index++) { + newInstances[index] = (SchedulableEntityInstance)items[index]; + } + setDependencies(newInstances); + } + } + + @Override + public String toString() { + StringBuilder buffer = new StringBuilder(); + if (dependencies != null) { + for (SchedulableEntityInstance element : dependencies) { + buffer.append(element.toString()); + buffer.append("\n"); + } + } + return buffer.toString(); + } + + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java b/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java new file mode 100644 index 0000000..2a7ecdb --- /dev/null +++ b/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.resource; + +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.SchemaHelper; + +import java.util.Date; + +/** + * Instance of a Schedulable Entity (Feed/Process). 
+ */ +public class SchedulableEntityInstance { + + public static final String INPUT = "Input"; + public static final String OUTPUT = "Output"; + + private String entityName; + + private String cluster; + + private Date instanceTime; + + private EntityType entityType; + + private String tag; + + //for JAXB + private SchedulableEntityInstance() { + + } + + public SchedulableEntityInstance(String entityName, String cluster, Date instanceTime, EntityType type) { + this.entityName = entityName; + this.cluster = cluster; + this.entityType = type; + if (instanceTime != null) { + this.instanceTime = new Date(instanceTime.getTime()); + } + } + + public String getTag() { + return tag; + } + + public void setTag(String tag) { + this.tag = tag; + } + + public String getEntityName() { + return entityName; + } + + public void setEntityName(String entityName) { + this.entityName = entityName; + } + + public String getCluster() { + return cluster; + } + + public void setCluster(String cluster) { + this.cluster = cluster; + } + + public EntityType getEntityType() { + return entityType; + } + + public void setEntityType(EntityType entityType) { + this.entityType = entityType; + } + + public Date getInstanceTime() { + return new Date(instanceTime.getTime()); + } + + public void setInstanceTime(Date instanceTime) { + this.instanceTime = new Date(instanceTime.getTime()); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("name: " + entityName + + ", type: " + entityType + + ", cluster: " + cluster + + ", instanceTime: " + SchemaHelper.formatDateUTC(instanceTime)); + sb.append(", tag: " + ((tag != null) ? tag : "")); + return sb.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SchedulableEntityInstance that = (SchedulableEntityInstance) o; + + if (instanceTime == null ? 
that.instanceTime != null : !instanceTime.equals(that.instanceTime)) { + return false; + } + + if (!entityType.equals(that.entityType)) { + return false; + } + + if (!StringUtils.equals(entityName, that.entityName)) { + return false; + } + + if (!StringUtils.equals(cluster, that.cluster)) { + return false; + } + + if (!StringUtils.equals(tag, that.tag)) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + int result = instanceTime.hashCode(); + result = 31 * result + entityName.hashCode(); + result = 31 * result + entityType.hashCode(); + result = 31 * result + cluster.hashCode(); + if (tag != null) { + result = 31 * result + tag.hashCode(); + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/common/src/main/java/org/apache/falcon/entity/EntityUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java index 26d3da2..7ebf39e 100644 --- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java +++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java @@ -28,6 +28,7 @@ import org.apache.falcon.Tag; import org.apache.falcon.entity.WorkflowNameBuilder.WorkflowName; import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityGraph; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.Frequency; import org.apache.falcon.entity.v0.SchemaHelper; @@ -35,9 +36,13 @@ import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.cluster.ClusterLocationType; import org.apache.falcon.entity.v0.feed.ClusterType; import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.process.*; +import org.apache.falcon.entity.v0.process.LateInput; +import org.apache.falcon.entity.v0.process.LateProcess; +import org.apache.falcon.entity.v0.process.PolicyType; import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.entity.v0.process.Retry; import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.falcon.resource.EntityList; import org.apache.falcon.util.DeploymentUtil; import org.apache.falcon.util.RuntimeProperties; import org.apache.hadoop.fs.FileStatus; @@ -53,7 +58,19 @@ import java.lang.reflect.Method; import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Calendar; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TimeZone; /** * Helper to get entity object. 
@@ -65,6 +82,7 @@ public final class EntityUtil { private static final long HOUR_IN_MS = 60 * MINUTE_IN_MS; private static final long DAY_IN_MS = 24 * HOUR_IN_MS; private static final long MONTH_IN_MS = 31 * DAY_IN_MS; + private static final long ONE_MS = 1; public static final String SUCCEEDED_FILE_NAME = "_SUCCESS"; @@ -244,8 +262,8 @@ public final class EntityUtil { return feed.getTimezone(); } - public static Date getNextStartTime(Date startTime, Frequency frequency, TimeZone timezone, Date now) { - if (startTime.after(now)) { + public static Date getNextStartTime(Date startTime, Frequency frequency, TimeZone timezone, Date referenceTime) { + if (startTime.after(referenceTime)) { return startTime; } @@ -255,16 +273,16 @@ public final class EntityUtil { int count = 0; switch (frequency.getTimeUnit()) { case months: - count = (int) ((now.getTime() - startTime.getTime()) / MONTH_IN_MS); + count = (int) ((referenceTime.getTime() - startTime.getTime()) / MONTH_IN_MS); break; case days: - count = (int) ((now.getTime() - startTime.getTime()) / DAY_IN_MS); + count = (int) ((referenceTime.getTime() - startTime.getTime()) / DAY_IN_MS); break; case hours: - count = (int) ((now.getTime() - startTime.getTime()) / HOUR_IN_MS); + count = (int) ((referenceTime.getTime() - startTime.getTime()) / HOUR_IN_MS); break; case minutes: - count = (int) ((now.getTime() - startTime.getTime()) / MINUTE_IN_MS); + count = (int) ((referenceTime.getTime() - startTime.getTime()) / MINUTE_IN_MS); break; default: } @@ -273,7 +291,7 @@ public final class EntityUtil { if (count > 2) { startCal.add(frequency.getTimeUnit().getCalendarUnit(), ((count - 2) / freq) * freq); } - while (startCal.getTime().before(now)) { + while (startCal.getTime().before(referenceTime)) { startCal.add(frequency.getTimeUnit().getCalendarUnit(), freq); } return startCal.getTime(); @@ -747,6 +765,12 @@ public final class EntityUtil { return pipelines; } + public static EntityList getEntityDependencies(Entity entity) throws FalconException { + Set<Entity> dependents = EntityGraph.get().getDependents(entity); + Entity[] dependentEntities = dependents.toArray(new Entity[dependents.size()]); + return new EntityList(dependentEntities, entity); + } + public static Pair<Date, Date> getEntityStartEndDates(Entity entityObject) { Set<String> clusters = EntityUtil.getClustersDefined(entityObject); Pair<Date, String> clusterMinStartDate = null; @@ -761,4 +785,104 @@ public final class EntityUtil { } return new Pair<Date, Date>(clusterMinStartDate.first, clusterMaxEndDate.first); } + + /** + * Returns the previous instance(before or on) for a given referenceTime + * + * Example: For a feed in "UTC" with startDate "2014-01-01 00:00" and frequency of "days(1)" a referenceTime + * of "2015-01-01 00:00" will return "2015-01-01 00:00". 
+ * + * Similarly for the above feed if we give a reference Time of "2015-01-01 04:00" will also result in + * "2015-01-01 00:00" + * + * @param startTime start time of the entity + * @param frequency frequency of the entity + * @param tz timezone of the entity + * @param referenceTime time before which the instanceTime is desired + * @return instance(before or on) the referenceTime + */ + public static Date getPreviousInstanceTime(Date startTime, Frequency frequency, TimeZone tz, Date referenceTime) { + if (tz == null) { + tz = TimeZone.getTimeZone("UTC"); + } + Calendar insCal = Calendar.getInstance(tz); + insCal.setTime(startTime); + + int instanceCount = getInstanceSequence(startTime, frequency, tz, referenceTime); + final int freq = frequency.getFrequencyAsInt() * instanceCount; + insCal.add(frequency.getTimeUnit().getCalendarUnit(), freq); + + while (insCal.getTime().after(referenceTime)) { + insCal.add(frequency.getTimeUnit().getCalendarUnit(), -1); + } + return insCal.getTime(); + } + + /** + * Find the times at which the given entity will run in a given time range. + * <p/> + * Both start and end Date are inclusive. + * + * @param entity feed or process entity whose instance times are to be found + * @param clusterName name of the cluster + * @param startRange start time for the input range + * @param endRange end time for the input range + * @return List of instance times at which the entity will run in the given time range + */ + public static List<Date> getEntityInstanceTimes(Entity entity, String clusterName, Date startRange, Date endRange) { + Date start = null; + switch (entity.getEntityType()) { + + case FEED: + Feed feed = (Feed) entity; + start = FeedHelper.getCluster(feed, clusterName).getValidity().getStart(); + return getInstanceTimes(start, feed.getFrequency(), feed.getTimezone(), + startRange, endRange); + + case PROCESS: + Process process = (Process) entity; + start = ProcessHelper.getCluster(process, clusterName).getValidity().getStart(); + return getInstanceTimes(start, process.getFrequency(), + process.getTimezone(), startRange, endRange); + + default: + throw new IllegalArgumentException("Unhandled type: " + entity.getEntityType()); + } + } + + + /** + * Find instance times given first instance start time and frequency till a given end time. + * + * It finds the first valid instance time in the given time range, it then uses frequency to find next instances + * in the given time range. + * + * @param startTime startTime of the entity (time of first instance ever of the given entity) + * @param frequency frequency of the entity + * @param timeZone timeZone of the entity + * @param startRange start time for the input range of interest. + * @param endRange end time for the input range of interest. + * @return list of instance run times of the given entity in the given time range. 
+ */ + public static List<Date> getInstanceTimes(Date startTime, Frequency frequency, TimeZone timeZone, + Date startRange, Date endRange) { + List<Date> result = new LinkedList<>(); + if (timeZone == null) { + timeZone = TimeZone.getTimeZone("UTC"); + } + + while(true){ + Date nextStartTime = getNextStartTime(startTime, frequency, timeZone, startRange); + if (nextStartTime.before(startRange) || nextStartTime.after(endRange)){ + break; + } + + result.add(nextStartTime); + // this is required because getNextStartTime returns greater than or equal to referenceTime + startRange = new Date(nextStartTime.getTime() + ONE_MS); // 1 milli seconds later + } + return result; + } + + } http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/common/src/main/java/org/apache/falcon/entity/FeedHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java index acb8598..9f4eb61 100644 --- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java @@ -19,6 +19,7 @@ package org.apache.falcon.entity; import org.apache.commons.lang3.StringUtils; + import org.apache.falcon.FalconException; import org.apache.falcon.LifeCycle; import org.apache.falcon.Tag; @@ -33,16 +34,33 @@ import org.apache.falcon.entity.v0.feed.Location; import org.apache.falcon.entity.v0.feed.LocationType; import org.apache.falcon.entity.v0.feed.Locations; import org.apache.falcon.entity.v0.feed.Sla; +import org.apache.falcon.entity.v0.process.Input; +import org.apache.falcon.entity.v0.process.Output; +import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.expression.ExpressionHelper; import org.apache.falcon.resource.APIResult; +import org.apache.falcon.resource.EntityList; import org.apache.falcon.resource.FeedInstanceResult; +import org.apache.falcon.resource.SchedulableEntityInstance; import org.apache.falcon.util.BuildProperties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URISyntaxException; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Calendar; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.TimeZone; import java.util.regex.Matcher; /** @@ -50,6 +68,9 @@ import java.util.regex.Matcher; */ public final class FeedHelper { + private static final Logger LOG = LoggerFactory.getLogger(FeedHelper.class); + private static final int ONE_MS = 1; + public static final String FORMAT = "yyyyMMddHHmm"; private FeedHelper() {} @@ -273,7 +294,7 @@ public final class FeedHelper { public static Properties getClusterProperties(org.apache.falcon.entity.v0.cluster.Cluster cluster) { Properties properties = new Properties(); - Map<String, String> clusterVars = new HashMap<String, String>(); + Map<String, String> clusterVars = new HashMap<>(); clusterVars.put("colo", cluster.getColo()); clusterVars.put("name", cluster.getName()); if (cluster.getProperties() != null) { @@ -354,7 +375,7 @@ public final class FeedHelper { * Extracts date from the actual data path e.g., /path/2014/05/06 maps to 2014-05-06T00:00Z. 
* @param instancePath - actual data path * @param templatePath - template path from feed definition - * @param timeZone + * @param timeZone timeZone * @return date corresponding to the path */ //consider just the first occurrence of the pattern @@ -364,7 +385,7 @@ public final class FeedHelper { Calendar cal = Calendar.getInstance(timeZone); int lastEnd = 0; - Set<FeedDataPath.VARS> matchedVars = new HashSet<FeedDataPath.VARS>(); + Set<FeedDataPath.VARS> matchedVars = new HashSet<>(); while (matcher.find()) { FeedDataPath.VARS pathVar = FeedDataPath.VARS.from(matcher.group()); String pad = templatePath.substring(lastEnd, matcher.start()); @@ -415,6 +436,264 @@ public final class FeedHelper { } + private static void validateFeedInstance(Feed feed, Date instanceTime, + org.apache.falcon.entity.v0.cluster.Cluster cluster) { + + // validate the cluster + Cluster feedCluster = getCluster(feed, cluster.getName()); + if (feedCluster == null) { + throw new IllegalArgumentException("Cluster :" + cluster.getName() + " is not a valid cluster for feed:" + + feed.getName()); + } + + // validate that instanceTime is in validity range + if (feedCluster.getValidity().getStart().after(instanceTime) + || feedCluster.getValidity().getEnd().before(instanceTime)) { + throw new IllegalArgumentException("instanceTime: " + instanceTime + " is not in validity range for" + + " Feed: " + feed.getName() + " on cluster:" + cluster.getName()); + } + + // validate instanceTime on basis of startTime and frequency + Date nextInstance = EntityUtil.getNextStartTime(feedCluster.getValidity().getStart(), feed.getFrequency(), + feed.getTimezone(), instanceTime); + if (!nextInstance.equals(instanceTime)) { + throw new IllegalArgumentException("instanceTime: " + instanceTime + " is not a valid instance for the " + + " feed: " + feed.getName() + " on cluster: " + cluster.getName() + + " on the basis of startDate and frequency"); + } + } + + /** + * Given a feed Instance finds the generating process instance. + * + * [process, cluster, instanceTime] + * + * If the feed is replicated, then it returns null. + * + * @param feed output feed + * @param feedInstanceTime instance time of the feed + * @return returns the instance of the process which produces the given feed + */ + public static SchedulableEntityInstance getProducerInstance(Feed feed, Date feedInstanceTime, + org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException { + + //validate the inputs + validateFeedInstance(feed, feedInstanceTime, cluster); + + Process process = getProducerProcess(feed); + if (process != null) { + try { + Date processInstanceTime = getProducerInstanceTime(feed, feedInstanceTime, process, cluster); + SchedulableEntityInstance producer = new SchedulableEntityInstance(process.getName(), cluster.getName(), + processInstanceTime, EntityType.PROCESS); + producer.setTag(SchedulableEntityInstance.OUTPUT); + return producer; + } catch (FalconException e) { + LOG.error("Error in trying to get producer process: {}'s instance time for feed: {}'s instance: } " + + " on cluster:{}", process.getName(), feed.getName(), feedInstanceTime, cluster.getName()); + } + } + return null; + } + + /** + * Given a feed find it's generating process. + * + * If no generating process is found it returns null. + * @param feed output feed + * @return Process which produces the given feed. 
+ */ + public static Process getProducerProcess(Feed feed) throws FalconException { + + EntityList dependencies = EntityUtil.getEntityDependencies(feed); + + for (EntityList.EntityElement e : dependencies.getElements()) { + if (e.tag.contains(EntityList.OUTPUT_TAG)) { + return EntityUtil.getEntity(EntityType.PROCESS, e.name); + } + } + return null; + } + + /** + * Find the producerInstance which will generate the given feedInstance. + * + * @param feed output feed + * @param feedInstanceTime instance time of the output feed + * @param producer producer process + * @return time of the producer instance which will produce the given feed instance. + */ + private static Date getProducerInstanceTime(Feed feed, Date feedInstanceTime, Process producer, + org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException { + + String clusterName = cluster.getName(); + Cluster feedCluster = getCluster(feed, clusterName); + org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(producer, clusterName); + Date producerStartDate = processCluster.getValidity().getStart(); + + // read the process definition and find the relative time difference between process and output feed + // if output process instance time is now then output FeedInstance time is x + String outputInstance = null; + for (Output op : producer.getOutputs().getOutputs()) { + if (StringUtils.equals(feed.getName(), op.getFeed())) { + outputInstance = op.getInstance(); + } + } + + ExpressionHelper.setReferenceDate(producerStartDate); + ExpressionHelper evaluator = ExpressionHelper.get(); + // producerInstance = feedInstanceTime + (difference between producer process and feed) + // the feedInstance before or equal to this time is the required one + Date relativeFeedInstance = evaluator.evaluate(outputInstance, Date.class); + Date feedInstanceActual = EntityUtil.getPreviousInstanceTime(feedCluster.getValidity().getStart(), + feed.getFrequency(), feed.getTimezone(), relativeFeedInstance); + Long producerInstanceTime = feedInstanceTime.getTime() + (producerStartDate.getTime() + - feedInstanceActual.getTime()); + Date producerInstance = new Date(producerInstanceTime); + + //validate that the producerInstance is in the validity range on the provided cluster + if (producerInstance.before(processCluster.getValidity().getStart()) + || producerInstance.after(processCluster.getValidity().getEnd())) { + throw new IllegalArgumentException("Instance time provided: " + feedInstanceTime + + " for feed " + feed.getName() + + " is outside the range of instances produced by the producer process: " + producer.getName() + + " in it's validity range on provided cluster: " + cluster.getName()); + } + return producerInstance; + } + + + public static Set<SchedulableEntityInstance> getConsumerInstances(Feed feed, Date feedInstanceTime, + org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException { + + Set<SchedulableEntityInstance> result = new HashSet<>(); + // validate that the feed has this cluster & validate that the instanceTime is a valid instanceTime + validateFeedInstance(feed, feedInstanceTime, cluster); + + Set<Process> consumers = getConsumerProcesses(feed); + for (Process p : consumers) { + Set<Date> consumerInstanceTimes = getConsumerProcessInstanceTimes(feed, feedInstanceTime, p, cluster); + for (Date date : consumerInstanceTimes) { + SchedulableEntityInstance in = new SchedulableEntityInstance(p.getName(), cluster.getName(), date, + EntityType.PROCESS); + 
in.setTag(SchedulableEntityInstance.INPUT); + result.add(in); + } + } + return result; + } + + + /** + * Returns the consumer processes for a given feed if any, null otherwise. + * + * @param feed input feed + * @return the set of processes which use the given feed as input, empty set if no consumers. + */ + public static Set<Process> getConsumerProcesses(Feed feed) throws FalconException { + Set<Process> result = new HashSet<>(); + EntityList dependencies = EntityUtil.getEntityDependencies(feed); + + for (EntityList.EntityElement e : dependencies.getElements()) { + if (e.tag.contains(EntityList.INPUT_TAG)) { + Process consumer = EntityUtil.getEntity(EntityType.PROCESS, e.name); + result.add(consumer); + } + } + return result; + } + + // return all instances of a process which will consume the given feed instance + private static Set<Date> getConsumerProcessInstanceTimes(Feed feed, Date feedInstancetime, Process consumer, + org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException { + + Set<Date> result = new HashSet<>(); + // find relevant cluster for the process + org.apache.falcon.entity.v0.process.Cluster processCluster = + ProcessHelper.getCluster(consumer, cluster.getName()); + if (processCluster == null) { + throw new IllegalArgumentException("Cluster is not valid for process"); + } + Date processStartDate = processCluster.getValidity().getStart(); + Cluster feedCluster = getCluster(feed, cluster.getName()); + Date feedStartDate = feedCluster.getValidity().getStart(); + + // find all corresponding Inputs as a process may refer same feed multiple times + List<Input> inputFeeds = new ArrayList<>(); + if (consumer.getInputs() != null && consumer.getInputs().getInputs() != null) { + for (Input input : consumer.getInputs().getInputs()) { + if (StringUtils.equals(input.getFeed(), feed.getName())) { + inputFeeds.add(input); + } + } + } + + // for each input corresponding to given feed, find corresponding consumer instances + for (Input in : inputFeeds) { + /* Algorithm for finding a consumer instance for an input feed instance + Step 1. Find one instance which will consume the given feed instance. + a. take process start date and find last input feed instance time. In this step take care of + frequencies being out of sync. + b. using the above find the time difference between the process instance and feed instance. + c. Adding the above time difference to given feed instance for which we want to find the consumer + instances we will get one consumer process instance. + Step 2. Keep checking for next instances of process till they consume the given feed Instance. + Step 3. Similarly check for all previous instances of process till they consume the given feed instance. + */ + + // Step 1.a & 1.b + ExpressionHelper.setReferenceDate(processStartDate); + ExpressionHelper evaluator = ExpressionHelper.get(); + Date startRelative = evaluator.evaluate(in.getStart(), Date.class); + Date startTimeActual = EntityUtil.getNextStartTime(feedStartDate, + feed.getFrequency(), feed.getTimezone(), startRelative); + Long offset = processStartDate.getTime() - startTimeActual.getTime(); + + // Step 1.c + Date processInstanceStartRelative = new Date(feedInstancetime.getTime() + offset); + Date processInstanceStartActual = EntityUtil.getPreviousInstanceTime(processStartDate, + consumer.getFrequency(), consumer.getTimezone(), processInstanceStartRelative); + + + // Step 2. 
+ Date currentInstance = processInstanceStartActual; + while (true) { + Date nextConsumerInstance = EntityUtil.getNextStartTime(processStartDate, + consumer.getFrequency(), consumer.getTimezone(), currentInstance); + + ExpressionHelper.setReferenceDate(nextConsumerInstance); + evaluator = ExpressionHelper.get(); + Long rangeStart = evaluator.evaluate(in.getStart(), Date.class).getTime(); + Long rangeEnd = evaluator.evaluate(in.getEnd(), Date.class).getTime(); + if (rangeStart <= feedInstancetime.getTime() && feedInstancetime.getTime() < rangeEnd) { + result.add(nextConsumerInstance); + } else { + break; + } + currentInstance = new Date(nextConsumerInstance.getTime() + ONE_MS); + } + + // Step 3. + currentInstance = processInstanceStartActual; + while (true) { + Date nextConsumerInstance = EntityUtil.getPreviousInstanceTime(processStartDate, + consumer.getFrequency(), consumer.getTimezone(), currentInstance); + + ExpressionHelper.setReferenceDate(nextConsumerInstance); + evaluator = ExpressionHelper.get(); + Long rangeStart = evaluator.evaluate(in.getStart(), Date.class).getTime(); + Long rangeEnd = evaluator.evaluate(in.getEnd(), Date.class).getTime(); + if (rangeStart <= feedInstancetime.getTime() && feedInstancetime.getTime() < rangeEnd) { + result.add(nextConsumerInstance); + } else { + break; + } + currentInstance = new Date(nextConsumerInstance.getTime() - ONE_MS); + } + } + return result; + } + public static FeedInstanceResult getFeedInstanceListing(Entity entityObject, Date start, Date end) throws FalconException { Set<String> clusters = EntityUtil.getClustersDefinedInColos(entityObject); http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java index 29aefa0..fe78bc8 100644 --- a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java @@ -20,12 +20,20 @@ package org.apache.falcon.entity; import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; +import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Cluster; import org.apache.falcon.entity.v0.process.Input; import org.apache.falcon.entity.v0.process.Output; import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.expression.ExpressionHelper; +import org.apache.falcon.resource.SchedulableEntityInstance; + +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Set; /** * Helper methods for accessing process members. 
@@ -34,6 +42,7 @@ public final class ProcessHelper { private ProcessHelper() {} + public static Cluster getCluster(Process process, String clusterName) { for (Cluster cluster : process.getClusters().getClusters()) { if (cluster.getName().equals(clusterName)) { @@ -77,4 +86,103 @@ public final class ProcessHelper { return storageType; } + + private static void validateProcessInstance(Process process, Date instanceTime, + org.apache.falcon.entity.v0.cluster.Cluster cluster) { + //validate the cluster + Cluster processCluster = getCluster(process, cluster.getName()); + if (processCluster == null) { + throw new IllegalArgumentException("Cluster provided: " + cluster.getName() + + " is not a valid cluster for the process: " + process.getName()); + } + + // check if instanceTime is in validity range + if (instanceTime.before(processCluster.getValidity().getStart()) + || instanceTime.after(processCluster.getValidity().getEnd())) { + throw new IllegalArgumentException("Instance time provided: " + instanceTime + + " is not in validity range of process: " + process.getName() + + "on cluster: " + cluster.getName()); + } + + // check instanceTime is valid on the basis of startTime and frequency + Date nextInstance = EntityUtil.getNextStartTime(processCluster.getValidity().getStart(), + process.getFrequency(), process.getTimezone(), instanceTime); + if (!nextInstance.equals(instanceTime)) { + throw new IllegalArgumentException("Instance time provided: " + instanceTime + + " for process: " + process.getName() + " is not a valid instance time on cluster: " + + cluster.getName() + " on the basis of startDate and frequency"); + } + } + + /** + * Given a process instance, returns the feed instances which are used as input for this process instance. + * + * @param process given process + * @param instanceTime nominal time of the process instance + * @param cluster - cluster for the process instance + * @param allowOptionalFeeds switch to indicate whether optional feeds should be considered in input feeds. + * @return Set of input feed instances which are consumed by the given process instance. 
+ * @throws org.apache.falcon.FalconException + */ + public static Set<SchedulableEntityInstance> getInputFeedInstances(Process process, Date instanceTime, + org.apache.falcon.entity.v0.cluster.Cluster cluster, boolean allowOptionalFeeds) throws FalconException { + + // validate the inputs + validateProcessInstance(process, instanceTime, cluster); + + Set<SchedulableEntityInstance> result = new HashSet<>(); + if (process.getInputs() != null) { + ConfigurationStore store = ConfigurationStore.get(); + for (Input i : process.getInputs().getInputs()) { + if (i.isOptional() && !allowOptionalFeeds) { + continue; + } + Feed feed = store.get(EntityType.FEED, i.getFeed()); + // inputStart is process instance time + (now - startTime) + ExpressionHelper evaluator = ExpressionHelper.get(); + ExpressionHelper.setReferenceDate(instanceTime); + Date inputInstanceStartDate = evaluator.evaluate(i.getStart(), Date.class); + Date inputInstanceEndDate = evaluator.evaluate(i.getEnd(), Date.class); + List<Date> instanceTimes = EntityUtil.getEntityInstanceTimes(feed, cluster.getName(), + inputInstanceStartDate, inputInstanceEndDate); + SchedulableEntityInstance instance; + for (Date time : instanceTimes) { + instance = new SchedulableEntityInstance(feed.getName(), cluster.getName(), time, EntityType.FEED); + instance.setTag(SchedulableEntityInstance.INPUT); + result.add(instance); + } + } + } + return result; + } + + public static Set<SchedulableEntityInstance> getOutputFeedInstances(Process process, Date instanceTime, + org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException { + Set<SchedulableEntityInstance> result = new HashSet<>(); + + // validate the inputs + validateProcessInstance(process, instanceTime, cluster); + + if (process.getOutputs() != null && process.getOutputs().getOutputs() != null) { + + ExpressionHelper.setReferenceDate(instanceTime); + ExpressionHelper evaluator = ExpressionHelper.get(); + SchedulableEntityInstance candidate; + ConfigurationStore store = ConfigurationStore.get(); + for (Output output : process.getOutputs().getOutputs()) { + + Date outputInstance = evaluator.evaluate(output.getInstance(), Date.class); + // find the feed + Feed feed = store.get(EntityType.FEED, output.getFeed()); + org.apache.falcon.entity.v0.feed.Cluster fCluster = FeedHelper.getCluster(feed, cluster.getName()); + outputInstance = EntityUtil.getNextStartTime(fCluster.getValidity().getStart(), feed.getFrequency(), + feed.getTimezone(), outputInstance); + candidate = new SchedulableEntityInstance(output.getFeed(), cluster.getName(), outputInstance, + EntityType.FEED); + candidate.setTag(SchedulableEntityInstance.OUTPUT); + result.add(candidate); + } + } + return result; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java index 266d029..f70edfb 100644 --- a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java +++ b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java @@ -18,24 +18,58 @@ package org.apache.falcon.entity; +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.Frequency; import org.apache.falcon.entity.v0.SchemaHelper; import 
org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.cluster.Properties; import org.apache.falcon.entity.v0.cluster.Property; -import org.apache.falcon.entity.v0.feed.*; +import org.apache.falcon.entity.v0.feed.Clusters; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.feed.Location; +import org.apache.falcon.entity.v0.feed.LocationType; +import org.apache.falcon.entity.v0.feed.Locations; +import org.apache.falcon.entity.v0.feed.Validity; +import org.apache.falcon.entity.v0.process.Input; +import org.apache.falcon.entity.v0.process.Inputs; +import org.apache.falcon.entity.v0.process.Output; +import org.apache.falcon.entity.v0.process.Outputs; +import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.resource.SchedulableEntityInstance; import org.apache.hadoop.fs.Path; import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.Date; +import java.util.HashSet; +import java.util.Set; import java.util.TimeZone; /** * Test for feed helper methods. */ -public class FeedHelperTest { - public static final TimeZone UTC = TimeZone.getTimeZone("UTC"); +public class FeedHelperTest extends AbstractTestBase { + private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); + private ConfigurationStore store; + + @BeforeClass + public void init() throws Exception { + initConfigStore(); + } + + @BeforeMethod + public void setUp() throws Exception { + cleanupStore(); + store = getStore(); + } @Test public void testPartitionExpression() { @@ -107,4 +141,334 @@ public class FeedHelperTest { locations.getLocations()); Assert.assertEquals(FeedHelper.getLocation(feed, cluster, LocationType.DATA), location2); } + + @Test + public void testGetProducerProcessWithOffset() throws FalconException, ParseException { + //create a feed, submit it, test that ProducerProcess is null + + Cluster cluster = publishCluster(); + Feed feed = publishFeed(cluster, "days(1)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); + Assert.assertNull(FeedHelper.getProducerProcess(feed)); + + // create it's producer process submit it, test it's ProducerProcess + Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); + Outputs outputs = new Outputs(); + Output outFeed = new Output(); + outFeed.setName("outputFeed"); + outFeed.setFeed(feed.getName()); + outFeed.setInstance("today(0,0)"); + outputs.getOutputs().add(outFeed); + process.setOutputs(outputs); + store.publish(EntityType.PROCESS, process); + + Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName()); + SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2013-02-27 10:00 UTC"), + cluster); + SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(), + getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS); + expected.setTag(SchedulableEntityInstance.OUTPUT); + Assert.assertEquals(result, expected); + } + + @Test + public void testGetProducerProcessForNow() throws FalconException, ParseException { + Cluster cluster = publishCluster(); + Feed feed = publishFeed(cluster, "days(1)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); + Assert.assertNull(FeedHelper.getProducerProcess(feed)); + + // create it's 
producer process submit it, test it's ProducerProcess + Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); + Outputs outputs = new Outputs(); + Output outFeed = new Output(); + outFeed.setName("outputFeed"); + outFeed.setFeed(feed.getName()); + outFeed.setInstance("now(0,0)"); + outputs.getOutputs().add(outFeed); + process.setOutputs(outputs); + store.publish(EntityType.PROCESS, process); + + Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName()); + SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2013-02-28 10:00 UTC"), + cluster); + SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(), + getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS); + expected.setTag(SchedulableEntityInstance.OUTPUT); + Assert.assertEquals(result, expected); + } + + @Test + public void testGetProducerWithNowNegativeOffset() throws FalconException, ParseException { + Cluster cluster = publishCluster(); + Feed feed = publishFeed(cluster, "days(1)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); + Assert.assertNull(FeedHelper.getProducerProcess(feed)); + + // create it's producer process submit it, test it's ProducerProcess + Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); + Outputs outputs = new Outputs(); + Output outFeed = new Output(); + outFeed.setName("outputFeed"); + outFeed.setFeed(feed.getName()); + outFeed.setInstance("now(-4,0)"); + outputs.getOutputs().add(outFeed); + process.setOutputs(outputs); + store.publish(EntityType.PROCESS, process); + + Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName()); + SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2013-02-27 10:00 UTC"), + cluster); + SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(), + getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS); + expected.setTag(SchedulableEntityInstance.OUTPUT); + Assert.assertEquals(result, expected); + } + + + @Test + public void testGetProducerWithNowPositiveOffset() throws FalconException, ParseException { + Cluster cluster = publishCluster(); + Feed feed = publishFeed(cluster, "days(1)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); + Assert.assertNull(FeedHelper.getProducerProcess(feed)); + + // create it's producer process submit it, test it's ProducerProcess + Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); + Outputs outputs = new Outputs(); + Output outFeed = new Output(); + outFeed.setName("outputFeed"); + outFeed.setFeed(feed.getName()); + outFeed.setInstance("now(4,0)"); + outputs.getOutputs().add(outFeed); + process.setOutputs(outputs); + store.publish(EntityType.PROCESS, process); + + Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName()); + SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2013-02-28 10:00 UTC"), + cluster); + SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(), + getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS); + expected.setTag(SchedulableEntityInstance.OUTPUT); + Assert.assertEquals(result, expected); + } + + @Test + public void testGetProducerProcessInstance() throws FalconException, ParseException { + //create a feed, submit it, test that ProducerProcess is null + Cluster cluster = 
publishCluster(); + Feed feed = publishFeed(cluster, "days(1)", "2011-02-28 00:00 UTC", "2016-02-28 10:00 UTC"); + + // create it's producer process submit it, test it's ProducerProcess + Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); + Outputs outputs = new Outputs(); + Output outFeed = new Output(); + outFeed.setName("outputFeed"); + outFeed.setFeed(feed.getName()); + outFeed.setInstance("today(0,0)"); + outputs.getOutputs().add(outFeed); + process.setOutputs(outputs); + store.publish(EntityType.PROCESS, process); + Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName()); + SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2013-02-28 00:00 UTC"), + cluster); + SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(), + getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS); + expected.setTag(SchedulableEntityInstance.OUTPUT); + Assert.assertEquals(result, expected); + } + + @Test + public void testGetConsumerProcesses() throws FalconException, ParseException { + //create a feed, submit it, test that ConsumerProcesses is blank list + Cluster cluster = publishCluster(); + Feed feed = publishFeed(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); + + //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses + Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); + Inputs inputs = new Inputs(); + Input inFeed = new Input(); + inFeed.setName("outputFeed"); + inFeed.setFeed(feed.getName()); + inFeed.setStart("today(0,0)"); + inFeed.setEnd("today(0,0)"); + inputs.getInputs().add(inFeed); + process.setInputs(inputs); + store.publish(EntityType.PROCESS, process); + + Set<Process> result = FeedHelper.getConsumerProcesses(feed); + Assert.assertEquals(result.size(), 1); + Assert.assertTrue(result.contains(process)); + } + + @Test + public void testGetConsumerProcessInstances() throws Exception { + //create a feed, submit it, test that ConsumerProcesses is blank list + Cluster cluster = publishCluster(); + Feed feed = publishFeed(cluster, "hours(1)", "2012-02-28 00:00 UTC", "2016-02-28 00:00 UTC"); + + //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses + Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); + Inputs inputs = new Inputs(); + Input inFeed = new Input(); + inFeed.setName("inputFeed"); + inFeed.setFeed(feed.getName()); + inFeed.setStart("now(-4, 30)"); + inFeed.setEnd("now(4, 30)"); + inputs.getInputs().add(inFeed); + process.setInputs(inputs); + store.publish(EntityType.PROCESS, process); + + Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, + getDate("2012-02-28 09:00 UTC"), cluster); + Assert.assertEquals(result.size(), 1); + + Set<SchedulableEntityInstance> expected = new HashSet<>(); + SchedulableEntityInstance ins = new SchedulableEntityInstance(process.getName(), cluster.getName(), + getDate("2012-02-28 10:00 UTC"), EntityType.PROCESS); + ins.setTag(SchedulableEntityInstance.INPUT); + expected.add(ins); + Assert.assertEquals(result, expected); + + } + + @Test + public void testGetMultipleConsumerInstances() throws Exception { + Cluster cluster = publishCluster(); + Feed feed = publishFeed(cluster, "hours(1)", "2012-02-28 00:00 UTC", "2016-02-28 00:00 UTC"); + + //create a consumer Process and submit it, 
+        Process process = prepareProcess(cluster, "hours(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Inputs inputs = new Inputs();
+        Input inFeed = new Input();
+        inFeed.setName("inputFeed");
+        inFeed.setFeed(feed.getName());
+        inFeed.setStart("now(-4, 30)");
+        inFeed.setEnd("now(4, 30)");
+        inputs.getInputs().add(inFeed);
+        process.setInputs(inputs);
+        store.publish(EntityType.PROCESS, process);
+
+        Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed,
+                getDate("2012-02-28 09:00 UTC"), cluster);
+        Assert.assertEquals(result.size(), 8);
+
+        Set<SchedulableEntityInstance> expected = new HashSet<>();
+        String[] consumers = { "2012-02-28 05:00 UTC", "2012-02-28 06:00 UTC", "2012-02-28 07:00 UTC",
+            "2012-02-28 08:00 UTC", "2012-02-28 09:00 UTC", "2012-02-28 10:00 UTC", "2012-02-28 11:00 UTC",
+            "2012-02-28 12:00 UTC", };
+        for (String d : consumers) {
+            SchedulableEntityInstance i = new SchedulableEntityInstance(process.getName(), cluster.getName(),
+                    getDate(d), EntityType.PROCESS);
+            i.setTag(SchedulableEntityInstance.INPUT);
+            expected.add(i);
+        }
+        Assert.assertEquals(result, expected);
+    }
+
+    @Test
+    public void testGetConsumerWithNow() throws Exception {
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "hours(1)", "2012-02-28 00:00 UTC", "2016-02-28 00:00 UTC");
+
+        //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses
+        Process process = prepareProcess(cluster, "hours(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Inputs inputs = new Inputs();
+        Input inFeed = new Input();
+        inFeed.setName("inputFeed");
+        inFeed.setFeed(feed.getName());
+        inFeed.setStart("today(0, 0)");
+        inFeed.setEnd("now(0, 0)");
+        inputs.getInputs().add(inFeed);
+        process.setInputs(inputs);
+        store.publish(EntityType.PROCESS, process);
+
+        Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed,
+                getDate("2012-02-28 00:00 UTC"), cluster);
+        Assert.assertEquals(result.size(), 23);
+    }
+
+    @Test
+    public void testGetConsumerWithLatest() throws Exception {
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "hours(1)", "2012-02-28 00:00 UTC", "2016-02-28 00:00 UTC");
+
+        //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses
+        Process process = prepareProcess(cluster, "hours(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Inputs inputs = new Inputs();
+        Input inFeed = new Input();
+        inFeed.setName("inputFeed");
+        inFeed.setFeed(feed.getName());
+        inFeed.setStart("today(0, 0)");
+        inFeed.setEnd("latest(0)");
+        inputs.getInputs().add(inFeed);
+        process.setInputs(inputs);
+        store.publish(EntityType.PROCESS, process);
+
+        Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed,
+                getDate("2012-02-28 00:00 UTC"), cluster);
+        System.out.println("result.size() = " + result.size());
+        Assert.assertEquals(result.size(), 23);
+    }
+
+    private Validity getFeedValidity(String start, String end) throws ParseException {
+        Validity validity = new Validity();
+        validity.setStart(getDate(start));
+        validity.setEnd(getDate(end));
+        return validity;
+    }
+
+    private org.apache.falcon.entity.v0.process.Validity getProcessValidity(String start, String end) throws
+            ParseException {
+
+        org.apache.falcon.entity.v0.process.Validity validity = new org.apache.falcon.entity.v0.process.Validity();
+        validity.setStart(getDate(start));
+        validity.setEnd(getDate(end));
+        return validity;
+    }
+
+    private Date getDate(String dateString) throws ParseException {
+        DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm Z");
+        return format.parse(dateString);
+    }
+
+    private Cluster publishCluster() throws FalconException {
+        Cluster cluster = new Cluster();
+        cluster.setName("feedCluster");
+        cluster.setColo("colo");
+        store.publish(EntityType.CLUSTER, cluster);
+        return cluster;
+
+    }
+
+    private Feed publishFeed(Cluster cluster, String frequency, String start, String end)
+            throws FalconException, ParseException {
+
+        Feed feed = new Feed();
+        feed.setName("feed");
+        Frequency f = new Frequency(frequency);
+        feed.setFrequency(f);
+        feed.setTimezone(UTC);
+        Clusters fClusters = new Clusters();
+        org.apache.falcon.entity.v0.feed.Cluster fCluster = new org.apache.falcon.entity.v0.feed.Cluster();
+        fCluster.setName(cluster.getName());
+        fCluster.setValidity(getFeedValidity(start, end));
+        fClusters.getClusters().add(fCluster);
+        feed.setClusters(fClusters);
+        store.publish(EntityType.FEED, feed);
+
+        return feed;
+    }
+
+    private Process prepareProcess(Cluster cluster, String frequency, String start, String end) throws ParseException {
+        Process process = new Process();
+        process.setName("process");
+        process.setTimezone(UTC);
+        org.apache.falcon.entity.v0.process.Clusters pClusters = new org.apache.falcon.entity.v0.process.Clusters();
+        org.apache.falcon.entity.v0.process.Cluster pCluster = new org.apache.falcon.entity.v0.process.Cluster();
+        pCluster.setName(cluster.getName());
+        org.apache.falcon.entity.v0.process.Validity validity = getProcessValidity(start, end);
+        pCluster.setValidity(validity);
+        pClusters.getClusters().add(pCluster);
+        process.setClusters(pClusters);
+        Frequency f = new Frequency(frequency);
+        process.setFrequency(f);
+        return process;
+    }
 }
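Editorial note (illustrative, not part of the patch): the eight consumer instances expected by testGetMultipleConsumerInstances above follow from the input window declared on the process. Assuming now(h, m) resolves to the process instance time plus h hours and m minutes, a process instance at time T reads the feed over [T - 3h30m, T + 4h30m], so the hourly process instances from 05:00 through 12:00 are exactly those whose window still covers the 09:00 feed instance. A minimal, self-contained boundary check under that assumption:

import java.time.Duration;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

public final class ConsumerWindowSketch {
    public static void main(String[] args) {
        // Feed instance under inspection, as in testGetMultipleConsumerInstances.
        ZonedDateTime feedInstance = ZonedDateTime.of(2012, 2, 28, 9, 0, 0, 0, ZoneOffset.UTC);
        // Input window of the consumer process: start = now(-4, 30), end = now(4, 30).
        Duration startOffset = Duration.ofHours(-4).plusMinutes(30);
        Duration endOffset = Duration.ofHours(4).plusMinutes(30);

        // Boundary cases: 05:00 and 12:00 still cover 09:00, while 04:00 and 13:00 do not.
        int[] hours = {4, 5, 12, 13};
        for (int hour : hours) {
            ZonedDateTime processInstance = ZonedDateTime.of(2012, 2, 28, hour, 0, 0, 0, ZoneOffset.UTC);
            ZonedDateTime windowStart = processInstance.plus(startOffset);
            ZonedDateTime windowEnd = processInstance.plus(endOffset);
            boolean consumes = !feedInstance.isBefore(windowStart) && !feedInstance.isAfter(windowEnd);
            System.out.println(processInstance + " consumes the 09:00 feed instance: " + consumes);
        }
    }
}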

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/common/src/test/java/org/apache/falcon/entity/ProcessHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/ProcessHelperTest.java b/common/src/test/java/org/apache/falcon/entity/ProcessHelperTest.java
new file mode 100644
index 0000000..0d396ae
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/entity/ProcessHelperTest.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Clusters;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Inputs;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Outputs;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.resource.SchedulableEntityInstance;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TimeZone;
+
+
+/**
+ * Tests for ProcessHelper methods.
+ */
+public class ProcessHelperTest extends AbstractTestBase {
+    private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
+    private ConfigurationStore store;
+
+    @BeforeClass
+    public void init() throws Exception {
+        initConfigStore();
+    }
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        cleanupStore();
+        store = ConfigurationStore.get();
+    }
+
+    @Test
+    public void testGetInputFeedInstances() throws FalconException, ParseException {
+        // create a process with input feeds
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "hours(1)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC");
+
+        // find the input Feed instances time
+        Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Inputs inputs = new Inputs();
+        Input input = getInput("inputFeed", feed.getName(), "today(0,-30)", "today(2,30)", false);
+        inputs.getInputs().add(input);
+        process.setInputs(inputs);
+        store.publish(EntityType.PROCESS, process);
+
+        Date processInstanceDate = getDate("2012-02-28 10:00 UTC");
+        Set<SchedulableEntityInstance> inputFeedInstances = ProcessHelper.getInputFeedInstances(process,
+                processInstanceDate, cluster, false);
+        Assert.assertEquals(inputFeedInstances.size(), 3);
+
+        Set<SchedulableEntityInstance> expectedInputFeedInstances = new HashSet<>();
+        SchedulableEntityInstance instance = new SchedulableEntityInstance(feed.getName(), cluster.getName(),
+                getDate("2012-02-28 00:00 UTC"), EntityType.FEED);
+        instance.setTag(SchedulableEntityInstance.INPUT);
+        expectedInputFeedInstances.add(instance);
+        instance = new SchedulableEntityInstance(feed.getName(), cluster.getName(), getDate("2012-02-28 01:00 UTC"),
+                EntityType.FEED);
+        instance.setTag(SchedulableEntityInstance.INPUT);
+        expectedInputFeedInstances.add(instance);
+        instance = new SchedulableEntityInstance(feed.getName(), cluster.getName(), getDate("2012-02-28 02:00 UTC"),
+                EntityType.FEED);
+        instance.setTag(SchedulableEntityInstance.INPUT);
+        expectedInputFeedInstances.add(instance);
+
+        //Validate with expected result
+        Assert.assertTrue(inputFeedInstances.equals(expectedInputFeedInstances));
+    }
+
+    @Test
+    public void testGetOutputFeedInstances() throws FalconException, ParseException {
+        // create a process with an output feed
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "days(1)", "2012-02-27 11:00 UTC", "2016-02-28 11:00 UTC");
+        Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Outputs outputs = new Outputs();
+        outputs.getOutputs().add(getOutput("outputFeed", feed.getName(), "now(0,0)"));
+        process.setOutputs(outputs);
+        store.publish(EntityType.PROCESS, process);
+
+        Set<SchedulableEntityInstance> result = ProcessHelper.getOutputFeedInstances(process,
+                getDate("2012-02-28 10:00 UTC"), cluster);
+
+        Set<SchedulableEntityInstance> expected = new HashSet<>();
+        SchedulableEntityInstance ins = new SchedulableEntityInstance(feed.getName(), cluster.getName(),
+                getDate("2012-02-28 11:00 UTC"), EntityType.FEED);
+        ins.setTag(SchedulableEntityInstance.OUTPUT);
+        expected.add(ins);
+
+        Assert.assertEquals(result, expected);
+
+    }
+
+    private org.apache.falcon.entity.v0.process.Validity getProcessValidity(String start, String end) throws
+            ParseException {
+
+        org.apache.falcon.entity.v0.process.Validity validity = new org.apache.falcon.entity.v0.process.Validity();
+        validity.setStart(getDate(start));
+        validity.setEnd(getDate(end));
+        return validity;
+    }
+
+    private Date getDate(String dateString) throws ParseException {
+        return new SimpleDateFormat("yyyy-MM-dd HH:mm Z").parse(dateString);
+    }
+
+    private org.apache.falcon.entity.v0.feed.Validity getFeedValidity(String start, String end) throws ParseException {
+        org.apache.falcon.entity.v0.feed.Validity validity = new org.apache.falcon.entity.v0.feed.Validity();
+        validity.setStart(getDate(start));
+        validity.setEnd(getDate(end));
+        return validity;
+    }
+
+    private Input getInput(String name, String feedName, String start, String end, boolean isOptional) {
+        Input inFeed = new Input();
+        inFeed.setName(name);
+        inFeed.setFeed(feedName);
+        inFeed.setStart(start);
+        inFeed.setEnd(end);
+        inFeed.setOptional(isOptional);
+        return inFeed;
+    }
+
+    private Output getOutput(String name, String feedName, String instance) {
+        Output output = new Output();
+        output.setInstance(instance);
+        output.setFeed(feedName);
+        output.setName(name);
+        return output;
+    }
+
+    private Cluster publishCluster() throws FalconException {
+        Cluster cluster = new Cluster();
+        cluster.setName("feedCluster");
+        cluster.setColo("colo");
+        store.publish(EntityType.CLUSTER, cluster);
+        return cluster;
+
+    }
+
+    private Feed publishFeed(Cluster cluster, String frequency, String start, String end)
+            throws FalconException, ParseException {
+
+        Feed feed = new Feed();
+        feed.setName("feed");
+        Frequency f = new Frequency(frequency);
+        feed.setFrequency(f);
+        feed.setTimezone(UTC);
+        Clusters fClusters = new Clusters();
+        org.apache.falcon.entity.v0.feed.Cluster fCluster = new org.apache.falcon.entity.v0.feed.Cluster();
+        fCluster.setName(cluster.getName());
+        fCluster.setValidity(getFeedValidity(start, end));
+        fClusters.getClusters().add(fCluster);
+        feed.setClusters(fClusters);
+        store.publish(EntityType.FEED, feed);
+
+        return feed;
+    }
+
+    private Process prepareProcess(Cluster cluster, String frequency, String start, String end) throws ParseException {
+        Process process = new Process();
+        process.setName("process");
+        process.setTimezone(UTC);
+        org.apache.falcon.entity.v0.process.Clusters pClusters = new org.apache.falcon.entity.v0.process.Clusters();
+        org.apache.falcon.entity.v0.process.Cluster pCluster = new org.apache.falcon.entity.v0.process.Cluster();
+        pCluster.setName(cluster.getName());
+        org.apache.falcon.entity.v0.process.Validity validity = getProcessValidity(start, end);
+        pCluster.setValidity(validity);
+        pClusters.getClusters().add(pCluster);
+        process.setClusters(pClusters);
+        Frequency f = new Frequency(frequency);
+        process.setFrequency(f);
+        return process;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index e447915..50dce84 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -280,6 +280,41 @@ Usage: $FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -params
 -start "yyyy-MM-dd'T'HH:mm'Z'"
+
+---+++Dependency
+Display the instances that are dependent on the given instance. For example, for a given process instance it will
+list all the input feed instances (if any) and the output feed instances (if any).
+
+An example use case of this command is as follows:
+Suppose you find that the data in a feed instance was incorrect and you need to figure out which process instances
+consumed this feed instance, so that you can reprocess them after correcting it. Given the feed instance, this command
+tells you which process instance produced it and which process instances consumed it.
+
+NOTE:
+1. instanceTime must be a valid instance time, e.g. the instanceTime of a feed should be within its validity range on
+   the applicable clusters, and it should be in the range of instances produced by the producer process (if any).
+
+2. For processes with inputs like latest(), which vary with time, the results are not guaranteed to be correct.
+
+Usage:
+$FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -dependency -instanceTime "yyyy-MM-dd'T'HH:mm'Z'"
+
+For example:
+$FALCON_HOME/bin/falcon instance -dependency -type feed -name out -instanceTime 2014-12-15T00:00Z
+name: producer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:00Z, tag: Output
+name: consumer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:03Z, tag: Input
+name: consumer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:04Z, tag: Input
+name: consumer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:02Z, tag: Input
+name: consumer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:05Z, tag: Input
+
+
+Response: default/Success!
+
+Request Id: default/1125035965@qtp-503156953-7 - 447be0ad-1d38-4dce-b438-20f3de69b172
+
+
+<a href="./Restapi/InstanceDependency.html">Optional params described here.</a>
+
 ---++ Metadata Lineage Options
 
 ---+++Lineage
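Editorial note (illustrative, not part of the patch): for a feed instance, the dependency listing shown in the CLI example above amounts to one producer instance (tagged Output) plus zero or more consumer instances (tagged Input). A rough sketch of how that set could be assembled with the helpers exercised in FeedHelperTest above, assuming the feed, cluster, and instance time have already been resolved from the configuration store:

import java.util.Date;
import java.util.HashSet;
import java.util.Set;

import org.apache.falcon.FalconException;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.resource.SchedulableEntityInstance;

public final class FeedDependencySketch {
    private FeedDependencySketch() {
    }

    /** Collects the producer instance (if any) and all consumer instances for one feed instance. */
    public static Set<SchedulableEntityInstance> dependentInstances(Feed feed, Date instanceTime, Cluster cluster)
        throws FalconException {
        Set<SchedulableEntityInstance> dependents = new HashSet<>();
        SchedulableEntityInstance producer = FeedHelper.getProducerInstance(feed, instanceTime, cluster);
        if (producer != null) {       // defensive: the feed may have no producer process
            dependents.add(producer); // tagged Output by the helper
        }
        dependents.addAll(FeedHelper.getConsumerInstances(feed, instanceTime, cluster)); // tagged Input
        return dependents;
    }
}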

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/docs/src/site/twiki/restapi/InstanceDependency.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceDependency.twiki b/docs/src/site/twiki/restapi/InstanceDependency.twiki
new file mode 100644
index 0000000..dc452de
--- /dev/null
+++ b/docs/src/site/twiki/restapi/InstanceDependency.twiki
@@ -0,0 +1,49 @@
+---++ GET /api/instance/dependency/:entity-type/:entity-name
+   * <a href="#Description">Description</a>
+   * <a href="#Parameters">Parameters</a>
+   * <a href="#Results">Results</a>
+   * <a href="#Examples">Examples</a>
+
+---++ Description
+Get dependent instances for a particular instance.
+
+---++ Parameters
+   * :entity-type Valid options are feed or process.
+   * :entity-name Name of the entity.
+   * instanceTime <mandatory param> Time of the given instance.
+   * colo <optional param> Name of the colo.
+
+
+---++ Results
+Dependent instances for the specified instance.
+
+---++ Examples
+---+++ Rest Call
+<verbatim>
+GET http://localhost:15000/api/instance/dependency/feed/myFeed?colo=*&instanceTime=2012-04-03T07:00Z
+</verbatim>
+---+++ Result
+<verbatim>
+{
+    'status': 'SUCCEEDED',
+    'message': 'default/Success!\n',
+    'dependencies': [
+        {
+            'cluster': 'local',
+            'entityName': 'consumer-process',
+            'entityType': 'PROCESS',
+            'instanceTime': '2014-12-18T00:00Z',
+            'tags': 'Input'
+        },
+        {
+            'cluster': 'local',
+            'entityName': 'producer-process',
+            'entityType': 'PROCESS',
+            'instanceTime': '2014-12-18T00:00Z',
+            'tags': 'Output'
+        }
+    ],
+    'requestId': 'default/1405883107@qtp-1501726962-6-0c2e690f-546b-47b0-a5ee-0365d4522a31\n'
+}
+</verbatim>
+

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/docs/src/site/twiki/restapi/ResourceList.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/ResourceList.twiki b/docs/src/site/twiki/restapi/ResourceList.twiki
index 060e0af..49dddb7 100644
--- a/docs/src/site/twiki/restapi/ResourceList.twiki
+++ b/docs/src/site/twiki/restapi/ResourceList.twiki
@@ -67,6 +67,7 @@ See also: [[../Security.twiki][Security in Falcon]]
 | POST | [[InstanceRerun][api/instance/rerun/:entity-type/:entity-name]] | Rerun a given instance |
 | GET | [[InstanceLogs][api/instance/logs/:entity-type/:entity-name]] | Get logs of a given instance |
 | GET | [[InstanceSummary][api/instance/summary/:entity-type/:entity-name]] | Return summary of instances for an entity |
+| GET | [[InstanceDependency][api/instance/dependency/:entity-type/:entity-name]] | Return dependent instances for a given instance |
 
 ---++ REST Call on Metadata Lineage Resource
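Editorial note (illustrative, not part of the patch): a bare-bones way to exercise the new dependency endpoint from Java, mirroring the REST example documented above. The host/port, entity name, and instance time are placeholders for your own deployment, and any authentication parameters your Falcon server requires are omitted.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public final class InstanceDependencyCall {
    public static void main(String[] args) throws Exception {
        // Placeholder values; adjust to your Falcon server and entities.
        String falconUrl = "http://localhost:15000";
        String entityType = "feed";
        String entityName = "myFeed";
        String instanceTime = URLEncoder.encode("2012-04-03T07:00Z", StandardCharsets.UTF_8.name());

        URL url = new URL(falconUrl + "/api/instance/dependency/" + entityType + "/" + entityName
                + "?colo=*&instanceTime=" + instanceTime);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        conn.setRequestProperty("Accept", "application/json");

        // Print the JSON body, which should resemble the example result shown above.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        } finally {
            conn.disconnect();
        }
    }
}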
