Updated Branches:
  refs/heads/master 605ed64f7 -> 67e54f610

OOZIE-1622 Multiple CoordSubmit for same bundle (shwethags via virag)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/67e54f61
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/67e54f61
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/67e54f61

Branch: refs/heads/master
Commit: 67e54f61097f2be5f9efd487a5b54de2f4dd4613
Parents: 605ed64
Author: Virag Kothari <[email protected]>
Authored: Wed Jan 29 12:42:18 2014 -0800
Committer: Virag Kothari <[email protected]>
Committed: Wed Jan 29 12:42:18 2014 -0800

----------------------------------------------------------------------
 .../apache/oozie/service/RecoveryService.java   |  25 ++--
 .../command/bundle/TestBundleRerunXCommand.java |  40 +++----
 .../oozie/service/TestRecoveryService.java      | 120 ++++++++++++++++---
 .../oozie/service/TestStatusTransitService.java |  10 +-
 .../org/apache/oozie/test/XDataTestCase.java    |   2 +-
 release-log.txt                                 |   1 +
 6 files changed, 144 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/67e54f61/core/src/main/java/org/apache/oozie/service/RecoveryService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/RecoveryService.java b/core/src/main/java/org/apache/oozie/service/RecoveryService.java
index c3ac6b3..2749bc4 100644
--- a/core/src/main/java/org/apache/oozie/service/RecoveryService.java
+++ b/core/src/main/java/org/apache/oozie/service/RecoveryService.java
@@ -179,31 +179,28 @@ public class RecoveryService implements Service {
                 try {
                     Services.get().get(InstrumentationService.class).get()
                             .incr(INSTRUMENTATION_GROUP, 
INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER, 1);
-                    if (baction.getCoordId() == null) {
+                    if (baction.getCoordId() == null && baction.getStatus() != 
Job.Status.PREP) {
                         log.error("CoordId is null for Bundle action " + 
baction.getBundleActionId());
                         continue;
                     }
                     if 
(Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(baction.getCoordId()))
 {
-                        if (baction.getStatus() == Job.Status.PREP) {
+                        if (baction.getStatus() == Job.Status.PREP && 
baction.getCoordId() == null) {
                             BundleJobBean bundleJob = null;
                             if (jpaService != null) {
                                 bundleJob = 
BundleJobQueryExecutor.getInstance().get(
                                         
BundleJobQuery.GET_BUNDLE_JOB_ID_JOBXML_CONF, baction.getBundleId());
                             }
-                            if (bundleJob != null) {
-                                Element bAppXml = 
XmlUtils.parseXml(bundleJob.getJobXml());
-                                List<Element> coordElems = 
bAppXml.getChildren("coordinator", bAppXml.getNamespace());
-                                for (Element coordElem : coordElems) {
-                                    Attribute name = 
coordElem.getAttribute("name");
-                                    if 
(name.getValue().equals(baction.getCoordName())) {
-                                        Configuration coordConf = 
mergeConfig(coordElem, bundleJob);
-                                        coordConf.set(OozieClient.BUNDLE_ID, 
baction.getBundleId());
-                                        queueCallable(new 
CoordSubmitXCommand(coordConf,
-                                                bundleJob.getId(), 
name.getValue()));
-                                    }
+                            Element bAppXml = 
XmlUtils.parseXml(bundleJob.getJobXml());
+                            List<Element> coordElems = 
bAppXml.getChildren("coordinator", bAppXml.getNamespace());
+                            for (Element coordElem : coordElems) {
+                                Attribute name = 
coordElem.getAttribute("name");
+                                if 
(name.getValue().equals(baction.getCoordName())) {
+                                    Configuration coordConf = 
mergeConfig(coordElem, bundleJob);
+                                    coordConf.set(OozieClient.BUNDLE_ID, 
baction.getBundleId());
+                                    queueCallable(new 
CoordSubmitXCommand(coordConf,
+                                            bundleJob.getId(), 
name.getValue()));
                                 }
                             }
-
                         }
                         else if (baction.getStatus() == Job.Status.KILLED) {
                             queueCallable(new 
CoordKillXCommand(baction.getCoordId()));

http://git-wip-us.apache.org/repos/asf/oozie/blob/67e54f61/core/src/test/java/org/apache/oozie/command/bundle/TestBundleRerunXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/bundle/TestBundleRerunXCommand.java b/core/src/test/java/org/apache/oozie/command/bundle/TestBundleRerunXCommand.java
index 178171e..150fe82 100644
--- a/core/src/test/java/org/apache/oozie/command/bundle/TestBundleRerunXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/bundle/TestBundleRerunXCommand.java
@@ -58,10 +58,10 @@ public class TestBundleRerunXCommand extends XDataTestCase {
      */
     public void testBundleRerun1() throws Exception {
         BundleJobBean job = 
this.addRecordToBundleJobTable(Job.Status.SUCCEEDED, false);
-        this.addRecordToBundleActionTable(job.getId(), "action1", 0, 
Job.Status.SUCCEEDED);
-        this.addRecordToBundleActionTable(job.getId(), "action2", 0, 
Job.Status.SUCCEEDED);
-        addRecordToCoordJobTable("action1", CoordinatorJob.Status.SUCCEEDED, 
false, false);
-        addRecordToCoordJobTable("action2", CoordinatorJob.Status.SUCCEEDED, 
false, false);
+        CoordinatorJobBean coord1 = addRecordToCoordJobTable("action1", 
CoordinatorJob.Status.SUCCEEDED, false, false);
+        CoordinatorJobBean coord2 = addRecordToCoordJobTable("action2", 
CoordinatorJob.Status.SUCCEEDED, false, false);
+        this.addRecordToBundleActionTable(job.getId(), coord1.getId(), 
"action1", 0, Job.Status.SUCCEEDED);
+        this.addRecordToBundleActionTable(job.getId(), coord2.getId(), 
"action2", 0, Job.Status.SUCCEEDED);
 
         JPAService jpaService = Services.get().get(JPAService.class);
         assertNotNull(jpaService);
@@ -83,10 +83,10 @@ public class TestBundleRerunXCommand extends XDataTestCase {
      */
     public void testBundleRerun2() throws Exception {
         BundleJobBean job = 
this.addRecordToBundleJobTable(Job.Status.SUCCEEDED, false);
-        this.addRecordToBundleActionTable(job.getId(), "action1", 0, 
Job.Status.SUCCEEDED);
-        this.addRecordToBundleActionTable(job.getId(), "action2", 0, 
Job.Status.SUCCEEDED);
-        addRecordToCoordJobTable("action1", CoordinatorJob.Status.SUCCEEDED, 
false, false);
-        addRecordToCoordJobTable("action2", CoordinatorJob.Status.SUCCEEDED, 
false, false);
+        CoordinatorJobBean coord1 = addRecordToCoordJobTable("action1", 
CoordinatorJob.Status.SUCCEEDED, false, false);
+        CoordinatorJobBean coord2 = addRecordToCoordJobTable("action2", 
CoordinatorJob.Status.SUCCEEDED, false, false);
+        this.addRecordToBundleActionTable(job.getId(), coord1.getId(), 
"action1", 0, Job.Status.SUCCEEDED);
+        this.addRecordToBundleActionTable(job.getId(), coord2.getId(), 
"action2", 0, Job.Status.SUCCEEDED);
 
         JPAService jpaService = Services.get().get(JPAService.class);
         assertNotNull(jpaService);
@@ -136,10 +136,10 @@ public class TestBundleRerunXCommand extends 
XDataTestCase {
         services = new Services();
         services.init();
         BundleJobBean job = 
this.addRecordToBundleJobTable(Job.Status.DONEWITHERROR, false);
-        this.addRecordToBundleActionTable(job.getId(), "action1", 0, 
Job.Status.SUCCEEDED);
-        this.addRecordToBundleActionTable(job.getId(), "action2", 0, 
Job.Status.FAILED);
-        addRecordToCoordJobTable("action1", CoordinatorJob.Status.SUCCEEDED, 
false, false);
-        addRecordToCoordJobTable("action2", CoordinatorJob.Status.FAILED, 
false, false);
+        CoordinatorJobBean coord1 = addRecordToCoordJobTable("action1", 
CoordinatorJob.Status.SUCCEEDED, false, false);
+        CoordinatorJobBean coord2 = addRecordToCoordJobTable("action2", 
CoordinatorJob.Status.FAILED, false, false);
+        this.addRecordToBundleActionTable(job.getId(), coord1.getId(), 
"action1", 0, Job.Status.SUCCEEDED);
+        this.addRecordToBundleActionTable(job.getId(), coord2.getId(), 
"action2", 0, Job.Status.FAILED);
 
         JPAService jpaService = Services.get().get(JPAService.class);
         assertNotNull(jpaService);
@@ -244,10 +244,10 @@ public class TestBundleRerunXCommand extends 
XDataTestCase {
      */
     public void testBundleRerunInSuspended() throws Exception {
         BundleJobBean job = 
this.addRecordToBundleJobTable(Job.Status.SUSPENDED, false);
-        this.addRecordToBundleActionTable(job.getId(), "action1", 0, 
Job.Status.SUSPENDED);
-        this.addRecordToBundleActionTable(job.getId(), "action2", 0, 
Job.Status.SUSPENDED);
-        addRecordToCoordJobTable("action1", CoordinatorJob.Status.SUSPENDED, 
false, false);
-        addRecordToCoordJobTable("action2", CoordinatorJob.Status.SUSPENDED, 
false, false);
+        CoordinatorJobBean coord1 = addRecordToCoordJobTable("action1", 
CoordinatorJob.Status.SUSPENDED, false, false);
+        CoordinatorJobBean coord2 = addRecordToCoordJobTable("action2", 
CoordinatorJob.Status.SUSPENDED, false, false);
+        this.addRecordToBundleActionTable(job.getId(), coord1.getId(), 
"action1", 0, Job.Status.SUSPENDED);
+        this.addRecordToBundleActionTable(job.getId(), coord2.getId(), 
"action2", 0, Job.Status.SUSPENDED);
 
         JPAService jpaService = Services.get().get(JPAService.class);
         assertNotNull(jpaService);
@@ -272,10 +272,10 @@ public class TestBundleRerunXCommand extends 
XDataTestCase {
         services = new Services();
         services.init();
         BundleJobBean job = 
this.addRecordToBundleJobTable(Job.Status.SUSPENDEDWITHERROR, false);
-        this.addRecordToBundleActionTable(job.getId(), "action1", 0, 
Job.Status.SUSPENDED);
-        this.addRecordToBundleActionTable(job.getId(), "action2", 0, 
Job.Status.SUSPENDEDWITHERROR);
-        addRecordToCoordJobTable("action1", CoordinatorJob.Status.SUSPENDED, 
false, false);
-        addRecordToCoordJobTable("action2", 
CoordinatorJob.Status.SUSPENDEDWITHERROR, false, false);
+        CoordinatorJobBean coord1 = addRecordToCoordJobTable("action1", 
CoordinatorJob.Status.SUSPENDED, false, false);
+        CoordinatorJobBean coord2 = addRecordToCoordJobTable("action2", 
CoordinatorJob.Status.SUSPENDEDWITHERROR, false, false);
+        this.addRecordToBundleActionTable(job.getId(), coord1.getId(), 
"action1", 0, Job.Status.SUSPENDED);
+        this.addRecordToBundleActionTable(job.getId(), coord2.getId(), 
"action2", 0, Job.Status.SUSPENDEDWITHERROR);
 
         JPAService jpaService = Services.get().get(JPAService.class);
         assertNotNull(jpaService);

http://git-wip-us.apache.org/repos/asf/oozie/blob/67e54f61/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
index 543a8cf..c0eb829 100644
--- a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
@@ -17,20 +17,6 @@
  */
 package org.apache.oozie.service;
 
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.io.Reader;
-import java.io.StringReader;
-import java.io.Writer;
-import java.net.URI;
-import java.util.Collection;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -38,6 +24,8 @@ import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
+import org.apache.oozie.BundleActionBean;
+import org.apache.oozie.BundleJobBean;
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.CoordinatorEngine;
 import org.apache.oozie.CoordinatorJobBean;
@@ -50,17 +38,20 @@ import 
org.apache.oozie.action.hadoop.MapReduceActionExecutor;
 import org.apache.oozie.action.hadoop.MapperReducerForTest;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.CoordinatorJob.Execution;
+import org.apache.oozie.client.Job;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.client.CoordinatorJob.Execution;
 import org.apache.oozie.command.wf.ActionXCommand;
 import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
 import org.apache.oozie.coord.CoordELFunctions;
 import org.apache.oozie.dependency.FSURIHandler;
 import org.apache.oozie.dependency.HCatURIHandler;
+import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
@@ -78,6 +69,20 @@ import org.apache.oozie.util.XLog;
 import org.apache.oozie.util.XmlUtils;
 import org.apache.oozie.workflow.WorkflowInstance;
 
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.io.Reader;
+import java.io.StringReader;
+import java.io.Writer;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
 public class TestRecoveryService extends XDataTestCase {
     private Services services;
     private String server;
@@ -245,6 +250,91 @@ public class TestRecoveryService extends XDataTestCase {
         assertTrue(LauncherMapperHelper.hasIdSwap(actionData));
     }
 
+    /**
+     * If the bundle action is in PREP state and coord is not yet created, recovery should submit new coord
+     * @throws Exception
+     */
+    public void testBundleRecoveryCoordCreate() throws Exception {
+        CoordinatorStore store = 
Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
+        final BundleActionBean bundleAction;
+        final BundleJobBean bundle;
+        store.beginTrx();
+        try {
+            bundle = addRecordToBundleJobTable(Job.Status.RUNNING, false);
+            bundleAction = addRecordToBundleActionTable(bundle.getId(), 
"coord1", 1, Job.Status.PREP);
+            store.commitTrx();
+        }
+        finally {
+            store.closeTrx();
+        }
+        final JPAService jpaService = Services.get().get(JPAService.class);
+
+        sleep(3000);
+        Runnable recoveryRunnable = new RecoveryRunnable(0, 1,1);
+        recoveryRunnable.run();
+
+        waitFor(10000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                BundleActionBean mybundleAction =
+                        jpaService.execute(new 
BundleActionGetJPAExecutor(bundle.getId(), "coord1"));
+                try {
+                    if (mybundleAction.getCoordId() != null) {
+                        CoordinatorJobBean coord = jpaService.execute(new 
CoordJobGetJPAExecutor(mybundleAction.getCoordId()));
+                        return true;
+                    }
+                } catch (Exception e) {
+                }
+                return false;
+            }
+        });
+
+        BundleActionBean mybundleAction = jpaService.execute(new 
BundleActionGetJPAExecutor(bundle.getId(), "coord1"));
+        assertNotNull(mybundleAction.getCoordId());
+
+        try {
+            jpaService.execute(new 
CoordJobGetJPAExecutor(mybundleAction.getCoordId()));
+        } catch(Exception e) {
+            e.printStackTrace();
+            fail("Expected coord " + mybundleAction.getCoordId() + " to be 
created");
+        }
+    }
+
+    /**
+     * If the bundle action is in PREP state and coord is already created, recovery should not submit new coord
+     * @throws Exception
+     */
+    public void testBundleRecoveryCoordExists() throws Exception {
+        CoordinatorStore store = 
Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
+        final BundleActionBean bundleAction;
+        final BundleJobBean bundle;
+        final CoordinatorJob coord;
+        store.beginTrx();
+        try {
+            bundle = addRecordToBundleJobTable(Job.Status.RUNNING, false);
+            coord = addRecordToCoordJobTable(Job.Status.PREP, false, false);
+            bundleAction = addRecordToBundleActionTable(bundle.getId(), 
coord.getId(), "coord1", 1, Job.Status.PREP);
+            store.commitTrx();
+        }
+        finally {
+            store.closeTrx();
+        }
+        final JPAService jpaService = Services.get().get(JPAService.class);
+
+        sleep(3000);
+        Runnable recoveryRunnable = new RecoveryRunnable(0, 1,1);
+        recoveryRunnable.run();
+
+        waitFor(3000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                BundleActionBean mybundleAction =
+                        jpaService.execute(new 
BundleActionGetJPAExecutor(bundle.getId(), "coord1"));
+                return !mybundleAction.getCoordId().equals(coord.getId());
+            }
+        });
+
+        BundleActionBean mybundleAction = jpaService.execute(new 
BundleActionGetJPAExecutor(bundle.getId(), "coord1"));
+        assertEquals(coord.getId(), mybundleAction.getCoordId());
+    }
 
     /**
      * Tests functionality of the Recovery Service Runnable command. </p> 
Insert a coordinator job with RUNNING and

http://git-wip-us.apache.org/repos/asf/oozie/blob/67e54f61/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java b/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java
index 80a7048..d5c2c4b 100644
--- a/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java
@@ -1034,17 +1034,19 @@ public class TestStatusTransitService extends 
XDataTestCase {
         assertNotNull(jpaService);
 
         final String bundleId = bundleJob.getId();
-        // Add a bundle action with no coordinator to make it fail
-        addRecordToBundleActionTable(bundleId, null, 0, Job.Status.KILLED);
-        addRecordToBundleActionTable(bundleId, "action2", 0, 
Job.Status.RUNNING);
 
         String currentDatePlusMonth = 
XDataTestCase.getCurrentDateafterIncrementingInMonths(1);
         Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth);
         Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth);
 
-        addRecordToCoordJobTableWithBundle(bundleId, "action2", 
CoordinatorJob.Status.RUNNING, start, end, true, true, 2);
+        CoordinatorJobBean coord = 
addRecordToCoordJobTableWithBundle(bundleId, "action2",
+                CoordinatorJob.Status.RUNNING, start, end, true, true, 2);
         addRecordToCoordActionTable("action2", 1, 
CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
 
+        // Add a bundle action with no coordinator to make it fail
+        addRecordToBundleActionTable(bundleId, null, 0, Job.Status.KILLED);
+        addRecordToBundleActionTable(bundleId, coord.getId(), "action2", 0, 
Job.Status.RUNNING);
+
         Runnable runnable = new StatusTransitRunnable();
         // first time, service will call bundle kill
         runnable.run();

http://git-wip-us.apache.org/repos/asf/oozie/blob/67e54f61/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
index 0181d50..317885b 100644
--- a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
@@ -916,7 +916,7 @@ public abstract class XDataTestCase extends XHCatTestCase {
      */
     protected BundleActionBean addRecordToBundleActionTable(String jobId, 
String coordName, int pending,
             Job.Status status) throws Exception {
-        BundleActionBean action = createBundleAction(jobId, coordName, 
coordName, pending, status);
+        BundleActionBean action = createBundleAction(jobId, null, coordName, 
pending, status);
 
         try {
             JPAService jpaService = Services.get().get(JPAService.class);

http://git-wip-us.apache.org/repos/asf/oozie/blob/67e54f61/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index dee1cba..35726b9 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1622 Multiple CoordSubmit for same bundle (shwethags via virag)
 OOZIE-1644 Default config from config-default.xml is not propagated to actions (mona)
 OOZIE-1645 Oozie upgrade DB command fails due to missing dependencies for mssql (omaliuvanchuk via rkanter)
 OOZIE-1668 Coord log streaming start and end time should be of action list start and end time (puru via rohini)

Reply via email to