Updated Branches: refs/heads/master 8ca266fac -> 6c69089fd
OOZIE-1632 Coordinators that undergo change endtime but are doneMaterialization, not getting picked for StatusTransit (mona) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/6c69089f Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/6c69089f Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/6c69089f Branch: refs/heads/master Commit: 6c69089fde5aeaffc090b0d42ff5345a1d568651 Parents: 8ca266f Author: Mona Chitnis <[email protected]> Authored: Mon Dec 30 16:34:11 2013 -0800 Committer: Mona Chitnis <[email protected]> Committed: Mon Dec 30 16:34:11 2013 -0800 ---------------------------------------------------------------------- .../org/apache/oozie/CoordinatorJobBean.java | 2 + .../command/coord/CoordChangeXCommand.java | 36 +++-- .../executor/jpa/CoordActionQueryExecutor.java | 34 ++++- ...ActionsGetByLastModifiedTimeJPAExecutor.java | 68 ---------- .../executor/jpa/CoordJobQueryExecutor.java | 11 +- .../oozie/service/StatusTransitService.java | 21 +-- .../command/coord/TestCoordChangeXCommand.java | 132 ++++++++++++++++++- ...dActionGetByLastModifiedTimeJPAExecutor.java | 80 ----------- .../executor/jpa/TestCoordJobQueryExecutor.java | 24 +++- release-log.txt | 1 + 10 files changed, 233 insertions(+), 176 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java index f189b69..16209ce 100644 --- a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java +++ b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java @@ -97,6 +97,8 @@ import org.json.simple.JSONObject; @NamedQuery(name = "GET_COORD_JOBS_PENDING", query = "select OBJECT(w) from CoordinatorJobBean w where w.pending = 1 order by w.lastModifiedTimestamp"), + @NamedQuery(name = "GET_COORD_JOBS_CHANGED", query = "select OBJECT(w) from CoordinatorJobBean w where w.pending = 1 AND w.doneMaterialization = 1 AND w.lastModifiedTimestamp >= :lastModifiedTime"), + @NamedQuery(name = "GET_COORD_JOBS_COUNT", query = "select count(w) from CoordinatorJobBean w"), @NamedQuery(name = "GET_COORD_JOBS_COLUMNS", query = "select w.id, w.appName, w.statusStr, w.user, w.group, w.startTimestamp, w.endTimestamp, w.appPath, w.concurrency, w.frequency, w.lastActionTimestamp, w.nextMaterializedTimestamp, w.createdTimestamp, w.timeUnitStr, w.timeZone, w.timeOut from CoordinatorJobBean w order by w.createdTimestamp desc"), http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java index 65685db..4957330 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java @@ -325,19 +325,30 @@ public class CoordChangeXCommand extends CoordinatorXCommand<Void> { try { if (newEndTime != null) { - coordJob.setEndTime(newEndTime); - if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED){ - coordJob.setStatus(CoordinatorJob.Status.RUNNING); - } - if (coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR - || coordJob.getStatus() == CoordinatorJob.Status.FAILED) { - // Check for backward compatibility for Oozie versions (3.2 and before) - // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and - // PAUSEDWITHERROR is not supported - coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR)); + // during coord materialization, nextMaterializedTime is set to + // startTime + n(actions materialized) * frequency and this can be AFTER endTime, + // while doneMaterialization is true. Hence the following checks + // for newEndTime being in the middle of endTime and nextMatdTime. + // Since job is already done materialization so no need to change + boolean dontChange = coordJob.getEndTime().before(newEndTime) + && coordJob.getNextMaterializedTime() != null + && coordJob.getNextMaterializedTime().after(newEndTime); + if (!dontChange) { + coordJob.setEndTime(newEndTime); + if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED) { + coordJob.setStatus(CoordinatorJob.Status.RUNNING); + } + if (coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR + || coordJob.getStatus() == CoordinatorJob.Status.FAILED) { + // Check for backward compatibility for Oozie versions (3.2 and before) + // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and + // PAUSEDWITHERROR is not supported + coordJob.setStatus(StatusUtils + .getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR)); + } + coordJob.setPending(); + coordJob.resetDoneMaterialization(); } - coordJob.setPending(); - coordJob.resetDoneMaterialization(); } if (newConcurrency != null) { @@ -376,6 +387,7 @@ public class CoordChangeXCommand extends CoordinatorXCommand<Void> { coordJob.setDoneMaterialization(); } + coordJob.setLastModifiedTime(new Date()); updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_CHANGE, coordJob)); BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, deleteList); http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java index fc65cf3..f5304ca 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java @@ -17,10 +17,14 @@ */ package org.apache.oozie.executor.jpa; +import java.sql.Timestamp; +import java.util.ArrayList; import java.util.Date; import java.util.List; + import javax.persistence.EntityManager; import javax.persistence.Query; + import org.apache.oozie.CoordinatorActionBean; import org.apache.oozie.ErrorCode; import org.apache.oozie.service.JPAService; @@ -44,7 +48,8 @@ public class CoordActionQueryExecutor extends UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, UPDATE_COORD_ACTION_RERUN, GET_COORD_ACTION, - GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID + GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID, + GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME }; private static CoordActionQueryExecutor instance = new CoordActionQueryExecutor(); @@ -171,6 +176,9 @@ public class CoordActionQueryExecutor extends case GET_COORD_ACTION: query.setParameter("id", parameters[0]); break; + case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME: + query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime())); + break; default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " + caQuery.name()); @@ -202,14 +210,30 @@ public class CoordActionQueryExecutor extends throws JPAExecutorException { EntityManager em = jpaService.getEntityManager(); Query query = getSelectQuery(namedQuery, em, parameters); - List<CoordinatorActionBean> beanList = (List<CoordinatorActionBean>) jpaService.executeGetList( - namedQuery.name(), query, em); - if (beanList == null || beanList.size() == 0) { - throw new JPAExecutorException(ErrorCode.E0605, query.toString()); + List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em); + List<CoordinatorActionBean> beanList = new ArrayList<CoordinatorActionBean>(); + if (retList != null) { + for (Object ret : retList) { + beanList.add(constructBean(namedQuery, ret)); + } } return beanList; } + private CoordinatorActionBean constructBean(CoordActionQuery namedQuery, Object ret) throws JPAExecutorException { + CoordinatorActionBean bean; + switch (namedQuery) { + case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME: + bean = new CoordinatorActionBean(); + bean.setJobId((String) ret); + break; + default: + throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for " + + namedQuery.name()); + } + return bean; + } + @VisibleForTesting public static void destroy() { if (instance != null) { http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetByLastModifiedTimeJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetByLastModifiedTimeJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetByLastModifiedTimeJPAExecutor.java deleted file mode 100644 index 4f1e30b..0000000 --- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetByLastModifiedTimeJPAExecutor.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.oozie.executor.jpa; - -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; - -import javax.persistence.EntityManager; -import javax.persistence.Query; - -import org.apache.oozie.CoordinatorActionBean; -import org.apache.oozie.ErrorCode; - -/** - * Load the list of CoordinatorAction ordered by lastModifiedTime - */ -public class CoordActionsGetByLastModifiedTimeJPAExecutor implements JPAExecutor<List<String>> { - private Date d = null; - - public CoordActionsGetByLastModifiedTimeJPAExecutor(Date d) { - this.d = d; - } - - /* (non-Javadoc) - * @see org.apache.oozie.executor.jpa.JPAExecutor#getName() - */ - @Override - public String getName() { - return "CoordActionsGetByLastModifiedTimeJPAExecutor"; - } - - /* (non-Javadoc) - * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager) - */ - @Override - @SuppressWarnings("unchecked") - public List<String> execute(EntityManager em) throws JPAExecutorException { - try { - Query q = em.createNamedQuery("GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME"); - q.setParameter("lastModifiedTime", new Timestamp(d.getTime())); - List<String> coordJobIds = q.getResultList(); - return coordJobIds; - } - catch (Exception e) { - throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e); - } - - - } - -} http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java index 8e97436..240b352 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java @@ -19,7 +19,9 @@ package org.apache.oozie.executor.jpa; import java.sql.Timestamp; import java.util.ArrayList; +import java.util.Date; import java.util.List; + import javax.persistence.EntityManager; import javax.persistence.Query; @@ -58,7 +60,8 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo GET_COORD_JOB_ACTION_KILL, GET_COORD_JOB_MATERIALIZE, GET_COORD_JOB_SUSPEND_KILL, - GET_COORD_JOB_STATUS_PARENTID + GET_COORD_JOB_STATUS_PARENTID, + GET_COORD_JOBS_CHANGED }; private static CoordJobQueryExecutor instance = new CoordJobQueryExecutor(); @@ -200,6 +203,9 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo case GET_COORD_JOB_STATUS_PARENTID: query.setParameter("id", parameters[0]); break; + case GET_COORD_JOBS_CHANGED: + query.setParameter("lastModifiedTime", new Timestamp(((Date)parameters[0]).getTime())); + break; default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " + namedQuery.name()); @@ -300,6 +306,9 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo bean.setStatusStr((String) arr[0]); bean.setBundleId((String) arr[1]); break; + case GET_COORD_JOBS_CHANGED: + bean = (CoordinatorJobBean) ret; + break; default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for " + namedQuery.name()); http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/core/src/main/java/org/apache/oozie/service/StatusTransitService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/StatusTransitService.java b/core/src/main/java/org/apache/oozie/service/StatusTransitService.java index 4e05a05..e093c7d 100644 --- a/core/src/main/java/org/apache/oozie/service/StatusTransitService.java +++ b/core/src/main/java/org/apache/oozie/service/StatusTransitService.java @@ -29,6 +29,7 @@ import java.util.Comparator; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.BundleActionBean; import org.apache.oozie.BundleJobBean; +import org.apache.oozie.CoordinatorActionBean; import org.apache.oozie.CoordinatorJobBean; import org.apache.oozie.ErrorCode; import org.apache.oozie.client.CoordinatorAction; @@ -41,9 +42,9 @@ import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery; import org.apache.oozie.executor.jpa.BundleActionQueryExecutor; import org.apache.oozie.executor.jpa.BundleJobsGetRunningOrPendingJPAExecutor; -import org.apache.oozie.executor.jpa.CoordActionsGetByLastModifiedTimeJPAExecutor; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusJPAExecutor; -import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobGetPendingActionsCountJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; @@ -707,18 +708,20 @@ public class StatusTransitService implements Service { LOG.info("Running coordinator status service from last instance time = " + DateUtils.formatDateOozieTZ(lastInstanceStartTime)); // this is not the first instance, we should only check jobs - // that have actions been + // that have actions or jobs been // updated >= start time of last service run; - List<String> coordJobIdList = jpaService - .execute(new CoordActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime)); + List<CoordinatorActionBean> actionsList = CoordActionQueryExecutor.getInstance().getList( + CoordActionQuery.GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME, lastInstanceStartTime); Set<String> coordIds = new HashSet<String>(); - coordIds.addAll(coordJobIdList); + for (CoordinatorActionBean action : actionsList) { + coordIds.add(action.getJobId()); + } pendingJobCheckList = new ArrayList<CoordinatorJobBean>(); for (String coordId : coordIds.toArray(new String[coordIds.size()])) { CoordinatorJobBean coordJob; - try{ - coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordId)); + try { + coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, coordId); } catch (JPAExecutorException jpaee) { if (jpaee.getErrorCode().equals(ErrorCode.E0604)) { @@ -737,6 +740,8 @@ public class StatusTransitService implements Service { pendingJobCheckList.add(coordJob); } } + pendingJobCheckList.addAll(CoordJobQueryExecutor.getInstance().getList( + CoordJobQuery.GET_COORD_JOBS_CHANGED, lastInstanceStartTime)); } aggregateCoordJobsStatus(pendingJobCheckList); } http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java index 906882a..b9bbf16 100644 --- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java @@ -35,6 +35,8 @@ import org.apache.oozie.executor.jpa.BatchQueryExecutor; import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor; +import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; +import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor; import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor; @@ -280,6 +282,133 @@ public class TestCoordChangeXCommand extends XDataTestCase { } /** + * Testcase when changing end-time == nextMaterializedTime + * reflects correct job status via StatusTransit + * + * @throws Exception + */ + public void testCoordChangeEndTime1() throws Exception { + + JPAService jpaService = Services.get().get(JPAService.class); + + Date startTime = new Date(); + Date endTime = new Date(startTime.getTime() + (50 * 60 * 1000)); + CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, true, true, 1); + coordJob.setNextMaterializedTime(new Date(startTime.getTime() + (30 * 60 * 1000))); + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob); + addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); + + Runnable runnable = new StatusTransitService.StatusTransitRunnable(); + runnable.run(); // dummy run so we get to the interval check following coord job change + sleep(1000); + + assertEquals(endTime.getTime(), coordJob.getEndTime().getTime()); // checking before change + + String newEndTime = convertDateToString(startTime.getTime() + 30 * 60 * 1000); + + new CoordChangeXCommand(coordJob.getId(), "endtime=" + newEndTime).call(); + try { + checkCoordJobs(coordJob.getId(), DateUtils.parseDateOozieTZ(newEndTime), null, null, false); + } + catch (Exception ex) { + ex.printStackTrace(); + fail("Invalid date" + ex); + } + + CoordJobGetJPAExecutor coordGetCmd = new CoordJobGetJPAExecutor(coordJob.getId()); + coordJob = jpaService.execute(coordGetCmd); + assertEquals(Job.Status.RUNNING, coordJob.getStatus()); + assertEquals(newEndTime, convertDateToString(coordJob.getEndTime().getTime())); // checking after change + assertTrue(coordJob.isPending()); + assertTrue(coordJob.isDoneMaterialization()); + + runnable.run(); + sleep(1000); + coordJob = jpaService.execute(coordGetCmd); + assertEquals(Job.Status.SUCCEEDED, coordJob.getStatus()); + assertFalse(coordJob.isPending()); + assertTrue(coordJob.isDoneMaterialization()); + } + + /** + * Testcase when changing end-time > nextMaterializedTime, but < original end + * reflects correct job state and values + * + * @throws Exception + */ + public void testCoordChangeEndTime2() throws Exception { + + JPAService jpaService = Services.get().get(JPAService.class); + + Date startTime = new Date(); + Date endTime = new Date(startTime.getTime() + (50 * 60 * 1000)); + CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, true, true, 1); + coordJob.setNextMaterializedTime(new Date(startTime.getTime() + (30 * 60 * 1000))); + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob); + addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); + + assertTrue(coordJob.isDoneMaterialization()); // checking initial condition before change + + Runnable runnable = new StatusTransitService.StatusTransitRunnable(); + runnable.run(); // dummy run so we get to the interval check following coord job change + sleep(1000); + + String newEndTime = convertDateToString(startTime.getTime() + 40 * 60 * 1000); + + new CoordChangeXCommand(coordJob.getId(), "endtime=" + newEndTime).call(); + try { + checkCoordJobs(coordJob.getId(), DateUtils.parseDateOozieTZ(newEndTime), null, null, false); + } + catch (Exception ex) { + ex.printStackTrace(); + fail("Invalid date" + ex); + } + + CoordJobGetJPAExecutor coordGetCmd = new CoordJobGetJPAExecutor(coordJob.getId()); + coordJob = jpaService.execute(coordGetCmd); + assertEquals(Job.Status.RUNNING, coordJob.getStatus()); + assertTrue(coordJob.isPending()); + assertFalse(coordJob.isDoneMaterialization()); // <-- changed + assertEquals(newEndTime, convertDateToString(coordJob.getEndTime().getTime())); + + } + + /** + * Testcase when changing end-time to after original end-time + * but before nextMaterializedTime should not cause unnecessary changes + * + * @throws Exception + */ + public void testCoordChangeEndTime3() throws Exception { + JPAService jpaService = Services.get().get(JPAService.class); + Date startTime = new Date(); + Date endTime = new Date(startTime.getTime() + (10 * 60 * 1000)); + CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, true, true, 1); + coordJob.setNextMaterializedTime(new Date(startTime.getTime() + (40 * 60 * 1000))); + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob); + addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); + + Runnable runnable = new StatusTransitService.StatusTransitRunnable(); + runnable.run(); + + CoordJobGetJPAExecutor coordGetCmd = new CoordJobGetJPAExecutor(coordJob.getId()); + coordJob = jpaService.execute(coordGetCmd); + assertEquals(Job.Status.SUCCEEDED, coordJob.getStatus()); + assertFalse(coordJob.isPending()); + assertTrue(coordJob.isDoneMaterialization()); + + String newEndTime = convertDateToString(startTime.getTime() + 20 * 60 * 1000); + + new CoordChangeXCommand(coordJob.getId(), "endtime=" + newEndTime).call(); + + coordJob = jpaService.execute(coordGetCmd); + assertFalse(Job.Status.RUNNING == coordJob.getStatus()); + assertFalse(coordJob.isPending()); + assertTrue(coordJob.isDoneMaterialization()); + + } + + /** * Change the pause time and end time of a failed coordinator job. Check whether the status changes * to RUNNINGWITHERROR * @throws Exception @@ -407,8 +536,9 @@ public class TestCoordChangeXCommand extends XDataTestCase { new CoordChangeXCommand(job.getId(), pauseTimeChangeStr).call(); fail("Should not reach here."); } catch(CommandException e) { - if(e.getErrorCode() != ErrorCode.E1022) + if(e.getErrorCode() != ErrorCode.E1022) { fail("Error code should be E1022"); + } } } http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionGetByLastModifiedTimeJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionGetByLastModifiedTimeJPAExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionGetByLastModifiedTimeJPAExecutor.java deleted file mode 100644 index e799e2d..0000000 --- a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionGetByLastModifiedTimeJPAExecutor.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.oozie.executor.jpa; - -import java.util.Date; -import java.util.List; - -import org.apache.oozie.CoordinatorJobBean; -import org.apache.oozie.client.CoordinatorAction; -import org.apache.oozie.client.CoordinatorJob; -import org.apache.oozie.local.LocalOozie; -import org.apache.oozie.service.JPAService; -import org.apache.oozie.service.Services; -import org.apache.oozie.test.XDataTestCase; - -public class TestCoordActionGetByLastModifiedTimeJPAExecutor extends XDataTestCase { - Services services; - - @Override - protected void setUp() throws Exception { - super.setUp(); - services = new Services(); - services.init(); - } - - @Override - protected void tearDown() throws Exception { - services.destroy(); - super.tearDown(); - } - - public void testCoordActionGet() throws Exception { - // Get date before action is created - Date dateBeforeAction = new Date(); - int actionNum = 1; - // Add two jobs with lastmodifiedtime > dateBeforeAction - CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false); - String jobId1 = job1.getId(); - CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false); - String jobId2 = job2.getId(); - addRecordToCoordActionTable(jobId1, actionNum++, - CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0); - addRecordToCoordActionTable(jobId2, actionNum++, - CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0); - - _testCoordActionGetByLastModifiedTime(jobId1, jobId2, dateBeforeAction); - } - - - private void _testCoordActionGetByLastModifiedTime(String jobId1, String jobId2, Date dateBeforeAction) - throws Exception { - JPAService jpaService = Services.get().get(JPAService.class); - assertNotNull(jpaService); - // Call JPAExecutor to get actions which are modified after the given - // date - List<String> coordJobIds = jpaService - .execute(new CoordActionsGetByLastModifiedTimeJPAExecutor(dateBeforeAction)); - assertEquals(2, coordJobIds.size()); - assertEquals(jobId1, coordJobIds.get(0)); - assertEquals(jobId2, coordJobIds.get(1)); - } - - - -} http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java index 9939bbd..1e75bbd 100644 --- a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java +++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java @@ -17,6 +17,10 @@ */ package org.apache.oozie.executor.jpa; +import java.sql.Timestamp; +import java.util.Date; +import java.util.List; + import javax.persistence.EntityManager; import javax.persistence.Query; @@ -257,7 +261,25 @@ public class TestCoordJobQueryExecutor extends XDataTestCase { } public void testGetList() throws Exception { - // TODO + CoordinatorJobBean bean1 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, true, true); + CoordinatorJobBean bean2 = addRecordToCoordJobTable(CoordinatorJob.Status.DONEWITHERROR, true, true); + + // time to check last modified time against + Date queryTime = new Date(); + bean1.setLastModifiedTime(new Date(queryTime.getTime() + 1000)); + bean2.setLastModifiedTime(new Date(queryTime.getTime() + 2000)); + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME, bean1); + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME, bean2); + + // GET_COORD_JOBS_CHANGED + List<CoordinatorJobBean> retBeans = CoordJobQueryExecutor.getInstance().getList( + CoordJobQuery.GET_COORD_JOBS_CHANGED, new Timestamp(queryTime.getTime())); + assertEquals(2, retBeans.size()); + assertEquals(bean1.getId(), retBeans.get(0).getId()); + assertEquals(bean1.getStatus(), retBeans.get(0).getStatus()); + + assertEquals(bean2.getId(), retBeans.get(1).getId()); + assertEquals(bean2.getStatus(), retBeans.get(1).getStatus()); } public void testInsert() throws Exception { http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 3aa3186..6ee2aa2 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.1.0 release (trunk - unreleased) +OOZIE-1632 Coordinators that undergo change endtime but are doneMaterialization, not getting picked for StatusTransit (mona) OOZIE-1548 OozieDBCLI changes to convert clob to blob and remove the discriminator column (virag) OOZIE-1504 Allow specifying a fixed instance as the start instance of a data-in (puru via rohini) OOZIE-1576 Add documentation for Oozie Sqoop CLI (bowenzhangusa via rkanter)
