Repository: oozie
Updated Branches:
  refs/heads/master 53c1c81ef -> 2fd0b66b5


OOZIE-3156 Retry SSH action check when cannot connect to remote host (txsing 
via andras.piros)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/2fd0b66b
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/2fd0b66b
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/2fd0b66b

Branch: refs/heads/master
Commit: 2fd0b66b5eb4edc1256a630da8fb37e7c4565800
Parents: 53c1c81
Author: Andras Piros <andras.pi...@cloudera.com>
Authored: Mon Jun 4 16:16:05 2018 +0200
Committer: Andras Piros <andras.pi...@cloudera.com>
Committed: Mon Jun 4 16:16:05 2018 +0200

----------------------------------------------------------------------
 .../oozie/action/ssh/SshActionExecutor.java     |  33 ++++
 core/src/main/resources/oozie-default.xml       |  16 ++
 .../oozie/action/ssh/TestSshActionExecutor.java | 172 ++++++-------------
 docs/src/site/twiki/DG_SshActionExtension.twiki |   5 +
 release-log.txt                                 |   1 +
 5 files changed, 111 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/2fd0b66b/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java 
b/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java
index db0bcd6..128feee 100644
--- a/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java
@@ -81,8 +81,14 @@ public class SshActionExecutor extends ActionExecutor {
 
     public static final String HTTP_COMMAND_OPTIONS = 
"oozie.action.ssh.http.command.post.options";
 
+    public static final String CHECK_MAX_RETRIES = 
"oozie.action.ssh.check.retries.max";
+
+    public static final String CHECK_INITIAL_RETRY_WAIT_TIME = 
"oozie.action.ssh.check.initial.retry.wait.time";
+
     private static final String EXT_STATUS_VAR = "#status";
 
+    private static final int SSH_CONNECT_ERROR_CODE = 255;
+
     private static int maxLen;
     private static boolean allowSshUserAtHost;
 
@@ -547,6 +553,21 @@ public class SshActionExecutor extends ActionExecutor {
         String command = SSH_COMMAND_BASE + action.getTrackerUri() + " ps -p " 
+ action.getExternalId();
         Status aStatus;
         int returnValue = getReturnValue(command);
+        if (returnValue == SSH_CONNECT_ERROR_CODE) {
+            int maxRetryCount = ConfigurationService.getInt(CHECK_MAX_RETRIES, 
3);
+            long waitTime = 
ConfigurationService.getLong(CHECK_INITIAL_RETRY_WAIT_TIME, 3000);
+            for (int retries = 1; retries <= maxRetryCount; retries++) {
+                waitTime = handleRetry(waitTime, retries);
+                returnValue = getReturnValue(command);
+                if (returnValue != SSH_CONNECT_ERROR_CODE) {
+                    break;
+                }
+            }
+            if (returnValue == SSH_CONNECT_ERROR_CODE) {
+                throw new 
ActionExecutorException(ActionExecutorException.ErrorType.FAILED, 
ERR_COULD_NOT_CONNECT,
+                        "Failed to connect to host [" + action.getTrackerUri() 
+ "] for ssh action status check.");
+            }
+        }
         if (returnValue == 0) {
             aStatus = Status.RUNNING;
         }
@@ -564,6 +585,18 @@ public class SshActionExecutor extends ActionExecutor {
         return aStatus;
     }
 
+    /**
+     * Sleeps before the next retry of the ssh action status check and computes the wait time
+     * to use for the retry after that (exponential back-off with factor 2).
+     *
+     * @param sleepBeforeRetryMs time to sleep before the upcoming retry, in milliseconds
+     * @param retries sequence number of the upcoming retry, used only for logging
+     * @return the wait time for the following retry, i.e. twice {@code sleepBeforeRetryMs}
+     */
+    private long handleRetry(long sleepBeforeRetryMs, final int retries) {
+        LOG.warn("failed to check ssh action status, sleeping {0} milliseconds before retry #{1}", sleepBeforeRetryMs,
+                retries);
+        try {
+            Thread.sleep(sleepBeforeRetryMs);
+        } catch (InterruptedException e) {
+            LOG.error("ssh action status check retry get interrupted during wait, caused by {0}", e.getMessage());
+            // Thread.sleep clears the interrupt status when it throws; restore it so the caller
+            // (and any later blocking call) can still observe that this thread was interrupted.
+            Thread.currentThread().interrupt();
+        }
+        return sleepBeforeRetryMs * 2;
+    }
+
     /**
      * Execute the callable.
      *

http://git-wip-us.apache.org/repos/asf/oozie/blob/2fd0b66b/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml 
b/core/src/main/resources/oozie-default.xml
index 14bb200..b828c80 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -2059,6 +2059,22 @@ will be the requeue interval for the actions which are 
waiting for a long time w
         </description>
     </property>
 
+    <property>
+        <name>oozie.action.ssh.check.retries.max</name>
+        <value>3</value>
+        <description>
+            Maximum number of retries of the ssh action status check when Oozie cannot connect to the remote host.
+        </description>
+    </property>
+
+    <property>
+        <name>oozie.action.ssh.check.initial.retry.wait.time</name>
+        <value>3000</value>
+        <description>
+            Initial wait time, in milliseconds, before the first retry of the ssh action status check.
+            The wait time is doubled before each subsequent retry.
+        </description>
+    </property>
+
     <!-- SubworkflowActionExecutor -->
 
     <property>

http://git-wip-us.apache.org/repos/asf/oozie/blob/2fd0b66b/core/src/test/java/org/apache/oozie/action/ssh/TestSshActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/action/ssh/TestSshActionExecutor.java 
b/core/src/test/java/org/apache/oozie/action/ssh/TestSshActionExecutor.java
index 00daa77..942a581 100644
--- a/core/src/test/java/org/apache/oozie/action/ssh/TestSshActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/ssh/TestSshActionExecutor.java
@@ -191,7 +191,7 @@ public class TestSshActionExecutor extends XFsTestCase {
         return "uri:oozie-workflow:0.1";
     }
 
-    public void testJobStart() throws ActionExecutorException {
+    private WorkflowJobBean createBaseWorkflowJobBean(){
         String baseDir = getTestCaseDir();
         Path appPath = new Path(getNameNodeUri(), baseDir);
 
@@ -207,6 +207,11 @@ public class TestSshActionExecutor extends XFsTestCase {
         workflow.setAppPath(wfConf.get(OozieClient.APP_PATH));
         workflow.setProtoActionConf(protoConf.toXmlString());
         
workflow.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.WORKFLOW));
+        return workflow;
+    }
+
+    public void testJobStart() throws ActionExecutorException {
+        WorkflowJobBean workflow = createBaseWorkflowJobBean();
 
         final WorkflowActionBean action = new WorkflowActionBean();
         action.setId("actionId");
@@ -233,21 +238,7 @@ public class TestSshActionExecutor extends XFsTestCase {
     }
 
     public void testJobRecover() throws ActionExecutorException, 
InterruptedException {
-        String baseDir = getTestCaseDir();
-        Path appPath = new Path(getNameNodeUri(), baseDir);
-
-        XConfiguration protoConf = new XConfiguration();
-        protoConf.setStrings(WorkflowAppService.HADOOP_USER, getTestUser());
-
-
-        XConfiguration wfConf = new XConfiguration();
-        wfConf.set(OozieClient.APP_PATH, appPath.toString());
-
-        WorkflowJobBean workflow = new WorkflowJobBean();
-        workflow.setConf(wfConf.toXmlString());
-        workflow.setAppPath(wfConf.get(OozieClient.APP_PATH));
-        workflow.setProtoActionConf(protoConf.toXmlString());
-        
workflow.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.WORKFLOW));
+        WorkflowJobBean workflow = createBaseWorkflowJobBean();
 
         final WorkflowActionBean action = new WorkflowActionBean();
         action.setId("actionId");
@@ -335,21 +326,7 @@ public class TestSshActionExecutor extends XFsTestCase {
 //    }
 
     public void testConnectionErrors() throws ActionExecutorException {
-        String baseDir = getTestCaseDir();
-        Path appPath = new Path(getNameNodeUri(), baseDir);
-
-        XConfiguration protoConf = new XConfiguration();
-        protoConf.setStrings(WorkflowAppService.HADOOP_USER, getTestUser());
-
-
-        XConfiguration wfConf = new XConfiguration();
-        wfConf.set(OozieClient.APP_PATH, appPath.toString());
-
-        WorkflowJobBean workflow = new WorkflowJobBean();
-        workflow.setConf(wfConf.toXmlString());
-        workflow.setAppPath(wfConf.get(OozieClient.APP_PATH));
-        workflow.setProtoActionConf(protoConf.toXmlString());
-        
workflow.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.WORKFLOW));
+        WorkflowJobBean workflow = createBaseWorkflowJobBean();
 
         final WorkflowActionBean action = new WorkflowActionBean();
         action.setId("actionId");
@@ -401,27 +378,14 @@ public class TestSshActionExecutor extends XFsTestCase {
     }
 
     public void testSpaceInArgs() throws Exception {
-        String baseDir = getTestCaseDir();
-        Path appPath = new Path(getNameNodeUri(), baseDir);
+        WorkflowJobBean workflow = createBaseWorkflowJobBean();
 
-        Path script = new Path(baseDir, "script.sh");
+        Path script = new Path(getTestCaseDir(), "script.sh");
         FileSystem fs = FileSystem.getLocal(createJobConf());
         Writer w = new OutputStreamWriter(fs.create(script));
         w.write(ECHO_ARGS_SCRIPT);
         w.close();
 
-        XConfiguration protoConf = new XConfiguration();
-        protoConf.setStrings(WorkflowAppService.HADOOP_USER, getTestUser());
-
-        XConfiguration wfConf = new XConfiguration();
-        wfConf.set(OozieClient.APP_PATH, appPath.toString());
-
-        WorkflowJobBean workflow = new WorkflowJobBean();
-        workflow.setConf(wfConf.toXmlString());
-        workflow.setAppPath(wfConf.get(OozieClient.APP_PATH));
-        workflow.setProtoActionConf(protoConf.toXmlString());
-        
workflow.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.WORKFLOW));
-
         final WorkflowActionBean action = new WorkflowActionBean();
         action.setId("actionId");
         action.setConf("<ssh xmlns='" + getActionXMLSchema() + "'>" +
@@ -458,27 +422,14 @@ public class TestSshActionExecutor extends XFsTestCase {
     }
 
     public void testSpaceInArg() throws Exception {
-        String baseDir = getTestCaseDir();
-        Path appPath = new Path(getNameNodeUri(), baseDir);
+        WorkflowJobBean workflow = createBaseWorkflowJobBean();
 
-        Path script = new Path(baseDir, "script.sh");
+        Path script = new Path(getTestCaseDir(), "script.sh");
         FileSystem fs = FileSystem.getLocal(createJobConf());
         Writer w = new OutputStreamWriter(fs.create(script));
         w.write(ECHO_ARGS_SCRIPT);
         w.close();
 
-        XConfiguration protoConf = new XConfiguration();
-        protoConf.setStrings(WorkflowAppService.HADOOP_USER, getTestUser());
-
-        XConfiguration wfConf = new XConfiguration();
-        wfConf.set(OozieClient.APP_PATH, appPath.toString());
-
-        WorkflowJobBean workflow = new WorkflowJobBean();
-        workflow.setConf(wfConf.toXmlString());
-        workflow.setAppPath(wfConf.get(OozieClient.APP_PATH));
-        workflow.setProtoActionConf(protoConf.toXmlString());
-        
workflow.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.WORKFLOW));
-
         final WorkflowActionBean action = new WorkflowActionBean();
         action.setId("actionId");
         action.setConf("<ssh xmlns='" + getActionXMLSchema() + "'>" +
@@ -512,27 +463,14 @@ public class TestSshActionExecutor extends XFsTestCase {
     }
 
     public void testNoArgsNorArg() throws Exception {
-        String baseDir = getTestCaseDir();
-        Path appPath = new Path(getNameNodeUri(), baseDir);
+        WorkflowJobBean workflow = createBaseWorkflowJobBean();
 
-        Path script = new Path(baseDir, "script.sh");
+        Path script = new Path(getTestCaseDir(), "script.sh");
         FileSystem fs = FileSystem.getLocal(createJobConf());
         Writer w = new OutputStreamWriter(fs.create(script));
         w.write("echo \"prop1=something\"");
         w.close();
 
-        XConfiguration protoConf = new XConfiguration();
-        protoConf.setStrings(WorkflowAppService.HADOOP_USER, getTestUser());
-
-        XConfiguration wfConf = new XConfiguration();
-        wfConf.set(OozieClient.APP_PATH, appPath.toString());
-
-        WorkflowJobBean workflow = new WorkflowJobBean();
-        workflow.setConf(wfConf.toXmlString());
-        workflow.setAppPath(wfConf.get(OozieClient.APP_PATH));
-        workflow.setProtoActionConf(protoConf.toXmlString());
-        
workflow.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.WORKFLOW));
-
         final WorkflowActionBean action = new WorkflowActionBean();
         action.setId("actionId");
         action.setConf("<ssh xmlns='" + getActionXMLSchema() + "'>" +
@@ -558,24 +496,50 @@ public class TestSshActionExecutor extends XFsTestCase {
     }
 
     /**
-     * test {@code SshActionExecutor.check()} method with invalid
-     * xml configuration
+     * test {@code SshActionExecutor.check()} method with host connection
+     * failure.
      */
-    public void testSshCheckWithInvalidXml() throws Exception  {
-        String baseDir = getTestCaseDir();
-        Path appPath = new Path(getNameNodeUri(), baseDir);
+    public void testSshCheckWithConnectionError() throws Exception  {
+        WorkflowJobBean workflow = createBaseWorkflowJobBean();
 
-        XConfiguration protoConf = new XConfiguration();
-        protoConf.setStrings(WorkflowAppService.HADOOP_USER, getTestUser());
+        final WorkflowActionBean action = new WorkflowActionBean();
+        action.setId("actionId");
+        action.setConf("<ssh xmlns='" + getActionXMLSchema() + "'>" +
+                       "<host>localhost</host>" +
+                       "<command>echo</command>" +
+                       "<args>\"prop1=something\"</args>" +
+                       "</ssh>");
+        action.setName("ssh");
+        final SshActionExecutor ssh = new SshActionExecutor();
+        final Context context = new Context(workflow, action);
+        ssh.start(context, action);
 
-        XConfiguration wfConf = new XConfiguration();
-        wfConf.set(OozieClient.APP_PATH, appPath.toString());
+        String originTrackerUri = action.getTrackerUri();
+        action.setTrackerUri("dummy@dummyHost");
+        try {
+            ssh.check(context, action);
+            fail("testCheckConnectionError expected ex error");
+        } catch (ActionExecutorException e) {
+            assertEquals(SshActionExecutor.ERR_COULD_NOT_CONNECT, 
e.getErrorCode());
+        }
 
-        WorkflowJobBean workflow = new WorkflowJobBean();
-        workflow.setConf(wfConf.toXmlString());
-        workflow.setAppPath(wfConf.get(OozieClient.APP_PATH));
-        workflow.setProtoActionConf(protoConf.toXmlString());
-        
workflow.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.WORKFLOW));
+        action.setTrackerUri(originTrackerUri);
+        waitFor(30 * 1000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                ssh.check(context, action);
+                return Status.DONE == action.getStatus();
+            }
+        });
+        ssh.end(context, action);
+        assertEquals(Status.OK, action.getStatus());
+    }
+
+    /**
+     * test {@code SshActionExecutor.check()} method with invalid
+     * xml configuration
+     */
+    public void testSshCheckWithInvalidXml() throws Exception  {
+        WorkflowJobBean workflow = createBaseWorkflowJobBean();
 
         WorkflowActionBean action = new WorkflowActionBean();
         action.setId("actionId");
@@ -599,19 +563,7 @@ public class TestSshActionExecutor extends XFsTestCase {
      * xml configuration
      */
     public void testSshStartWithInvalidXml() throws Exception {
-        String baseDir = getTestCaseDir();
-        Path appPath = new Path(getNameNodeUri(), baseDir);
-        XConfiguration protoConf = new XConfiguration();
-        protoConf.setStrings(WorkflowAppService.HADOOP_USER, getTestUser());
-
-        XConfiguration wfConf = new XConfiguration();
-        wfConf.set(OozieClient.APP_PATH, appPath.toString());
-
-        WorkflowJobBean workflow = new WorkflowJobBean();
-        workflow.setConf(wfConf.toXmlString());
-        workflow.setAppPath(wfConf.get(OozieClient.APP_PATH));
-        workflow.setProtoActionConf(protoConf.toXmlString());
-        
workflow.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.WORKFLOW));
+        WorkflowJobBean workflow = createBaseWorkflowJobBean();
 
         WorkflowActionBean action = new WorkflowActionBean();
         action.setId("actionId");
@@ -632,19 +584,7 @@ public class TestSshActionExecutor extends XFsTestCase {
      * sequence call
      */
     public void testJobStartAndKill() throws Exception {
-        String baseDir = getTestCaseDir();
-        Path appPath = new Path(getNameNodeUri(), baseDir);
-        XConfiguration protoConf = new XConfiguration();
-        protoConf.setStrings(WorkflowAppService.HADOOP_USER, getTestUser());
-
-        XConfiguration wfConf = new XConfiguration();
-        wfConf.set(OozieClient.APP_PATH, appPath.toString());
-
-        WorkflowJobBean workflow = new WorkflowJobBean();
-        workflow.setConf(wfConf.toXmlString());
-        workflow.setAppPath(wfConf.get(OozieClient.APP_PATH));
-        workflow.setProtoActionConf(protoConf.toXmlString());
-        
workflow.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.WORKFLOW));
+        WorkflowJobBean workflow = createBaseWorkflowJobBean();
 
         WorkflowActionBean action = new WorkflowActionBean();
         action.setId("actionId");

http://git-wip-us.apache.org/repos/asf/oozie/blob/2fd0b66b/docs/src/site/twiki/DG_SshActionExtension.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/DG_SshActionExtension.twiki 
b/docs/src/site/twiki/DG_SshActionExtension.twiki
index 99b96f7..5a51d49 100644
--- a/docs/src/site/twiki/DG_SshActionExtension.twiki
+++ b/docs/src/site/twiki/DG_SshActionExtension.twiki
@@ -27,6 +27,11 @@ command must follow the following requirements:
 Note: Ssh Action will fail if any output is written to standard error / output 
upon login (e.g. .bashrc of the remote
 user contains ls -a).
 
+Note: Ssh Action will fail if Oozie cannot establish an ssh connection to the host for the action status check
+(e.g., the host is under heavy load, or the network is bad) after a configurable number (3 by default) of retries.
+The first retry waits a configurable period of time (3 seconds by default) before checking again.
+Each subsequent retry waits twice as long as the previous one.
+
 *Syntax:*
 
 <verbatim>

http://git-wip-us.apache.org/repos/asf/oozie/blob/2fd0b66b/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 21dfe34..d07d459 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.1.0 release (trunk - unreleased)
 
+OOZIE-3156 Retry SSH action check when cannot connect to remote host (txsing 
via andras.piros)
 OOZIE-3227 Eliminate duplicate dependencies when using Hadoop 3 
DistributedCache (dionusos via andras.piros)
 OOZIE-2097 Get rid of non-Javadoc comments (Jan Hentschel via andras.piros)
 OOZIE-3269 Flaky tests in TestCoordMaterializeTriggerService class (pbacsko 
via andras.piros)

Reply via email to