Repository: oozie
Updated Branches:
  refs/heads/branch-4.0 3817f3346 -> f3ab5da94


OOZIE-1632 Coordinators that undergo change endtime but are 
doneMaterialization, not getting picked for StatusTransit (mona)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/f3ab5da9
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/f3ab5da9
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/f3ab5da9

Branch: refs/heads/branch-4.0
Commit: f3ab5da942856b4d76234aa6755a66dc3ebd57d7
Parents: 3817f33
Author: Mona Chitnis <[email protected]>
Authored: Fri Mar 21 13:50:49 2014 -0700
Committer: Mona Chitnis <[email protected]>
Committed: Fri Mar 21 13:50:49 2014 -0700

----------------------------------------------------------------------
 .../org/apache/oozie/CoordinatorJobBean.java    |  2 +
 .../command/coord/CoordChangeXCommand.java      | 31 ++++---
 .../jpa/CoordJobsGetChangedJPAExecutor.java     | 60 ++++++++++++++
 .../oozie/service/StatusTransitService.java     |  5 +-
 .../command/coord/TestCoordChangeXCommand.java  | 85 +++++++++++++++++++-
 release-log.txt                                 |  1 +
 6 files changed, 170 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/f3ab5da9/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
index d51bc73..3a8c769 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
@@ -53,6 +53,8 @@ import org.apache.openjpa.persistence.jdbc.Index;
 
         @NamedQuery(name = "GET_COORD_JOBS_PENDING", query = "select OBJECT(w) from CoordinatorJobBean w where w.pending = 1 order by w.lastModifiedTimestamp"),
 
+        @NamedQuery(name = "GET_COORD_JOBS_CHANGED", query = "select w.id from CoordinatorJobBean w where w.pending = 1 AND w.doneMaterialization = 1 AND w.lastModifiedTimestamp >= :lastModifiedTime"),
+
         @NamedQuery(name = "GET_COORD_JOBS_COUNT", query = "select count(w) from CoordinatorJobBean w"),
 
         @NamedQuery(name = "GET_COORD_JOBS_COLUMNS", query = "select w.id, w.appName, w.status, w.user, w.group, w.startTimestamp, w.endTimestamp, w.appPath, w.concurrency, w.frequency, w.lastActionTimestamp, w.nextMaterializedTimestamp, w.createdTimestamp, w.timeUnitStr, w.timeZone, w.timeOut from CoordinatorJobBean w order by w.createdTimestamp desc"),

http://git-wip-us.apache.org/repos/asf/oozie/blob/f3ab5da9/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
index 7b55634..fe4921a 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
@@ -316,19 +316,25 @@ public class CoordChangeXCommand extends CoordinatorXCommand<Void> {
 
         try {
             if (newEndTime != null) {
-                coordJob.setEndTime(newEndTime);
-                if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED){
-                    coordJob.setStatus(CoordinatorJob.Status.RUNNING);
-                }
-                if (coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR
-                        || coordJob.getStatus() == CoordinatorJob.Status.FAILED) {
-                    // Check for backward compatibility for Oozie versions (3.2 and before)
-                    // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
-                    // PAUSEDWITHERROR is not supported
-                    coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR));
+                boolean dontChange = coordJob.getEndTime().before(newEndTime)
+                        && coordJob.getNextMaterializedTime() != null
+                        && coordJob.getNextMaterializedTime().after(newEndTime);
+                if (!dontChange) {
+                    coordJob.setEndTime(newEndTime);
+                    if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED) {
+                        coordJob.setStatus(CoordinatorJob.Status.RUNNING);
+                    }
+                    if (coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR
+                            || coordJob.getStatus() == CoordinatorJob.Status.FAILED) {
+                        // Check for backward compatibility for Oozie versions (3.2 and before)
+                        // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
+                        // PAUSEDWITHERROR is not supported
+                        coordJob.setStatus(StatusUtils
+                                .getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR));
+                    }
+                    coordJob.setPending();
+                    coordJob.resetDoneMaterialization();
                 }
