Repository: oozie
Updated Branches:
  refs/heads/master 2ae61cd85 -> 06c8dcdba


OOZIE-2088 Exponential retries for transient failures (pavan kumar via 
shwethags)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/06c8dcdb
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/06c8dcdb
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/06c8dcdb

Branch: refs/heads/master
Commit: 06c8dcdbaf48d05b2662b83a0dc579d237677258
Parents: 2ae61cd
Author: shwethags <[email protected]>
Authored: Wed Jan 21 16:29:26 2015 +0530
Committer: shwethags <[email protected]>
Committed: Wed Jan 21 16:29:26 2015 +0530

----------------------------------------------------------------------
 .../org/apache/oozie/action/ActionExecutor.java | 51 ++++++++++++++++++--
 .../oozie/command/wf/ActionCheckXCommand.java   |  5 --
 .../apache/oozie/command/wf/ActionXCommand.java | 18 ++++++-
 core/src/main/resources/oozie-default.xml       | 16 ++++++
 .../apache/oozie/action/TestActionExecutor.java |  8 ++-
 .../command/wf/TestActionCheckXCommand.java     |  3 --
 release-log.txt                                 |  1 +
 7 files changed, 87 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/06c8dcdb/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/ActionExecutor.java 
b/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
index ff836fb..ea652f3 100644
--- a/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
@@ -18,6 +18,7 @@
 
 package org.apache.oozie.action;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;
@@ -49,7 +50,11 @@ public abstract class ActionExecutor {
      */
        public static final String CONF_PREFIX = "oozie.action.";
 
-       public static final String MAX_RETRIES = CONF_PREFIX + "retries.max";
+    public static final String MAX_RETRIES = CONF_PREFIX + "retries.max";
+
+    public static final String ACTION_RETRY_INTERVAL = CONF_PREFIX + 
"retry.interval";
+
+    public static final String ACTION_RETRY_POLICY = CONF_PREFIX + 
"retry.policy";
 
     /**
      * Error code used by {@link #convertException} when there is not register 
error information for an exception.
@@ -58,6 +63,10 @@ public abstract class ActionExecutor {
     
     public boolean requiresNNJT = false;
 
+    public static enum RETRYPOLICY {
+        EXPONENTIAL, PERIODIC
+    }
+
     private static class ErrorInfo {
         ActionExecutorException.ErrorType errorType;
         String errorCode;
@@ -215,6 +224,7 @@ public abstract class ActionExecutor {
     private String type;
     private int maxRetries;
     private long retryInterval;
+    private RETRYPOLICY retryPolicy;
 
     /**
      * Create an action executor with default retry parameters.
@@ -229,12 +239,27 @@ public abstract class ActionExecutor {
      * Create an action executor.
      *
      * @param type action executor type.
-     * @param retryInterval retry interval, in seconds.
+     * @param defaultRetryInterval retry interval, in seconds.
      */
-    protected ActionExecutor(String type, long retryInterval) {
+    protected ActionExecutor(String type, long defaultRetryInterval) {
         this.type = ParamChecker.notEmpty(type, "type");
         this.maxRetries = ConfigurationService.getInt(MAX_RETRIES);
-        this.retryInterval = retryInterval;
+        int retryInterval = ConfigurationService.getInt(ACTION_RETRY_INTERVAL);
+        this.retryInterval = retryInterval > 0 ? retryInterval : 
defaultRetryInterval;
+        this.retryPolicy = getRetryPolicyFromConf();
+    }
+
+    /**
+     * Reads the retry policy from {@link #ACTION_RETRY_POLICY}.
+     * <p>
+     * The value is trimmed and matched case-insensitively against {@link RETRYPOLICY}; a blank or
+     * unrecognized value falls back to {@link RETRYPOLICY#PERIODIC}.
+     *
+     * @return the configured retry policy, never {@code null}.
+     */
+    private RETRYPOLICY getRetryPolicyFromConf() {
+        String retryPolicy = ConfigurationService.get(ACTION_RETRY_POLICY);
+        if (StringUtils.isBlank(retryPolicy)) {
+            return RETRYPOLICY.PERIODIC;
+        }
+        try {
+            // Locale.ROOT keeps the match independent of the JVM default locale: under a Turkish locale,
+            // "exponential".toUpperCase() yields a dotted capital I and would silently fall back to PERIODIC.
+            return RETRYPOLICY.valueOf(retryPolicy.trim().toUpperCase(java.util.Locale.ROOT));
+        } catch (IllegalArgumentException e) {
+            // Unrecognized value: keep the fixed-interval behavior instead of failing executor construction.
+            return RETRYPOLICY.PERIODIC;
+        }
     }
 
     /**
@@ -357,6 +382,24 @@ public abstract class ActionExecutor {
     }
 
     /**
+     * Return the retry policy for the action executor.
+     *
+     * @return the retry policy for the action executor.
+     */
+    public RETRYPOLICY getRetryPolicy() {
+        return retryPolicy;
+    }
+
+    /**
+     * Sets the retry policy for the action executor.
+     *
+     * @param retryPolicy retry policy for the action executor.
+     */
+    public void setRetryPolicy(RETRYPOLICY retryPolicy) {
+        this.retryPolicy = retryPolicy;
+    }
+
+    /**
      * Return the retry interval for the action executor in seconds.
      *
      * @return the retry interval for the action executor in seconds.

http://git-wip-us.apache.org/repos/asf/oozie/blob/06c8dcdb/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java 
b/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
index 1fad11c..e9488d4 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
@@ -166,11 +166,6 @@ public class ActionCheckXCommand extends 
ActionXCommand<Void> {
     @Override
     protected Void execute() throws CommandException {
         LOG.debug("STARTED ActionCheckXCommand for wf actionId=" + actionId + 
" priority =" + getPriority());
-
-        long retryInterval = 
Services.get().getConf().getLong(ActionCheckerService.CONF_ACTION_CHECK_INTERVAL,
 executor
-                .getRetryInterval());
-        executor.setRetryInterval(retryInterval);
-
         ActionExecutorContext context = null;
         boolean execSynchronous = false;
         try {

http://git-wip-us.apache.org/repos/asf/oozie/blob/06c8dcdb/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java 
b/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
index 9e7243a..69a363b 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
@@ -47,7 +47,6 @@ import org.apache.oozie.service.Services;
 import org.apache.oozie.util.ELEvaluator;
 import org.apache.oozie.util.InstrumentUtils;
 import org.apache.oozie.util.Instrumentation;
-import org.apache.oozie.util.LogUtils;
 import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.workflow.WorkflowException;
 import org.apache.oozie.workflow.WorkflowInstance;
@@ -93,7 +92,7 @@ public abstract class ActionXCommand<T> extends 
WorkflowXCommand<Void> {
             action.setStatus(status);
             action.setPending();
             action.incRetries();
-            long retryDelayMillis = executor.getRetryInterval() * 1000;
+            long retryDelayMillis = getRetryDelay(actionRetryCount, 
executor.getRetryInterval(), executor.getRetryPolicy());
             action.setPendingAge(new Date(System.currentTimeMillis() + 
retryDelayMillis));
             LOG.info("Next Retry, Attempt Number [{0}] in [{1}] milliseconds", 
actionRetryCount + 1, retryDelayMillis);
             this.resetUsed();
@@ -254,6 +253,21 @@ public abstract class ActionXCommand<T> extends 
WorkflowXCommand<Void> {
         getInstrumentation().addCron(INSTRUMENTATION_GROUP, type + "#" + 
getName(), cron);
     }
 
+    /*
+     * Upper bound for a computed retry delay. Half of Long.MAX_VALUE leaves headroom for callers
+     * that add the delay to System.currentTimeMillis().
+     */
+    private static final long MAX_RETRY_DELAY_MILLIS = Long.MAX_VALUE / 2;
+
+    /*
+     * Returns the delay in milliseconds before the next retry, based on the retry policy:
+     * - PERIODIC (also used when retryPolicy is null): retryInterval seconds.
+     * - EXPONENTIAL: 2^retryCount * retryInterval seconds.
+     * retryCount is the number of retries already performed; negative counts and intervals are
+     * treated as 0. Results saturate at MAX_RETRY_DELAY_MILLIS instead of overflowing into a
+     * negative delay, which would put the next retry's pending age in the past.
+     */
+    private long getRetryDelay(int retryCount, long retryInterval, ActionExecutor.RETRYPOLICY retryPolicy) {
+        long intervalMillis = multiplySaturated(Math.max(0L, retryInterval), 1000L);
+        if (retryPolicy == null) {
+            return intervalMillis;
+        }
+        switch (retryPolicy) {
+            case EXPONENTIAL:
+                // 1L << 63 is negative, so larger exponents go straight to the saturated multiplier.
+                long multiplier = retryCount >= 63 ? Long.MAX_VALUE : 1L << Math.max(0, retryCount);
+                return multiplySaturated(multiplier, intervalMillis);
+            case PERIODIC:
+                return intervalMillis;
+            default:
+                throw new UnsupportedOperationException("Retry policy not supported");
+        }
+    }
+
+    /*
+     * Multiplies two non-negative values, returning MAX_RETRY_DELAY_MILLIS instead of overflowing.
+     */
+    private static long multiplySaturated(long a, long b) {
+        if (a == 0 || b == 0) {
+            return 0;
+        }
+        return a > MAX_RETRY_DELAY_MILLIS / b ? MAX_RETRY_DELAY_MILLIS : a * b;
+    }
+
     /**
      * Workflow action executor context
      *

http://git-wip-us.apache.org/repos/asf/oozie/blob/06c8dcdb/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml 
b/core/src/main/resources/oozie-default.xml
index e849fc7..1cb937e 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -1657,6 +1657,22 @@
         </description>
     </property>
 
+    <property>
+        <name>oozie.action.retry.interval</name>
+        <value>10</value>
+        <description>
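+            The base interval, in seconds, between retries of an action in case of failure. With the exponential retry policy this interval is doubled on every subsequent retry. A value of 0 or less makes each action executor use its own built-in default.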
+        </description>
+    </property>
+
+    <property>
+        <name>oozie.action.retry.policy</name>
+        <value>periodic</value>
+        <description>
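+            Retry policy of an action in case of failure. Possible values are periodic (retry after a fixed retry interval) and exponential (the retry interval doubles with each retry attempt). The value is case-insensitive; a blank or unrecognized value falls back to periodic.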
+        </description>
+    </property>
+
     <!-- SshActionExecutor -->
 
     <property>

http://git-wip-us.apache.org/repos/asf/oozie/blob/06c8dcdb/core/src/test/java/org/apache/oozie/action/TestActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/TestActionExecutor.java 
b/core/src/test/java/org/apache/oozie/action/TestActionExecutor.java
index 18da224..737510c 100644
--- a/core/src/test/java/org/apache/oozie/action/TestActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/TestActionExecutor.java
@@ -19,6 +19,8 @@
 package org.apache.oozie.action;
 
 import java.io.EOFException;
+
+import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.test.XTestCase;
 import org.apache.oozie.client.WorkflowAction;
@@ -31,6 +33,7 @@ public class TestActionExecutor extends XTestCase {
     private static class MyActionExecutor extends ActionExecutor {
 
                private int maxRetries;
+        private long retryInterval;
 
         protected MyActionExecutor() {
             super("type");
@@ -48,12 +51,15 @@ public class TestActionExecutor extends XTestCase {
             super("type", retryInterval);
             super.setMaxRetries(maxRetries);
             this.maxRetries = maxRetries;
+            super.setRetryInterval(retryInterval);
+            this.retryInterval = retryInterval;
+
         }
 
         public void start(Context context, WorkflowAction action) throws 
ActionExecutorException {
             assertEquals("type", getType());
             assertEquals(this.maxRetries, getMaxRetries());
-            assertEquals(ActionExecutor.RETRY_INTERVAL, getRetryInterval());
+            
assertEquals(ConfigurationService.getLong(ActionExecutor.ACTION_RETRY_INTERVAL),
 getRetryInterval());
         }
 
         public void end(Context context, WorkflowAction action) throws 
ActionExecutorException {

http://git-wip-us.apache.org/repos/asf/oozie/blob/06c8dcdb/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java 
b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
index 9304823..efde282 100644
--- 
a/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
+++ 
b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
@@ -546,9 +546,6 @@ public class TestActionCheckXCommand extends XDataTestCase {
      */
     public void testCheckInterval() throws Exception {
         long testedValue = 10;
-        
Services.get().getConf().setLong(ActionCheckerService.CONF_ACTION_CHECK_INTERVAL,
-                testedValue);
-
         WorkflowJobBean job0 = 
this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, 
WorkflowInstance.Status.RUNNING);
         final String jobId = job0.getId();
         WorkflowActionBean action0 = this.addRecordToWfActionTable(jobId, "1", 
WorkflowAction.Status.RUNNING);

http://git-wip-us.apache.org/repos/asf/oozie/blob/06c8dcdb/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 8683eb5..76f3f6d 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.2.0 release (trunk - unreleased)
 
+OOZIE-2088 Exponential retries for transient failures (pavan kumar via 
shwethags)
 OOZIE-2111 Kerberized Oozie doesn't allow connections from users with a lot of 
groups (rkanter)
 OOZIE-2104 oozie server dies on startup if oozie-site redefines ActionExecutor 
classes (rkanter)
 OOZIE-2092 Provide option to supply config to workflow during rerun of 
coordinator (jaydeepvishwakarma via shwethags)

Reply via email to