FALCON-1447 Integration Tests for native scheduler. Contributed by Pavan Kumar Kolamuri.
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/4591ffb6 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/4591ffb6 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/4591ffb6 Branch: refs/heads/master Commit: 4591ffb61f0820c674b0f84f5b18721f7cbc39e3 Parents: 47f8a60 Author: Ajay Yadava <[email protected]> Authored: Tue Dec 15 17:05:12 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Tue Dec 15 17:53:17 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../oozie/client/LocalProxyOozieClient.java | 8 +- .../falcon/resource/AbstractEntityManager.java | 3 +- scheduler/pom.xml | 18 +++ .../workflow/engine/FalconWorkflowEngine.java | 6 +- .../falcon/state/AbstractSchedulerTestBase.java | 4 +- .../apache/falcon/unit/FalconUnitClient.java | 4 +- .../unit/LocalSchedulableEntityManager.java | 4 +- unit/src/main/resources/oozie-site.xml | 47 +++++++ .../apache/falcon/unit/FalconUnitTestBase.java | 6 +- .../unit/examples/JavaHelloWorldExample.java | 33 +++++ webapp/pom.xml | 8 ++ .../AbstractSchedulerManagerJerseyIT.java | 123 +++++++++++++++++ .../falcon/resource/EntityManagerJerseyIT.java | 2 +- .../EntitySchedulerManagerJerseyIT.java | 117 +++++++++++++++++ .../InstanceSchedulerManagerJerseyIT.java | 131 +++++++++++++++++++ .../resource/ProcessInstanceManagerIT.java | 2 +- .../apache/falcon/resource/UnitTestContext.java | 8 +- .../src/test/resources/helloworldworkflow.xml | 39 ++++++ .../local-process-noinputs-template.xml | 42 ++++++ webapp/src/test/resources/runtime.properties | 2 + webapp/src/test/resources/startup.properties | 31 ++++- 22 files changed, 620 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/CHANGES.txt ---------------------------------------------------------------------- diff --git 
a/CHANGES.txt b/CHANGES.txt index 4c2e42c..ce346a8 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -35,6 +35,8 @@ Trunk (Unreleased) FALCON-1213 Base framework of the native scheduler(Pallavi Rao) IMPROVEMENTS + FALCON-1447 Integration Tests for native scheduler(Pavan Kumar Kolamuri via Ajay Yadava) + FALCON-1617 Enable SLA monitoring for instances in past(Narayan Periwal via Ajay Yadava) FALCON-1577 Migration of EntityManagerJerseyIT to use falcon unit (Narayan Periwal via Pallavi Rao) http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java index f6e87c4..81f4c54 100644 --- a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java +++ b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java @@ -90,7 +90,13 @@ public class LocalProxyOozieClient extends OozieClient { } public String run(Properties conf) throws OozieClientException { - return getLocalOozieClientBundle().run(conf); + if (conf.getProperty("oozie.wf.application.path") != null) { + return getLocalOozieClient().run(conf); + } else if (conf.getProperty("oozie.coord.application.path") != null) { + return getLocalOozieClientCoordProxy().run(conf); + } else { + return getLocalOozieClientBundle().run(conf); + } } @Override http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java index 2f97c0d..5a6d2dc 100644 --- 
a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java @@ -530,7 +530,7 @@ public abstract class AbstractEntityManager { return new String(data); } - private enum EntityStatus { + protected enum EntityStatus { SUBMITTED, SUSPENDED, RUNNING, COMPLETED } @@ -553,7 +553,6 @@ public abstract class AbstractEntityManager { } catch (FalconWebException e) { throw e; } catch (Exception e) { - LOG.error("Unable to get status for entity {} ({})", entity, type, e); throw FalconWebException.newException(e, Response.Status.BAD_REQUEST); } http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/scheduler/pom.xml ---------------------------------------------------------------------- diff --git a/scheduler/pom.xml b/scheduler/pom.xml index c934b3e..72568f0 100644 --- a/scheduler/pom.xml +++ b/scheduler/pom.xml @@ -214,6 +214,24 @@ </executions> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.4</version> + <configuration> + <excludes> + <exclude>**/log4j.xml</exclude> + </excludes> + </configuration> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> </build> </project> http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java index 2c45fbd..c19cada 100644 --- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java +++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java @@ -175,13 +175,17 @@ public class FalconWorkflowEngine 
extends AbstractWorkflowEngine { states = new ArrayList<>(); states.add(InstanceState.STATE.SUSPENDED); break; - case STATUS: case PARAMS: // Applicable only for running and finished jobs. states = InstanceState.getRunningStates(); states.addAll(InstanceState.getTerminalStates()); states.add(InstanceState.STATE.SUSPENDED); break; + case STATUS: + states = InstanceState.getActiveStates(); + states.addAll(InstanceState.getTerminalStates()); + states.add(InstanceState.STATE.SUSPENDED); + break; default: throw new IllegalArgumentException("Unhandled action " + action); } http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java b/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java index 48c1426..a8be06d 100644 --- a/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java +++ b/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java @@ -30,7 +30,7 @@ import java.io.File; import java.io.IOException; /** - * TestBase for tests in scheduler. + * TestBase for tests using Falcon Native Scheduler. 
*/ public class AbstractSchedulerTestBase extends AbstractTestBase { private static final String DB_BASE_DIR = "target/test-data/falcondb"; @@ -56,7 +56,7 @@ public class AbstractSchedulerTestBase extends AbstractTestBase { fs.delete(new Path(DB_BASE_DIR), true); } - protected void createDB(String file) { + public void createDB(String file) { File sqlFile = new File(file); String[] argsCreate = { "create", "-sqlfile", sqlFile.getAbsolutePath(), "-run" }; int result = execDBCLICommands(argsCreate); http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java index f34a90c..e86e3e8 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java @@ -128,8 +128,8 @@ public class FalconUnitClient extends AbstractFalconClient { } @Override - public APIResult delete(EntityType entityType, String entityName, String doAsUser) { - return localSchedulableEntityManager.delete(entityType, entityName, doAsUser); + public APIResult delete(EntityType entityType, String entityName, String colo) { + return localSchedulableEntityManager.delete(entityType, entityName, colo); } @Override http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java index 0065c71..d30b028 100644 --- a/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java +++ 
b/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java @@ -49,11 +49,11 @@ public class LocalSchedulableEntityManager extends AbstractSchedulableEntityMana return super.getStatus(type, entity, colo); } - public APIResult delete(EntityType entityType, String entityName, String doAsUser) { + public APIResult delete(EntityType entityType, String entityName, String colo) { if (entityType == null) { throw new IllegalStateException("Entity-Type cannot be null"); } - return super.delete(entityType.name(), entityName, doAsUser); + return super.delete(entityType.name(), entityName, colo); } public APIResult validate(String entityType, String filePath, Boolean skipDryRun, http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/unit/src/main/resources/oozie-site.xml ---------------------------------------------------------------------- diff --git a/unit/src/main/resources/oozie-site.xml b/unit/src/main/resources/oozie-site.xml index 23d41eb..bb7015d 100644 --- a/unit/src/main/resources/oozie-site.xml +++ b/unit/src/main/resources/oozie-site.xml @@ -167,4 +167,51 @@ <name>oozie.service.coord.check.maximum.frequency</name> <value>false</value> </property> + + <!-- Required to Notify Falcon --> + <property> + <name>oozie.services.ext</name> + <value> + org.apache.oozie.service.JMSAccessorService, + org.apache.oozie.service.JMSTopicService, + org.apache.oozie.service.EventHandlerService + </value> + </property> + <property> + <name>oozie.service.EventHandlerService.event.listeners</name> + <value> + org.apache.oozie.jms.JMSJobEventListener + </value> + </property> + <property> + <name>oozie.jms.producer.connection.properties</name> + <value> + java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#vm://localhost:61616 + </value> + </property> + <property> + <name>oozie.service.JMSTopicService.topic.name</name> + <value> + WORKFLOW=ENTITY.TOPIC, COORDINATOR=ENTITY.TOPIC + </value> + <description> + 
Topic options are ${username} or a fixed string which can be specified as default or for a + particular job type. + For e.g To have a fixed string topic for workflows, coordinators and bundles, + specify in the following comma-separated format: {jobtype1}={some_string1}, {jobtype2}={some_string2} + where job type can be WORKFLOW, COORDINATOR or BUNDLE. + Following example defines topic for workflow job, workflow action, coordinator job, coordinator action, + bundle job and bundle action + WORKFLOW=workflow, COORDINATOR=coordinator, + BUNDLE=bundle + For jobs with no defined topic, default topic will be ${username} + </description> + </property> + <property> + <name>oozie.service.JMSTopicService.topic.prefix</name> + <value>FALCON.</value> + <description> + This can be used to append a prefix to the topic in oozie.service.JMSTopicService.topic.name. For eg: oozie. + </description> + </property> </configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java index 83afac7..382e0c9 100644 --- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java +++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java @@ -92,8 +92,8 @@ public class FalconUnitTestBase { private static final String DEFAULT_CLUSTER = "local"; private static final String DEFAULT_COLO = "local"; - private static final String CLUSTER = "cluster"; - private static final String COLO = "colo"; + protected static final String CLUSTER = "cluster"; + protected static final String COLO = "colo"; protected static final String CLUSTER_TEMPLATE = "/local-cluster-template.xml"; protected static final String STAGING_PATH = "/projects/falcon/staging"; protected static 
final String WORKING_PATH = "/projects/falcon/working"; @@ -105,7 +105,7 @@ public class FalconUnitTestBase { protected static ConfigurationStore configStore; @BeforeClass - public void setup() throws FalconException, IOException { + public void setup() throws Exception { FalconUnit.start(true); falconUnitClient = FalconUnit.getClient(); fs = (JailedFileSystem) FalconUnit.getFileSystem(); http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/unit/src/test/java/org/apache/falcon/unit/examples/JavaHelloWorldExample.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/examples/JavaHelloWorldExample.java b/unit/src/test/java/org/apache/falcon/unit/examples/JavaHelloWorldExample.java new file mode 100644 index 0000000..1481c4b --- /dev/null +++ b/unit/src/test/java/org/apache/falcon/unit/examples/JavaHelloWorldExample.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.unit.examples; + +import java.io.IOException; + +/** + * Hello World Example for Unit Tests. This is used in JavaAction in Falcon Unit Tests to check + * whether workflow succeeded or not. 
+ */ +public final class JavaHelloWorldExample { + + private JavaHelloWorldExample() {} + + public static void main(String[] args) throws IOException { + System.out.println("Hello World"); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/webapp/pom.xml ---------------------------------------------------------------------- diff --git a/webapp/pom.xml b/webapp/pom.xml index 428f67e..05616c5 100644 --- a/webapp/pom.xml +++ b/webapp/pom.xml @@ -140,6 +140,14 @@ <dependency> <groupId>org.apache.falcon</groupId> + <artifactId>falcon-scheduler</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.falcon</groupId> <artifactId>falcon-retention</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java new file mode 100644 index 0000000..f5bcc54 --- /dev/null +++ b/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.resource; + +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.client.FalconCLIException; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.state.AbstractSchedulerTestBase; +import org.apache.falcon.state.store.service.FalconJPAService; +import org.apache.falcon.unit.FalconUnitTestBase; +import org.apache.falcon.util.StartupProperties; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * Base class for tests using Native Scheduler. 
+ */ +public class AbstractSchedulerManagerJerseyIT extends FalconUnitTestBase { + + public static final String PROCESS_TEMPLATE = "/local-process-noinputs-template.xml"; + public static final String PROCESS_NAME = "processName"; + protected static final String START_INSTANCE = "2012-04-20T00:00Z"; + private static FalconJPAService falconJPAService = FalconJPAService.get(); + private static final String DB_BASE_DIR = "target/test-data/falcondb"; + protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db"; + protected static String url = "jdbc:derby:"+ dbLocation +";create=true"; + protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql"; + protected LocalFileSystem localFS = new LocalFileSystem(); + + + @BeforeClass + public void setup() throws Exception { + Configuration localConf = new Configuration(); + localFS.initialize(LocalFileSystem.getDefaultUri(localConf), localConf); + cleanupDB(); + localFS.mkdirs(new Path(DB_BASE_DIR)); + updateStartUpProps(); + falconJPAService.init(); + createDB(); + super.setup(); + } + + private void updateStartUpProps() { + StartupProperties.get().setProperty("workflow.engine.impl", + "org.apache.falcon.workflow.engine.FalconWorkflowEngine"); + StartupProperties.get().setProperty("dag.engine.impl", + "org.apache.falcon.workflow.engine.OozieDAGEngine"); + String[] listeners = StartupProperties.get().getProperty("configstore.listeners").split(","); + List<String> configListeners = new ArrayList<>(Arrays.asList(listeners)); + configListeners.remove("org.apache.falcon.service.SharedLibraryHostingService"); + configListeners.add("org.apache.falcon.state.store.jdbc.JDBCStateStore"); + StartupProperties.get().setProperty("configstore.listeners", StringUtils.join(configListeners, ",")); + StartupProperties.get().getProperty("falcon.state.store.impl", + "org.apache.falcon.state.store.jdbc.JDBCStateStore"); + } + + protected void submitProcess(Map<String, String> overlay) throws IOException, 
FalconCLIException { + String tmpFile = TestContext.overlayParametersOverTemplate(PROCESS_TEMPLATE, overlay); + APIResult result = submit(EntityType.PROCESS, tmpFile); + assertStatus(result); + } + + protected void scheduleProcess(String processName, String cluster, + String startTime, int noOfInstances) throws FalconCLIException { + APIResult result = falconUnitClient.schedule(EntityType.PROCESS, processName, startTime, noOfInstances, + cluster, true, null); + assertStatus(result); + } + + protected void setupProcessExecution(UnitTestContext context, + Map<String, String> overlay, int numInstances) throws Exception { + String colo = overlay.get(COLO); + String cluster = overlay.get(CLUSTER); + submitCluster(colo, cluster, null); + context.prepare(); + submitProcess(overlay); + + String processName = overlay.get(PROCESS_NAME); + scheduleProcess(processName, cluster, START_INSTANCE, numInstances); + } + + private void createDB() throws Exception { + AbstractSchedulerTestBase abstractSchedulerTestBase = new AbstractSchedulerTestBase(); + StartupProperties.get().setProperty(FalconJPAService.URL, url); + abstractSchedulerTestBase.createDB(DB_SQL_FILE); + } + + @AfterClass + public void cleanup() throws Exception { + super.cleanup(); + cleanupDB(); + } + + private void cleanupDB() throws IOException { + localFS.delete(new Path(DB_BASE_DIR), true); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java index 439d148..258bb1a 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java +++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java @@ -89,7 +89,7 @@ public class EntityManagerJerseyIT extends 
FalconUnitTestBase { @BeforeClass @Override - public void setup() throws FalconException, IOException { + public void setup() throws Exception { String version = System.getProperty("project.version"); String buildDir = System.getProperty("project.build.directory"); System.setProperty("falcon.libext", buildDir + "/../../unit/target/falcon-unit-" + version + ".jar"); http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/webapp/src/test/java/org/apache/falcon/resource/EntitySchedulerManagerJerseyIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntitySchedulerManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntitySchedulerManagerJerseyIT.java new file mode 100644 index 0000000..35119f0 --- /dev/null +++ b/webapp/src/test/java/org/apache/falcon/resource/EntitySchedulerManagerJerseyIT.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.falcon.resource; + +import org.apache.falcon.entity.v0.EntityType; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.Map; + + +/** + * Test cases for Entity operations using Falcon Native Scheduler. + */ +public class EntitySchedulerManagerJerseyIT extends AbstractSchedulerManagerJerseyIT { + + @Test + public void testEntitySubmitAndSchedule() throws Exception { + UnitTestContext context = new UnitTestContext(); + Map<String, String> overlay = context.getUniqueOverlay(); + String colo = overlay.get(COLO); + String cluster = overlay.get(CLUSTER); + submitCluster(colo, cluster, null); + context.prepare(); + + submitProcess(overlay); + + String processName = overlay.get(PROCESS_NAME); + APIResult result = falconUnitClient.getStatus(EntityType.PROCESS, overlay.get(PROCESS_NAME), cluster, null); + assertStatus(result); + Assert.assertEquals(AbstractEntityManager.EntityStatus.SUBMITTED.name(), result.getMessage()); + + scheduleProcess(processName, cluster, START_INSTANCE, 1); + + result = falconUnitClient.getStatus(EntityType.PROCESS, processName, cluster, null); + assertStatus(result); + Assert.assertEquals(AbstractEntityManager.EntityStatus.RUNNING.name(), result.getMessage()); + + } + + @Test + public void testEntitySuspendResume() throws Exception { + UnitTestContext context = new UnitTestContext(); + Map<String, String> overlay = context.getUniqueOverlay(); + String colo = overlay.get(COLO); + String cluster = overlay.get(CLUSTER); + submitCluster(colo, cluster, null); + context.prepare(); + + submitProcess(overlay); + + String processName = overlay.get(PROCESS_NAME); + APIResult result = falconUnitClient.getStatus(EntityType.PROCESS, processName, cluster, null); + assertStatus(result); + Assert.assertEquals(AbstractEntityManager.EntityStatus.SUBMITTED.name(), result.getMessage()); + + scheduleProcess(processName, cluster, START_INSTANCE, 1); + + result = falconUnitClient.suspend(EntityType.PROCESS, 
processName, cluster, null); + assertStatus(result); + + result = falconUnitClient.getStatus(EntityType.PROCESS, processName, cluster, null); + assertStatus(result); + Assert.assertEquals(AbstractEntityManager.EntityStatus.SUSPENDED.name(), result.getMessage()); + + result = falconUnitClient.resume(EntityType.PROCESS, processName, cluster, null); + assertStatus(result); + + result = falconUnitClient.getStatus(EntityType.PROCESS, processName, cluster, null); + assertStatus(result); + Assert.assertEquals(AbstractEntityManager.EntityStatus.RUNNING.name(), result.getMessage()); + + } + + + @Test + public void testProcessDelete() throws Exception { + UnitTestContext context = new UnitTestContext(); + Map<String, String> overlay = context.getUniqueOverlay(); + String colo = overlay.get(COLO); + String cluster = overlay.get(CLUSTER); + submitCluster(colo, cluster, null); + context.prepare(); + + submitProcess(overlay); + + String processName = overlay.get(PROCESS_NAME); + APIResult result = falconUnitClient.getStatus(EntityType.PROCESS, processName, cluster, null); + assertStatus(result); + Assert.assertEquals(AbstractEntityManager.EntityStatus.SUBMITTED.name(), result.getMessage()); + + scheduleProcess(processName, cluster, START_INSTANCE, 1); + + result = falconUnitClient.getStatus(EntityType.PROCESS, processName, cluster, null); + assertStatus(result); + + result = falconUnitClient.delete(EntityType.PROCESS, processName, cluster); + assertStatus(result); + + } + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java new file mode 100644 index 0000000..7959b63 --- /dev/null +++ 
b/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.resource; + +import org.apache.falcon.FalconException; +import org.apache.falcon.client.FalconCLIException; +import org.apache.falcon.entity.v0.EntityType; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.Map; + +/** + * Tests for Instance operations using Falcon Native Scheduler. 
+ */ +public class InstanceSchedulerManagerJerseyIT extends AbstractSchedulerManagerJerseyIT { + + + private static final String END_TIME = "2012-04-21T00:00Z"; + private static final String HELLO_WORLD_WORKFLOW = "helloworldworkflow.xml"; + + + @Test + public void testProcessInstanceExecution() throws Exception { + UnitTestContext context = new UnitTestContext(); + Map<String, String> overlay = context.getUniqueOverlay(); + String colo = overlay.get(COLO); + String cluster = overlay.get(CLUSTER); + + submitCluster(colo, cluster, null); + context.prepare(HELLO_WORLD_WORKFLOW); + submitProcess(overlay); + + String processName = overlay.get(PROCESS_NAME); + scheduleProcess(processName, cluster, START_INSTANCE, 1); + + waitForStatus(EntityType.PROCESS.toString(), processName, + START_INSTANCE, InstancesResult.WorkflowStatus.SUCCEEDED); + + InstancesResult.WorkflowStatus status = getClient().getInstanceStatus(EntityType.PROCESS.name(), + processName, START_INSTANCE); + Assert.assertEquals(status, InstancesResult.WorkflowStatus.SUCCEEDED); + + } + + @Test + public void testKillInstances() throws Exception { + UnitTestContext context = new UnitTestContext(); + Map<String, String> overlay = context.getUniqueOverlay(); + + setupProcessExecution(context, overlay, 1); + + String processName = overlay.get(PROCESS_NAME); + String colo = overlay.get(COLO); + + waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, + InstancesResult.WorkflowStatus.RUNNING); + + InstancesResult result = falconUnitClient.killInstances(EntityType.PROCESS.toString(), + processName, START_INSTANCE, END_TIME, colo, null, null, null, null); + assertStatus(result); + + InstancesResult.WorkflowStatus status = getClient().getInstanceStatus(EntityType.PROCESS.name(), + processName, START_INSTANCE); + Assert.assertEquals(status, InstancesResult.WorkflowStatus.KILLED); + + + } + + @Test + public void testSuspendResumeInstances() throws Exception { + UnitTestContext context = new 
UnitTestContext(); + Map<String, String> overlay = context.getUniqueOverlay(); + + setupProcessExecution(context, overlay, 1); + + String processName = overlay.get(PROCESS_NAME); + String colo = overlay.get(COLO); + + waitForStatus(EntityType.PROCESS.toString(), processName, + START_INSTANCE, InstancesResult.WorkflowStatus.RUNNING); + + falconUnitClient.suspendInstances(EntityType.PROCESS.toString(), processName, START_INSTANCE, + END_TIME, colo, null, null, null, null); + + InstancesResult.WorkflowStatus status = getClient().getInstanceStatus(EntityType.PROCESS.name(), + processName, START_INSTANCE); + Assert.assertEquals(status, InstancesResult.WorkflowStatus.SUSPENDED); + + falconUnitClient.resumeInstances(EntityType.PROCESS.toString(), processName, START_INSTANCE, + END_TIME, colo, null, null, null, null); + status = getClient().getInstanceStatus(EntityType.PROCESS.name(), + processName, START_INSTANCE); + Assert.assertEquals(status, InstancesResult.WorkflowStatus.RUNNING); + } + + @Test + public void testListInstances() throws Exception { + UnitTestContext context = new UnitTestContext(); + Map<String, String> overlay = context.getUniqueOverlay(); + + setupProcessExecution(context, overlay, 4); + + String processName = overlay.get(PROCESS_NAME); + String colo = overlay.get(COLO); + + waitForStatus(EntityType.PROCESS.toString(), processName, + START_INSTANCE, InstancesResult.WorkflowStatus.RUNNING); + + InstancesResult result = falconUnitClient.getStatusOfInstances(EntityType.PROCESS.toString(), processName, + START_INSTANCE, "2012-04-23T00:00Z", colo, null, null, null, null, 0, 3, null); + Assert.assertEquals(3, result.getInstances().length); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java 
b/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java index 769d059..f94bd8c 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java +++ b/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java @@ -47,7 +47,7 @@ public class ProcessInstanceManagerIT extends FalconUnitTestBase { @BeforeClass @Override - public void setup() throws FalconException, IOException { + public void setup() throws Exception { String version = System.getProperty("project.version"); String buildDir = System.getProperty("project.build.directory"); System.setProperty("falcon.libext", buildDir + "/../../unit/target/falcon-unit-" + version + ".jar"); http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java b/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java index b222305..1d3167b 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java +++ b/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java @@ -87,7 +87,7 @@ public class UnitTestContext { } } - protected void prepare() throws Exception { + protected void prepare(String workflow) throws Exception { mkdir(fs, new Path("/falcon"), new FsPermission((short) 511)); Path wfParent = new Path("/falcon/test"); @@ -96,7 +96,7 @@ public class UnitTestContext { mkdir(fs, wfPath); mkdir(fs, new Path("/falcon/test/workflow/lib")); fs.copyFromLocalFile(false, true, - new Path(TestContext.class.getResource("/sleepWorkflow.xml").getPath()), + new Path(TestContext.class.getResource("/" + workflow).getPath()), new Path(wfPath, "workflow.xml")); mkdir(fs, new Path(wfParent, "input/2012/04/20/00")); mkdir(fs, new Path(wfParent, "input/2012/04/21/00")); @@ -104,6 +104,10 @@ public class UnitTestContext { 
mkdir(fs, outPath, new FsPermission((short) 511)); } + protected void prepare() throws Exception { + prepare("sleepWorkflow.xml"); + } + public static File getTempFile() throws IOException { return getTempFile("test", ".xml"); } http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/webapp/src/test/resources/helloworldworkflow.xml ---------------------------------------------------------------------- diff --git a/webapp/src/test/resources/helloworldworkflow.xml b/webapp/src/test/resources/helloworldworkflow.xml new file mode 100644 index 0000000..354bb34 --- /dev/null +++ b/webapp/src/test/resources/helloworldworkflow.xml @@ -0,0 +1,39 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> +<workflow-app xmlns="uri:oozie:workflow:0.2" name="java-main-wf"> + <start to="java-node"/> + <action name="java-node"> + <java> + <job-tracker>local</job-tracker> + <name-node>jail://global:00</name-node> + <configuration> + <property> + <name>mapred.job.queue.name</name> + <value>default</value> + </property> + </configuration> + <main-class>org.apache.falcon.unit.examples.JavaHelloWorldExample</main-class> + </java> + <ok to="end"/> + <error to="fail"/> + </action> + <kill name="fail"> + <message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> + </kill> + <end name="end"/> +</workflow-app> http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/webapp/src/test/resources/local-process-noinputs-template.xml ---------------------------------------------------------------------- diff --git a/webapp/src/test/resources/local-process-noinputs-template.xml b/webapp/src/test/resources/local-process-noinputs-template.xml new file mode 100644 index 0000000..aabdc6a --- /dev/null +++ b/webapp/src/test/resources/local-process-noinputs-template.xml @@ -0,0 +1,42 @@ +<?xml version="1.0" encoding="UTF-8" standalone="yes"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + +<process name="##processName##" xmlns="uri:falcon:process:0.1"> + <tags>[email protected], [email protected], department=forecasting</tags> + <pipelines>testPipeline,dataReplicationPipeline</pipelines> + <clusters> + <cluster name="##src.cluster.name##"> + <validity end="##processEndDate##" start="2012-04-20T00:00Z"/> + </cluster> + </clusters> + + <parallel>2</parallel> + <order>FIFO</order> + <frequency>days(1)</frequency> + <timezone>UTC</timezone> + + <properties> + <property name="fileTime" value="${formatTime(dateOffset(instanceTime(), 1, 'DAY'), 'yyyy-MMM-dd')}"/> + <property name="user" value="${user()}"/> + <property name="baseTime" value="${today(0,0)}"/> + <property name="sundayThisWeek" value="${currentWeek('SUN', 0, 0)}"/> + </properties> + <workflow engine="oozie" path="##workflow.path##" lib="##workflow.lib.path##"/> + <retry policy="periodic" delay="minutes(10)" attempts="3"/> +</process> http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/webapp/src/test/resources/runtime.properties ---------------------------------------------------------------------- diff --git a/webapp/src/test/resources/runtime.properties b/webapp/src/test/resources/runtime.properties index 1da0ca7..fec9e44 100644 --- a/webapp/src/test/resources/runtime.properties +++ b/webapp/src/test/resources/runtime.properties @@ -47,4 +47,6 @@ *.falcon.service.ProxyUserService.proxyuser.#USER#.groups=* +*.falcon.jms.notification.enabled=true + ######### Proxyuser Configuration End ######### http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/webapp/src/test/resources/startup.properties ---------------------------------------------------------------------- diff --git a/webapp/src/test/resources/startup.properties b/webapp/src/test/resources/startup.properties index 756f315..bc88534 100644 --- a/webapp/src/test/resources/startup.properties +++ b/webapp/src/test/resources/startup.properties @@ -20,7 +20,6 @@ ######### Implementation classes ######### ## DONT MODIFY 
UNLESS SURE ABOUT CHANGE ## - *.workflow.engine.impl=org.apache.falcon.workflow.engine.OozieWorkflowEngine *.oozie.process.workflow.builder=org.apache.falcon.workflow.OozieProcessWorkflowBuilder *.oozie.feed.workflow.builder=org.apache.falcon.workflow.OozieFeedWorkflowBuilder @@ -33,11 +32,17 @@ *.application.services=org.apache.falcon.security.AuthenticationInitializationService,\ org.apache.falcon.workflow.WorkflowJobEndNotificationService, \ org.apache.falcon.service.ProcessSubscriberService,\ + org.apache.falcon.state.store.service.FalconJPAService,\ org.apache.falcon.entity.store.ConfigurationStore,\ org.apache.falcon.rerun.service.RetryService,\ org.apache.falcon.rerun.service.LateRunService,\ org.apache.falcon.metadata.MetadataMappingService,\ - org.apache.falcon.service.ProxyUserService + org.apache.falcon.service.ProxyUserService,\ + org.apache.falcon.notification.service.impl.JobCompletionService,\ + org.apache.falcon.notification.service.impl.SchedulerService,\ + org.apache.falcon.notification.service.impl.AlarmService,\ + org.apache.falcon.notification.service.impl.DataAvailabilityService,\ + org.apache.falcon.execution.FalconExecutionService ##### Falcon Configuration Store Change listeners ##### *.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ @@ -46,6 +51,7 @@ org.apache.falcon.entity.store.FeedLocationStore,\ org.apache.falcon.service.SharedLibraryHostingService + ##### JMS MQ Broker Implementation class ##### *.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory @@ -140,4 +146,23 @@ debug.libext.process.paths=${falcon.libext} *.falcon.graph.serialize.path=${user.dir}/target/graphdb *.falcon.graph.preserve.history=false *.falcon.graph.transaction.retry.count=3 -*.falcon.graph.transaction.retry.delay=5 \ No newline at end of file +*.falcon.graph.transaction.retry.delay=5 + +######## StateStore Properties ##### +*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore 
+*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver +*.falcon.statestore.jdbc.url=jdbc:derby:target/test-data/data.db;create=true +*.falcon.statestore.jdbc.username=sa +*.falcon.statestore.jdbc.password= +*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource +# Maximum number of active connections that can be allocated from this pool at the same time. +*.falcon.statestore.pool.max.active.conn=10 +*.falcon.statestore.connection.properties= +# Indicates the interval (in milliseconds) between eviction runs. +*.falcon.statestore.validate.db.connection.eviction.interval=300000 +# The number of objects to examine during each run of the idle object evictor thread. +*.falcon.statestore.validate.db.connection.eviction.num=10 +# Creates Falcon DB. +# If set to true, it creates the DB schema if it does not exist. If the DB schema exists, it is a NOP. +# If set to false, it does not create the DB schema. If the DB schema does not exist, startup fails. +*.falcon.statestore.create.db.schema=true \ No newline at end of file
