Repository: oozie Updated Branches: refs/heads/master e0ada0ab7 -> d8425480e
OOZIE-2345 Parallel job submission for forked actions Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/d8425480 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/d8425480 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/d8425480 Branch: refs/heads/master Commit: d8425480ea917fb39a2fb74b59d17da7cb85ca07 Parents: e0ada0a Author: Purshotam Shah <[email protected]> Authored: Thu Sep 17 13:46:50 2015 -0700 Committer: Purshotam Shah <[email protected]> Committed: Thu Sep 17 13:46:50 2015 -0700 ---------------------------------------------------------------------- .../oozie/command/wf/ActionStartXCommand.java | 60 ++--- .../apache/oozie/command/wf/ActionXCommand.java | 24 +- .../command/wf/ForkedActionStartXCommand.java | 99 ++++++++ .../apache/oozie/command/wf/SignalXCommand.java | 118 ++++++++- .../oozie/service/CallableQueueService.java | 27 +- .../apache/oozie/util/PriorityDelayQueue.java | 19 +- core/src/main/resources/oozie-default.xml | 9 + .../wf/TestForkedActionStartXCommand.java | 244 +++++++++++++++++++ .../oozie/command/wf/TestReRunXCommand.java | 10 +- .../oozie/command/wf/TestSignalXCommand.java | 35 ++- .../service/ExtendedCallableQueueService.java | 37 +++ .../oozie/util/TestPriorityDelayQueue.java | 225 ++++++++++------- release-log.txt | 1 + 13 files changed, 761 insertions(+), 147 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java index e06649c..85a6cd7 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java @@ -21,7 +21,6 @@ package org.apache.oozie.command.wf; import java.util.ArrayList; import java.util.Date; import java.util.List; - import javax.servlet.jsp.el.ELException; import org.apache.hadoop.conf.Configuration; @@ -62,7 +61,7 @@ import org.apache.oozie.util.XmlUtils; import org.apache.oozie.util.db.SLADbXOperations; @SuppressWarnings("deprecation") -public class ActionStartXCommand extends ActionXCommand<Void> { +public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext> { public static final String EL_ERROR = "EL_ERROR"; public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR"; public static final String COULD_NOT_START = "COULD_NOT_START"; @@ -71,13 +70,14 @@ public class ActionStartXCommand extends ActionXCommand<Void> { public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag"; private String jobId = null; - private String actionId = null; + protected String actionId = null; private WorkflowJobBean wfJob = null; - private WorkflowActionBean wfAction = null; + protected WorkflowActionBean wfAction = null; private JPAService jpaService = null; private ActionExecutor executor = null; private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); private List<JsonBean> insertList = new ArrayList<JsonBean>(); + protected ActionExecutorContext context = null; public ActionStartXCommand(String actionId, String type) { super("action.start", type, 0); @@ -157,8 +157,7 @@ public class ActionStartXCommand extends ActionXCommand<Void> { } @Override - protected Void execute() throws CommandException { - + protected ActionExecutorContext execute() throws CommandException { LOG.debug("STARTED ActionStartXCommand for wf actionId=" + actionId); Configuration conf = wfJob.getWorkflowInstance().getConf(); @@ -174,7 +173,6 @@ public class ActionStartXCommand extends ActionXCommand<Void> { executor.setMaxRetries(maxRetries); executor.setRetryInterval(retryInterval); - ActionExecutorContext context = null; try { boolean isRetry = false; if (wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY @@ -284,8 +282,7 @@ public class ActionStartXCommand extends ActionXCommand<Void> { LOG.info(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action status=" + wfAction.getStatusStr()); updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction)); - wfJob.setLastModifiedTime(new Date()); - updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob)); + updateJobLastModified(); // Add SLA status event (STARTED) for WF_ACTION SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.STARTED, SlaAppType.WORKFLOW_ACTION); @@ -318,18 +315,12 @@ public class ActionStartXCommand extends ActionXCommand<Void> { case FAILED: try { failJob(context); - updateParentIfNecessary(wfJob, 3); - new WfEndXCommand(wfJob).call(); // To delete the WF temp dir + endWF(); SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED, SlaAppType.WORKFLOW_ACTION); if(slaEvent1 != null) { insertList.add(slaEvent1); } - SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED, - SlaAppType.WORKFLOW_JOB); - if(slaEvent2 != null) { - insertList.add(slaEvent2); - } } catch (XException x) { LOG.warn("ActionStartXCommand - case:FAILED ", x.getMessage()); @@ -337,8 +328,7 @@ public class ActionStartXCommand extends ActionXCommand<Void> { break; } updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction)); - wfJob.setLastModifiedTime(new Date()); - updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob)); + updateJobLastModified(); } finally { try { @@ -349,7 +339,7 @@ public class ActionStartXCommand extends ActionXCommand<Void> { if (execSynchronous) { // Changing to synchronous call from asynchronous queuing to prevent // undue delay from ::start:: to action due to queuing - new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call(getEntityKey()); + callActionEnd(); } } catch (JPAExecutorException e) { @@ -362,24 +352,36 @@ public class ActionStartXCommand extends ActionXCommand<Void> { return null; } - private void handleError(ActionExecutorContext context, WorkflowJobBean workflow, WorkflowActionBean action) + protected void callActionEnd() throws CommandException { + new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call(getEntityKey()); + } + + protected void updateJobLastModified(){ + wfJob.setLastModifiedTime(new Date()); + updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob)); + } + + protected void endWF() throws CommandException{ + updateParentIfNecessary(wfJob, 3); + new WfEndXCommand(wfJob).call(); // To delete the WF temp dir + SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED, + SlaAppType.WORKFLOW_JOB); + if(slaEvent2 != null) { + insertList.add(slaEvent2); + } + } + + protected void handleError(ActionExecutorContext context, WorkflowJobBean workflow, WorkflowActionBean action) throws CommandException { failJob(context); updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction)); - wfJob.setLastModifiedTime(new Date()); - updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob)); + updateJobLastModified(); SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(action.getSlaXml(), action.getId(), Status.FAILED, SlaAppType.WORKFLOW_ACTION); if(slaEvent1 != null) { insertList.add(slaEvent1); } - SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(workflow.getSlaXml(), workflow.getId(), - Status.FAILED, SlaAppType.WORKFLOW_JOB); - if(slaEvent2 != null) { - insertList.add(slaEvent2); - } - - new WfEndXCommand(wfJob).call(); //To delete the WF temp dir + endWF(); return; } http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java index 2616d32..b024bd0 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java @@ -34,6 +34,7 @@ import org.apache.oozie.ErrorCode; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.action.ActionExecutor; +import org.apache.oozie.client.Job; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.client.WorkflowJob; import org.apache.oozie.command.CommandException; @@ -56,7 +57,7 @@ import org.apache.oozie.workflow.lite.LiteWorkflowInstance; * Base class for Action execution commands. Provides common functionality to handle different types of errors while * attempting to start or end an action. */ -public abstract class ActionXCommand<T> extends WorkflowXCommand<Void> { +public abstract class ActionXCommand<T> extends WorkflowXCommand<T> { private static final String INSTRUMENTATION_GROUP = "action.executors"; protected static final String RECOVERY_ID_SEPARATOR = "@"; @@ -281,8 +282,10 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<Void> { private boolean started; private boolean ended; private boolean executed; + private boolean shouldEndWF; + private Job.Status jobStatus; - /** + /** * Constructing the ActionExecutorContext, setting the private members * and constructing the proto configuration */ @@ -498,6 +501,23 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<Void> { public void setErrorInfo(String str, String exMsg) { action.setErrorInfo(str, exMsg); } + + public boolean isShouldEndWF() { + return shouldEndWF; + } + + public void setShouldEndWF(boolean shouldEndWF) { + this.shouldEndWF = shouldEndWF; + } + + public Job.Status getJobStatus() { + return jobStatus; + } + + public void setJobStatus(Job.Status jobStatus) { + this.jobStatus = jobStatus; + } + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java new file mode 100644 index 0000000..47dca75 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.command.wf; + +import org.apache.oozie.WorkflowActionBean; +import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.action.ActionExecutor; +import org.apache.oozie.client.Job; +import org.apache.oozie.client.WorkflowAction; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.command.XCommand; + +public class ForkedActionStartXCommand extends ActionStartXCommand { + + public ForkedActionStartXCommand(String actionId, String type) { + super(actionId, type); + } + + public ForkedActionStartXCommand(WorkflowJobBean wfJob, String id, String type) { + super(wfJob, id, type); + } + + protected ActionExecutorContext execute() throws CommandException { + super.execute(); + return context; + } + + @Override + public String getEntityKey() { + return actionId; + } + + // In case of requeue follow the old approach. + @Override + protected void queue(XCommand<?> command, long msDelay) { + + if (command instanceof ForkedActionStartXCommand) { + LOG.debug("Queueing ActionStartXCommand command"); + super.queue(new ActionStartXCommand(wfAction.getId(), wfAction.getType()), msDelay); + } + else { + LOG.debug("Queueing " + command); + super.queue(command, msDelay); + } + } + + // Job will be failed by SignalXcommand, because ForkedActionStartXCommand doesn't have lock on jobId. + @Override + public void failJob(ActionExecutor.Context context, WorkflowActionBean action) throws CommandException { + this.context.setJobStatus(Job.Status.FAILED); + } + + @Override + protected void updateParentIfNecessary(WorkflowJobBean wfjob, int maxRetries) throws CommandException { + } + + @Override + protected void handleNonTransient(ActionExecutor.Context context, ActionExecutor executor, + WorkflowAction.Status status) throws CommandException { + this.context.setJobStatus(Job.Status.SUSPENDED); + } + + @Override + protected void handleError(ActionExecutorContext context, WorkflowJobBean workflow, WorkflowActionBean action) + throws CommandException { + this.context.setJobStatus(Job.Status.FAILED); + } + + @Override + protected void updateJobLastModified() { + } + + // Not killing job, setting flag so that signalX can kill the job + @Override + protected void endWF() { + context.setShouldEndWF(true); + } + + @Override + protected void callActionEnd() { + queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType())); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java index d1fcd1a..6f64647 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java @@ -25,6 +25,7 @@ import org.apache.oozie.action.ActionExecutor; import org.apache.oozie.action.control.ForkActionExecutor; import org.apache.oozie.action.control.StartActionExecutor; import org.apache.oozie.action.oozie.SubWorkflowActionExecutor; +import org.apache.oozie.client.Job; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.client.WorkflowJob; import org.apache.oozie.client.SLAEvent.SlaAppType; @@ -46,6 +47,9 @@ import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQ import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; import org.apache.oozie.service.ActionService; +import org.apache.oozie.service.CallableQueueService; +import org.apache.oozie.service.CallableQueueService.CallableWrapper; +import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.ELService; import org.apache.oozie.service.EventHandlerService; import org.apache.oozie.service.JPAService; @@ -70,6 +74,7 @@ import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.concurrent.Future; import org.apache.oozie.client.OozieClient; @@ -86,6 +91,7 @@ public class SignalXCommand extends WorkflowXCommand<Void> { private boolean generateEvent = false; private String wfJobErrorCode; private String wfJobErrorMsg; + public final static String FORK_PARALLEL_JOBSUBMISSION = "oozie.workflow.parallel.fork.action.start"; public SignalXCommand(String name, int priority, String jobId) { super(name, name, priority); @@ -164,6 +170,8 @@ public class SignalXCommand extends WorkflowXCommand<Void> { WorkflowJob.Status prevStatus = wfJob.getStatus(); boolean completed = false, skipAction = false; WorkflowActionBean syncAction = null; + List<WorkflowActionBean> workflowActionBeanListForForked = new ArrayList<WorkflowActionBean>(); + if (wfAction == null) { if (wfJob.getStatus() == WorkflowJob.Status.PREP) { @@ -387,15 +395,31 @@ public class SignalXCommand extends WorkflowXCommand<Void> { ActionExecutor current = as.getExecutor(wfAction.getType()); LOG.trace("Current Action Type:" + current.getClass()); if (!suspendNewAction) { - if (!(current instanceof ForkActionExecutor) && !(current instanceof StartActionExecutor)) { + if (current instanceof StartActionExecutor) { // Excluding :start: here from executing first action synchronously since it // blocks the consumer thread till the action is submitted to Hadoop, // in turn reducing the number of new submissions the threads can accept. // Would also be susceptible to longer delays in case Hadoop cluster is busy. - syncAction = newAction; + queue(new ActionStartXCommand(newAction.getId(), newAction.getType())); + } + else if (current instanceof ForkActionExecutor) { + if (ConfigurationService.getBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION)) { + workflowActionBeanListForForked.add(newAction); + } + else { + queue(new ActionStartXCommand(newAction.getId(), newAction.getType())); + + } } else { - queue(new ActionStartXCommand(newAction.getId(), newAction.getType())); + syncAction = newAction; + } + } + else { + // suspend check will happen later... where if one of action is suspended all forked action + // will be ignored. + if (ConfigurationService.getBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION)) { + workflowActionBeanListForForked.add(newAction); } } } @@ -434,10 +458,87 @@ public class SignalXCommand extends WorkflowXCommand<Void> { else if (syncAction != null) { new ActionStartXCommand(wfJob, syncAction.getId(), syncAction.getType()).call(getEntityKey()); } + else if (!workflowActionBeanListForForked.isEmpty() && !checkForSuspendNode(workflowActionBeanListForForked)){ + startForkedActions(workflowActionBeanListForForked); + } LOG.debug("ENDED SignalCommand for jobid=" + jobId + ", actionId=" + actionId); return null; } + public void startForkedActions(List<WorkflowActionBean> workflowActionBeanListForForked) throws CommandException { + + List<CallableWrapper<ActionExecutorContext>> tasks = + new ArrayList<CallableWrapper<ActionExecutorContext>>(); + boolean updateLastModified = true; + for (WorkflowActionBean workflowActionBean : workflowActionBeanListForForked) { + LOG.debug("Starting forked actions parallely : " + workflowActionBean.getId()); + tasks.add(Services.get().get(CallableQueueService.class).new CallableWrapper<ActionExecutorContext>( + new ForkedActionStartXCommand(wfJob, workflowActionBean.getId(), workflowActionBean.getType()), 0)); + } + + try { + List<Future<ActionExecutorContext>> futures = Services.get().get(CallableQueueService.class).invokeAll(tasks); + for (Future<ActionExecutorContext> result : futures) { + ActionExecutorContext context = result.get(); + if (context.getJobStatus() != null && context.getJobStatus().equals(Job.Status.FAILED)) { + LOG.warn("Action has failed, failing job" + context.getAction().getId()); + new ActionStartXCommand(context.getAction().getId(), null).failJob(context); + updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, + (WorkflowActionBean) context.getAction())); + if (context.isShouldEndWF()) { + endWF(); + updateLastModified = false; + break; + } + } + if (context.getJobStatus() != null && context.getJobStatus().equals(Job.Status.SUSPENDED)) { + LOG.warn("Action has failed, failing job" + context.getAction().getId()); + new ActionStartXCommand(context.getAction().getId(), null).handleNonTransient(context, null, + WorkflowAction.Status.START_MANUAL); + updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, + (WorkflowActionBean) context.getAction())); + if (context.isShouldEndWF()) { + endWF(); + updateLastModified = false; + break; + } + } + } + } + catch (Exception e) { + LOG.error("Error running forked jobs parallely", e); + startForkedActionsByQueuing(workflowActionBeanListForForked); + } + if (updateLastModified) { + updateJobLastModified(); + } + LOG.debug("forked actions submitted parallely"); + } + + public void startForkedActionsByQueuing(List<WorkflowActionBean> workflowActionBeanListForForked) throws CommandException { + //queuing all jobs, submitted job will fail in precondition + for (WorkflowActionBean workflowActionBean : workflowActionBeanListForForked) { + LOG.debug("Queuing fork action " + workflowActionBean.getId()); + queue(new ActionStartXCommand(wfJob, workflowActionBean.getId(), workflowActionBean.getType())); + } + } + + protected void updateJobLastModified() { + wfJob.setLastModifiedTime(new Date()); + updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, + wfJob)); + } + + protected void endWF() throws CommandException { + updateParentIfNecessary(wfJob, 3); + new WfEndXCommand(wfJob).call(); // To delete the WF temp dir + SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED, + SlaAppType.WORKFLOW_JOB); + if (slaEvent2 != null) { + insertList.add(slaEvent2); + } + } + public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) { ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group); for (Map.Entry<String, String> entry : conf) { @@ -538,4 +639,15 @@ public class SignalXCommand extends WorkflowXCommand<Void> { return suspendNewAction; } + + +private boolean checkForSuspendNode(List<WorkflowActionBean> workflowActionBeanListForForked) { + for(WorkflowActionBean bean :workflowActionBeanListForForked) + if(checkForSuspendNode(bean)){ + return true; + } + return false; +} + + } http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/main/java/org/apache/oozie/service/CallableQueueService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java index 830a58e..3333c77 100644 --- a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java +++ b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java @@ -29,14 +29,16 @@ import java.util.Map; import java.util.Set; import java.util.Map.Entry; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.client.OozieClient.SYSTEM_MODE; -import org.apache.oozie.command.XCommand; import org.apache.oozie.util.Instrumentable; import org.apache.oozie.util.Instrumentation; import org.apache.oozie.util.PollablePriorityDelayQueue; @@ -144,10 +146,10 @@ public class CallableQueueService implements Service, Instrumentable { // and instrumentation. // The wrapper implements Runnable and Comparable to be able to work with an // executor and a priority queue. - class CallableWrapper extends PriorityDelayQueue.QueueElement<XCallable<?>> implements Runnable { + public class CallableWrapper<E> extends PriorityDelayQueue.QueueElement<E> implements Runnable, Callable<E> { private Instrumentation.Cron cron; - public CallableWrapper(XCallable<?> callable, long delay) { + public CallableWrapper(XCallable<E> callable, long delay) { super(callable, callable.getPriority(), delay, TimeUnit.MILLISECONDS); cron = new Instrumentation.Cron(); cron.start(); @@ -172,7 +174,8 @@ public class CallableQueueService implements Service, Instrumentable { log.trace("executing callable [{0}]", callable.getName()); try { - callable.call(); + //FutureTask.run() will invoke cllable.call() + super.run(); incrCounter(INSTR_EXECUTED_COUNTER, 1); log.trace("executed callable [{0}]", callable.getName()); } @@ -245,6 +248,13 @@ public class CallableQueueService implements Service, Instrumentable { uniqueCallables.remove(callable.getKey()); } } + + //this will not get called, bcz newTaskFor of threadpool will convert it in futureTask which is a runnable. + // futureTask will call the cllable.call from run method. so we override run to call super.run method. + @Override + public E call() throws Exception { + return null; + } } class CompositeCallable implements XCallable<Void> { @@ -498,6 +508,9 @@ public class CallableQueueService implements Service, Instrumentable { super.beforeExecute(t,r); XLog.Info.get().clear(); } + protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { + return (RunnableFuture<T>)callable; + } }; for (int i = 0; i < threads; i++) { @@ -770,4 +783,10 @@ public class CallableQueueService implements Service, Instrumentable { return list; } + // Refer executor.invokeAll + public <T> List<Future<T>> invokeAll(List<CallableWrapper<T>> tasks) + throws InterruptedException { + return executor.invokeAll(tasks); + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java b/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java index ae54506..1ce6fae 100644 --- a/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java +++ b/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; +import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; @@ -59,8 +60,8 @@ public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.Queu * <p> * This wrapper keeps track of the priority and the age of a queue element. */ - public static class QueueElement<E> implements Delayed { - private E element; + public static class QueueElement<E> extends FutureTask<E> implements Delayed { + private XCallable<E> element; private int priority; private long baseTime; boolean inQueue; @@ -76,7 +77,8 @@ public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.Queu * @throws IllegalArgumentException if the element is <tt>NULL</tt>, the priority is negative or if the delay is * negative. */ - public QueueElement(E element, int priority, long delay, TimeUnit unit) { + public QueueElement(XCallable<E> element, int priority, long delay, TimeUnit unit) { + super(element); if (element == null) { throw new IllegalArgumentException("element cannot be null"); } @@ -92,20 +94,11 @@ public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.Queu } /** - * Create an Element wrapper with no delay and minimum priority. - * - * @param element element. - */ - public QueueElement(E element) { - this(element, 0, 0, TimeUnit.MILLISECONDS); - } - - /** * Return the element from the wrapper. * * @return the element. */ - public E getElement() { + public XCallable<E> getElement() { return element; } http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index 4b9a0bc..400569b 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -2127,6 +2127,15 @@ </description> </property> + <property> + <name>oozie.workflow.parallel.fork.action.start</name> + <value>true</value> + <description> + Determines how Oozie processes starting of forked actions. If true, forked actions and their job submissions + are done in parallel which is best for performance. If false, they are submitted sequentially. + </description> + </property> + <property> <name>oozie.coord.action.get.all.attributes</name> <value>false</value> http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/test/java/org/apache/oozie/command/wf/TestForkedActionStartXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestForkedActionStartXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestForkedActionStartXCommand.java new file mode 100644 index 0000000..e685621 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/command/wf/TestForkedActionStartXCommand.java @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.command.wf; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.net.URI; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.oozie.ForTestingActionExecutor; +import org.apache.oozie.WorkflowActionBean; +import org.apache.oozie.client.OozieClient; +import org.apache.oozie.client.WorkflowJob; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor; +import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; +import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; +import org.apache.oozie.service.ActionService; +import org.apache.oozie.service.ConfigurationService; +import org.apache.oozie.service.ExtendedCallableQueueService; +import org.apache.oozie.service.JPAService; +import org.apache.oozie.service.LiteWorkflowStoreService; +import org.apache.oozie.service.SchemaService; +import org.apache.oozie.service.Services; +import org.apache.oozie.test.XDataTestCase; +import org.apache.oozie.util.XConfiguration; + +public class TestForkedActionStartXCommand extends XDataTestCase { + + private Services services; + + @Override + protected void setUp() throws Exception { + super.setUp(); + setSystemProperty(SchemaService.WF_CONF_EXT_SCHEMAS, "wf-ext-schema.xsd"); + setSystemProperty(LiteWorkflowStoreService.CONF_USER_RETRY_ERROR_CODE_EXT, ForTestingActionExecutor.TEST_ERROR); + setSystemProperty(Services.CONF_SERVICE_EXT_CLASSES, ExtendedCallableQueueService.class.getName()); + services = new Services(); + services.init(); + services.get(ActionService.class).registerAndInitExecutor(ForTestingActionExecutor.class); + ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, true); + + } + + @Override + protected void tearDown() throws Exception { + services.destroy(); + super.tearDown(); + } + + public void testWfSuccess() throws Exception { + Configuration conf = new XConfiguration(); + String workflowUri = getTestCaseFileUri("workflow.xml"); + //@formatter:off + String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.4\" name=\"wf-fork\">" + + "<start to=\"fork1\"/>" + + "<fork name=\"fork1\">" + + "<path start=\"action1\"/>" + + "<path start=\"action2\"/>" + + "</fork>" + + "<action name=\"action1\">" + + "<fs></fs>" + + "<ok to=\"join1\"/>" + + "<error to=\"kill\"/>" + + "</action><action name=\"action2\">" + + "<fs></fs><ok to=\"join1\"/>" + + "<error to=\"kill\"/>" + + "</action>" + + "<join name=\"join1\" to=\"end\"/>" + + "<kill name=\"kill\"><message>killed</message>" + + "</kill><" + + "end name=\"end\"/>" + + "</workflow-app>"; + //@Formatter:on + + writeToFile(appXml, workflowUri); + conf.set(OozieClient.APP_PATH, workflowUri); + conf.set(OozieClient.USER_NAME, getTestUser()); + + SubmitXCommand sc = new SubmitXCommand(conf); + final String jobId = sc.call(); + new StartXCommand(jobId).call(); + waitFor(20 * 1000, new Predicate() { + @Override + public boolean evaluate() throws Exception { + return WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId).getStatus() + == WorkflowJob.Status.SUCCEEDED; + } + }); + assertEquals(WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId).getStatus(), + WorkflowJob.Status.SUCCEEDED); + } + + public void testWfFailure() throws Exception { + Configuration conf = new XConfiguration(); + String workflowUri = getTestCaseFileUri("workflow.xml"); + //@formatter:off + String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.4\" name=\"wf-fork\">" + + "<start to=\"fork1\"/>" + + "<fork name=\"fork1\">" + + "<path start=\"action1\"/>" + + "<path start=\"action2\"/>" + + "</fork>" + + "<action name=\"action1\">" + + "<fs></fs>" + + "<ok to=\"kill\"/>" + + "<error to=\"kill\"/>" + + "</action><action name=\"action2\">" + + "<fs></fs><ok to=\"join1\"/>" + + "<error to=\"kill\"/>" + + "</action>" + + "<join name=\"join1\" to=\"end\"/>" + + "<kill name=\"kill\"><message>killed</message>" + + "</kill><" + + "end name=\"end\"/>" + + "</workflow-app>"; + //@Formatter:on + + writeToFile(appXml, workflowUri); + conf.set(OozieClient.APP_PATH, workflowUri); + conf.set(OozieClient.USER_NAME, getTestUser()); + SubmitXCommand sc = new SubmitXCommand(conf); + final String jobId = sc.call(); + new StartXCommand(jobId).call(); + waitFor(200 * 1000, new Predicate() { + @Override + public boolean evaluate() throws Exception { + return WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId).getStatus() + == WorkflowJob.Status.KILLED; + } + }); + assertEquals(WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId).getStatus(), + WorkflowJob.Status.KILLED); + } + + public void testUserRetry() throws JPAExecutorException, IOException, CommandException{ + Configuration conf = new XConfiguration(); + String workflowUri = getTestCaseFileUri("workflow.xml"); + + //@formatter:off + String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.3\" name=\"wf-fork\">" + + "<start to=\"fork1\"/>" + + "<fork name=\"fork1\">" + + "<path start=\"action1\"/>" + + "<path start=\"action2\"/>" + + "</fork>" + +"<action name=\"action1\" retry-max=\"2\" retry-interval=\"0\">" + + "<test xmlns=\"uri:test\">" + + "<signal-value>${wf:conf('signal-value')}</signal-value>" + + "<external-status>${wf:conf('external-status')}</external-status> " + + "<error>${wf:conf('error')}</error>" + + "<avoid-set-execution-data>${wf:conf('avoid-set-execution-data')}</avoid-set-execution-data>" + + "<avoid-set-end-data>${wf:conf('avoid-set-end-data')}</avoid-set-end-data>" + + "<running-mode>${wf:conf('running-mode')}</running-mode>" + + "</test>" + + "<ok to=\"join1\"/>" + + "<error to=\"kill\"/>" + + "</action>" + + "<action name=\"action2\">" + + "<fs></fs><ok to=\"join1\"/>" + + "<error to=\"kill\"/>" + + "</action>" + + "<join name=\"join1\" to=\"end\"/>" + + "<kill name=\"kill\"><message>killed</message></kill>" + + "<end name=\"end\"/>" + + "</workflow-app>"; + //@Formatter:on + writeToFile(appXml, workflowUri); + conf.set(OozieClient.APP_PATH, workflowUri); + conf.set(OozieClient.USER_NAME, getTestUser()); + conf.set("error", "start.error"); + conf.set("external-status", "error"); + conf.set("signal-value", "based_on_action_status"); + + SubmitXCommand sc = new SubmitXCommand(conf); + final String jobId = sc.call(); + new StartXCommand(jobId).call(); + final WorkflowActionsGetForJobJPAExecutor actionsGetExecutor = new WorkflowActionsGetForJobJPAExecutor(jobId); + final JPAService jpaService = Services.get().get(JPAService.class); + + waitFor(20 * 1000, new Predicate() { + public boolean evaluate() throws Exception { + List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor); + WorkflowActionBean action = null; + for (WorkflowActionBean bean : actions) { + if (bean.getType().equals("test")) { + action = bean; + break; + } + } + return (action != null && action.getUserRetryCount() == 2); + } + }); + + List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor); + WorkflowActionBean action = null; + for (WorkflowActionBean bean : actions) { + if (bean.getType().equals("test")) { + action = bean; + break; + } + } + assertNotNull(action); + assertEquals(2, action.getUserRetryCount()); + } + + private void writeToFile(String appXml, String appPath) throws IOException { + File wf = new File(URI.create(appPath)); + PrintWriter out = null; + try { + out = new PrintWriter(new FileWriter(wf)); + out.println(appXml); + } + catch (IOException iex) { + throw iex; + } + finally { + if (out != null) { + out.close(); + } + } + } +} + http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java index 02f6166..45cbbc4 100644 --- a/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.io.Reader; import java.io.Writer; import java.util.List; + import org.apache.hadoop.fs.Path; import org.apache.oozie.local.LocalOozie; import org.apache.oozie.client.CoordinatorAction; @@ -37,6 +38,7 @@ import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.command.coord.CoordActionStartXCommand; import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor; import org.apache.oozie.service.ActionService; +import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.JPAService; import org.apache.oozie.service.SchemaService; import org.apache.oozie.service.Services; @@ -123,7 +125,13 @@ public class TestReRunXCommand extends XDataTestCase { * * @throws Exception */ - public void testRerunFork() throws Exception { + public void testRerunFork() throws Exception{ + ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, true); + _testRerunFork(); + ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, false); + _testRerunFork(); + } + public void _testRerunFork() throws Exception { // We need the shell schema and action for this test Services.get().setService(ActionService.class); Services.get().getConf().set(SchemaService.WF_CONF_EXT_SCHEMAS, "shell-action-0.3.xsd"); http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java index 4268b30..810bc1e 100644 --- a/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java @@ -38,6 +38,7 @@ import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.client.WorkflowJob; import org.apache.oozie.local.LocalOozie; +import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.Services; import org.apache.oozie.test.XDataTestCase; import org.apache.oozie.util.IOUtils; @@ -51,8 +52,8 @@ public class TestSignalXCommand extends XDataTestCase { protected void setUp() throws Exception { super.setUp(); services = new Services(); - services.getConf().setBoolean(LiteWorkflowAppParser.VALIDATE_FORK_JOIN, false); services.init(); + ConfigurationService.setBoolean(LiteWorkflowAppParser.VALIDATE_FORK_JOIN, false); } @@ -61,8 +62,34 @@ public class TestSignalXCommand extends XDataTestCase { services.destroy(); super.tearDown(); } + public void testJoinFail() throws Exception{ + ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, true); + _testJoinFail(); + ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, false); + _testJoinFail(); + } + + public void testSuspendPoints() throws Exception{ + ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, true); + _testSuspendPoints(); + services.destroy(); + services = new Services(); + services.init(); + ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, false); + _testSuspendPoints(); + } + + public void testSuspendPointsAll() throws Exception{ + ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, true); + _testSuspendPointsAll(); + services.destroy(); + services = new Services(); + services.init(); + ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, false); + _testSuspendPointsAll(); + } - public void testJoinFail() throws Exception { + public void _testJoinFail() throws Exception { Logger logger = Logger.getLogger(SignalXCommand.class); ByteArrayOutputStream out = new ByteArrayOutputStream(); Layout layout = new SimpleLayout(); @@ -94,7 +121,7 @@ public class TestSignalXCommand extends XDataTestCase { assertFalse(out.toString().contains("EntityExistsException")); } - public void testSuspendPoints() throws Exception { + public void _testSuspendPoints() throws Exception { services.destroy(); LocalOozie.start(); FileSystem fs = getFileSystem(); @@ -168,7 +195,7 @@ public class TestSignalXCommand extends XDataTestCase { LocalOozie.stop(); } - public void testSuspendPointsAll() throws Exception { + public void _testSuspendPointsAll() throws Exception { services.destroy(); LocalOozie.start(); FileSystem fs = getFileSystem(); http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/test/java/org/apache/oozie/service/ExtendedCallableQueueService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/ExtendedCallableQueueService.java b/core/src/test/java/org/apache/oozie/service/ExtendedCallableQueueService.java new file mode 100644 index 0000000..4fdeee5 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/service/ExtendedCallableQueueService.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.service; + +import java.util.List; +import java.util.concurrent.Future; + +import org.apache.oozie.service.CallableQueueService; + +public class ExtendedCallableQueueService extends CallableQueueService { + @Override + public <T> List<Future<T>> invokeAll(List<CallableWrapper<T>> tasks) throws InterruptedException { + try { + return super.invokeAll(tasks); + } + catch (Throwable e) { + throw new Error(); + } + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/test/java/org/apache/oozie/util/TestPriorityDelayQueue.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/util/TestPriorityDelayQueue.java b/core/src/test/java/org/apache/oozie/util/TestPriorityDelayQueue.java index 857f4e3..b48e0cc 100644 --- a/core/src/test/java/org/apache/oozie/util/TestPriorityDelayQueue.java +++ b/core/src/test/java/org/apache/oozie/util/TestPriorityDelayQueue.java @@ -22,7 +22,6 @@ import junit.framework.TestCase; import java.text.MessageFormat; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -48,47 +47,47 @@ public class TestPriorityDelayQueue extends TestCase { Object obj = new Object(); try { - new PriorityDelayQueue.QueueElement<Object>(null); + new TestQueueElement<Object>(null); fail(); } catch (IllegalArgumentException ex) { } try { - new PriorityDelayQueue.QueueElement<Object>(null, 0, 0, TimeUnit.MILLISECONDS); + new TestQueueElement<Object>(null, 0, 0, TimeUnit.MILLISECONDS); fail(); } catch (IllegalArgumentException ex) { } try { - new PriorityDelayQueue.QueueElement<Object>(obj, -1, 0, TimeUnit.MILLISECONDS); + new TestQueueElement<Object>(obj, -1, 0, TimeUnit.MILLISECONDS); fail(); } catch (IllegalArgumentException ex) { } try { - new PriorityDelayQueue.QueueElement<Object>(obj, 0, -1, TimeUnit.MILLISECONDS); + new TestQueueElement<Object>(obj, 0, -1, TimeUnit.MILLISECONDS); fail(); } catch (IllegalArgumentException ex) { } - PriorityDelayQueue.QueueElement<Object> e1 = new PriorityDelayQueue.QueueElement<Object>(obj); - assertEquals(obj, e1.getElement()); + TestQueueElement<Object> e1 = new TestQueueElement<Object>(obj); + assertEquals(obj, e1.getElement().call()); assertEquals(0, e1.getPriority()); assertTrue(e1.getDelay(TimeUnit.MILLISECONDS) <= 0); - e1 = new PriorityDelayQueue.QueueElement<Object>(obj, 1, 200, TimeUnit.MILLISECONDS); - assertEquals(obj, e1.getElement()); + e1 = new TestQueueElement<Object>(obj, 1, 200, TimeUnit.MILLISECONDS); + assertEquals(obj, e1.getElement().call()); assertEquals(1, e1.getPriority()); assertTrue(e1.getDelay(TimeUnit.MILLISECONDS) <= 200); assertTrue(e1.getDelay(TimeUnit.MILLISECONDS) >= 100); Thread.sleep(300); assertTrue(e1.getDelay(TimeUnit.MILLISECONDS) <= 0); - PriorityDelayQueue.QueueElement<Object> e2 = new PriorityDelayQueue.QueueElement<Object>(obj); + TestQueueElement<Object> e2 = new TestQueueElement<Object>(obj); assertTrue(e1.compareTo(e2) < 0); } @@ -129,23 +128,23 @@ public class TestPriorityDelayQueue extends TestCase { assertEquals(-1, q.getMaxSize()); assertEquals(1000, q.getMaxWait(TimeUnit.MILLISECONDS)); assertEquals(0, q.size()); - assertTrue(q.offer(new PriorityDelayQueue.QueueElement<Integer>(1))); + assertTrue(q.offer(new TestQueueElement<Integer>(1))); assertEquals(1, q.size()); - assertTrue(q.offer(new PriorityDelayQueue.QueueElement<Integer>(1))); + assertTrue(q.offer(new TestQueueElement<Integer>(1))); assertEquals(2, q.size()); - assertTrue(q.offer(new PriorityDelayQueue.QueueElement<Integer>(1))); + assertTrue(q.offer(new TestQueueElement<Integer>(1))); assertEquals(3, q.size()); q = new PriorityDelayQueue<Integer>(1, 1000, TimeUnit.MILLISECONDS, 1); assertEquals(1, q.getMaxSize()); assertEquals(0, q.size()); - assertTrue(q.offer(new PriorityDelayQueue.QueueElement<Integer>(1))); + assertTrue(q.offer(new TestQueueElement<Integer>(1))); assertEquals(1, q.size()); - assertFalse(q.offer(new PriorityDelayQueue.QueueElement<Integer>(1))); + assertFalse(q.offer(new TestQueueElement<Integer>(1))); assertEquals(1, q.size()); assertNotNull(q.poll()); assertEquals(0, q.size()); - assertTrue(q.offer(new PriorityDelayQueue.QueueElement<Integer>(1))); + assertTrue(q.offer(new TestQueueElement<Integer>(1))); assertEquals(1, q.size()); } @@ -154,88 +153,88 @@ public class TestPriorityDelayQueue extends TestCase { //test immediate offer polling - q.offer(new PriorityDelayQueue.QueueElement<Integer>(1)); - assertEquals((Integer) 1, q.poll().getElement()); + q.offer(new TestQueueElement<Integer>(1)); + assertEquals((Integer) 1, q.poll().getElement().call()); assertEquals(0, q.size()); //test delayed offer polling - q.offer(new PriorityDelayQueue.QueueElement<Integer>(2, 0, 10, TimeUnit.MILLISECONDS)); + q.offer(new TestQueueElement<Integer>(2, 0, 10, TimeUnit.MILLISECONDS)); assertNull(q.poll()); Thread.sleep(11); - assertEquals((Integer) 2, q.poll().getElement()); + assertEquals((Integer) 2, q.poll().getElement().call()); assertEquals(0, q.size()); //test different priorities immediate offer polling - q.offer(new PriorityDelayQueue.QueueElement<Integer>(10, 0, 0, TimeUnit.MILLISECONDS)); - q.offer(new PriorityDelayQueue.QueueElement<Integer>(30, 2, 0, TimeUnit.MILLISECONDS)); - q.offer(new PriorityDelayQueue.QueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS)); + q.offer(new TestQueueElement<Integer>(10, 0, 0, TimeUnit.MILLISECONDS)); + q.offer(new TestQueueElement<Integer>(30, 2, 0, TimeUnit.MILLISECONDS)); + q.offer(new TestQueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS)); - assertEquals((Integer) 30, q.poll().getElement()); - assertEquals((Integer) 20, q.poll().getElement()); - assertEquals((Integer) 10, q.poll().getElement()); + assertEquals((Integer) 30, q.poll().getElement().call()); + assertEquals((Integer) 20, q.poll().getElement().call()); + assertEquals((Integer) 10, q.poll().getElement().call()); assertEquals(0, q.size()); //test different priorities equal delayed offer polling - q.offer(new PriorityDelayQueue.QueueElement<Integer>(10, 0, 10, TimeUnit.MILLISECONDS)); - q.offer(new PriorityDelayQueue.QueueElement<Integer>(30, 2, 10, TimeUnit.MILLISECONDS)); - q.offer(new PriorityDelayQueue.QueueElement<Integer>(20, 1, 10, TimeUnit.MILLISECONDS)); + q.offer(new TestQueueElement<Integer>(10, 0, 10, TimeUnit.MILLISECONDS)); + q.offer(new TestQueueElement<Integer>(30, 2, 10, TimeUnit.MILLISECONDS)); + q.offer(new TestQueueElement<Integer>(20, 1, 10, TimeUnit.MILLISECONDS)); Thread.sleep(11); - List<Integer> list = new ArrayList<Integer>(); + List<XCallable> list = new ArrayList<XCallable>(); while (list.size() != 3) { QueueElement<Integer> e = q.poll(); if (e != null) { list.add(e.getElement()); } } - assertEquals((Integer) 30, list.get(0)); - assertEquals((Integer) 20, list.get(1)); - assertEquals((Integer) 10, list.get(2)); + assertEquals((Integer) 30, list.get(0).call()); + assertEquals((Integer) 20, list.get(1).call()); + assertEquals((Integer) 10, list.get(2).call()); assertEquals(0, q.size()); //test different priorities different delayed offer polling after delay - q.offer(new PriorityDelayQueue.QueueElement<Integer>(10, 0, 10, TimeUnit.MILLISECONDS)); - q.offer(new PriorityDelayQueue.QueueElement<Integer>(30, 2, 20, TimeUnit.MILLISECONDS)); - q.offer(new PriorityDelayQueue.QueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS)); + q.offer(new TestQueueElement<Integer>(10, 0, 10, TimeUnit.MILLISECONDS)); + q.offer(new TestQueueElement<Integer>(30, 2, 20, TimeUnit.MILLISECONDS)); + q.offer(new TestQueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS)); Thread.sleep(21); - list = new ArrayList<Integer>(); + list = new ArrayList<XCallable>(); while (list.size() != 3) { QueueElement<Integer> e = q.poll(); if (e != null) { list.add(e.getElement()); } } - assertEquals((Integer) 30, list.get(0)); - assertEquals((Integer) 20, list.get(1)); - assertEquals((Integer) 10, list.get(2)); + assertEquals((Integer) 30, list.get(0).call()); + assertEquals((Integer) 20, list.get(1).call()); + assertEquals((Integer) 10, list.get(2).call()); assertEquals(0, q.size()); //test different priorities different delayed offer polling within delay long start = System.currentTimeMillis(); - q.offer(new PriorityDelayQueue.QueueElement<Integer>(10, 0, 100, TimeUnit.MILLISECONDS)); - q.offer(new PriorityDelayQueue.QueueElement<Integer>(30, 2, 200, TimeUnit.MILLISECONDS)); - q.offer(new PriorityDelayQueue.QueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS)); + q.offer(new TestQueueElement<Integer>(10, 0, 100, TimeUnit.MILLISECONDS)); + q.offer(new TestQueueElement<Integer>(30, 2, 200, TimeUnit.MILLISECONDS)); + q.offer(new TestQueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS)); - assertEquals((Integer) 20, q.poll().getElement()); + assertEquals((Integer) 20, q.poll().getElement().call()); long delay = System.currentTimeMillis() - start; Thread.sleep(101 - delay); - assertEquals((Integer) 10, q.poll().getElement()); + assertEquals((Integer) 10, q.poll().getElement().call()); start = System.currentTimeMillis(); delay = System.currentTimeMillis() - start; Thread.sleep(101 - delay); - assertEquals((Integer) 30, q.poll().getElement()); + assertEquals((Integer) 30, q.poll().getElement().call()); assertEquals(0, q.size()); } @@ -245,46 +244,46 @@ public class TestPriorityDelayQueue extends TestCase { //test immediate offer peeking - q.offer(new PriorityDelayQueue.QueueElement<Integer>(1)); - assertEquals((Integer) 1, q.peek().getElement()); + q.offer(new TestQueueElement<Integer>(1)); + assertEquals((Integer) 1, q.peek().getElement().call()); q.poll(); assertEquals(0, q.size()); //test delay offer peeking - q.offer(new PriorityDelayQueue.QueueElement<Integer>(1, 1, 10, TimeUnit.MILLISECONDS)); - assertEquals((Integer) 1, q.peek().getElement()); + q.offer(new TestQueueElement<Integer>(1, 1, 10, TimeUnit.MILLISECONDS)); + assertEquals((Integer) 1, q.peek().getElement().call()); Thread.sleep(11); assertNotNull(q.poll()); assertEquals(0, q.size()); //test different priorities immediate offer peeking - q.offer(new PriorityDelayQueue.QueueElement<Integer>(10, 0, 0, TimeUnit.MILLISECONDS)); - q.offer(new PriorityDelayQueue.QueueElement<Integer>(30, 2, 0, TimeUnit.MILLISECONDS)); - q.offer(new PriorityDelayQueue.QueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS)); + q.offer(new TestQueueElement<Integer>(10, 0, 0, TimeUnit.MILLISECONDS)); + q.offer(new TestQueueElement<Integer>(30, 2, 0, TimeUnit.MILLISECONDS)); + q.offer(new TestQueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS)); - assertEquals((Integer) 30, q.peek().getElement()); + assertEquals((Integer) 30, q.peek().getElement().call()); assertNotNull(q.poll()); - assertEquals((Integer) 20, q.peek().getElement()); + assertEquals((Integer) 20, q.peek().getElement().call()); assertNotNull(q.poll()); - assertEquals((Integer) 10, q.peek().getElement()); + assertEquals((Integer) 10, q.peek().getElement().call()); assertNotNull(q.poll()); assertEquals(0, q.size()); //test different priorities delayed offer peeking - q.offer(new PriorityDelayQueue.QueueElement<Integer>(30, 2, 200, TimeUnit.MILLISECONDS)); - q.offer(new PriorityDelayQueue.QueueElement<Integer>(10, 0, 100, TimeUnit.MILLISECONDS)); - q.offer(new PriorityDelayQueue.QueueElement<Integer>(20, 1, 150, TimeUnit.MILLISECONDS)); + q.offer(new TestQueueElement<Integer>(30, 2, 200, TimeUnit.MILLISECONDS)); + q.offer(new TestQueueElement<Integer>(10, 0, 100, TimeUnit.MILLISECONDS)); + q.offer(new TestQueueElement<Integer>(20, 1, 150, TimeUnit.MILLISECONDS)); - assertEquals((Integer) 10, q.peek().getElement()); + assertEquals((Integer) 10, q.peek().getElement().call()); Thread.sleep(100); assertNotNull(q.poll()); - assertEquals((Integer) 20, q.peek().getElement()); + assertEquals((Integer) 20, q.peek().getElement().call()); Thread.sleep(50); assertNotNull(q.poll()); - assertEquals((Integer) 30, q.peek().getElement()); + assertEquals((Integer) 30, q.peek().getElement().call()); Thread.sleep(50); assertNotNull(q.poll()); assertEquals(0, q.size()); @@ -292,7 +291,7 @@ public class TestPriorityDelayQueue extends TestCase { public void testAntiStarvation() throws Exception { PriorityDelayQueue<Integer> q = new PriorityDelayQueue<Integer>(3, 500, TimeUnit.MILLISECONDS, -1); - q.offer(new PriorityDelayQueue.QueueElement<Integer>(1)); + q.offer(new TestQueueElement<Integer>(1)); q.peek(); assertEquals(1, q.sizes()[0]); Thread.sleep(600); @@ -317,7 +316,7 @@ public class TestPriorityDelayQueue extends TestCase { for (int j = 0; j < 10; j++) { String msg = count + " - " + j; try { - queue.offer(new PriorityDelayQueue.QueueElement<String>(msg, + queue.offer(new TestQueueElement<String>(msg, (int) (Math.random() * priorities), (int) (Math.random() * 500), TimeUnit.MILLISECONDS)); Thread.sleep((int) (Math.random() * 50)); @@ -346,48 +345,92 @@ public class TestPriorityDelayQueue extends TestCase { public void testIterator() throws Exception { PriorityDelayQueue<Integer> q = new PriorityDelayQueue<Integer>(3, 500, TimeUnit.MILLISECONDS, -1); - q.offer(new PriorityDelayQueue.QueueElement<Integer>(1, 1, 10, TimeUnit.MILLISECONDS)); - q.offer(new PriorityDelayQueue.QueueElement<Integer>(10, 0, 0, TimeUnit.MILLISECONDS)); - q.offer(new PriorityDelayQueue.QueueElement<Integer>(30, 2, 0, TimeUnit.MILLISECONDS)); - q.offer(new PriorityDelayQueue.QueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS)); + q.offer(new TestQueueElement<Integer>(1, 1, 10, TimeUnit.MILLISECONDS)); + q.offer(new TestQueueElement<Integer>(10, 0, 0, TimeUnit.MILLISECONDS)); + q.offer(new TestQueueElement<Integer>(30, 2, 0, TimeUnit.MILLISECONDS)); + q.offer(new TestQueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS)); - Iterator<PriorityDelayQueue.QueueElement<Integer>> it = q.iterator(); - assertTrue(it.hasNext()); - - int size = 0; - while (it.hasNext()) { - it.next(); - size++; - } - assertEquals(4, size); + assertEquals(4, q.size()); assertNotNull(q.poll()); assertNotNull(q.poll()); - q.offer(new PriorityDelayQueue.QueueElement<Integer>(40, 0, 0, TimeUnit.MILLISECONDS)); - q.offer(new PriorityDelayQueue.QueueElement<Integer>(50, 2, 0, TimeUnit.MILLISECONDS)); - q.offer(new PriorityDelayQueue.QueueElement<Integer>(60, 1, 0, TimeUnit.MILLISECONDS)); + q.offer(new TestQueueElement<Integer>(40, 0, 0, TimeUnit.MILLISECONDS)); + q.offer(new TestQueueElement<Integer>(50, 2, 0, TimeUnit.MILLISECONDS)); + q.offer(new TestQueueElement<Integer>(60, 1, 0, TimeUnit.MILLISECONDS)); assertEquals(5, q.size()); assertNotNull(q.poll()); - - it = q.iterator(); - Thread.sleep(50); - size = 0; - while (it.hasNext()) { - it.next(); - size++; - } - assertEquals(4, size); + assertEquals(4, q.size()); } public void testClear() { PriorityDelayQueue<Integer> q = new PriorityDelayQueue<Integer>(3, 500, TimeUnit.MILLISECONDS, -1); - q.offer(new PriorityDelayQueue.QueueElement<Integer>(1, 1, 10, TimeUnit.MILLISECONDS)); - q.offer(new PriorityDelayQueue.QueueElement<Integer>(10, 0, 0, TimeUnit.MILLISECONDS)); + q.offer(new TestQueueElement<Integer>(1, 1, 10, TimeUnit.MILLISECONDS)); + q.offer(new TestQueueElement<Integer>(10, 0, 0, TimeUnit.MILLISECONDS)); assertEquals(2, q.size()); q.clear(); assertEquals(0, q.size()); } + public static class TestQueueElement<E> extends QueueElement<E> { + + public TestQueueElement(final E element, int priority, long delay, TimeUnit unit) { + super(new XCallable<E>() { + + @Override + public E call() throws Exception { + return element; + } + + @Override + public String getName() { + return null; + } + + @Override + public int getPriority() { + return 0; + } + + @Override + public String getType() { + return null; + } + + @Override + public long getCreatedTime() { + return 0; + } + + @Override + public String getKey() { + return null; + } + + @Override + public String getEntityKey() { + return null; + } + + @Override + public void setInterruptMode(boolean mode) { + } + + @Override + public boolean inInterruptMode() { + return false; + } + }, priority, delay, unit); + ParamChecker.notNull(element, "element can't be null"); + } + + public TestQueueElement(E element) { + this(element, 0, 0, TimeUnit.MILLISECONDS); + } + + protected void debug(String template, Object... args) { + System.out.println(MessageFormat.format(template, args)); + } + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index d533d78..160cd72 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.3.0 release (trunk - unreleased) +OOZIE-2345 Parallel job submission for forked actions (puru) OOZIE-2358 Coord rerun cleanup should reuse hcat connections (rohini) OOZIE-2356 Add a way to enable/disable credentials in a workflow (rkanter) OOZIE-2355 Hive2 Action doesn't pass along oozie configs to jobconf (rkanter)