-                coordJob.setPending();
-                coordJob.resetDoneMaterialization();
             }
 
             if (newConcurrency != null) {
@@ -367,6 +373,7 @@ public class CoordChangeXCommand extends CoordinatorXCommand<Void> {
                 coordJob.setDoneMaterialization();
             }
 
+            coordJob.setLastModifiedTime(new Date());
             updateList.add(coordJob);
             jpaService.execute(new BulkUpdateDeleteJPAExecutor(updateList, deleteList, false));
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/f3ab5da9/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsGetChangedJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsGetChangedJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsGetChangedJPAExecutor.java
new file mode 100644
index 0000000..4e32250
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsGetChangedJPAExecutor.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.List;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.ErrorCode;
+
+/**
+ * Load the list of Coordinator jobs changed by CoordChangeXCommand
+ */
+public class CoordJobsGetChangedJPAExecutor implements JPAExecutor<List<String>> {
+    private Date d = null;
+
+    public CoordJobsGetChangedJPAExecutor(Date d) {
+        this.d = d;
+    }
+
+    @Override
+    public String getName() {
+        return "CoordJobsGetChangedJPAExecutor";
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public List<String> execute(EntityManager em) throws JPAExecutorException {
+        try {
+            Query q = em.createNamedQuery("GET_COORD_JOBS_CHANGED");
+            q.setParameter("lastModifiedTime", new Timestamp(d.getTime()));
+            List<String> coordJobIds = q.getResultList();
+            return coordJobIds;
+        }
+        catch (Exception e) {
+            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+        }
+
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/f3ab5da9/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/StatusTransitService.java b/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
index f36d852..acbcfd5 100644
--- a/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
+++ b/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
@@ -46,6 +46,7 @@ import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetPendingActionsCountJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobsGetChangedJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.util.DateUtils;
@@ -710,7 +711,7 @@ public class StatusTransitService implements Service {
                 LOG.info("Running coordinator status service from last instance time =  "
                         + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
                 // this is not the first instance, we should only check jobs
-                // that have actions been
+                // that have actions or jobs been
                 // updated >= start time of last service run;
                 List<String> coordJobIdList = jpaService
                         .execute(new CoordActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
@@ -718,6 +719,8 @@ public class StatusTransitService implements Service {
                 for (String coordJobId : coordJobIdList) {
                     coordIds.add(coordJobId);
                 }
+                coordIds.addAll(jpaService.execute(new CoordJobsGetChangedJPAExecutor(lastInstanceStartTime)));
+
                 pendingJobCheckList = new ArrayList<CoordinatorJobBean>();
                 for (String coordId : coordIds.toArray(new String[coordIds.size()])) {
                     CoordinatorJobBean coordJob;

http://git-wip-us.apache.org/repos/asf/oozie/blob/f3ab5da9/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
index f7eb38f..8eae7be 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
@@ -34,6 +34,7 @@ import org.apache.oozie.command.CommandException;
 import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.sla.SLACalculationInsertUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor;
@@ -281,6 +282,87 @@ public class TestCoordChangeXCommand extends XDataTestCase {
     }
 
     /**
+     * Testcase when changing end-time to before nextMaterializedTime
+     * reflects correct job status via StatusTransit
+     *
+     * @throws Exception
+     */
+    public void testCoordChangeEndTime1() throws Exception {
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+
+        Date startTime = new Date();
+        Date endTime = new Date(startTime.getTime() + (20 * 60 * 1000));
+        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, true, true, 1);
+        coordJob.setNextMaterializedTime(new Date(startTime.getTime() + (10 * 60 * 1000)));
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
+        addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0);
+
+        Runnable runnable = new StatusTransitService.StatusTransitRunnable();
+        runnable.run(); // dummy run so we get to the interval check following coord job change
+        sleep(1000);
+
+        String newEndTime = convertDateToString(startTime.getTime() + 5 * 60 * 1000);
+
+        new CoordChangeXCommand(coordJob.getId(), "endtime=" + newEndTime).call();
+        try {
+            checkCoordJobs(coordJob.getId(), DateUtils.parseDateOozieTZ(newEndTime), null, null, false);
+        }
+        catch (Exception ex) {
+            ex.printStackTrace();
+            fail("Invalid date" + ex);
+        }
+
+        CoordJobGetJPAExecutor coordGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
+        coordJob = jpaService.execute(coordGetCmd);
+        assertEquals(Job.Status.RUNNING, coordJob.getStatus());
+        assertTrue(coordJob.isPending());
+        assertTrue(coordJob.isDoneMaterialization());
+
+        runnable.run();
+        sleep(1000);
+        coordJob = jpaService.execute(coordGetCmd);
+        assertEquals(Job.Status.SUCCEEDED, coordJob.getStatus());
+        assertFalse(coordJob.isPending());
+        assertTrue(coordJob.isDoneMaterialization());
+    }
+
+    /**
+     * Testcase when changing end-time to after original end-time
+     * but before nextMaterializedTime should not cause unnecessary changes
+     *
+     * @throws Exception
+     */
+    public void testCoordChangeEndTime2() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        Date startTime = new Date();
+        Date endTime = new Date(startTime.getTime() + (10 * 60 * 1000));
+        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, true, true, 1);
+        coordJob.setNextMaterializedTime(new Date(startTime.getTime() + (40 * 60 * 1000)));
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
+        addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0);
+
+        Runnable runnable = new StatusTransitService.StatusTransitRunnable();
+        runnable.run();
+
+        CoordJobGetJPAExecutor coordGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
+        coordJob = jpaService.execute(coordGetCmd);
+        assertEquals(Job.Status.SUCCEEDED, coordJob.getStatus());
+        assertFalse(coordJob.isPending());
+        assertTrue(coordJob.isDoneMaterialization());
+
+        String newEndTime = convertDateToString(startTime.getTime() + 20 * 60 * 1000);
+
+        new CoordChangeXCommand(coordJob.getId(), "endtime=" + newEndTime).call();
+
+        coordJob = jpaService.execute(coordGetCmd);
+        assertFalse(Job.Status.RUNNING == coordJob.getStatus());
+        assertFalse(coordJob.isPending());
+        assertTrue(coordJob.isDoneMaterialization());
+
+    }
+
+    /**
      * Change the pause time and end time of a failed coordinator job. Check whether the status changes
      * to RUNNINGWITHERROR
      * @throws Exception
@@ -408,8 +490,9 @@ public class TestCoordChangeXCommand extends XDataTestCase {
             new CoordChangeXCommand(job.getId(), pauseTimeChangeStr).call();
             fail("Should not reach here.");
         } catch(CommandException e) {
-            if(e.getErrorCode() != ErrorCode.E1022)
+            if(e.getErrorCode() != ErrorCode.E1022) {
                 fail("Error code should be E1022");
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/f3ab5da9/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 6152fa8..8d0f26d 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.0.1 release
 
+OOZIE-1632 Coordinators that undergo change endtime but are doneMaterialization, not getting picked for StatusTransit (mona)
 OOZIE-1736 Switch to Hadoop 2.3.0 for the hadoop-2 profile (rkanter)
 OOZIE-1670 Workflow kill command doesn't kill child job for map-reduce action (puru via rohini)
 OOZIE-1630 <prepare> operations fail when path doesn't have scheme (ryota)

Reply via email to