Repository: oozie Updated Branches: refs/heads/master b4c600689 -> a9b3c7bb4
http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/test/java/org/apache/oozie/command/bundle/TestBulkBundleXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/bundle/TestBulkBundleXCommand.java b/core/src/test/java/org/apache/oozie/command/bundle/TestBulkBundleXCommand.java new file mode 100644 index 0000000..8e63263 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/command/bundle/TestBulkBundleXCommand.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.command.bundle; + +import org.apache.oozie.BundleActionBean; +import org.apache.oozie.BundleJobBean; +import org.apache.oozie.CoordinatorJobBean; +import org.apache.oozie.client.BundleJob; +import org.apache.oozie.client.CoordinatorJob; +import org.apache.oozie.client.Job; +import org.apache.oozie.command.OperationType; +import org.apache.oozie.command.PreconditionException; +import org.apache.oozie.executor.jpa.BundleActionQueryExecutor; +import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery; +import org.apache.oozie.executor.jpa.BundleJobQueryExecutor; +import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; +import org.apache.oozie.service.Services; +import org.apache.oozie.test.XDataTestCase; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TestBulkBundleXCommand extends XDataTestCase { + private Services services; + + @Override + protected void setUp() throws Exception { + super.setUp(); + services = new Services(); + services.init(); + } + + @Override + protected void tearDown() throws Exception { + services.destroy(); + super.tearDown(); + } + + public void testBulkBundleSuspendResumeKillSuccess() throws Exception { + BundleJobBean job1 = this.addRecordToBundleJobTable(Job.Status.RUNNING, false); + BundleJobBean job2 = this.addRecordToBundleJobTable(Job.Status.RUNNING, false); + + Map<String, List<String>> map = new HashMap<String, List<String>>(); + List<String> names = new ArrayList<String>(); + names.add("BUNDLE-TEST"); + map.put("name", names); + + new BulkBundleXCommand(map, 1, 50, OperationType.Suspend).call(); + verifyJobStatus(job1.getId(), BundleJob.Status.SUSPENDED); + verifyChildrenStatus(job1.getId(), CoordinatorJob.Status.SUSPENDED); + + verifyJobStatus(job2.getId(), BundleJob.Status.SUSPENDED); + verifyChildrenStatus(job2.getId(), CoordinatorJob.Status.SUSPENDED); + + new BulkBundleXCommand(map, 1, 50, OperationType.Resume).call(); + verifyJobStatus(job1.getId(), BundleJob.Status.RUNNING); + verifyChildrenStatus(job1.getId(), CoordinatorJob.Status.RUNNING); + + verifyJobStatus(job2.getId(), BundleJob.Status.RUNNING); + verifyChildrenStatus(job2.getId(), CoordinatorJob.Status.RUNNING); + + new BulkBundleXCommand(map, 1, 50, OperationType.Kill).call(); + verifyJobStatus(job1.getId(), BundleJob.Status.KILLED); + verifyChildrenStatus(job1.getId(), CoordinatorJob.Status.KILLED); + + verifyJobStatus(job2.getId(), BundleJob.Status.KILLED); + verifyChildrenStatus(job2.getId(), CoordinatorJob.Status.KILLED); + } + + public void testBulkBundleKillNoOp() throws Exception { + BundleJobBean job = this.addRecordToBundleJobTable(Job.Status.KILLED, false); + + Map<String, List<String>> map = new HashMap<String, List<String>>(); + List<String> names = new ArrayList<String>(); + names.add("BUNDLE-TEST"); + map.put("name", names); + + new BulkBundleXCommand(map, 1, 50, OperationType.Kill).call(); + + verifyJobStatus(job.getId(), BundleJob.Status.KILLED); + verifyChildrenStatus(job.getId(), CoordinatorJob.Status.KILLED); + } + + public void testBulkBundleKillNegative() throws Exception { + BundleJobBean job1 = this.addRecordToBundleJobTable(Job.Status.RUNNING, false); + BundleJobBean job2 = this.addRecordToBundleJobTable(Job.Status.RUNNING, false); + + Map<String, List<String>> map = new HashMap<String, List<String>>(); + List<String> names = new ArrayList<String>(); + names.add("BUNDLE"); + map.put("name", names); + + new BulkBundleXCommand(map, 1, 50, OperationType.Kill).call(); + + verifyJobStatus(job1.getId(), BundleJob.Status.RUNNING); + verifyChildrenStatus(job1.getId(), CoordinatorJob.Status.RUNNING); + + verifyJobStatus(job2.getId(), BundleJob.Status.RUNNING); + verifyChildrenStatus(job2.getId(), CoordinatorJob.Status.RUNNING); + + } + + public void testBulkBundleSuspendNoOp() throws Exception { + BundleJobBean job1 = this.addRecordToBundleJobTable(Job.Status.RUNNING, false); + BundleJobBean job2 = this.addRecordToBundleJobTable(Job.Status.KILLED, false); + + Map<String, List<String>> map = new HashMap<String, List<String>>(); + List<String> names = new ArrayList<String>(); + names.add("BUNDLE-TEST"); + map.put("name", names); + + new BulkBundleXCommand(map, 1, 10, OperationType.Suspend).call(); + + verifyJobStatus(job1.getId(), BundleJob.Status.SUSPENDED); + verifyChildrenStatus(job1.getId(), CoordinatorJob.Status.SUSPENDED); + + verifyJobStatus(job2.getId(), BundleJob.Status.KILLED); + verifyChildrenStatus(job2.getId(), CoordinatorJob.Status.KILLED); + } + + public void testBulkBundleSuspendNegative() throws Exception { + BundleJobBean job1 = this.addRecordToBundleJobTable(Job.Status.RUNNING, false); + BundleJobBean job2 = this.addRecordToBundleJobTable(Job.Status.KILLED, false); + + Map<String, List<String>> map = new HashMap<String, List<String>>(); + List<String> names = new ArrayList<String>(); + names.add("BUNDLE"); + map.put("name", names); + + new BulkBundleXCommand(map, 1, 10, OperationType.Suspend).call(); + + verifyJobStatus(job1.getId(), BundleJob.Status.RUNNING); + verifyChildrenStatus(job1.getId(), CoordinatorJob.Status.RUNNING); + + verifyJobStatus(job2.getId(), BundleJob.Status.KILLED); + verifyChildrenStatus(job2.getId(), CoordinatorJob.Status.KILLED); + } + + public void testBulkBundleResumeNegative() throws Exception { + BundleJobBean job1 = this.addRecordToBundleJobTable(Job.Status.RUNNING, false); + BundleJobBean job2 = this.addRecordToBundleJobTable(Job.Status.SUSPENDED, false); + + Map<String, List<String>> map = new HashMap<String, List<String>>(); + List<String> names = new ArrayList<String>(); + names.add("BUNDLE"); + map.put("name", names); + + new BulkBundleXCommand(map, 1, 10, OperationType.Resume).call(); + + verifyJobStatus(job1.getId(), BundleJob.Status.RUNNING); + verifyChildrenStatus(job1.getId(), CoordinatorJob.Status.RUNNING); + + verifyJobStatus(job2.getId(), BundleJob.Status.SUSPENDED); + verifyChildrenStatus(job2.getId(), CoordinatorJob.Status.SUSPENDED); + } + + public void testBulkBundleResumeNoOp() throws Exception { + BundleJobBean job1 = this.addRecordToBundleJobTable(Job.Status.RUNNING, false); + BundleJobBean job2 = this.addRecordToBundleJobTable(Job.Status.SUSPENDED, false); + + Map<String, List<String>> map = new HashMap<String, List<String>>(); + List<String> names = new ArrayList<String>(); + names.add("BUNDLE-TEST"); + map.put("name", names); + + new BulkBundleXCommand(map, 1, 10, OperationType.Resume).call(); + + verifyJobStatus(job1.getId(), BundleJob.Status.RUNNING); + verifyChildrenStatus(job1.getId(), CoordinatorJob.Status.RUNNING); + + verifyJobStatus(job2.getId(), BundleJob.Status.RUNNING); + verifyChildrenStatus(job2.getId(), CoordinatorJob.Status.RUNNING); + } + + private void verifyJobStatus(String jobId, BundleJob.Status status) throws Exception { + BundleJobBean job = BundleJobQueryExecutor.getInstance().get( + BundleJobQueryExecutor.BundleJobQuery.GET_BUNDLE_JOB, jobId); + assertEquals(status, job.getStatus()); + } + + private void verifyChildrenStatus(String jobId, CoordinatorJob.Status status) throws Exception { + List<BundleActionBean> actions = BundleActionQueryExecutor.getInstance().getList( + BundleActionQuery.GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE, jobId); + + for (BundleActionBean action : actions) { + String coordId = action.getCoordId(); + CoordinatorJobBean coordJob = CoordJobQueryExecutor.getInstance().get( + CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB, coordId); + assertEquals(status, coordJob.getStatus()); + } + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/test/java/org/apache/oozie/command/coord/TestBulkCoordXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestBulkCoordXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestBulkCoordXCommand.java new file mode 100644 index 0000000..27e294d --- /dev/null +++ b/core/src/test/java/org/apache/oozie/command/coord/TestBulkCoordXCommand.java @@ -0,0 +1,259 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.command.coord; + +import org.apache.oozie.CoordinatorActionBean; +import org.apache.oozie.CoordinatorJobBean; +import org.apache.oozie.client.CoordinatorAction; +import org.apache.oozie.client.CoordinatorJob; +import org.apache.oozie.client.Job; +import org.apache.oozie.command.OperationType; +import org.apache.oozie.command.PreconditionException; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; +import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; +import org.apache.oozie.service.Services; +import org.apache.oozie.test.XDataTestCase; +import org.apache.oozie.util.DateUtils; + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TestBulkCoordXCommand extends XDataTestCase { + private Services services; + private String[] excludedServices = { "org.apache.oozie.service.StatusTransitService", + "org.apache.oozie.service.PauseTransitService", + "org.apache.oozie.service.CoordMaterializeTriggerService", "org.apache.oozie.service.RecoveryService" }; + + @Override + protected void setUp() throws Exception { + super.setUp(); + services = new Services(); + setClassesToBeExcluded(services.getConf(), excludedServices); + services.init(); + } + + @Override + protected void tearDown() throws Exception { + services.destroy(); + super.tearDown(); + } + + public void testBulkCoordSuspendResumeKillSuccess() throws Exception { + String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); + Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); + Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); + + CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false, false, 0); + CoordinatorActionBean action1 = addRecordToCoordActionTable(job1.getId(), 1, + CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0); + + CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false, false, 0); + CoordinatorActionBean action2 = addRecordToCoordActionTable(job2.getId(), 1, + CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0); + + List<String> jobIds = new ArrayList<String>(); + jobIds.add(job1.getId()); + jobIds.add(job2.getId()); + + List<String> actionIds = new ArrayList<String>(); + actionIds.add(action1.getId()); + actionIds.add(action2.getId()); + + Map<String, List<String>> map = new HashMap<String, List<String>>(); + List<String> names = new ArrayList<String>(); + names.add("COORD-TEST"); + map.put("name", names); + + new BulkCoordXCommand(map, 1, 50, OperationType.Suspend).call(); + verifyJobsStatus(jobIds, CoordinatorJob.Status.SUSPENDED); + verifyActionsStatus(actionIds, CoordinatorAction.Status.SUSPENDED); + + new BulkCoordXCommand(map, 1, 50, OperationType.Resume).call(); + verifyJobsStatus(jobIds, CoordinatorJob.Status.RUNNING); + verifyActionsStatus(actionIds, CoordinatorAction.Status.RUNNING); + + new BulkCoordXCommand(map, 1, 50, OperationType.Kill).call(); + verifyJobsStatus(jobIds, CoordinatorJob.Status.KILLED); + verifyActionsStatus(actionIds, CoordinatorAction.Status.KILLED); + } + + public void testBulkCoordKillNegative() throws Exception { + String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); + Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); + Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); + + CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false, false, 0); + CoordinatorActionBean action1 = addRecordToCoordActionTable(job1.getId(), 1, + CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0); + + Map<String, List<String>> map = new HashMap<String, List<String>>(); + List<String> names = new ArrayList<String>(); + names.add("COORD"); + map.put("name", names); + + new BulkCoordXCommand(map, 1, 50, OperationType.Kill).call(); + + List<String> jobIds = new ArrayList<String>(); + jobIds.add(job1.getId()); + List<String> actionIds = new ArrayList<String>(); + actionIds.add(action1.getId()); + + verifyJobsStatus(jobIds, CoordinatorJob.Status.RUNNING); + verifyActionsStatus(actionIds, CoordinatorAction.Status.RUNNING); + } + + public void testBulkCoordKillNoOp() throws Exception { + String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); + Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); + Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); + + CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, start, end, false, false, 0); + CoordinatorActionBean action1 = addRecordToCoordActionTable(job1.getId(), 1, + CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); + + Map<String, List<String>> map = new HashMap<String, List<String>>(); + List<String> names = new ArrayList<String>(); + names.add("COORD-TEST"); + map.put("name", names); + + new BulkCoordXCommand(map, 1, 50, OperationType.Kill).call(); + + List<String> jobIds = new ArrayList<String>(); + jobIds.add(job1.getId()); + List<String> actionIds = new ArrayList<String>(); + actionIds.add(action1.getId()); + + verifyJobsStatus(jobIds, CoordinatorJob.Status.SUCCEEDED); + verifyActionsStatus(actionIds, CoordinatorAction.Status.SUCCEEDED); + } + + public void testBulkCoordSuspendNoOp() throws Exception { + String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); + Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); + Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); + + CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.KILLED, start, end, false, false, 0); + CoordinatorActionBean action1 = addRecordToCoordActionTable(job1.getId(), 1, + CoordinatorAction.Status.KILLED, "coord-action-get.xml", 0); + + Map<String, List<String>> map = new HashMap<String, List<String>>(); + List<String> names = new ArrayList<String>(); + names.add("COORD-TEST"); + map.put("name", names); + + new BulkCoordXCommand(map, 1, 50, OperationType.Suspend).call(); + + List<String> jobIds = new ArrayList<String>(); + jobIds.add(job1.getId()); + List<String> actionIds = new ArrayList<String>(); + actionIds.add(action1.getId()); + verifyJobsStatus(jobIds, CoordinatorJob.Status.KILLED); + verifyActionsStatus(actionIds, CoordinatorAction.Status.KILLED); + } + + public void testBulkCoordSuspendNegative() throws Exception { + String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); + Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); + Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); + + CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false, false, 0); + CoordinatorActionBean action1 = addRecordToCoordActionTable(job1.getId(), 1, + CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0); + + Map<String, List<String>> map = new HashMap<String, List<String>>(); + List<String> names = new ArrayList<String>(); + names.add("COORD"); + map.put("name", names); + + new BulkCoordXCommand(map, 1, 50, OperationType.Suspend).call(); + + List<String> jobIds = new ArrayList<String>(); + jobIds.add(job1.getId()); + List<String> actionIds = new ArrayList<String>(); + actionIds.add(action1.getId()); + verifyJobsStatus(jobIds, CoordinatorJob.Status.RUNNING); + verifyActionsStatus(actionIds, CoordinatorAction.Status.RUNNING); + } + + public void testBulkCoordResumeNoOp() throws Exception { + String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); + Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); + Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); + + CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false, false, 0); + CoordinatorActionBean action1 = addRecordToCoordActionTable(job1.getId(), 1, + CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0); + + Map<String, List<String>> map = new HashMap<String, List<String>>(); + List<String> names = new ArrayList<String>(); + names.add("COORD-TEST"); + map.put("name", names); + + new BulkCoordXCommand(map, 1, 50, OperationType.Resume).call(); + List<String> jobIds = new ArrayList<String>(); + jobIds.add(job1.getId()); + List<String> actionIds = new ArrayList<String>(); + actionIds.add(action1.getId()); + + verifyJobsStatus(jobIds, CoordinatorJob.Status.RUNNING); + verifyActionsStatus(actionIds, CoordinatorAction.Status.RUNNING); + } + + public void testBulkCoordResumeNegative() throws Exception { + String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); + Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); + Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); + + CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.SUSPENDED, start, end, false, false, 0); + CoordinatorActionBean action1 = addRecordToCoordActionTable(job1.getId(), 1, + CoordinatorAction.Status.SUSPENDED, "coord-action-get.xml", 0); + + Map<String, List<String>> map = new HashMap<String, List<String>>(); + List<String> names = new ArrayList<String>(); + names.add("COORD"); + map.put("name", names); + + new BulkCoordXCommand(map, 1, 50, OperationType.Resume).call(); + List<String> jobIds = new ArrayList<String>(); + jobIds.add(job1.getId()); + List<String> actionIds = new ArrayList<String>(); + actionIds.add(action1.getId()); + + verifyJobsStatus(jobIds, CoordinatorJob.Status.SUSPENDED); + verifyActionsStatus(actionIds, CoordinatorAction.Status.SUSPENDED); + } + + private void verifyJobsStatus(List<String> jobIds, CoordinatorJob.Status status) throws Exception { + for (String id : jobIds) { + CoordinatorJobBean job = CoordJobQueryExecutor.getInstance().get( + CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB, id); + assertEquals(status, job.getStatus()); + } + } + + private void verifyActionsStatus(List<String> actionIds, CoordinatorAction.Status status) throws Exception { + for (String id : actionIds) { + CoordinatorActionBean action = CoordActionQueryExecutor.getInstance().get( + CoordActionQueryExecutor.CoordActionQuery.GET_COORD_ACTION, id); + assertEquals(status, action.getStatus()); + } + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/test/java/org/apache/oozie/command/wf/TestBulkWorkflowXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestBulkWorkflowXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestBulkWorkflowXCommand.java new file mode 100644 index 0000000..a63ffd8 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/command/wf/TestBulkWorkflowXCommand.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.command.wf; + +import org.apache.oozie.command.OperationType; +import org.apache.oozie.command.PreconditionException; +import org.apache.oozie.WorkflowActionBean; +import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.client.WorkflowAction; +import org.apache.oozie.client.WorkflowJob; +import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; +import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; +import org.apache.oozie.service.Services; +import org.apache.oozie.test.XDataTestCase; +import org.apache.oozie.workflow.WorkflowInstance; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TestBulkWorkflowXCommand extends XDataTestCase { + private Services services; + + @Override + protected void setUp() throws Exception { + super.setUp(); + services = new Services(); + services.init(); + } + + @Override + protected void tearDown() throws Exception { + services.destroy(); + super.tearDown(); + } + + public void testbulkWfKillSuspendResumeSuccess() throws Exception { + WorkflowJobBean job1 = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); + WorkflowActionBean action1 = this.addRecordToWfActionTable(job1.getId(), "1", WorkflowAction.Status.PREP); + + WorkflowJobBean job2 = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); + WorkflowActionBean action2 = this.addRecordToWfActionTable(job2.getId(), "1", WorkflowAction.Status.PREP); + + Map<String, List<String>> map = new HashMap<String, List<String>>(); + List<String> names = new ArrayList<String>(); + names.add("testApp"); + map.put("name", names); + + new BulkWorkflowXCommand(map, 1, 10, OperationType.Suspend).call(); + verifyJobStatus(job1.getId(), WorkflowJob.Status.SUSPENDED); + verifyJobStatus(job2.getId(), WorkflowJob.Status.SUSPENDED); + + new BulkWorkflowXCommand(map, 1, 10, OperationType.Resume).call(); + verifyJobStatus(job1.getId(), WorkflowJob.Status.RUNNING); + verifyJobStatus(job2.getId(), WorkflowJob.Status.RUNNING); + + new BulkWorkflowXCommand(map, 1, 10, OperationType.Kill).call(); + verifyJobStatus(job1.getId(), WorkflowJob.Status.KILLED); + verifyJobStatus(job2.getId(), WorkflowJob.Status.KILLED); + verifyActionStatus(action1.getId(), WorkflowAction.Status.KILLED); + verifyActionStatus(action2.getId(), WorkflowAction.Status.KILLED); + } + + public void testbulkWfKillSuccess() throws Exception { + WorkflowJobBean job1 = this.addRecordToWfJobTable(WorkflowJob.Status.SUSPENDED, WorkflowInstance.Status.SUSPENDED); + WorkflowActionBean action1 = this.addRecordToWfActionTable(job1.getId(), "1", WorkflowAction.Status.RUNNING); + + WorkflowJobBean job2 = this.addRecordToWfJobTable(WorkflowJob.Status.SUSPENDED, WorkflowInstance.Status.SUSPENDED); + WorkflowActionBean action2 = this.addRecordToWfActionTable(job2.getId(), "1", WorkflowAction.Status.RUNNING); + + Map<String, List<String>> map = new HashMap<String, List<String>>(); + List<String> names = new ArrayList<String>(); + names.add("testApp"); + map.put("name", names); + + new BulkWorkflowXCommand(map, 1, 50, OperationType.Kill).call(); + verifyJobStatus(job1.getId(), WorkflowJob.Status.KILLED); + verifyJobStatus(job2.getId(), WorkflowJob.Status.KILLED); + verifyActionStatus(action1.getId(), WorkflowAction.Status.KILLED); + verifyActionStatus(action2.getId(), WorkflowAction.Status.KILLED); + } + + public void testbulkWfKillNoOp() throws Exception { + WorkflowJobBean job1 = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); + WorkflowActionBean action1 = this.addRecordToWfActionTable(job1.getId(), "1", WorkflowAction.Status.RUNNING); + + WorkflowJobBean job2 = this.addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED); + WorkflowActionBean action2 = this.addRecordToWfActionTable(job2.getId(), "1", WorkflowAction.Status.DONE); + + Map<String, List<String>> map = new HashMap<String, List<String>>(); + List<String> names = new ArrayList<String>(); + names.add("testApp"); + map.put("name", names); + + new BulkWorkflowXCommand(map, 1, 50, OperationType.Kill).call(); + verifyJobStatus(job1.getId(), WorkflowJob.Status.KILLED); + verifyJobStatus(job2.getId(), WorkflowJob.Status.SUCCEEDED); + verifyActionStatus(action1.getId(), WorkflowAction.Status.KILLED); + verifyActionStatus(action2.getId(), WorkflowAction.Status.DONE); + } + + public void testbulkWfKillNegative() throws Exception { + WorkflowJobBean job1 = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); + WorkflowActionBean action1 = this.addRecordToWfActionTable(job1.getId(), "1", WorkflowAction.Status.RUNNING); + + Map<String, List<String>> map = new HashMap<String, List<String>>(); + List<String> names = new ArrayList<String>(); + names.add("testApp-new"); + map.put("name", names); + + new BulkWorkflowXCommand(map, 1, 50, OperationType.Kill).call(); + verifyJobStatus(job1.getId(), WorkflowJob.Status.RUNNING); + verifyActionStatus(action1.getId(), WorkflowAction.Status.RUNNING); + } + + public void testBulkSuspendNoOp() throws Exception { + WorkflowJobBean job1 = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); + WorkflowActionBean action1 = this.addRecordToWfActionTable(job1.getId(), "1", WorkflowAction.Status.RUNNING); + + WorkflowJobBean job2 = this.addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED); + WorkflowActionBean action2 = this.addRecordToWfActionTable(job2.getId(), "1", WorkflowAction.Status.DONE); + + Map<String, List<String>> map = new HashMap<String, List<String>>(); + List<String> names = new ArrayList<String>(); + names.add("testApp"); + map.put("name", names); + + new BulkWorkflowXCommand(map, 1, 50, OperationType.Suspend).call(); + verifyJobStatus(job1.getId(), WorkflowJob.Status.SUSPENDED); + verifyActionStatus(action1.getId(), WorkflowAction.Status.RUNNING); + verifyJobStatus(job2.getId(), WorkflowJob.Status.SUCCEEDED); + verifyActionStatus(action2.getId(), WorkflowAction.Status.DONE); + } + + public void testBulkSuspendNegative() throws Exception { + WorkflowJobBean job1 = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); + WorkflowActionBean action1 = this.addRecordToWfActionTable(job1.getId(), "1", WorkflowAction.Status.RUNNING); + + Map<String, List<String>> map = new HashMap<String, List<String>>(); + List<String> names = new ArrayList<String>(); + names.add("testApp-new"); + map.put("name", names); + + new BulkWorkflowXCommand(map, 1, 50, OperationType.Suspend).call(); + verifyJobStatus(job1.getId(), WorkflowJob.Status.RUNNING); + verifyActionStatus(action1.getId(), WorkflowAction.Status.RUNNING); + } + + public void testBulkResumeNegative() throws Exception { + WorkflowJobBean job1 = this.addRecordToWfJobTable(WorkflowJob.Status.SUSPENDED, WorkflowInstance.Status.SUSPENDED); + WorkflowActionBean action1 = this.addRecordToWfActionTable(job1.getId(), "1", WorkflowAction.Status.RUNNING); + + Map<String, List<String>> map = new HashMap<String, List<String>>(); + List<String> names = new ArrayList<String>(); + names.add("testApp-new"); + map.put("name", names); + + new BulkWorkflowXCommand(map, 1, 50, OperationType.Resume).call(); + verifyJobStatus(job1.getId(), WorkflowJob.Status.SUSPENDED); + verifyActionStatus(action1.getId(), WorkflowAction.Status.RUNNING); + } + + public void testBulkResumeNoOp() throws Exception { + WorkflowJobBean job1 = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); + WorkflowActionBean action1 = this.addRecordToWfActionTable(job1.getId(), "1", WorkflowAction.Status.RUNNING); + + Map<String, List<String>> map = new HashMap<String, List<String>>(); + List<String> names = new ArrayList<String>(); + names.add("testApp"); + map.put("name", names); + + new BulkWorkflowXCommand(map, 1, 50, OperationType.Resume).call(); + verifyJobStatus(job1.getId(), WorkflowJob.Status.RUNNING); + verifyActionStatus(action1.getId(), WorkflowAction.Status.RUNNING); + } + + private void verifyJobStatus(String jobId, WorkflowJob.Status status) throws Exception { + WorkflowJobBean job = WorkflowJobQueryExecutor.getInstance().get( + WorkflowJobQueryExecutor.WorkflowJobQuery.GET_WORKFLOW, jobId); + assertEquals(status, job.getStatus()); + } + + private void verifyActionStatus(String actionId, WorkflowAction.Status status) throws Exception { + WorkflowActionBean action = WorkflowActionQueryExecutor.getInstance().get( + WorkflowActionQueryExecutor.WorkflowActionQuery.GET_ACTION, actionId); + assertEquals(status, action.getStatus()); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/test/java/org/apache/oozie/servlet/MockCoordinatorEngineService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/servlet/MockCoordinatorEngineService.java b/core/src/test/java/org/apache/oozie/servlet/MockCoordinatorEngineService.java index ca6ae19..035a8d4 100644 --- a/core/src/test/java/org/apache/oozie/servlet/MockCoordinatorEngineService.java +++ b/core/src/test/java/org/apache/oozie/servlet/MockCoordinatorEngineService.java @@ -32,6 +32,7 @@ import org.apache.oozie.CoordinatorActionInfo; import org.apache.oozie.CoordinatorEngine; import org.apache.oozie.CoordinatorEngineException; import org.apache.oozie.CoordinatorJobBean; +import org.apache.oozie.CoordinatorJobInfo; import org.apache.oozie.ErrorCode; import org.apache.oozie.XException; import org.apache.oozie.client.CoordinatorAction; @@ -250,6 +251,27 @@ public class MockCoordinatorEngineService extends CoordinatorEngineService { return ""; } + @Override + public CoordinatorJobInfo suspendJobs(String filter, int start, int length) + throws CoordinatorEngineException { + did = RestConstants.JOBS; + return new CoordinatorJobInfo(new ArrayList<CoordinatorJobBean>(), 0, 0, 0); + } + + @Override + public CoordinatorJobInfo resumeJobs(String filter, int start, int length) + throws CoordinatorEngineException { + did = RestConstants.JOBS; + return new CoordinatorJobInfo(new ArrayList<CoordinatorJobBean>(), 0, 0, 0); + } + + @Override + public CoordinatorJobInfo killJobs(String filter, int start, int length) + throws CoordinatorEngineException { + did = RestConstants.JOBS; + return new CoordinatorJobInfo(new ArrayList<CoordinatorJobBean>(), 0, 0, 0); + } + private int validateCoordinatorIdx(String jobId) throws CoordinatorEngineException { int idx = -1; try { http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java b/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java index 89cfcea..60b7735 100644 --- a/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java +++ b/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java @@ -154,6 +154,24 @@ public class MockDagEngineService extends DagEngineService { } @Override + public WorkflowsInfo suspendJobs(String filter, int start, int len) throws DagEngineException { + did = RestConstants.JOBS; + return new WorkflowsInfo(new ArrayList<WorkflowJobBean>(), 0, 0, 0); + } + + @Override + public WorkflowsInfo resumeJobs(String filter, int start, int len) throws DagEngineException { + did = RestConstants.JOBS; + return new WorkflowsInfo(new ArrayList<WorkflowJobBean>(), 0, 0, 0); + } + + @Override + public WorkflowsInfo killJobs(String filter, int start, int len) throws DagEngineException { + did = RestConstants.JOBS; + return new WorkflowsInfo(new ArrayList<WorkflowJobBean>(), 0, 0, 0); + } + + @Override public void reRun(String jobId, Configuration conf) throws DagEngineException { did = RestConstants.JOB_ACTION_RERUN; int idx = validateWorkflowIdx(jobId); http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/docs/src/site/twiki/DG_CommandLineTool.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/DG_CommandLineTool.twiki b/docs/src/site/twiki/DG_CommandLineTool.twiki index bd53bf9..762e2f1 100644 --- a/docs/src/site/twiki/DG_CommandLineTool.twiki +++ b/docs/src/site/twiki/DG_CommandLineTool.twiki @@ -105,6 +105,15 @@ usage: -offset <arg> jobs offset (default '1') -oozie <arg> Oozie URL -timezone <arg> use time zone with the specified ID (default GMT). See 'oozie info -timezones' for a list + -kill kill all jobs that satisfy the filter, len, offset, or/and jobtype options. If it's used without + other options, it will kill all the first 50 workflow jobs. Command will fail if one or more + of the jobs is in wrong state. + -suspend suspend all jobs that satisfy the filter, len, offset, or/and jobtype options. If it's used without + other options, it will suspend all the first 50 workflow jobs. Command will fail if one or more + of the jobs is in wrong state. + -resume resume all jobs that satisfy the filter, len, offset, or/and jobtype options. If it's used without + other options, it will resume all the first 50 workflow jobs. Command will fail if one or more + of the jobs is in wrong state. -verbose verbose mode . oozie admin <OPTIONS> : admin operations @@ -1000,6 +1009,55 @@ Job ID Bundle Name Status Kickoff The =jobtype= option specified the job type to display, default value is 'wf'. To see the bundle jobs, value is 'bundle'. +---+++ Bulk kill, suspend or resume multiple jobs + +Example: + +<verbatim> +$ oozie jobs -oozie http://localhost:11000/oozie -kill|-suspend|-resume -filter name=cron-coord -jobtype coordinator +The following jobs have been killed|suspended|resumed +Job ID App Name Status Freq Unit Started Next Materialized +.------------------------------------------------------------------------------------------------------------------------------------ +0000005-150224141553231-oozie-bzha-C cron-coord KILLED 10 MINUTE 2015-02-24 22:05 GMT 2015-02-24 23:05 GMT +.------------------------------------------------------------------------------------------------------------------------------------ +0000001-150224141553231-oozie-bzha-C cron-coord KILLED 10 MINUTE 2015-02-24 22:00 GMT 2015-02-24 23:00 GMT +.------------------------------------------------------------------------------------------------------------------------------------ +0000000-150224141553231-oozie-bzha-C cron-coord KILLED 10 MINUTE 2015-02-25 22:00 GMT - +.------------------------------------------------------------------------------------------------------------------------------------ +</verbatim> + +The above command will kill, suspend, or resume all the coordinator jobs with name of "cron-coord" starting with offset 1 +to 50. +The =jobs= sub-command will bulk modify all the jobs that satisfy the filter, len, offset, and jobtype options when adding +a -kill|-suspend|-resume option. Another way to think about is the subcommand works to modify all the jobs that will be +displayed if the write option(kill|suspend|resume) is not there. + +The =offset= and =len= option specified the offset and number of jobs to be modified, default values are =1= and =50= +respectively. +The =jobtype= option specifies the type of jobs to be modified, be it "wf", "coordinator" or "bundle". default value is "wf". + +A filter can be specified after all options. + +The =filter=option syntax is: <code>[NAME=VALUE][;NAME=VALUE]*</code>. + +Valid filter names are: + + * name: the workflow application name from the workflow definition. + * user: the user that submitted the job. + * group: the group for the job. + * status: the status of the job. + * frequency: the frequency of the Coordinator job. + * unit: the time unit. It can take one of the following four values: months, days, hours or minutes. Time unit should be added only when frequency is specified. + +The query will do an AND among all the filter names. The query will do an OR among all the filter values for the same +name. Multiple values must be specified as different name value pairs. + +The following example shows how to suspend the first 20 bundle jobs whose name is "bundle-app": + +<verbatim> +$ oozie jobs -oozie http://localhost:11000/oozie -suspend -filter name=bundle-app -jobtype bundle -len 20 +</verbatim> + ---+++ Bulk monitoring for jobs launched via Bundles * This command-line query helps to directly query for a bulk of jobs using a set of rich filters. http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/docs/src/site/twiki/WebServicesAPI.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/WebServicesAPI.twiki b/docs/src/site/twiki/WebServicesAPI.twiki index 3dc359a..bf2bf50 100644 --- a/docs/src/site/twiki/WebServicesAPI.twiki +++ b/docs/src/site/twiki/WebServicesAPI.twiki @@ -1653,6 +1653,126 @@ The valid values of job type are: =wf=, =coordinator= or =bundle=. startCreatedTime and endCreatedTime should be specified either in *ISO8601 (UTC)* format (*yyyy-MM-dd'T'HH:mm'Z'*) or a offset value in days or hours from the current time. for example, -2d means the current time - 2 days. -3h means the current time - 3 hours. -5m means the current time - 5 minutes +---++++ Bulk modify jobs + +A HTTP PUT request can kill, suspend, or resume all jobs that satisfy the url encoded parameters. + +*Request:* + +<verbatim> +PUT /oozie/v1/jobs?action=kill&filter=name%3Dcron-coord&offset=1&len=50&jobtype=coordinator +</verbatim> + +This request will kill all the coordinators with name=cron-coord up to 50 of them. + +Note that the filter is URL encoded, its decoded value is <code>name=cron-coord</code>. +The syntax for the filter is <verbatim>[NAME=VALUE][;NAME=VALUE]*</verbatim> + +Valid filter names are: + + * name: the application name from the workflow/coordinator/bundle definition + * user: the user that submitted the job + * group: the group for the job + * status: the status of the job + +The query will do an AND among all the filter names. + +The query will do an OR among all the filter values for the same name. Multiple values must be specified as different +name value pairs. + +Additionally the =offset= and =len= parameters can be used for pagination. The start parameter is base 1. + +Moreover, the =jobtype= parameter could be used to determine what type of job is looking for. +The valid values of job type are: =wf=, =coordinator= or =bundle= + +*Response:* +<verbatim> +HTTP/1.1 200 OK +Content-Type: application/json;charset=UTF-8 +. +{ + offset: 1, + len: 50, + total: 2, +**jobs: [ + { +** jobType: "coordinator" + id: "0-200905191240-oozie-C", + appName: "cron-coord", + appPath: "hdfs://user/bansalm/app/cron-coord.xml", + user: "bansalm", + group: "other", + status: "KILLED", + createdTime: "Thu, 01 Jan 2009 00:00:00 GMT", + startTime: "Fri, 02 Jan 2009 00:00:00 GMT", + endTime: "Fri, 31 Dec 2009 00:00:00 GMT", + info: "nextAction=5", + }, + { +** jobType: "coordinator" + id: "0-200905191240-oozie-C", + appName: "cron-coord", + appPath: "hdfs://user/bansalm/myapp/cron-coord.xml", + user: "bansalm", + group: "other", + status: "KILLED", + createdTime: "Thu, 01 Jan 2009 00:00:00 GMT", + startTime: "Fri, 02 Jan 2009 00:00:00 GMT", + endTime: "Fri, 31 Dec 2009 00:00:00 GMT", + }, + ... + ] +} +</verbatim> + +<verbatim> +PUT /oozie/v1/jobs?action=suspend&filter=status%3Drunning&offset=1&len=50&jobtype=wf +</verbatim> + +This request will suspend all the workflows with status=running up to 50 of them. +Note that the filter is URL encoded, its decoded value is <code>status=running</code>. + +*Response:* +<verbatim> +HTTP/1.1 200 OK +Content-Type: application/json;charset=UTF-8 +. +{ + offset: 1, + len: 50, + total: 50, +**jobs: [ + { +** jobType: "workflow" + id: "0-200905191240-oozie-W", + appName: "indexer-workflow", + appPath: "hdfs://user/tucu/indexer-wf", + user: "bansalm", + group: "other", + status: "SUSPENDED", + createdTime: "Thu, 01 Jan 2009 00:00:00 GMT", + startTime: "Fri, 02 Jan 2009 00:00:00 GMT", + endTime: null, + info: "run=0", + }, + { +** jobType: "workflow" + id: "0-200905191240-oozie-W", + appName: "logprocessor-wf", + appPath: "hdfs://user/bansalm/myapp/workflow.xml", + user: "bansalm", + group: "other", + status: "SUSPENDED", + createdTime: "Thu, 01 Jan 2009 00:00:00 GMT", + startTime: "Fri, 02 Jan 2009 00:00:00 GMT", + endTime: null, + info: "run=0", + }, + ... + ] +} +</verbatim> + ---++++ Jobs information using Bulk API A HTTP GET request retrieves a bulk response for all actions, corresponding to a particular bundle, that satisfy user specified criteria. http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 3c3a122..721f681 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.2.0 release (trunk - unreleased) +OOZIE-2108 bulk kill, suspend, resume jobs using existing filter, offset, len, and jobtype params (bzhang) OOZIE-2167 TestCoordMaterializeTransitionXCommand fails (rkanter) OOZIE-1964 Hive Server 2 action doesn't return Hadoop Job IDs (rkanter) OOZIE-2126 SSH action can be too fast for Oozie sometimes (rkanter)
