Repository: oozie Updated Branches: refs/heads/master 273003062 -> 99a3df568
OOZIE-2440 Exponential re-try policy for workflow action (satishsaley via jaydeepvishwakarma) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/99a3df56 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/99a3df56 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/99a3df56 Branch: refs/heads/master Commit: 99a3df568d182e75751126aac5fe8b63d876e64f Parents: 2730030 Author: jvishwakarma <[email protected]> Authored: Tue Aug 2 23:36:18 2016 +0530 Committer: jvishwakarma <[email protected]> Committed: Tue Aug 2 23:36:18 2016 +0530 ---------------------------------------------------------------------- .../src/main/resources/oozie-workflow-0.5.xsd | 1 + .../oozie/command/wf/ActionCheckXCommand.java | 2 +- .../oozie/command/wf/ActionEndXCommand.java | 2 +- .../apache/oozie/command/wf/ActionXCommand.java | 52 ++++- .../oozie/service/LiteWorkflowStoreService.java | 13 +- .../oozie/workflow/lite/ActionNodeDef.java | 5 +- .../workflow/lite/LiteWorkflowAppParser.java | 11 +- .../org/apache/oozie/workflow/lite/NodeDef.java | 82 ++++++- core/src/main/resources/oozie-default.xml | 12 +- .../oozie/command/wf/TestActionUserRetry.java | 215 +++++++++++++++++++ .../wf/TestForkedActionStartXCommand.java | 77 ------- .../command/wf/TestWorkflowKillXCommand.java | 19 ++ .../src/site/twiki/WorkflowFunctionalSpec.twiki | 11 +- release-log.txt | 1 + 14 files changed, 396 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/client/src/main/resources/oozie-workflow-0.5.xsd ---------------------------------------------------------------------- diff --git a/client/src/main/resources/oozie-workflow-0.5.xsd b/client/src/main/resources/oozie-workflow-0.5.xsd index fda49ed..8fe2b47 100644 --- a/client/src/main/resources/oozie-workflow-0.5.xsd +++ b/client/src/main/resources/oozie-workflow-0.5.xsd @@ -160,6 +160,7 
@@ <xs:attribute name="cred" type="xs:string"/> <xs:attribute name="retry-max" type="xs:string"/> <xs:attribute name="retry-interval" type="xs:string"/> + <xs:attribute name="retry-policy" type="xs:string"/> </xs:complexType> <xs:complexType name="MAP-REDUCE"> http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java index ea4d340..d0551ff 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java @@ -210,7 +210,7 @@ public class ActionCheckXCommand extends ActionXCommand<Void> { switch (ex.getErrorType()) { case ERROR: // If allowed to retry, this will handle it; otherwise, we should fall through to FAILED - if (handleUserRetry(wfAction)) { + if (handleUserRetry(wfAction, wfJob)) { break; } case FAILED: http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java index d030a10..740b8d3 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java @@ -216,7 +216,7 @@ public class ActionEndXCommand extends ActionXCommand<Void> { shouldHandleUserRetry = true; break; } - if (!shouldHandleUserRetry || !handleUserRetry(wfAction)) { + if (!shouldHandleUserRetry || !handleUserRetry(wfAction, wfJob)) { SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), 
wfAction.getId(), slaStatus, SlaAppType.WORKFLOW_ACTION); if(slaEvent != null) { insertList.add(slaEvent); http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java index e65c3bf..836e5d4 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java @@ -41,6 +41,7 @@ import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.client.WorkflowJob; import org.apache.oozie.command.CommandException; import org.apache.oozie.service.CallbackService; +import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.ELService; import org.apache.oozie.service.HadoopAccessorException; import org.apache.oozie.service.HadoopAccessorService; @@ -53,7 +54,9 @@ import org.apache.oozie.util.Instrumentation; import org.apache.oozie.util.XConfiguration; import org.apache.oozie.workflow.WorkflowException; import org.apache.oozie.workflow.WorkflowInstance; +import org.apache.oozie.workflow.lite.LiteWorkflowApp; import org.apache.oozie.workflow.lite.LiteWorkflowInstance; +import org.apache.oozie.workflow.lite.NodeDef; /** * Base class for Action execution commands. 
Provides common functionality to handle different types of errors while @@ -154,8 +157,8 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<T> { LOG.warn("Setting Action Status to [{0}]", status); ActionExecutorContext aContext = (ActionExecutorContext) context; WorkflowActionBean action = (WorkflowActionBean) aContext.getAction(); - - if (!handleUserRetry(action)) { + WorkflowJobBean wfJob = (WorkflowJobBean) context.getWorkflow(); + if (!handleUserRetry(action, wfJob)) { incrActionErrorCounter(action.getType(), "error", 1); action.setPending(); if (isStart) { @@ -189,7 +192,7 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<T> { */ public void failJob(ActionExecutor.Context context, WorkflowActionBean action) throws CommandException { WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow(); - if (!handleUserRetry(action)) { + if (!handleUserRetry(action, workflow)) { incrActionErrorCounter(action.getType(), "failed", 1); LOG.warn("Failing Job due to failed action [{0}]", action.getName()); try { @@ -217,7 +220,7 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<T> { * @return true if user-retry has to be handled for this action * @throws CommandException thrown if unable to fail job */ - public boolean handleUserRetry(WorkflowActionBean action) throws CommandException { + public boolean handleUserRetry(WorkflowActionBean action, WorkflowJobBean wfJob) throws CommandException { String errorCode = action.getErrorCode(); Set<String> allowedRetryCode = LiteWorkflowStoreService.getUserRetryErrorCode(); @@ -226,7 +229,8 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<T> { LOG.info("Preparing retry this action [{0}], errorCode [{1}], userRetryCount [{2}], " + "userRetryMax [{3}], userRetryInterval [{4}]", action.getId(), errorCode, action .getUserRetryCount(), action.getUserRetryMax(), action.getUserRetryInterval()); - int interval = action.getUserRetryInterval() * 60 * 1000; + 
ActionExecutor.RETRYPOLICY retryPolicy = getUserRetryPolicy(action, wfJob); + long interval = getRetryDelay(action.getUserRetryCount(), action.getUserRetryInterval() * 60, retryPolicy); action.setStatus(WorkflowAction.Status.USER_RETRY); action.incrmentUserRetryCount(); action.setPending(); @@ -559,4 +563,42 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<T> { } } + /* + * Returns user retry policy + */ + private ActionExecutor.RETRYPOLICY getUserRetryPolicy(WorkflowActionBean wfAction, WorkflowJobBean wfJob) { + WorkflowInstance wfInstance = wfJob.getWorkflowInstance(); + LiteWorkflowApp wfApp = (LiteWorkflowApp) wfInstance.getApp(); + NodeDef nodeDef = wfApp.getNode(wfAction.getName()); + if (nodeDef == null) { + return ActionExecutor.RETRYPOLICY.valueOf(LiteWorkflowStoreService.DEFAULT_USER_RETRY_POLICY); + } + String userRetryPolicy = nodeDef.getUserRetryPolicy().toUpperCase(); + String userRetryPolicyInSysConfig = ConfigurationService.get(LiteWorkflowStoreService.CONF_USER_RETRY_POLICY) + .toUpperCase(); + if (isValidRetryPolicy(userRetryPolicy)) { + return ActionExecutor.RETRYPOLICY.valueOf(userRetryPolicy); + } + else if (isValidRetryPolicy(userRetryPolicyInSysConfig)) { + return ActionExecutor.RETRYPOLICY.valueOf(userRetryPolicyInSysConfig); + } + else { + return ActionExecutor.RETRYPOLICY.valueOf(LiteWorkflowStoreService.DEFAULT_USER_RETRY_POLICY); + } + } + + /* + * Returns true if policy is valid, otherwise false + */ + private static boolean isValidRetryPolicy(String policy) { + try { + ActionExecutor.RETRYPOLICY.valueOf(policy.toUpperCase().trim()); + } + catch (IllegalArgumentException e) { + // Invalid Policy + return false; + } + return true; + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java b/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java index 99ace13..ffc29af 100644 --- a/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java +++ b/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java @@ -60,11 +60,14 @@ public abstract class LiteWorkflowStoreService extends WorkflowStoreService { public static final String CONF_PREFIX_USER_RETRY = CONF_PREFIX + "user.retry."; public static final String CONF_USER_RETRY_MAX = CONF_PREFIX_USER_RETRY + "max"; public static final String CONF_USER_RETRY_INTEVAL = CONF_PREFIX_USER_RETRY + "inteval"; + public static final String CONF_USER_RETRY_POLICY = CONF_PREFIX_USER_RETRY + "policy"; public static final String CONF_USER_RETRY_ERROR_CODE = CONF_PREFIX_USER_RETRY + "error.code"; public static final String CONF_USER_RETRY_ERROR_CODE_EXT = CONF_PREFIX_USER_RETRY + "error.code.ext"; + public static final String DEFAULT_USER_RETRY_POLICY = "PERIODIC"; public static final String NODE_DEF_VERSION_0 = "_oozie_inst_v_0"; public static final String NODE_DEF_VERSION_1 = "_oozie_inst_v_1"; + public static final String NODE_DEF_VERSION_2 = "_oozie_inst_v_2"; public static final String CONF_NODE_DEF_VERSION = CONF_PREFIX + "node.def.version"; public static final String USER_ERROR_CODE_ALL = "ALL"; @@ -195,15 +198,17 @@ public abstract class LiteWorkflowStoreService extends WorkflowStoreService { } /** - * Get NodeDef default version, _oozie_inst_v_0 or _oozie_inst_v_1 + * Get NodeDef default version, _oozie_inst_v_0, _oozie_inst_v_1 or + * _oozie_inst_v_2 * * @return nodedef default version - * @throws WorkflowException thrown if there was an error parsing the action configuration. - */ + * @throws WorkflowException thrown if there was an error parsing the action + * configuration. 
+ */ public static String getNodeDefDefaultVersion() throws WorkflowException { String ret = ConfigurationService.get(CONF_NODE_DEF_VERSION); if (ret == null) { - ret = NODE_DEF_VERSION_1; + ret = NODE_DEF_VERSION_2; } return ret; } http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/main/java/org/apache/oozie/workflow/lite/ActionNodeDef.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/ActionNodeDef.java b/core/src/main/java/org/apache/oozie/workflow/lite/ActionNodeDef.java index d3a2793..97c7134 100644 --- a/core/src/main/java/org/apache/oozie/workflow/lite/ActionNodeDef.java +++ b/core/src/main/java/org/apache/oozie/workflow/lite/ActionNodeDef.java @@ -44,7 +44,8 @@ public class ActionNodeDef extends NodeDef { } public ActionNodeDef(String name, String conf, Class<? extends ActionNodeHandler> actionHandlerClass, String onOk, - String onError, String cred, String userRetryMax, String userRetryInterval) { - super(name, ParamChecker.notNull(conf, "conf"), actionHandlerClass, Arrays.asList(onOk, onError), cred, userRetryMax, userRetryInterval); + String onError, String cred, String userRetryMax, String userRetryInterval, String userRetryPolicy) { + super(name, ParamChecker.notNull(conf, "conf"), actionHandlerClass, Arrays.asList(onOk, onError), cred, + userRetryMax, userRetryInterval, userRetryPolicy); } } http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java index a1b9cdb..bbd81a9 100644 --- a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java +++ 
b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java @@ -21,6 +21,8 @@ package org.apache.oozie.workflow.lite; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.io.Writable; import org.apache.oozie.action.hadoop.FsActionExecutor; +import org.apache.oozie.ErrorCode; +import org.apache.oozie.action.ActionExecutor; import org.apache.oozie.action.oozie.SubWorkflowActionExecutor; import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.util.ELUtils; @@ -90,6 +92,7 @@ public class LiteWorkflowAppParser { private static final String USER_RETRY_MAX_A = "retry-max"; private static final String USER_RETRY_INTERVAL_A = "retry-interval"; private static final String TO_A = "to"; + private static final String USER_RETRY_POLICY_A = "retry-policy"; private static final String FORK_PATH_E = "path"; private static final String FORK_START_A = "start"; @@ -485,6 +488,7 @@ public class LiteWorkflowAppParser { String credStr = eNode.getAttributeValue(CRED_A); String userRetryMaxStr = eNode.getAttributeValue(USER_RETRY_MAX_A); String userRetryIntervalStr = eNode.getAttributeValue(USER_RETRY_INTERVAL_A); + String userRetryPolicyStr = eNode.getAttributeValue(USER_RETRY_POLICY_A); try { if (!StringUtils.isEmpty(userRetryMaxStr)) { userRetryMaxStr = ELUtils.resolveAppName(userRetryMaxStr, jobConf); @@ -492,6 +496,9 @@ public class LiteWorkflowAppParser { if (!StringUtils.isEmpty(userRetryIntervalStr)) { userRetryIntervalStr = ELUtils.resolveAppName(userRetryIntervalStr, jobConf); } + if (!StringUtils.isEmpty(userRetryPolicyStr)) { + userRetryPolicyStr = ELUtils.resolveAppName(userRetryPolicyStr, jobConf); + } } catch (Exception e) { throw new WorkflowException(ErrorCode.E0703, e.getMessage()); @@ -499,8 +506,8 @@ public class LiteWorkflowAppParser { String actionConf = XmlUtils.prettyPrint(eActionConf).toString(); def.addNode(new ActionNodeDef(eNode.getAttributeValue(NAME_A), actionConf, actionHandlerClass, - 
transitions[0], transitions[1], credStr, - userRetryMaxStr, userRetryIntervalStr)); + transitions[0], transitions[1], credStr, userRetryMaxStr, userRetryIntervalStr, + userRetryPolicyStr)); } else if (SLA_INFO.equals(eNode.getName()) || CREDENTIALS.equals(eNode.getName())) { // No operation is required } else if (eNode.getName().equals(GLOBAL)) { http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/main/java/org/apache/oozie/workflow/lite/NodeDef.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/NodeDef.java b/core/src/main/java/org/apache/oozie/workflow/lite/NodeDef.java index 9e66d28..496b008 100644 --- a/core/src/main/java/org/apache/oozie/workflow/lite/NodeDef.java +++ b/core/src/main/java/org/apache/oozie/workflow/lite/NodeDef.java @@ -43,6 +43,7 @@ public class NodeDef implements Writable { private String cred = null; private String userRetryMax = "null"; private String userRetryInterval = "null"; + private String userRetryPolicy = "null"; NodeDef() { } @@ -62,7 +63,7 @@ public class NodeDef implements Writable { } NodeDef(String name, String conf, Class<? 
extends NodeHandler> handlerClass, List<String> transitions, String cred, - String userRetryMax, String userRetryInterval) { + String userRetryMax, String userRetryInterval, String userRetryPolicy) { this(name, conf, handlerClass, transitions, cred); if (userRetryMax != null) { this.userRetryMax = userRetryMax; @@ -70,6 +71,9 @@ public class NodeDef implements Writable { if (userRetryInterval != null) { this.userRetryInterval = userRetryInterval; } + if (userRetryPolicy != null) { + this.userRetryPolicy = userRetryPolicy; + } } public boolean equals(NodeDef other) { @@ -115,12 +119,20 @@ public class NodeDef implements Writable { nodeDefVersion = LiteWorkflowStoreService.getNodeDefDefaultVersion(); } catch (WorkflowException e) { - nodeDefVersion = LiteWorkflowStoreService.NODE_DEF_VERSION_1; + nodeDefVersion = LiteWorkflowStoreService.NODE_DEF_VERSION_2; } } return nodeDefVersion; } + public String getUserRetryPolicy() { + return userRetryPolicy; + } + + public void setUserRetryPolicy(String userRetryPolicy) { + this.userRetryPolicy = userRetryPolicy; + } + @SuppressWarnings("unchecked") private void readVersionZero(DataInput dataInput, String firstField) throws IOException { if (firstField.equals(LiteWorkflowStoreService.NODE_DEF_VERSION_0)) { @@ -151,7 +163,24 @@ public class NodeDef implements Writable { } @SuppressWarnings("unchecked") private void readVersionOne(DataInput dataInput, String firstField) throws IOException { - nodeDefVersion = LiteWorkflowStoreService.NODE_DEF_VERSION_1; + readCommon(dataInput, firstField, LiteWorkflowStoreService.NODE_DEF_VERSION_1); + } + + /* + * Reads according to version 2 + */ + @SuppressWarnings("unchecked") + private void readVersionTwo(DataInput dataInput, String firstField) throws IOException { + readCommon(dataInput, firstField, LiteWorkflowStoreService.NODE_DEF_VERSION_2); + userRetryPolicy = dataInput.readUTF(); + } + + /* + * Reads common part + */ + @SuppressWarnings("unchecked") + private void 
readCommon(DataInput dataInput, String firstField, String nodeDefVer) throws IOException { + nodeDefVersion = nodeDefVer; name = dataInput.readUTF(); cred = dataInput.readUTF(); if (cred.equals("null")) { @@ -185,12 +214,16 @@ public class NodeDef implements Writable { @Override public void readFields(DataInput dataInput) throws IOException { String firstField = dataInput.readUTF(); - if (!firstField.equals(LiteWorkflowStoreService.NODE_DEF_VERSION_1)) { - readVersionZero(dataInput, firstField); - } else { - //since oozie version 3.1 + if (firstField.equals(LiteWorkflowStoreService.NODE_DEF_VERSION_1)) { + // since oozie version 3.1 readVersionOne(dataInput, firstField); } + else if (firstField.equals(LiteWorkflowStoreService.NODE_DEF_VERSION_2)) { + readVersionTwo(dataInput, firstField); + } + else { + readVersionZero(dataInput, firstField); + } } private void writeVersionZero(DataOutput dataOutput) throws IOException { @@ -222,6 +255,29 @@ public class NodeDef implements Writable { * @throws IOException thrown if fail to write */ private void writeVersionOne(DataOutput dataOutput) throws IOException { + writeCommon(dataOutput); + } + + /** + * Write as version two format, this version was since 4.4.4.1. 
+ * + * @param dataOutput data output to serialize node def + * @throws IOException thrown if fail to write + */ + private void writeVersionTwo(DataOutput dataOutput) throws IOException { + writeCommon(dataOutput); + if (userRetryPolicy != null) { + dataOutput.writeUTF(userRetryPolicy); + } + else { + dataOutput.writeUTF("null"); + } + } + + /* + * Write the common part + */ + private void writeCommon(DataOutput dataOutput) throws IOException { dataOutput.writeUTF(nodeDefVersion); dataOutput.writeUTF(name); if (cred != null) { @@ -260,12 +316,16 @@ public class NodeDef implements Writable { */ @Override public void write(DataOutput dataOutput) throws IOException { - if (!getNodeDefVersion().equals(LiteWorkflowStoreService.NODE_DEF_VERSION_1)) { - writeVersionZero(dataOutput); - } else { - //since oozie version 3.1 + if (getNodeDefVersion().equals(LiteWorkflowStoreService.NODE_DEF_VERSION_1)) { + // since oozie version 3.1 writeVersionOne(dataOutput); } + else if (getNodeDefVersion().equals(LiteWorkflowStoreService.NODE_DEF_VERSION_2)) { + writeVersionTwo(dataOutput); + } + else { + writeVersionZero(dataOutput); + } } } http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index 4563c73..530c2ed 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -2116,6 +2116,14 @@ will be the requeue interval for the actions which are waiting for a long time w </property> <property> + <name>oozie.service.LiteWorkflowStoreService.user.retry.policy</name> + <value>periodic</value> + <description> + Automatic retry policy for workflow action. Possible values are periodic or exponential, periodic being the default. 
+ </description> + </property> + + <property> <name>oozie.service.LiteWorkflowStoreService.user.retry.error.code</name> <value>JA008,JA009,JA017,JA018,JA019,FS009,FS008,FS014</value> <description> @@ -2142,9 +2150,9 @@ will be the requeue interval for the actions which are waiting for a long time w <property> <name>oozie.service.LiteWorkflowStoreService.node.def.version</name> - <value>_oozie_inst_v_1</value> + <value>_oozie_inst_v_2</value> <description> - NodeDef default version, _oozie_inst_v_0 or _oozie_inst_v_1 + NodeDef default version, _oozie_inst_v_0, _oozie_inst_v_1 or _oozie_inst_v_2 </description> </property> http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/test/java/org/apache/oozie/command/wf/TestActionUserRetry.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestActionUserRetry.java b/core/src/test/java/org/apache/oozie/command/wf/TestActionUserRetry.java new file mode 100644 index 0000000..ca2b5a2 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/command/wf/TestActionUserRetry.java @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.command.wf; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.net.URI; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.oozie.ForTestingActionExecutor; +import org.apache.oozie.WorkflowActionBean; +import org.apache.oozie.client.OozieClient; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor; +import org.apache.oozie.service.ActionService; +import org.apache.oozie.service.ConfigurationService; +import org.apache.oozie.service.ExtendedCallableQueueService; +import org.apache.oozie.service.JPAService; +import org.apache.oozie.service.LiteWorkflowStoreService; +import org.apache.oozie.service.SchemaService; +import org.apache.oozie.service.Services; +import org.apache.oozie.test.XDataTestCase; +import org.apache.oozie.util.XConfiguration; + +public class TestActionUserRetry extends XDataTestCase { + private Services services; + + @Override + protected void setUp() throws Exception { + super.setUp(); + setSystemProperty(SchemaService.WF_CONF_EXT_SCHEMAS, "wf-ext-schema.xsd"); + setSystemProperty(LiteWorkflowStoreService.CONF_USER_RETRY_ERROR_CODE_EXT, ForTestingActionExecutor.TEST_ERROR); + setSystemProperty(Services.CONF_SERVICE_EXT_CLASSES, ExtendedCallableQueueService.class.getName()); + services = new Services(); + services.init(); + services.get(ActionService.class).registerAndInitExecutor(ForTestingActionExecutor.class); + ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, true); + } + + @Override + protected void tearDown() throws Exception { + services.destroy(); + super.tearDown(); + } + + public void testUserRetry() throws JPAExecutorException, IOException, CommandException{ + Configuration conf = new XConfiguration(); + String workflowUri = 
getTestCaseFileUri("workflow.xml"); + + //@formatter:off + String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.3\" name=\"wf-fork\">" + + "<start to=\"fork1\"/>" + + "<fork name=\"fork1\">" + + "<path start=\"action1\"/>" + + "<path start=\"action2\"/>" + + "</fork>" + +"<action name=\"action1\" retry-max=\"2\" retry-interval=\"0\">" + + "<test xmlns=\"uri:test\">" + + "<signal-value>${wf:conf('signal-value')}</signal-value>" + + "<external-status>${wf:conf('external-status')}</external-status> " + + "<error>${wf:conf('error')}</error>" + + "<avoid-set-execution-data>${wf:conf('avoid-set-execution-data')}</avoid-set-execution-data>" + + "<avoid-set-end-data>${wf:conf('avoid-set-end-data')}</avoid-set-end-data>" + + "<running-mode>${wf:conf('running-mode')}</running-mode>" + + "</test>" + + "<ok to=\"join1\"/>" + + "<error to=\"kill\"/>" + + "</action>" + + "<action name=\"action2\">" + + "<fs></fs><ok to=\"join1\"/>" + + "<error to=\"kill\"/>" + + "</action>" + + "<join name=\"join1\" to=\"end\"/>" + + "<kill name=\"kill\"><message>killed</message></kill>" + + "<end name=\"end\"/>" + + "</workflow-app>"; + //@Formatter:on + writeToFile(appXml, workflowUri); + conf.set(OozieClient.APP_PATH, workflowUri); + conf.set(OozieClient.USER_NAME, getTestUser()); + conf.set("error", "start.error"); + conf.set("external-status", "error"); + conf.set("signal-value", "based_on_action_status"); + + SubmitXCommand sc = new SubmitXCommand(conf); + final String jobId = sc.call(); + new StartXCommand(jobId).call(); + final WorkflowActionsGetForJobJPAExecutor actionsGetExecutor = new WorkflowActionsGetForJobJPAExecutor(jobId); + final JPAService jpaService = Services.get().get(JPAService.class); + + waitFor(20 * 1000, new Predicate() { + public boolean evaluate() throws Exception { + List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor); + WorkflowActionBean action = null; + for (WorkflowActionBean bean : actions) { + if (bean.getType().equals("test")) { + 
action = bean; + break; + } + } + return (action != null && action.getUserRetryCount() == 2); + } + }); + + List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor); + WorkflowActionBean action = null; + for (WorkflowActionBean bean : actions) { + if (bean.getType().equals("test")) { + action = bean; + break; + } + } + assertNotNull(action); + assertEquals(2, action.getUserRetryCount()); + } + + public void testUserRetryPolicy() throws JPAExecutorException, IOException, CommandException { + Configuration conf = new XConfiguration(); + String workflowUri = getTestCaseFileUri("workflow.xml"); + + //@formatter:off + String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.5\" name=\"wf-fork\">" + "<start to=\"fork1\"/>" + + "<fork name=\"fork1\">" + "<path start=\"action1\"/>" + "<path start=\"action2\"/>" + "</fork>" + + "<action name=\"action1\" retry-max=\"2\" retry-interval=\"1\" retry-policy=\"exponential\">" + + "<test xmlns=\"uri:test\">" + + "<signal-value>${wf:conf('signal-value')}</signal-value>" + + "<external-status>${wf:conf('external-status')}</external-status> " + + "<error>${wf:conf('error')}</error>" + + "<avoid-set-execution-data>${wf:conf('avoid-set-execution-data')}</avoid-set-execution-data>" + + "<avoid-set-end-data>${wf:conf('avoid-set-end-data')}</avoid-set-end-data>" + + "<running-mode>${wf:conf('running-mode')}</running-mode>" + "</test>" + "<ok to=\"join1\"/>" + + "<error to=\"kill\"/>" + "</action>" + "<action name=\"action2\">" + "<fs></fs><ok to=\"join1\"/>" + + "<error to=\"kill\"/>" + "</action>" + "<join name=\"join1\" to=\"end\"/>" + + "<kill name=\"kill\"><message>killed</message></kill>" + "<end name=\"end\"/>" + "</workflow-app>"; + // @Formatter:on + writeToFile(appXml, workflowUri); + conf.set(OozieClient.APP_PATH, workflowUri); + conf.set(OozieClient.USER_NAME, getTestUser()); + conf.set("error", "start.error"); + conf.set("external-status", "error"); + conf.set("signal-value", "based_on_action_status"); + + 
SubmitXCommand sc = new SubmitXCommand(conf); + final String jobId = sc.call(); + new StartXCommand(jobId).call(); + final WorkflowActionsGetForJobJPAExecutor actionsGetExecutor = new WorkflowActionsGetForJobJPAExecutor(jobId); + final JPAService jpaService = Services.get().get(JPAService.class); + // set a timeout for exponential retry of action with respect to given + // retry-interval and retry-max. + // If retry-interval is 1 then, for first retry, delay will be 1 min, + // for second retry it will be 2 min, 4, 8, 16 & so on. + int timeout = (1 + 2) * 60 * 1000; + waitFor(timeout, new Predicate() { + public boolean evaluate() throws Exception { + List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor); + WorkflowActionBean action = null; + for (WorkflowActionBean bean : actions) { + if (bean.getType().equals("test")) { + action = bean; + break; + } + } + return (action != null && action.getUserRetryCount() == 2); + } + }); + + List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor); + WorkflowActionBean action = null; + for (WorkflowActionBean bean : actions) { + if (bean.getType().equals("test")) { + action = bean; + break; + } + } + assertNotNull(action); + assertEquals(2, action.getUserRetryCount()); + } + + private void writeToFile(String appXml, String appPath) throws IOException { + File wf = new File(URI.create(appPath)); + PrintWriter out = null; + try { + out = new PrintWriter(new FileWriter(wf)); + out.println(appXml); + } + catch (IOException iex) { + throw iex; + } + finally { + if (out != null) { + out.close(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/test/java/org/apache/oozie/command/wf/TestForkedActionStartXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestForkedActionStartXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestForkedActionStartXCommand.java 
index e685621..8eb7438 100644 --- a/core/src/test/java/org/apache/oozie/command/wf/TestForkedActionStartXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/wf/TestForkedActionStartXCommand.java @@ -23,22 +23,16 @@ import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; import java.net.URI; -import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.ForTestingActionExecutor; -import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.WorkflowJob; -import org.apache.oozie.command.CommandException; -import org.apache.oozie.executor.jpa.JPAExecutorException; -import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor; import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; import org.apache.oozie.service.ActionService; import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.ExtendedCallableQueueService; -import org.apache.oozie.service.JPAService; import org.apache.oozie.service.LiteWorkflowStoreService; import org.apache.oozie.service.SchemaService; import org.apache.oozie.service.Services; @@ -153,77 +147,6 @@ public class TestForkedActionStartXCommand extends XDataTestCase { WorkflowJob.Status.KILLED); } - public void testUserRetry() throws JPAExecutorException, IOException, CommandException{ - Configuration conf = new XConfiguration(); - String workflowUri = getTestCaseFileUri("workflow.xml"); - - //@formatter:off - String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.3\" name=\"wf-fork\">" - + "<start to=\"fork1\"/>" - + "<fork name=\"fork1\">" - + "<path start=\"action1\"/>" - + "<path start=\"action2\"/>" - + "</fork>" - +"<action name=\"action1\" retry-max=\"2\" retry-interval=\"0\">" - + "<test xmlns=\"uri:test\">" - + "<signal-value>${wf:conf('signal-value')}</signal-value>" - + 
"<external-status>${wf:conf('external-status')}</external-status> " - + "<error>${wf:conf('error')}</error>" - + "<avoid-set-execution-data>${wf:conf('avoid-set-execution-data')}</avoid-set-execution-data>" - + "<avoid-set-end-data>${wf:conf('avoid-set-end-data')}</avoid-set-end-data>" - + "<running-mode>${wf:conf('running-mode')}</running-mode>" - + "</test>" - + "<ok to=\"join1\"/>" - + "<error to=\"kill\"/>" - + "</action>" - + "<action name=\"action2\">" - + "<fs></fs><ok to=\"join1\"/>" - + "<error to=\"kill\"/>" - + "</action>" - + "<join name=\"join1\" to=\"end\"/>" - + "<kill name=\"kill\"><message>killed</message></kill>" - + "<end name=\"end\"/>" - + "</workflow-app>"; - //@Formatter:on - writeToFile(appXml, workflowUri); - conf.set(OozieClient.APP_PATH, workflowUri); - conf.set(OozieClient.USER_NAME, getTestUser()); - conf.set("error", "start.error"); - conf.set("external-status", "error"); - conf.set("signal-value", "based_on_action_status"); - - SubmitXCommand sc = new SubmitXCommand(conf); - final String jobId = sc.call(); - new StartXCommand(jobId).call(); - final WorkflowActionsGetForJobJPAExecutor actionsGetExecutor = new WorkflowActionsGetForJobJPAExecutor(jobId); - final JPAService jpaService = Services.get().get(JPAService.class); - - waitFor(20 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor); - WorkflowActionBean action = null; - for (WorkflowActionBean bean : actions) { - if (bean.getType().equals("test")) { - action = bean; - break; - } - } - return (action != null && action.getUserRetryCount() == 2); - } - }); - - List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor); - WorkflowActionBean action = null; - for (WorkflowActionBean bean : actions) { - if (bean.getType().equals("test")) { - action = bean; - break; - } - } - assertNotNull(action); - assertEquals(2, action.getUserRetryCount()); - } - private void 
writeToFile(String appXml, String appPath) throws IOException { File wf = new File(URI.create(appPath)); PrintWriter out = null; http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowKillXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowKillXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowKillXCommand.java index e493d4d..8cc3694 100644 --- a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowKillXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowKillXCommand.java @@ -159,6 +159,25 @@ public class TestWorkflowKillXCommand extends XDataTestCase { assertEquals(action.getStatus(), WorkflowAction.Status.KILLED); wfInstance = job.getWorkflowInstance(); assertEquals(wfInstance.getStatus(), WorkflowInstance.Status.KILLED); + + services.destroy(); + + sleep(5000); + + setSystemProperty(LiteWorkflowStoreService.CONF_NODE_DEF_VERSION, LiteWorkflowStoreService.NODE_DEF_VERSION_2); + services = new Services(); + services.init(); + + sleep(5000); + + jpaService = Services.get().get(JPAService.class); + job = jpaService.execute(wfJobGetCmd); + action = jpaService.execute(wfActionGetCmd); + assertEquals(job.getStatus(), WorkflowJob.Status.KILLED); + assertEquals(action.getStatus(), WorkflowAction.Status.KILLED); + wfInstance = job.getWorkflowInstance(); + assertEquals(wfInstance.getStatus(), WorkflowInstance.Status.KILLED); + } http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/docs/src/site/twiki/WorkflowFunctionalSpec.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/WorkflowFunctionalSpec.twiki b/docs/src/site/twiki/WorkflowFunctionalSpec.twiki index e7ac50d..9db2a0d 100644 --- a/docs/src/site/twiki/WorkflowFunctionalSpec.twiki +++ b/docs/src/site/twiki/WorkflowFunctionalSpec.twiki 
@@ -2550,11 +2550,17 @@ Oozie adminstrator can allow more error codes to be handled for User-Retry. By a =oozie.service.LiteWorkflowStoreService.user.retry.error.code.ext= to =oozie.site.xml= and error codes as value, these error codes will be considered as User-Retry after system restart. +Since Oozie 4.3, User-Retry allows users to specify a retry policy. The value for the policy can be =periodic= +or =exponential=, =periodic= being the default. An Oozie administrator can define the user retry policy for all workflow +actions by adding the configuration =oozie.service.LiteWorkflowStoreService.user.retry.policy= to =oozie-site.xml=. +This value takes effect as the user retry policy after a system restart. It can be overridden while defining +actions in the workflow XML if needed. The =retry-interval= is specified in minutes; with the =exponential= policy the delay doubles on each retry (e.g. with =retry-interval="1"= the retries wait 1, 2, 4, 8, ... minutes). + Examples of User-Retry in a workflow action is : <verbatim> -<workflow-app xmlns="uri:oozie:workflow:0.3" name="wf-name"> -<action name="a" retry-max="2" retry-interval="1"> +<workflow-app xmlns="uri:oozie:workflow:0.5" name="wf-name"> +<action name="a" retry-max="2" retry-interval="1" retry-policy="exponential"> </action> </verbatim> @@ -2750,6 +2756,7 @@ to be executed. 
<xs:attribute name="cred" type="xs:string"/> <xs:attribute name="retry-max" type="xs:string"/> <xs:attribute name="retry-interval" type="xs:string"/> + <xs:attribute name="retry-policy" type="xs:string"/> </xs:complexType> <xs:complexType name="MAP-REDUCE"> http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 01a8099..5913ab7 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.3.0 release (trunk - unreleased) +OOZIE-2440 Exponential re-try policy for workflow action (satishsaley via jaydeepvishwakarma) OOZIE-2539 Incorrect property key is used for 'hive log4j configuration file for execution mode' (abhishekbafna via jaydeepvishwakarma) OOZIE-2565 [Oozie web Console] Make the timezones in settings tab to be sorted by default (meetchandan via jaydeepvishwakarma) OOZIE-2520 SortBy filter for ordering the jobs query results (abhishekbafna via jaydeepvishwakarma)
