Repository: falcon
Updated Branches:
  refs/heads/master db0604da8 -> 4576c582f


FALCON-1573 Supply user-defined properties to Oozie workflows during schedule. 
Contributed by Daniel Del Castillo.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/4576c582
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/4576c582
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/4576c582

Branch: refs/heads/master
Commit: 4576c582f21ee7872df81193f3dc1d7fcee164aa
Parents: db0604d
Author: Ajay Yadava <[email protected]>
Authored: Wed Nov 18 21:37:46 2015 +0530
Committer: Ajay Yadava <[email protected]>
Committed: Thu Nov 19 19:09:29 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 docs/src/site/twiki/FalconCLI.twiki             |  5 +-
 .../src/site/twiki/restapi/EntitySchedule.twiki | 70 +++++++++++++++++++-
 .../apache/falcon/oozie/OozieEntityBuilder.java | 23 +++++++
 .../workflow/engine/OozieWorkflowEngine.java    |  3 +-
 .../feed/OozieFeedWorkflowBuilderTest.java      | 16 +++++
 .../OozieProcessWorkflowBuilderTest.java        | 18 +++++
 7 files changed, 134 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/4576c582/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4f46450..a95094d 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,8 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+    FALCON-1573 Supply user-defined properties to Oozie workflows during 
schedule(Daniel Del Castillo via Ajay Yadava)
+
     FALCON-1559 Config changes required for native scheduler (Pallavi Rao)
 
     FALCON-1459 Ability to import from database(Venkat Ramachandran via Sowmya 
Ramesh)

http://git-wip-us.apache.org/repos/asf/falcon/blob/4576c582/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki 
b/docs/src/site/twiki/FalconCLI.twiki
index e001a7f..26e6b33 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -44,7 +44,10 @@ Once submitted, an entity can be scheduled using schedule 
option. Process and fe
 Usage:
 $FALCON_HOME/bin/falcon entity  -type [process|feed] -name <<name>> -schedule
 
-Optional Arg : -skipDryRun. When this argument is specified, Falcon skips 
oozie dryrun.
+Optional Arg : -skipDryRun -doAs <username>
+-properties <<key1:val1,...,keyN:valN>>
+
+<a href="./restapi/EntitySchedule.html">Optional params described here.</a>
 
 Example:
 $FALCON_HOME/bin/falcon entity  -type process -name sampleProcess -schedule

http://git-wip-us.apache.org/repos/asf/falcon/blob/4576c582/docs/src/site/twiki/restapi/EntitySchedule.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntitySchedule.twiki 
b/docs/src/site/twiki/restapi/EntitySchedule.twiki
index 263d152..0dede9b 100644
--- a/docs/src/site/twiki/restapi/EntitySchedule.twiki
+++ b/docs/src/site/twiki/restapi/EntitySchedule.twiki
@@ -12,15 +12,81 @@ Schedule an entity.
    * :entity-name is name of the entity.
    * skipDryRun : Optional query param, Falcon skips oozie dryrun when value 
is set to true.
    * doAs <optional query param> allows the current user to impersonate the 
user passed in doAs when interacting with the Falcon system.
+   * properties <key1:val1,...,keyN:valN> : Optional query param, supplies a 
set of key-value pairs that will be available to the entity in the coordinator 
configuration. These values will not override properties with the same name 
predefined in the entity specification. For example, to change the scheduler 
used for scheduling the entity you would set the property _falcon.scheduler_ in 
the properties parameter to _native_ to use the Falcon Scheduler or to _oozie_ 
to use the Oozie Scheduler.
 
 
 ---++ Results
 Result of the schedule command.
 
 ---++ Examples
