Repository: falcon Updated Branches: refs/heads/master 55ce6bb0a -> c31af6214
FALCON-99 Adding late data to process doesn't create new coord. Contributed by Pallavi Rao Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/c31af621 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/c31af621 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/c31af621 Branch: refs/heads/master Commit: c31af621452ea58dba3089ef90d3c0f7490c8eb2 Parents: 55ce6bb Author: Suhas Vasu <[email protected]> Authored: Thu Jun 18 15:35:10 2015 +0530 Committer: Suhas Vasu <[email protected]> Committed: Thu Jun 18 15:35:10 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../org/apache/falcon/entity/EntityUtil.java | 5 ++++ .../apache/falcon/update/UpdateHelperTest.java | 31 +++++++++++++++++++- 3 files changed, 37 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/c31af621/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c80909a..99c3528 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -43,6 +43,8 @@ Trunk (Unreleased) (Suhas Vasu) BUG FIXES + FALCON-99 Adding late data to process doesn't create new coord (Pallavi Rao via Suhas Vasu) + FALCON-1101 Cluster submission in falcon does not create an owned-by edge(Sowmya Ramesh via Ajay Yadava) FALCON-1104 Exception while adding process instance to graphdb when feed has partition expression http://git-wip-us.apache.org/repos/asf/falcon/blob/c31af621/common/src/main/java/org/apache/falcon/entity/EntityUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java index f4f266a..b86d9d7 100644 --- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java +++ 
b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java @@ -428,6 +428,11 @@ public final class EntityUtil { if (!key.equals("class")) { mapToProperties(map.get(key), name != null ? name + "." + key : key, propMap, filterProps); + } else { + // Just add the parent element to the list too. + // Required to detect addition/removal of optional elements with child nodes. + // For example, late-process + propMap.put(((Class)map.get(key)).getSimpleName(), ""); } } } catch (Exception e1) { http://git-wip-us.apache.org/repos/asf/falcon/blob/c31af621/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java index 716552d..3e48e26 100644 --- a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java +++ b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java @@ -39,6 +39,7 @@ import org.apache.falcon.entity.v0.feed.Locations; import org.apache.falcon.entity.v0.feed.Partition; import org.apache.falcon.entity.v0.feed.Properties; import org.apache.falcon.entity.v0.feed.Property; +import org.apache.falcon.entity.v0.process.LateProcess; import org.apache.falcon.entity.v0.process.PolicyType; import org.apache.falcon.entity.v0.process.Process; import org.apache.hadoop.fs.FileSystem; @@ -251,7 +252,6 @@ public class UpdateHelperTest extends AbstractTestBase { Path feedPath = EntityUtil.getNewStagingPath(clusterEntity, oldFeed); Assert.assertFalse(UpdateHelper.isEntityUpdated(oldFeed, newFeed, cluster, feedPath)); - Assert.assertFalse(UpdateHelper.isEntityUpdated(oldFeed, newFeed, cluster, feedPath)); newFeed.getACL().setOwner("new-user"); newFeed.getACL().setGroup("new-group"); Assert.assertNotEquals(oldFeed.getACL().getOwner(), newFeed.getACL().getOwner()); @@ -272,6 +272,35 @@ public class UpdateHelperTest extends 
AbstractTestBase { Assert.assertTrue(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster, procPath)); } + @Test + public void testIsEntityLateProcessUpdated() throws Exception { + String cluster = "testCluster"; + Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster); + Process oldProcess = processParser.parseAndValidate(this.getClass().getResourceAsStream(PROCESS_XML)); + prepare(oldProcess); + Path procPath = EntityUtil.getNewStagingPath(clusterEntity, oldProcess); + + // The Process should not be updated when late processing is updated. + // As the definition does not affect the Oozie workflow. + Process newProcess = (Process) oldProcess.copy(); + newProcess.getLateProcess().setPolicy(PolicyType.FINAL); + Assert.assertFalse(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster, procPath)); + + LateProcess lateProcess = newProcess.getLateProcess(); + newProcess.setLateProcess(null); + + // The Process should be updated when late processing is removed. + // Pre-processing needs to be removed from the workflow + Assert.assertTrue(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster, procPath)); + + Process newerProcess = (Process) newProcess.copy(); + newerProcess.setLateProcess(lateProcess); + + // The Process should be updated when late processing is added. + // Pre-processing needs to be added to the workflow + Assert.assertTrue(UpdateHelper.isEntityUpdated(newProcess, newerProcess, cluster, procPath)); + } + private static Location getLocation(Feed feed, LocationType type, String cluster) { org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster); if (feedCluster.getLocations() != null) {
