Repository: oozie
Updated Branches:
  refs/heads/master d0921f691 -> 9959e2ca0


OOZIE-2285 Change in concurrency should trigger coord action ready command (kailongs via rohini)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/9959e2ca
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/9959e2ca
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/9959e2ca

Branch: refs/heads/master
Commit: 9959e2ca0b2aa71f86da7b68b4905f670680b20c
Parents: d0921f6
Author: Rohini Palaniswamy <[email protected]>
Authored: Tue Jul 7 10:31:27 2015 -0700
Committer: Rohini Palaniswamy <[email protected]>
Committed: Tue Jul 7 10:31:27 2015 -0700

----------------------------------------------------------------------
 .../command/coord/CoordChangeXCommand.java      | 14 +++++---
 .../command/coord/TestCoordChangeXCommand.java  | 37 +++++++++++++++++---
 release-log.txt                                 |  1 +
 3 files changed, 44 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/9959e2ca/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
index 00c547d..d060859 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
@@ -39,17 +39,17 @@ import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
 import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
 import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
-import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.sla.SLARegistrationBean;
@@ -64,6 +64,7 @@ import org.apache.oozie.util.StatusUtils;
 public class CoordChangeXCommand extends CoordinatorXCommand<Void> {
     private final String jobId;
     private Date newEndTime = null;
+    private Integer oldConcurrency = null;
     private Integer newConcurrency = null;
     private Date newPauseTime = null;
     private Date oldPauseTime = null;
@@ -344,6 +345,7 @@ public class CoordChangeXCommand extends CoordinatorXCommand<Void> {
         LOG.info("STARTED CoordChangeXCommand for jobId=" + jobId);
 
         try {
+            oldConcurrency = this.coordJob.getConcurrency();
             if (newEndTime != null) {
                 // during coord materialization, nextMaterializedTime is set to
                 // startTime + n(actions materialized) * frequency and this can be AFTER endTime,
@@ -452,6 +454,10 @@ public class CoordChangeXCommand extends CoordinatorXCommand<Void> {
             updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_CHANGE, coordJob));
             BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, deleteList);
 
+            if (newConcurrency != null && newConcurrency > oldConcurrency) {
+                queue(new CoordActionReadyXCommand(jobId));
+            }
+
             return null;
         }
         catch (XException ex) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/9959e2ca/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
index 7c154c8..3a91aa5 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
@@ -29,20 +29,21 @@ import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.CoordinatorJob.Execution;
 import org.apache.oozie.client.CoordinatorJob.Timeunit;
-import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.client.Job;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
-import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
-import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
 import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
 import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
-import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.StatusTransitService;
@@ -628,6 +629,34 @@ public class TestCoordChangeXCommand extends XDataTestCase {
         assertNotNull(slaSummaryBean1);
     }
 
+    public void testCoordChangeConcurrency() throws Exception {
+        Date startTime = DateUtils.parseDateOozieTZ("2013-08-01T00:00Z");
+        Date endTime = DateUtils.parseDateOozieTZ("2013-08-01T04:59Z");
+        final CoordinatorJobBean job = addRecordToCoordJobTableForPauseTimeTest(CoordinatorJob.Status.RUNNING,
+                startTime, endTime, endTime, true, false, 4);
+        CoordinatorActionBean ca1 = addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.RUNNING,
+                "coord-action-get.xml", 0);
+        CoordinatorActionBean ca2 = addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.RUNNING,
+                "coord-action-get.xml", 0);
+        CoordinatorActionBean ca3 = addRecordToCoordActionTable(job.getId(), 3, CoordinatorAction.Status.READY,
+                "coord-action-get.xml", 0);
+        CoordinatorActionBean ca4 = addRecordToCoordActionTable(job.getId(), 4, CoordinatorAction.Status.READY,
+                "coord-action-get.xml", 0);
+        new CoordChangeXCommand(job.getId(), "concurrency=4").call();
+        Thread.sleep(100);
+        ca1 = CoordActionQueryExecutor.getInstance().get(CoordActionQueryExecutor.CoordActionQuery.GET_COORD_ACTION,
+                job.getId() + "@1");
+        ca2 = CoordActionQueryExecutor.getInstance().get(CoordActionQueryExecutor.CoordActionQuery.GET_COORD_ACTION,
+                job.getId() + "@2");
+        ca3 = CoordActionQueryExecutor.getInstance().get(CoordActionQueryExecutor.CoordActionQuery.GET_COORD_ACTION,
+                job.getId() + "@3");
+        ca4 = CoordActionQueryExecutor.getInstance().get(CoordActionQueryExecutor.CoordActionQuery.GET_COORD_ACTION,
+                job.getId() + "@4");
+        assertEquals(ca1.getStatusStr(), CoordinatorAction.Status.RUNNING.toString());
+        assertEquals(ca2.getStatusStr(), CoordinatorAction.Status.RUNNING.toString());
+        assertFalse(ca3.getStatus().equals(CoordinatorAction.Status.READY));
+        assertFalse(ca4.getStatus().equals(CoordinatorAction.Status.READY));
+    }
     // Checks that RUNNING coord action is not deleted
     public void testChangeTimeDeleteRunning() throws Exception {
         Date startTime = DateUtils.parseDateOozieTZ("2013-08-01T00:00Z");

http://git-wip-us.apache.org/repos/asf/oozie/blob/9959e2ca/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 521a6ca..73ef9c7 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.0 release (trunk - unreleased)
 
+OOZIE-2285 Change in concurrency should trigger coord action ready command (kailongs via rohini)
 OOZIE-2284 HBaseCredentials should only add hbase-default.xml and hbase-site.xml to actionConf (rohini)
 OOZIE-2286 Update Log4j and Log4j-extras to latest 1.2.x release (rkanter)
 OOZIE-2250 Show log for WAITING and TIMEDOUT coord actions (kailongs via rohini)