+---+++ Oozie Workflow
+<verbatim>
+<workflow-app xmlns="uri:oozie:workflow:0.4" name="aggregator-wf">
+  <start to="aggregator" />
+  <action name="aggregator">
+    <java>
+      <job-tracker>${jobTracker}</job-tracker>
+      <name-node>${nameNode}</name-node>
+      <configuration>
+        <property>
+          <name>mapred.job.queue.name</name>
+          <value>${queueName}</value>
+        </property>
+      </configuration>
+      <main-class>com.company.hadoop.AggregatorJob</main-class>
+      
<java-opts>-Dframework.instrumentation.host=${instrumentationServer}</java-opts>
+      <arg>--input.path=${inputBasePath}</arg>
+      <arg>--output.path=${outputBasePath}</arg>
+    </java>
+    <ok to="end" />
+    <error to="fail" />
+  </action>
+  <kill name="fail">
+    <message>Action failed, error 
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+  </kill>
+</workflow-app>
+</verbatim>
+---+++ Submitted Process
+<verbatim>
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Daily sample process. Runs at 6th hour every day. Input - last day's 
hourly data. Generates output for yesterday -->
+<process xmlns="uri:falcon:process:0.1" name="SampleProcess" >
+    <clusters>
+      <cluster name="primary-cluster">
+        <validity start="2012-04-03T06:00Z" end="2022-12-30T00:00Z" />
+      </cluster>
+    </clusters>
+
+    <parallel>1</parallel>
+    <order>FIFO</order>
+    <frequency>hours(1)</frequency>
+
+    <inputs>
+        <input name="input" feed="SampleInput" start="yesterday(0,0)" 
end="today(-1,0)" />
+    </inputs>
+
+    <outputs>
+        <output name="output" feed="SampleOutput" instance="yesterday(0,0)" />
+    </outputs>
+
+    <properties>
+        <property name="queueName" value="default" />
+        <property name="ssh.host" value="localhost" />
+        <property name="fileTimestamp" 
value="${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd')}" />
+        <property name="instrumentationServer" 
value="${coord:conf('instrumentation.host')}" />
+    </properties>
+
+    <workflow engine="oozie" path="/examples/apps/aggregator" />
+    <retry policy="exp-backoff" delay="minutes(5)" attempts="3" />
+    
+    <late-process policy="exp-backoff" delay="hours(1)">
+        <late-input input="input" 
workflow-path="/projects/bootcamp/workflow/lateinput" />
+    </late-process>
+</process>
+</verbatim>
 ---+++ Rest Call
 <verbatim>
-POST 
http://localhost:15000/api/entities/schedule/process/SampleProcess?skipDryRun=false&doAs=joe
+POST 
http://localhost:15000/api/entities/schedule/process/SampleProcess?skipDryRun=false&doAs=joe&properties=instrumentation.host:instrumentation.localdomain
 </verbatim>
 ---+++ Result
 <verbatim>
@@ -30,3 +96,5 @@ POST 
http://localhost:15000/api/entities/schedule/process/SampleProcess?skipDryR
     "status": "SUCCEEDED"
 }
 </verbatim>
+---+++ Notes
+In this example, the value of _framework.instrumentation.host_ in the Oozie 
workflow will be _instrumentation.localdomain_, which is the value of the 
_instrumentation.host_ property passed when the process is scheduled.

