Repository: falcon Updated Branches: refs/heads/master 24eecc4a0 -> c8e46d161
FALCON-1086 Support execution-order of a feed to be overridden for replication coord. Contributed by Shaik Idris Ali Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/c8e46d16 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/c8e46d16 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/c8e46d16 Branch: refs/heads/master Commit: c8e46d1612d740c80c0f817840e5c4dc90c68bbf Parents: 24eecc4 Author: Suhas Vasu <[email protected]> Authored: Mon Mar 30 13:39:19 2015 +0530 Committer: Suhas Vasu <[email protected]> Committed: Mon Mar 30 13:39:19 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 3 +++ docs/src/site/twiki/EntitySpecification.twiki | 10 +++++++--- .../oozie/feed/FeedReplicationCoordinatorBuilder.java | 13 +++++++++++++ .../oozie/feed/OozieFeedWorkflowBuilderTest.java | 3 +++ oozie/src/test/resources/feed/feed.xml | 3 ++- 5 files changed, 28 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/c8e46d16/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7539bcf..db7daca 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -10,6 +10,9 @@ Trunk (Unreleased) FALCON-822 Add reverse look up API (Ajay Yadava via Suhas Vasu) IMPROVEMENTS + FALCON-1086 Support execution-order of a feed to be overridden + for replication coord (Shaik Idris Ali via Suhas Vasu) + FALCON-915 Failed to load data. 
Error: 400 Param user.name can't be empty - Web UI/0.6 (Balu Vellanki via Suhas Vasu) http://git-wip-us.apache.org/repos/asf/falcon/blob/c8e46d16/docs/src/site/twiki/EntitySpecification.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki index 86df6d4..758d591 100644 --- a/docs/src/site/twiki/EntitySpecification.twiki +++ b/docs/src/site/twiki/EntitySpecification.twiki @@ -280,12 +280,16 @@ permission indicates the permission. <property name="parallel" value="3"/> <property name="maxMaps" value="8"/> <property name="mapBandwidth" value="1"/> + <property name="order" value="LIFO"/> </properties> </verbatim> A key-value pair, which are propagated to the workflow engine. "queueName" and "jobPriority" are special properties -available to user to specify the Hadoop job queue and priority, the same value is used by Falcons launcher job. -"timeout" and "parallel" are other special properties which decides replication instance's timeout value while -waiting for the feed instance and parallel decides the concurrent replication instances that can run at any given time. "maxMaps" represents the maximum number of maps used during replication. "mapBandwidth" represents the bandwidth in MB/s used by each mapper during replication. +available to user to specify the Hadoop job queue and priority, the same values are used by Falcon's launcher job. +"timeout", "parallel" and "order" are other special properties: timeout decides the replication instance's timeout value while +waiting for the feed instance, parallel decides the number of concurrent replication instances that can run at any given time and +order decides the execution order for replication instances (FIFO, LIFO or LAST_ONLY). +"maxMaps" represents the maximum number of maps used during replication. "mapBandwidth" represents the bandwidth in MB/s +used by each mapper during replication. 
---++ Process Specification A process defines configuration for a workflow. A workflow is a directed acyclic graph(DAG) which defines the job for the workflow engine. A process definition defines the configurations required to run the workflow job. For example, process defines the frequency at which the workflow should run, the clusters on which the workflow should run, the inputs and outputs for the workflow, how the workflow failures should be handled, how the late inputs should be handled and so on. http://git-wip-us.apache.org/repos/asf/falcon/blob/c8e46d16/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java index 2451bbe..04aab29 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java @@ -36,6 +36,7 @@ import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.feed.ClusterType; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.feed.LocationType; +import org.apache.falcon.entity.v0.process.ExecutionType; import org.apache.falcon.expression.ExpressionHelper; import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.oozie.OozieCoordinatorBuilder; @@ -74,6 +75,7 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F private static final String TIMEOUT = "timeout"; private static final String MR_MAX_MAPS = "maxMaps"; private static final String MR_MAP_BANDWIDTH = "mapBandwidth"; + private static final String ORDER = "order"; public FeedReplicationCoordinatorBuilder(Feed entity) { super(entity, LifeCycle.REPLICATION); @@ -383,6 
+385,17 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F } } coord.getControls().setConcurrency(String.valueOf(parallel)); + + String orderProp = props.getProperty(ORDER); + ExecutionType order = ExecutionType.FIFO; + if (orderProp != null) { + try { + order = ExecutionType.fromValue(orderProp); + } catch (IllegalArgumentException ignore) { + LOG.error("Unable to parse order:", ignore); + } + } + coord.getControls().setExecution(order.name()); } http://git-wip-us.apache.org/repos/asf/falcon/blob/c8e46d16/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java ---------------------------------------------------------------------- diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java index 48449d4..1c599b5 100644 --- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java +++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java @@ -168,6 +168,9 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { Assert.assertEquals("FALCON_FEED_REPLICATION_" + feed.getName() + "_" + srcCluster.getName(), coord.getName()); Assert.assertEquals("${coord:minutes(20)}", coord.getFrequency()); + Assert.assertEquals("2", coord.getControls().getConcurrency()); + Assert.assertEquals("120", coord.getControls().getTimeout()); + Assert.assertEquals("FIFO", coord.getControls().getExecution()); SYNCDATASET inputDataset = (SYNCDATASET) coord.getDatasets() .getDatasetOrAsyncDataset().get(0); SYNCDATASET outputDataset = (SYNCDATASET) coord.getDatasets() http://git-wip-us.apache.org/repos/asf/falcon/blob/c8e46d16/oozie/src/test/resources/feed/feed.xml ---------------------------------------------------------------------- diff --git a/oozie/src/test/resources/feed/feed.xml b/oozie/src/test/resources/feed/feed.xml index 4da222e..baa8f26 
100644 --- a/oozie/src/test/resources/feed/feed.xml +++ b/oozie/src/test/resources/feed/feed.xml @@ -50,7 +50,8 @@ <properties> <property name="field3" value="value3"/> <property name="field2" value="value2"/> - + <property name="order" value="FIFO" /> + <property name="parallel" value="2" /> <property name="field4" value="value2"/> </properties> </feed>
