FALCON-55 Update suspends old oozie coords. Contributed by Shwetha GS
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/9d273658 Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/9d273658 Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/9d273658 Branch: refs/heads/v0.3 Commit: 9d2736589d2d93758774a9f81d70e9266a4d582e Parents: 54d827f Author: srikanth.sundarrajan <srikanth.sundarra...@inmobi.com> Authored: Mon Jul 22 19:13:11 2013 +0530 Committer: srikanth.sundarrajan <srikanth.sundarra...@inmobi.com> Committed: Mon Jul 22 19:13:11 2013 +0530 ---------------------------------------------------------------------- CHANGES.txt | 6 +++ .../workflow/engine/OozieWorkflowEngine.java | 27 ++++++------ .../oozie/client/CustomOozieClientTest.java | 38 ----------------- .../falcon/resource/EntityManagerJerseyIT.java | 44 ++++++++++++-------- .../org/apache/falcon/resource/TestContext.java | 4 ++ 5 files changed, 50 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9d273658/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4c655c8..d7e91ae 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,12 @@ Release Version: 0.3 NEW FEATURES IMPROVEMENTS + + FALCON-56 Update Falcon packing to include all source files. (Srikanth + Sundarrajan) + + FALCON-55 Update suspends old oozie coords. (Shwetha GS via Srikanth + Sundarrajan) FALCON-52 Main module configured used with jetty:run has issues with app start. (Srikanth Sundarrajan) http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9d273658/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java index fcb9d80..28a60c2 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java @@ -35,7 +35,7 @@ import org.apache.falcon.workflow.WorkflowBuilder; import org.apache.log4j.Logger; import org.apache.oozie.client.*; import org.apache.oozie.client.CoordinatorJob.Timeunit; -import org.apache.oozie.client.WorkflowJob.Status; +import org.apache.oozie.client.Job.Status; import java.util.*; import java.util.Map.Entry; @@ -51,14 +51,14 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { public static final String ENGINE = "oozie"; private static final BundleJob MISSING = new NullBundleJob(); - private static final List<Status> WF_KILL_PRECOND = Arrays.asList(Status.PREP, - Status.RUNNING, Status.SUSPENDED, Status.FAILED); - private static final List<Status> WF_SUSPEND_PRECOND = Arrays - .asList(Status.RUNNING); - private static final List<Status> WF_RESUME_PRECOND = Arrays - .asList(Status.SUSPENDED); - private static final List<Status> WF_RERUN_PRECOND = Arrays.asList(Status.FAILED, - Status.KILLED, Status.SUCCEEDED); + private static final List<WorkflowJob.Status> WF_KILL_PRECOND = Arrays.asList(WorkflowJob.Status.PREP, + WorkflowJob.Status.RUNNING, WorkflowJob.Status.SUSPENDED, WorkflowJob.Status.FAILED); + private static final List<WorkflowJob.Status> WF_SUSPEND_PRECOND = Arrays + .asList(WorkflowJob.Status.RUNNING); + private static final List<WorkflowJob.Status> WF_RESUME_PRECOND = Arrays + .asList(WorkflowJob.Status.SUSPENDED); + private static final List<WorkflowJob.Status> WF_RERUN_PRECOND = Arrays.asList(WorkflowJob.Status.FAILED, + WorkflowJob.Status.KILLED, WorkflowJob.Status.SUCCEEDED); private static final List<Job.Status> BUNDLE_ACTIVE_STATUS = Arrays.asList( Job.Status.PREP, Job.Status.RUNNING, Job.Status.SUSPENDED, @@ -845,7 +845,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { cal.setTime(coord.getLastActionTime()); Frequency freq = createFrequency(coord.getFrequency(), coord.getTimeUnit()); - cal.add(freq.getTimeUnit().getCalendarUnit(), -1); + cal.add(freq.getTimeUnit().getCalendarUnit(), -freq.getFrequency()); return cal.getTime(); } return null; @@ -906,6 +906,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } private void resume(String cluster, BundleJob bundle) throws FalconException { + bundle = getBundleInfo(cluster, bundle.getId()); for (CoordinatorJob coord : bundle.getCoordinators()) { resume(cluster, coord.getId()); } @@ -1029,7 +1030,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { jobprops.remove(OozieClient.COORDINATOR_APP_PATH); jobprops.remove(OozieClient.BUNDLE_APP_PATH); client.reRun(jobId, jobprops); - assertStatus(cluster, jobId, WorkflowJob.Status.RUNNING); + assertStatus(cluster, jobId, Job.Status.RUNNING); LOG.info("Rerun job " + jobId + " on cluster " + cluster); } catch (Exception e) { LOG.error("Unable to rerun workflows", e); @@ -1119,7 +1120,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { OozieClient client = OozieClientFactory.get(cluster); try { client.suspend(jobId); - assertStatus(cluster, jobId, Status.SUSPENDED, Status.SUCCEEDED, + assertStatus(cluster, jobId, Status.PREPSUSPENDED, Status.SUSPENDED, Status.SUCCEEDED, Status.FAILED, Status.KILLED); LOG.info("Suspended job " + jobId + " on cluster " + cluster); } catch (OozieClientException e) { @@ -1242,7 +1243,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { try { WorkflowJob jobInfo = client.getJobInfo(jobId); instance.startTime = jobInfo.getStartTime(); - if (jobInfo.getStatus() == Status.RUNNING) { + if (jobInfo.getStatus().name().equals(Status.RUNNING.name())) { instance.endTime = new Date(); } else { instance.endTime = jobInfo.getEndTime(); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9d273658/oozie/src/test/java/org/apache/oozie/client/CustomOozieClientTest.java ---------------------------------------------------------------------- diff --git a/oozie/src/test/java/org/apache/oozie/client/CustomOozieClientTest.java b/oozie/src/test/java/org/apache/oozie/client/CustomOozieClientTest.java deleted file mode 100644 index cf0b385..0000000 --- a/oozie/src/test/java/org/apache/oozie/client/CustomOozieClientTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.oozie.client; - -import org.testng.annotations.Test; - -import java.util.Properties; - -/** - * Test to verify if the oozie client provided via CustomOozieClient is valid. - */ -public class CustomOozieClientTest { - - @Test(enabled = false) - public void testGetConfiguration() throws Exception { - CustomOozieClient client = new CustomOozieClient("http://localhost:11000/oozie"); - Properties props = client.getConfiguration(); - System.out.println(props); - props = client.getProperties(); - System.out.println(props); - } -} http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9d273658/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java index b94cff0..803e7f6 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java +++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java @@ -17,7 +17,23 @@ */ package org.apache.falcon.resource; -import com.sun.jersey.api.client.ClientResponse; +import java.io.File; +import java.io.InputStream; +import java.io.StringReader; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; + +import javax.servlet.ServletInputStream; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.xml.bind.JAXBException; + import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Input; @@ -32,26 +48,13 @@ import org.apache.hadoop.fs.Path; import org.apache.oozie.client.BundleJob; import org.apache.oozie.client.Job; import org.apache.oozie.client.Job.Status; +import org.apache.oozie.client.OozieClient; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import javax.servlet.ServletInputStream; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.xml.bind.JAXBException; -import java.io.File; -import java.io.InputStream; -import java.io.StringReader; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TimeZone; +import com.sun.jersey.api.client.ClientResponse; /** * Test class for Entity REST APIs. @@ -186,6 +189,10 @@ public class EntityManagerJerseyIT { TestContext context = newContext(); context.scheduleProcess(); context.waitForBundleStart(Job.Status.RUNNING); + List<BundleJob> bundles = context.getBundles(); + Assert.assertEquals(bundles.size(), 1); + OozieClient ozClient = context.getOozieClient(); + String coordId = ozClient.getBundleJobInfo(bundles.get(0).getId()).getCoordinators().get(0).getId(); ClientResponse response = context.service.path("api/entities/definition/process/" + context.processName).header( @@ -218,9 +225,10 @@ public class EntityManagerJerseyIT { .post(ClientResponse.class, context.getServletInputStream(tmpFile.getAbsolutePath())); context.assertSuccessful(response); - //Assert that update creates new bundle - List<BundleJob> bundles = context.getBundles(); + //Assert that update creates new bundle and old coord is running + bundles = context.getBundles(); Assert.assertEquals(bundles.size(), 2); + Assert.assertEquals(ozClient.getCoordJobInfo(coordId).getStatus(), Status.RUNNING); } @Test http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9d273658/webapp/src/test/java/org/apache/falcon/resource/TestContext.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java index 6fa07c9..a7ee75c 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java +++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java @@ -366,6 +366,10 @@ public class TestContext { return File.createTempFile("test", ".xml", target); } + public OozieClient getOozieClient() throws FalconException { + return OozieClientFactory.get(cluster.getCluster()); + } + public List<BundleJob> getBundles() throws Exception { List<BundleJob> bundles = new ArrayList<BundleJob>(); if (clusterName == null) {