http://git-wip-us.apache.org/repos/asf/falcon/blob/4576c582/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
----------------------------------------------------------------------
diff --git 
a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java 
b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
index e0af30d..a36ee79 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
@@ -60,6 +60,7 @@ import java.io.OutputStream;
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 
@@ -107,6 +108,28 @@ public abstract class OozieEntityBuilder<T extends Entity> 
{
 
     public abstract Properties build(Cluster cluster, Path buildPath) throws 
FalconException;
 
+    /**
+     * Builds the entity for the given cluster and merges user-supplied properties into the result.
+     *
+     * <p>Properties produced by {@link #build(Cluster, Path)} always take precedence over
+     * user-supplied properties with the same name; a warning is logged for every such collision.
+     *
+     * @param cluster    cluster the entity is built for
+     * @param buildPath  staging path used for the build
+     * @param properties user-supplied properties; may be {@code null} or empty
+     * @return the merged properties, or {@code null} when the underlying build returns {@code null}
+     *         (callers treat that as "entity not scheduled on this cluster")
+     * @throws FalconException if the underlying build fails
+     */
+    public Properties build(Cluster cluster, Path buildPath, Map<String, String> properties) throws FalconException {
+        Properties builderProperties = build(cluster, buildPath);
+        // Propagate a null build result unchanged so callers can keep their null check
+        // instead of this method failing with a NullPointerException.
+        if (builderProperties == null || properties == null || properties.isEmpty()) {
+            return builderProperties;
+        }
+
+        Properties propertiesCopy = new Properties();
+        propertiesCopy.putAll(properties);
+
+        // Builder properties shadow any user-defined property
+        for (String propertyName : builderProperties.stringPropertyNames()) {
+            // containsKey, not contains: Hashtable#contains(Object) searches values, not keys
+            if (propertiesCopy.containsKey(propertyName)) {
+                LOG.warn("User provided property {} is already declared in the entity and will be ignored.",
+                    propertyName);
+            }
+            propertiesCopy.setProperty(propertyName, builderProperties.getProperty(propertyName));
+        }
+
+        return propertiesCopy;
+    }
+
     protected String getStoragePath(Path path) {
         if (path != null) {
             return getStoragePath(path.toString());

http://git-wip-us.apache.org/repos/asf/falcon/blob/4576c582/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git 
a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
 
b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 7262964..38b6f32 100644
--- 
a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ 
b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -169,11 +169,12 @@ public class OozieWorkflowEngine extends 
AbstractWorkflowEngine {
                 Cluster cluster = STORE.get(EntityType.CLUSTER, clusterName);
                 prepareEntityBuildPath(entity, cluster);
                 Path buildPath = EntityUtil.getNewStagingPath(cluster, entity);
-                Properties properties = builder.build(cluster, buildPath);
+                Properties properties = builder.build(cluster, buildPath, 
suppliedProps);
                 if (properties == null) {
                     LOG.info("Entity {} is not scheduled on cluster {}", 
entity.getName(), cluster);
                     continue;
                 }
+
                 //Do dryRun of coords before schedule as schedule is 
asynchronous
                 dryRunInternal(cluster, new 
Path(properties.getProperty(OozieEntityBuilder.ENTITY_PATH)), skipDryRun);
                 scheduleEntity(clusterName, properties, entity);

http://git-wip-us.apache.org/repos/asf/falcon/blob/4576c582/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
 
b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index 176a15e..d034b1a 100644
--- 
a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ 
b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -869,4 +869,20 @@ public class OozieFeedWorkflowBuilderTest extends 
AbstractTestBase {
         property.setValue(umask);
         cluster.getProperties().getProperties().add(property);
     }
+
+    /**
+     * Checks that user-supplied schedule properties appear in the properties returned by the feed
+     * builder, while builder-computed properties such as ENTITY_NAME take precedence.
+     */
+    @Test
+    public void testUserDefinedProperties() throws Exception {
+        OozieEntityBuilder feedBuilder = OozieEntityBuilder.get(lifecycleRetentionFeed);
+
+        Map<String, String> userProps = new HashMap<>();
+        userProps.put("ENTITY_NAME", "MyEntity");
+        userProps.put("custom.property", "custom value");
+
+        Properties result = feedBuilder.build(trgCluster, new Path("/projects/falcon/"), userProps);
+
+        Assert.assertNotNull(result);
+        Assert.assertEquals(result.get("custom.property"), "custom value");
+        Assert.assertEquals(result.get("ENTITY_NAME"), lifecycleRetentionFeed.getName());
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/4576c582/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
 
b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index 9f492d7..8d824ba 100644
--- 
a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ 
b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -75,6 +75,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
@@ -767,4 +768,21 @@ public class OozieProcessWorkflowBuilderTest extends 
AbstractTestBase {
         
Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()),
 "impressions");
         
Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName()),
 "NONE");
     }
+
+    /**
+     * Checks that user-supplied schedule properties appear in the properties returned by the process
+     * builder, while builder-computed properties such as ENTITY_NAME take precedence.
+     */
+    @Test
+    public void testUserDefinedProperties() throws Exception {
+        Process clickSummary = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
+        OozieEntityBuilder processBuilder = OozieEntityBuilder.get(clickSummary);
+
+        Map<String, String> userProps = new HashMap<>();
+        userProps.put("ENTITY_NAME", "MyEntity");
+        userProps.put("custom.property", "custom value");
+
+        Properties result = processBuilder.build(cluster, new Path("/projects/falcon/"), userProps);
+
+        Assert.assertNotNull(result);
+        Assert.assertEquals(result.get("custom.property"), "custom value");
+        Assert.assertEquals(result.get("ENTITY_NAME"), clickSummary.getName());
+    }
+
 }

Reply via email to