Repository: oozie Updated Branches: refs/heads/master 2ae61cd85 -> 06c8dcdba
OOZIE-2088 Exponential retries for transient failures (pavan kumar via shwethags) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/06c8dcdb Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/06c8dcdb Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/06c8dcdb Branch: refs/heads/master Commit: 06c8dcdbaf48d05b2662b83a0dc579d237677258 Parents: 2ae61cd Author: shwethags <[email protected]> Authored: Wed Jan 21 16:29:26 2015 +0530 Committer: shwethags <[email protected]> Committed: Wed Jan 21 16:29:26 2015 +0530 ---------------------------------------------------------------------- .../org/apache/oozie/action/ActionExecutor.java | 51 ++++++++++++++++++-- .../oozie/command/wf/ActionCheckXCommand.java | 5 -- .../apache/oozie/command/wf/ActionXCommand.java | 18 ++++++- core/src/main/resources/oozie-default.xml | 16 ++++++ .../apache/oozie/action/TestActionExecutor.java | 8 ++- .../command/wf/TestActionCheckXCommand.java | 3 -- release-log.txt | 1 + 7 files changed, 87 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/06c8dcdb/core/src/main/java/org/apache/oozie/action/ActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/ActionExecutor.java b/core/src/main/java/org/apache/oozie/action/ActionExecutor.java index ff836fb..ea652f3 100644 --- a/core/src/main/java/org/apache/oozie/action/ActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/ActionExecutor.java @@ -18,6 +18,7 @@ package org.apache.oozie.action; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.Configuration; @@ -49,7 +50,11 @@ public abstract class ActionExecutor { */ public static final String CONF_PREFIX = "oozie.action."; - public static final String MAX_RETRIES = CONF_PREFIX + "retries.max"; + public static final String MAX_RETRIES = CONF_PREFIX + "retries.max"; + + public static final String ACTION_RETRY_INTERVAL = CONF_PREFIX + "retry.interval"; + + public static final String ACTION_RETRY_POLICY = CONF_PREFIX + "retry.policy"; /** * Error code used by {@link #convertException} when there is not register error information for an exception. @@ -58,6 +63,10 @@ public abstract class ActionExecutor { public boolean requiresNNJT = false; + public static enum RETRYPOLICY { + EXPONENTIAL, PERIODIC + } + private static class ErrorInfo { ActionExecutorException.ErrorType errorType; String errorCode; @@ -215,6 +224,7 @@ public abstract class ActionExecutor { private String type; private int maxRetries; private long retryInterval; + private RETRYPOLICY retryPolicy; /** * Create an action executor with default retry parameters. @@ -229,12 +239,27 @@ public abstract class ActionExecutor { * Create an action executor. * * @param type action executor type. - * @param retryInterval retry interval, in seconds. + * @param defaultRetryInterval retry interval, in seconds. */ - protected ActionExecutor(String type, long retryInterval) { + protected ActionExecutor(String type, long defaultRetryInterval) { this.type = ParamChecker.notEmpty(type, "type"); this.maxRetries = ConfigurationService.getInt(MAX_RETRIES); - this.retryInterval = retryInterval; + int retryInterval = ConfigurationService.getInt(ACTION_RETRY_INTERVAL); + this.retryInterval = retryInterval > 0 ? retryInterval : defaultRetryInterval; + this.retryPolicy = getRetryPolicyFromConf(); + } + + private RETRYPOLICY getRetryPolicyFromConf() { + String retryPolicy = ConfigurationService.get(ACTION_RETRY_POLICY); + if (StringUtils.isBlank(retryPolicy)) { + return RETRYPOLICY.PERIODIC; + } else { + try { + return RETRYPOLICY.valueOf(retryPolicy.toUpperCase().trim()); + } catch (IllegalArgumentException e) { + return RETRYPOLICY.PERIODIC; + } + } } /** @@ -357,6 +382,24 @@ public abstract class ActionExecutor { } /** + * Return the retry policy for the action executor. + * + * @return the retry policy for the action executor. + */ + public RETRYPOLICY getRetryPolicy() { + return retryPolicy; + } + + /** + * Sets the retry policy for the action executor. + * + * @param retryPolicy retry policy for the action executor. + */ + public void setRetryPolicy(RETRYPOLICY retryPolicy) { + this.retryPolicy = retryPolicy; + } + + /** * Return the retry interval for the action executor in seconds. * * @return the retry interval for the action executor in seconds. http://git-wip-us.apache.org/repos/asf/oozie/blob/06c8dcdb/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java index 1fad11c..e9488d4 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java @@ -166,11 +166,6 @@ public class ActionCheckXCommand extends ActionXCommand<Void> { @Override protected Void execute() throws CommandException { LOG.debug("STARTED ActionCheckXCommand for wf actionId=" + actionId + " priority =" + getPriority()); - - long retryInterval = Services.get().getConf().getLong(ActionCheckerService.CONF_ACTION_CHECK_INTERVAL, executor - .getRetryInterval()); - executor.setRetryInterval(retryInterval); - ActionExecutorContext context = null; boolean execSynchronous = false; try { http://git-wip-us.apache.org/repos/asf/oozie/blob/06c8dcdb/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java index 9e7243a..69a363b 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java @@ -47,7 +47,6 @@ import org.apache.oozie.service.Services; import org.apache.oozie.util.ELEvaluator; import org.apache.oozie.util.InstrumentUtils; import org.apache.oozie.util.Instrumentation; -import org.apache.oozie.util.LogUtils; import org.apache.oozie.util.XConfiguration; import org.apache.oozie.workflow.WorkflowException; import org.apache.oozie.workflow.WorkflowInstance; @@ -93,7 +92,7 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<Void> { action.setStatus(status); action.setPending(); action.incRetries(); - long retryDelayMillis = executor.getRetryInterval() * 1000; + long retryDelayMillis = getRetryDelay(actionRetryCount, executor.getRetryInterval(), executor.getRetryPolicy()); action.setPendingAge(new Date(System.currentTimeMillis() + retryDelayMillis)); LOG.info("Next Retry, Attempt Number [{0}] in [{1}] milliseconds", actionRetryCount + 1, retryDelayMillis); this.resetUsed(); @@ -254,6 +253,21 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<Void> { getInstrumentation().addCron(INSTRUMENTATION_GROUP, type + "#" + getName(), cron); } + /* + * Returns the next retry time in milliseconds, based on retry policy algorithm. + */ + private long getRetryDelay(int retryCount, long retryInterval, ActionExecutor.RETRYPOLICY retryPolicy) { + switch (retryPolicy) { + case EXPONENTIAL: + long retryTime = ((long) Math.pow(2, retryCount) * retryInterval * 1000L); + return retryTime; + case PERIODIC: + return retryInterval * 1000L; + default: + throw new UnsupportedOperationException("Retry policy not supported"); + } + } + /** * Workflow action executor context * http://git-wip-us.apache.org/repos/asf/oozie/blob/06c8dcdb/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index e849fc7..1cb937e 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -1657,6 +1657,22 @@ </description> </property> + <property> + <name>oozie.action.retry.interval</name> + <value>10</value> + <description> + The interval between retries of an action in case of failure + </description> + </property> + + <property> + <name>oozie.action.retry.policy</name> + <value>periodic</value> + <description> + Retry policy of an action in case of failure. Possible values are periodic/exponential + </description> + </property> + <!-- SshActionExecutor --> <property> http://git-wip-us.apache.org/repos/asf/oozie/blob/06c8dcdb/core/src/test/java/org/apache/oozie/action/TestActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/TestActionExecutor.java b/core/src/test/java/org/apache/oozie/action/TestActionExecutor.java index 18da224..737510c 100644 --- a/core/src/test/java/org/apache/oozie/action/TestActionExecutor.java +++ b/core/src/test/java/org/apache/oozie/action/TestActionExecutor.java @@ -19,6 +19,8 @@ package org.apache.oozie.action; import java.io.EOFException; + +import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.Services; import org.apache.oozie.test.XTestCase; import org.apache.oozie.client.WorkflowAction; @@ -31,6 +33,7 @@ public class TestActionExecutor extends XTestCase { private static class MyActionExecutor extends ActionExecutor { private int maxRetries; + private long retryInterval; protected MyActionExecutor() { super("type"); @@ -48,12 +51,15 @@ public class TestActionExecutor extends XTestCase { super("type", retryInterval); super.setMaxRetries(maxRetries); this.maxRetries = maxRetries; + super.setRetryInterval(retryInterval); + this.retryInterval = retryInterval; + } public void start(Context context, WorkflowAction action) throws ActionExecutorException { assertEquals("type", getType()); assertEquals(this.maxRetries, getMaxRetries()); - assertEquals(ActionExecutor.RETRY_INTERVAL, getRetryInterval()); + assertEquals(ConfigurationService.getLong(ActionExecutor.ACTION_RETRY_INTERVAL), getRetryInterval()); } public void end(Context context, WorkflowAction action) throws ActionExecutorException { http://git-wip-us.apache.org/repos/asf/oozie/blob/06c8dcdb/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java index 9304823..efde282 100644 --- a/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java @@ -546,9 +546,6 @@ public class TestActionCheckXCommand extends XDataTestCase { */ public void testCheckInterval() throws Exception { long testedValue = 10; - Services.get().getConf().setLong(ActionCheckerService.CONF_ACTION_CHECK_INTERVAL, - testedValue); - WorkflowJobBean job0 = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); final String jobId = job0.getId(); WorkflowActionBean action0 = this.addRecordToWfActionTable(jobId, "1", WorkflowAction.Status.RUNNING); http://git-wip-us.apache.org/repos/asf/oozie/blob/06c8dcdb/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 8683eb5..76f3f6d 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.2.0 release (trunk - unreleased) +OOZIE-2088 Exponential retries for transient failures (pavan kumar via shwethags) OOZIE-2111 Kerberized Oozie doesn't allow connections from users with a lot of groups (rkanter) OOZIE-2104 oozie server dies on startup if oozie-site redefines ActionExecutor classes (rkanter) OOZIE-2092 Provide option to supply config to workflow during rerun of coordinator (jaydeepvishwakarma via shwethags)
