Repository: falcon
Updated Branches:
  refs/heads/master 960812d10 -> d7c1a1e72


FALCON-1829 [TEST] Add regression for submit and schedule process on native scheduler (time based)

Author: Pragya <[email protected]>

Reviewers: Paul Isaychuk <[email protected]>

Closes #39 from pragya-mittal/master


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/d7c1a1e7
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/d7c1a1e7
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/d7c1a1e7

Branch: refs/heads/master
Commit: d7c1a1e727c938a5c0a511a4636753a62629024b
Parents: 960812d
Author: Pragya <[email protected]>
Authored: Thu Feb 11 17:48:32 2016 +0530
Committer: Pallavi Rao <[email protected]>
Committed: Thu Feb 11 17:48:32 2016 +0530

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt                   |   2 +
 .../helpers/entity/AbstractEntityHelper.java    |  52 +++--
 .../falcon/regression/AuthorizationTest.java    |   4 +-
 .../nativeScheduler/NativeScheduleTest.java     | 231 +++++++++++++++++++
 .../falcon/regression/security/EntityOp.java    |   4 +-
 .../src/test/resources/sleep/workflow.xml       |  85 +++++++
 6 files changed, 354 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/d7c1a1e7/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index b4717f4..b3769f0 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -5,6 +5,8 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+   FALCON-1829 Add regression for submit and schedule process on native 
scheduler (time based) (Pragya Mittal)
+
    FALCON-1766 Add CLI metrics check for HiveDR, HDFS and feed replication 
(Paul Isaychuk)
 
    FALCON-1777 Add regression for HDFS replication (recipe) (Paul Isaychuk)

http://git-wip-us.apache.org/repos/asf/falcon/blob/d7c1a1e7/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java
----------------------------------------------------------------------
diff --git 
a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java
 
