Repository: falcon Updated Branches: refs/heads/master 960812d10 -> d7c1a1e72
FALCON-1829 [TEST] Add regression for submit and schedule process on native scheduler (time based) Author: Pragya <[email protected]> Reviewers: Paul Isaychuk <[email protected]> Closes #39 from pragya-mittal/master Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/d7c1a1e7 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/d7c1a1e7 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/d7c1a1e7 Branch: refs/heads/master Commit: d7c1a1e727c938a5c0a511a4636753a62629024b Parents: 960812d Author: Pragya <[email protected]> Authored: Thu Feb 11 17:48:32 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Thu Feb 11 17:48:32 2016 +0530 ---------------------------------------------------------------------- falcon-regression/CHANGES.txt | 2 + .../helpers/entity/AbstractEntityHelper.java | 52 +++-- .../falcon/regression/AuthorizationTest.java | 4 +- .../nativeScheduler/NativeScheduleTest.java | 231 +++++++++++++++++++ .../falcon/regression/security/EntityOp.java | 4 +- .../src/test/resources/sleep/workflow.xml | 85 +++++++ 6 files changed, 354 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/d7c1a1e7/falcon-regression/CHANGES.txt ---------------------------------------------------------------------- diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt index b4717f4..b3769f0 100644 --- a/falcon-regression/CHANGES.txt +++ b/falcon-regression/CHANGES.txt @@ -5,6 +5,8 @@ Trunk (Unreleased) INCOMPATIBLE CHANGES NEW FEATURES + FALCON-1829 Add regression for submit and schedule process on native scheduler (time based) (Pragya Mittal) + FALCON-1766 Add CLI metrics check for HiveDR, HDFS and feed replication (Paul Isaychuk) FALCON-1777 Add regression for HDFS replication (recipe) (Paul Isaychuk) 
http://git-wip-us.apache.org/repos/asf/falcon/blob/d7c1a1e7/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java index 29c97b2..27e12d0 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java @@ -301,44 +301,55 @@ public abstract class AbstractEntityHelper { public ServiceResponse submitEntity(String data, String user) throws IOException, URISyntaxException, AuthenticationException, InterruptedException { LOGGER.info("Submitting " + getEntityType() + ": \n" + Util.prettyPrintXml(data)); - return Util.sendRequest(createUrl(this.hostname + URLS.SUBMIT_URL.getValue(), - getEntityType() + colo), "post", data, user); + return Util.sendRequest(createUrl(this.hostname + URLS.SUBMIT_URL.getValue(), getEntityType() + colo), "post", + data, user); } public ServiceResponse validateEntity(String data, String user) throws IOException, URISyntaxException, AuthenticationException, InterruptedException { LOGGER.info("Validating " + getEntityType() + ": \n" + Util.prettyPrintXml(data)); - return Util.sendRequest(createUrl(this.hostname + URLS.VALIDATE_URL.getValue(), - getEntityType() + colo), "post", data, user); + return Util.sendRequest(createUrl(this.hostname + URLS.VALIDATE_URL.getValue(), getEntityType() + colo), "post", + data, user); } public ServiceResponse schedule(String processData) throws IOException, URISyntaxException, AuthenticationException, InterruptedException { - return 
schedule(processData, null); + return schedule(processData, null, ""); } - public ServiceResponse schedule(String processData, String user) + public ServiceResponse schedule(String data, String user, String params) throws IOException, URISyntaxException, AuthenticationException, InterruptedException { - return Util.sendRequest(createUrl(this.hostname + URLS.SCHEDULE_URL.getValue(), - getEntityType(), getEntityName(processData) + colo), "post", user); + + String url = createUrl(this.hostname + URLS.SCHEDULE_URL.getValue(), getEntityType(), + getEntityName(data) + colo); + if (StringUtils.isNotBlank(params)) { + url += (colo.isEmpty() ? "?" : "&") + params; + } + LOGGER.info("url is : " + url); + return Util.sendRequest(createUrl(url), "post", data, user); } public ServiceResponse submitAndSchedule(String data) throws IOException, URISyntaxException, AuthenticationException, InterruptedException { - return submitAndSchedule(data, null); + return submitAndSchedule(data, null, ""); } - public ServiceResponse submitAndSchedule(String data, String user) + public ServiceResponse submitAndSchedule(String data, String user, String params) throws IOException, URISyntaxException, AuthenticationException, InterruptedException { LOGGER.info("Submitting " + getEntityType() + ": \n" + Util.prettyPrintXml(data)); - return Util.sendRequest(createUrl(this.hostname + URLS.SUBMIT_AND_SCHEDULE_URL.getValue(), - getEntityType()), "post", data, user); + + String url = createUrl(this.hostname + URLS.SUBMIT_AND_SCHEDULE_URL.getValue(), getEntityType() + colo); + if (StringUtils.isNotBlank(params)) { + url += (colo.isEmpty() ? "?" 
: "&") + params; + } + return Util.sendRequest(createUrl(url), "post", data, user); } public ServiceResponse deleteByName(String entityName, String user) throws AuthenticationException, IOException, URISyntaxException, InterruptedException { - return Util.sendRequest(createUrl(this.hostname + URLS.DELETE_URL.getValue(), - getEntityType(), entityName + colo), "delete", user); + return Util.sendRequest( + createUrl(this.hostname + URLS.DELETE_URL.getValue(), getEntityType(), entityName + colo), "delete", + user); } public ServiceResponse delete(String data) @@ -348,8 +359,9 @@ public abstract class AbstractEntityHelper { public ServiceResponse delete(String data, String user) throws IOException, URISyntaxException, AuthenticationException, InterruptedException { - return Util.sendRequest(createUrl(this.hostname + URLS.DELETE_URL.getValue(), - getEntityType(), getEntityName(data) + colo), "delete", user); + return Util.sendRequest( + createUrl(this.hostname + URLS.DELETE_URL.getValue(), getEntityType(), getEntityName(data) + colo), + "delete", user); } public ServiceResponse suspend(String data) @@ -398,8 +410,9 @@ public abstract class AbstractEntityHelper { public ServiceResponse getEntityDependencies(String data, String user) throws IOException, URISyntaxException, AuthenticationException, InterruptedException { - return Util.sendRequest(createUrl(this.hostname + URLS.DEPENDENCIES.getValue(), - getEntityType(), getEntityName(data) + colo), "get", user); + return Util.sendRequest( + createUrl(this.hostname + URLS.DEPENDENCIES.getValue(), getEntityType(), getEntityName(data) + colo), + "get", user); } public InstancesResult getRunningInstance(String name) @@ -661,8 +674,7 @@ public abstract class AbstractEntityHelper { */ public ServiceResponse getDependencies(String entityName) throws URISyntaxException, AuthenticationException, InterruptedException, IOException { - String url = createUrl(this.hostname + URLS.DEPENDENCIES.getValue(), getEntityType(), - entityName + 
colo); + String url = createUrl(this.hostname + URLS.DEPENDENCIES.getValue(), getEntityType(), entityName + colo); return Util.sendRequest(url, "get", null, null); } http://git-wip-us.apache.org/repos/asf/falcon/blob/d7c1a1e7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java index 9c37562..714a21f 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java @@ -581,7 +581,7 @@ public class AuthorizationTest extends BaseTestClass { //by U2 schedule process dependant on scheduled feed by U1 ServiceResponse serviceResponse = prism.getProcessHelper() - .submitAndSchedule(bundles[0].getProcessData(), MerlinConstants.USER2_NAME); + .submitAndSchedule(bundles[0].getProcessData(), MerlinConstants.USER2_NAME, ""); AssertUtil.assertSucceeded(serviceResponse); AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), Job.Status.RUNNING); @@ -631,7 +631,7 @@ public class AuthorizationTest extends BaseTestClass { //by U2 schedule process dependent on scheduled feed by U1 ServiceResponse serviceResponse = prism.getProcessHelper().submitAndSchedule(bundles[0].getProcessData(), - MerlinConstants.USER2_NAME); + MerlinConstants.USER2_NAME, ""); AssertUtil.assertSucceeded(serviceResponse); AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), Job.Status.RUNNING); http://git-wip-us.apache.org/repos/asf/falcon/blob/d7c1a1e7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java 
---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java new file mode 100644 index 0000000..fe61cdf --- /dev/null +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.falcon.regression.nativeScheduler; +import org.apache.falcon.entity.v0.Frequency; +import org.apache.falcon.regression.Entities.ProcessMerlin; +import org.apache.falcon.regression.core.bundle.Bundle; +import org.apache.falcon.regression.core.helpers.ColoHelper; +import org.apache.falcon.regression.core.response.ServiceResponse; +import org.apache.falcon.regression.core.util.*; +import org.apache.falcon.regression.testHelper.BaseTestClass; +import org.apache.log4j.Logger; +import org.apache.oozie.client.OozieClient; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Schedule process via native scheduler. + */ + +@Test(groups = "distributed") +public class NativeScheduleTest extends BaseTestClass { + private ColoHelper cluster1 = servers.get(0); + private ColoHelper cluster2 = servers.get(1); + private String baseTestHDFSDir = cleanAndGetTestDir(); + private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator"; + private static final Logger LOGGER = Logger.getLogger(NativeScheduleTest.class); + private String startTime; + private String endTime; + + + + @BeforeClass(alwaysRun = true) + public void uploadWorkflow() throws Exception { + uploadDirToClusters(aggregateWorkflowDir, OSUtil.concat(OSUtil.RESOURCES, "sleep")); + } + + @BeforeMethod(alwaysRun = true) + public void setUp() throws Exception { + startTime = TimeUtil.getTimeWrtSystemTime(-10); + endTime = TimeUtil.addMinsToTime(startTime, 50); + LOGGER.info("Time range between : " + startTime + " and " + endTime); + Bundle bundle = BundleUtil.readELBundle(); + + for (int i = 0; i < 2; i++) { + bundles[i] = new Bundle(bundle, servers.get(i)); + bundles[i].generateUniqueBundle(this); + bundles[i].setProcessWorkflow(aggregateWorkflowDir); + bundles[i].submitClusters(prism); + bundles[i].setProcessConcurrency(2); + bundles[i].setProcessValidity(startTime, 
endTime); + bundles[i].setProcessPeriodicity(1, Frequency.TimeUnit.minutes); + } + } + + @AfterMethod(alwaysRun = true) + public void tearDown() { + removeTestClassEntities(); + } + + + /** + * Successfully schedule process via native scheduler through prism and server on single cluster. + * Schedule the same process on oozie. It should fail. + */ + @Test + public void scheduleProcessWithNativeUsingProperties() throws Exception { + ProcessMerlin processMerlin = bundles[0].getProcessObject(); + processMerlin.setInputs(null); + processMerlin.setOutputs(null); + LOGGER.info(processMerlin.toString()); + + ServiceResponse response = prism.getProcessHelper().submitEntity(processMerlin.toString()); + AssertUtil.assertSucceeded(response); + + // Schedule with prism + response = prism.getProcessHelper().schedule(processMerlin.toString(), null, + "properties=falcon.scheduler:native"); + AssertUtil.assertSucceeded(response); + + // Schedule with server + response = cluster1.getProcessHelper().schedule(processMerlin.toString(), null, + "properties=falcon.scheduler:oozie"); + AssertUtil.assertFailed(response); + + // Schedule with oozie via prism + response = prism.getProcessHelper().schedule(processMerlin.toString(), null, + "properties=falcon.scheduler:oozie"); + AssertUtil.assertFailed(response); + + // Schedule with oozie via server + response = cluster1.getProcessHelper().schedule(processMerlin.toString(), null, + "properties=falcon.scheduler:native"); + AssertUtil.assertSucceeded(response); + + } + + /** + * Successfully schedule process via oozie scheduler (using properties) through prism and server on single cluster. + * Schedule the same process on native scheduler. It should fail. 
+ */ + @Test + public void scheduleProcessWithOozieUsingProperties() throws Exception { + ProcessMerlin processMerlin = bundles[0].getProcessObject(); + processMerlin.setInputs(null); + processMerlin.setOutputs(null); + LOGGER.info(processMerlin.toString()); + + ServiceResponse response = prism.getProcessHelper().submitEntity(processMerlin.toString()); + AssertUtil.assertSucceeded(response); + + // Schedule with prism + response = prism.getProcessHelper().schedule(processMerlin.toString(), null, + "properties=falcon.scheduler:oozie"); + AssertUtil.assertSucceeded(response); + + // Schedule with server + response = cluster1.getProcessHelper().schedule(processMerlin.toString(), null, + "properties=falcon.scheduler:oozie"); + AssertUtil.assertSucceeded(response); + + // Schedule with native via server + response = cluster1.getProcessHelper().schedule(processMerlin.toString(), null, + "properties=falcon.scheduler:native"); + AssertUtil.assertFailed(response); + + // Schedule with native via prism + response = prism.getProcessHelper().schedule(processMerlin.toString(), null, + "properties=falcon.scheduler:native"); + AssertUtil.assertFailed(response); + + } + + /** + * Successfully schedule process via oozie scheduler(without properties) through prism and server on single cluster. + * Schedule the same process on native using properties. It should fail. 
+ */ + @Test + public void scheduleProcessWithOozieWithNoParams() throws Exception { + ProcessMerlin processMerlin = bundles[0].getProcessObject(); + processMerlin.setInputs(null); + processMerlin.setOutputs(null); + LOGGER.info(processMerlin.toString()); + + ServiceResponse response = prism.getProcessHelper().submitEntity(processMerlin.toString()); + AssertUtil.assertSucceeded(response); + + // Schedule with prism + response = prism.getProcessHelper().schedule(processMerlin.toString(), null, ""); + AssertUtil.assertSucceeded(response); + + // Schedule with native via server + response = cluster1.getProcessHelper().schedule(processMerlin.toString(), null, + "properties=falcon.scheduler:native"); + AssertUtil.assertFailed(response); + + // Schedule with native via prism + response = prism.getProcessHelper().schedule(processMerlin.toString(), null, + "properties=falcon.scheduler:native"); + AssertUtil.assertFailed(response); + + } + + /** + * Successfully schedule process via native scheduler through prism and server on multiple cluster. + * Schedule the same process on oozie. It should fail. + */ + @Test(groups = {"prism", "0.2"}) + public void scheduleProcessWithNativeOnTwoClusters() throws Exception { + + ProcessMerlin processMerlinNative = bundles[0].getProcessObject(); + processMerlinNative.clearProcessCluster(); + processMerlinNative.addProcessCluster( + new ProcessMerlin.ProcessClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) + .withValidity(startTime, endTime).build()); + processMerlinNative.addProcessCluster( + new ProcessMerlin.ProcessClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) + .withValidity(startTime, endTime).build()); + processMerlinNative.setInputs(null); + processMerlinNative.setOutputs(null); + LOGGER.info(processMerlinNative.toString()); + + // Schedule with native via prism + ServiceResponse response = prism.getProcessHelper(). 
+ submitAndSchedule(processMerlinNative.toString(), null, "properties=falcon.scheduler:native"); + AssertUtil.assertSucceeded(response); + + // Schedule with native via server1 + response = cluster1.getProcessHelper().schedule(processMerlinNative.toString(), null, + "properties=falcon.scheduler:native"); + AssertUtil.assertSucceeded(response); + + // Schedule with native via server2 + response = cluster2.getProcessHelper().schedule(processMerlinNative.toString(), null, + "properties=falcon.scheduler:native"); + AssertUtil.assertSucceeded(response); + + // Schedule with oozie via prism + response = prism.getProcessHelper().schedule(processMerlinNative.toString(), null, + "properties=falcon.scheduler:oozie"); + AssertUtil.assertFailed(response); + + // Schedule with oozie via server1 + response = cluster1.getProcessHelper().schedule(processMerlinNative.toString(), null, + "properties=falcon.scheduler:oozie"); + AssertUtil.assertFailed(response); + + // Schedule with oozie via server2 + response = cluster2.getProcessHelper().schedule(processMerlinNative.toString(), null, + "properties=falcon.scheduler:oozie"); + AssertUtil.assertFailed(response); + + } + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/d7c1a1e7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/security/EntityOp.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/security/EntityOp.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/security/EntityOp.java index dbaae9b..7b03f32 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/security/EntityOp.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/security/EntityOp.java @@ -163,7 +163,7 @@ enum EntityOp { public boolean executeAs(String user, AbstractEntityHelper helper, String data) { final ServiceResponse response; try { - response = 
helper.schedule(data, user); + response = helper.schedule(data, user, ""); } catch (IOException e) { logger.error("Caught exception: " + e); return false; @@ -207,7 +207,7 @@ enum EntityOp { public boolean executeAs(String user, AbstractEntityHelper helper, String data) { final ServiceResponse response; try { - response = helper.submitAndSchedule(data, user); + response = helper.submitAndSchedule(data, user, ""); } catch (IOException e) { logger.error("Caught exception: " + e); return false; http://git-wip-us.apache.org/repos/asf/falcon/blob/d7c1a1e7/falcon-regression/merlin/src/test/resources/sleep/workflow.xml ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/resources/sleep/workflow.xml b/falcon-regression/merlin/src/test/resources/sleep/workflow.xml new file mode 100644 index 0000000..bd7c821 --- /dev/null +++ b/falcon-regression/merlin/src/test/resources/sleep/workflow.xml @@ -0,0 +1,85 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<workflow-app xmlns="uri:oozie:workflow:0.2" name="aggregator-wf"> + <start to="hdfscommands"/> + <action name="hdfscommands"> + <fs> + <delete path='${nameNode}/tmp/falcon-regression/test'/> + <mkdir path='${nameNode}/tmp/falcon-regression/test'/> + </fs> + <ok to="aggregator"/> + <error to="fail"/> + </action> + <action name="aggregator"> + <map-reduce> + <job-tracker>${jobTracker}</job-tracker> + <name-node>${nameNode}</name-node> + <prepare> + <delete path="${nameNode}/tmp/falcon-regression/test/output/"/> + </prepare> + <configuration> + <property> + <name>mapred.job.queue.name</name> + <value>${queueName}</value> + </property> + <property> + <name>mapred.mapper.class</name> + <value>org.apache.hadoop.mapred.lib.IdentityMapper</value> + </property> + <property> + <name>mapred.reducer.class</name> + <value>org.apache.hadoop.mapred.lib.IdentityReducer</value> + </property> + <property> + <name>mapred.map.tasks</name> + <value>1</value> + </property> + <property> + <name>mapred.input.dir</name> + <value>${nameNode}/tmp/falcon-regression/test</value> + </property> + <property> + <name>mapred.output.dir</name> + <value>${nameNode}/tmp/falcon-regression/test/output/</value> + </property> + </configuration> + </map-reduce> + <ok to="java-node"/> + <error to="fail"/> + </action> + <action name="java-node"> + <java> + <job-tracker>${jobTracker}</job-tracker> + <name-node>${nameNode}</name-node> + <main-class>org.apache.hadoop.mapreduce.SleepJob</main-class> + <arg>-m</arg> + <arg>1</arg> + <arg>-mt</arg> + <arg>60000</arg> + <arg>-r</arg> + <arg>0</arg> + </java> + <ok to="end"/> + <error to="fail"/> + </action> + + <kill name="fail"> + <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> + </kill> + <end name="end"/> +</workflow-app>
