Author: rkanter
Date: Wed Aug  7 21:53:50 2013
New Revision: 1511513

URL: http://svn.apache.org/r1511513
Log:
OOZIE-1449 Coordinator Workflow parent relationship is broken for purge service 
(rkanter)

Added:
    
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java
    
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
    
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsGetFromCoordParentIdJPAExecutor.java
    
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsGetFromWorkflowParentIdJPAExecutor.java
    
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java
    
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
    
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetFromCoordParentIdJPAExecutor.java
    
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetFromWorkflowParentIdJPAExecutor.java
Removed:
    
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor.java
    
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsGetFromParentIdJPAExecutor.java
    
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromParentIdJPAExecutor.java
    
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetFromParentIdJPAExecutor.java
Modified:
    oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java
    oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
    oozie/trunk/release-log.txt

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java?rev=1511513&r1=1511512&r2=1511513&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java 
(original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java Wed 
Aug  7 21:53:50 2013
@@ -71,9 +71,13 @@ import org.apache.openjpa.persistence.jd
 
     @NamedQuery(name = "GET_WORKFLOWS_COUNT_WITH_STATUS_IN_LAST_N_SECS", query 
= "select count(w) from WorkflowJobBean w where w.status = :status and 
w.lastModifiedTimestamp > :lastModTime"),
 
-    @NamedQuery(name = "GET_WORKFLOWS_WITH_PARENT_ID", query = "select w.id 
from WorkflowJobBean w where w.parentId = :parentId"),
+    @NamedQuery(name = "GET_WORKFLOWS_WITH_WORKFLOW_PARENT_ID", query = 
"select w.id from WorkflowJobBean w where w.parentId = :parentId"),
 
-    @NamedQuery(name = 
"GET_WORKFLOWS_COUNT_WITH_PARENT_ID_NOT_READY_FOR_PURGE", query = "select 
count(w) from WorkflowJobBean w where w.parentId = :parentId and (w.status = 
'PREP' OR w.status = 'RUNNING' OR w.status = 'SUSPENDED' OR w.endTimestamp >= 
:endTime)"),
+    @NamedQuery(name = "GET_WORKFLOWS_WITH_COORD_PARENT_ID", query = "select 
w.id from WorkflowJobBean w where w.parentId like :parentId"), // when setting 
parentId parameter, make sure to append a '%' (percent symbol) at the end (e.g. 
"0000004-130709155224435-oozie-rkan-C%")
+
+    @NamedQuery(name = 
"GET_WORKFLOWS_COUNT_WITH_WORKFLOW_PARENT_ID_NOT_READY_FOR_PURGE", query = 
"select count(w) from WorkflowJobBean w where w.parentId = :parentId and 
(w.status = 'PREP' OR w.status = 'RUNNING' OR w.status = 'SUSPENDED' OR 
w.endTimestamp >= :endTime)"),
+
+    @NamedQuery(name = 
"GET_WORKFLOWS_COUNT_WITH_COORD_PARENT_ID_NOT_READY_FOR_PURGE", query = "select 
count(w) from WorkflowJobBean w where w.parentId like :parentId and (w.status = 
'PREP' OR w.status = 'RUNNING' OR w.status = 'SUSPENDED' OR w.endTimestamp >= 
:endTime)"), // when setting parentId parameter, make sure to append a '%' 
(percent symbol) at the end (e.g. "0000004-130709155224435-oozie-rkan-C%")
 
     @NamedQuery(name = "GET_WORKFLOW_FOR_USER", query = "select w.user from 
WorkflowJobBean w where w.id = :id")
         })

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java?rev=1511513&r1=1511512&r2=1511513&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java 
(original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java 
Wed Aug  7 21:53:50 2013
@@ -31,10 +31,12 @@ import org.apache.oozie.executor.jpa.Coo
 import org.apache.oozie.executor.jpa.CoordJobsGetForPurgeJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobsGetFromParentIdJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
-import 
org.apache.oozie.executor.jpa.WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor;
+import 
org.apache.oozie.executor.jpa.WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor;
+import 
org.apache.oozie.executor.jpa.WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobsDeleteJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobsGetFromParentIdJPAExecutor;
+import 
org.apache.oozie.executor.jpa.WorkflowJobsGetFromWorkflowParentIdJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobsGetForPurgeJPAExecutor;
+import 
org.apache.oozie.executor.jpa.WorkflowJobsGetFromCoordParentIdJPAExecutor;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 
@@ -176,7 +178,7 @@ public class PurgeXCommand extends XComm
         for (String wfId : wfs) {
             // We only purge the workflow and its children if they are all 
ready to be purged
             long numChildrenNotReady = jpaService.execute(
-                    new 
WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor(wfOlderThan, wfId));
+                    new 
WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(wfOlderThan, wfId));
             if (numChildrenNotReady == 0) {
                 wfsToPurge.add(wfId);
                 // Get all of the direct children for this workflow
@@ -184,7 +186,8 @@ public class PurgeXCommand extends XComm
                 int size;
                 do {
                     size = children.size();
-                    children.addAll(jpaService.execute(new 
WorkflowJobsGetFromParentIdJPAExecutor(wfId, children.size(), limit)));
+                    children.addAll(jpaService.execute(
+                            new 
WorkflowJobsGetFromWorkflowParentIdJPAExecutor(wfId, children.size(), limit)));
                 } while (size != children.size());
                 subwfs.addAll(children);
             }
@@ -206,7 +209,7 @@ public class PurgeXCommand extends XComm
         for (String coordId : coords) {
             // We only purge the coord and its children if they are all ready 
to be purged
             long numChildrenNotReady = jpaService.execute(
-                    new 
WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor(wfOlderThan, coordId));
+                    new 
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(wfOlderThan, coordId));
             if (numChildrenNotReady == 0) {
                 coordsToPurge.add(coordId);
                 // Get all of the direct children for this coord
@@ -215,7 +218,7 @@ public class PurgeXCommand extends XComm
                 do {
                     size = children.size();
                     children.addAll(jpaService.execute(
-                            new 
WorkflowJobsGetFromParentIdJPAExecutor(coordId, children.size(), limit)));
+                            new 
WorkflowJobsGetFromCoordParentIdJPAExecutor(coordId, children.size(), limit)));
                 } while (size != children.size());
                 wfsToPurge.addAll(children);
             }

Added: 
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java?rev=1511513&view=auto
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java
 (added)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java
 Wed Aug  7 21:53:50 2013
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.sql.Timestamp;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.ErrorCode;
+
+/**
+ * Count the number of Workflow children of a parent Coordinator that are not 
ready to be purged
+ */
+public class WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor 
implements JPAExecutor<Long> {
+
+    private static final long DAY_IN_MS = 24 * 60 * 60 * 1000;
+    private long olderThanDays;
+    private String parentId;
+
+    public WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(long 
olderThanDays, String parentId) {
+        this.olderThanDays = olderThanDays;
+        this.parentId = parentId;
+    }
+
+    @Override
+    public String getName() {
+        return "WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor";
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Long execute(EntityManager em) throws JPAExecutorException {
+        Long count = 0L;
+        try {
+            Timestamp maxEndTime = new Timestamp(System.currentTimeMillis() - 
(olderThanDays * DAY_IN_MS));
+            Query jobQ = 
em.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_COORD_PARENT_ID_NOT_READY_FOR_PURGE");
+            jobQ.setParameter("parentId", parentId + "%");  // The '%' is the 
wildcard
+            jobQ.setParameter("endTime", maxEndTime);
+            count = (Long) jobQ.getSingleResult();
+        }
+        catch (Exception e) {
+            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+        }
+        return count;
+    }
+
+}

Added: 
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java?rev=1511513&view=auto
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
 (added)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
 Wed Aug  7 21:53:50 2013
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.sql.Timestamp;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.ErrorCode;
+
+/**
+ * Count the number of Workflow children of a parent Workflow that are not 
ready to be purged
+ */
+public class WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor 
implements JPAExecutor<Long> {
+
+    private static final long DAY_IN_MS = 24 * 60 * 60 * 1000;
+    private long olderThanDays;
+    private String parentId;
+
+    public WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(long 
olderThanDays, String parentId) {
+        this.olderThanDays = olderThanDays;
+        this.parentId = parentId;
+    }
+
+    @Override
+    public String getName() {
+        return "WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor";
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Long execute(EntityManager em) throws JPAExecutorException {
+        Long count = 0L;
+        try {
+            Timestamp maxEndTime = new Timestamp(System.currentTimeMillis() - 
(olderThanDays * DAY_IN_MS));
+            Query jobQ = 
em.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_WORKFLOW_PARENT_ID_NOT_READY_FOR_PURGE");
+            jobQ.setParameter("parentId", parentId);
+            jobQ.setParameter("endTime", maxEndTime);
+            count = (Long) jobQ.getSingleResult();
+        }
+        catch (Exception e) {
+            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+        }
+        return count;
+    }
+
+}

Added: 
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsGetFromCoordParentIdJPAExecutor.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsGetFromCoordParentIdJPAExecutor.java?rev=1511513&view=auto
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsGetFromCoordParentIdJPAExecutor.java
 (added)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsGetFromCoordParentIdJPAExecutor.java
 Wed Aug  7 21:53:50 2013
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.List;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.ErrorCode;
+
+/**
+ * Load the list of WorkflowJob with the passed in coordinator parentId.  The 
parent id field for a workflow with a coordinator
+ * parent is the id of the coordinator action, not the coordinator job.  So, 
we have to use a wildcard to match (coordinator action
+ * ids start with the coordinator job id).
+ */
+public class WorkflowJobsGetFromCoordParentIdJPAExecutor implements 
JPAExecutor<List<String>> {
+
+    private String parentId;
+    private int limit;
+    private int offset;
+
+    public WorkflowJobsGetFromCoordParentIdJPAExecutor(String parentId, int 
limit) {
+        this(parentId, 0, limit);
+    }
+
+    public WorkflowJobsGetFromCoordParentIdJPAExecutor(String parentId, int 
offset, int limit) {
+        this.parentId = parentId;
+        this.offset = offset;
+        this.limit = limit;
+    }
+
+    @Override
+    public String getName() {
+        return "WorkflowJobsGetFromCoordParentIdJPAExecutor";
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public List<String> execute(EntityManager em) throws JPAExecutorException {
+        List<String> workflows = null;
+        try {
+            Query jobQ = 
em.createNamedQuery("GET_WORKFLOWS_WITH_COORD_PARENT_ID");
+            jobQ.setParameter("parentId", parentId + "%");  // The '%' is the 
wildcard
+            jobQ.setMaxResults(limit);
+            jobQ.setFirstResult(offset);
+            workflows = jobQ.getResultList();
+        }
+        catch (Exception e) {
+            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+        }
+        return workflows;
+    }
+
+}

Added: 
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsGetFromWorkflowParentIdJPAExecutor.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsGetFromWorkflowParentIdJPAExecutor.java?rev=1511513&view=auto
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsGetFromWorkflowParentIdJPAExecutor.java
 (added)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsGetFromWorkflowParentIdJPAExecutor.java
 Wed Aug  7 21:53:50 2013
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.List;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.ErrorCode;
+
+/**
+ * Load the list of WorkflowJob with the passed in workflow parentId
+ */
+public class WorkflowJobsGetFromWorkflowParentIdJPAExecutor implements 
JPAExecutor<List<String>> {
+
+    private String parentId;
+    private int limit;
+    private int offset;
+
+    public WorkflowJobsGetFromWorkflowParentIdJPAExecutor(String parentId, int 
limit) {
+        this(parentId, 0, limit);
+    }
+
+    public WorkflowJobsGetFromWorkflowParentIdJPAExecutor(String parentId, int 
offset, int limit) {
+        this.parentId = parentId;
+        this.offset = offset;
+        this.limit = limit;
+    }
+
+    @Override
+    public String getName() {
+        return "WorkflowJobsGetFromWorkflowParentIdJPAExecutor";
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public List<String> execute(EntityManager em) throws JPAExecutorException {
+        List<String> workflows = null;
+        try {
+            Query jobQ = 
em.createNamedQuery("GET_WORKFLOWS_WITH_WORKFLOW_PARENT_ID");
+            jobQ.setParameter("parentId", parentId);
+            jobQ.setMaxResults(limit);
+            jobQ.setFirstResult(offset);
+            workflows = jobQ.getResultList();
+        }
+        catch (Exception e) {
+            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+        }
+        return workflows;
+    }
+
+}

Added: 
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java?rev=1511513&view=auto
==============================================================================
--- 
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java
 (added)
+++ 
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java
 Wed Aug  7 21:53:50 2013
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.command.TestPurgeXCommand;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowInstance;
+
+public class TestWorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor 
extends XDataTestCase {
+    Services services;
+    private String[] excludedServices = { 
"org.apache.oozie.service.StatusTransitService",
+            "org.apache.oozie.service.PauseTransitService", 
"org.apache.oozie.service.PurgeService",
+            "org.apache.oozie.service.CoordMaterializeTriggerService", 
"org.apache.oozie.service.RecoveryService" };
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        setClassesToBeExcluded(services.getConf(), excludedServices);
+        services.init();
+        cleanUpDBTables();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    public void testCount() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        CoordinatorJobBean coordJob = 
addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        String coordJobId = coordJob.getId();
+        int days = 1;
+        assertEquals(0, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
+
+        WorkflowJobBean wfJob1 = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED, coordJobId);
+        wfJob1 = TestPurgeXCommand.setEndTime(wfJob1, "2009-12-01T01:00Z");
+        CoordinatorActionBean coordAction1 = 
addRecordToCoordActionTable(coordJobId, 1, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", wfJob1.getId(), wfJob1.getStatusStr(), 
0);
+        days = 1;
+        assertEquals(0, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob1.getEndTime());
+        assertEquals(1, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
+
+        WorkflowJobBean wfJob2 = 
addRecordToWfJobTable(WorkflowJob.Status.FAILED, 
WorkflowInstance.Status.FAILED, coordJobId);
+        wfJob2 = TestPurgeXCommand.setEndTime(wfJob2, "2009-11-01T01:00Z");
+        CoordinatorActionBean coordAction2 = 
addRecordToCoordActionTable(coordJobId, 2, CoordinatorAction.Status.FAILED,
+                "coord-action-get.xml", wfJob2.getId(), wfJob2.getStatusStr(), 
0);
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob1.getEndTime());
+        assertEquals(1, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob2.getEndTime());
+        assertEquals(2, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
+
+        WorkflowJobBean wfJob3 = 
addRecordToWfJobTable(WorkflowJob.Status.KILLED, 
WorkflowInstance.Status.KILLED, coordJobId);
+        wfJob3 = TestPurgeXCommand.setEndTime(wfJob3, "2009-10-01T01:00Z");
+        CoordinatorActionBean coordAction3 = 
addRecordToCoordActionTable(coordJobId, 3, CoordinatorAction.Status.KILLED,
+                "coord-action-get.xml", wfJob3.getId(), wfJob3.getStatusStr(), 
0);
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob2.getEndTime());
+        assertEquals(2, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob3.getEndTime());
+        assertEquals(3, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
+
+        WorkflowJobBean wfJob4 = 
addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP, 
coordJobId);
+        wfJob4 = TestPurgeXCommand.setEndTime(wfJob4, "2009-09-01T01:00Z");
+        CoordinatorActionBean coordAction4 = 
addRecordToCoordActionTable(coordJobId, 4, CoordinatorAction.Status.RUNNING,
+                "coord-action-get.xml", wfJob4.getId(), wfJob4.getStatusStr(), 
0);
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob3.getEndTime());
+        assertEquals(4, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob4.getEndTime());
+        assertEquals(4, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
+
+        WorkflowJobBean wfJob5 = 
addRecordToWfJobTable(WorkflowJob.Status.RUNNING, 
WorkflowInstance.Status.RUNNING, coordJobId);
+        wfJob5 = TestPurgeXCommand.setEndTime(wfJob5, "2009-08-01T01:00Z");
+        CoordinatorActionBean coordAction5 = 
addRecordToCoordActionTable(coordJobId, 5, CoordinatorAction.Status.RUNNING,
+                "coord-action-get.xml", wfJob5.getId(), wfJob5.getStatusStr(), 
0);
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob4.getEndTime());
+        assertEquals(5, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob5.getEndTime());
+        assertEquals(5, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
+
+        WorkflowJobBean wfJob6 = 
addRecordToWfJobTable(WorkflowJob.Status.SUSPENDED, 
WorkflowInstance.Status.SUSPENDED, coordJobId);
+        wfJob6 = TestPurgeXCommand.setEndTime(wfJob6, "2009-07-01T01:00Z");
+        CoordinatorActionBean coordAction6 = 
addRecordToCoordActionTable(coordJobId, 6, CoordinatorAction.Status.SUSPENDED,
+                "coord-action-get.xml", wfJob6.getId(), wfJob6.getStatusStr(), 
0);
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob5.getEndTime());
+        assertEquals(6, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob6.getEndTime());
+        assertEquals(6, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
+    }
+}

Added: 
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java?rev=1511513&view=auto
==============================================================================
--- 
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
 (added)
+++ 
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
 Wed Aug  7 21:53:50 2013
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.command.TestPurgeXCommand;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowInstance;
+
+/**
+ * Exercises {@link WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor}: sub-workflows
+ * in different states and with different end times are attached to one parent workflow, and
+ * the count returned by the executor is checked for several values of its {@code days} argument.
+ */
+public class TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor
+        extends XDataTestCase {
+    Services services;
+    private String[] excludedServices = {
+            "org.apache.oozie.service.StatusTransitService",
+            "org.apache.oozie.service.PauseTransitService",
+            "org.apache.oozie.service.PurgeService",
+            "org.apache.oozie.service.CoordMaterializeTriggerService",
+            "org.apache.oozie.service.RecoveryService" };
+
+    /**
+     * Initializes a {@link Services} instance with the services listed in
+     * {@code excludedServices} excluded, then calls {@code cleanUpDBTables()}.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        setClassesToBeExcluded(services.getConf(), excludedServices);
+        services.init();
+        cleanUpDBTables();
+    }
+
+    /** Destroys the services created in {@link #setUp()} before the superclass tear-down. */
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    /**
+     * Adds one sub-workflow at a time and checks the count twice after each addition: first
+     * with {@code days} derived from the previously added child's end time, then with
+     * {@code days} derived from the new child's own end time.
+     * <p>
+     * For SUCCEEDED, FAILED and KILLED children the count only grows in the second check; for
+     * PREP, RUNNING and SUSPENDED children it has already grown in the first check.
+     */
+    public void testCount() throws Exception {
+        JPAService jpa = Services.get().get(JPAService.class);
+        assertNotNull(jpa);
+
+        WorkflowJobBean parent = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED,
+                WorkflowInstance.Status.SUCCEEDED);
+        String parentId = parent.getId();
+        // No sub-workflows yet.
+        assertEquals(0, countNotForPurge(jpa, 1, parentId));
+
+        WorkflowJobBean succeeded = addSubWorkflow(WorkflowJob.Status.SUCCEEDED,
+                WorkflowInstance.Status.SUCCEEDED, parentId, "2009-12-01T01:00Z");
+        assertEquals(0, countNotForPurge(jpa, 1, parentId));
+        assertEquals(1, countNotForPurge(jpa, daysFor(succeeded), parentId));
+
+        WorkflowJobBean failed = addSubWorkflow(WorkflowJob.Status.FAILED,
+                WorkflowInstance.Status.FAILED, parentId, "2009-11-01T01:00Z");
+        assertEquals(1, countNotForPurge(jpa, daysFor(succeeded), parentId));
+        assertEquals(2, countNotForPurge(jpa, daysFor(failed), parentId));
+
+        WorkflowJobBean killed = addSubWorkflow(WorkflowJob.Status.KILLED,
+                WorkflowInstance.Status.KILLED, parentId, "2009-10-01T01:00Z");
+        assertEquals(2, countNotForPurge(jpa, daysFor(failed), parentId));
+        assertEquals(3, countNotForPurge(jpa, daysFor(killed), parentId));
+
+        WorkflowJobBean prep = addSubWorkflow(WorkflowJob.Status.PREP,
+                WorkflowInstance.Status.PREP, parentId, "2009-09-01T01:00Z");
+        assertEquals(4, countNotForPurge(jpa, daysFor(killed), parentId));
+        assertEquals(4, countNotForPurge(jpa, daysFor(prep), parentId));
+
+        WorkflowJobBean running = addSubWorkflow(WorkflowJob.Status.RUNNING,
+                WorkflowInstance.Status.RUNNING, parentId, "2009-08-01T01:00Z");
+        assertEquals(5, countNotForPurge(jpa, daysFor(prep), parentId));
+        assertEquals(5, countNotForPurge(jpa, daysFor(running), parentId));
+
+        WorkflowJobBean suspended = addSubWorkflow(WorkflowJob.Status.SUSPENDED,
+                WorkflowInstance.Status.SUSPENDED, parentId, "2009-07-01T01:00Z");
+        assertEquals(6, countNotForPurge(jpa, daysFor(running), parentId));
+        assertEquals(6, countNotForPurge(jpa, daysFor(suspended), parentId));
+    }
+
+    /** Inserts a workflow with the given parent id and applies the given end time to it. */
+    private WorkflowJobBean addSubWorkflow(WorkflowJob.Status jobStatus,
+            WorkflowInstance.Status instanceStatus, String parentId, String endTime)
+            throws Exception {
+        WorkflowJobBean child = addRecordToWfJobTable(jobStatus, instanceStatus, parentId);
+        return TestPurgeXCommand.setEndTime(child, endTime);
+    }
+
+    /** Returns the {@code days} value computed by TestPurgeXCommand for the job's end time. */
+    private int daysFor(WorkflowJobBean job) throws Exception {
+        return TestPurgeXCommand.getNumDaysToNotBePurged(job.getEndTime());
+    }
+
+    /** Runs the executor under test for the given {@code days} value and parent workflow id. */
+    private long countNotForPurge(JPAService jpa, int days, String parentId) throws Exception {
+        return (long) jpa.execute(
+                new WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, parentId));
+    }
+}

Added: 
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetFromCoordParentIdJPAExecutor.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetFromCoordParentIdJPAExecutor.java?rev=1511513&view=auto
==============================================================================
--- 
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetFromCoordParentIdJPAExecutor.java
 (added)
+++ 
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetFromCoordParentIdJPAExecutor.java
 Wed Aug  7 21:53:50 2013
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowInstance;
+
+/**
+ * Tests {@link WorkflowJobsGetFromCoordParentIdJPAExecutor}, which is queried here with a
+ * coordinator job id and is expected to return the ids of that coordinator's workflow jobs.
+ */
+public class TestWorkflowJobsGetFromCoordParentIdJPAExecutor extends XDataTestCase {
+    Services services;
+    private String[] excludedServices = {
+            "org.apache.oozie.service.StatusTransitService",
+            "org.apache.oozie.service.PauseTransitService",
+            "org.apache.oozie.service.PurgeService",
+            "org.apache.oozie.service.CoordMaterializeTriggerService",
+            "org.apache.oozie.service.RecoveryService" };
+
+    /**
+     * Initializes a {@link Services} instance with the services listed in
+     * {@code excludedServices} excluded, then calls {@code cleanUpDBTables()}.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        setClassesToBeExcluded(services.getConf(), excludedServices);
+        services.init();
+        cleanUpDBTables();
+    }
+
+    /** Destroys the services created in {@link #setUp()} before the superclass tear-down. */
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    /**
+     * Two coordinators (A with two workflows, B with one): querying by each coordinator's id
+     * must return exactly that coordinator's workflow ids and none of the other's.
+     */
+    public void testGetCoordinatorParent() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        CoordinatorJobBean coordJobA =
+                addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        CoordinatorJobBean coordJobB =
+                addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        WorkflowJobBean wfJobA1 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED,
+                WorkflowInstance.Status.SUCCEEDED);
+        WorkflowJobBean wfJobA2 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED,
+                WorkflowInstance.Status.SUCCEEDED);
+        WorkflowJobBean wfJobB = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED,
+                WorkflowInstance.Status.SUCCEEDED);
+        WorkflowActionBean wfActionA1 =
+                addRecordToWfActionTable(wfJobA1.getId(), "1", WorkflowAction.Status.OK);
+        WorkflowActionBean wfActionA2 =
+                addRecordToWfActionTable(wfJobA2.getId(), "1", WorkflowAction.Status.OK);
+        WorkflowActionBean wfActionB =
+                addRecordToWfActionTable(wfJobB.getId(), "1", WorkflowAction.Status.OK);
+        // The workflows above are created without a parent id; the coordinator action records
+        // below are what tie each workflow to its coordinator in this test.
+        CoordinatorActionBean coordActionA1 = addRecordToCoordActionTable(coordJobA.getId(), 1,
+                CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", wfJobA1.getId(), "SUCCEEDED", 0);
+        CoordinatorActionBean coordActionA2 = addRecordToCoordActionTable(coordJobA.getId(), 2,
+                CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", wfJobA2.getId(), "SUCCEEDED", 0);
+        CoordinatorActionBean coordActionB = addRecordToCoordActionTable(coordJobB.getId(), 1,
+                CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", wfJobB.getId(), "SUCCEEDED", 0);
+
+        List<String> children = new ArrayList<String>();
+        children.addAll(jpaService.execute(
+                new WorkflowJobsGetFromCoordParentIdJPAExecutor(coordJobA.getId(), 10)));
+        checkChildren(children, wfJobA1.getId(), wfJobA2.getId());
+
+        children = new ArrayList<String>();
+        children.addAll(jpaService.execute(
+                new WorkflowJobsGetFromCoordParentIdJPAExecutor(coordJobB.getId(), 10)));
+        checkChildren(children, wfJobB.getId());
+    }
+
+    /**
+     * One coordinator with five workflows, fetched in two calls: the first call must yield
+     * three ids, the second the remaining two, and together they must be exactly the five
+     * workflow ids.
+     */
+    public void testGetCoordinatorParentTooMany() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        CoordinatorJobBean coordJob =
+                addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        WorkflowJobBean wfJob1 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED,
+                WorkflowInstance.Status.SUCCEEDED, coordJob.getId());
+        WorkflowJobBean wfJob2 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED,
+                WorkflowInstance.Status.SUCCEEDED, coordJob.getId());
+        WorkflowJobBean wfJob3 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED,
+                WorkflowInstance.Status.SUCCEEDED, coordJob.getId());
+        WorkflowJobBean wfJob4 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED,
+                WorkflowInstance.Status.SUCCEEDED, coordJob.getId());
+        WorkflowJobBean wfJob5 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED,
+                WorkflowInstance.Status.SUCCEEDED, coordJob.getId());
+        WorkflowActionBean wfAction1 =
+                addRecordToWfActionTable(wfJob1.getId(), "1", WorkflowAction.Status.OK);
+        WorkflowActionBean wfAction2 =
+                addRecordToWfActionTable(wfJob2.getId(), "2", WorkflowAction.Status.OK);
+        WorkflowActionBean wfAction3 =
+                addRecordToWfActionTable(wfJob3.getId(), "2", WorkflowAction.Status.OK);
+        WorkflowActionBean wfAction4 =
+                addRecordToWfActionTable(wfJob4.getId(), "1", WorkflowAction.Status.OK);
+        WorkflowActionBean wfAction5 =
+                addRecordToWfActionTable(wfJob5.getId(), "1", WorkflowAction.Status.OK);
+        CoordinatorActionBean coordAction1 = addRecordToCoordActionTable(coordJob.getId(), 1,
+                CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", wfJob1.getId(), "SUCCEEDED", 0);
+        CoordinatorActionBean coordAction2 = addRecordToCoordActionTable(coordJob.getId(), 2,
+                CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", wfJob2.getId(), "SUCCEEDED", 0);
+        CoordinatorActionBean coordAction3 = addRecordToCoordActionTable(coordJob.getId(), 3,
+                CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", wfJob3.getId(), "SUCCEEDED", 0);
+        CoordinatorActionBean coordAction4 = addRecordToCoordActionTable(coordJob.getId(), 4,
+                CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", wfJob4.getId(), "SUCCEEDED", 0);
+        CoordinatorActionBean coordAction5 = addRecordToCoordActionTable(coordJob.getId(), 5,
+                CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", wfJob5.getId(), "SUCCEEDED", 0);
+
+        List<String> children = new ArrayList<String>();
+        // Get the first 3
+        children.addAll(jpaService.execute(
+                new WorkflowJobsGetFromCoordParentIdJPAExecutor(coordJob.getId(), 3)));
+        assertEquals(3, children.size());
+        // Get the next 3 (though there's only 2 more).
+        // NOTE(review): both extra arguments are 3; presumably an offset and a page size.
+        children.addAll(jpaService.execute(
+                new WorkflowJobsGetFromCoordParentIdJPAExecutor(coordJob.getId(), 3, 3)));
+        assertEquals(5, children.size());
+        // Comparing against the five distinct ids also rules out an id being returned twice.
+        checkChildren(children, wfJob1.getId(), wfJob2.getId(), wfJob3.getId(),
+                wfJob4.getId(), wfJob5.getId());
+    }
+
+    /**
+     * Asserts that {@code children} holds exactly the given workflow ids, ignoring order.
+     * Sorted copies are compared so that neither argument is modified and a failure reports
+     * both complete lists.
+     *
+     * @param children ids returned by the executor under test
+     * @param wfJobIDs the expected workflow ids
+     */
+    private void checkChildren(List<String> children, String... wfJobIDs) {
+        List<String> expected = new ArrayList<String>(Arrays.asList(wfJobIDs));
+        List<String> actual = new ArrayList<String>(children);
+        Collections.sort(expected);
+        Collections.sort(actual);
+        assertEquals(expected, actual);
+    }
+}

Added: 
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetFromWorkflowParentIdJPAExecutor.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetFromWorkflowParentIdJPAExecutor.java?rev=1511513&view=auto
==============================================================================
--- 
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetFromWorkflowParentIdJPAExecutor.java
 (added)
+++ 
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetFromWorkflowParentIdJPAExecutor.java
 Wed Aug  7 21:53:50 2013
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowInstance;
+
+/**
+ * Tests {@link WorkflowJobsGetFromWorkflowParentIdJPAExecutor}, which is queried here with a
+ * parent workflow job id and is expected to return the ids of that workflow's sub-workflows.
+ */
+public class TestWorkflowJobsGetFromWorkflowParentIdJPAExecutor extends XDataTestCase {
+    Services services;
+    private String[] excludedServices = {
+            "org.apache.oozie.service.StatusTransitService",
+            "org.apache.oozie.service.PauseTransitService",
+            "org.apache.oozie.service.PurgeService",
+            "org.apache.oozie.service.CoordMaterializeTriggerService",
+            "org.apache.oozie.service.RecoveryService" };
+
+    /**
+     * Initializes a {@link Services} instance with the services listed in
+     * {@code excludedServices} excluded, then calls {@code cleanUpDBTables()}.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        setClassesToBeExcluded(services.getConf(), excludedServices);
+        services.init();
+        cleanUpDBTables();
+    }
+
+    /** Destroys the services created in {@link #setUp()} before the superclass tear-down. */
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    /**
+     * Two parent workflows (A with two sub-workflows, B with one): querying by each parent's
+     * id must return exactly that parent's sub-workflow ids and none of the other's.
+     */
+    public void testGetWorkflowParent() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        WorkflowJobBean wfJobA = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED,
+                WorkflowInstance.Status.SUCCEEDED);
+        WorkflowJobBean wfJobB = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED,
+                WorkflowInstance.Status.SUCCEEDED);
+        WorkflowJobBean subwfJobA1 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED,
+                WorkflowInstance.Status.SUCCEEDED, wfJobA.getId());
+        WorkflowJobBean subwfJobA2 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED,
+                WorkflowInstance.Status.SUCCEEDED, wfJobA.getId());
+        WorkflowJobBean subwfJobB = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED,
+                WorkflowInstance.Status.SUCCEEDED, wfJobB.getId());
+        WorkflowActionBean wfActionA =
+                addRecordToWfActionTable(wfJobA.getId(), "1", WorkflowAction.Status.OK);
+        WorkflowActionBean wfActionB =
+                addRecordToWfActionTable(wfJobB.getId(), "1", WorkflowAction.Status.OK);
+        WorkflowActionBean subwfActionA1 =
+                addRecordToWfActionTable(subwfJobA1.getId(), "1", WorkflowAction.Status.OK);
+        WorkflowActionBean subwfActionA2 =
+                addRecordToWfActionTable(subwfJobA2.getId(), "1", WorkflowAction.Status.OK);
+        WorkflowActionBean subwfActionB =
+                addRecordToWfActionTable(subwfJobB.getId(), "1", WorkflowAction.Status.OK);
+
+        List<String> children = new ArrayList<String>();
+        children.addAll(jpaService.execute(
+                new WorkflowJobsGetFromWorkflowParentIdJPAExecutor(wfJobA.getId(), 10)));
+        checkChildren(children, subwfJobA1.getId(), subwfJobA2.getId());
+
+        children = new ArrayList<String>();
+        children.addAll(jpaService.execute(
+                new WorkflowJobsGetFromWorkflowParentIdJPAExecutor(wfJobB.getId(), 10)));
+        checkChildren(children, subwfJobB.getId());
+    }
+
+    /**
+     * One parent workflow with five sub-workflows, fetched in two calls: the first call must
+     * yield three ids, the second the remaining two, and together they must be exactly the
+     * five sub-workflow ids.
+     */
+    public void testGetWorkflowParentTooMany() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED,
+                WorkflowInstance.Status.SUCCEEDED);
+        WorkflowJobBean subwfJob1 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED,
+                WorkflowInstance.Status.SUCCEEDED, wfJob.getId());
+        WorkflowJobBean subwfJob2 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED,
+                WorkflowInstance.Status.SUCCEEDED, wfJob.getId());
+        WorkflowJobBean subwfJob3 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED,
+                WorkflowInstance.Status.SUCCEEDED, wfJob.getId());
+        WorkflowJobBean subwfJob4 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED,
+                WorkflowInstance.Status.SUCCEEDED, wfJob.getId());
+        WorkflowJobBean subwfJob5 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED,
+                WorkflowInstance.Status.SUCCEEDED, wfJob.getId());
+        // Five actions on the parent workflow itself, followed by one action per sub-workflow.
+        WorkflowActionBean wfAction1 =
+                addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
+        WorkflowActionBean wfAction2 =
+                addRecordToWfActionTable(wfJob.getId(), "2", WorkflowAction.Status.OK);
+        WorkflowActionBean wfAction3 =
+                addRecordToWfActionTable(wfJob.getId(), "3", WorkflowAction.Status.OK);
+        WorkflowActionBean wfAction4 =
+                addRecordToWfActionTable(wfJob.getId(), "4", WorkflowAction.Status.OK);
+        WorkflowActionBean wfAction5 =
+                addRecordToWfActionTable(wfJob.getId(), "5", WorkflowAction.Status.OK);
+        WorkflowActionBean subwfAction1 =
+                addRecordToWfActionTable(subwfJob1.getId(), "1", WorkflowAction.Status.OK);
+        WorkflowActionBean subwfAction2 =
+                addRecordToWfActionTable(subwfJob2.getId(), "1", WorkflowAction.Status.OK);
+        WorkflowActionBean subwfAction3 =
+                addRecordToWfActionTable(subwfJob3.getId(), "1", WorkflowAction.Status.OK);
+        WorkflowActionBean subwfAction4 =
+                addRecordToWfActionTable(subwfJob4.getId(), "1", WorkflowAction.Status.OK);
+        WorkflowActionBean subwfAction5 =
+                addRecordToWfActionTable(subwfJob5.getId(), "1", WorkflowAction.Status.OK);
+
+        List<String> children = new ArrayList<String>();
+        // Get the first 3
+        children.addAll(jpaService.execute(
+                new WorkflowJobsGetFromWorkflowParentIdJPAExecutor(wfJob.getId(), 3)));
+        assertEquals(3, children.size());
+        // Get the next 3 (though there's only 2 more).
+        // NOTE(review): both extra arguments are 3; presumably an offset and a page size.
+        children.addAll(jpaService.execute(
+                new WorkflowJobsGetFromWorkflowParentIdJPAExecutor(wfJob.getId(), 3, 3)));
+        assertEquals(5, children.size());
+        // Comparing against the five distinct ids also rules out an id being returned twice.
+        checkChildren(children, subwfJob1.getId(), subwfJob2.getId(), subwfJob3.getId(),
+                subwfJob4.getId(), subwfJob5.getId());
+    }
+
+    /**
+     * Asserts that {@code children} holds exactly the given workflow ids, ignoring order.
+     * Sorted copies are compared so that neither argument is modified and a failure reports
+     * both complete lists.
+     *
+     * @param children ids returned by the executor under test
+     * @param wfJobIDs the expected workflow ids
+     */
+    private void checkChildren(List<String> children, String... wfJobIDs) {
+        List<String> expected = new ArrayList<String>(Arrays.asList(wfJobIDs));
+        List<String> actual = new ArrayList<String>(children);
+        Collections.sort(expected);
+        Collections.sort(actual);
+        assertEquals(expected, actual);
+    }
+}

Modified: 
oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java?rev=1511513&r1=1511512&r2=1511513&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java 
(original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java Wed 
Aug  7 21:53:50 2013
@@ -545,7 +545,7 @@ public abstract class XDataTestCase exte
 
             if (wfId != null) {
                 WorkflowJobBean wfJob = jpaService.execute(new 
WorkflowJobGetJPAExecutor(wfId));
-                wfJob.setParentId(jobId);
+                wfJob.setParentId(action.getId());
                 jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
             }
         }

Modified: oozie/trunk/release-log.txt
URL: 
http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1511513&r1=1511512&r2=1511513&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Wed Aug  7 21:53:50 2013
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1449 Coordinator Workflow parent relationship is broken for purge 
service (rkanter)
 OOZIE-1458 If a Credentials type is not defined, Oozie should say something 
(rkanter)
 OOZIE-1425 param checker should validate cron syntax (bowenzhangusa via 
rkanter)
 OOZIE-1453 Change "frequency" to string in SyncCoordAction.java (bowenzhangusa 
via rkanter)


Reply via email to