Repository: falcon Updated Branches: refs/heads/master 53bd6c38e -> bd4434333
FALCON-1260 Instance dependency API produces incorrect results. Contributed by Ajay Yadava. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/bd443433 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/bd443433 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/bd443433 Branch: refs/heads/master Commit: bd44343333e166e672a62c15c5ef306fadc46dd7 Parents: 53bd6c3 Author: Ajay Yadava <[email protected]> Authored: Mon Jun 29 12:49:43 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Mon Jun 29 13:42:33 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../resource/SchedulableEntityInstance.java | 18 +- .../org/apache/falcon/entity/EntityUtil.java | 16 +- .../org/apache/falcon/entity/FeedHelper.java | 38 +- .../org/apache/falcon/entity/ProcessHelper.java | 10 +- .../apache/falcon/entity/FeedHelperTest.java | 349 +++++++++++++++++-- .../apache/falcon/entity/ProcessHelperTest.java | 100 ++++-- docs/src/site/twiki/FalconCLI.twiki | 10 +- .../resource/AbstractInstanceManager.java | 1 + .../resource/proxy/InstanceManagerProxy.java | 7 +- 10 files changed, 458 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/bd443433/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e81af1e..2928497 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -45,6 +45,8 @@ Trunk (Unreleased) (Suhas Vasu) BUG FIXES + FALCON-1260 Instance dependency API produces incorrect results (Ajay Yadava) + FALCON-99 Adding late data to process doesn't create new coord (Pallavi Rao via Suhas Vasu) FALCON-1101 Cluster submission in falcon does not create an owned-by edge(Sowmya Ramesh via Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/bd443433/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java b/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java index 2a7ecdb..f5be63d 100644 --- a/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java +++ b/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java @@ -40,7 +40,7 @@ public class SchedulableEntityInstance { private EntityType entityType; - private String tag; + private String tags; //for JAXB private SchedulableEntityInstance() { @@ -56,12 +56,12 @@ public class SchedulableEntityInstance { } } - public String getTag() { - return tag; + public String getTags() { + return tags; } - public void setTag(String tag) { - this.tag = tag; + public void setTags(String tags) { + this.tags = tags; } public String getEntityName() { @@ -103,7 +103,7 @@ public class SchedulableEntityInstance { + ", type: " + entityType + ", cluster: " + cluster + ", instanceTime: " + SchemaHelper.formatDateUTC(instanceTime)); - sb.append(", tag: " + ((tag != null) ? tag : "")); + sb.append(", tags: " + ((tags != null) ? tags : "")); return sb.toString(); } @@ -134,7 +134,7 @@ public class SchedulableEntityInstance { return false; } - if (!StringUtils.equals(tag, that.tag)) { + if (!StringUtils.equals(tags, that.tags)) { return false; } @@ -147,8 +147,8 @@ public class SchedulableEntityInstance { result = 31 * result + entityName.hashCode(); result = 31 * result + entityType.hashCode(); result = 31 * result + cluster.hashCode(); - if (tag != null) { - result = 31 * result + tag.hashCode(); + if (tags != null) { + result = 31 * result + tags.hashCode(); } return result; } http://git-wip-us.apache.org/repos/asf/falcon/blob/bd443433/common/src/main/java/org/apache/falcon/entity/EntityUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java index b86d9d7..63dfb9d 100644 --- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java +++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java @@ -803,12 +803,12 @@ public final class EntityUtil { Calendar insCal = Calendar.getInstance(tz); insCal.setTime(startTime); - int instanceCount = getInstanceSequence(startTime, frequency, tz, referenceTime); + int instanceCount = getInstanceSequence(startTime, frequency, tz, referenceTime) - 1; final int freq = frequency.getFrequencyAsInt() * instanceCount; insCal.add(frequency.getTimeUnit().getCalendarUnit(), freq); while (insCal.getTime().after(referenceTime)) { - insCal.add(frequency.getTimeUnit().getCalendarUnit(), -1); + insCal.add(frequency.getTimeUnit().getCalendarUnit(), frequency.getFrequencyAsInt() * -1); } return insCal.getTime(); } @@ -849,7 +849,7 @@ public final class EntityUtil { /** * Find instance times given first instance start time and frequency till a given end time. * - * It finds the first valid instance time in the given time range, it then uses frequency to find next instances + * It finds the first valid instance time for the given time range, it then uses frequency to find next instances * in the given time range. * * @param startTime startTime of the entity (time of first instance ever of the given entity) @@ -866,15 +866,15 @@ public final class EntityUtil { timeZone = TimeZone.getTimeZone("UTC"); } - while(true){ - Date nextStartTime = getNextStartTime(startTime, frequency, timeZone, startRange); - if (nextStartTime.before(startRange) || nextStartTime.after(endRange)){ + Date current = getPreviousInstanceTime(startTime, frequency, timeZone, startRange); + while (true) { + Date nextStartTime = getNextStartTime(startTime, frequency, timeZone, current); + if (nextStartTime.after(endRange)){ break; } - result.add(nextStartTime); // this is required because getNextStartTime returns greater than or equal to referenceTime - startRange = new Date(nextStartTime.getTime() + ONE_MS); // 1 milli seconds later + current = new Date(nextStartTime.getTime() + ONE_MS); // 1 milli seconds later } return result; } http://git-wip-us.apache.org/repos/asf/falcon/blob/bd443433/common/src/main/java/org/apache/falcon/entity/FeedHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java index 9f4eb61..eadc8d6 100644 --- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java @@ -448,7 +448,7 @@ public final class FeedHelper { // validate that instanceTime is in validity range if (feedCluster.getValidity().getStart().after(instanceTime) - || feedCluster.getValidity().getEnd().before(instanceTime)) { + || !feedCluster.getValidity().getEnd().after(instanceTime)) { throw new IllegalArgumentException("instanceTime: " + instanceTime + " is not in validity range for" + " Feed: " + feed.getName() + " on cluster:" + cluster.getName()); } @@ -479,14 +479,20 @@ public final class FeedHelper { //validate the inputs validateFeedInstance(feed, feedInstanceTime, cluster); - Process process = getProducerProcess(feed); if (process != null) { + org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process, + cluster.getName()); + Date pStart = processCluster.getValidity().getStart(); + Date pEnd = processCluster.getValidity().getEnd(); try { Date processInstanceTime = getProducerInstanceTime(feed, feedInstanceTime, process, cluster); + if (processInstanceTime.before(pStart) || !processInstanceTime.before(pEnd)) { + return null; + } SchedulableEntityInstance producer = new SchedulableEntityInstance(process.getName(), cluster.getName(), processInstanceTime, EntityType.PROCESS); - producer.setTag(SchedulableEntityInstance.OUTPUT); + producer.setTags(SchedulableEntityInstance.OUTPUT); return producer; } catch (FalconException e) { LOG.error("Error in trying to get producer process: {}'s instance time for feed: {}'s instance: } " @@ -576,7 +582,7 @@ public final class FeedHelper { for (Date date : consumerInstanceTimes) { SchedulableEntityInstance in = new SchedulableEntityInstance(p.getName(), cluster.getName(), date, EntityType.PROCESS); - in.setTag(SchedulableEntityInstance.INPUT); + in.setTags(SchedulableEntityInstance.INPUT); result.add(in); } } @@ -645,7 +651,7 @@ public final class FeedHelper { ExpressionHelper.setReferenceDate(processStartDate); ExpressionHelper evaluator = ExpressionHelper.get(); Date startRelative = evaluator.evaluate(in.getStart(), Date.class); - Date startTimeActual = EntityUtil.getNextStartTime(feedStartDate, + Date startTimeActual = EntityUtil.getPreviousInstanceTime(feedStartDate, feed.getFrequency(), feed.getTimezone(), startRelative); Long offset = processStartDate.getTime() - startTimeActual.getTime(); @@ -663,10 +669,15 @@ public final class FeedHelper { ExpressionHelper.setReferenceDate(nextConsumerInstance); evaluator = ExpressionHelper.get(); - Long rangeStart = evaluator.evaluate(in.getStart(), Date.class).getTime(); + Date inputStart = evaluator.evaluate(in.getStart(), Date.class); + Long rangeStart = EntityUtil.getPreviousInstanceTime(feedStartDate, feed.getFrequency(), + feed.getTimezone(), inputStart).getTime(); Long rangeEnd = evaluator.evaluate(in.getEnd(), Date.class).getTime(); - if (rangeStart <= feedInstancetime.getTime() && feedInstancetime.getTime() < rangeEnd) { - result.add(nextConsumerInstance); + if (rangeStart <= feedInstancetime.getTime() && feedInstancetime.getTime() <= rangeEnd) { + if (!nextConsumerInstance.before(processCluster.getValidity().getStart()) + && nextConsumerInstance.before(processCluster.getValidity().getEnd())) { + result.add(nextConsumerInstance); + } } else { break; } @@ -681,10 +692,15 @@ public final class FeedHelper { ExpressionHelper.setReferenceDate(nextConsumerInstance); evaluator = ExpressionHelper.get(); - Long rangeStart = evaluator.evaluate(in.getStart(), Date.class).getTime(); + Date inputStart = evaluator.evaluate(in.getStart(), Date.class); + Long rangeStart = EntityUtil.getPreviousInstanceTime(feedStartDate, feed.getFrequency(), + feed.getTimezone(), inputStart).getTime(); Long rangeEnd = evaluator.evaluate(in.getEnd(), Date.class).getTime(); - if (rangeStart <= feedInstancetime.getTime() && feedInstancetime.getTime() < rangeEnd) { - result.add(nextConsumerInstance); + if (rangeStart <= feedInstancetime.getTime() && feedInstancetime.getTime() <= rangeEnd) { + if (!nextConsumerInstance.before(processCluster.getValidity().getStart()) + && nextConsumerInstance.before(processCluster.getValidity().getEnd())) { + result.add(nextConsumerInstance); + } } else { break; } http://git-wip-us.apache.org/repos/asf/falcon/blob/bd443433/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java index fe78bc8..bbfca68 100644 --- a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java @@ -98,7 +98,7 @@ public final class ProcessHelper { // check if instanceTime is in validity range if (instanceTime.before(processCluster.getValidity().getStart()) - || instanceTime.after(processCluster.getValidity().getEnd())) { + || !instanceTime.before(processCluster.getValidity().getEnd())) { throw new IllegalArgumentException("Instance time provided: " + instanceTime + " is not in validity range of process: " + process.getName() + "on cluster: " + cluster.getName()); @@ -148,7 +148,7 @@ public final class ProcessHelper { SchedulableEntityInstance instance; for (Date time : instanceTimes) { instance = new SchedulableEntityInstance(feed.getName(), cluster.getName(), time, EntityType.FEED); - instance.setTag(SchedulableEntityInstance.INPUT); + instance.setTags(SchedulableEntityInstance.INPUT); result.add(instance); } } @@ -175,11 +175,11 @@ public final class ProcessHelper { // find the feed Feed feed = store.get(EntityType.FEED, output.getFeed()); org.apache.falcon.entity.v0.feed.Cluster fCluster = FeedHelper.getCluster(feed, cluster.getName()); - outputInstance = EntityUtil.getNextStartTime(fCluster.getValidity().getStart(), feed.getFrequency(), - feed.getTimezone(), outputInstance); + outputInstance = EntityUtil.getPreviousInstanceTime(fCluster.getValidity().getStart(), + feed.getFrequency(), feed.getTimezone(), outputInstance); candidate = new SchedulableEntityInstance(output.getFeed(), cluster.getName(), outputInstance, EntityType.FEED); - candidate.setTag(SchedulableEntityInstance.OUTPUT); + candidate.setTags(SchedulableEntityInstance.OUTPUT); result.add(candidate); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/bd443433/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java index f70edfb..b15f023 100644 --- a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java +++ b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java @@ -78,6 +78,155 @@ public class FeedHelperTest extends AbstractTestBase { Assert.assertEquals(FeedHelper.normalizePartitionExpression(null, null), ""); } + @Test(expectedExceptions = IllegalArgumentException.class) + public void testInstanceBeforeStart() throws Exception { + Cluster cluster = publishCluster(); + Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); + Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC"); + Outputs outputs = new Outputs(); + Output outFeed = new Output(); + outFeed.setName("outputFeed"); + outFeed.setFeed(feed.getName()); + outFeed.setInstance("now(0,0)"); + outputs.getOutputs().add(outFeed); + process.setOutputs(outputs); + store.publish(EntityType.PROCESS, process); + FeedHelper.getProducerInstance(feed, getDate("2011-02-27 10:00 UTC"), cluster); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testInstanceEqualsEnd() throws Exception { + Cluster cluster = publishCluster(); + Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); + Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC"); + Outputs outputs = new Outputs(); + Output outFeed = new Output(); + outFeed.setName("outputFeed"); + outFeed.setFeed(feed.getName()); + outFeed.setInstance("now(0,0)"); + outputs.getOutputs().add(outFeed); + process.setOutputs(outputs); + store.publish(EntityType.PROCESS, process); + FeedHelper.getProducerInstance(feed, getDate("2016-02-28 10:00 UTC"), cluster); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testInstanceOutOfSync() throws Exception { + Cluster cluster = publishCluster(); + Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); + Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC"); + Outputs outputs = new Outputs(); + Output outFeed = new Output(); + outFeed.setName("outputFeed"); + outFeed.setFeed(feed.getName()); + outFeed.setInstance("now(0,0)"); + outputs.getOutputs().add(outFeed); + process.setOutputs(outputs); + store.publish(EntityType.PROCESS, process); + FeedHelper.getProducerInstance(feed, getDate("2016-02-28 09:04 UTC"), cluster); + } + + @Test + public void testGetProducerOutOfValidity() throws FalconException, ParseException { + Cluster cluster = publishCluster(); + Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); + Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC"); + Outputs outputs = new Outputs(); + Output outFeed = new Output(); + outFeed.setName("outputFeed"); + outFeed.setFeed(feed.getName()); + outFeed.setInstance("now(0,0)"); + outputs.getOutputs().add(outFeed); + process.setOutputs(outputs); + store.publish(EntityType.PROCESS, process); + Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName()); + SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2012-02-28 10:45 UTC"), + cluster); + Assert.assertNull(result); + } + + @Test + public void testGetConsumersOutOfValidity() throws Exception { + Cluster cluster = publishCluster(); + Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); + Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC"); + Inputs inputs = new Inputs(); + Input inFeed = new Input(); + inFeed.setName("inputFeed"); + inFeed.setFeed(feed.getName()); + inFeed.setStart("now(0, -20)"); + inFeed.setEnd("now(0, 0)"); + inputs.getInputs().add(inFeed); + process.setInputs(inputs); + store.publish(EntityType.PROCESS, process); + Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, getDate("2016-02-28 09:00 UTC"), + cluster); + Assert.assertTrue(result.isEmpty()); + } + + @Test + public void testGetConsumersFirstInstance() throws Exception { + Cluster cluster = publishCluster(); + Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); + Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC"); + Inputs inputs = new Inputs(); + Input inFeed = new Input(); + inFeed.setName("inputFeed"); + inFeed.setFeed(feed.getName()); + inFeed.setStart("now(0, -20)"); + inFeed.setEnd("now(0, 0)"); + inputs.getInputs().add(inFeed); + process.setInputs(inputs); + store.publish(EntityType.PROCESS, process); + Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, getDate("2012-02-28 10:15 UTC"), + cluster); + Set<SchedulableEntityInstance> expected = new HashSet<>(); + SchedulableEntityInstance consumer = new SchedulableEntityInstance(process.getName(), cluster.getName(), + getDate("2012-02-28 10:37 UTC"), EntityType.PROCESS); + consumer.setTags(SchedulableEntityInstance.INPUT); + expected.add(consumer); + Assert.assertEquals(result, expected); + } + + @Test + public void testGetConsumersLastInstance() throws Exception { + Cluster cluster = publishCluster(); + Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); + Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:20 UTC", "2016-02-28 10:00 UTC"); + Inputs inputs = new Inputs(); + Input inFeed = new Input(); + inFeed.setName("inputFeed"); + inFeed.setFeed(feed.getName()); + inFeed.setStart("now(0, -20)"); + inFeed.setEnd("now(0, 0)"); + inputs.getInputs().add(inFeed); + process.setInputs(inputs); + store.publish(EntityType.PROCESS, process); + Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, getDate("2012-02-28 10:15 UTC"), + cluster); + Set<SchedulableEntityInstance> expected = new HashSet<>(); + String[] consumers = { "2012-02-28 10:20 UTC", "2012-02-28 10:30 UTC", }; + for (String d : consumers) { + SchedulableEntityInstance i = new SchedulableEntityInstance(process.getName(), cluster.getName(), + getDate(d), EntityType.PROCESS); + i.setTags(SchedulableEntityInstance.INPUT); + expected.add(i); + } + Assert.assertEquals(result, expected); + } + + @Test + public void testFeedWithNoDependencies() throws Exception { + Cluster cluster = publishCluster(); + Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); + Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, getDate("2016-02-28 09:00 UTC"), + cluster); + Assert.assertTrue(result.isEmpty()); + SchedulableEntityInstance res = FeedHelper.getProducerInstance(feed, getDate("2012-02-28 10:45 UTC"), + cluster); + Assert.assertNull(res); + } + @Test public void testEvaluateExpression() throws Exception { Cluster cluster = new Cluster(); @@ -144,29 +293,24 @@ public class FeedHelperTest extends AbstractTestBase { @Test public void testGetProducerProcessWithOffset() throws FalconException, ParseException { - //create a feed, submit it, test that ProducerProcess is null - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "days(1)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); + Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); Assert.assertNull(FeedHelper.getProducerProcess(feed)); - - // create it's producer process submit it, test it's ProducerProcess - Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); + Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2016-02-28 10:37 UTC"); Outputs outputs = new Outputs(); Output outFeed = new Output(); outFeed.setName("outputFeed"); outFeed.setFeed(feed.getName()); - outFeed.setInstance("today(0,0)"); + outFeed.setInstance("now(0,0)"); outputs.getOutputs().add(outFeed); process.setOutputs(outputs); store.publish(EntityType.PROCESS, process); - Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName()); - SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2013-02-27 10:00 UTC"), + SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2013-02-28 10:35 UTC"), cluster); SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(), - getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS); - expected.setTag(SchedulableEntityInstance.OUTPUT); + getDate("2013-02-28 10:37 UTC"), EntityType.PROCESS); + expected.setTags(SchedulableEntityInstance.OUTPUT); Assert.assertEquals(result, expected); } @@ -192,7 +336,7 @@ public class FeedHelperTest extends AbstractTestBase { cluster); SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(), getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS); - expected.setTag(SchedulableEntityInstance.OUTPUT); + expected.setTags(SchedulableEntityInstance.OUTPUT); Assert.assertEquals(result, expected); } @@ -218,7 +362,7 @@ public class FeedHelperTest extends AbstractTestBase { cluster); SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(), getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS); - expected.setTag(SchedulableEntityInstance.OUTPUT); + expected.setTags(SchedulableEntityInstance.OUTPUT); Assert.assertEquals(result, expected); } @@ -245,7 +389,7 @@ public class FeedHelperTest extends AbstractTestBase { cluster); SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(), getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS); - expected.setTag(SchedulableEntityInstance.OUTPUT); + expected.setTags(SchedulableEntityInstance.OUTPUT); Assert.assertEquals(result, expected); } @@ -270,7 +414,7 @@ public class FeedHelperTest extends AbstractTestBase { cluster); SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(), getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS); - expected.setTag(SchedulableEntityInstance.OUTPUT); + expected.setTags(SchedulableEntityInstance.OUTPUT); Assert.assertEquals(result, expected); } @@ -322,19 +466,99 @@ public class FeedHelperTest extends AbstractTestBase { Set<SchedulableEntityInstance> expected = new HashSet<>(); SchedulableEntityInstance ins = new SchedulableEntityInstance(process.getName(), cluster.getName(), getDate("2012-02-28 10:00 UTC"), EntityType.PROCESS); - ins.setTag(SchedulableEntityInstance.INPUT); + ins.setTags(SchedulableEntityInstance.INPUT); expected.add(ins); Assert.assertEquals(result, expected); + } + @Test + public void testGetConsumerProcessInstancesWithNonUnitFrequency() throws Exception { + //create a feed, submit it, test that ConsumerProcesses is blank list + Cluster cluster = publishCluster(); + Feed feed = publishFeed(cluster, "minutes(5)", "2012-02-28 00:00 UTC", "2016-02-28 00:00 UTC"); + + //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses + Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 09:37 UTC", "2016-02-28 10:00 UTC"); + Inputs inputs = new Inputs(); + Input inFeed = new Input(); + inFeed.setName("inputFeed"); + inFeed.setFeed(feed.getName()); + inFeed.setStart("now(0, -20)"); + inFeed.setEnd("now(0,0)"); + inputs.getInputs().add(inFeed); + process.setInputs(inputs); + store.publish(EntityType.PROCESS, process); + + Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, + getDate("2012-02-28 09:40 UTC"), cluster); + + Set<SchedulableEntityInstance> expected = new HashSet<>(); + String[] consumers = {"2012-02-28 09:47 UTC", "2012-02-28 09:57 UTC"}; + for (String d : consumers) { + SchedulableEntityInstance i = new SchedulableEntityInstance(process.getName(), cluster.getName(), + getDate(d), EntityType.PROCESS); + i.setTags(SchedulableEntityInstance.INPUT); + expected.add(i); + } + Assert.assertEquals(result, expected); } @Test - public void testGetMultipleConsumerInstances() throws Exception { + public void testGetConsumersOutOfValidityRange() throws Exception { + //create a feed, submit it, test that ConsumerProcesses is blank list Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "hours(1)", "2012-02-28 00:00 UTC", "2016-02-28 00:00 UTC"); + Feed feed = publishFeed(cluster, "minutes(5)", "2010-02-28 00:00 UTC", "2016-02-28 00:00 UTC"); + + //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses + Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 09:37 UTC", "2016-02-28 10:00 UTC"); + Inputs inputs = new Inputs(); + Input inFeed = new Input(); + inFeed.setName("inputFeed"); + inFeed.setFeed(feed.getName()); + inFeed.setStart("now(0, -20)"); + inFeed.setEnd("now(0,0)"); + inputs.getInputs().add(inFeed); + process.setInputs(inputs); + store.publish(EntityType.PROCESS, process); + + Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, + getDate("2010-02-28 09:40 UTC"), cluster); + Assert.assertEquals(result.size(), 0); + } + + @Test + public void testGetConsumersLargeOffsetShortValidity() throws Exception { + //create a feed, submit it, test that ConsumerProcesses is blank list + Cluster cluster = publishCluster(); + Feed feed = publishFeed(cluster, "minutes(5)", "2010-02-28 00:00 UTC", "2016-02-28 00:00 UTC"); //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses - Process process = prepareProcess(cluster, "hours(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); + Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 09:37 UTC", "2012-02-28 09:47 UTC"); + Inputs inputs = new Inputs(); + Input inFeed = new Input(); + inFeed.setName("inputFeed"); + inFeed.setFeed(feed.getName()); + inFeed.setStart("today(-2, 0)"); + inFeed.setEnd("now(0,0)"); + inputs.getInputs().add(inFeed); + process.setInputs(inputs); + store.publish(EntityType.PROCESS, process); + + Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, + getDate("2012-02-28 09:35 UTC"), cluster); + Set<SchedulableEntityInstance> expected = new HashSet<>(); + SchedulableEntityInstance consumer = new SchedulableEntityInstance(process.getName(), cluster.getName(), + getDate("2012-02-28 09:37 UTC"), EntityType.PROCESS); + consumer.setTags(SchedulableEntityInstance.INPUT); + expected.add(consumer); + Assert.assertEquals(result, expected); + } + + @Test + public void testGetMultipleConsumerInstances() throws Exception { + Cluster cluster = publishCluster(); + Feed feed = publishFeed(cluster, "hours(1)", "2012-02-27 00:00 UTC", "2016-02-28 00:00 UTC"); + Process process = prepareProcess(cluster, "hours(1)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC"); Inputs inputs = new Inputs(); Input inFeed = new Input(); inFeed.setName("inputFeed"); @@ -347,28 +571,27 @@ public class FeedHelperTest extends AbstractTestBase { Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, getDate("2012-02-28 09:00 UTC"), cluster); - Assert.assertEquals(result.size(), 8); - + Assert.assertEquals(result.size(), 9); Set<SchedulableEntityInstance> expected = new HashSet<>(); String[] consumers = { "2012-02-28 05:00 UTC", "2012-02-28 06:00 UTC", "2012-02-28 07:00 UTC", "2012-02-28 08:00 UTC", "2012-02-28 09:00 UTC", "2012-02-28 10:00 UTC", "2012-02-28 11:00 UTC", - "2012-02-28 12:00 UTC", }; + "2012-02-28 12:00 UTC", "2012-02-28 13:00 UTC", }; for (String d : consumers) { SchedulableEntityInstance i = new SchedulableEntityInstance(process.getName(), cluster.getName(), getDate(d), EntityType.PROCESS); - i.setTag(SchedulableEntityInstance.INPUT); + i.setTags(SchedulableEntityInstance.INPUT); expected.add(i); } Assert.assertEquals(result, expected); } @Test - public void testGetConsumerWithNow() throws Exception { + public void testGetConsumerWithVariableEnd() throws Exception { Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "hours(1)", "2012-02-28 00:00 UTC", "2016-02-28 00:00 UTC"); + Feed feed = publishFeed(cluster, "hours(1)", "2012-02-27 00:00 UTC", "2016-02-28 00:00 UTC"); //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses - Process process = prepareProcess(cluster, "hours(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); + Process process = prepareProcess(cluster, "hours(1)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC"); Inputs inputs = new Inputs(); Input inFeed = new Input(); inFeed.setName("inputFeed"); @@ -378,19 +601,66 @@ public class FeedHelperTest extends AbstractTestBase { inputs.getInputs().add(inFeed); process.setInputs(inputs); store.publish(EntityType.PROCESS, process); - Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, getDate("2012-02-28 00:00 UTC"), cluster); - Assert.assertEquals(result.size(), 23); + Set<SchedulableEntityInstance> expected = new HashSet<>(); + String[] consumers = {"2012-02-28 11:00 UTC", "2012-02-28 16:00 UTC", "2012-02-28 18:00 UTC", + "2012-02-28 20:00 UTC", "2012-02-28 13:00 UTC", "2012-02-28 03:00 UTC", "2012-02-28 04:00 UTC", + "2012-02-28 06:00 UTC", "2012-02-28 05:00 UTC", "2012-02-28 17:00 UTC", "2012-02-28 00:00 UTC", + "2012-02-28 23:00 UTC", "2012-02-28 21:00 UTC", "2012-02-28 15:00 UTC", "2012-02-28 22:00 UTC", + "2012-02-28 14:00 UTC", "2012-02-28 08:00 UTC", "2012-02-28 12:00 UTC", "2012-02-28 02:00 UTC", + "2012-02-28 01:00 UTC", "2012-02-28 19:00 UTC", "2012-02-28 10:00 UTC", "2012-02-28 09:00 UTC", + "2012-02-28 07:00 UTC", }; + for (String d : consumers) { + SchedulableEntityInstance i = new SchedulableEntityInstance(process.getName(), cluster.getName(), + getDate(d), EntityType.PROCESS); + i.setTags(SchedulableEntityInstance.INPUT); + expected.add(i); + } + Assert.assertEquals(result, expected); } @Test - public void testGetConsumerWithLatest() throws Exception { + public void testGetConsumerWithVariableStart() throws Exception { Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "hours(1)", "2012-02-28 00:00 UTC", "2016-02-28 00:00 UTC"); + Feed feed = publishFeed(cluster, "hours(1)", "2012-02-27 00:00 UTC", "2016-02-28 00:00 UTC"); //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses - Process process = prepareProcess(cluster, "hours(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); + Process process = prepareProcess(cluster, "hours(1)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC"); + Inputs inputs = new Inputs(); + Input inFeed = new Input(); + inFeed.setName("inputFeed"); + inFeed.setFeed(feed.getName()); + inFeed.setStart("now(0, 0)"); + inFeed.setEnd("today(24, 0)"); + inputs.getInputs().add(inFeed); + process.setInputs(inputs); + store.publish(EntityType.PROCESS, process); + + Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, + getDate("2012-03-28 00:00 UTC"), cluster); + Set<SchedulableEntityInstance> expected = new HashSet<>(); + String[] consumers = {"2012-03-27 16:00 UTC", "2012-03-27 01:00 UTC", "2012-03-27 10:00 UTC", + "2012-03-27 03:00 UTC", "2012-03-27 08:00 UTC", "2012-03-27 07:00 UTC", "2012-03-27 19:00 UTC", + "2012-03-27 22:00 UTC", "2012-03-27 12:00 UTC", "2012-03-27 20:00 UTC", "2012-03-27 09:00 UTC", + "2012-03-27 04:00 UTC", "2012-03-27 14:00 UTC", "2012-03-27 05:00 UTC", "2012-03-27 23:00 UTC", + "2012-03-27 17:00 UTC", "2012-03-27 13:00 UTC", "2012-03-27 18:00 UTC", "2012-03-27 15:00 UTC", + "2012-03-28 00:00 UTC", "2012-03-27 02:00 UTC", "2012-03-27 11:00 UTC", "2012-03-27 21:00 UTC", + "2012-03-27 00:00 UTC", "2012-03-27 06:00 UTC", }; + for (String d : consumers) { + SchedulableEntityInstance i = new SchedulableEntityInstance(process.getName(), cluster.getName(), + getDate(d), EntityType.PROCESS); + i.setTags(SchedulableEntityInstance.INPUT); + expected.add(i); + } + Assert.assertEquals(result, expected); + } + + @Test + public void testGetConsumerWithLatest() throws Exception { + Cluster cluster = publishCluster(); + Feed feed = publishFeed(cluster, "hours(1)", "2012-02-27 00:00 UTC", "2016-02-28 00:00 UTC"); + Process process = prepareProcess(cluster, "hours(1)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC"); Inputs inputs = new Inputs(); Input inFeed = new Input(); inFeed.setName("inputFeed"); @@ -403,8 +673,21 @@ public class FeedHelperTest extends AbstractTestBase { Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, getDate("2012-02-28 00:00 UTC"), cluster); - System.out.println("result.size() = " + result.size()); - Assert.assertEquals(result.size(), 23); + Set<SchedulableEntityInstance> expected = new HashSet<>(); + String[] consumers = {"2012-02-28 23:00 UTC", "2012-02-28 04:00 UTC", "2012-02-28 10:00 UTC", + "2012-02-28 07:00 UTC", "2012-02-28 17:00 UTC", "2012-02-28 13:00 UTC", "2012-02-28 05:00 UTC", + "2012-02-28 22:00 UTC", "2012-02-28 03:00 UTC", "2012-02-28 21:00 UTC", "2012-02-28 11:00 UTC", + "2012-02-28 20:00 UTC", "2012-02-28 06:00 UTC", "2012-02-28 01:00 UTC", "2012-02-28 14:00 UTC", + "2012-02-28 00:00 UTC", "2012-02-28 18:00 UTC", "2012-02-28 12:00 UTC", "2012-02-28 16:00 UTC", + "2012-02-28 09:00 UTC", "2012-02-28 15:00 UTC", "2012-02-28 19:00 UTC", "2012-02-28 08:00 UTC", + "2012-02-28 02:00 UTC", }; + for (String d : consumers) { + SchedulableEntityInstance i = new SchedulableEntityInstance(process.getName(), cluster.getName(), + getDate(d), EntityType.PROCESS); + i.setTags(SchedulableEntityInstance.INPUT); + expected.add(i); + } + Assert.assertEquals(result, expected); } private Validity getFeedValidity(String start, String end) throws ParseException { http://git-wip-us.apache.org/repos/asf/falcon/blob/bd443433/common/src/test/java/org/apache/falcon/entity/ProcessHelperTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/ProcessHelperTest.java b/common/src/test/java/org/apache/falcon/entity/ProcessHelperTest.java index 0d396ae..0729f15 100644 --- a/common/src/test/java/org/apache/falcon/entity/ProcessHelperTest.java +++ b/common/src/test/java/org/apache/falcon/entity/ProcessHelperTest.java @@ -62,40 +62,98 @@ public class ProcessHelperTest extends AbstractTestBase { store = ConfigurationStore.get(); } + @Test(expectedExceptions = IllegalArgumentException.class) + public void testBeforeStartInstance() throws FalconException, ParseException { + // create a process with input feeds + Cluster cluster = publishCluster(); + Feed feed = publishFeed(cluster, "minutes(5)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC"); + + // find the input Feed instances time + Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC"); + Inputs inputs = new Inputs(); + Input input = getInput("inputFeed", feed.getName(), "now(0,-20)", "now(0,0)", false); + inputs.getInputs().add(input); + process.setInputs(inputs); + store.publish(EntityType.PROCESS, process); + + Date processInstanceDate = getDate("2012-02-28 10:27 UTC"); + ProcessHelper.getInputFeedInstances(process, processInstanceDate, cluster, false); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testEqualsToEndInstance() throws FalconException, ParseException { + // create a process with input feeds + Cluster cluster = publishCluster(); + Feed feed = publishFeed(cluster, "minutes(5)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC"); + + // find the input Feed instances time + Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC"); + Inputs inputs = new Inputs(); + Input input = getInput("inputFeed", feed.getName(), "now(0,-20)", "now(0,0)", false); + inputs.getInputs().add(input); + process.setInputs(inputs); + store.publish(EntityType.PROCESS, process); + + Date processInstanceDate = getDate("2012-02-28 10:47 UTC"); + ProcessHelper.getInputFeedInstances(process, processInstanceDate, cluster, false); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testOutOfSyncInstance() throws FalconException, ParseException { + Cluster cluster = publishCluster(); + Feed feed = publishFeed(cluster, "minutes(5)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC"); + Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC"); + Inputs inputs = new Inputs(); + Input input = getInput("inputFeed", feed.getName(), "now(0,-20)", "now(0,0)", false); + inputs.getInputs().add(input); + process.setInputs(inputs); + store.publish(EntityType.PROCESS, process); + Date processInstanceDate = getDate("2012-02-28 10:40 UTC"); + ProcessHelper.getInputFeedInstances(process, processInstanceDate, cluster, false); + } + + @Test + public void testProcessWithNoDependencies() throws Exception { + Cluster cluster = publishCluster(); + Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC"); + store.publish(EntityType.PROCESS, process); + Date processInstanceDate = getDate("2012-02-28 10:37 UTC"); + Set<SchedulableEntityInstance> inputFeedInstances = ProcessHelper.getInputFeedInstances(process, + processInstanceDate, cluster, false); + Assert.assertTrue(inputFeedInstances.isEmpty()); + Set<SchedulableEntityInstance> res = ProcessHelper.getOutputFeedInstances(process, processInstanceDate, + cluster); + Assert.assertTrue(res.isEmpty()); + } + @Test public void testGetInputFeedInstances() throws FalconException, ParseException { // create a process with input feeds Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "hours(1)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC"); + Feed feed = publishFeed(cluster, "minutes(5)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC"); // find the input Feed instances time - Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); + Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2016-02-28 10:37 UTC"); Inputs inputs = new Inputs(); - Input input = getInput("inputFeed", feed.getName(), "today(0,-30)", "today(2,30)", false); + Input input = getInput("inputFeed", feed.getName(), "now(0,-20)", "now(0,0)", false); inputs.getInputs().add(input); process.setInputs(inputs); store.publish(EntityType.PROCESS, process); - Date processInstanceDate = getDate("2012-02-28 10:00 UTC"); + Date processInstanceDate = getDate("2012-02-28 10:37 UTC"); Set<SchedulableEntityInstance> inputFeedInstances = ProcessHelper.getInputFeedInstances(process, processInstanceDate, cluster, false); - Assert.assertEquals(inputFeedInstances.size(), 3); + Assert.assertEquals(inputFeedInstances.size(), 5); Set<SchedulableEntityInstance> expectedInputFeedInstances = new HashSet<>(); - SchedulableEntityInstance instance = new SchedulableEntityInstance(feed.getName(), cluster.getName(), - getDate("2012-02-28 00:00 UTC"), EntityType.FEED); - instance.setTag(SchedulableEntityInstance.INPUT); - expectedInputFeedInstances.add(instance); - instance = new SchedulableEntityInstance(feed.getName(), cluster.getName(), getDate("2012-02-28 01:00 UTC"), - EntityType.FEED); - instance.setTag(SchedulableEntityInstance.INPUT); - expectedInputFeedInstances.add(instance); - instance = new SchedulableEntityInstance(feed.getName(), cluster.getName(), getDate("2012-02-28 02:00 UTC"), - EntityType.FEED); - instance.setTag(SchedulableEntityInstance.INPUT); - expectedInputFeedInstances.add(instance); - - //Validate with expected result + String[] inputInstances = { "2012-02-28 10:15 UTC", "2012-02-28 10:20 UTC", "2012-02-28 10:25 UTC", + "2012-02-28 10:30 UTC", "2012-02-28 10:35 UTC", }; + for (String d : inputInstances) { + SchedulableEntityInstance i = new SchedulableEntityInstance(feed.getName(), cluster.getName(), + getDate(d), EntityType.FEED); + i.setTags(SchedulableEntityInstance.INPUT); + expectedInputFeedInstances.add(i); + } Assert.assertTrue(inputFeedInstances.equals(expectedInputFeedInstances)); } @@ -115,8 +173,8 @@ public class ProcessHelperTest extends AbstractTestBase { Set<SchedulableEntityInstance> expected = new HashSet<>(); SchedulableEntityInstance ins = new SchedulableEntityInstance(feed.getName(), cluster.getName(), - getDate("2012-02-28 11:00 UTC"), EntityType.FEED); - ins.setTag(SchedulableEntityInstance.OUTPUT); + getDate("2012-02-27 11:00 UTC"), EntityType.FEED); + ins.setTags(SchedulableEntityInstance.OUTPUT); expected.add(ins); Assert.assertEquals(result, expected); http://git-wip-us.apache.org/repos/asf/falcon/blob/bd443433/docs/src/site/twiki/FalconCLI.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki index 50dce84..233e4a6 100644 --- a/docs/src/site/twiki/FalconCLI.twiki +++ b/docs/src/site/twiki/FalconCLI.twiki @@ -301,11 +301,11 @@ $FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -params - For example: $FALCON_HOME/bin/falcon instance -dependency -type feed -name out -instanceTime 2014-12-15T00:00Z -name: producer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:00Z, tag: Output -name: consumer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:03Z, tag: Input -name: consumer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:04Z, tag: Input -name: consumer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:02Z, tag: Input -name: consumer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:05Z, tag: Input +name: producer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:00Z, tags: Output +name: consumer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:03Z, tags: Input +name: consumer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:04Z, tags: Input +name: consumer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:02Z, tags: Input +name: consumer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:05Z, tags: Input Response: default/Success! http://git-wip-us.apache.org/repos/asf/falcon/blob/bd443433/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java index 72f9fe4..d114d8d 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java @@ -227,6 +227,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { } } catch (Throwable throwable) { + LOG.error("Failed to get instance dependencies:", throwable); throw FalconWebException.newInstanceException(throwable, Response.Status.BAD_REQUEST); } http://git-wip-us.apache.org/repos/asf/falcon/blob/bd443433/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java index 1a8396c..757fda8 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java @@ -32,6 +32,8 @@ import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.resource.InstancesSummaryResult; import org.apache.falcon.resource.channel.Channel; import org.apache.falcon.resource.channel.ChannelFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.DefaultValue; @@ -55,6 +57,8 @@ import java.util.Set; */ @Path("instance") public class InstanceManagerProxy extends AbstractInstanceManager { + private static final Logger LOG = LoggerFactory.getLogger(InstanceManagerProxy.class); + private final Map<String, Channel> processInstanceManagerChannels = new HashMap<String, Channel>(); public InstanceManagerProxy() { @@ -389,7 +393,8 @@ public class InstanceManagerProxy extends AbstractInstanceManager { try { T resultHolder = doExecute(colo); results.put(colo, resultHolder); - } catch (FalconException e) { + } catch (Throwable e) { + LOG.error("Failed to fetch results for colo:{}", colo, e); results.put(colo, getResultInstance(APIResult.Status.FAILED, e.getClass().getName() + "::" + e.getMessage())); }
