Repository: oozie
Updated Branches:
  refs/heads/master f83f484a1 -> 41312e956


OOZIE-2522 There can be multiple coord submit from bundle in case of ZK glitch


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/41312e95
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/41312e95
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/41312e95

Branch: refs/heads/master
Commit: 41312e9564390bc499e76c8770c9b5d4c1be02d5
Parents: f83f484
Author: Purshotam Shah <[email protected]>
Authored: Tue Jun 7 09:59:01 2016 -0700
Committer: Purshotam Shah <[email protected]>
Committed: Tue Jun 7 09:59:01 2016 -0700

----------------------------------------------------------------------
 .../bundle/BundleCoordSubmitXCommand.java       | 77 ++++++++++++++++++++
 .../command/bundle/BundleStartXCommand.java     |  3 +-
 .../command/coord/CoordSubmitXCommand.java      |  6 +-
 .../apache/oozie/service/RecoveryService.java   | 65 +++++++++++------
 .../apache/oozie/service/ZKLocksService.java    |  3 +-
 .../bundle/TestBundleSubmitXCommand.java        | 44 ++++++++++-
 .../oozie/service/TestRecoveryService.java      | 34 +++++++++
 release-log.txt                                 |  1 +
 8 files changed, 204 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/41312e95/core/src/main/java/org/apache/oozie/command/bundle/BundleCoordSubmitXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/bundle/BundleCoordSubmitXCommand.java b/core/src/main/java/org/apache/oozie/command/bundle/BundleCoordSubmitXCommand.java
