Updated Branches: refs/heads/master 605ed64f7 -> 67e54f610
OOZIE-1622 Multiple CoordSubmit for same bundle (shwethags via virag) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/67e54f61 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/67e54f61 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/67e54f61 Branch: refs/heads/master Commit: 67e54f61097f2be5f9efd487a5b54de2f4dd4613 Parents: 605ed64 Author: Virag Kothari <[email protected]> Authored: Wed Jan 29 12:42:18 2014 -0800 Committer: Virag Kothari <[email protected]> Committed: Wed Jan 29 12:42:18 2014 -0800 ---------------------------------------------------------------------- .../apache/oozie/service/RecoveryService.java | 25 ++-- .../command/bundle/TestBundleRerunXCommand.java | 40 +++---- .../oozie/service/TestRecoveryService.java | 120 ++++++++++++++++--- .../oozie/service/TestStatusTransitService.java | 10 +- .../org/apache/oozie/test/XDataTestCase.java | 2 +- release-log.txt | 1 + 6 files changed, 144 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/67e54f61/core/src/main/java/org/apache/oozie/service/RecoveryService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/RecoveryService.java b/core/src/main/java/org/apache/oozie/service/RecoveryService.java index c3ac6b3..2749bc4 100644 --- a/core/src/main/java/org/apache/oozie/service/RecoveryService.java +++ b/core/src/main/java/org/apache/oozie/service/RecoveryService.java @@ -179,31 +179,28 @@ public class RecoveryService implements Service { try { Services.get().get(InstrumentationService.class).get() .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER, 1); - if (baction.getCoordId() == null) { + if (baction.getCoordId() == null && baction.getStatus() != Job.Status.PREP) { log.error("CoordId is null for Bundle action " + baction.getBundleActionId()); continue; } if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(baction.getCoordId())) { - if (baction.getStatus() == Job.Status.PREP) { + if (baction.getStatus() == Job.Status.PREP && baction.getCoordId() == null) { BundleJobBean bundleJob = null; if (jpaService != null) { bundleJob = BundleJobQueryExecutor.getInstance().get( BundleJobQuery.GET_BUNDLE_JOB_ID_JOBXML_CONF, baction.getBundleId()); } - if (bundleJob != null) { - Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml()); - List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace()); - for (Element coordElem : coordElems) { - Attribute name = coordElem.getAttribute("name"); - if (name.getValue().equals(baction.getCoordName())) { - Configuration coordConf = mergeConfig(coordElem, bundleJob); - coordConf.set(OozieClient.BUNDLE_ID, baction.getBundleId()); - queueCallable(new CoordSubmitXCommand(coordConf, - bundleJob.getId(), name.getValue())); - } + Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml()); + List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace()); + for (Element coordElem : coordElems) { + Attribute name = coordElem.getAttribute("name"); + if (name.getValue().equals(baction.getCoordName())) { + Configuration coordConf = mergeConfig(coordElem, bundleJob); + coordConf.set(OozieClient.BUNDLE_ID, baction.getBundleId()); + queueCallable(new CoordSubmitXCommand(coordConf, + bundleJob.getId(), name.getValue())); } } - } else if (baction.getStatus() == Job.Status.KILLED) { queueCallable(new CoordKillXCommand(baction.getCoordId())); http://git-wip-us.apache.org/repos/asf/oozie/blob/67e54f61/core/src/test/java/org/apache/oozie/command/bundle/TestBundleRerunXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/bundle/TestBundleRerunXCommand.java b/core/src/test/java/org/apache/oozie/command/bundle/TestBundleRerunXCommand.java index 178171e..150fe82 100644 --- a/core/src/test/java/org/apache/oozie/command/bundle/TestBundleRerunXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/bundle/TestBundleRerunXCommand.java @@ -58,10 +58,10 @@ public class TestBundleRerunXCommand extends XDataTestCase { */ public void testBundleRerun1() throws Exception { BundleJobBean job = this.addRecordToBundleJobTable(Job.Status.SUCCEEDED, false); - this.addRecordToBundleActionTable(job.getId(), "action1", 0, Job.Status.SUCCEEDED); - this.addRecordToBundleActionTable(job.getId(), "action2", 0, Job.Status.SUCCEEDED); - addRecordToCoordJobTable("action1", CoordinatorJob.Status.SUCCEEDED, false, false); - addRecordToCoordJobTable("action2", CoordinatorJob.Status.SUCCEEDED, false, false); + CoordinatorJobBean coord1 = addRecordToCoordJobTable("action1", CoordinatorJob.Status.SUCCEEDED, false, false); + CoordinatorJobBean coord2 = addRecordToCoordJobTable("action2", CoordinatorJob.Status.SUCCEEDED, false, false); + this.addRecordToBundleActionTable(job.getId(), coord1.getId(), "action1", 0, Job.Status.SUCCEEDED); + this.addRecordToBundleActionTable(job.getId(), coord2.getId(), "action2", 0, Job.Status.SUCCEEDED); JPAService jpaService = Services.get().get(JPAService.class); assertNotNull(jpaService); @@ -83,10 +83,10 @@ public class TestBundleRerunXCommand extends XDataTestCase { */ public void testBundleRerun2() throws Exception { BundleJobBean job = this.addRecordToBundleJobTable(Job.Status.SUCCEEDED, false); - this.addRecordToBundleActionTable(job.getId(), "action1", 0, Job.Status.SUCCEEDED); - this.addRecordToBundleActionTable(job.getId(), "action2", 0, Job.Status.SUCCEEDED); - addRecordToCoordJobTable("action1", CoordinatorJob.Status.SUCCEEDED, false, false); - addRecordToCoordJobTable("action2", CoordinatorJob.Status.SUCCEEDED, false, false); + CoordinatorJobBean coord1 = addRecordToCoordJobTable("action1", CoordinatorJob.Status.SUCCEEDED, false, false); + CoordinatorJobBean coord2 = addRecordToCoordJobTable("action2", CoordinatorJob.Status.SUCCEEDED, false, false); + this.addRecordToBundleActionTable(job.getId(), coord1.getId(), "action1", 0, Job.Status.SUCCEEDED); + this.addRecordToBundleActionTable(job.getId(), coord2.getId(), "action2", 0, Job.Status.SUCCEEDED); JPAService jpaService = Services.get().get(JPAService.class); assertNotNull(jpaService); @@ -136,10 +136,10 @@ public class TestBundleRerunXCommand extends XDataTestCase { services = new Services(); services.init(); BundleJobBean job = this.addRecordToBundleJobTable(Job.Status.DONEWITHERROR, false); - this.addRecordToBundleActionTable(job.getId(), "action1", 0, Job.Status.SUCCEEDED); - this.addRecordToBundleActionTable(job.getId(), "action2", 0, Job.Status.FAILED); - addRecordToCoordJobTable("action1", CoordinatorJob.Status.SUCCEEDED, false, false); - addRecordToCoordJobTable("action2", CoordinatorJob.Status.FAILED, false, false); + CoordinatorJobBean coord1 = addRecordToCoordJobTable("action1", CoordinatorJob.Status.SUCCEEDED, false, false); + CoordinatorJobBean coord2 = addRecordToCoordJobTable("action2", CoordinatorJob.Status.FAILED, false, false); + this.addRecordToBundleActionTable(job.getId(), coord1.getId(), "action1", 0, Job.Status.SUCCEEDED); + this.addRecordToBundleActionTable(job.getId(), coord2.getId(), "action2", 0, Job.Status.FAILED); JPAService jpaService = Services.get().get(JPAService.class); assertNotNull(jpaService); @@ -244,10 +244,10 @@ public class TestBundleRerunXCommand extends XDataTestCase { */ public void testBundleRerunInSuspended() throws Exception { BundleJobBean job = this.addRecordToBundleJobTable(Job.Status.SUSPENDED, false); - this.addRecordToBundleActionTable(job.getId(), "action1", 0, Job.Status.SUSPENDED); - this.addRecordToBundleActionTable(job.getId(), "action2", 0, Job.Status.SUSPENDED); - addRecordToCoordJobTable("action1", CoordinatorJob.Status.SUSPENDED, false, false); - addRecordToCoordJobTable("action2", CoordinatorJob.Status.SUSPENDED, false, false); + CoordinatorJobBean coord1 = addRecordToCoordJobTable("action1", CoordinatorJob.Status.SUSPENDED, false, false); + CoordinatorJobBean coord2 = addRecordToCoordJobTable("action2", CoordinatorJob.Status.SUSPENDED, false, false); + this.addRecordToBundleActionTable(job.getId(), coord1.getId(), "action1", 0, Job.Status.SUSPENDED); + this.addRecordToBundleActionTable(job.getId(), coord2.getId(), "action2", 0, Job.Status.SUSPENDED); JPAService jpaService = Services.get().get(JPAService.class); assertNotNull(jpaService); @@ -272,10 +272,10 @@ public class TestBundleRerunXCommand extends XDataTestCase { services = new Services(); services.init(); BundleJobBean job = this.addRecordToBundleJobTable(Job.Status.SUSPENDEDWITHERROR, false); - this.addRecordToBundleActionTable(job.getId(), "action1", 0, Job.Status.SUSPENDED); - this.addRecordToBundleActionTable(job.getId(), "action2", 0, Job.Status.SUSPENDEDWITHERROR); - addRecordToCoordJobTable("action1", CoordinatorJob.Status.SUSPENDED, false, false); - addRecordToCoordJobTable("action2", CoordinatorJob.Status.SUSPENDEDWITHERROR, false, false); + CoordinatorJobBean coord1 = addRecordToCoordJobTable("action1", CoordinatorJob.Status.SUSPENDED, false, false); + CoordinatorJobBean coord2 = addRecordToCoordJobTable("action2", CoordinatorJob.Status.SUSPENDEDWITHERROR, false, false); + this.addRecordToBundleActionTable(job.getId(), coord1.getId(), "action1", 0, Job.Status.SUSPENDED); + this.addRecordToBundleActionTable(job.getId(), coord2.getId(), "action2", 0, Job.Status.SUSPENDEDWITHERROR); JPAService jpaService = Services.get().get(JPAService.class); assertNotNull(jpaService); http://git-wip-us.apache.org/repos/asf/oozie/blob/67e54f61/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java index 543a8cf..c0eb829 100644 --- a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java +++ b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java @@ -17,20 +17,6 @@ */ package org.apache.oozie.service; -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.PrintWriter; -import java.io.Reader; -import java.io.StringReader; -import java.io.Writer; -import java.net.URI; -import java.util.Collection; -import java.util.Date; -import java.util.List; -import java.util.Map; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -38,6 +24,8 @@ import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.RunningJob; +import org.apache.oozie.BundleActionBean; +import org.apache.oozie.BundleJobBean; import org.apache.oozie.CoordinatorActionBean; import org.apache.oozie.CoordinatorEngine; import org.apache.oozie.CoordinatorJobBean; @@ -50,17 +38,20 @@ import org.apache.oozie.action.hadoop.MapReduceActionExecutor; import org.apache.oozie.action.hadoop.MapperReducerForTest; import org.apache.oozie.client.CoordinatorAction; import org.apache.oozie.client.CoordinatorJob; +import org.apache.oozie.client.CoordinatorJob.Execution; +import org.apache.oozie.client.Job; import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.client.WorkflowJob; -import org.apache.oozie.client.CoordinatorJob.Execution; import org.apache.oozie.command.wf.ActionXCommand; import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext; import org.apache.oozie.coord.CoordELFunctions; import org.apache.oozie.dependency.FSURIHandler; import org.apache.oozie.dependency.HCatURIHandler; +import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor; import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor; import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor; +import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor; import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor; @@ -78,6 +69,20 @@ import org.apache.oozie.util.XLog; import org.apache.oozie.util.XmlUtils; import org.apache.oozie.workflow.WorkflowInstance; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.io.Reader; +import java.io.StringReader; +import java.io.Writer; +import java.net.URI; +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.Map; + public class TestRecoveryService extends XDataTestCase { private Services services; private String server; @@ -245,6 +250,91 @@ public class TestRecoveryService extends XDataTestCase { assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); } + /** + * If the bundle action is in PREP state and coord is not yet created, recovery should submit new coord + * @throws Exception + */ + public void testBundleRecoveryCoordCreate() throws Exception { + CoordinatorStore store = Services.get().get(StoreService.class).getStore(CoordinatorStore.class); + final BundleActionBean bundleAction; + final BundleJobBean bundle; + store.beginTrx(); + try { + bundle = addRecordToBundleJobTable(Job.Status.RUNNING, false); + bundleAction = addRecordToBundleActionTable(bundle.getId(), "coord1", 1, Job.Status.PREP); + store.commitTrx(); + } + finally { + store.closeTrx(); + } + final JPAService jpaService = Services.get().get(JPAService.class); + + sleep(3000); + Runnable recoveryRunnable = new RecoveryRunnable(0, 1,1); + recoveryRunnable.run(); + + waitFor(10000, new Predicate() { + public boolean evaluate() throws Exception { + BundleActionBean mybundleAction = + jpaService.execute(new BundleActionGetJPAExecutor(bundle.getId(), "coord1")); + try { + if (mybundleAction.getCoordId() != null) { + CoordinatorJobBean coord = jpaService.execute(new CoordJobGetJPAExecutor(mybundleAction.getCoordId())); + return true; + } + } catch (Exception e) { + } + return false; + } + }); + + BundleActionBean mybundleAction = jpaService.execute(new BundleActionGetJPAExecutor(bundle.getId(), "coord1")); + assertNotNull(mybundleAction.getCoordId()); + + try { + jpaService.execute(new CoordJobGetJPAExecutor(mybundleAction.getCoordId())); + } catch(Exception e) { + e.printStackTrace(); + fail("Expected coord " + mybundleAction.getCoordId() + " to be created"); + } + } + + /** + * If the bundle action is in PREP state and coord is already created, recovery should not submit new coord + * @throws Exception + */ + public void testBundleRecoveryCoordExists() throws Exception { + CoordinatorStore store = Services.get().get(StoreService.class).getStore(CoordinatorStore.class); + final BundleActionBean bundleAction; + final BundleJobBean bundle; + final CoordinatorJob coord; + store.beginTrx(); + try { + bundle = addRecordToBundleJobTable(Job.Status.RUNNING, false); + coord = addRecordToCoordJobTable(Job.Status.PREP, false, false); + bundleAction = addRecordToBundleActionTable(bundle.getId(), coord.getId(), "coord1", 1, Job.Status.PREP); + store.commitTrx(); + } + finally { + store.closeTrx(); + } + final JPAService jpaService = Services.get().get(JPAService.class); + + sleep(3000); + Runnable recoveryRunnable = new RecoveryRunnable(0, 1,1); + recoveryRunnable.run(); + + waitFor(3000, new Predicate() { + public boolean evaluate() throws Exception { + BundleActionBean mybundleAction = + jpaService.execute(new BundleActionGetJPAExecutor(bundle.getId(), "coord1")); + return !mybundleAction.getCoordId().equals(coord.getId()); + } + }); + + BundleActionBean mybundleAction = jpaService.execute(new BundleActionGetJPAExecutor(bundle.getId(), "coord1")); + assertEquals(coord.getId(), mybundleAction.getCoordId()); + } /** * Tests functionality of the Recovery Service Runnable command. </p> Insert a coordinator job with RUNNING and http://git-wip-us.apache.org/repos/asf/oozie/blob/67e54f61/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java b/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java index 80a7048..d5c2c4b 100644 --- a/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java +++ b/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java @@ -1034,17 +1034,19 @@ public class TestStatusTransitService extends XDataTestCase { assertNotNull(jpaService); final String bundleId = bundleJob.getId(); - // Add a bundle action with no coordinator to make it fail - addRecordToBundleActionTable(bundleId, null, 0, Job.Status.KILLED); - addRecordToBundleActionTable(bundleId, "action2", 0, Job.Status.RUNNING); String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); - addRecordToCoordJobTableWithBundle(bundleId, "action2", CoordinatorJob.Status.RUNNING, start, end, true, true, 2); + CoordinatorJobBean coord = addRecordToCoordJobTableWithBundle(bundleId, "action2", + CoordinatorJob.Status.RUNNING, start, end, true, true, 2); addRecordToCoordActionTable("action2", 1, CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0); + // Add a bundle action with no coordinator to make it fail + addRecordToBundleActionTable(bundleId, null, 0, Job.Status.KILLED); + addRecordToBundleActionTable(bundleId, coord.getId(), "action2", 0, Job.Status.RUNNING); + Runnable runnable = new StatusTransitRunnable(); // first time, service will call bundle kill runnable.run(); http://git-wip-us.apache.org/repos/asf/oozie/blob/67e54f61/core/src/test/java/org/apache/oozie/test/XDataTestCase.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java index 0181d50..317885b 100644 --- a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java +++ b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java @@ -916,7 +916,7 @@ public abstract class XDataTestCase extends XHCatTestCase { */ protected BundleActionBean addRecordToBundleActionTable(String jobId, String coordName, int pending, Job.Status status) throws Exception { - BundleActionBean action = createBundleAction(jobId, coordName, coordName, pending, status); + BundleActionBean action = createBundleAction(jobId, null, coordName, pending, status); try { JPAService jpaService = Services.get().get(JPAService.class); http://git-wip-us.apache.org/repos/asf/oozie/blob/67e54f61/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index dee1cba..35726b9 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.1.0 release (trunk - unreleased) +OOZIE-1622 Multiple CoordSubmit for same bundle (shwethags via virag) OOZIE-1644 Default config from config-default.xml is not propagated to actions (mona) OOZIE-1645 Oozie upgrade DB command fails due to missing dependencies for mssql (omaliuvanchuk via rkanter) OOZIE-1668 Coord log streaming start and end time should be of action list start and end time (puru via rohini)
