Repository: falcon Updated Branches: refs/heads/master c3f681711 -> f37729df0
FALCON-822 Add reverse look up API. Contributed by Ajay Yadava Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/f37729df Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/f37729df Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/f37729df Branch: refs/heads/master Commit: f37729df0ce2e8df50057b4c004d2e967db71560 Parents: c3f6817 Author: Suhas Vasu <[email protected]> Authored: Tue Mar 10 16:08:33 2015 +0530 Committer: Suhas Vasu <[email protected]> Committed: Tue Mar 10 16:08:33 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../java/org/apache/falcon/ResponseHelper.java | 10 ++ .../java/org/apache/falcon/cli/FalconCLI.java | 27 +++- .../org/apache/falcon/client/FalconClient.java | 13 ++ .../falcon/resource/FeedLookupResult.java | 137 +++++++++++++++++++ .../falcon/entity/store/FeedLocationStore.java | 96 ++++--------- .../apache/falcon/util/FalconRadixUtils.java | 7 +- .../java/org/apache/falcon/util/RadixTree.java | 3 + common/src/main/resources/startup.properties | 1 + .../entity/store/FeedLocationStoreTest.java | 28 ++++ .../org/apache/falcon/util/RadixTreeTest.java | 20 ++- docs/src/site/twiki/FalconCLI.twiki | 13 ++ docs/src/site/twiki/restapi/FeedLookup.twiki | 36 +++++ docs/src/site/twiki/restapi/ResourceList.twiki | 1 + .../falcon/resource/AbstractEntityManager.java | 46 +++++++ .../proxy/SchedulableEntityManagerProxy.java | 11 ++ .../falcon/resource/EntityManagerTest.java | 61 ++++++++- src/conf/startup.properties | 1 + .../resource/SchedulableEntityManager.java | 11 ++ .../java/org/apache/falcon/cli/FalconCLIIT.java | 4 + 20 files changed, 445 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/f37729df/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt 
b/CHANGES.txt index 69528e3..1debcba 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,8 @@ Trunk (Unreleased) NEW FEATURES FALCON-949 Force update feature (pavan kumar kolamuri via Suhas Vasu) + FALCON-822 Add reverse look up API (Ajay Yadava via Suhas Vasu) + IMPROVEMENTS FALCON-1024 Updating tags and pipeline elements of the feed/process definition resubmits a new Oozie bundle (Pallavi Rao via Suhas Vasu) http://git-wip-us.apache.org/repos/asf/falcon/blob/f37729df/client/src/main/java/org/apache/falcon/ResponseHelper.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/ResponseHelper.java b/client/src/main/java/org/apache/falcon/ResponseHelper.java index 7d5cbe5..4b40012 100644 --- a/client/src/main/java/org/apache/falcon/ResponseHelper.java +++ b/client/src/main/java/org/apache/falcon/ResponseHelper.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.resource.FeedInstanceResult; +import org.apache.falcon.resource.FeedLookupResult; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.resource.InstancesSummaryResult; import org.apache.falcon.resource.EntitySummaryResult; @@ -263,4 +264,13 @@ public final class ResponseHelper { sb.append("Request Id: ").append(result.getRequestId()); return sb.toString(); } + + public static String getString(FeedLookupResult feedLookupResult) { + StringBuilder sb = new StringBuilder(); + sb.append(feedLookupResult.toString()); + sb.append("\nAdditional Information:\n"); + sb.append("Response: ").append(feedLookupResult.getMessage()); + sb.append("Request Id: ").append(feedLookupResult.getRequestId()); + return sb.toString(); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/f37729df/client/src/main/java/org/apache/falcon/cli/FalconCLI.java ---------------------------------------------------------------------- diff 
--git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java index 869d56c..ab34856 100644 --- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java +++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java @@ -33,6 +33,7 @@ import org.apache.falcon.client.FalconClient; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.resource.EntityList; +import org.apache.falcon.resource.FeedLookupResult; import org.apache.falcon.resource.InstancesResult; import java.io.IOException; @@ -83,6 +84,8 @@ public class FalconCLI { public static final String SUMMARY_OPT = "summary"; public static final String DEFINITION_OPT = "definition"; public static final String DEPENDENCY_OPT = "dependency"; + public static final String LOOKUP_OPT = "lookup"; + public static final String PATH_OPT = "path"; public static final String LIST_OPT = "list"; public static final String TOUCH_OPT = "touch"; @@ -389,6 +392,7 @@ public class FalconCLI { String filterTags = commandLine.getOptionValue(TAGS_OPT); String searchPattern = commandLine.getOptionValue(PATTERN_OPT); String fields = commandLine.getOptionValue(FIELDS_OPT); + String feedInstancePath = commandLine.getOptionValue(PATH_OPT); Integer offset = parseIntegerInput(commandLine.getOptionValue(OFFSET_OPT), 0, "offset"); Integer numResults = parseIntegerInput(commandLine.getOptionValue(NUM_RESULTS_OPT), FalconClient.DEFAULT_NUM_RESULTS, "numResults"); @@ -402,6 +406,11 @@ public class FalconCLI { validateNotEmpty(filePath, "file"); validateColo(optionsList); result = client.submit(entityType, filePath).getMessage(); + } else if (optionsList.contains(LOOKUP_OPT)) { + validateNotEmpty(feedInstancePath, PATH_OPT); + FeedLookupResult resp = client.reverseLookUp(entityType, feedInstancePath); + result = ResponseHelper.getString(resp); + } else if (optionsList.contains(UPDATE_OPT)) { validateNotEmpty(filePath, 
"file"); validateColo(optionsList); @@ -411,7 +420,7 @@ public class FalconCLI { validateNotEmpty(filePath, "file"); validateColo(optionsList); result = - client.submitAndSchedule(entityType, filePath).getMessage(); + client.submitAndSchedule(entityType, filePath).getMessage(); } else if (optionsList.contains(VALIDATE_OPT)) { validateNotEmpty(filePath, "file"); validateColo(optionsList); @@ -436,7 +445,7 @@ public class FalconCLI { validateNotEmpty(entityName, ENTITY_NAME_OPT); colo = getColo(colo); result = - client.getStatus(entityTypeEnum, entityName, colo).getMessage(); + client.getStatus(entityTypeEnum, entityName, colo).getMessage(); } else if (optionsList.contains(DEFINITION_OPT)) { validateColo(optionsList); validateNotEmpty(entityName, ENTITY_NAME_OPT); @@ -460,11 +469,11 @@ public class FalconCLI { validateFilterBy(filterBy, entityAction); validateOrderBy(orderBy, entityAction); result = - ResponseHelper.getString(client - .getEntitySummary( - entityType, cluster, start, end, fields, filterBy, - filterTags, - orderBy, sortOrder, offset, numResults, numInstances)); + ResponseHelper.getString(client + .getEntitySummary( + entityType, cluster, start, end, fields, filterBy, + filterTags, + orderBy, sortOrder, offset, numResults, numInstances)); } else if (optionsList.contains(TOUCH_OPT)) { validateNotEmpty(entityName, ENTITY_NAME_OPT); colo = getColo(colo); @@ -630,6 +639,7 @@ public class FalconCLI { "Gets the dependencies of entity"); Option list = new Option(LIST_OPT, false, "List entities registered for a type"); + Option lookup = new Option(LOOKUP_OPT, false, "Lookup a feed given its instance's path"); Option entitySummary = new Option(SUMMARY_OPT, false, "Get summary of instances for list of entities"); Option touch = new Option(TOUCH_OPT, false, @@ -648,6 +658,7 @@ public class FalconCLI { group.addOption(definition); group.addOption(dependency); group.addOption(list); + group.addOption(lookup); group.addOption(entitySummary); group.addOption(touch); 
@@ -679,8 +690,10 @@ public class FalconCLI { "Number of results to return per request"); Option numInstances = new Option(NUM_INSTANCES_OPT, true, "Number of instances to return per entity summary request"); + Option path = new Option(PATH_OPT, true, "Path for a feed's instance"); entityOptions.addOption(url); + entityOptions.addOption(path); entityOptions.addOptionGroup(group); entityOptions.addOption(entityType); entityOptions.addOption(entityName); http://git-wip-us.apache.org/repos/asf/falcon/blob/f37729df/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index a866bb0..15691a6 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -36,6 +36,7 @@ import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.EntityList; import org.apache.falcon.resource.EntitySummaryResult; import org.apache.falcon.resource.FeedInstanceResult; +import org.apache.falcon.resource.FeedLookupResult; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.resource.InstancesSummaryResult; import org.apache.falcon.resource.LineageGraphResult; @@ -190,6 +191,7 @@ public class FalconClient { DEFINITION("api/entities/definition/", HttpMethod.GET, MediaType.TEXT_XML), LIST("api/entities/list/", HttpMethod.GET, MediaType.TEXT_XML), SUMMARY("api/entities/summary", HttpMethod.GET, MediaType.APPLICATION_JSON), + LOOKUP("api/entities/lookup/", HttpMethod.GET, MediaType.APPLICATION_JSON), DEPENDENCY("api/entities/dependencies/", HttpMethod.GET, MediaType.TEXT_XML), TOUCH("api/entities/touch", HttpMethod.POST, MediaType.TEXT_XML); @@ -729,6 +731,17 @@ public class FalconClient { return clientResponse.getEntity(APIResult.class); } + public 
FeedLookupResult reverseLookUp(String type, String path) throws FalconCLIException { + Entities api = Entities.LOOKUP; + WebResource resource = service.path(api.path).path(type); + resource = resource.queryParam("path", path); + ClientResponse response = resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken) + .accept(api.mimeType) + .method(api.method, ClientResponse.class); + checkIfSuccessful(response); + return response.getEntity(FeedLookupResult.class); + } + //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck private InstancesResult sendInstanceRequest(Instances instances, String type, String entity, String start, String end, InputStream props, http://git-wip-us.apache.org/repos/asf/falcon/blob/f37729df/client/src/main/java/org/apache/falcon/resource/FeedLookupResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/resource/FeedLookupResult.java b/client/src/main/java/org/apache/falcon/resource/FeedLookupResult.java new file mode 100644 index 0000000..6edb59f --- /dev/null +++ b/client/src/main/java/org/apache/falcon/resource/FeedLookupResult.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.falcon.resource; + +import org.apache.commons.lang.StringUtils; +import org.apache.falcon.entity.v0.feed.LocationType; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; + +/** + * Entity list used for marshalling / unmarshalling with REST calls. + */ +@XmlRootElement(name = "feeds") +@XmlAccessorType(XmlAccessType.FIELD) +@edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"}) +public class FeedLookupResult extends APIResult { + + @XmlElement(name = "feed") + private FeedProperties[] elements; + + //For JAXB + private FeedLookupResult() { + super(); + } + + public FeedLookupResult(Status status, String message) { + super(status, message); + } + + public FeedProperties[] getElements() { + return elements; + } + + public void setElements(FeedProperties[] elements) { + this.elements = elements; + } + + @Override + public String toString() { + StringBuilder buffer = new StringBuilder(); + for (FeedProperties element : elements) { + buffer.append(element.toString()); + } + return buffer.toString(); + } + + /** + * A single instance in the result.
+ */ + @XmlRootElement(name = "feed") + public static class FeedProperties { + @XmlElement + private String feedName; + + @XmlElement + private LocationType locationType; + + @XmlElement + private String clusterName; + + public FeedProperties(String feedName, LocationType locationType, String clusterName){ + this.clusterName = clusterName; + this.locationType = locationType; + this.feedName = feedName; + } + + //for JAXB + private FeedProperties(){} + + public void setFeedName(String feedName) { + this.feedName = feedName; + } + + public void setLocationType(LocationType locationType) { + this.locationType = locationType; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FeedProperties that = (FeedProperties) o; + if (!StringUtils.equals(clusterName, that.clusterName)) { + return false; + } + if (locationType != that.locationType) { + return false; + } + if (!StringUtils.equals(feedName, that.feedName)) { + return false; + } + return true; + } + + @Override + public int hashCode() { + int result = feedName.hashCode(); + result = 31 * result + (locationType != null ? locationType.hashCode() : 0); + result = 31 * result + (clusterName != null ? 
clusterName.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return feedName + " (CLUSTER:" + clusterName + ") (LocationType:" + locationType.name() + ")"; + } + + } + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/f37729df/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java b/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java index e056d96..c8ae0f5 100644 --- a/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java +++ b/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java @@ -26,12 +26,14 @@ import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.feed.Cluster; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.feed.Location; -import org.apache.falcon.entity.v0.feed.LocationType; +import org.apache.falcon.resource.FeedLookupResult; import org.apache.falcon.service.ConfigurationChangeListener; +import org.apache.falcon.util.FalconRadixUtils; import org.apache.falcon.util.RadixTree; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collection; import java.util.List; /** @@ -64,7 +66,8 @@ import java.util.List; public final class FeedLocationStore implements ConfigurationChangeListener { private static final Logger LOG = LoggerFactory.getLogger(FeedLocationStore.class); - protected final FeedPathStore<FeedProperties> store = new RadixTree<FeedProperties>(); + protected final FeedPathStore<FeedLookupResult.FeedProperties> store = new + RadixTree<FeedLookupResult.FeedProperties>(); private static FeedLocationStore instance = new FeedLocationStore(); @@ -75,55 +78,6 @@ public final class FeedLocationStore implements ConfigurationChangeListener { return instance; } - /** - * Object stored against each path. 
- */ - public static class FeedProperties { - private final String feedName; - - private final LocationType locationType; - - private final String clusterName; - - public FeedProperties(String feedName, LocationType locationType, String clusterName){ - this.clusterName = clusterName; - this.locationType = locationType; - this.feedName = feedName; - } - - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - FeedProperties that = (FeedProperties) o; - if (!StringUtils.equals(clusterName, that.clusterName)) { - return false; - } - if (locationType != that.locationType) { - return false; - } - if (!StringUtils.equals(feedName, that.feedName)) { - return false; - } - return true; - } - - @Override - public int hashCode() { - int result = feedName.hashCode(); - result = 31 * result + (locationType != null ? locationType.hashCode() : 0); - result = 31 * result + (clusterName != null ? clusterName.hashCode() : 0); - return result; - } - - } - - @Override public void onAdd(Entity entity) throws FalconException { if (entity.getEntityType() == EntityType.FEED){ @@ -132,13 +86,15 @@ public final class FeedLocationStore implements ConfigurationChangeListener { for(Cluster cluster: clusters){ List<Location> clusterSpecificLocations = FeedHelper.getLocations(FeedHelper.getCluster(feed, cluster.getName()), feed); - for(Location location: clusterSpecificLocations){ - if (location != null && StringUtils.isNotBlank(location.getPath())){ - FeedProperties value = new FeedProperties(feed.getName(), location.getType(), - cluster.getName()); - store.insert(StringUtils.trim(location.getPath()), value); - LOG.debug("Inserted location: {} for feed: {} and cluster: {}", - location.getPath(), feed.getName(), cluster.getName()); + if (clusterSpecificLocations != null) { + for(Location location: clusterSpecificLocations){ + if (location != null && StringUtils.isNotBlank(location.getPath())){ + 
FeedLookupResult.FeedProperties value = new FeedLookupResult.FeedProperties(feed.getName(), + location.getType(), cluster.getName()); + store.insert(StringUtils.trim(location.getPath()), value); + LOG.debug("Inserted location: {} for feed: {} and cluster: {}", + location.getPath(), feed.getName(), cluster.getName()); + } } } } @@ -159,16 +115,19 @@ public final class FeedLocationStore implements ConfigurationChangeListener { for(Cluster cluster: clusters){ List<Location> clusterSpecificLocations = FeedHelper.getLocations(FeedHelper.getCluster(feed, cluster.getName()), feed); - for(Location location: clusterSpecificLocations){ - if (location != null && StringUtils.isNotBlank(location.getPath())){ - FeedProperties value = new FeedProperties(feed.getName(), location.getType(), - cluster.getName()); - store.delete(location.getPath(), value); - LOG.debug("Deleted location: {} for feed: {} and cluster: {}", - location.getPath(), feed.getName(), cluster.getName()); + if (clusterSpecificLocations != null) { + for(Location location: clusterSpecificLocations){ + if (location != null && StringUtils.isNotBlank(location.getPath())){ + FeedLookupResult.FeedProperties value = new FeedLookupResult.FeedProperties(feed.getName(), + location.getType(), cluster.getName()); + LOG.debug("Delete called for location: {} for feed: {} and cluster: {}", + location.getPath(), feed.getName(), cluster.getName()); + store.delete(location.getPath(), value); + LOG.debug("Deleted location: {} for feed: {} and cluster: {}", + location.getPath(), feed.getName(), cluster.getName()); + } } } - } } @@ -190,4 +149,9 @@ public final class FeedLocationStore implements ConfigurationChangeListener { public void onReload(Entity entity) throws FalconException { onAdd(entity); } + + + public Collection<FeedLookupResult.FeedProperties> reverseLookup(String path) { + return store.find(path, new FalconRadixUtils.FeedRegexAlgorithm()); + } } 
http://git-wip-us.apache.org/repos/asf/falcon/blob/f37729df/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java b/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java index bbd73c7..4bf6e00 100644 --- a/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java +++ b/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java @@ -98,7 +98,12 @@ public class FalconRadixUtils { } - static class FeedRegexAlgorithm implements INodeAlgorithm { + /** + * Regular Expression Algorithm for the radix tree. + * + * It traverses the radix tree and matches expressions like ${YEAR} etc. with their allowable values e.g. 2014 + */ + public static class FeedRegexAlgorithm implements INodeAlgorithm { /** * This function matches a feed path template with feed instance's path string. http://git-wip-us.apache.org/repos/asf/falcon/blob/f37729df/common/src/main/java/org/apache/falcon/util/RadixTree.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/RadixTree.java b/common/src/main/java/org/apache/falcon/util/RadixTree.java index 6cd79f5..436d7f2 100644 --- a/common/src/main/java/org/apache/falcon/util/RadixTree.java +++ b/common/src/main/java/org/apache/falcon/util/RadixTree.java @@ -264,6 +264,7 @@ public class RadixTree<T> implements FeedPathStore<T>, Formattable { } if (StringUtils.equals(key, currentNode.getKey())){ + LOG.trace("Current node's key:{} and the input key:{} matched", currentNode.getKey(), key); if (currentNode.getValues().contains(value)){ LOG.debug("Given value is found in the collection of values against the given key"); currentNode.removeValue(value); @@ -315,6 +316,7 @@ public class RadixTree<T> implements FeedPathStore<T>, Formattable { return false; } } + return true; }else { LOG.debug("Current 
value is not found in the collection of values against the given key, no-op"); return false; @@ -326,6 +328,7 @@ public class RadixTree<T> implements FeedPathStore<T>, Formattable { RadixNode<T> newRoot = null; String remainingKey = key.substring(currentNode.getMatchLength(key)); for(RadixNode<T> el : currentNode.getChildren()){ + LOG.trace("Finding next child to follow. Current child's key:{}", el.getKey()); if (el.getKey().charAt(0) == remainingKey.charAt(0)){ newRoot = el; break; http://git-wip-us.apache.org/repos/asf/falcon/blob/f37729df/common/src/main/resources/startup.properties ---------------------------------------------------------------------- diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties index 433c2a8..99dab59 100644 --- a/common/src/main/resources/startup.properties +++ b/common/src/main/resources/startup.properties @@ -43,6 +43,7 @@ *.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ org.apache.falcon.entity.ColoClusterRelation,\ org.apache.falcon.group.FeedGroupMap,\ + org.apache.falcon.entity.store.FeedLocationStore,\ org.apache.falcon.service.SharedLibraryHostingService ##### JMS MQ Broker Implementation class ##### http://git-wip-us.apache.org/repos/asf/falcon/blob/f37729df/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java b/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java index 86ef775..611c205 100644 --- a/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java +++ b/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java @@ -21,6 +21,7 @@ package org.apache.falcon.entity.store; import org.apache.commons.io.FileUtils; import org.apache.falcon.FalconException; import org.apache.falcon.entity.v0.EntityType; 
+import org.apache.falcon.entity.v0.feed.CatalogTable; import org.apache.falcon.entity.v0.feed.Cluster; import org.apache.falcon.entity.v0.feed.Clusters; import org.apache.falcon.entity.v0.feed.Feed; @@ -28,6 +29,7 @@ import org.apache.falcon.entity.v0.feed.Location; import org.apache.falcon.entity.v0.feed.LocationType; import org.apache.falcon.entity.v0.feed.Locations; import org.apache.falcon.security.CurrentUser; +import org.apache.falcon.util.FalconRadixUtils; import org.apache.falcon.util.StartupProperties; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -144,6 +146,32 @@ public class FeedLocationStoreTest { } + @Test + public void testFindWithRegularExpression() throws FalconException { + Feed f = createFeed("findUsingRegexFeed"); + f.getLocations().getLocations().add(createLocation(LocationType.DATA, + "/falcon/test/input/${YEAR}/${MONTH}/${DAY}/${HOUR}")); + store.publish(EntityType.FEED, f); + Assert.assertNotNull(FeedLocationStore.get().store.find("/falcon/test/input/2014/12/12/23", + new FalconRadixUtils.FeedRegexAlgorithm())); + } + + @Test + public void testAddCatalogStorageFeeds() throws FalconException { + //this test ensure that catalog feeds are ignored in FeedLocationStore + Feed f = createCatalogFeed("catalogFeed"); + store.publish(EntityType.FEED, f); + Assert.assertTrue(true); + } + + private Feed createCatalogFeed(String name) { + Feed f = new Feed(); + f.setName(name); + f.setClusters(createBlankClusters()); + f.setTable(new CatalogTable()); + return f; + } + private Feed createFeed(String name){ Feed f = new Feed(); Locations locations = new Locations(); http://git-wip-us.apache.org/repos/asf/falcon/blob/f37729df/common/src/test/java/org/apache/falcon/util/RadixTreeTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/util/RadixTreeTest.java b/common/src/test/java/org/apache/falcon/util/RadixTreeTest.java index 109c24d..b602a09 100644 --- 
a/common/src/test/java/org/apache/falcon/util/RadixTreeTest.java +++ b/common/src/test/java/org/apache/falcon/util/RadixTreeTest.java @@ -18,9 +18,9 @@ package org.apache.falcon.util; -import org.apache.falcon.entity.store.FeedLocationStore; import org.apache.falcon.entity.store.FeedPathStore; import org.apache.falcon.entity.v0.feed.LocationType; +import org.apache.falcon.resource.FeedLookupResult; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -222,7 +222,15 @@ public class RadixTreeTest { Assert.assertTrue(tree.delete("key2", "value2")); tree.insert("water", "water"); Assert.assertTrue(tree.find("water").contains("water")); + } + @Test + public void testDeleteFromListAndChildren() { + //check that a delete of a key with multiple values and children is handled + tree.insert("keyWithManyValuesAndChild", "value1"); + tree.insert("keyWithManyValuesAndChild", "value2"); + tree.insert("keyWithManyValuesAndChildren", "childValue"); + Assert.assertTrue(tree.delete("keyWithManyValuesAndChild", "value1")); } @Test @@ -268,15 +276,15 @@ public class RadixTreeTest { @Test public void testFeedPropertiesEquals() { - FeedLocationStore.FeedProperties f1 = new FeedLocationStore.FeedProperties("feed", + FeedLookupResult.FeedProperties f1 = new FeedLookupResult.FeedProperties("feed", LocationType.DATA, "cluster"); - FeedLocationStore.FeedProperties f1Copy = new FeedLocationStore.FeedProperties("feed", + FeedLookupResult.FeedProperties f1Copy = new FeedLookupResult.FeedProperties("feed", LocationType.DATA, "cluster"); - FeedLocationStore.FeedProperties f3 = new FeedLocationStore.FeedProperties("anotherFeed", + FeedLookupResult.FeedProperties f3 = new FeedLookupResult.FeedProperties("anotherFeed", LocationType.DATA, "cluster"); - FeedLocationStore.FeedProperties f4 = new FeedLocationStore.FeedProperties("feed", + FeedLookupResult.FeedProperties f4 = new FeedLookupResult.FeedProperties("feed", LocationType.STATS, 
"cluster"); - FeedLocationStore.FeedProperties f5 = new FeedLocationStore.FeedProperties("feed", + FeedLookupResult.FeedProperties f5 = new FeedLookupResult.FeedProperties("feed", LocationType.DATA, "anotherCluster"); Assert.assertTrue(f1.equals(f1Copy)); http://git-wip-us.apache.org/repos/asf/falcon/blob/f37729df/docs/src/site/twiki/FalconCLI.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki index 64e96d3..22ffbe7 100644 --- a/docs/src/site/twiki/FalconCLI.twiki +++ b/docs/src/site/twiki/FalconCLI.twiki @@ -111,6 +111,19 @@ Definition option returns the entity definition submitted earlier during submit Usage: $FALCON_HOME/bin/falcon entity -type [cluster|feed|process] -name <<name>> -definition + +---+++Lookup + +Lookup option tells you which feed does a given path belong to. This can be useful in several scenarios e.g. generally you would want to have a single definition for common feeds like metadata with same location +otherwise it can result in a problem (different retention durations can result in surprises for one team) If you want to check if there are multiple definitions of same metadata then you can pick +an instance of that and run through the lookup command like below. + +Usage: +$FALCON_HOME/bin/falcon entity -type feed -lookup -path /data/projects/my-hourly/2014/10/10/23/ + +If you have multiple feeds with location as /data/projects/my-hourly/${YEAR}/${MONTH}/${DAY}/${HOUR} then this command will return all of them. 
+ + ---++Instance Management Options ---+++Kill http://git-wip-us.apache.org/repos/asf/falcon/blob/f37729df/docs/src/site/twiki/restapi/FeedLookup.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/restapi/FeedLookup.twiki b/docs/src/site/twiki/restapi/FeedLookup.twiki new file mode 100644 index 0000000..1ad91d8 --- /dev/null +++ b/docs/src/site/twiki/restapi/FeedLookup.twiki @@ -0,0 +1,36 @@ +---++ GET api/entities/lookup/feed + * <a href="#Description">Description</a> + * <a href="#Parameters">Parameters</a> + * <a href="#Results">Results</a> + * <a href="#Examples">Examples</a> + +---++ Description + +---++ Parameters + * path path of the instance for which you want to determine the feed. e.g. /data/project1/2014/10/10/23/ + Path has to be the complete path and can't be a part of it. + +---++ Results +Returns the name of the feed along with the location type(meta/data/stats) and cluster on which the given path belongs to this feed. 
+ +---++ Examples +---+++ Rest Call +<verbatim> +GET http://localhost:15000/api/entities/lookup/feed?path=/data/project1/2014/10/10/23 +</verbatim> +---+++ Result +{ + "feeds": + [ + { + "feedName": "My-Feed1", + "locationType": "DATA", + "clusterName": "My-cluster1" + }, + { + "feedName": "My-Feed2", + "locationType": "DATA", + "clusterName": "My-cluster2" + } + ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/f37729df/docs/src/site/twiki/restapi/ResourceList.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/restapi/ResourceList.twiki b/docs/src/site/twiki/restapi/ResourceList.twiki index 2f37bb3..060e0af 100644 --- a/docs/src/site/twiki/restapi/ResourceList.twiki +++ b/docs/src/site/twiki/restapi/ResourceList.twiki @@ -52,6 +52,7 @@ See also: [[../Security.twiki][Security in Falcon]] | GET | [[EntityList][api/entities/list/:entity-type]] | Get the list of entities | | GET | [[EntitySummary][api/entities/summary/:entity-type/:cluster]] | Get instance summary of all entities | | GET | [[EntityDependencies][api/entities/dependencies/:entity-type/:entity-name]] | Get the dependencies of the entity | +| GET | [[FeedLookup][api/entities/lookup/feed/]] | Get feed for given path | ---++ REST Call on Feed and Process Instances http://git-wip-us.apache.org/repos/asf/falcon/blob/f37729df/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java index 67a66c6..8c32469 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java @@ -33,6 +33,7 @@ import org.apache.falcon.entity.parser.EntityParserFactory; import 
org.apache.falcon.entity.parser.ValidationException; import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.store.EntityAlreadyExistsException; +import org.apache.falcon.entity.store.FeedLocationStore; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityGraph; import org.apache.falcon.entity.v0.EntityIntegrityChecker; @@ -944,6 +945,51 @@ public abstract class AbstractEntityManager { } } + + /** + * Given the location of data, returns the feed. + * @param type type of the entity, is valid only for feeds. + * @param instancePath location of the data + * @return Feed Name, type of the data and cluster name. + */ + public FeedLookupResult reverseLookup(String type, String instancePath) { + try { + EntityType entityType = EntityType.getEnum(type); + if (entityType != EntityType.FEED) { + LOG.error("Reverse Lookup is not supported for entitytype: {}", type); + throw new IllegalArgumentException("Reverse lookup is not supported for " + type); + } + + instancePath = StringUtils.trim(instancePath); + String instancePathWithoutSlash = + instancePath.endsWith("/") ? StringUtils.removeEnd(instancePath, "/") : instancePath; + // treat strings with and without trailing slash as same for purpose of searching e.g. + // /data/cas and /data/cas/ should be treated as same. 
+ String instancePathWithSlash = instancePathWithoutSlash + "/"; + FeedLocationStore store = FeedLocationStore.get(); + Collection<FeedLookupResult.FeedProperties> feeds = new ArrayList<>(); + Collection<FeedLookupResult.FeedProperties> res = store.reverseLookup(instancePathWithoutSlash); + if (res != null) { + feeds.addAll(res); + } + res = store.reverseLookup(instancePathWithSlash); + if (res != null) { + feeds.addAll(res); + } + FeedLookupResult result = new FeedLookupResult(APIResult.Status.SUCCEEDED, "SUCCESS"); + FeedLookupResult.FeedProperties[] props = feeds.toArray(new FeedLookupResult.FeedProperties[0]); + result.setElements(props); + return result; + + } catch (IllegalArgumentException e) { + throw FalconWebException.newException(e, Response.Status.BAD_REQUEST); + } catch (Throwable throwable) { + LOG.error("reverse look up failed", throwable); + throw FalconWebException.newException(throwable, Response.Status.INTERNAL_SERVER_ERROR); + } + } + + protected AbstractWorkflowEngine getWorkflowEngine() { return this.workflowEngine; } http://git-wip-us.apache.org/repos/asf/falcon/blob/f37729df/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java index 7ba289a..1c365ab 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java @@ -32,6 +32,7 @@ import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.AbstractSchedulableEntityManager; import org.apache.falcon.resource.EntityList; import org.apache.falcon.resource.EntitySummaryResult; +import org.apache.falcon.resource.FeedLookupResult; import 
org.apache.falcon.resource.channel.Channel; import org.apache.falcon.resource.channel.ChannelFactory; @@ -483,6 +484,16 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana return super.getEntitySummary(type, cluster, startStr, endStr, entityFields, entityFilter, entityTags, entityOrderBy, entitySortOrder, entityOffset, numEntities, numInstanceResults); } + + @GET + @Path("lookup/{type}") + @Produces({MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN}) + @Monitored(event = "reverse-lookup") + public FeedLookupResult reverseLookup( + @Dimension("type") @PathParam("type") final String type, + @Dimension("path") @QueryParam("path") final String path) { + return super.reverseLookup(type, path); + } //RESUME CHECKSTYLE CHECK ParameterNumberCheck private abstract class EntityProxy { http://git-wip-us.apache.org/repos/asf/falcon/blob/f37729df/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java b/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java index 9504d3f..6f75111 100644 --- a/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java +++ b/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java @@ -20,8 +20,14 @@ package org.apache.falcon.resource; import org.apache.commons.lang.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.FalconWebException; +import org.apache.falcon.entity.store.FeedLocationStore; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.feed.Cluster; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.feed.Location; +import org.apache.falcon.entity.v0.feed.LocationType; +import org.apache.falcon.entity.v0.feed.Locations; import org.apache.falcon.entity.v0.process.ACL; import 
org.apache.falcon.entity.v0.process.Clusters; import org.apache.falcon.entity.v0.process.Process; @@ -31,7 +37,7 @@ import org.apache.falcon.util.StartupProperties; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.testng.Assert; -import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeTest; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -55,9 +61,11 @@ public class EntityManagerTest extends AbstractEntityManager { private static final String SAMPLE_INVALID_PROCESS_XML = "/process-invalid.xml"; private static final long DAY_IN_MILLIS = 86400000L; - @BeforeClass - public void init() { + @BeforeTest + public void init() throws Exception { MockitoAnnotations.initMocks(this); + configStore.unregisterListener(FeedLocationStore.get()); + configStore.registerListener(FeedLocationStore.get()); } @SuppressWarnings("unused") @@ -268,6 +276,53 @@ public class EntityManagerTest extends AbstractEntityManager { } + @Test + public void testReverseLookup() throws Exception { + Feed f = buildFeed("sampleFeed"); + configStore.publish(EntityType.FEED, f); + Assert.assertNotNull(reverseLookup("feed", "/falcon/test/input/2014/12/10/23")); + } + + private Location createLocation(LocationType type, String path){ + Location location = new Location(); + location.setPath(path); + location.setType(type); + return location; + } + + private Feed buildFeed(String name) { + org.apache.falcon.entity.v0.feed.ACL acl = new org.apache.falcon.entity.v0.feed.ACL(); + acl.setOwner("user"); + acl.setGroup("hdfs"); + acl.setPermission("*"); + + Feed feed = new Feed(); + feed.setName(name); + feed.setACL(acl); + + feed.setClusters(createBlankClusters()); + Locations locations = new Locations(); + feed.setLocations(locations); + + feed.getLocations().getLocations().add(createLocation(LocationType.DATA, + "/falcon/test/input/${YEAR}/${MONTH}/${DAY}/${HOUR}")); + return feed; + } + + private 
org.apache.falcon.entity.v0.feed.Clusters createBlankClusters() { + org.apache.falcon.entity.v0.feed.Clusters clusters = new org.apache.falcon.entity.v0.feed.Clusters(); + + Cluster cluster = new Cluster(); + cluster.setName("blankCluster1"); + clusters.getClusters().add(cluster); + + Cluster cluster2 = new Cluster(); + cluster2.setName("blankCluster2"); + clusters.getClusters().add(cluster2); + + return clusters; + } + private Entity buildProcess(String name, String username, String tags, String pipelines) { ACL acl = new ACL(); acl.setOwner(username); http://git-wip-us.apache.org/repos/asf/falcon/blob/f37729df/src/conf/startup.properties ---------------------------------------------------------------------- diff --git a/src/conf/startup.properties b/src/conf/startup.properties index 2db4b1e..70efc9d 100644 --- a/src/conf/startup.properties +++ b/src/conf/startup.properties @@ -50,6 +50,7 @@ prism.application.services=org.apache.falcon.entity.store.ConfigurationStore *.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ org.apache.falcon.entity.ColoClusterRelation,\ org.apache.falcon.group.FeedGroupMap,\ + org.apache.falcon.entity.store.FeedLocationStore,\ org.apache.falcon.service.SharedLibraryHostingService ##### Prism Configuration Store Change listeners ##### http://git-wip-us.apache.org/repos/asf/falcon/blob/f37729df/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java index a83f0cf..52adb5f 100644 --- a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java +++ b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java @@ -162,4 +162,15 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager { return 
super.touch(type, entityName, colo); } + @GET + @Path("lookup/{type}/") + @Produces(MediaType.APPLICATION_JSON) + @Monitored(event = "reverse-lookup") + public FeedLookupResult reverseLookup( + @Context HttpServletRequest request, + @Dimension("type") @PathParam("type") String type, + @Dimension("path") @QueryParam("path") String instancePath) { + return super.reverseLookup(type, instancePath); + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/f37729df/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java index d46f112..bfad011 100644 --- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java +++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java @@ -75,6 +75,10 @@ public class FalconCLIIT { "falcon/default/Submit successful (feed) " + overlay.get("inputFeedName")); + // Test the lookup command + Assert.assertEquals(executeWithURL("entity -lookup -type feed -path " + + "/falcon/test/input/2014/11/23/23"), 0); + filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay); Assert.assertEquals(executeWithURL("entity -submit -type feed -file " + filePath), 0); Assert.assertEquals(
