Repository: falcon Updated Branches: refs/heads/master 6f78180ff -> 0aeb6c89a
FALCON-1125 Feed Lookup API doesnt work via prism. Contributed by Ajay Yadava Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/0aeb6c89 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/0aeb6c89 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/0aeb6c89 Branch: refs/heads/master Commit: 0aeb6c89a7fa16ebe8183c7bf66bfa8187b563a6 Parents: 6f78180 Author: Suhas Vasu <[email protected]> Authored: Tue Mar 31 00:54:25 2015 +0530 Committer: Suhas Vasu <[email protected]> Committed: Tue Mar 31 00:54:25 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 5 +- .../falcon/resource/FeedLookupResult.java | 19 ++++++++ .../falcon/entity/store/FeedLocationStore.java | 25 +++++----- .../entity/store/FeedLocationStoreTest.java | 21 +++++++-- .../falcon/resource/AbstractEntityManager.java | 2 +- .../proxy/SchedulableEntityManagerProxy.java | 48 ++++++++++++++++---- .../resource/SchedulableEntityManager.java | 3 +- 7 files changed, 96 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/0aeb6c89/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c136a86..4c7ba2b 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -123,11 +123,14 @@ Trunk (Unreleased) (Suhas vasu) BUG FIXES + FALCON-1125 Feed Lookup API doesnt work via prism + (Ajay Yadava via Suhas Vasu) + FALCON-1119 Instance logs option is not returning the log location (Suhas Vasu) FALCON-1100 UI : Failed to load data. 404 not found - (Pallavi Rao via Suhas Vasu) + (Sowmya Ramesh via Suhas Vasu) FALCON-1123 Stacktrace printed by Falcon CLI is not useful to user (Pallavi Rao via Suhas Vasu) http://git-wip-us.apache.org/repos/asf/falcon/blob/0aeb6c89/client/src/main/java/org/apache/falcon/resource/FeedLookupResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/resource/FeedLookupResult.java b/client/src/main/java/org/apache/falcon/resource/FeedLookupResult.java index 1198827..124feba 100644 --- a/client/src/main/java/org/apache/falcon/resource/FeedLookupResult.java +++ b/client/src/main/java/org/apache/falcon/resource/FeedLookupResult.java @@ -54,6 +54,25 @@ public class FeedLookupResult extends APIResult { this.elements = elements; } + + @Override + public Object[] getCollection() { + return getElements(); + } + + @Override + public void setCollection(Object[] items) { + if (items == null) { + setElements(new FeedProperties[0]); + } else { + FeedProperties[] newInstances = new FeedProperties[items.length]; + for (int index = 0; index < items.length; index++) { + newInstances[index] = (FeedProperties)items[index]; + } + setElements(newInstances); + } + } + @Override public String toString() { StringBuilder buffer = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/falcon/blob/0aeb6c89/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java b/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java index c8ae0f5..fefd74b 100644 --- a/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java +++ b/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java @@ -28,6 +28,7 @@ import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.feed.Location; import org.apache.falcon.resource.FeedLookupResult; import org.apache.falcon.service.ConfigurationChangeListener; +import org.apache.falcon.util.DeploymentUtil; import org.apache.falcon.util.FalconRadixUtils; import org.apache.falcon.util.RadixTree; import org.slf4j.Logger; @@ -83,17 +84,19 @@ public final class FeedLocationStore implements ConfigurationChangeListener { if (entity.getEntityType() == EntityType.FEED){ Feed feed = (Feed) entity; List<Cluster> clusters = feed.getClusters().getClusters(); - for(Cluster cluster: clusters){ - List<Location> clusterSpecificLocations = FeedHelper.getLocations(FeedHelper.getCluster(feed, - cluster.getName()), feed); - if (clusterSpecificLocations != null) { - for(Location location: clusterSpecificLocations){ - if (location != null && StringUtils.isNotBlank(location.getPath())){ - FeedLookupResult.FeedProperties value = new FeedLookupResult.FeedProperties(feed.getName(), - location.getType(), cluster.getName()); - store.insert(StringUtils.trim(location.getPath()), value); - LOG.debug("Inserted location: {} for feed: {} and cluster: {}", - location.getPath(), feed.getName(), cluster.getName()); + for(Cluster cluster: clusters) { + if (DeploymentUtil.getCurrentClusters().contains(cluster.getName())) { + List<Location> clusterSpecificLocations = FeedHelper.getLocations(FeedHelper.getCluster(feed, + cluster.getName()), feed); + if (clusterSpecificLocations != null) { + for (Location location : clusterSpecificLocations) { + if (location != null && StringUtils.isNotBlank(location.getPath())) { + FeedLookupResult.FeedProperties value = new FeedLookupResult.FeedProperties( + feed.getName(), location.getType(), cluster.getName()); + store.insert(StringUtils.trim(location.getPath()), value); + LOG.debug("Inserted location: {} for feed: {} and cluster: {}", + location.getPath(), feed.getName(), cluster.getName()); + } } } } http://git-wip-us.apache.org/repos/asf/falcon/blob/0aeb6c89/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java b/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java index 611c205..4c4b7f3 100644 --- a/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java +++ b/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java @@ -20,6 +20,7 @@ package org.apache.falcon.entity.store; import org.apache.commons.io.FileUtils; import org.apache.falcon.FalconException; +import org.apache.falcon.entity.AbstractTestBase; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.feed.CatalogTable; import org.apache.falcon.entity.v0.feed.Cluster; @@ -45,7 +46,7 @@ import java.util.Collection; /** * Tests for FeedLocationStore. */ -public class FeedLocationStoreTest { +public class FeedLocationStoreTest extends AbstractTestBase { private ConfigurationStore store; @@ -57,8 +58,11 @@ public class FeedLocationStoreTest { FileUtils.deleteDirectory(new File(location)); cleanupStore(); + String listeners = StartupProperties.get().getProperty("configstore.listeners"); StartupProperties.get().setProperty("configstore.listeners", - "org.apache.falcon.entity.store.FeedLocationStore"); + listeners.replace("org.apache.falcon.service.SharedLibraryHostingService", "")); +// StartupProperties.get().setProperty("configstore.listeners", +// "org.apache.falcon.entity.store.FeedLocationStore"); store = ConfigurationStore.get(); store.init(); @@ -68,6 +72,7 @@ public class FeedLocationStoreTest { @BeforeMethod public void setUp() throws FalconException{ cleanupStore(); + createClusters(); } @AfterMethod @@ -200,7 +205,7 @@ public class FeedLocationStoreTest { return location; } - private void cleanupStore() throws FalconException { + protected void cleanupStore() throws FalconException { store = ConfigurationStore.get(); for (EntityType type : EntityType.values()) { Collection<String> entities = store.getEntities(type); @@ -245,4 +250,14 @@ public class FeedLocationStoreTest { return clusters; } + + private void createClusters() throws FalconException { + String[] clusterNames = {"cluster1WithLocations", "cluster2WithLocations", "blankCluster1", "blankCluster2"}; + for (String name : clusterNames) { + org.apache.falcon.entity.v0.cluster.Cluster cluster = new org.apache.falcon.entity.v0.cluster.Cluster(); + cluster.setName(name); + cluster.setColo("default"); + store.publish(EntityType.CLUSTER, cluster); + } + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/0aeb6c89/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java index 8c32469..c0df4d6 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java @@ -952,7 +952,7 @@ public abstract class AbstractEntityManager { * @param instancePath location of the data * @return Feed Name, type of the data and cluster name. */ - public FeedLookupResult reverseLookup(String type, String instancePath) { + public APIResult reverseLookup(String type, String instancePath) { try { EntityType entityType = EntityType.getEnum(type); if (entityType != EntityType.FEED) { http://git-wip-us.apache.org/repos/asf/falcon/blob/0aeb6c89/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java index 1c365ab..8bfc099 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java @@ -42,6 +42,7 @@ import javax.ws.rs.*; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import java.lang.reflect.Constructor; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -492,33 +493,62 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana public FeedLookupResult reverseLookup( @Dimension("type") @PathParam("type") final String type, @Dimension("path") @QueryParam("path") final String path) { - return super.reverseLookup(type, path); + String entity = "DummyEntity"; // A dummy entity name to get around + return new EntityProxy<FeedLookupResult>(type, entity, FeedLookupResult.class) { + @Override + protected Set<String> getColosToApply() { + return getAllColos(); + } + + @Override + protected FeedLookupResult doExecute(String colo) throws FalconException { + return getEntityManager(colo).invoke("reverseLookup", type, path); + } + }.execute(); + } //RESUME CHECKSTYLE CHECK ParameterNumberCheck - private abstract class EntityProxy { + private abstract class EntityProxy<T extends APIResult> { + private final Class<T> clazz; private String type; private String name; - public EntityProxy(String type, String name) { + public EntityProxy(String type, String name, Class<T> resultClazz) { + this.clazz = resultClazz; this.type = type; this.name = name; } - public APIResult execute() { + + private T getResultInstance(APIResult.Status status, String message) { + try { + Constructor<T> constructor = clazz.getConstructor(APIResult.Status.class, String.class); + return constructor.newInstance(status, message); + } catch (Exception e) { + throw new FalconRuntimException("Unable to consolidate result.", e); + } + } + + public EntityProxy(String type, String name) { + this(type, name, (Class<T>) APIResult.class); + } + + public T execute() { Set<String> colos = getColosToApply(); - Map<String, APIResult> results = new HashMap<String, APIResult>(); + Map<String, T> results = new HashMap(); for (String colo : colos) { try { results.put(colo, doExecute(colo)); } catch (FalconException e) { - results.put(colo, - new APIResult(APIResult.Status.FAILED, e.getClass().getName() + "::" + e.getMessage())); + results.put(colo, getResultInstance(APIResult.Status.FAILED, e.getClass().getName() + "::" + + e.getMessage())); } } - APIResult finalResult = consolidateResult(results, APIResult.class); + + T finalResult = consolidateResult(results, clazz); if (finalResult.getStatus() != APIResult.Status.SUCCEEDED) { throw FalconWebException.newException(finalResult, Response.Status.BAD_REQUEST); } else { @@ -530,6 +560,6 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana return getApplicableColos(type, name); } - protected abstract APIResult doExecute(String colo) throws FalconException; + protected abstract T doExecute(String colo) throws FalconException; } } http://git-wip-us.apache.org/repos/asf/falcon/blob/0aeb6c89/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java index 52adb5f..261a9a9 100644 --- a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java +++ b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java @@ -166,8 +166,7 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager { @Path("lookup/{type}/") @Produces(MediaType.APPLICATION_JSON) @Monitored(event = "reverse-lookup") - public FeedLookupResult reverseLookup( - @Context HttpServletRequest request, + public APIResult reverseLookup( @Dimension("type") @PathParam("type") String type, @Dimension("path") @QueryParam("path") String instancePath) { return super.reverseLookup(type, instancePath);
