Author: rkanter
Date: Wed Aug 7 21:53:50 2013
New Revision: 1511513
URL: http://svn.apache.org/r1511513
Log:
OOZIE-1449 Coordinator Workflow parent relationship is broken for purge service
(rkanter)
Added:
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsGetFromCoordParentIdJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsGetFromWorkflowParentIdJPAExecutor.java
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetFromCoordParentIdJPAExecutor.java
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetFromWorkflowParentIdJPAExecutor.java
Removed:
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsGetFromParentIdJPAExecutor.java
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromParentIdJPAExecutor.java
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetFromParentIdJPAExecutor.java
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java
oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
oozie/trunk/release-log.txt
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java?rev=1511513&r1=1511512&r2=1511513&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
(original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java Wed
Aug 7 21:53:50 2013
@@ -71,9 +71,13 @@ import org.apache.openjpa.persistence.jd
@NamedQuery(name = "GET_WORKFLOWS_COUNT_WITH_STATUS_IN_LAST_N_SECS", query
= "select count(w) from WorkflowJobBean w where w.status = :status and
w.lastModifiedTimestamp > :lastModTime"),
- @NamedQuery(name = "GET_WORKFLOWS_WITH_PARENT_ID", query = "select w.id
from WorkflowJobBean w where w.parentId = :parentId"),
+ @NamedQuery(name = "GET_WORKFLOWS_WITH_WORKFLOW_PARENT_ID", query =
"select w.id from WorkflowJobBean w where w.parentId = :parentId"),
- @NamedQuery(name =
"GET_WORKFLOWS_COUNT_WITH_PARENT_ID_NOT_READY_FOR_PURGE", query = "select
count(w) from WorkflowJobBean w where w.parentId = :parentId and (w.status =
'PREP' OR w.status = 'RUNNING' OR w.status = 'SUSPENDED' OR w.endTimestamp >=
:endTime)"),
+ @NamedQuery(name = "GET_WORKFLOWS_WITH_COORD_PARENT_ID", query = "select
w.id from WorkflowJobBean w where w.parentId like :parentId"), // when setting
parentId parameter, make sure to append a '%' (percent symbol) at the end (e.g.
"0000004-130709155224435-oozie-rkan-C%")
+
+ @NamedQuery(name =
"GET_WORKFLOWS_COUNT_WITH_WORKFLOW_PARENT_ID_NOT_READY_FOR_PURGE", query =
"select count(w) from WorkflowJobBean w where w.parentId = :parentId and
(w.status = 'PREP' OR w.status = 'RUNNING' OR w.status = 'SUSPENDED' OR
w.endTimestamp >= :endTime)"),
+
+ @NamedQuery(name =
"GET_WORKFLOWS_COUNT_WITH_COORD_PARENT_ID_NOT_READY_FOR_PURGE", query = "select
count(w) from WorkflowJobBean w where w.parentId like :parentId and (w.status =
'PREP' OR w.status = 'RUNNING' OR w.status = 'SUSPENDED' OR w.endTimestamp >=
:endTime)"), // when setting parentId parameter, make sure to append a '%'
(percent symbol) at the end (e.g. "0000004-130709155224435-oozie-rkan-C%")
@NamedQuery(name = "GET_WORKFLOW_FOR_USER", query = "select w.user from
WorkflowJobBean w where w.id = :id")
})
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java?rev=1511513&r1=1511512&r2=1511513&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java
(original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java
Wed Aug 7 21:53:50 2013
@@ -31,10 +31,12 @@ import org.apache.oozie.executor.jpa.Coo
import org.apache.oozie.executor.jpa.CoordJobsGetForPurgeJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobsGetFromParentIdJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
-import
org.apache.oozie.executor.jpa.WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor;
+import
org.apache.oozie.executor.jpa.WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor;
+import
org.apache.oozie.executor.jpa.WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobsDeleteJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobsGetFromParentIdJPAExecutor;
+import
org.apache.oozie.executor.jpa.WorkflowJobsGetFromWorkflowParentIdJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobsGetForPurgeJPAExecutor;
+import
org.apache.oozie.executor.jpa.WorkflowJobsGetFromCoordParentIdJPAExecutor;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
@@ -176,7 +178,7 @@ public class PurgeXCommand extends XComm
for (String wfId : wfs) {
// We only purge the workflow and its children if they are all
ready to be purged
long numChildrenNotReady = jpaService.execute(
- new
WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor(wfOlderThan, wfId));
+ new
WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(wfOlderThan, wfId));
if (numChildrenNotReady == 0) {
wfsToPurge.add(wfId);
// Get all of the direct children for this workflow
@@ -184,7 +186,8 @@ public class PurgeXCommand extends XComm
int size;
do {
size = children.size();
- children.addAll(jpaService.execute(new
WorkflowJobsGetFromParentIdJPAExecutor(wfId, children.size(), limit)));
+ children.addAll(jpaService.execute(
+ new
WorkflowJobsGetFromWorkflowParentIdJPAExecutor(wfId, children.size(), limit)));
} while (size != children.size());
subwfs.addAll(children);
}
@@ -206,7 +209,7 @@ public class PurgeXCommand extends XComm
for (String coordId : coords) {
// We only purge the coord and its children if they are all ready
to be purged
long numChildrenNotReady = jpaService.execute(
- new
WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor(wfOlderThan, coordId));
+ new
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(wfOlderThan, coordId));
if (numChildrenNotReady == 0) {
coordsToPurge.add(coordId);
// Get all of the direct children for this coord
@@ -215,7 +218,7 @@ public class PurgeXCommand extends XComm
do {
size = children.size();
children.addAll(jpaService.execute(
- new
WorkflowJobsGetFromParentIdJPAExecutor(coordId, children.size(), limit)));
+ new
WorkflowJobsGetFromCoordParentIdJPAExecutor(coordId, children.size(), limit)));
} while (size != children.size());
wfsToPurge.addAll(children);
}
Added:
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java?rev=1511513&view=auto
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java
(added)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java
Wed Aug 7 21:53:50 2013
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.sql.Timestamp;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.ErrorCode;
+
+/**
+ * Count the number of Workflow children of a parent Coordinator that are not
ready to be purged
+ */
+public class WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor
implements JPAExecutor<Long> {
+
+ private static final long DAY_IN_MS = 24 * 60 * 60 * 1000;
+ private long olderThanDays;
+ private String parentId;
+
+ public WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(long
olderThanDays, String parentId) {
+ this.olderThanDays = olderThanDays;
+ this.parentId = parentId;
+ }
+
+ @Override
+ public String getName() {
+ return "WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor";
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Long execute(EntityManager em) throws JPAExecutorException {
+ Long count = 0L;
+ try {
+ Timestamp maxEndTime = new Timestamp(System.currentTimeMillis() -
(olderThanDays * DAY_IN_MS));
+ Query jobQ =
em.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_COORD_PARENT_ID_NOT_READY_FOR_PURGE");
+ jobQ.setParameter("parentId", parentId + "%"); // The '%' is the
wildcard
+ jobQ.setParameter("endTime", maxEndTime);
+ count = (Long) jobQ.getSingleResult();
+ }
+ catch (Exception e) {
+ throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+ }
+ return count;
+ }
+
+}
Added:
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java?rev=1511513&view=auto
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
(added)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
Wed Aug 7 21:53:50 2013
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.sql.Timestamp;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.ErrorCode;
+
+/**
+ * Count the number of Workflow children of a parent Workflow that are not
ready to be purged
+ */
+public class WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor
implements JPAExecutor<Long> {
+
+ private static final long DAY_IN_MS = 24 * 60 * 60 * 1000;
+ private long olderThanDays;
+ private String parentId;
+
+ public WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(long
olderThanDays, String parentId) {
+ this.olderThanDays = olderThanDays;
+ this.parentId = parentId;
+ }
+
+ @Override
+ public String getName() {
+ return "WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor";
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Long execute(EntityManager em) throws JPAExecutorException {
+ Long count = 0L;
+ try {
+ Timestamp maxEndTime = new Timestamp(System.currentTimeMillis() -
(olderThanDays * DAY_IN_MS));
+ Query jobQ =
em.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_WORKFLOW_PARENT_ID_NOT_READY_FOR_PURGE");
+ jobQ.setParameter("parentId", parentId);
+ jobQ.setParameter("endTime", maxEndTime);
+ count = (Long) jobQ.getSingleResult();
+ }
+ catch (Exception e) {
+ throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+ }
+ return count;
+ }
+
+}
Added:
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsGetFromCoordParentIdJPAExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsGetFromCoordParentIdJPAExecutor.java?rev=1511513&view=auto
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsGetFromCoordParentIdJPAExecutor.java
(added)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsGetFromCoordParentIdJPAExecutor.java
Wed Aug 7 21:53:50 2013
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.List;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.ErrorCode;
+
+/**
+ * Load the list of WorkflowJob ids with the passed-in coordinator parentId. The
parent id field for a workflow with a coordinator
+ * parent is the id of the coordinator action, not the coordinator job. So,
we have to use a wildcard to match (coordinator action
+ * ids start with the coordinator job id).
+ */
+public class WorkflowJobsGetFromCoordParentIdJPAExecutor implements
JPAExecutor<List<String>> {
+
+ private String parentId;
+ private int limit;
+ private int offset;
+
+ public WorkflowJobsGetFromCoordParentIdJPAExecutor(String parentId, int
limit) {
+ this(parentId, 0, limit);
+ }
+
+ public WorkflowJobsGetFromCoordParentIdJPAExecutor(String parentId, int
offset, int limit) {
+ this.parentId = parentId;
+ this.offset = offset;
+ this.limit = limit;
+ }
+
+ @Override
+ public String getName() {
+ return "WorkflowJobsGetFromCoordParentIdJPAExecutor";
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public List<String> execute(EntityManager em) throws JPAExecutorException {
+ List<String> workflows = null;
+ try {
+ Query jobQ =
em.createNamedQuery("GET_WORKFLOWS_WITH_COORD_PARENT_ID");
+ jobQ.setParameter("parentId", parentId + "%"); // The '%' is the
wildcard
+ jobQ.setMaxResults(limit);
+ jobQ.setFirstResult(offset);
+ workflows = jobQ.getResultList();
+ }
+ catch (Exception e) {
+ throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+ }
+ return workflows;
+ }
+
+}
Added:
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsGetFromWorkflowParentIdJPAExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsGetFromWorkflowParentIdJPAExecutor.java?rev=1511513&view=auto
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsGetFromWorkflowParentIdJPAExecutor.java
(added)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsGetFromWorkflowParentIdJPAExecutor.java
Wed Aug 7 21:53:50 2013
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.List;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.ErrorCode;
+
+/**
+ * Load the list of WorkflowJob ids with the passed-in workflow parentId
+ */
+public class WorkflowJobsGetFromWorkflowParentIdJPAExecutor implements
JPAExecutor<List<String>> {
+
+ private String parentId;
+ private int limit;
+ private int offset;
+
+ public WorkflowJobsGetFromWorkflowParentIdJPAExecutor(String parentId, int
limit) {
+ this(parentId, 0, limit);
+ }
+
+ public WorkflowJobsGetFromWorkflowParentIdJPAExecutor(String parentId, int
offset, int limit) {
+ this.parentId = parentId;
+ this.offset = offset;
+ this.limit = limit;
+ }
+
+ @Override
+ public String getName() {
+ return "WorkflowJobsGetFromWorkflowParentIdJPAExecutor";
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public List<String> execute(EntityManager em) throws JPAExecutorException {
+ List<String> workflows = null;
+ try {
+ Query jobQ =
em.createNamedQuery("GET_WORKFLOWS_WITH_WORKFLOW_PARENT_ID");
+ jobQ.setParameter("parentId", parentId);
+ jobQ.setMaxResults(limit);
+ jobQ.setFirstResult(offset);
+ workflows = jobQ.getResultList();
+ }
+ catch (Exception e) {
+ throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+ }
+ return workflows;
+ }
+
+}
Added:
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java?rev=1511513&view=auto
==============================================================================
---
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java
(added)
+++
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java
Wed Aug 7 21:53:50 2013
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.command.TestPurgeXCommand;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowInstance;
+
+public class TestWorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor
extends XDataTestCase {
+ Services services;
+ private String[] excludedServices = {
"org.apache.oozie.service.StatusTransitService",
+ "org.apache.oozie.service.PauseTransitService",
"org.apache.oozie.service.PurgeService",
+ "org.apache.oozie.service.CoordMaterializeTriggerService",
"org.apache.oozie.service.RecoveryService" };
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ services = new Services();
+ setClassesToBeExcluded(services.getConf(), excludedServices);
+ services.init();
+ cleanUpDBTables();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ services.destroy();
+ super.tearDown();
+ }
+
+ public void testCount() throws Exception {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+
+ CoordinatorJobBean coordJob =
addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+ String coordJobId = coordJob.getId();
+ int days = 1;
+ assertEquals(0, (long) jpaService.execute(new
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
+
+ WorkflowJobBean wfJob1 =
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED,
WorkflowInstance.Status.SUCCEEDED, coordJobId);
+ wfJob1 = TestPurgeXCommand.setEndTime(wfJob1, "2009-12-01T01:00Z");
+ CoordinatorActionBean coordAction1 =
addRecordToCoordActionTable(coordJobId, 1, CoordinatorAction.Status.SUCCEEDED,
+ "coord-action-get.xml", wfJob1.getId(), wfJob1.getStatusStr(),
0);
+ days = 1;
+ assertEquals(0, (long) jpaService.execute(new
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
+ days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob1.getEndTime());
+ assertEquals(1, (long) jpaService.execute(new
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
+
+ WorkflowJobBean wfJob2 =
addRecordToWfJobTable(WorkflowJob.Status.FAILED,
WorkflowInstance.Status.FAILED, coordJobId);
+ wfJob2 = TestPurgeXCommand.setEndTime(wfJob2, "2009-11-01T01:00Z");
+ CoordinatorActionBean coordAction2 =
addRecordToCoordActionTable(coordJobId, 2, CoordinatorAction.Status.FAILED,
+ "coord-action-get.xml", wfJob2.getId(), wfJob2.getStatusStr(),
0);
+ days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob1.getEndTime());
+ assertEquals(1, (long) jpaService.execute(new
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
+ days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob2.getEndTime());
+ assertEquals(2, (long) jpaService.execute(new
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
+
+ WorkflowJobBean wfJob3 =
addRecordToWfJobTable(WorkflowJob.Status.KILLED,
WorkflowInstance.Status.KILLED, coordJobId);
+ wfJob3 = TestPurgeXCommand.setEndTime(wfJob3, "2009-10-01T01:00Z");
+ CoordinatorActionBean coordAction3 =
addRecordToCoordActionTable(coordJobId, 3, CoordinatorAction.Status.KILLED,
+ "coord-action-get.xml", wfJob3.getId(), wfJob3.getStatusStr(),
0);
+ days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob2.getEndTime());
+ assertEquals(2, (long) jpaService.execute(new
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
+ days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob3.getEndTime());
+ assertEquals(3, (long) jpaService.execute(new
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
+
+ WorkflowJobBean wfJob4 =
addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP,
coordJobId);
+ wfJob4 = TestPurgeXCommand.setEndTime(wfJob4, "2009-09-01T01:00Z");
+ CoordinatorActionBean coordAction4 =
addRecordToCoordActionTable(coordJobId, 4, CoordinatorAction.Status.RUNNING,
+ "coord-action-get.xml", wfJob4.getId(), wfJob4.getStatusStr(),
0);
+ days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob3.getEndTime());
+ assertEquals(4, (long) jpaService.execute(new
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
+ days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob4.getEndTime());
+ assertEquals(4, (long) jpaService.execute(new
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
+
+ WorkflowJobBean wfJob5 =
addRecordToWfJobTable(WorkflowJob.Status.RUNNING,
WorkflowInstance.Status.RUNNING, coordJobId);
+ wfJob5 = TestPurgeXCommand.setEndTime(wfJob5, "2009-08-01T01:00Z");
+ CoordinatorActionBean coordAction5 =
addRecordToCoordActionTable(coordJobId, 5, CoordinatorAction.Status.RUNNING,
+ "coord-action-get.xml", wfJob5.getId(), wfJob5.getStatusStr(),
0);
+ days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob4.getEndTime());
+ assertEquals(5, (long) jpaService.execute(new
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
+ days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob5.getEndTime());
+ assertEquals(5, (long) jpaService.execute(new
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
+
+ WorkflowJobBean wfJob6 =
addRecordToWfJobTable(WorkflowJob.Status.SUSPENDED,
WorkflowInstance.Status.SUSPENDED, coordJobId);
+ wfJob6 = TestPurgeXCommand.setEndTime(wfJob6, "2009-07-01T01:00Z");
+ CoordinatorActionBean coordAction6 =
addRecordToCoordActionTable(coordJobId, 6, CoordinatorAction.Status.SUSPENDED,
+ "coord-action-get.xml", wfJob6.getId(), wfJob6.getStatusStr(),
0);
+ days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob5.getEndTime());
+ assertEquals(6, (long) jpaService.execute(new
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
+ days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob6.getEndTime());
+ assertEquals(6, (long) jpaService.execute(new
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
+ }
+}
Added:
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java?rev=1511513&view=auto
==============================================================================
---
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
(added)
+++
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
Wed Aug 7 21:53:50 2013
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.command.TestPurgeXCommand;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowInstance;
+
+public class TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor
extends XDataTestCase {
+ Services services;
+ private String[] excludedServices = {
"org.apache.oozie.service.StatusTransitService",
+ "org.apache.oozie.service.PauseTransitService",
"org.apache.oozie.service.PurgeService",
+ "org.apache.oozie.service.CoordMaterializeTriggerService",
"org.apache.oozie.service.RecoveryService" };
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ services = new Services();
+ setClassesToBeExcluded(services.getConf(), excludedServices);
+ services.init();
+ cleanUpDBTables();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ services.destroy();
+ super.tearDown();
+ }
+
+ public void testCount() throws Exception {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+
+ WorkflowJobBean wfJob =
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED,
WorkflowInstance.Status.SUCCEEDED);
+ String wfJobId = wfJob.getId();
+ int days = 1;
+ assertEquals(0, (long) jpaService.execute(new
WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
+
+ WorkflowJobBean subwfJob1 =
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED,
WorkflowInstance.Status.SUCCEEDED, wfJobId);
+ subwfJob1 = TestPurgeXCommand.setEndTime(subwfJob1,
"2009-12-01T01:00Z");
+ days = 1;
+ assertEquals(0, (long) jpaService.execute(new
WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
+ days =
TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob1.getEndTime());
+ assertEquals(1, (long) jpaService.execute(new
WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
+
+ WorkflowJobBean subwfJob2 =
addRecordToWfJobTable(WorkflowJob.Status.FAILED,
WorkflowInstance.Status.FAILED, wfJobId);
+ subwfJob2 = TestPurgeXCommand.setEndTime(subwfJob2,
"2009-11-01T01:00Z");
+ days =
TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob1.getEndTime());
+ assertEquals(1, (long) jpaService.execute(new
WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
+ days =
TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob2.getEndTime());
+ assertEquals(2, (long) jpaService.execute(new
WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
+
+ WorkflowJobBean subwfJob3 =
addRecordToWfJobTable(WorkflowJob.Status.KILLED,
WorkflowInstance.Status.KILLED, wfJobId);
+ subwfJob3 = TestPurgeXCommand.setEndTime(subwfJob3,
"2009-10-01T01:00Z");
+ days =
TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob2.getEndTime());
+ assertEquals(2, (long) jpaService.execute(new
WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
+ days =
TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob3.getEndTime());
+ assertEquals(3, (long) jpaService.execute(new
WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
+
+ WorkflowJobBean subwfJob4 =
addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP,
wfJobId);
+ subwfJob4 = TestPurgeXCommand.setEndTime(subwfJob4,
"2009-09-01T01:00Z");
+ days =
TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob3.getEndTime());
+ assertEquals(4, (long) jpaService.execute(new
WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
+ days =
TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob4.getEndTime());
+ assertEquals(4, (long) jpaService.execute(new
WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
+
+ WorkflowJobBean subwfJob5 =
addRecordToWfJobTable(WorkflowJob.Status.RUNNING,
WorkflowInstance.Status.RUNNING, wfJobId);
+ subwfJob5 = TestPurgeXCommand.setEndTime(subwfJob5,
"2009-08-01T01:00Z");
+ days =
TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob4.getEndTime());
+ assertEquals(5, (long) jpaService.execute(new
WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
+ days =
TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob5.getEndTime());
+ assertEquals(5, (long) jpaService.execute(new
WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
+
+ WorkflowJobBean subwfJob6 =
addRecordToWfJobTable(WorkflowJob.Status.SUSPENDED,
WorkflowInstance.Status.SUSPENDED, wfJobId);
+ subwfJob6 = TestPurgeXCommand.setEndTime(subwfJob6,
"2009-07-01T01:00Z");
+ days =
TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob5.getEndTime());
+ assertEquals(6, (long) jpaService.execute(new
WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
+ days =
TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob6.getEndTime());
+ assertEquals(6, (long) jpaService.execute(new
WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
+ }
+}
Added:
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetFromCoordParentIdJPAExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetFromCoordParentIdJPAExecutor.java?rev=1511513&view=auto
==============================================================================
---
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetFromCoordParentIdJPAExecutor.java
(added)
+++
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetFromCoordParentIdJPAExecutor.java
Wed Aug 7 21:53:50 2013
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowInstance;
+
+/**
+ * Tests for {@link WorkflowJobsGetFromCoordParentIdJPAExecutor}: given a coordinator job id, the executor is expected
+ * to return the ids of the workflow jobs attached to that coordinator's actions.
+ */
+public class TestWorkflowJobsGetFromCoordParentIdJPAExecutor extends XDataTestCase {
+    Services services;
+    // Service classes handed to setClassesToBeExcluded() in setUp() before the Services instance is initialized.
+    private String[] excludedServices = { "org.apache.oozie.service.StatusTransitService",
+            "org.apache.oozie.service.PauseTransitService", "org.apache.oozie.service.PurgeService",
+            "org.apache.oozie.service.CoordMaterializeTriggerService", "org.apache.oozie.service.RecoveryService" };
+
+    /**
+     * Starts a fresh {@link Services} instance without the excluded services and empties the DB tables.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        setClassesToBeExcluded(services.getConf(), excludedServices);
+        services.init();
+        cleanUpDBTables();
+    }
+
+    /**
+     * Destroys the {@link Services} instance; the parent tear-down runs even if that fails.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            services.destroy();
+        }
+        finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Two coordinator jobs, A with two workflow jobs and B with one: querying by each coordinator id must return
+     * exactly the workflow jobs of that coordinator.
+     */
+    public void testGetCoordinatorParent() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        CoordinatorJobBean coordJobA = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        CoordinatorJobBean coordJobB = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        String wfJobA1Id = addSucceededWfJob();
+        String wfJobA2Id = addSucceededWfJob();
+        String wfJobBId = addSucceededWfJob();
+        addRecordToWfActionTable(wfJobA1Id, "1", WorkflowAction.Status.OK);
+        addRecordToWfActionTable(wfJobA2Id, "1", WorkflowAction.Status.OK);
+        addRecordToWfActionTable(wfJobBId, "1", WorkflowAction.Status.OK);
+        // The coordinator action record is what ties each workflow job to its coordinator.
+        addSucceededCoordAction(coordJobA.getId(), 1, wfJobA1Id);
+        addSucceededCoordAction(coordJobA.getId(), 2, wfJobA2Id);
+        addSucceededCoordAction(coordJobB.getId(), 1, wfJobBId);
+
+        List<String> childrenOfA = new ArrayList<String>(
+                jpaService.execute(new WorkflowJobsGetFromCoordParentIdJPAExecutor(coordJobA.getId(), 10)));
+        checkChildren(childrenOfA, wfJobA1Id, wfJobA2Id);
+
+        List<String> childrenOfB = new ArrayList<String>(
+                jpaService.execute(new WorkflowJobsGetFromCoordParentIdJPAExecutor(coordJobB.getId(), 10)));
+        checkChildren(childrenOfB, wfJobBId);
+    }
+
+    /**
+     * One coordinator job with five workflow jobs, fetched in two pages of at most three ids: the pages together
+     * must contain all five workflow jobs exactly once.
+     */
+    public void testGetCoordinatorParentTooMany() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        String[] wfJobIds = new String[5];
+        for (int i = 0; i < wfJobIds.length; i++) {
+            // NOTE(review): the coordinator job id is passed as the initial parent here; the XDataTestCase change in
+            // this same commit appears to re-parent the workflow to the coordinator action id afterwards - confirm.
+            WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED,
+                    WorkflowInstance.Status.SUCCEEDED, coordJob.getId());
+            wfJobIds[i] = wfJob.getId();
+        }
+        for (int i = 0; i < wfJobIds.length; i++) {
+            addRecordToWfActionTable(wfJobIds[i], "1", WorkflowAction.Status.OK);
+        }
+        for (int i = 0; i < wfJobIds.length; i++) {
+            addSucceededCoordAction(coordJob.getId(), i + 1, wfJobIds[i]);
+        }
+
+        List<String> children = new ArrayList<String>();
+        // Get the first 3
+        children.addAll(jpaService.execute(new WorkflowJobsGetFromCoordParentIdJPAExecutor(coordJob.getId(), 3)));
+        assertEquals(3, children.size());
+        // Get the next 3 (though there's only 2 more); the arguments are presumably (parentId, offset, limit)
+        children.addAll(jpaService.execute(new WorkflowJobsGetFromCoordParentIdJPAExecutor(coordJob.getId(), 3, 3)));
+        assertEquals(5, children.size());
+        checkChildren(children, wfJobIds);
+    }
+
+    /**
+     * Inserts a SUCCEEDED workflow job without an explicit parent and returns its id.
+     */
+    private String addSucceededWfJob() throws Exception {
+        return addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED).getId();
+    }
+
+    /**
+     * Inserts a SUCCEEDED coordinator action with the given number for the coordinator job, referencing the given
+     * workflow job id.
+     */
+    private void addSucceededCoordAction(String coordJobId, int actionNum, String wfJobId) throws Exception {
+        addRecordToCoordActionTable(coordJobId, actionNum, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", wfJobId, "SUCCEEDED", 0);
+    }
+
+    /**
+     * Asserts that {@code children} holds exactly the given workflow job ids, ignoring order. Works on sorted copies
+     * so neither argument is modified.
+     *
+     * @param children ids returned by the executor
+     * @param wfJobIDs expected workflow job ids
+     */
+    private void checkChildren(List<String> children, String... wfJobIDs) {
+        List<String> expected = new ArrayList<String>(Arrays.asList(wfJobIDs));
+        Collections.sort(expected);
+        List<String> actual = new ArrayList<String>(children);
+        Collections.sort(actual);
+        assertEquals(expected, actual);
+    }
+}
Added:
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetFromWorkflowParentIdJPAExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetFromWorkflowParentIdJPAExecutor.java?rev=1511513&view=auto
==============================================================================
---
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetFromWorkflowParentIdJPAExecutor.java
(added)
+++
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetFromWorkflowParentIdJPAExecutor.java
Wed Aug 7 21:53:50 2013
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowInstance;
+
+/**
+ * Tests for {@link WorkflowJobsGetFromWorkflowParentIdJPAExecutor}: given a workflow job id, the executor is expected
+ * to return the ids of the sub-workflow jobs whose parent is that workflow job.
+ */
+public class TestWorkflowJobsGetFromWorkflowParentIdJPAExecutor extends XDataTestCase {
+    Services services;
+    // Service classes handed to setClassesToBeExcluded() in setUp() before the Services instance is initialized.
+    private String[] excludedServices = { "org.apache.oozie.service.StatusTransitService",
+            "org.apache.oozie.service.PauseTransitService", "org.apache.oozie.service.PurgeService",
+            "org.apache.oozie.service.CoordMaterializeTriggerService", "org.apache.oozie.service.RecoveryService" };
+
+    /**
+     * Starts a fresh {@link Services} instance without the excluded services and empties the DB tables.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        setClassesToBeExcluded(services.getConf(), excludedServices);
+        services.init();
+        cleanUpDBTables();
+    }
+
+    /**
+     * Destroys the {@link Services} instance; the parent tear-down runs even if that fails.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            services.destroy();
+        }
+        finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Two parent workflow jobs, A with two sub-workflows and B with one: querying by each parent id must return
+     * exactly the sub-workflows of that parent.
+     */
+    public void testGetWorkflowParent() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        String wfJobAId = addSucceededWfJob(null);
+        String wfJobBId = addSucceededWfJob(null);
+        String subwfJobA1Id = addSucceededWfJob(wfJobAId);
+        String subwfJobA2Id = addSucceededWfJob(wfJobAId);
+        String subwfJobBId = addSucceededWfJob(wfJobBId);
+        addRecordToWfActionTable(wfJobAId, "1", WorkflowAction.Status.OK);
+        addRecordToWfActionTable(wfJobBId, "1", WorkflowAction.Status.OK);
+        addRecordToWfActionTable(subwfJobA1Id, "1", WorkflowAction.Status.OK);
+        addRecordToWfActionTable(subwfJobA2Id, "1", WorkflowAction.Status.OK);
+        addRecordToWfActionTable(subwfJobBId, "1", WorkflowAction.Status.OK);
+
+        List<String> childrenOfA = new ArrayList<String>(
+                jpaService.execute(new WorkflowJobsGetFromWorkflowParentIdJPAExecutor(wfJobAId, 10)));
+        checkChildren(childrenOfA, subwfJobA1Id, subwfJobA2Id);
+
+        List<String> childrenOfB = new ArrayList<String>(
+                jpaService.execute(new WorkflowJobsGetFromWorkflowParentIdJPAExecutor(wfJobBId, 10)));
+        checkChildren(childrenOfB, subwfJobBId);
+    }
+
+    /**
+     * One parent workflow job with five sub-workflows, fetched in two pages of at most three ids: the pages together
+     * must contain all five sub-workflows exactly once.
+     */
+    public void testGetWorkflowParentTooMany() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        String wfJobId = addSucceededWfJob(null);
+        String[] subwfJobIds = new String[5];
+        for (int i = 0; i < subwfJobIds.length; i++) {
+            subwfJobIds[i] = addSucceededWfJob(wfJobId);
+        }
+        // The parent gets one action per sub-workflow, named "1" through "5".
+        for (int i = 1; i <= subwfJobIds.length; i++) {
+            addRecordToWfActionTable(wfJobId, Integer.toString(i), WorkflowAction.Status.OK);
+        }
+        for (int i = 0; i < subwfJobIds.length; i++) {
+            addRecordToWfActionTable(subwfJobIds[i], "1", WorkflowAction.Status.OK);
+        }
+
+        List<String> children = new ArrayList<String>();
+        // Get the first 3
+        children.addAll(jpaService.execute(new WorkflowJobsGetFromWorkflowParentIdJPAExecutor(wfJobId, 3)));
+        assertEquals(3, children.size());
+        // Get the next 3 (though there's only 2 more); the arguments are presumably (parentId, offset, limit)
+        children.addAll(jpaService.execute(new WorkflowJobsGetFromWorkflowParentIdJPAExecutor(wfJobId, 3, 3)));
+        assertEquals(5, children.size());
+        checkChildren(children, subwfJobIds);
+    }
+
+    /**
+     * Inserts a SUCCEEDED workflow job and returns its id.
+     *
+     * @param parentId id of the parent workflow job, or {@code null} to insert the job without an explicit parent
+     */
+    private String addSucceededWfJob(String parentId) throws Exception {
+        WorkflowJobBean wfJob;
+        if (parentId == null) {
+            wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+        }
+        else {
+            wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED, parentId);
+        }
+        return wfJob.getId();
+    }
+
+    /**
+     * Asserts that {@code children} holds exactly the given workflow job ids, ignoring order. Works on sorted copies
+     * so neither argument is modified.
+     *
+     * @param children ids returned by the executor
+     * @param wfJobIDs expected workflow job ids
+     */
+    private void checkChildren(List<String> children, String... wfJobIDs) {
+        List<String> expected = new ArrayList<String>(Arrays.asList(wfJobIDs));
+        Collections.sort(expected);
+        List<String> actual = new ArrayList<String>(children);
+        Collections.sort(actual);
+        assertEquals(expected, actual);
+    }
+}
Modified:
oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java?rev=1511513&r1=1511512&r2=1511513&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
(original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java Wed
Aug 7 21:53:50 2013
@@ -545,7 +545,7 @@ public abstract class XDataTestCase exte
if (wfId != null) {
WorkflowJobBean wfJob = jpaService.execute(new
WorkflowJobGetJPAExecutor(wfId));
- wfJob.setParentId(jobId);
+ wfJob.setParentId(action.getId());
jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
}
}
Modified: oozie/trunk/release-log.txt
URL:
http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1511513&r1=1511512&r2=1511513&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Wed Aug 7 21:53:50 2013
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1449 Coordinator Workflow parent relationship is broken for purge
service (rkanter)
OOZIE-1458 If a Credentials type is not defined, Oozie should say something
(rkanter)
OOZIE-1425 param checker should validate cron syntax (bowenzhangusa via
rkanter)
OOZIE-1453 Change "frequency" to string in SyncCoordAction.java (bowenzhangusa
via rkanter)