b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java
index 29c97b2..27e12d0 100644
--- 
a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java
+++ 
b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java
@@ -301,44 +301,55 @@ public abstract class AbstractEntityHelper {
     public ServiceResponse submitEntity(String data, String user)
         throws IOException, URISyntaxException, AuthenticationException, 
InterruptedException {
         LOGGER.info("Submitting " + getEntityType() + ": \n" + 
Util.prettyPrintXml(data));
-        return Util.sendRequest(createUrl(this.hostname + 
URLS.SUBMIT_URL.getValue(),
-            getEntityType() + colo), "post", data, user);
+        return Util.sendRequest(createUrl(this.hostname + 
URLS.SUBMIT_URL.getValue(), getEntityType() + colo), "post",
+                data, user);
     }
 
     public ServiceResponse validateEntity(String data, String user)
         throws IOException, URISyntaxException, AuthenticationException, 
InterruptedException {
         LOGGER.info("Validating " + getEntityType() + ": \n" + 
Util.prettyPrintXml(data));
-        return Util.sendRequest(createUrl(this.hostname + 
URLS.VALIDATE_URL.getValue(),
-            getEntityType() + colo), "post", data, user);
+        return Util.sendRequest(createUrl(this.hostname + 
URLS.VALIDATE_URL.getValue(), getEntityType() + colo), "post",
+                data, user);
     }
 
     public ServiceResponse schedule(String processData)
         throws IOException, URISyntaxException, AuthenticationException, 
InterruptedException {
-        return schedule(processData, null);
+        return schedule(processData, null, "");
     }
 
-    public ServiceResponse schedule(String processData, String user)
+    public ServiceResponse schedule(String data, String user, String params)
         throws IOException, URISyntaxException, AuthenticationException, 
InterruptedException {
-        return Util.sendRequest(createUrl(this.hostname + 
URLS.SCHEDULE_URL.getValue(),
-            getEntityType(), getEntityName(processData) + colo), "post", user);
+
+        String url = createUrl(this.hostname + URLS.SCHEDULE_URL.getValue(), 
getEntityType(),
+                getEntityName(data) + colo);
+        if (StringUtils.isNotBlank(params)) {
+            url += (colo.isEmpty() ? "?" : "&") + params;
+        }
+        LOGGER.info("url is : " + url);
+        return Util.sendRequest(createUrl(url), "post", data, user);
     }
 
     public ServiceResponse submitAndSchedule(String data)
         throws IOException, URISyntaxException, AuthenticationException, 
InterruptedException {
-        return submitAndSchedule(data, null);
+        return submitAndSchedule(data, null, "");
     }
 
-    public ServiceResponse submitAndSchedule(String data, String user)
+    public ServiceResponse submitAndSchedule(String data, String user, String 
params)
         throws IOException, URISyntaxException, AuthenticationException, 
InterruptedException {
         LOGGER.info("Submitting " + getEntityType() + ": \n" + 
Util.prettyPrintXml(data));
-        return Util.sendRequest(createUrl(this.hostname + 
URLS.SUBMIT_AND_SCHEDULE_URL.getValue(),
-            getEntityType()), "post", data, user);
+
+        String url = createUrl(this.hostname + 
URLS.SUBMIT_AND_SCHEDULE_URL.getValue(), getEntityType() + colo);
+        if (StringUtils.isNotBlank(params)) {
+            url += (colo.isEmpty() ? "?" : "&") + params;
+        }
+        return Util.sendRequest(createUrl(url), "post", data, user);
     }
 
     public ServiceResponse deleteByName(String entityName, String user)
         throws AuthenticationException, IOException, URISyntaxException, 
InterruptedException {
-        return Util.sendRequest(createUrl(this.hostname + 
URLS.DELETE_URL.getValue(),
-            getEntityType(), entityName + colo), "delete", user);
+        return Util.sendRequest(
+                createUrl(this.hostname + URLS.DELETE_URL.getValue(), 
getEntityType(), entityName + colo), "delete",
+                user);
     }
 
     public ServiceResponse delete(String data)
@@ -348,8 +359,9 @@ public abstract class AbstractEntityHelper {
 
     public ServiceResponse delete(String data, String user)
         throws IOException, URISyntaxException, AuthenticationException, 
InterruptedException {
-        return Util.sendRequest(createUrl(this.hostname + 
URLS.DELETE_URL.getValue(),
-            getEntityType(), getEntityName(data) + colo), "delete", user);
+        return Util.sendRequest(
+                createUrl(this.hostname + URLS.DELETE_URL.getValue(), 
getEntityType(), getEntityName(data) + colo),
+                "delete", user);
     }
 
     public ServiceResponse suspend(String data)
@@ -398,8 +410,9 @@ public abstract class AbstractEntityHelper {
 
     public ServiceResponse getEntityDependencies(String data, String user)
         throws IOException, URISyntaxException, AuthenticationException, 
InterruptedException {
-        return Util.sendRequest(createUrl(this.hostname + 
URLS.DEPENDENCIES.getValue(),
-            getEntityType(), getEntityName(data) + colo), "get", user);
+        return Util.sendRequest(
+                createUrl(this.hostname + URLS.DEPENDENCIES.getValue(), 
getEntityType(), getEntityName(data) + colo),
+                "get", user);
     }
 
     public InstancesResult getRunningInstance(String name)
@@ -661,8 +674,7 @@ public abstract class AbstractEntityHelper {
      */
     public ServiceResponse getDependencies(String entityName)
         throws URISyntaxException, AuthenticationException, 
InterruptedException, IOException {
-        String url = createUrl(this.hostname + URLS.DEPENDENCIES.getValue(), 
getEntityType(),
-            entityName + colo);
+        String url = createUrl(this.hostname + URLS.DEPENDENCIES.getValue(), 
getEntityType(), entityName + colo);
         return Util.sendRequest(url, "get", null, null);
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/d7c1a1e7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
----------------------------------------------------------------------
diff --git 
a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
 
b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
index 9c37562..714a21f 100644
--- 
a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
+++ 
b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
@@ -581,7 +581,7 @@ public class AuthorizationTest extends BaseTestClass {
 
         //by U2 schedule process dependant on scheduled feed by U1
         ServiceResponse serviceResponse = prism.getProcessHelper()
-            .submitAndSchedule(bundles[0].getProcessData(), 
MerlinConstants.USER2_NAME);
+            .submitAndSchedule(bundles[0].getProcessData(), 
MerlinConstants.USER2_NAME, "");
         AssertUtil.assertSucceeded(serviceResponse);
         AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, 
bundles[0].getProcessData(), Job.Status.RUNNING);
 
@@ -631,7 +631,7 @@ public class AuthorizationTest extends BaseTestClass {
 
         //by U2 schedule process dependent on scheduled feed by U1
         ServiceResponse serviceResponse = 
prism.getProcessHelper().submitAndSchedule(bundles[0].getProcessData(),
-                MerlinConstants.USER2_NAME);
+                MerlinConstants.USER2_NAME, "");
         AssertUtil.assertSucceeded(serviceResponse);
         AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, 
bundles[0].getProcessData(), Job.Status.RUNNING);
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/d7c1a1e7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java
----------------------------------------------------------------------
diff --git 
a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java
 
b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java
new file mode 100644
index 0000000..fe61cdf
--- /dev/null
+++ 
b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.regression.nativeScheduler;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.regression.Entities.ProcessMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.*;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.OozieClient;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Schedule process via native scheduler.
+ */
+
+@Test(groups = "distributed")
+public class NativeScheduleTest extends BaseTestClass {
+    private ColoHelper cluster1 = servers.get(0);
+    private ColoHelper cluster2 = servers.get(1);
+    private String baseTestHDFSDir = cleanAndGetTestDir();
+    private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
+    private static final Logger LOGGER = 
Logger.getLogger(NativeScheduleTest.class);
+    private String startTime;
+    private String endTime;
+
+
+
+    @BeforeClass(alwaysRun = true)
+    public void uploadWorkflow() throws Exception {
+        uploadDirToClusters(aggregateWorkflowDir, 
OSUtil.concat(OSUtil.RESOURCES, "sleep"));
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp() throws Exception {
+        startTime = TimeUtil.getTimeWrtSystemTime(-10);
+        endTime = TimeUtil.addMinsToTime(startTime, 50);
+        LOGGER.info("Time range between : " + startTime + " and " + endTime);
+        Bundle bundle = BundleUtil.readELBundle();
+
+        for (int i = 0; i < 2; i++) {
+            bundles[i] = new Bundle(bundle, servers.get(i));
+            bundles[i].generateUniqueBundle(this);
+            bundles[i].setProcessWorkflow(aggregateWorkflowDir);
+            bundles[i].submitClusters(prism);
+            bundles[i].setProcessConcurrency(2);
+            bundles[i].setProcessValidity(startTime, endTime);
+            bundles[i].setProcessPeriodicity(1, Frequency.TimeUnit.minutes);
+        }
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeTestClassEntities();
+    }
+
+
+    /**
+     * Successfully schedule process via native scheduler through prism and 
server on single cluster.
+     * Schedule the same process on oozie. It should fail.
+     */
+    @Test
+    public void scheduleProcessWithNativeUsingProperties() throws Exception {
+        ProcessMerlin processMerlin = bundles[0].getProcessObject();
+        processMerlin.setInputs(null);
+        processMerlin.setOutputs(null);
+        LOGGER.info(processMerlin.toString());
+
+        ServiceResponse response = 
prism.getProcessHelper().submitEntity(processMerlin.toString());
+        AssertUtil.assertSucceeded(response);
+
+        // Schedule with native via prism
+        response = prism.getProcessHelper().schedule(processMerlin.toString(), 
null,
+                "properties=falcon.scheduler:native");
+        AssertUtil.assertSucceeded(response);
+
+        // Schedule with oozie via server
+        response = 
cluster1.getProcessHelper().schedule(processMerlin.toString(), null,
+                "properties=falcon.scheduler:oozie");
+        AssertUtil.assertFailed(response);
+
+        // Schedule with oozie via prism
+        response = prism.getProcessHelper().schedule(processMerlin.toString(), 
null,
+                "properties=falcon.scheduler:oozie");
+        AssertUtil.assertFailed(response);
+
+        // Schedule with native via server
+        response = 
cluster1.getProcessHelper().schedule(processMerlin.toString(), null,
+                "properties=falcon.scheduler:native");
+        AssertUtil.assertSucceeded(response);
+
+    }
+
+    /**
+     * Successfully schedule process via oozie scheduler (using properties) 
through prism and server on single cluster.
+     * Schedule the same process on native scheduler. It should fail.
+     */
+    @Test
+    public void scheduleProcessWithOozieUsingProperties() throws Exception {
+        ProcessMerlin processMerlin = bundles[0].getProcessObject();
+        processMerlin.setInputs(null);
+        processMerlin.setOutputs(null);
+        LOGGER.info(processMerlin.toString());
+
+        ServiceResponse response = 
prism.getProcessHelper().submitEntity(processMerlin.toString());
+        AssertUtil.assertSucceeded(response);
+
+        // Schedule with prism
+        response = prism.getProcessHelper().schedule(processMerlin.toString(), 
null,
+                "properties=falcon.scheduler:oozie");
+        AssertUtil.assertSucceeded(response);
+
+        // Schedule with server
+        response = 
cluster1.getProcessHelper().schedule(processMerlin.toString(), null,
+                "properties=falcon.scheduler:oozie");
+        AssertUtil.assertSucceeded(response);
+
+        // Schedule with native via server
+        response = 
cluster1.getProcessHelper().schedule(processMerlin.toString(), null,
+                "properties=falcon.scheduler:native");
+        AssertUtil.assertFailed(response);
+
+        // Schedule with native via prism
+        response = prism.getProcessHelper().schedule(processMerlin.toString(), 
null,
+                "properties=falcon.scheduler:native");
+        AssertUtil.assertFailed(response);
+
+    }
+
+    /**
+     * Successfully schedule process via oozie scheduler (without properties) 
through prism and server on single cluster.
+     * Schedule the same process on native using properties. It should fail.
+     */
+    @Test
+    public void scheduleProcessWithOozieWithNoParams() throws Exception {
+        ProcessMerlin processMerlin = bundles[0].getProcessObject();
+        processMerlin.setInputs(null);
+        processMerlin.setOutputs(null);
+        LOGGER.info(processMerlin.toString());
+
+        ServiceResponse response = 
prism.getProcessHelper().submitEntity(processMerlin.toString());
+        AssertUtil.assertSucceeded(response);
+
+        // Schedule with prism
+        response = prism.getProcessHelper().schedule(processMerlin.toString(), 
null, "");
+        AssertUtil.assertSucceeded(response);
+
+        // Schedule with native via server
+        response = 
cluster1.getProcessHelper().schedule(processMerlin.toString(), null,
+                "properties=falcon.scheduler:native");
+        AssertUtil.assertFailed(response);
+
+        // Schedule with native via prism
+        response = prism.getProcessHelper().schedule(processMerlin.toString(), 
null,
+                "properties=falcon.scheduler:native");
+        AssertUtil.assertFailed(response);
+
+    }
+
+    /**
+     * Successfully schedule process via native scheduler through prism and 
server on multiple clusters.
+     * Schedule the same process on oozie. It should fail.
+     */
+    @Test(groups = {"prism", "0.2"})
+    public void scheduleProcessWithNativeOnTwoClusters() throws Exception {
+
+        ProcessMerlin processMerlinNative = bundles[0].getProcessObject();
+        processMerlinNative.clearProcessCluster();
+        processMerlinNative.addProcessCluster(
+                new 
ProcessMerlin.ProcessClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
+                        .withValidity(startTime, endTime).build());
+        processMerlinNative.addProcessCluster(
+                new 
ProcessMerlin.ProcessClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
+                        .withValidity(startTime, endTime).build());
+        processMerlinNative.setInputs(null);
+        processMerlinNative.setOutputs(null);
+        LOGGER.info(processMerlinNative.toString());
+
+        // Schedule with native via prism
+        ServiceResponse response = prism.getProcessHelper().
+                submitAndSchedule(processMerlinNative.toString(), null, 
"properties=falcon.scheduler:native");
+        AssertUtil.assertSucceeded(response);
+
+        // Schedule with native via server1
+        response = 
cluster1.getProcessHelper().schedule(processMerlinNative.toString(), null,
+                "properties=falcon.scheduler:native");
+        AssertUtil.assertSucceeded(response);
+
+        // Schedule with native via server2
+        response = 
cluster2.getProcessHelper().schedule(processMerlinNative.toString(), null,
+                "properties=falcon.scheduler:native");
+        AssertUtil.assertSucceeded(response);
+
+        // Schedule with oozie via prism
+        response = 
prism.getProcessHelper().schedule(processMerlinNative.toString(), null,
+                "properties=falcon.scheduler:oozie");
+        AssertUtil.assertFailed(response);
+
+        // Schedule with oozie via server1
+        response = 
cluster1.getProcessHelper().schedule(processMerlinNative.toString(), null,
+                "properties=falcon.scheduler:oozie");
+        AssertUtil.assertFailed(response);
+
+        // Schedule with oozie via server2
+        response = 
cluster2.getProcessHelper().schedule(processMerlinNative.toString(), null,
+                "properties=falcon.scheduler:oozie");
+        AssertUtil.assertFailed(response);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/d7c1a1e7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/security/EntityOp.java
----------------------------------------------------------------------
diff --git 
a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/security/EntityOp.java
 
b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/security/EntityOp.java
index dbaae9b..7b03f32 100644
--- 
a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/security/EntityOp.java
+++ 
b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/security/EntityOp.java
@@ -163,7 +163,7 @@ enum EntityOp {
         public boolean executeAs(String user, AbstractEntityHelper helper, 
String data) {
             final ServiceResponse response;
             try {
-                response = helper.schedule(data, user);
+                response = helper.schedule(data, user, "");
             } catch (IOException e) {
                 logger.error("Caught exception: " + e);
                 return false;
@@ -207,7 +207,7 @@ enum EntityOp {
         public boolean executeAs(String user, AbstractEntityHelper helper, 
String data) {
             final ServiceResponse response;
             try {
-                response = helper.submitAndSchedule(data, user);
+                response = helper.submitAndSchedule(data, user, "");
             } catch (IOException e) {
                 logger.error("Caught exception: " + e);
                 return false;

http://git-wip-us.apache.org/repos/asf/falcon/blob/d7c1a1e7/falcon-regression/merlin/src/test/resources/sleep/workflow.xml
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/sleep/workflow.xml 
b/falcon-regression/merlin/src/test/resources/sleep/workflow.xml
new file mode 100644
index 0000000..bd7c821
--- /dev/null
+++ b/falcon-regression/merlin/src/test/resources/sleep/workflow.xml
@@ -0,0 +1,85 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<workflow-app xmlns="uri:oozie:workflow:0.2" name="aggregator-wf">
+    <start to="hdfscommands"/>
+    <action name="hdfscommands">
+        <fs>
+            <delete path='${nameNode}/tmp/falcon-regression/test'/>
+            <mkdir path='${nameNode}/tmp/falcon-regression/test'/>
+        </fs>
+        <ok to="aggregator"/>
+        <error to="fail"/>
+    </action>
+    <action name="aggregator">
+        <map-reduce>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <prepare>
+                <delete path="${nameNode}/tmp/falcon-regression/test/output/"/>
+            </prepare>
+            <configuration>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>mapred.mapper.class</name>
+                    <value>org.apache.hadoop.mapred.lib.IdentityMapper</value>
+                </property>
+                <property>
+                    <name>mapred.reducer.class</name>
+                    <value>org.apache.hadoop.mapred.lib.IdentityReducer</value>
+                </property>
+                <property>
+                    <name>mapred.map.tasks</name>
+                    <value>1</value>
+                </property>
+                <property>
+                    <name>mapred.input.dir</name>
+                    <value>${nameNode}/tmp/falcon-regression/test</value>
+                </property>
+                <property>
+                    <name>mapred.output.dir</name>
+                    
<value>${nameNode}/tmp/falcon-regression/test/output/</value>
+                </property>
+            </configuration>
+        </map-reduce>
+        <ok to="java-node"/>
+        <error to="fail"/>
+    </action>
+    <action name="java-node">
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <main-class>org.apache.hadoop.mapreduce.SleepJob</main-class>
+            <arg>-m</arg>
+            <arg>1</arg>
+            <arg>-mt</arg>
+            <arg>60000</arg>
+            <arg>-r</arg>
+            <arg>0</arg>
+        </java>
+        <ok to="end"/>
+        <error to="fail"/>
+    </action>
+
+    <kill name="fail">
+        <message>Map/Reduce failed, error 
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+    <end name="end"/>
+</workflow-app>

Reply via email to