Repository: oozie Updated Branches: refs/heads/branch-4.0 3817f3346 -> f3ab5da94
OOZIE-1632 Coordinators that undergo change endtime but are doneMaterialization, not getting picked for StatusTransit (mona) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/f3ab5da9 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/f3ab5da9 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/f3ab5da9 Branch: refs/heads/branch-4.0 Commit: f3ab5da942856b4d76234aa6755a66dc3ebd57d7 Parents: 3817f33 Author: Mona Chitnis <[email protected]> Authored: Fri Mar 21 13:50:49 2014 -0700 Committer: Mona Chitnis <[email protected]> Committed: Fri Mar 21 13:50:49 2014 -0700 ---------------------------------------------------------------------- .../org/apache/oozie/CoordinatorJobBean.java | 2 + .../command/coord/CoordChangeXCommand.java | 31 ++++--- .../jpa/CoordJobsGetChangedJPAExecutor.java | 60 ++++++++++++++ .../oozie/service/StatusTransitService.java | 5 +- .../command/coord/TestCoordChangeXCommand.java | 85 +++++++++++++++++++- release-log.txt | 1 + 6 files changed, 170 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/f3ab5da9/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java index d51bc73..3a8c769 100644 --- a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java +++ b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java @@ -53,6 +53,8 @@ import org.apache.openjpa.persistence.jdbc.Index; @NamedQuery(name = "GET_COORD_JOBS_PENDING", query = "select OBJECT(w) from CoordinatorJobBean w where w.pending = 1 order by w.lastModifiedTimestamp"), + @NamedQuery(name = "GET_COORD_JOBS_CHANGED", query = "select w.id from CoordinatorJobBean w where w.pending = 1 AND w.doneMaterialization = 1 AND w.lastModifiedTimestamp >= :lastModifiedTime"), + @NamedQuery(name = "GET_COORD_JOBS_COUNT", query = "select count(w) from CoordinatorJobBean w"), @NamedQuery(name = "GET_COORD_JOBS_COLUMNS", query = "select w.id, w.appName, w.status, w.user, w.group, w.startTimestamp, w.endTimestamp, w.appPath, w.concurrency, w.frequency, w.lastActionTimestamp, w.nextMaterializedTimestamp, w.createdTimestamp, w.timeUnitStr, w.timeZone, w.timeOut from CoordinatorJobBean w order by w.createdTimestamp desc"), http://git-wip-us.apache.org/repos/asf/oozie/blob/f3ab5da9/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java index 7b55634..fe4921a 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java @@ -316,19 +316,25 @@ public class CoordChangeXCommand extends CoordinatorXCommand<Void> { try { if (newEndTime != null) { - coordJob.setEndTime(newEndTime); - if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED){ - coordJob.setStatus(CoordinatorJob.Status.RUNNING); - } - if (coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR - || coordJob.getStatus() == CoordinatorJob.Status.FAILED) { - // Check for backward compatibility for Oozie versions (3.2 and before) - // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and - // PAUSEDWITHERROR is not supported - coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR)); + boolean dontChange = coordJob.getEndTime().before(newEndTime) + && coordJob.getNextMaterializedTime() != null + && coordJob.getNextMaterializedTime().after(newEndTime); + if (!dontChange) { + coordJob.setEndTime(newEndTime); + if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED) { + coordJob.setStatus(CoordinatorJob.Status.RUNNING); + } + if (coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR + || coordJob.getStatus() == CoordinatorJob.Status.FAILED) { + // Check for backward compatibility for Oozie versions (3.2 and before) + // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and + // PAUSEDWITHERROR is not supported + coordJob.setStatus(StatusUtils + .getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR)); + } + coordJob.setPending(); + coordJob.resetDoneMaterialization(); } - coordJob.setPending(); - coordJob.resetDoneMaterialization(); } if (newConcurrency != null) { @@ -367,6 +373,7 @@ public class CoordChangeXCommand extends CoordinatorXCommand<Void> { coordJob.setDoneMaterialization(); } + coordJob.setLastModifiedTime(new Date()); updateList.add(coordJob); jpaService.execute(new BulkUpdateDeleteJPAExecutor(updateList, deleteList, false)); http://git-wip-us.apache.org/repos/asf/oozie/blob/f3ab5da9/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsGetChangedJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsGetChangedJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsGetChangedJPAExecutor.java new file mode 100644 index 0000000..4e32250 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsGetChangedJPAExecutor.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.executor.jpa; + +import java.sql.Timestamp; +import java.util.Date; +import java.util.List; + +import javax.persistence.EntityManager; +import javax.persistence.Query; + +import org.apache.oozie.ErrorCode; + +/** + * Load the list of Coordinator jobs changed by CoordChangeXCommand + */ +public class CoordJobsGetChangedJPAExecutor implements JPAExecutor<List<String>> { + private Date d = null; + + public CoordJobsGetChangedJPAExecutor(Date d) { + this.d = d; + } + + @Override + public String getName() { + return "CoordJobsGetChangedJPAExecutor"; + } + + @Override + @SuppressWarnings("unchecked") + public List<String> execute(EntityManager em) throws JPAExecutorException { + try { + Query q = em.createNamedQuery("GET_COORD_JOBS_CHANGED"); + q.setParameter("lastModifiedTime", new Timestamp(d.getTime())); + List<String> coordJobIds = q.getResultList(); + return coordJobIds; + } + catch (Exception e) { + throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e); + } + + + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/f3ab5da9/core/src/main/java/org/apache/oozie/service/StatusTransitService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/StatusTransitService.java b/core/src/main/java/org/apache/oozie/service/StatusTransitService.java index f36d852..acbcfd5 100644 --- a/core/src/main/java/org/apache/oozie/service/StatusTransitService.java +++ b/core/src/main/java/org/apache/oozie/service/StatusTransitService.java @@ -46,6 +46,7 @@ import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobGetPendingActionsCountJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor; +import org.apache.oozie.executor.jpa.CoordJobsGetChangedJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor; import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.util.DateUtils; @@ -710,7 +711,7 @@ public class StatusTransitService implements Service { LOG.info("Running coordinator status service from last instance time = " + DateUtils.formatDateOozieTZ(lastInstanceStartTime)); // this is not the first instance, we should only check jobs - // that have actions been + // that have actions or jobs been // updated >= start time of last service run; List<String> coordJobIdList = jpaService .execute(new CoordActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime)); @@ -718,6 +719,8 @@ public class StatusTransitService implements Service { for (String coordJobId : coordJobIdList) { coordIds.add(coordJobId); } + coordIds.addAll(jpaService.execute(new CoordJobsGetChangedJPAExecutor(lastInstanceStartTime))); + pendingJobCheckList = new ArrayList<CoordinatorJobBean>(); for (String coordId : coordIds.toArray(new String[coordIds.size()])) { CoordinatorJobBean coordJob; http://git-wip-us.apache.org/repos/asf/oozie/blob/f3ab5da9/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java index f7eb38f..8eae7be 100644 --- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java @@ -34,6 +34,7 @@ import org.apache.oozie.command.CommandException; import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor; +import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor; import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.executor.jpa.sla.SLACalculationInsertUpdateJPAExecutor; import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor; @@ -281,6 +282,87 @@ public class TestCoordChangeXCommand extends XDataTestCase { } /** + * Testcase when changing end-time to before nextMaterializedTime + * reflects correct job status via StatusTransit + * + * @throws Exception + */ + public void testCoordChangeEndTime1() throws Exception { + + JPAService jpaService = Services.get().get(JPAService.class); + + Date startTime = new Date(); + Date endTime = new Date(startTime.getTime() + (20 * 60 * 1000)); + CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, true, true, 1); + coordJob.setNextMaterializedTime(new Date(startTime.getTime() + (10 * 60 * 1000))); + jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob)); + addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); + + Runnable runnable = new StatusTransitService.StatusTransitRunnable(); + runnable.run(); // dummy run so we get to the interval check following coord job change + sleep(1000); + + String newEndTime = convertDateToString(startTime.getTime() + 5 * 60 * 1000); + + new CoordChangeXCommand(coordJob.getId(), "endtime=" + newEndTime).call(); + try { + checkCoordJobs(coordJob.getId(), DateUtils.parseDateOozieTZ(newEndTime), null, null, false); + } + catch (Exception ex) { + ex.printStackTrace(); + fail("Invalid date" + ex); + } + + CoordJobGetJPAExecutor coordGetCmd = new CoordJobGetJPAExecutor(coordJob.getId()); + coordJob = jpaService.execute(coordGetCmd); + assertEquals(Job.Status.RUNNING, coordJob.getStatus()); + assertTrue(coordJob.isPending()); + assertTrue(coordJob.isDoneMaterialization()); + + runnable.run(); + sleep(1000); + coordJob = jpaService.execute(coordGetCmd); + assertEquals(Job.Status.SUCCEEDED, coordJob.getStatus()); + assertFalse(coordJob.isPending()); + assertTrue(coordJob.isDoneMaterialization()); + } + + /** + * Testcase when changing end-time to after original end-time + * but before nextMaterializedTime should not cause unnecessary changes + * + * @throws Exception + */ + public void testCoordChangeEndTime2() throws Exception { + JPAService jpaService = Services.get().get(JPAService.class); + Date startTime = new Date(); + Date endTime = new Date(startTime.getTime() + (10 * 60 * 1000)); + CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, true, true, 1); + coordJob.setNextMaterializedTime(new Date(startTime.getTime() + (40 * 60 * 1000))); + jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob)); + addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); + + Runnable runnable = new StatusTransitService.StatusTransitRunnable(); + runnable.run(); + + CoordJobGetJPAExecutor coordGetCmd = new CoordJobGetJPAExecutor(coordJob.getId()); + coordJob = jpaService.execute(coordGetCmd); + assertEquals(Job.Status.SUCCEEDED, coordJob.getStatus()); + assertFalse(coordJob.isPending()); + assertTrue(coordJob.isDoneMaterialization()); + + String newEndTime = convertDateToString(startTime.getTime() + 20 * 60 * 1000); + + new CoordChangeXCommand(coordJob.getId(), "endtime=" + newEndTime).call(); + + coordJob = jpaService.execute(coordGetCmd); + assertFalse(Job.Status.RUNNING == coordJob.getStatus()); + assertFalse(coordJob.isPending()); + assertTrue(coordJob.isDoneMaterialization()); + + } + + /** * Change the pause time and end time of a failed coordinator job. Check whether the status changes * to RUNNINGWITHERROR * @throws Exception @@ -408,8 +490,9 @@ public class TestCoordChangeXCommand extends XDataTestCase { new CoordChangeXCommand(job.getId(), pauseTimeChangeStr).call(); fail("Should not reach here."); } catch(CommandException e) { - if(e.getErrorCode() != ErrorCode.E1022) + if(e.getErrorCode() != ErrorCode.E1022) { fail("Error code should be E1022"); + } } } http://git-wip-us.apache.org/repos/asf/oozie/blob/f3ab5da9/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 6152fa8..8d0f26d 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.0.1 release +OOZIE-1632 Coordinators that undergo change endtime but are doneMaterialization, not getting picked for StatusTransit (mona) OOZIE-1736 Switch to Hadoop 2.3.0 for the hadoop-2 profile (rkanter) OOZIE-1670 Workflow kill command doesn't kill child job for map-reduce action (puru via rohini) OOZIE-1630 <prepare> operations fail when path doesn't have scheme (ryota)
