Repository: incubator-falcon Updated Branches: refs/heads/master 7cfc6002f -> 9f722b3d0
FALCON-762 Support feed listing for file system storage. Contributed by Srikanth Sundarrajan Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/9f722b3d Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/9f722b3d Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/9f722b3d Branch: refs/heads/master Commit: 9f722b3d01fe118951ed4f7c87b16c495a3fb45e Parents: 7cfc600 Author: srikanth.sundarrajan <srikanth.sundarra...@inmobi.com> Authored: Wed Oct 15 13:54:33 2014 +0530 Committer: srikanth.sundarrajan <srikanth.sundarra...@inmobi.com> Committed: Wed Oct 15 13:54:33 2014 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../java/org/apache/falcon/cli/FalconCLI.java | 11 +- .../org/apache/falcon/client/FalconClient.java | 61 ++++++- .../org/apache/falcon/resource/APIResult.java | 7 + .../falcon/resource/FeedInstanceResult.java | 128 +++++++++++++++ .../apache/falcon/resource/InstancesResult.java | 28 ++-- .../falcon/resource/InstancesSummaryResult.java | 28 ++-- .../apache/falcon/entity/CatalogStorage.java | 7 + .../org/apache/falcon/entity/ClusterHelper.java | 7 + .../org/apache/falcon/entity/FeedHelper.java | 140 +++++++++++++++- .../falcon/entity/FeedInstanceStatus.java | 137 ++++++++++++++++ .../apache/falcon/entity/FileSystemStorage.java | 90 +++++++++++ .../java/org/apache/falcon/entity/Storage.java | 10 ++ .../falcon/expression/ExpressionHelper.java | 13 ++ .../falcon/entity/FileSystemStorageTest.java | 142 +++++++++++++++++ docs/src/site/twiki/FalconCLI.twiki | 12 ++ .../twiki/restapi/FeedInstanceListing.twiki | 45 ++++++ .../workflow/engine/OozieWorkflowEngine.java | 9 +- .../falcon/resource/AbstractEntityManager.java | 36 +++++ .../resource/AbstractInstanceManager.java | 32 +++- .../resource/proxy/InstanceManagerProxy.java | 159 ++++++------------- .../proxy/SchedulableEntityManagerProxy.java | 34 
+--- .../apache/falcon/resource/InstanceManager.java | 15 +- .../java/org/apache/falcon/cli/FalconCLIIT.java | 5 + 24 files changed, 983 insertions(+), 175 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 46c58fc..71a2278 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -35,6 +35,8 @@ Trunk (Unreleased) FALCON-263 API to get workflow parameters. (pavan kumar kolamuri via Shwetha GS) IMPROVEMENTS + FALCON-762 Support feed listing for file system storage (Srikanth Sundarrajan) + FALCON-20 Remove dependency on custom InMobi DistCp (Sowmya Ramesh via Venkatesh Seetharam) http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/client/src/main/java/org/apache/falcon/cli/FalconCLI.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java index f7229ec..9cf6339 100644 --- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java +++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java @@ -108,6 +108,7 @@ public class FalconCLI { public static final String CLIENT_PROPERTIES = "/client.properties"; public static final String LIFECYCLE_OPT = "lifecycle"; public static final String PARARMS_OPT = "params"; + public static final String LISTING_OPT = "listing"; // Graph Commands public static final String GRAPH_CMD = "graph"; @@ -285,7 +286,9 @@ public class FalconCLI { filterBy, orderBy, sortOrder, offset, numResults); } else if (optionsList.contains(PARARMS_OPT)) { // start time is the nominal time of instance - result = client.getParamsOfInstance(type, entity, start, colo, clusters, sourceClusters, lifeCycles); + result = client.getParamsOfInstance(type, entity, start, colo, 
lifeCycles); + } else if (optionsList.contains(LISTING_OPT)) { + result = client.getFeedListing(type, entity, start, end, colo); } else { throw new FalconCLIException("Invalid command"); } @@ -728,6 +731,11 @@ public class FalconCLI { false, "Displays the workflow parameters for a given instance of specified nominal time"); + Option listing = new Option( + LISTING_OPT, + false, + "Displays feed listing and their status between a start and end time range."); + OptionGroup group = new OptionGroup(); group.addOption(running); group.addOption(list); @@ -741,6 +749,7 @@ public class FalconCLI { group.addOption(logs); group.addOption(continues); group.addOption(params); + group.addOption(listing); Option url = new Option(URL_OPTION, true, "Falcon URL"); Option start = new Option(START_OPT, true, http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index 7e46f28..576ca09 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -35,6 +35,7 @@ import org.apache.falcon.recipe.RecipeToolArgs; import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.EntityList; import org.apache.falcon.resource.EntitySummaryResult; +import org.apache.falcon.resource.FeedInstanceResult; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.resource.InstancesSummaryResult; import org.apache.hadoop.security.authentication.client.AuthenticatedURL; @@ -235,7 +236,8 @@ public class FalconClient { RERUN("api/instance/rerun/", HttpMethod.POST, MediaType.APPLICATION_JSON), LOG("api/instance/logs/", HttpMethod.GET, MediaType.APPLICATION_JSON), SUMMARY("api/instance/summary/", 
HttpMethod.GET, MediaType.APPLICATION_JSON), - PARAMS("api/instance/params/", HttpMethod.GET, MediaType.APPLICATION_JSON); + PARAMS("api/instance/params/", HttpMethod.GET, MediaType.APPLICATION_JSON), + LISTING("api/instance/listing/", HttpMethod.GET, MediaType.APPLICATION_JSON); private String path; private String method; @@ -406,6 +408,13 @@ public class FalconClient { null, null, colo, lifeCycles); } + public String getFeedListing(String type, String entity, String start, + String end, String colo) + throws FalconCLIException { + + return sendInstanceRequest(Instances.LISTING, type, entity, start, end, null, null, colo, null); + } + public String killInstances(String type, String entity, String start, String end, String colo, String clusters, String sourceClusters, List<LifeCycle> lifeCycles) @@ -479,7 +488,6 @@ public class FalconClient { public String getParamsOfInstance(String type, String entity, String start, String colo, - String clusters, String sourceClusters, List<LifeCycle> lifeCycles) throws FalconCLIException, UnsupportedEncodingException { @@ -729,14 +737,16 @@ public class FalconClient { } checkIfSuccessful(clientResponse); - if (instances.name().equals("LOG")) { + switch (instances) { + case LOG: return parseProcessInstanceResultLogs(clientResponse, runid); - } else if (instances.name().equals("SUMMARY")) { + case SUMMARY: return summarizeProcessInstanceResult(clientResponse); - } else { + case LISTING: + return parseFeedInstanceResult(clientResponse); + default: return parseProcessInstanceResult(clientResponse); } - } //RESUME CHECKSTYLE CHECK VisibilityModifierCheck @@ -1006,6 +1016,45 @@ public class FalconClient { return sb.toString(); } + private String parseFeedInstanceResult(ClientResponse clientResponse) { + FeedInstanceResult result = clientResponse.getEntity(FeedInstanceResult.class); + StringBuilder sb = new StringBuilder(); + String toAppend; + + sb.append("Consolidated Status: ").append(result.getStatus()).append("\n"); + + 
sb.append("\nInstances:\n"); + sb.append("Cluster\t\tInstance\t\tStatus\t\tSize\t\tCreationTime\t\tDetails\n"); + sb.append("-----------------------------------------------------------------------------------------------\n"); + if (result.getInstances() != null) { + for (FeedInstanceResult.Instance instance : result.getInstances()) { + + toAppend = instance.getCluster() != null ? instance.getCluster() : "-"; + sb.append(toAppend).append("\t"); + + toAppend = instance.getInstance() != null ? instance.getInstance() : "-"; + sb.append(toAppend).append("\t"); + + toAppend = instance.getStatus() != null ? instance.getStatus() : "-"; + sb.append(toAppend).append("\t"); + + toAppend = instance.getSize() != -1 ? String.valueOf(instance.getSize()) : "-"; + sb.append(toAppend).append("\t"); + + toAppend = instance.getCreationTime() != 0 + ? SchemaHelper.formatDateUTC(new Date(instance.getCreationTime())) : "-"; + sb.append(toAppend).append("\t"); + + toAppend = StringUtils.isEmpty(instance.getUri()) ? 
"-" : instance.getUri(); + sb.append(toAppend).append("\n"); + } + } + sb.append("\nAdditional Information:\n"); + sb.append("Response: ").append(result.getMessage()); + sb.append("Request Id: ").append(result.getRequestId()); + return sb.toString(); + } + protected static enum GraphOperations { VERTICES("api/graphs/lineage/vertices", HttpMethod.GET, MediaType.APPLICATION_JSON), http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/client/src/main/java/org/apache/falcon/resource/APIResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/resource/APIResult.java b/client/src/main/java/org/apache/falcon/resource/APIResult.java index 79b8a1d..3b89040 100644 --- a/client/src/main/java/org/apache/falcon/resource/APIResult.java +++ b/client/src/main/java/org/apache/falcon/resource/APIResult.java @@ -104,4 +104,11 @@ public class APIResult { return e.getMessage(); } } + + public Object[] getCollection() { + return null; + } + + public void setCollection(Object[] items) { + } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/client/src/main/java/org/apache/falcon/resource/FeedInstanceResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/resource/FeedInstanceResult.java b/client/src/main/java/org/apache/falcon/resource/FeedInstanceResult.java new file mode 100644 index 0000000..1d55e68 --- /dev/null +++ b/client/src/main/java/org/apache/falcon/resource/FeedInstanceResult.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.resource; + +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; + +/** + * Pojo for JAXB marshalling / unmarshalling. + */ +//SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck +@XmlRootElement +@edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"}) +public class FeedInstanceResult extends APIResult { + + @XmlElement + private Instance[] instances; + + private FeedInstanceResult() { // for jaxb + super(); + } + + public FeedInstanceResult(String message, Instance[] instances) { + this(Status.SUCCEEDED, message, instances); + } + + public FeedInstanceResult(Status status, String message, + Instance[] inInstances) { + super(status, message); + this.instances = inInstances; + } + + public FeedInstanceResult(Status status, String message) { + super(status, message); + } + + public Instance[] getInstances() { + return instances; + } + + public void setInstances(Instance[] instances) { + this.instances = instances; + } + + /** + * A single instance object inside instance result. 
+ */ + @XmlRootElement(name = "instance") + public static class Instance { + @XmlElement + public String cluster; + + @XmlElement + public String instance; + + @XmlElement + public String status; + + @XmlElement + public String uri; + + @XmlElement + public long creationTime; + + @XmlElement + public long size; + + public Instance() { + } + + public Instance(String cluster, String instance, String status) { + this.cluster = cluster; + this.instance = instance; + this.status = status; + } + + public String getInstance() { + return instance; + } + + public String getStatus() { + return status; + } + + public String getUri() { + return uri; + } + + public String getCluster() { + return cluster; + } + + public long getCreationTime() { + return creationTime; + } + + public Long getSize() { + return size; + } + + @Override + public String toString() { + return "{instance:" + + this.instance + + ", status:" + + this.status + + (this.uri == null ? "" : ", uri: " + this.uri) + + (this.cluster == null ? 
"" : ", cluster:" + this.cluster) + "}"; + } + } +} +//RESUME CHECKSTYLE CHECK VisibilityModifierCheck http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/client/src/main/java/org/apache/falcon/resource/InstancesResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java index 5754f97..65355f0 100644 --- a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java +++ b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java @@ -53,16 +53,6 @@ public class InstancesResult extends APIResult { super(); } - public InstancesResult(String message, Instance[] instances) { - this(Status.SUCCEEDED, message, instances); - } - - public InstancesResult(Status status, String message, - Instance[] instanceExes) { - super(status, message); - this.instances = instanceExes; - } - public InstancesResult(Status status, String message) { super(status, message); } @@ -76,6 +66,24 @@ public class InstancesResult extends APIResult { this.instances = instances; } + @Override + public Object[] getCollection() { + return getInstances(); + } + + @Override + public void setCollection(Object[] items) { + if (items == null) { + setInstances(new Instance[0]); + } else { + Instance[] newInstances = new Instance[items.length]; + for (int index = 0; index < items.length; index++) { + newInstances[index] = (Instance)items[index]; + } + setInstances(newInstances); + } + } + /** * A single instance object inside instance result. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/client/src/main/java/org/apache/falcon/resource/InstancesSummaryResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/resource/InstancesSummaryResult.java b/client/src/main/java/org/apache/falcon/resource/InstancesSummaryResult.java index 0758c8b..a3dcbe4 100644 --- a/client/src/main/java/org/apache/falcon/resource/InstancesSummaryResult.java +++ b/client/src/main/java/org/apache/falcon/resource/InstancesSummaryResult.java @@ -39,16 +39,6 @@ public class InstancesSummaryResult extends APIResult { super(); } - public InstancesSummaryResult(String message, InstanceSummary[] instancesSummary) { - this(Status.SUCCEEDED, message, instancesSummary); - } - - public InstancesSummaryResult(Status status, String message, - InstanceSummary[] instancesSummary) { - super(status, message); - this.instancesSummary = instancesSummary; - } - public InstancesSummaryResult(Status status, String message) { super(status, message); } @@ -61,6 +51,24 @@ public class InstancesSummaryResult extends APIResult { this.instancesSummary = instancesSummary; } + @Override + public Object[] getCollection() { + return getInstancesSummary(); + } + + @Override + public void setCollection(Object[] items) { + if (items == null) { + setInstancesSummary(new InstanceSummary[0]); + } else { + InstanceSummary[] newInstances = new InstanceSummary[items.length]; + for (int index = 0; index < items.length; index++) { + newInstances[index] = (InstanceSummary)items[index]; + } + setInstancesSummary(newInstances); + } + } + /** * A single instance object inside instance result. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java index 7ad0716..dbb0293 100644 --- a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java +++ b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java @@ -30,6 +30,7 @@ import org.apache.falcon.entity.v0.feed.LocationType; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Date; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -346,6 +347,12 @@ public class CatalogStorage implements Storage { } @Override + public List<FeedInstanceStatus> getListing(Feed feed, String cluster, LocationType locationType, + Date start, Date end) throws FalconException { + throw new UnsupportedOperationException("getListing"); + } + + @Override public String toString() { return "CatalogStorage{" + "catalogUrl='" + catalogUrl + '\'' http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java index 6945cea..52b570d 100644 --- a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java @@ -18,6 +18,9 @@ package org.apache.falcon.entity; +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; import 
org.apache.falcon.entity.v0.cluster.Interface; import org.apache.falcon.entity.v0.cluster.Interfacetype; @@ -40,6 +43,10 @@ public final class ClusterHelper { private ClusterHelper() { } + public static Cluster getCluster(String cluster) throws FalconException { + return ConfigurationStore.get().get(EntityType.CLUSTER, cluster); + } + public static Configuration getConfiguration(Cluster cluster) { Configuration conf = new Configuration(); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/common/src/main/java/org/apache/falcon/entity/FeedHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java index 4174135..4532669 100644 --- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java @@ -22,28 +22,44 @@ import org.apache.commons.lang.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.LifeCycle; import org.apache.falcon.Tag; +import org.apache.falcon.entity.common.FeedDataPath; +import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Property; import org.apache.falcon.entity.v0.feed.CatalogTable; import org.apache.falcon.entity.v0.feed.Cluster; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.feed.Location; +import org.apache.falcon.entity.v0.feed.LocationType; import org.apache.falcon.entity.v0.feed.Locations; import org.apache.falcon.expression.ExpressionHelper; +import org.apache.falcon.resource.APIResult; +import org.apache.falcon.resource.FeedInstanceResult; import org.apache.falcon.util.BuildProperties; +import org.apache.hadoop.fs.Path; import java.net.URISyntaxException; +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; 
import java.util.Arrays; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; +import java.util.TimeZone; +import java.util.TreeMap; +import java.util.regex.Matcher; /** * Feed entity helper methods. */ public final class FeedHelper { + private static final String FORMAT = "yyyyMMddHHmm"; + private FeedHelper() {} public static Cluster getCluster(Feed feed, String clusterName) { @@ -227,7 +243,7 @@ public final class FeedHelper { return normalizePartitionExpression(partition, null); } - private static Properties loadClusterProperties(org.apache.falcon.entity.v0.cluster.Cluster cluster) { + public static Properties getClusterProperties(org.apache.falcon.entity.v0.cluster.Cluster cluster) { Properties properties = new Properties(); Map<String, String> clusterVars = new HashMap<String, String>(); clusterVars.put("colo", cluster.getColo()); @@ -244,7 +260,7 @@ public final class FeedHelper { public static String evaluateClusterExp(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity, String exp) throws FalconException { - Properties properties = loadClusterProperties(clusterEntity); + Properties properties = getClusterProperties(clusterEntity); ExpressionHelper expHelp = ExpressionHelper.get(); expHelp.setPropertiesForVariable(properties); return expHelp.evaluateFullExpression(exp, String.class); @@ -290,4 +306,124 @@ public final class FeedHelper { props.put("userWorkflowVersion", version); return props; } + + public static Properties getFeedProperties(Feed feed) { + Properties feedProperties = new Properties(); + if (feed.getProperties() != null) { + for (org.apache.falcon.entity.v0.feed.Property property : feed.getProperties().getProperties()) { + feedProperties.put(property.getName(), property.getValue()); + } + } + return feedProperties; + } + + /** + * Replaces timed variables with corresponding time notations e.g., ${YEAR} with yyyy and so on. 
+ * @param templatePath - template feed path + * @return time notations + */ + public static String getDateFormatInPath(String templatePath) { + String mask = extractDatePartFromPathMask(templatePath, templatePath); + //yyyyMMddHHmm + return mask.replaceAll(FeedDataPath.VARS.YEAR.regex(), "yyyy") + .replaceAll(FeedDataPath.VARS.MONTH.regex(), "MM") + .replaceAll(FeedDataPath.VARS.DAY.regex(), "dd") + .replaceAll(FeedDataPath.VARS.HOUR.regex(), "HH") + .replaceAll(FeedDataPath.VARS.MINUTE.regex(), "mm"); + } + + /** + * Extracts the date part of the path and builds a date format mask. + * @param mask - Path pattern containing ${YEAR}, ${MONTH}... + * @param inPath - Path from which date part need to be extracted + * @return - Parts of inPath with non-date-part stripped out. + * + * Example: extractDatePartFromPathMask("/data/foo/${YEAR}/${MONTH}", "/data/foo/2012/${MONTH}"); + * Returns: 2012${MONTH}. + */ + private static String extractDatePartFromPathMask(String mask, String inPath) { + String[] elements = FeedDataPath.PATTERN.split(mask); + + String out = inPath; + for (String element : elements) { + out = out.replaceFirst(element, ""); + } + return out; + } + + private static Map<FeedDataPath.VARS, String> getDatePartMap(String path, String mask) { + Map<FeedDataPath.VARS, String> map = new TreeMap<FeedDataPath.VARS, String>(); + Matcher matcher = FeedDataPath.DATE_FIELD_PATTERN.matcher(mask); + int start = 0; + while (matcher.find(start)) { + String subMask = mask.substring(matcher.start(), matcher.end()); + String subPath = path.substring(matcher.start(), matcher.end()); + FeedDataPath.VARS var = FeedDataPath.VARS.from(subMask); + if (!map.containsKey(var)) { + map.put(var, subPath); + } + start = matcher.start() + 1; + } + return map; + } + + /** + * Extracts date from the actual data path e.g., /path/2014/05/06 maps to 2014-05-06T00:00Z. 
+ * @param file - actual data path + * @param templatePath - template path from feed definition + * @param dateMask - path mask from getDateFormatInPath() + * @param timeZone + * @return date corresponding to the path + */ + //consider just the first occurrence of the pattern + public static Date getDate(Path file, String templatePath, String dateMask, String timeZone) { + String path = extractDatePartFromPathMask(templatePath, file.toString()); + Map<FeedDataPath.VARS, String> map = getDatePartMap(path, dateMask); + + if (map.isEmpty()) { + return null; + } + + StringBuilder date = new StringBuilder(); + int ordinal = 0; + for (Map.Entry<FeedDataPath.VARS, String> entry : map.entrySet()) { + if (ordinal++ == entry.getKey().ordinal()) { + date.append(entry.getValue()); + } else { + return null; + } + } + + try { + DateFormat dateFormat = new SimpleDateFormat(FORMAT.substring(0, date.length())); + dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone)); + return dateFormat.parse(date.toString()); + } catch (ParseException e) { + return null; + } + } + + public static FeedInstanceResult getFeedInstanceListing(Entity entityObject, + Date start, Date end) throws FalconException { + Set<String> clusters = EntityUtil.getClustersDefinedInColos(entityObject); + FeedInstanceResult result = new FeedInstanceResult(APIResult.Status.SUCCEEDED, "Success"); + for (String cluster : clusters) { + Feed feed = (Feed) entityObject; + Storage storage = createStorage(cluster, feed); + List<FeedInstanceStatus> feedListing = storage.getListing(feed, cluster, LocationType.DATA, start, end); + FeedInstanceResult.Instance[] instances = new FeedInstanceResult.Instance[feedListing.size()]; + int index = 0; + for (FeedInstanceStatus feedStatus : feedListing) { + FeedInstanceResult.Instance instance = new + FeedInstanceResult.Instance(cluster, feedStatus.getInstance(), + feedStatus.getStatus().name()); + instance.creationTime = feedStatus.getCreationTime(); + instance.uri = feedStatus.getUri(); + 
instance.size = feedStatus.getSize(); + instances[index++] = instance; + } + result.setInstances(instances); + } + return result; + } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/common/src/main/java/org/apache/falcon/entity/FeedInstanceStatus.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/FeedInstanceStatus.java b/common/src/main/java/org/apache/falcon/entity/FeedInstanceStatus.java new file mode 100644 index 0000000..ff06554 --- /dev/null +++ b/common/src/main/java/org/apache/falcon/entity/FeedInstanceStatus.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.entity; + +/** + * Feed Instance Status is used to provide feed instance listing and corresponding status. + * + * This is used for exchanging information for getListing api + */ +public class FeedInstanceStatus { + + private String instance; + + private final String uri; + + private long creationTime; + + private long size = -1; + + private AvailabilityStatus status = AvailabilityStatus.MISSING; + + /** + * Availability status of a feed instance. 
+ * + * Missing if the feed partition is entirely missing, + * Available if present and the availability flag is also present + * Availability flag is configured in feed definition, but availability flag is missing in data path + * Empty if the empty + */ + public enum AvailabilityStatus {MISSING, AVAILABLE, PARTIAL, EMPTY} + + public FeedInstanceStatus(String uri) { + this.uri = uri; + } + + public String getInstance() { + return instance; + } + + public void setInstance(String instance) { + this.instance = instance; + } + + public String getUri() { + return uri; + } + + public long getCreationTime() { + return creationTime; + } + + public void setCreationTime(long creationTime) { + this.creationTime = creationTime; + } + + public long getSize() { + return size; + } + + public void setSize(long size) { + this.size = size; + } + + public AvailabilityStatus getStatus() { + return status; + } + + public void setStatus(AvailabilityStatus status) { + this.status = status; + } + + @Override + public String toString() { + return "FeedInstanceStatus{" + + "instance='" + instance + '\'' + + ", uri='" + uri + '\'' + + ", creationTime=" + creationTime + + ", size=" + size + + ", status='" + status + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + FeedInstanceStatus that = (FeedInstanceStatus) o; + + if (creationTime != that.creationTime) { + return false; + } + if (size != that.size) { + return false; + } + if (!instance.equals(that.instance)) { + return false; + } + if (status != that.status) { + return false; + } + if (uri != null ? !uri.equals(that.uri) : that.uri != null) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + int result = instance.hashCode(); + result = 31 * result + (uri != null ? 
uri.hashCode() : 0); + result = 31 * result + (int) (creationTime ^ (creationTime >>> 32)); + result = 31 * result + (int) (size ^ (size >>> 32)); + result = 31 * result + (status != null ? status.hashCode() : 0); + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java index 58506ad..012a6e7 100644 --- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java +++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java @@ -22,13 +22,17 @@ import org.apache.commons.lang.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.entity.common.FeedDataPath; import org.apache.falcon.entity.v0.AccessControlList; +import org.apache.falcon.entity.v0.SchemaHelper; +import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.feed.Location; import org.apache.falcon.entity.v0.feed.LocationType; import org.apache.falcon.entity.v0.feed.Locations; +import org.apache.falcon.expression.ExpressionHelper; import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.security.CurrentUser; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -39,8 +43,12 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Calendar; +import java.util.Date; import java.util.List; +import java.util.Properties; import java.util.Set; +import java.util.TimeZone; import java.util.regex.Matcher; /** @@ -241,6 +249,16 
@@ public class FileSystemStorage implements Storage { return null; } + public static Properties getFeedProperties(Feed feed) { + Properties feedProperties = new Properties(); + if (feed.getProperties() != null) { + for (org.apache.falcon.entity.v0.feed.Property property : feed.getProperties().getProperties()) { + feedProperties.put(property.getName(), property.getValue()); + } + } + return feedProperties; + } + @Override public void validateACL(AccessControlList acl) throws FalconException { try { @@ -272,6 +290,78 @@ public class FileSystemStorage implements Storage { } } + @Override + @SuppressWarnings("MagicConstant") + public List<FeedInstanceStatus> getListing(Feed feed, String clusterName, LocationType locationType, + Date start, Date end) throws FalconException { + + Calendar calendar = Calendar.getInstance(); + List<Location> clusterSpecificLocation = FeedHelper. + getLocations(FeedHelper.getCluster(feed, clusterName), feed); + Location location = getLocation(clusterSpecificLocation, locationType); + try { + FileSystem fileSystem = HadoopClientFactory.get().createProxiedFileSystem(getConf()); + Cluster cluster = ClusterHelper.getCluster(clusterName); + Properties baseProperties = FeedHelper.getClusterProperties(cluster); + baseProperties.putAll(FeedHelper.getFeedProperties(feed)); + List<FeedInstanceStatus> instances = new ArrayList<FeedInstanceStatus>(); + Date feedStart = FeedHelper.getCluster(feed, clusterName).getValidity().getStart(); + TimeZone tz = feed.getTimezone(); + Date alignedStart = EntityUtil.getNextStartTime(feedStart, feed.getFrequency(), tz, start); + + String basePath = location.getPath(); + while (!end.before(alignedStart)) { + Properties allProperties = ExpressionHelper.getTimeVariables(alignedStart, tz); + allProperties.putAll(baseProperties); + String feedInstancePath = ExpressionHelper.substitute(basePath, allProperties); + FileStatus fileStatus = getFileStatus(fileSystem, new Path(feedInstancePath)); + FeedInstanceStatus instance = 
new FeedInstanceStatus(feedInstancePath); + String dateMask = FeedHelper.getDateFormatInPath(basePath); + Date date = FeedHelper.getDate(new Path(feedInstancePath), basePath, dateMask, tz.getID()); + instance.setInstance(SchemaHelper.formatDateUTC(date)); + if (fileStatus != null) { + instance.setCreationTime(fileStatus.getModificationTime()); + ContentSummary contentSummary = fileSystem.getContentSummary(fileStatus.getPath()); + if (contentSummary != null) { + long size = contentSummary.getSpaceConsumed(); + instance.setSize(size); + if (!StringUtils.isEmpty(feed.getAvailabilityFlag())) { + FileStatus doneFile = getFileStatus(fileSystem, + new Path(fileStatus.getPath(), feed.getAvailabilityFlag())); + if (doneFile != null) { + instance.setStatus(FeedInstanceStatus.AvailabilityStatus.AVAILABLE); + } else { + instance.setStatus(FeedInstanceStatus.AvailabilityStatus.PARTIAL); + } + } else { + instance.setStatus(size > 0 ? FeedInstanceStatus.AvailabilityStatus.AVAILABLE + : FeedInstanceStatus.AvailabilityStatus.EMPTY); + } + } + } + instances.add(instance); + calendar.setTime(alignedStart); + calendar.add(feed.getFrequency().getTimeUnit().getCalendarUnit(), + feed.getFrequency().getFrequencyAsInt()); + alignedStart = calendar.getTime(); + } + return instances; + } catch (IOException e) { + LOG.error("Unable to retrieve listing for {}:{}", locationType, getStorageUrl(), e); + throw new FalconException("Unable to retrieve listing for (URI " + getStorageUrl() + ")", e); + } + } + + public FileStatus getFileStatus(FileSystem fileSystem, Path feedInstancePath) { + FileStatus fileStatus = null; + try { + fileStatus = fileSystem.getFileStatus(feedInstancePath); + } catch (IOException ignore) { + //ignore + } + return fileStatus; + } + private Configuration getConf() { Configuration conf = new Configuration(); conf.set(HadoopClientFactory.FS_DEFAULT_NAME_KEY, storageUrl); 
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/common/src/main/java/org/apache/falcon/entity/Storage.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/Storage.java b/common/src/main/java/org/apache/falcon/entity/Storage.java index f88e139..af3040b 100644 --- a/common/src/main/java/org/apache/falcon/entity/Storage.java +++ b/common/src/main/java/org/apache/falcon/entity/Storage.java @@ -20,8 +20,12 @@ package org.apache.falcon.entity; import org.apache.falcon.FalconException; import org.apache.falcon.entity.v0.AccessControlList; +import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.feed.LocationType; +import java.util.Date; +import java.util.List; + /** * A class to encapsulate the storage for a given feed which can either be * expressed as a path on the file system or a table in a catalog. @@ -81,4 +85,10 @@ public interface Storage { * @throws FalconException if the permissions are not valid. */ void validateACL(AccessControlList acl) throws FalconException; + + /** + * Get Feed Listing for a feed between a date range. 
+ */ + List<FeedInstanceStatus> getListing(Feed feed, String cluster, LocationType locationType, + Date start, Date end) throws FalconException; } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java b/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java index 79d6e2d..e04f046 100644 --- a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java +++ b/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java @@ -20,6 +20,7 @@ package org.apache.falcon.expression; import org.apache.commons.el.ExpressionEvaluatorImpl; import org.apache.falcon.FalconException; +import org.apache.falcon.entity.common.FeedDataPath; import javax.servlet.jsp.el.ELException; import javax.servlet.jsp.el.ExpressionEvaluator; @@ -97,6 +98,18 @@ public final class ExpressionHelper implements FunctionMapper, VariableResolver referenceDate.set(date); } + public static Properties getTimeVariables(Date date, TimeZone tz) { + Properties vars = new Properties(); + Calendar cal = Calendar.getInstance(tz); + cal.setTime(date); + vars.put(FeedDataPath.VARS.YEAR.name(), String.format("%04d", cal.get(Calendar.YEAR))); + vars.put(FeedDataPath.VARS.MONTH.name(), String.format("%02d", (cal.get(Calendar.MONTH) + 1))); + vars.put(FeedDataPath.VARS.DAY.name(), String.format("%02d", cal.get(Calendar.DAY_OF_MONTH))); + vars.put(FeedDataPath.VARS.HOUR.name(), String.format("%02d", cal.get(Calendar.HOUR_OF_DAY))); + vars.put(FeedDataPath.VARS.MINUTE.name(), String.format("%02d", cal.get(Calendar.MINUTE))); + return vars; + } + private static int getDayOffset(String weekDayName) { int day; Calendar nominalTime = Calendar.getInstance(); 
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java index b97564d..1667161 100644 --- a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java +++ b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java @@ -20,11 +20,22 @@ package org.apache.falcon.entity; import org.apache.falcon.FalconException; import org.apache.falcon.cluster.util.EmbeddedCluster; +import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.AccessControlList; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.Frequency; +import org.apache.falcon.entity.v0.SchemaHelper; +import org.apache.falcon.entity.v0.feed.Cluster; +import org.apache.falcon.entity.v0.feed.Clusters; +import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.feed.Location; import org.apache.falcon.entity.v0.feed.LocationType; +import org.apache.falcon.entity.v0.feed.Locations; +import org.apache.falcon.entity.v0.feed.Validity; +import org.apache.falcon.expression.ExpressionHelper; import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.security.CurrentUser; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.testng.Assert; @@ -32,8 +43,15 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.io.IOException; +import java.io.OutputStream; import java.util.ArrayList; +import java.util.Calendar; +import java.util.Date; import java.util.List; +import java.util.Properties; +import java.util.Random; +import 
java.util.TimeZone; /** * Test class for File System Storage. @@ -389,4 +407,128 @@ public class FileSystemStorageTest { return permission; } } + + @DataProvider(name = "testListingDataProvider") + private Object[][] createTestListingData() { + final long millis = 24L * 3600 * 1000; + final long now = System.currentTimeMillis(); + TimeZone utc = TimeZone.getTimeZone("UTC"); + return new Object[][] { + {null, Frequency.fromString("hours(2)"), utc, new Date(now - 60 * millis), new Date(now - 56 * millis)}, + {null, Frequency.fromString("days(1)"), utc, new Date(now - 20 * millis), new Date(now + 6 * millis)}, + {null, Frequency.fromString("months(1)"), utc, new Date(now - 85 * millis), new Date(now - 10 * millis)}, + }; + } + + @Test (dataProvider = "testListingDataProvider") + public void testListing(String availabilityFlag, Frequency frequency, TimeZone timeZone, + Date start, Date end) throws Exception { + EmbeddedCluster cluster = EmbeddedCluster.newCluster("TestFeedListing", false); + FileSystem fs = cluster.getFileSystem(); + ConfigurationStore.get().publish(EntityType.CLUSTER, cluster.getCluster()); + try { + Feed feed = getFeed(availabilityFlag, frequency, timeZone); + List<FeedInstanceStatus> expected = prepareData(fs, feed, start, end); + FileSystemStorage fileSystemStorage = new FileSystemStorage(cluster.getFileSystem(). + getUri().toString(), feed.getLocations()); + List<FeedInstanceStatus> actual = fileSystemStorage. 
+ getListing(feed, "TestFeedListing", LocationType.DATA, start, end); + Assert.assertEquals(actual, expected, "Feed instance Listings doesn't match"); + } finally { + ConfigurationStore.get().remove(EntityType.CLUSTER, cluster.getCluster().getName()); + } + } + + @SuppressWarnings("MagicConstant") + private List<FeedInstanceStatus> prepareData(FileSystem fs, Feed feed, + Date start, Date end) throws Exception { + fs.delete(new Path("/TestFeedListing"), true); + Random random = new Random(); + List<FeedInstanceStatus> instances = new ArrayList<FeedInstanceStatus>(); + String basePath = feed.getLocations().getLocations().get(0).getPath(); + Frequency frequency = feed.getFrequency(); + TimeZone tz = feed.getTimezone(); + Date dataStart = EntityUtil.getNextStartTime(feed.getClusters().getClusters().get(0).getValidity().getStart(), + feed.getFrequency(), tz, new Date(start.getTime())); + Date dataEnd = new Date(end.getTime()); + while (dataStart.before(dataEnd)) { + Properties properties = ExpressionHelper.getTimeVariables(dataStart, tz); + String path = ExpressionHelper.substitute(basePath, properties); + FeedInstanceStatus instance = new FeedInstanceStatus(path); + instance.setStatus(FeedInstanceStatus.AvailabilityStatus.MISSING); + instance.setSize(-1); + instance.setCreationTime(0); + String dateMask = FeedHelper.getDateFormatInPath(basePath); + Date date = FeedHelper.getDate(new Path(path), basePath, dateMask, tz.getID()); + instance.setInstance(SchemaHelper.formatDateUTC(date)); + Calendar cal = Calendar.getInstance(); + cal.setTime(dataStart); + cal.add(frequency.getTimeUnit().getCalendarUnit(), frequency.getFrequencyAsInt()); + dataStart.setTime(cal.getTimeInMillis()); + if (random.nextBoolean()) { + OutputStream out = fs.create(new Path(path, "file")); + out.write("Hello World\n".getBytes()); + out.close(); + instance.setSize(12); + if (feed.getAvailabilityFlag() == null + || (feed.getAvailabilityFlag() != null && random.nextBoolean())) { + //If availability is 
not present or if ok to create availability file, mark as available + instance.setStatus(FeedInstanceStatus.AvailabilityStatus.AVAILABLE); + if (feed.getAvailabilityFlag() != null) { + fs.create(new Path(path, feed.getAvailabilityFlag())).close(); + } + } else if (feed.getAvailabilityFlag() != null) { + //If availability is present or not ok to create availability file, mark as partial + fs.mkdirs(new Path(path)); + instance.setStatus(FeedInstanceStatus.AvailabilityStatus.PARTIAL); + } + } else { + if (feed.getAvailabilityFlag() == null && random.nextBoolean()) { + //If availability is not present or ok to create dir, mark as empty + fs.mkdirs(new Path(path)); + instance.setStatus(FeedInstanceStatus.AvailabilityStatus.EMPTY); + instance.setSize(0); + } else if (feed.getAvailabilityFlag() != null && random.nextBoolean()) { + //If availability is present and ok to create dir, mark as partial + fs.mkdirs(new Path(path)); + instance.setStatus(FeedInstanceStatus.AvailabilityStatus.PARTIAL); + } else if (feed.getAvailabilityFlag() != null) { + //If availability is present and ok to create empty instance + fs.create(new Path(path, feed.getAvailabilityFlag())).close(); + instance.setStatus(FeedInstanceStatus.AvailabilityStatus.EMPTY); + instance.setSize(0); + } + } + try { + FileStatus fileStatus = fs.getFileStatus(new Path(path)); + instance.setCreationTime(fileStatus.getModificationTime()); + } catch (IOException e) { + //ignore + } + instances.add(instance); + } + return instances; + } + + private Feed getFeed(String availabilityFlag, Frequency frequency, TimeZone timeZone) { + Feed feed = new Feed(); + feed.setAvailabilityFlag(availabilityFlag); + feed.setFrequency(frequency); + feed.setTimezone(timeZone); + feed.setLocations(new Locations()); + Location dataLocation = new Location(); + feed.getLocations().getLocations().add(dataLocation); + dataLocation.setPath("/TestFeedListing/data/${YEAR}/${MONTH}/${DAY}" + + (frequency.getTimeUnit() == Frequency.TimeUnit.hours ? 
"/${HOUR}" : "") + "/MORE"); + dataLocation.setType(LocationType.DATA); + feed.setClusters(new Clusters()); + Cluster cluster = new Cluster(); + cluster.setName("TestFeedListing"); + feed.getClusters().getClusters().add(cluster); + Validity validity = new Validity(); + cluster.setValidity(validity); + validity.setStart(new Date(System.currentTimeMillis() - (1000L * 24 * 3600000))); + validity.setEnd(new Date(System.currentTimeMillis() + (1000L * 24 * 3600000))); + return feed; + } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/docs/src/site/twiki/FalconCLI.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki index 1d536a1..b8d36bb 100644 --- a/docs/src/site/twiki/FalconCLI.twiki +++ b/docs/src/site/twiki/FalconCLI.twiki @@ -214,6 +214,18 @@ Optional Args : -colo <<colo>> -lifecycle <<lifecycles>> <a href="./Restapi/InstanceRunning.html">Optional params described here.</a> +---+++FeedInstanceListing + +Get falcon feed instance availability. 
+ +Usage: +$FALCON_HOME/bin/falcon instance -type feed -name <<name>> -listing + +Optional Args : -start "yyyy-MM-dd'T'HH:mm'Z'" -end "yyyy-MM-dd'T'HH:mm'Z'" +-colo <<colo>> + +<a href="./Restapi/FeedInstanceListing.html">Optional params described here.</a> + ---+++Logs Get logs for instance actions http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/docs/src/site/twiki/restapi/FeedInstanceListing.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/restapi/FeedInstanceListing.twiki b/docs/src/site/twiki/restapi/FeedInstanceListing.twiki new file mode 100644 index 0000000..a3e306d --- /dev/null +++ b/docs/src/site/twiki/restapi/FeedInstanceListing.twiki @@ -0,0 +1,45 @@ +---++ GET /api/instance/listing/feed/:entity-name + * <a href="#Description">Description</a> + * <a href="#Parameters">Parameters</a> + * <a href="#Results">Results</a> + * <a href="#Examples">Examples</a> + +---++ Description +Get falcon feed instance availability. + +---++ Parameters + * :entity-name Name of the entity. + * start <optional param> Show instances from this date. Date format is yyyy-MM-dd'T'HH:mm'Z'. + * By default, it is set to (end - (10 * entityFrequency)). + * end <optional param> Show instances up to this date. Date format is yyyy-MM-dd'T'HH:mm'Z'. + * Default is set to now. + * colo <optional param> Colo on which the query should be run. 
+ +---++ Results +Feed instance availability status + +---++ Examples +---+++ Rest Call +<verbatim> +GET http://localhost:15000/api/instance/listing/feed/SampleFeed?colo=*&start=2012-04-03T07:00Z +</verbatim> +---+++ Result +<verbatim> +{ + "instances": [ + { + "size": "450231212222", + "creationTime": "1236679827365", + "cluster": "primary-cluster", + "uri": "/data/SampleFeed/2012-04-03", + "status": "AVAILABLE", + "instance": "2012-04-03T07:00Z" + } + ], + "requestId": "default\/3527038e-8334-4e50-8173-76c4fa430d0b\n", + "message": "default\/STATUS\n", + "status": "SUCCEEDED" +} +</verbatim> + + http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java index fd8c63f..d5432eb 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java @@ -454,7 +454,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } } } - return new InstancesResult("Running Instances", runInstances.toArray(new Instance[runInstances.size()])); + InstancesResult result = new InstancesResult(APIResult.Status.SUCCEEDED, "Running Instances"); + result.setInstances(runInstances.toArray(new Instance[runInstances.size()])); + return result; } catch (OozieClientException e) { throw new FalconException(e); @@ -1440,7 +1442,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } instance.cluster = cluster; instances[0] = instance; - return new InstancesResult("Instance for workflow id:" + jobId, instances); + InstancesResult result = new InstancesResult(APIResult.Status.SUCCEEDED, + "Instance for workflow id:" + jobId); + 
result.setInstances(instances); + return result; } catch (Exception e) { throw new FalconException(e); } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java index 41cd601..e50da2d 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java @@ -50,6 +50,7 @@ import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.Response; import java.io.IOException; import java.io.InputStream; +import java.lang.reflect.Constructor; import java.util.*; /** @@ -807,4 +808,39 @@ public abstract class AbstractEntityManager { protected AbstractWorkflowEngine getWorkflowEngine() { return this.workflowEngine; } + + protected <T extends APIResult> T consolidateResult(Map<String, T> results, Class<T> clazz) { + if (results == null || results.isEmpty()) { + return null; + } + + StringBuilder message = new StringBuilder(); + StringBuilder requestIds = new StringBuilder(); + List instances = new ArrayList(); + int statusCount = 0; + for (Map.Entry<String, T> entry : results.entrySet()) { + String colo = entry.getKey(); + T result = results.get(colo); + message.append(colo).append('/').append(result.getMessage()).append('\n'); + requestIds.append(colo).append('/').append(result.getRequestId()).append('\n'); + statusCount += result.getStatus().ordinal(); + + if (result.getCollection() == null) { + continue; + } + Collections.addAll(instances, result.getCollection()); + } + Object[] arrInstances = instances.toArray(); + APIResult.Status status = (statusCount == 0) ? APIResult.Status.SUCCEEDED + : ((statusCount == results.size() * 2) ? 
APIResult.Status.FAILED : APIResult.Status.PARTIAL); + try { + Constructor<T> constructor = clazz.getConstructor(Status.class, String.class); + T result = constructor.newInstance(status, message.toString()); + result.setCollection(arrInstances); + result.setRequestId(requestIds.toString()); + return result; + } catch (Exception e) { + throw new FalconRuntimException("Unable to consolidate result.", e); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java index 2070713..9df6a2b 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java @@ -21,6 +21,7 @@ package org.apache.falcon.resource; import org.apache.commons.lang.StringUtils; import org.apache.falcon.*; import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.FeedHelper; import org.apache.falcon.entity.parser.ValidationException; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; @@ -49,7 +50,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { protected static final long DAY_IN_MILLIS = 86400000L; private static final long MONTH_IN_MILLIS = 2592000000L; - protected void checkType(String type) { + protected EntityType checkType(String type) { if (StringUtils.isEmpty(type)) { throw FalconWebException.newInstanceException("entity type is empty", Response.Status.BAD_REQUEST); @@ -60,9 +61,11 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { "Instance management functions don't apply to Cluster entities", Response.Status.BAD_REQUEST); } + return 
entityType; } } + protected List<LifeCycle> checkAndUpdateLifeCycle(List<LifeCycle> lifeCycleValues, String type) throws FalconException { EntityType entityType = EntityType.valueOf(type.toUpperCase().trim()); @@ -190,14 +193,16 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { instanceSet = filteredInstanceSet(resultSet, instanceSet, getFilterByFieldsValues(filterBy)); int pageCount = super.getRequiredNumberOfResults(instanceSet.size(), offset, numResults); + InstancesResult result = new InstancesResult(resultSet.getStatus(), resultSet.getMessage()); if (pageCount == 0) { // return empty result set - return new InstancesResult(resultSet.getMessage(), new Instance[0]); + result.setInstances(new Instance[0]); + return result; } // Sort the ArrayList using orderBy instanceSet = sortInstances(instanceSet, orderBy, sortOrder); - return new InstancesResult(resultSet.getMessage(), - instanceSet.subList(offset, (offset+pageCount)).toArray(new Instance[pageCount])); + result.setCollection(instanceSet.subList(offset, (offset+pageCount)).toArray(new Instance[pageCount])); + return result; } private ArrayList<Instance> filteredInstanceSet(InstancesResult resultSet, ArrayList<Instance> instanceSet, @@ -310,6 +315,25 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { //RESUME CHECKSTYLE CHECK ParameterNumberCheck + public FeedInstanceResult getListing(String type, String entity, String startStr, + String endStr, String colo) { + checkColo(colo); + EntityType entityType = checkType(type); + try { + if (entityType != EntityType.FEED) { + throw new IllegalArgumentException("getLocation is not applicable for " + type); + } + validateParams(type, entity); + Entity entityObject = EntityUtil.getEntity(type, entity); + Pair<Date, Date> startAndEndDate = getStartAndEndDate(entityObject, startStr, endStr); + + return FeedHelper.getFeedInstanceListing(entityObject, startAndEndDate.first, startAndEndDate.second); + } catch 
(Throwable e) { + LOG.error("Failed to get instances listing", e); + throw FalconWebException.newInstanceException(e, Response.Status.BAD_REQUEST); + } + } + public InstancesResult getInstanceParams(String type, String entity, String startTime, String colo, List<LifeCycle> lifeCycles) { http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java index d172c3e..e6cf904 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java @@ -26,10 +26,9 @@ import org.apache.falcon.monitors.Dimension; import org.apache.falcon.monitors.Monitored; import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.AbstractInstanceManager; +import org.apache.falcon.resource.FeedInstanceResult; import org.apache.falcon.resource.InstancesResult; -import org.apache.falcon.resource.InstancesResult.Instance; import org.apache.falcon.resource.InstancesSummaryResult; -import org.apache.falcon.resource.InstancesSummaryResult.InstanceSummary; import org.apache.falcon.resource.channel.Channel; import org.apache.falcon.resource.channel.ChannelFactory; @@ -38,6 +37,7 @@ import javax.ws.rs.*; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import java.lang.reflect.Constructor; import java.util.*; /** @@ -87,7 +87,7 @@ public class InstanceManagerProxy extends AbstractInstanceManager { @DefaultValue("") @QueryParam("sortOrder") final String sortOrder, @DefaultValue("0") @QueryParam("offset") final Integer offset, @DefaultValue(DEFAULT_NUM_RESULTS) @QueryParam("numResults") final Integer 
resultsPerPage) { - return new InstanceProxy() { + return new InstanceProxy<InstancesResult>(InstancesResult.class) { @Override protected InstancesResult doExecute(String colo) throws FalconException { return getInstanceManager(colo). @@ -118,7 +118,7 @@ public class InstanceManagerProxy extends AbstractInstanceManager { @DefaultValue("") @QueryParam("sortOrder") final String sortOrder, @DefaultValue("0") @QueryParam("offset") final Integer offset, @DefaultValue(DEFAULT_NUM_RESULTS) @QueryParam("numResults") final Integer resultsPerPage) { - return new InstanceProxy() { + return new InstanceProxy<InstancesResult>(InstancesResult.class) { @Override protected InstancesResult doExecute(String colo) throws FalconException { return getInstanceManager(colo).invoke("getInstances", @@ -145,7 +145,7 @@ public class InstanceManagerProxy extends AbstractInstanceManager { @DefaultValue("") @QueryParam("sortOrder") final String sortOrder, @DefaultValue("0") @QueryParam("offset") final Integer offset, @DefaultValue(DEFAULT_NUM_RESULTS) @QueryParam("numResults") final Integer resultsPerPage) { - return new InstanceProxy() { + return new InstanceProxy<InstancesResult>(InstancesResult.class) { @Override protected InstancesResult doExecute(String colo) throws FalconException { return getInstanceManager(colo).invoke("getStatus", @@ -167,7 +167,7 @@ public class InstanceManagerProxy extends AbstractInstanceManager { @Dimension("end-time") @QueryParam("end") final String endStr, @Dimension("colo") @QueryParam("colo") final String colo, @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles) { - return new InstanceSummaryProxy() { + return new InstanceProxy<InstancesSummaryResult>(InstancesSummaryResult.class) { @Override protected InstancesSummaryResult doExecute(String colo) throws FalconException { return getInstanceManager(colo).invoke("getSummary", @@ -177,6 +177,26 @@ public class InstanceManagerProxy extends AbstractInstanceManager { } @GET + 
@Path("listing/{type}/{entity}") + @Produces(MediaType.APPLICATION_JSON) + @Monitored(event = "instance-listing") + @Override + public FeedInstanceResult getListing( + @Dimension("type") @PathParam("type") final String type, + @Dimension("entity") @PathParam("entity") final String entity, + @Dimension("start-time") @QueryParam("start") final String start, + @Dimension("end-time") @QueryParam("end") final String end, + @Dimension("colo") @QueryParam("colo") String colo) { + return new InstanceProxy<FeedInstanceResult>(FeedInstanceResult.class) { + @Override + protected FeedInstanceResult doExecute(String colo) throws FalconException { + return getInstanceManager(colo).invoke("getListing", + type, entity, start, end, colo); + } + }.execute(colo, type, entity); + } + + @GET @Path("params/{type}/{entity}") @Produces(MediaType.APPLICATION_JSON) @Monitored(event = "instance-params") @@ -187,7 +207,7 @@ public class InstanceManagerProxy extends AbstractInstanceManager { @Dimension("start-time") @QueryParam("start") final String start, @Dimension("colo") @QueryParam("colo") String colo, @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles) { - return new InstanceProxy() { + return new InstanceProxy<InstancesResult>(InstancesResult.class) { @Override protected InstancesResult doExecute(String colo) throws FalconException { return getInstanceManager(colo).invoke("getInstanceParams", @@ -196,7 +216,6 @@ public class InstanceManagerProxy extends AbstractInstanceManager { }.execute(colo, type, entity); } - @GET @Path("logs/{type}/{entity}") @Produces(MediaType.APPLICATION_JSON) @@ -215,7 +234,7 @@ public class InstanceManagerProxy extends AbstractInstanceManager { @DefaultValue("") @QueryParam("sortOrder") final String sortOrder, @DefaultValue("0") @QueryParam("offset") final Integer offset, @DefaultValue(DEFAULT_NUM_RESULTS) @QueryParam("numResults") final Integer resultsPerPage) { - return new InstanceProxy() { + return new 
InstanceProxy<InstancesResult>(InstancesResult.class) { @Override protected InstancesResult doExecute(String colo) throws FalconException { return getInstanceManager(colo).invoke("getLogs", @@ -240,7 +259,7 @@ public class InstanceManagerProxy extends AbstractInstanceManager { @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles) { final HttpServletRequest bufferedRequest = new BufferedRequest(request); - return new InstanceProxy() { + return new InstanceProxy<InstancesResult>(InstancesResult.class) { @Override protected InstancesResult doExecute(String colo) throws FalconException { return getInstanceManager(colo).invoke("killInstance", @@ -263,7 +282,7 @@ public class InstanceManagerProxy extends AbstractInstanceManager { @Dimension("colo") @QueryParam("colo") String colo, @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles) { final HttpServletRequest bufferedRequest = new BufferedRequest(request); - return new InstanceProxy() { + return new InstanceProxy<InstancesResult>(InstancesResult.class) { @Override protected InstancesResult doExecute(String colo) throws FalconException { return getInstanceManager(colo).invoke("suspendInstance", @@ -287,7 +306,7 @@ public class InstanceManagerProxy extends AbstractInstanceManager { @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles) { final HttpServletRequest bufferedRequest = new BufferedRequest(request); - return new InstanceProxy() { + return new InstanceProxy<InstancesResult>(InstancesResult.class) { @Override protected InstancesResult doExecute(String colo) throws FalconException { return getInstanceManager(colo).invoke("resumeInstance", @@ -311,7 +330,7 @@ public class InstanceManagerProxy extends AbstractInstanceManager { @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles) { final HttpServletRequest bufferedRequest = new BufferedRequest(request); - return new InstanceProxy() { + return new 
InstanceProxy<InstancesResult>(InstancesResult.class) { @Override protected InstancesResult doExecute(String colo) throws FalconException { return getInstanceManager(colo).invoke("reRunInstance", @@ -321,50 +340,28 @@ public class InstanceManagerProxy extends AbstractInstanceManager { } //RESUME CHECKSTYLE CHECK ParameterNumberCheck - private abstract class InstanceProxy { + private abstract class InstanceProxy<T extends APIResult> { - public InstancesResult execute(String coloExpr, String type, String name) { - Set<String> colos = getColosFromExpression(coloExpr, type, name); + private final Class<T> clazz; - Map<String, InstancesResult> results = new HashMap<String, InstancesResult>(); - for (String colo : colos) { - try { - InstancesResult resultHolder = doExecute(colo); - results.put(colo, resultHolder); - } catch (FalconException e) { - results.put(colo, new InstancesResult(APIResult.Status.FAILED, - e.getClass().getName() + "::" + e.getMessage(), - new InstancesResult.Instance[0])); - } - } - InstancesResult finalResult = consolidateInstanceResult(results); - if (finalResult.getStatus() != APIResult.Status.SUCCEEDED) { - throw FalconWebException.newException(finalResult, Response.Status.BAD_REQUEST); - } else { - return finalResult; - } + public InstanceProxy(Class<T> resultClazz) { + this.clazz = resultClazz; } - protected abstract InstancesResult doExecute(String colo) throws FalconException; - } - - private abstract class InstanceSummaryProxy { - - public InstancesSummaryResult execute(String coloExpr, String type, String name) { + public T execute(String coloExpr, String type, String name) { Set<String> colos = getColosFromExpression(coloExpr, type, name); - Map<String, InstancesSummaryResult> results = new HashMap<String, InstancesSummaryResult>(); + Map<String, T> results = new HashMap<String, T>(); for (String colo : colos) { try { - InstancesSummaryResult resultHolder = doExecute(colo); + T resultHolder = doExecute(colo); results.put(colo, 
resultHolder); } catch (FalconException e) { - results.put(colo, new InstancesSummaryResult(APIResult.Status.FAILED, - e.getClass().getName() + "::" + e.getMessage(), - new InstancesSummaryResult.InstanceSummary[0])); + results.put(colo, getResultInstance(APIResult.Status.FAILED, + e.getClass().getName() + "::" + e.getMessage())); } } - InstancesSummaryResult finalResult = consolidateInstanceSummaryResult(results); + T finalResult = consolidateResult(results, clazz); if (finalResult.getStatus() != APIResult.Status.SUCCEEDED) { throw FalconWebException.newException(finalResult, Response.Status.BAD_REQUEST); } else { @@ -372,73 +369,15 @@ public class InstanceManagerProxy extends AbstractInstanceManager { } } - protected abstract InstancesSummaryResult doExecute(String colo) throws FalconException; - } - - private InstancesResult consolidateInstanceResult(Map<String, InstancesResult> results) { - if (results == null || results.isEmpty()) { - return null; - } - - StringBuilder message = new StringBuilder(); - StringBuilder requestIds = new StringBuilder(); - List<Instance> instances = new ArrayList<Instance>(); - int statusCount = 0; - for (Map.Entry<String, InstancesResult> entry : results.entrySet()) { - String colo = entry.getKey(); - InstancesResult result = results.get(colo); - message.append(colo).append('/').append(result.getMessage()).append('\n'); - requestIds.append(colo).append('/').append(result.getRequestId()).append('\n'); - statusCount += result.getStatus().ordinal(); - - if (result.getInstances() == null) { - continue; - } - - for (Instance instance : result.getInstances()) { - instance.instance = instance.getInstance(); - instances.add(instance); - } - } - Instance[] arrInstances = new Instance[instances.size()]; - APIResult.Status status = (statusCount == 0) ? APIResult.Status.SUCCEEDED - : ((statusCount == results.size() * 2) ? 
APIResult.Status.FAILED : APIResult.Status.PARTIAL); - InstancesResult result = new InstancesResult(status, message.toString(), instances.toArray(arrInstances)); - result.setRequestId(requestIds.toString()); - return result; - } - - private InstancesSummaryResult consolidateInstanceSummaryResult(Map<String, InstancesSummaryResult> results) { - if (results == null || results.isEmpty()) { - return null; - } - - StringBuilder message = new StringBuilder(); - StringBuilder requestIds = new StringBuilder(); - List<InstanceSummary> instances = new ArrayList<InstanceSummary>(); - int statusCount = 0; - for (Map.Entry<String, InstancesSummaryResult> entry : results.entrySet()) { - String colo = entry.getKey(); - InstancesSummaryResult result = results.get(colo); - message.append(colo).append('/').append(result.getMessage()).append('\n'); - requestIds.append(colo).append('/').append(result.getRequestId()).append('\n'); - statusCount += result.getStatus().ordinal(); - - if (result.getInstancesSummary() == null) { - continue; - } + protected abstract T doExecute(String colo) throws FalconException; - for (InstanceSummary instance : result.getInstancesSummary()) { - instance.summaryMap = instance.getSummaryMap(); - instances.add(instance); + private T getResultInstance(APIResult.Status status, String message) { + try { + Constructor<T> constructor = clazz.getConstructor(APIResult.Status.class, String.class); + return constructor.newInstance(status, message); + } catch (Exception e) { + throw new FalconRuntimException("Unable to consolidate result.", e); } } - InstanceSummary[] arrInstances = new InstanceSummary[instances.size()]; - APIResult.Status status = (statusCount == 0) ? APIResult.Status.SUCCEEDED - : ((statusCount == results.size() * 2) ? 
APIResult.Status.FAILED : APIResult.Status.PARTIAL); - InstancesSummaryResult result = new InstancesSummaryResult(status, message.toString(), - instances.toArray(arrInstances)); - result.setRequestId(requestIds.toString()); - return result; } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java index fbccd6b..a4122a5 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java @@ -43,7 +43,6 @@ import javax.ws.rs.core.Response; import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; /** @@ -129,7 +128,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana if (!embeddedMode) { results.put(PRISM_TAG, super.submit(bufferedRequest, type, currentColo)); } - return consolidateResult(results); + return consolidateResult(results, APIResult.class); } private Entity getEntity(HttpServletRequest request, String type) { @@ -187,7 +186,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana if (!embeddedMode) { results.put(PRISM_TAG, super.delete(bufferedRequest, type, entity, currentColo)); } - return consolidateResult(results); + return consolidateResult(results, APIResult.class); } @POST @@ -258,7 +257,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana results.put(PRISM_TAG, super.update(bufferedRequest, type, entityName, currentColo, effectiveTime)); } - return consolidateResult(results); + return 
consolidateResult(results, APIResult.class); } @GET @@ -338,7 +337,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana Map<String, APIResult> results = new HashMap<String, APIResult>(); results.put("submit", submit(bufferedRequest, type, coloExpr)); results.put("schedule", schedule(bufferedRequest, type, entity, coloExpr)); - return consolidateResult(results); + return consolidateResult(results, APIResult.class); } @POST @@ -451,7 +450,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana new APIResult(APIResult.Status.FAILED, e.getClass().getName() + "::" + e.getMessage())); } } - APIResult finalResult = consolidateResult(results); + APIResult finalResult = consolidateResult(results, APIResult.class); if (finalResult.getStatus() != APIResult.Status.SUCCEEDED) { throw FalconWebException.newException(finalResult, Response.Status.BAD_REQUEST); } else { @@ -465,27 +464,4 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana protected abstract APIResult doExecute(String colo) throws FalconException; } - - private APIResult consolidateResult(Map<String, APIResult> results) { - if (results == null || results.size() == 0) { - return null; - } - - StringBuilder buffer = new StringBuilder(); - StringBuilder requestIds = new StringBuilder(); - int statusCount = 0; - for (Entry<String, APIResult> entry : results.entrySet()) { - String colo = entry.getKey(); - APIResult result = entry.getValue(); - buffer.append(colo).append('/').append(result.getMessage()).append('\n'); - requestIds.append(colo).append('/').append(result.getRequestId()).append('\n'); - statusCount += result.getStatus().ordinal(); - } - - APIResult.Status status = (statusCount == 0) ? APIResult.Status.SUCCEEDED - : ((statusCount == results.size() * 2) ? 
APIResult.Status.FAILED : APIResult.Status.PARTIAL); - APIResult result = new APIResult(status, buffer.toString()); - result.setRequestId(requestIds.toString()); - return result; - } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java index 7a7d993..d4e0ae0 100644 --- a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java +++ b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java @@ -115,6 +115,20 @@ public class InstanceManager extends AbstractInstanceManager { } @GET + @Path("listing/{type}/{entity}") + @Produces(MediaType.APPLICATION_JSON) + @Monitored(event = "instance-listing") + @Override + public FeedInstanceResult getListing( + @Dimension("type") @PathParam("type") String type, + @Dimension("entity") @PathParam("entity") String entity, + @Dimension("start-time") @QueryParam("start") String start, + @Dimension("end-time") @QueryParam("end") String end, + @Dimension("colo") @QueryParam("colo") String colo) { + return super.getListing(type, entity, start, end, colo); + } + + @GET @Path("logs/{type}/{entity}") @Produces(MediaType.APPLICATION_JSON) @Monitored(event = "instance-logs") @@ -150,7 +164,6 @@ public class InstanceManager extends AbstractInstanceManager { return super.getInstanceParams(type, entity, start, colo, lifeCycles); } - @POST @Path("kill/{type}/{entity}") @Produces(MediaType.APPLICATION_JSON) http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java 
b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java index 0943103..6694af1 100644 --- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java +++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java @@ -404,6 +404,11 @@ public class FalconCLIIT { + " -start " + SchemaHelper.getDateFormat().format(new Date()))); Assert.assertEquals(0, + executeWithURL("instance -listing -type feed -name " + + overlay.get("outputFeedName") + + " -start " + SchemaHelper.getDateFormat().format(new Date()))); + + Assert.assertEquals(0, executeWithURL("instance -status -type process -name " + overlay.get("processName") + " -start " + START_INSTANCE));