Repository: falcon
Updated Branches:
  refs/heads/master 85ed40eba -> 16d2b39b2
FALCON-1859 Feed export instances are not added to Graph DB

Author: Venkatesan Ramachandran <[email protected]>

Reviewers: "Balu Vellanki <[email protected]>"

Closes #165 from vramachan/FALCON-1859.Export.GraphDB


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/16d2b39b
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/16d2b39b
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/16d2b39b

Branch: refs/heads/master
Commit: 16d2b39b24f30b3562874585fcdc58ecf4460ad0
Parents: 85ed40e
Author: Venkatesan Ramachandran <[email protected]>
Authored: Fri Jun 3 07:50:13 2016 -0700
Committer: bvellanki <[email protected]>
Committed: Fri Jun 3 07:50:13 2016 -0700

----------------------------------------------------------------------
 .../apache/falcon/entity/v0/EntityGraph.java    | 10 +++
 .../falcon/entity/v0/EntityGraphTest.java       | 73 +++++++++++++++++++-
 2 files changed, 82 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/16d2b39b/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java b/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
index e4d9385..acb570e 100644
--- a/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
+++ b/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
@@ -200,6 +200,16 @@ public final class EntityGraph implements ConfigurationChangeListener {
                 feedEdges.add(dbNode);
                 dbEdges.add(feedNode);
             }
+
+            if (FeedHelper.isExportEnabled(cluster)) {
+                Node dbNode = new Node(EntityType.DATASOURCE, FeedHelper.getExportDatasourceName(cluster));
+                if (!nodeEdges.containsKey(dbNode)) {
+                    nodeEdges.put(dbNode, new HashSet<Node>());
+                }
+                Set<Node> dbEdges = nodeEdges.get(dbNode);
+                feedEdges.add(dbNode);
+                dbEdges.add(feedNode);
+            }
         }
         return nodeEdges;
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/16d2b39b/common/src/test/java/org/apache/falcon/entity/v0/EntityGraphTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/v0/EntityGraphTest.java b/common/src/test/java/org/apache/falcon/entity/v0/EntityGraphTest.java
index 23f69d7..b41cc03 100644
--- a/common/src/test/java/org/apache/falcon/entity/v0/EntityGraphTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/v0/EntityGraphTest.java
@@ -20,17 +20,22 @@ package org.apache.falcon.entity.v0;
 
 import org.apache.falcon.entity.AbstractTestBase;
 import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Load;
 import org.apache.falcon.entity.v0.feed.Argument;
 import org.apache.falcon.entity.v0.feed.Arguments;
 import org.apache.falcon.entity.v0.feed.Clusters;
 import org.apache.falcon.entity.v0.feed.ClusterType;
 import org.apache.falcon.entity.v0.feed.Extract;
 import org.apache.falcon.entity.v0.feed.ExtractMethod;
-import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.FieldsType;
 import org.apache.falcon.entity.v0.feed.FieldIncludeExclude;
 import org.apache.falcon.entity.v0.feed.Import;
 import org.apache.falcon.entity.v0.feed.MergeType;
+import org.apache.falcon.entity.v0.feed.Export;
+import org.apache.falcon.entity.v0.feed.LoadMethod;
+
+
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.datasource.Datasource;
 import org.apache.falcon.entity.v0.process.Input;
@@ -184,6 +189,36 @@ public class EntityGraphTest extends AbstractTestBase {
         return imp;
     }
 
+    private Feed addFeedExport(String feed, Cluster cluster, Datasource ds) {
+
+        Feed f1 = new Feed();
+        f1.setName(feed);
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster =
+                new org.apache.falcon.entity.v0.feed.Cluster();
+        feedCluster.setName(cluster.getName());
+        feedCluster.setType(ClusterType.SOURCE);
+        Clusters clusters = new Clusters();
+        clusters.getClusters().add(feedCluster);
+        f1.setClusters(clusters);
+
+        Export exp = getAnExport(LoadMethod.UPDATEONLY, ds);
+        f1.getClusters().getClusters().get(0).setExport(exp);
+        return f1;
+    }
+
+    private Export getAnExport(LoadMethod loadMethod, Datasource ds) {
+
+        org.apache.falcon.entity.v0.feed.Datasource target = new org.apache.falcon.entity.v0.feed.Datasource();
+        target.setName(ds.getName());
+        target.setTableName("test-table");
+        Load load = new Load();
+        load.setType(loadMethod);
+        target.setLoad(load);
+        Export exp = new Export();
+        exp.setTarget(target);
+        return exp;
+    }
+
     private void attachInput(Process process, Feed feed) {
         if (process.getInputs() == null) {
             process.setInputs(new Inputs());
@@ -382,6 +417,42 @@ public class EntityGraphTest extends AbstractTestBase {
     }
 
     @Test
+    public void testOnAddExport() throws Exception {
+
+        Datasource ds = new Datasource();
+        ds.setName("test-db");
+        ds.setColo("c1");
+
+        Cluster cluster = new Cluster();
+        cluster.setName("ci1");
+        cluster.setColo("c1");
+
+        Feed f1 = addFeedExport("fe1", cluster, ds);
+
+        store.publish(EntityType.CLUSTER, cluster);
+        store.publish(EntityType.DATASOURCE, ds);
+        store.publish(EntityType.FEED, f1);
+
+        Set<Entity> entities = graph.getDependents(cluster);
+        Assert.assertEquals(entities.size(), 1);
+        Assert.assertTrue(entities.contains(f1));
+
+        entities = graph.getDependents(ds);
+        Assert.assertEquals(entities.size(), 1);
+        Assert.assertTrue(entities.contains(f1));
+
+        entities = graph.getDependents(f1);
+        Assert.assertEquals(entities.size(), 2);
+        Assert.assertTrue(entities.contains(cluster));
+        Assert.assertTrue(entities.contains(ds));
+
+        store.remove(EntityType.FEED, "fe1");
+        store.remove(EntityType.DATASOURCE, "test-db");
+        store.remove(EntityType.CLUSTER, "ci1");
+    }
+
+
+    @Test
     public void testOnRemoveDatasource() throws Exception {
 
         Datasource ds = new Datasource();