new file mode 100644
index 0000000..1376e5e
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/bundle/BundleCoordSubmitXCommand.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.bundle;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.XException;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.coord.CoordSubmitXCommand;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+
+public class BundleCoordSubmitXCommand extends CoordSubmitXCommand {
+
+    private String coordId;
+
+    public BundleCoordSubmitXCommand(Configuration conf, String bundleId, String coordName) {
+        super(conf, bundleId, coordName);
+    }
+
+    @Override
+    public String getEntityKey() {
+        return bundleId + "_" + coordName;
+    }
+
+    @Override
+    protected boolean isLockRequired() {
+        return true;
+    }
+
+    @Override
+    protected void verifyPrecondition() throws CommandException {
+        super.verifyPrecondition();
+        if (coordId != null) {
+            LOG.warn("Coord [{0}] is already submitted for bundle [{1}]", coordId, bundleId);
+            throw new CommandException(ErrorCode.E1304, coordName);
+        }
+    }
+
+    protected void loadState() throws CommandException {
+        super.loadState();
+        try {
+            CoordinatorJobBean coordJobs = CoordJobQueryExecutor.getInstance().getIfExist(
+                    CoordJobQuery.GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID, coordName, bundleId);
+
+            if (coordJobs != null) {
+                coordId = coordJobs.getId();
+            }
+        }
+        catch (XException ex) {
+            throw new CommandException(ex);
+        }
+    }
+
+    @Override
+    public String getKey() {
+        return getName() + "_" + getEntityKey();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/41312e95/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java b/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java
index 27ae4a4..cc98a6d 100644
--- a/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java
@@ -38,7 +38,6 @@ import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.command.StartTransitionXCommand;
-import org.apache.oozie.command.coord.CoordSubmitXCommand;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor;
 import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
 import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
@@ -266,7 +265,7 @@ public class BundleStartXCommand extends StartTransitionXCommand {
                         throw new CommandException(ErrorCode.E1321, e.getMessage(), e);
 
                     }
-                    queue(new CoordSubmitXCommand(coordConf, bundleJob.getId(), name.getValue()));
+                    queue(new BundleCoordSubmitXCommand(coordConf, bundleJob.getId(), name.getValue()));
 
                 }
                 updateBundleAction();

http://git-wip-us.apache.org/repos/asf/oozie/blob/41312e95/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
index f1f9ab2..969336d 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
@@ -100,8 +100,8 @@ import org.xml.sax.SAXException;
 public class CoordSubmitXCommand extends SubmitTransitionXCommand {
 
     protected Configuration conf;
-    private final String bundleId;
-    private final String coordName;
+    protected final String bundleId;
+    protected final String coordName;
     protected boolean dryrun;
     protected JPAService jpaService = null;
     private CoordinatorJob.Status prevStatus = CoordinatorJob.Status.PREP;
@@ -179,7 +179,7 @@ public class CoordSubmitXCommand extends SubmitTransitionXCommand {
      * @param bundleId : bundle id
      * @param coordName : coord name
      */
-    public CoordSubmitXCommand(Configuration conf, String bundleId, String coordName) {
+    protected CoordSubmitXCommand(Configuration conf, String bundleId, String coordName) {
         super("coord_submit", "coord_submit", 1);
         this.conf = ParamChecker.notNull(conf, "conf");
         this.bundleId = ParamChecker.notEmpty(bundleId, "bundleId");

http://git-wip-us.apache.org/repos/asf/oozie/blob/41312e95/core/src/main/java/org/apache/oozie/service/RecoveryService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/RecoveryService.java b/core/src/main/java/org/apache/oozie/service/RecoveryService.java
index 49f47d0..abcb6a8 100644
--- a/core/src/main/java/org/apache/oozie/service/RecoveryService.java
+++ b/core/src/main/java/org/apache/oozie/service/RecoveryService.java
@@ -37,13 +37,14 @@ import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.client.Job;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.bundle.BundleCoordSubmitXCommand;
+import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
 import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
 import org.apache.oozie.command.coord.CoordActionReadyXCommand;
 import org.apache.oozie.command.coord.CoordActionStartXCommand;
 import org.apache.oozie.command.coord.CoordKillXCommand;
 import org.apache.oozie.command.coord.CoordPushDependencyCheckXCommand;
 import org.apache.oozie.command.coord.CoordResumeXCommand;
-import org.apache.oozie.command.coord.CoordSubmitXCommand;
 import org.apache.oozie.command.coord.CoordSuspendXCommand;
 import org.apache.oozie.command.wf.ActionEndXCommand;
 import org.apache.oozie.command.wf.ActionStartXCommand;
@@ -195,30 +196,50 @@ public class RecoveryService implements Service {
                     }
                     if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(baction.getBundleId())) {
                         if (baction.getStatus() == Job.Status.PREP && baction.getCoordId() == null) {
-                            BundleJobBean bundleJob = null;
-                            if (jpaService != null) {
-                                bundleJob = BundleJobQueryExecutor.getInstance().get(
-                                        BundleJobQuery.GET_BUNDLE_JOB_ID_JOBXML_CONF, baction.getBundleId());
-                            }
-                            Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
-                            List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
-                            for (Element coordElem : coordElems) {
-                                Attribute name = coordElem.getAttribute("name");
-                                String coordName=name.getValue();
-                                Configuration coordConf = mergeConfig(coordElem, bundleJob);
-                                try {
-                                    coordName = ELUtils.resolveAppName(coordName, coordConf);
-                                }
-                                catch (Exception e) {
-                                    log.error("Error evaluating coord name " + e.getMessage(), e);
-                                    continue;
+
+                            CoordinatorJobBean coordJobs = CoordJobQueryExecutor.getInstance().getIfExist(
+                                    CoordJobQuery.GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID, baction.getCoordName(),
+                                    baction.getBundleId());
+
+                            if (coordJobs == null) {
+                                log.debug("Coord [{0}] for bundle [{1}] is not yet submitted , submitting new one",
+                                        baction.getCoordName(), baction.getBundleId());
+
+                                BundleJobBean bundleJob = null;
+                                if (jpaService != null) {
+                                    bundleJob = BundleJobQueryExecutor.getInstance().get(
+                                            BundleJobQuery.GET_BUNDLE_JOB_ID_JOBXML_CONF, baction.getBundleId());
                                 }
-                                if (coordName.equals(baction.getCoordName())) {
-                                    coordConf.set(OozieClient.BUNDLE_ID, baction.getBundleId());
-                                    queueCallable(new CoordSubmitXCommand(coordConf,
-                                            bundleJob.getId(), coordName));
+                                Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
+                                @SuppressWarnings("unchecked")
+                                List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
+                                for (Element coordElem : coordElems) {
+                                    Attribute name = coordElem.getAttribute("name");
+                                    String coordName = name.getValue();
+                                    Configuration coordConf = mergeConfig(coordElem, bundleJob);
+                                    try {
+                                        coordName = ELUtils.resolveAppName(coordName, coordConf);
+                                    }
+                                    catch (Exception e) {
+                                        log.error("Error evaluating coord name " + e.getMessage(), e);
+                                        continue;
+                                    }
+                                    if (coordName.equals(baction.getCoordName())) {
+                                        coordConf.set(OozieClient.BUNDLE_ID, baction.getBundleId());
+                                        queueCallable(new BundleCoordSubmitXCommand(coordConf, bundleJob.getId(),
+                                                coordName));
+                                    }
                                 }
                             }
+                            else {
+                                log.debug(
+                                        "Coord [{0}] for bundle [{1}] is submitted , but bundle action is not updated.",
+                                        baction.getCoordName(), baction.getBundleId());
+                                coordJobs = CoordJobQueryExecutor.getInstance().getIfExist(
+                                        CoordJobQuery.GET_COORD_JOB_SUSPEND_KILL, baction.getCoordName(),
+                                        coordJobs.getId());
+                                queueCallable(new BundleStatusUpdateXCommand(coordJobs, baction.getStatus()));
+                            }
                         }
                         else if (baction.getStatus() == Job.Status.KILLED) {
                             queueCallable(new CoordKillXCommand(baction.getCoordId()));

http://git-wip-us.apache.org/repos/asf/oozie/blob/41312e95/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
index 35fc8a6..952b90d 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
@@ -173,7 +173,8 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr
             }
         }
         catch (Exception ex) {
-            throw new RuntimeException(ex);
+            //Not throwing exception. Should return null, so that command can be requeued
+            LOG.error("Error while acquiring lock", ex);
         }
         return token;
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/41312e95/core/src/test/java/org/apache/oozie/command/bundle/TestBundleSubmitXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/bundle/TestBundleSubmitXCommand.java b/core/src/test/java/org/apache/oozie/command/bundle/TestBundleSubmitXCommand.java
index b883dc2..429eb0d 100644
--- a/core/src/test/java/org/apache/oozie/command/bundle/TestBundleSubmitXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/bundle/TestBundleSubmitXCommand.java
@@ -99,7 +99,7 @@ public class TestBundleSubmitXCommand extends XDataTestCase {
         final XConfiguration jobConf = setUpBundle();
         jobConf.set("coordName1", "coord1");
         jobConf.set("coordName2", "coord2");
-        jobConf.set("coord1.starttime","2009-02-01T00:00Z");
+        jobConf.set("coord1.starttime", "2009-02-01T00:00Z");
 
         BundleSubmitXCommand command = new BundleSubmitXCommand(jobConf);
         final BundleJobBean bundleBean = (BundleJobBean) command.getJob();
@@ -141,6 +141,47 @@ public class TestBundleSubmitXCommand extends XDataTestCase {
         }
     }
 
+    public void testMultipleCoordSubmit() throws Exception {
+        final XConfiguration jobConf = setUpBundle();
+        jobConf.set("coordName1", "coord1");
+        jobConf.set("coordName2", "coord2");
+        jobConf.set("coord1.starttime", "2009-02-01T00:00Z");
+
+        BundleSubmitXCommand command = new BundleSubmitXCommand(jobConf);
+        final BundleJobBean bundleBean = (BundleJobBean) command.getJob();
+        bundleBean.setStartTime(new Date());
+        bundleBean.setEndTime(new Date());
+        final String jobId = command.call();
+        sleep(2000);
+        new BundleStartXCommand(jobId).call();
+        waitFor(2000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                List<BundleActionBean> actions = BundleActionQueryExecutor.getInstance().getList(
+                        BundleActionQuery.GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE, jobId);
+                return actions.get(0).getStatus().equals(Job.Status.RUNNING);
+            }
+        });
+
+        List<BundleActionBean> actions = BundleActionQueryExecutor.getInstance().getList(
+                BundleActionQuery.GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE, jobId);
+        assertEquals(actions.size(), 2);
+        assertEquals(actions.get(0).getCoordName(), "coord1");
+        assertEquals(actions.get(1).getCoordName(), "coord2");
+        try {
+            new BundleCoordSubmitXCommand(jobConf, jobId, "coord1").call();
+            fail("Should fail. Coord job is already created");
+        }
+        catch (CommandException e) {
+            assertEquals(e.getErrorCode(), ErrorCode.E1304);
+        }
+        actions = BundleActionQueryExecutor.getInstance().getList(
+                BundleActionQuery.GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE, jobId);
+        assertEquals(actions.size(), 2);
+        assertEquals(actions.get(0).getStatusStr(), "RUNNING");
+        assertEquals(actions.get(1).getStatusStr(), "RUNNING");
+
+    }
+
     private XConfiguration setUpBundle() throws UnsupportedEncodingException, IOException {
         XConfiguration jobConf = new XConfiguration();
 
@@ -168,4 +209,5 @@ public class TestBundleSubmitXCommand extends XDataTestCase {
         return jobConf;
 
     }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/41312e95/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
index eab177b..8fd0c2d 100644
--- a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
@@ -50,11 +50,15 @@ import org.apache.oozie.coord.CoordELFunctions;
 import org.apache.oozie.dependency.FSURIHandler;
 import org.apache.oozie.dependency.HCatURIHandler;
 import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor;
+import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
+import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
@@ -306,6 +310,36 @@ public class TestRecoveryService extends XDataTestCase {
         }
     }
 
+    public void testCoordCreateNotifyParentFailed() throws Exception {
+        final BundleActionBean bundleAction;
+        final BundleJobBean bundle;
+        bundle = addRecordToBundleJobTable(Job.Status.RUNNING, false);
+        bundleAction = addRecordToBundleActionTable(bundle.getId(), "coord1", 1, Job.Status.PREP);
+
+        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, new Date(), new Date(),
+                false, false, 1);
+        coordJob.setBundleId(bundle.getId());
+        coordJob.setAppName("coord1");
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob);
+
+        assertNull(bundleAction.getCoordId());
+        sleep(3000);
+        Runnable recoveryRunnable = new RecoveryRunnable(0, 1, 1);
+        recoveryRunnable.run();
+
+        waitFor(10000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                BundleActionBean mybundleAction = BundleActionQueryExecutor.getInstance().get(
+                        BundleActionQuery.GET_BUNDLE_ACTION, bundle.getId() + "_coord1");
+                return mybundleAction.getCoordId() != null;
+            }
+        });
+
+        BundleActionBean mybundleAction = BundleActionQueryExecutor.getInstance().get(
+                BundleActionQuery.GET_BUNDLE_ACTION, bundle.getId() + "_coord1");
+        assertNotNull(mybundleAction.getCoordId());
+    }
+
     /**
      * If the bundle action is in PREP state and coord is already created, recovery should not submit new coord
      * @throws Exception

http://git-wip-us.apache.org/repos/asf/oozie/blob/41312e95/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index feea868..33626b1 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.0 release (trunk - unreleased)
 
+OOZIE-2522 There can be multiple coord submit from bundle in case of ZK glitch (puru)
 OOZIE-2553 Cred tag is required for all actions in the workflow even if an action does not require it (me.venkatr via rohini)
 OOZIE-2503 show ChildJobURLs to spark action (satishsaley via puru)
 OOZIE-2551 Feature request: epoch timestamp generation (jtolar via puru)

Reply via email to