Repository: oozie
Updated Branches: refs/heads/master f83f484a1 -> 41312e956
OOZIE-2522 There can be multiple coord submit from bundle in case of ZK glitch Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/41312e95 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/41312e95 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/41312e95 Branch: refs/heads/master Commit: 41312e9564390bc499e76c8770c9b5d4c1be02d5 Parents: f83f484 Author: Purshotam Shah <[email protected]> Authored: Tue Jun 7 09:59:01 2016 -0700 Committer: Purshotam Shah <[email protected]> Committed: Tue Jun 7 09:59:01 2016 -0700 ---------------------------------------------------------------------- .../bundle/BundleCoordSubmitXCommand.java | 77 ++++++++++++++++++++ .../command/bundle/BundleStartXCommand.java | 3 +- .../command/coord/CoordSubmitXCommand.java | 6 +- .../apache/oozie/service/RecoveryService.java | 65 +++++++++++------ .../apache/oozie/service/ZKLocksService.java | 3 +- .../bundle/TestBundleSubmitXCommand.java | 44 ++++++++++- .../oozie/service/TestRecoveryService.java | 34 +++++++++ release-log.txt | 1 + 8 files changed, 204 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/41312e95/core/src/main/java/org/apache/oozie/command/bundle/BundleCoordSubmitXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/bundle/BundleCoordSubmitXCommand.java b/core/src/main/java/org/apache/oozie/command/bundle/BundleCoordSubmitXCommand.java new file mode 100644 index 0000000..1376e5e --- /dev/null +++ b/core/src/main/java/org/apache/oozie/command/bundle/BundleCoordSubmitXCommand.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.command.bundle; + +import org.apache.hadoop.conf.Configuration; +import org.apache.oozie.CoordinatorJobBean; +import org.apache.oozie.ErrorCode; +import org.apache.oozie.XException; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.command.coord.CoordSubmitXCommand; +import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; +import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; + +public class BundleCoordSubmitXCommand extends CoordSubmitXCommand { + + private String coordId; + + public BundleCoordSubmitXCommand(Configuration conf, String bundleId, String coordName) { + super(conf, bundleId, coordName); + } + + @Override + public String getEntityKey() { + return bundleId + "_" + coordName; + } + + @Override + protected boolean isLockRequired() { + return true; + } + + @Override + protected void verifyPrecondition() throws CommandException { + super.verifyPrecondition(); + if (coordId != null) { + LOG.warn("Coord [{0}] is already submitted for bundle [{1}]", coordId, bundleId); + throw new CommandException(ErrorCode.E1304, coordName); + } + } + + protected void loadState() throws CommandException { + super.loadState(); + try { + CoordinatorJobBean coordJobs = CoordJobQueryExecutor.getInstance().getIfExist( + CoordJobQuery.GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID, coordName, bundleId); + + if (coordJobs != null) 
{ + coordId = coordJobs.getId(); + } + } + catch (XException ex) { + throw new CommandException(ex); + } + } + + @Override + public String getKey() { + return getName() + "_" + getEntityKey(); + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/41312e95/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java b/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java index 27ae4a4..cc98a6d 100644 --- a/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java @@ -38,7 +38,6 @@ import org.apache.oozie.client.rest.JsonBean; import org.apache.oozie.command.CommandException; import org.apache.oozie.command.PreconditionException; import org.apache.oozie.command.StartTransitionXCommand; -import org.apache.oozie.command.coord.CoordSubmitXCommand; import org.apache.oozie.executor.jpa.BatchQueryExecutor; import org.apache.oozie.executor.jpa.BundleJobQueryExecutor; import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery; @@ -266,7 +265,7 @@ public class BundleStartXCommand extends StartTransitionXCommand { throw new CommandException(ErrorCode.E1321, e.getMessage(), e); } - queue(new CoordSubmitXCommand(coordConf, bundleJob.getId(), name.getValue())); + queue(new BundleCoordSubmitXCommand(coordConf, bundleJob.getId(), name.getValue())); } updateBundleAction(); http://git-wip-us.apache.org/repos/asf/oozie/blob/41312e95/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java index f1f9ab2..969336d 100644 --- 
a/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java @@ -100,8 +100,8 @@ import org.xml.sax.SAXException; public class CoordSubmitXCommand extends SubmitTransitionXCommand { protected Configuration conf; - private final String bundleId; - private final String coordName; + protected final String bundleId; + protected final String coordName; protected boolean dryrun; protected JPAService jpaService = null; private CoordinatorJob.Status prevStatus = CoordinatorJob.Status.PREP; @@ -179,7 +179,7 @@ public class CoordSubmitXCommand extends SubmitTransitionXCommand { * @param bundleId : bundle id * @param coordName : coord name */ - public CoordSubmitXCommand(Configuration conf, String bundleId, String coordName) { + protected CoordSubmitXCommand(Configuration conf, String bundleId, String coordName) { super("coord_submit", "coord_submit", 1); this.conf = ParamChecker.notNull(conf, "conf"); this.bundleId = ParamChecker.notEmpty(bundleId, "bundleId"); http://git-wip-us.apache.org/repos/asf/oozie/blob/41312e95/core/src/main/java/org/apache/oozie/service/RecoveryService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/RecoveryService.java b/core/src/main/java/org/apache/oozie/service/RecoveryService.java index 49f47d0..abcb6a8 100644 --- a/core/src/main/java/org/apache/oozie/service/RecoveryService.java +++ b/core/src/main/java/org/apache/oozie/service/RecoveryService.java @@ -37,13 +37,14 @@ import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.client.Job; import org.apache.oozie.client.OozieClient; import org.apache.oozie.command.CommandException; +import org.apache.oozie.command.bundle.BundleCoordSubmitXCommand; +import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; import org.apache.oozie.command.coord.CoordActionInputCheckXCommand; import 
org.apache.oozie.command.coord.CoordActionReadyXCommand; import org.apache.oozie.command.coord.CoordActionStartXCommand; import org.apache.oozie.command.coord.CoordKillXCommand; import org.apache.oozie.command.coord.CoordPushDependencyCheckXCommand; import org.apache.oozie.command.coord.CoordResumeXCommand; -import org.apache.oozie.command.coord.CoordSubmitXCommand; import org.apache.oozie.command.coord.CoordSuspendXCommand; import org.apache.oozie.command.wf.ActionEndXCommand; import org.apache.oozie.command.wf.ActionStartXCommand; @@ -195,30 +196,50 @@ public class RecoveryService implements Service { } if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(baction.getBundleId())) { if (baction.getStatus() == Job.Status.PREP && baction.getCoordId() == null) { - BundleJobBean bundleJob = null; - if (jpaService != null) { - bundleJob = BundleJobQueryExecutor.getInstance().get( - BundleJobQuery.GET_BUNDLE_JOB_ID_JOBXML_CONF, baction.getBundleId()); - } - Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml()); - List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace()); - for (Element coordElem : coordElems) { - Attribute name = coordElem.getAttribute("name"); - String coordName=name.getValue(); - Configuration coordConf = mergeConfig(coordElem, bundleJob); - try { - coordName = ELUtils.resolveAppName(coordName, coordConf); - } - catch (Exception e) { - log.error("Error evaluating coord name " + e.getMessage(), e); - continue; + + CoordinatorJobBean coordJobs = CoordJobQueryExecutor.getInstance().getIfExist( + CoordJobQuery.GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID, baction.getCoordName(), + baction.getBundleId()); + + if (coordJobs == null) { + log.debug("Coord [{0}] for bundle [{1}] is not yet submitted , submitting new one", + baction.getCoordName(), baction.getBundleId()); + + BundleJobBean bundleJob = null; + if (jpaService != null) { + bundleJob = BundleJobQueryExecutor.getInstance().get( + 
BundleJobQuery.GET_BUNDLE_JOB_ID_JOBXML_CONF, baction.getBundleId()); } - if (coordName.equals(baction.getCoordName())) { - coordConf.set(OozieClient.BUNDLE_ID, baction.getBundleId()); - queueCallable(new CoordSubmitXCommand(coordConf, - bundleJob.getId(), coordName)); + Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml()); + @SuppressWarnings("unchecked") + List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace()); + for (Element coordElem : coordElems) { + Attribute name = coordElem.getAttribute("name"); + String coordName = name.getValue(); + Configuration coordConf = mergeConfig(coordElem, bundleJob); + try { + coordName = ELUtils.resolveAppName(coordName, coordConf); + } + catch (Exception e) { + log.error("Error evaluating coord name " + e.getMessage(), e); + continue; + } + if (coordName.equals(baction.getCoordName())) { + coordConf.set(OozieClient.BUNDLE_ID, baction.getBundleId()); + queueCallable(new BundleCoordSubmitXCommand(coordConf, bundleJob.getId(), + coordName)); + } } } + else { + log.debug( + "Coord [{0}] for bundle [{1}] is submitted , but bundle action is not updated.", + baction.getCoordName(), baction.getBundleId()); + coordJobs = CoordJobQueryExecutor.getInstance().getIfExist( + CoordJobQuery.GET_COORD_JOB_SUSPEND_KILL, baction.getCoordName(), + coordJobs.getId()); + queueCallable(new BundleStatusUpdateXCommand(coordJobs, baction.getStatus())); + } } else if (baction.getStatus() == Job.Status.KILLED) { queueCallable(new CoordKillXCommand(baction.getCoordId())); http://git-wip-us.apache.org/repos/asf/oozie/blob/41312e95/core/src/main/java/org/apache/oozie/service/ZKLocksService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java index 35fc8a6..952b90d 100644 --- a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java +++ 
b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java @@ -173,7 +173,8 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr } } catch (Exception ex) { - throw new RuntimeException(ex); + //Not throwing exception. Should return null, so that command can be requeued + LOG.error("Error while acquiring lock", ex); } return token; } http://git-wip-us.apache.org/repos/asf/oozie/blob/41312e95/core/src/test/java/org/apache/oozie/command/bundle/TestBundleSubmitXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/bundle/TestBundleSubmitXCommand.java b/core/src/test/java/org/apache/oozie/command/bundle/TestBundleSubmitXCommand.java index b883dc2..429eb0d 100644 --- a/core/src/test/java/org/apache/oozie/command/bundle/TestBundleSubmitXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/bundle/TestBundleSubmitXCommand.java @@ -99,7 +99,7 @@ public class TestBundleSubmitXCommand extends XDataTestCase { final XConfiguration jobConf = setUpBundle(); jobConf.set("coordName1", "coord1"); jobConf.set("coordName2", "coord2"); - jobConf.set("coord1.starttime","2009-02-01T00:00Z"); + jobConf.set("coord1.starttime", "2009-02-01T00:00Z"); BundleSubmitXCommand command = new BundleSubmitXCommand(jobConf); final BundleJobBean bundleBean = (BundleJobBean) command.getJob(); @@ -141,6 +141,47 @@ public class TestBundleSubmitXCommand extends XDataTestCase { } } + public void testMultipleCoordSubmit() throws Exception { + final XConfiguration jobConf = setUpBundle(); + jobConf.set("coordName1", "coord1"); + jobConf.set("coordName2", "coord2"); + jobConf.set("coord1.starttime", "2009-02-01T00:00Z"); + + BundleSubmitXCommand command = new BundleSubmitXCommand(jobConf); + final BundleJobBean bundleBean = (BundleJobBean) command.getJob(); + bundleBean.setStartTime(new Date()); + bundleBean.setEndTime(new Date()); + final String jobId = command.call(); + 
sleep(2000); + new BundleStartXCommand(jobId).call(); + waitFor(2000, new Predicate() { + public boolean evaluate() throws Exception { + List<BundleActionBean> actions = BundleActionQueryExecutor.getInstance().getList( + BundleActionQuery.GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE, jobId); + return actions.get(0).getStatus().equals(Job.Status.RUNNING); + } + }); + + List<BundleActionBean> actions = BundleActionQueryExecutor.getInstance().getList( + BundleActionQuery.GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE, jobId); + assertEquals(actions.size(), 2); + assertEquals(actions.get(0).getCoordName(), "coord1"); + assertEquals(actions.get(1).getCoordName(), "coord2"); + try { + new BundleCoordSubmitXCommand(jobConf, jobId, "coord1").call(); + fail("Should fail. Coord job is already created"); + } + catch (CommandException e) { + assertEquals(e.getErrorCode(), ErrorCode.E1304); + } + actions = BundleActionQueryExecutor.getInstance().getList( + BundleActionQuery.GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE, jobId); + assertEquals(actions.size(), 2); + assertEquals(actions.get(0).getStatusStr(), "RUNNING"); + assertEquals(actions.get(1).getStatusStr(), "RUNNING"); + + } + private XConfiguration setUpBundle() throws UnsupportedEncodingException, IOException { XConfiguration jobConf = new XConfiguration(); @@ -168,4 +209,5 @@ public class TestBundleSubmitXCommand extends XDataTestCase { return jobConf; } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/41312e95/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java index eab177b..8fd0c2d 100644 --- a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java +++ b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java @@ -50,11 +50,15 @@ import 
org.apache.oozie.coord.CoordELFunctions; import org.apache.oozie.dependency.FSURIHandler; import org.apache.oozie.dependency.HCatURIHandler; import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor; +import org.apache.oozie.executor.jpa.BundleActionQueryExecutor; +import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery; import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor; import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor; import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; +import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; +import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor; import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor; @@ -306,6 +310,36 @@ public class TestRecoveryService extends XDataTestCase { } } + public void testCoordCreateNotifyParentFailed() throws Exception { + final BundleActionBean bundleAction; + final BundleJobBean bundle; + bundle = addRecordToBundleJobTable(Job.Status.RUNNING, false); + bundleAction = addRecordToBundleActionTable(bundle.getId(), "coord1", 1, Job.Status.PREP); + + CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, new Date(), new Date(), + false, false, 1); + coordJob.setBundleId(bundle.getId()); + coordJob.setAppName("coord1"); + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob); + + assertNull(bundleAction.getCoordId()); + sleep(3000); + Runnable recoveryRunnable = new RecoveryRunnable(0, 1, 1); + recoveryRunnable.run(); + + waitFor(10000, new Predicate() { + public boolean evaluate() throws Exception { + BundleActionBean mybundleAction = 
BundleActionQueryExecutor.getInstance().get( + BundleActionQuery.GET_BUNDLE_ACTION, bundle.getId() + "_coord1"); + return mybundleAction.getCoordId() != null; + } + }); + + BundleActionBean mybundleAction = BundleActionQueryExecutor.getInstance().get( + BundleActionQuery.GET_BUNDLE_ACTION, bundle.getId() + "_coord1"); + assertNotNull(mybundleAction.getCoordId()); + } + /** * If the bundle action is in PREP state and coord is already created, recovery should not submit new coord * @throws Exception http://git-wip-us.apache.org/repos/asf/oozie/blob/41312e95/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index feea868..33626b1 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.3.0 release (trunk - unreleased) +OOZIE-2522 There can be multiple coord submit from bundle in case of ZK glitch (puru) OOZIE-2553 Cred tag is required for all actions in the workflow even if an action does not require it (me.venkatr via rohini) OOZIE-2503 show ChildJobURLs to spark action (satishsaley via puru) OOZIE-2551 Feature request: epoch timestamp generation (jtolar via puru)
