Added: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetFromParentIdJPAExecutor.java URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetFromParentIdJPAExecutor.java?rev=1465054&view=auto ============================================================================== --- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetFromParentIdJPAExecutor.java (added) +++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetFromParentIdJPAExecutor.java Fri Apr 5 17:25:08 2013 @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowInstance;
+
+/**
+ * Tests for {@link WorkflowJobsGetFromParentIdJPAExecutor}: each test inserts parent jobs (coordinator
+ * jobs or workflow jobs) together with child workflow jobs, runs the executor for a parent id and checks
+ * that exactly the ids of that parent's child workflow jobs come back.
+ */
+public class TestWorkflowJobsGetFromParentIdJPAExecutor extends XDataTestCase {
+    /** Service classes passed to {@code setClassesToBeExcluded} before the services are initialized. */
+    private static final String[] EXCLUDED_SERVICES = { "org.apache.oozie.service.StatusTransitService",
+            "org.apache.oozie.service.PauseTransitService", "org.apache.oozie.service.PurgeService",
+            "org.apache.oozie.service.CoordMaterializeTriggerService", "org.apache.oozie.service.RecoveryService" };
+
+    Services services;
+
+    /**
+     * Creates and initializes the {@link Services} instance (with {@link #EXCLUDED_SERVICES} excluded)
+     * and then calls {@code cleanUpDBTables()}.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        setClassesToBeExcluded(services.getConf(), EXCLUDED_SERVICES);
+        services.init();
+        cleanUpDBTables();
+    }
+
+    /**
+     * Destroys the services created in {@link #setUp()}; the parent tear-down runs even if that fails.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            services.destroy();
+        }
+        finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Two coordinator jobs, A with two child workflows and B with one: querying by each coordinator id
+     * must return only that coordinator's workflows.
+     */
+    public void testGetCoordinatorParent() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        CoordinatorJobBean coordJobA = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        CoordinatorJobBean coordJobB = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        WorkflowJobBean wfJobA1 = addSucceededWfJob();
+        WorkflowJobBean wfJobA2 = addSucceededWfJob();
+        WorkflowJobBean wfJobB = addSucceededWfJob();
+        addOkWfAction(wfJobA1.getId(), "1");
+        addOkWfAction(wfJobA2.getId(), "1");
+        addOkWfAction(wfJobB.getId(), "1");
+        // The workflows are created without a parent; they are tied to a coordinator job only through
+        // the coordinator action records inserted here.
+        addSucceededCoordAction(coordJobA.getId(), 1, wfJobA1.getId());
+        addSucceededCoordAction(coordJobA.getId(), 2, wfJobA2.getId());
+        addSucceededCoordAction(coordJobB.getId(), 1, wfJobB.getId());
+
+        List<String> children = new ArrayList<String>(
+                jpaService.execute(new WorkflowJobsGetFromParentIdJPAExecutor(coordJobA.getId(), 10)));
+        checkChildren(children, wfJobA1.getId(), wfJobA2.getId());
+
+        children = new ArrayList<String>(
+                jpaService.execute(new WorkflowJobsGetFromParentIdJPAExecutor(coordJobB.getId(), 10)));
+        checkChildren(children, wfJobB.getId());
+    }
+
+    /**
+     * Two parent workflow jobs, A with two sub-workflows and B with one: querying by each parent
+     * workflow id must return only that parent's sub-workflows.
+     */
+    public void testGetWorkflowParent() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        WorkflowJobBean wfJobA = addSucceededWfJob();
+        WorkflowJobBean wfJobB = addSucceededWfJob();
+        WorkflowJobBean subwfJobA1 = addSucceededWfJob(wfJobA.getId());
+        WorkflowJobBean subwfJobA2 = addSucceededWfJob(wfJobA.getId());
+        WorkflowJobBean subwfJobB = addSucceededWfJob(wfJobB.getId());
+        addOkWfAction(wfJobA.getId(), "1");
+        addOkWfAction(wfJobB.getId(), "1");
+        addOkWfAction(subwfJobA1.getId(), "1");
+        addOkWfAction(subwfJobA2.getId(), "1");
+        addOkWfAction(subwfJobB.getId(), "1");
+
+        List<String> children = new ArrayList<String>(
+                jpaService.execute(new WorkflowJobsGetFromParentIdJPAExecutor(wfJobA.getId(), 10)));
+        checkChildren(children, subwfJobA1.getId(), subwfJobA2.getId());
+
+        children = new ArrayList<String>(
+                jpaService.execute(new WorkflowJobsGetFromParentIdJPAExecutor(wfJobB.getId(), 10)));
+        checkChildren(children, subwfJobB.getId());
+    }
+
+    /**
+     * A coordinator job with five child workflows, fetched with a limit of three per request: the first
+     * request must return three ids and the second the remaining two.
+     */
+    public void testGetCoordinatorParentTooMany() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        String coordJobId = coordJob.getId();
+        WorkflowJobBean wfJob1 = addSucceededWfJob(coordJobId);
+        WorkflowJobBean wfJob2 = addSucceededWfJob(coordJobId);
+        WorkflowJobBean wfJob3 = addSucceededWfJob(coordJobId);
+        WorkflowJobBean wfJob4 = addSucceededWfJob(coordJobId);
+        WorkflowJobBean wfJob5 = addSucceededWfJob(coordJobId);
+        addOkWfAction(wfJob1.getId(), "1");
+        addOkWfAction(wfJob2.getId(), "2");
+        addOkWfAction(wfJob3.getId(), "2");
+        addOkWfAction(wfJob4.getId(), "1");
+        addOkWfAction(wfJob5.getId(), "1");
+        addSucceededCoordAction(coordJobId, 1, wfJob1.getId());
+        addSucceededCoordAction(coordJobId, 2, wfJob2.getId());
+        addSucceededCoordAction(coordJobId, 3, wfJob3.getId());
+        addSucceededCoordAction(coordJobId, 4, wfJob4.getId());
+        addSucceededCoordAction(coordJobId, 5, wfJob5.getId());
+
+        List<String> children = new ArrayList<String>();
+        // Get the first 3
+        children.addAll(jpaService.execute(new WorkflowJobsGetFromParentIdJPAExecutor(coordJobId, 3)));
+        assertEquals(3, children.size());
+        // Get the next 3 (though there's only 2 more).
+        // NOTE(review): the three-argument constructor is presumably (parentId, offset, limit) - confirm.
+        children.addAll(jpaService.execute(new WorkflowJobsGetFromParentIdJPAExecutor(coordJobId, 3, 3)));
+        assertEquals(5, children.size());
+        checkChildren(children, wfJob1.getId(), wfJob2.getId(), wfJob3.getId(), wfJob4.getId(), wfJob5.getId());
+    }
+
+    /**
+     * A parent workflow job with five sub-workflows, fetched with a limit of three per request: the
+     * first request must return three ids and the second the remaining two.
+     */
+    public void testGetWorkflowParentTooMany() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        WorkflowJobBean wfJob = addSucceededWfJob();
+        String wfJobId = wfJob.getId();
+        WorkflowJobBean subwfJob1 = addSucceededWfJob(wfJobId);
+        WorkflowJobBean subwfJob2 = addSucceededWfJob(wfJobId);
+        WorkflowJobBean subwfJob3 = addSucceededWfJob(wfJobId);
+        WorkflowJobBean subwfJob4 = addSucceededWfJob(wfJobId);
+        WorkflowJobBean subwfJob5 = addSucceededWfJob(wfJobId);
+        addOkWfAction(wfJobId, "1");
+        addOkWfAction(wfJobId, "2");
+        addOkWfAction(wfJobId, "3");
+        addOkWfAction(wfJobId, "4");
+        addOkWfAction(wfJobId, "5");
+        addOkWfAction(subwfJob1.getId(), "1");
+        addOkWfAction(subwfJob2.getId(), "1");
+        addOkWfAction(subwfJob3.getId(), "1");
+        addOkWfAction(subwfJob4.getId(), "1");
+        addOkWfAction(subwfJob5.getId(), "1");
+
+        List<String> children = new ArrayList<String>();
+        // Get the first 3
+        children.addAll(jpaService.execute(new WorkflowJobsGetFromParentIdJPAExecutor(wfJobId, 3)));
+        assertEquals(3, children.size());
+        // Get the next 3 (though there's only 2 more).
+        // NOTE(review): the three-argument constructor is presumably (parentId, offset, limit) - confirm.
+        children.addAll(jpaService.execute(new WorkflowJobsGetFromParentIdJPAExecutor(wfJobId, 3, 3)));
+        assertEquals(5, children.size());
+        checkChildren(children, subwfJob1.getId(), subwfJob2.getId(), subwfJob3.getId(), subwfJob4.getId(),
+                subwfJob5.getId());
+    }
+
+    /**
+     * Inserts a workflow job record with SUCCEEDED job and instance status and no parent.
+     *
+     * @return the inserted workflow job bean
+     * @throws Exception propagated from the insert helper
+     */
+    private WorkflowJobBean addSucceededWfJob() throws Exception {
+        return addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+    }
+
+    /**
+     * Inserts a workflow job record with SUCCEEDED job and instance status under the given parent.
+     *
+     * @param parentId id of the parent job
+     * @return the inserted workflow job bean
+     * @throws Exception propagated from the insert helper
+     */
+    private WorkflowJobBean addSucceededWfJob(String parentId) throws Exception {
+        return addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED, parentId);
+    }
+
+    /**
+     * Inserts a workflow action record with status OK for the given workflow job.
+     *
+     * @param wfJobId id of the workflow job owning the action
+     * @param actionName action name
+     * @throws Exception propagated from the insert helper
+     */
+    private void addOkWfAction(String wfJobId, String actionName) throws Exception {
+        addRecordToWfActionTable(wfJobId, actionName, WorkflowAction.Status.OK);
+    }
+
+    /**
+     * Inserts a SUCCEEDED coordinator action (built from "coord-action-get.xml") that refers to the
+     * given workflow job.
+     *
+     * @param coordJobId id of the coordinator job owning the action
+     * @param actionNum action number within the coordinator job
+     * @param wfJobId id of the workflow job the action refers to
+     * @throws Exception propagated from the insert helper
+     */
+    private void addSucceededCoordAction(String coordJobId, int actionNum, String wfJobId) throws Exception {
+        addRecordToCoordActionTable(coordJobId, actionNum, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", wfJobId, "SUCCEEDED", 0);
+    }
+
+    /**
+     * Asserts that {@code children} holds exactly the given workflow job ids, ignoring order. Sorted
+     * copies are compared, so neither argument is modified.
+     *
+     * @param children ids returned by the executor
+     * @param wfJobIDs expected workflow job ids
+     */
+    private void checkChildren(List<String> children, String... wfJobIDs) {
+        List<String> expected = new ArrayList<String>(Arrays.asList(wfJobIDs));
+        List<String> actual = new ArrayList<String>(children);
+        Collections.sort(expected);
+        Collections.sort(actual);
+        assertEquals(expected, actual);
+    }
+}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/service/TestPurgeService.java URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestPurgeService.java?rev=1465054&r1=1465053&r2=1465054&view=diff ============================================================================== --- oozie/trunk/core/src/test/java/org/apache/oozie/service/TestPurgeService.java (original) +++ oozie/trunk/core/src/test/java/org/apache/oozie/service/TestPurgeService.java Fri Apr 5 17:25:08 2013 @@ -40,7 +40,7 @@ import org.apache.oozie.DagEngineExcepti import org.apache.oozie.ForTestingActionExecutor; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.ErrorCode; -import org.apache.oozie.command.wf.PurgeXCommand; +import org.apache.oozie.command.PurgeXCommand; import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor; import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor; import org.apache.oozie.executor.jpa.BundleJobInsertJPAExecutor; @@ -114,7 +114,7 @@ public class TestPurgeService extends XD } }); assertEquals(WorkflowJob.Status.SUCCEEDED, engine.getJob(jobId).getStatus()); - new PurgeXCommand(1, 10000).call(); + new PurgeXCommand(1, 1, 1, 10000).call(); sleep(1000); JPAService jpaService = Services.get().get(JPAService.class); Modified: oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java?rev=1465054&r1=1465053&r2=1465054&view=diff ============================================================================== --- oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java (original) +++ oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java Fri Apr 5 17:25:08 2013 @@ -55,11 +55,15 @@ import org.apache.oozie.executor.jpa.Bun import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor; import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor; import 
org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor; +import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor; +import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor; import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.executor.jpa.SLAEventInsertJPAExecutor; import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor; +import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor; +import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor; import org.apache.oozie.service.JPAService; import org.apache.oozie.service.LiteWorkflowStoreService; import org.apache.oozie.service.Services; @@ -532,6 +536,12 @@ public abstract class XDataTestCase exte assertNotNull(jpaService); CoordActionInsertJPAExecutor coordActionInsertExecutor = new CoordActionInsertJPAExecutor(action); jpaService.execute(coordActionInsertExecutor); + + if (wfId != null) { + WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(wfId)); + wfJob.setParentId(jobId); + jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob)); + } } catch (JPAExecutorException je) { je.printStackTrace(); @@ -592,6 +602,32 @@ public abstract class XDataTestCase exte } /** + * Insert subwf job for testing. 
+ * + * @param jobStatus workflow job status + * @param instanceStatus workflow instance status + * @param parentId the id of the parent workflow + * @return workflow job bean + * @throws Exception thrown if unable to create workflow job bean + */ + protected WorkflowJobBean addRecordToWfJobTable(WorkflowJob.Status jobStatus, WorkflowInstance.Status instanceStatus, + String parentId) throws Exception { + WorkflowJobBean subwfBean = addRecordToWfJobTable(jobStatus, instanceStatus); + subwfBean.setParentId(parentId); + try { + JPAService jpaService = Services.get().get(JPAService.class); + assertNotNull(jpaService); + jpaService.execute(new WorkflowJobUpdateJPAExecutor(subwfBean)); + } + catch (JPAExecutorException je) { + je.printStackTrace(); + fail("Unable to insert the test wf job record to table"); + throw je; + } + return subwfBean; + } + + /** * Insert wf job for testing. * * @param jobStatus workflow job status @@ -778,13 +814,47 @@ public abstract class XDataTestCase exte */ protected BundleActionBean addRecordToBundleActionTable(String jobId, String coordName, int pending, Job.Status status) throws Exception { - BundleActionBean action = createBundleAction(jobId, coordName, pending, status); + BundleActionBean action = createBundleAction(jobId, coordName, coordName, pending, status); + + try { + JPAService jpaService = Services.get().get(JPAService.class); + assertNotNull(jpaService); + BundleActionInsertJPAExecutor bundleActionJPAExecutor = new BundleActionInsertJPAExecutor(action); + jpaService.execute(bundleActionJPAExecutor); + } + catch (JPAExecutorException ex) { + ex.printStackTrace(); + fail("Unable to insert the test bundle action record to table"); + throw ex; + } + + return action; + } + + /** + * Create bundle action bean and save to db + * + * @param jobId bundle job id + * @param coordId coordinator id + * @param coordName coordinator name + * @param pending true if action is pending + * @param status job status + * @return bundle action bean 
+ * @throws Exception + */ + protected BundleActionBean addRecordToBundleActionTable(String jobId, String coordId, String coordName, int pending, + Job.Status status) throws Exception { + BundleActionBean action = createBundleAction(jobId, coordId, coordName, pending, status); try { JPAService jpaService = Services.get().get(JPAService.class); assertNotNull(jpaService); BundleActionInsertJPAExecutor bundleActionJPAExecutor = new BundleActionInsertJPAExecutor(action); jpaService.execute(bundleActionJPAExecutor); + + CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordId)); + coordJob.setBundleId(jobId); + jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob)); } catch (JPAExecutorException ex) { ex.printStackTrace(); @@ -799,19 +869,20 @@ public abstract class XDataTestCase exte * Create bundle action bean * * @param jobId bundle job id + * @param coordId coordinator id * @param coordName coordinator name * @param pending true if action is pending * @param status job status * @return bundle action bean * @throws Exception */ - protected BundleActionBean createBundleAction(String jobId, String coordName, int pending, Job.Status status) + protected BundleActionBean createBundleAction(String jobId, String coordId, String coordName, int pending, Job.Status status) throws Exception { BundleActionBean action = new BundleActionBean(); action.setBundleId(jobId); action.setBundleActionId(jobId + "_" + coordName); action.setPending(pending); - action.setCoordId(coordName); + action.setCoordId(coordId); action.setCoordName(coordName); action.setStatus(status); action.setLastModifiedTime(new Date()); Modified: oozie/trunk/release-log.txt URL: http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1465054&r1=1465053&r2=1465054&view=diff ============================================================================== --- oozie/trunk/release-log.txt (original) +++ oozie/trunk/release-log.txt Fri Apr 5 17:25:08 2013 @@ -1,5 +1,6 @@ -- Oozie 
4.1.0 release (trunk - unreleased) +OOZIE-1118 improve logic of purge service (rkanter) OOZIE-1205 If the JobTracker is restarted during a Fork, Oozie doesn't fail all of the currently running actions (rkanter) OOZIE-1286 SSH Action does not properly handle arguments that have spaces (rkanter) OOZIE-1300 [Doc] Error in the email action XML schema (harsh)
