Repository: oozie Updated Branches: refs/heads/master 53c1c81ef -> 2fd0b66b5
OOZIE-3156 Retry SSH action check when cannot connect to remote host (txsing via andras.piros)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/2fd0b66b
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/2fd0b66b
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/2fd0b66b

Branch: refs/heads/master
Commit: 2fd0b66b5eb4edc1256a630da8fb37e7c4565800
Parents: 53c1c81
Author: Andras Piros <andras.pi...@cloudera.com>
Authored: Mon Jun 4 16:16:05 2018 +0200
Committer: Andras Piros <andras.pi...@cloudera.com>
Committed: Mon Jun 4 16:16:05 2018 +0200

----------------------------------------------------------------------
 .../oozie/action/ssh/SshActionExecutor.java     |  42 ++++
 core/src/main/resources/oozie-default.xml       |  16 ++
 .../oozie/action/ssh/TestSshActionExecutor.java | 172 ++++++-------------
 docs/src/site/twiki/DG_SshActionExtension.twiki |   5 +
 release-log.txt                                 |   1 +
 5 files changed, 120 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/2fd0b66b/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java b/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java
index db0bcd6..128feee 100644
--- a/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java
@@ -81,8 +81,15 @@ public class SshActionExecutor extends ActionExecutor {
 
     public static final String HTTP_COMMAND_OPTIONS = "oozie.action.ssh.http.command.post.options";
 
+    public static final String CHECK_MAX_RETRIES = "oozie.action.ssh.check.retries.max";
+
+    public static final String CHECK_INITIAL_RETRY_WAIT_TIME = "oozie.action.ssh.check.initial.retry.wait.time";
+
     private static final String EXT_STATUS_VAR = "#status";
 
+    // ssh(1) exits with 255 when ssh itself fails (e.g. cannot connect); otherwise with the remote command's status
+    private static final int SSH_CONNECT_ERROR_CODE = 255;
+
     private static int maxLen;
     private static boolean allowSshUserAtHost;
 
@@ -547,6 +554,21 @@ public class SshActionExecutor extends ActionExecutor {
         String command = SSH_COMMAND_BASE + action.getTrackerUri() + " ps -p " + action.getExternalId();
         Status aStatus;
         int returnValue = getReturnValue(command);
+        if (returnValue == SSH_CONNECT_ERROR_CODE) {
+            int maxRetryCount = ConfigurationService.getInt(CHECK_MAX_RETRIES, 3);
+            long waitTime = ConfigurationService.getLong(CHECK_INITIAL_RETRY_WAIT_TIME, 3000);
+            for (int retries = 1; retries <= maxRetryCount; retries++) {
+                waitTime = handleRetry(waitTime, retries);
+                returnValue = getReturnValue(command);
+                if (returnValue != SSH_CONNECT_ERROR_CODE) {
+                    break;
+                }
+            }
+            if (returnValue == SSH_CONNECT_ERROR_CODE) {
+                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, ERR_COULD_NOT_CONNECT,
+                        "Failed to connect to host [" + action.getTrackerUri() + "] for ssh action status check.");
+            }
+        }
         if (returnValue == 0) {
             aStatus = Status.RUNNING;
         }
@@ -564,6 +586,26 @@ public class SshActionExecutor extends ActionExecutor {
         return aStatus;
     }
 
+    /**
+     * Sleeps before an ssh action status check retry; restores the interrupt flag if the sleep is interrupted.
+     *
+     * @param sleepBeforeRetryMs milliseconds to sleep before this retry
+     * @param retries 1-based number of the upcoming retry, used only for logging
+     * @return the wait time for the next retry, i.e. twice {@code sleepBeforeRetryMs}
+     */
+    private long handleRetry(long sleepBeforeRetryMs, final int retries) {
+        LOG.warn("failed to check ssh action status, sleeping {0} milliseconds before retry #{1}", sleepBeforeRetryMs,
+                retries);
+        try {
+            Thread.sleep(sleepBeforeRetryMs);
+        } catch (InterruptedException e) {
+            LOG.error("ssh action status check retry get interrupted during wait, caused by {0}", e.getMessage());
+            Thread.currentThread().interrupt();
+        }
+        sleepBeforeRetryMs *= 2;
+        return sleepBeforeRetryMs;
+    }
+
     /**
      * Execute the callable.
      *

http://git-wip-us.apache.org/repos/asf/oozie/blob/2fd0b66b/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 14bb200..b828c80 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -2059,6 +2059,22 @@ will be the requeue interval for the actions which are waiting for a long time w
         </description>
     </property>
 
+    <property>
+        <name>oozie.action.ssh.check.retries.max</name>
+        <value>3</value>
+        <description>
+            Maximum number of retries of the ssh action status check when ssh cannot connect to the remote host.
+        </description>
+    </property>
+
+    <property>
+        <name>oozie.action.ssh.check.initial.retry.wait.time</name>
+        <value>3000</value>
+        <description>
+            Wait time in milliseconds before the first ssh action status check retry; each later retry waits twice as long.
+        </description>
+    </property>
+
     <!-- SubworkflowActionExecutor -->
 
     <property>

http://git-wip-us.apache.org/repos/asf/oozie/blob/2fd0b66b/core/src/test/java/org/apache/oozie/action/ssh/TestSshActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/ssh/TestSshActionExecutor.java b/core/src/test/java/org/apache/oozie/action/ssh/TestSshActionExecutor.java
index 00daa77..942a581 100644
--- a/core/src/test/java/org/apache/oozie/action/ssh/TestSshActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/ssh/TestSshActionExecutor.java
@@ -191,7 +191,7 @@ public class TestSshActionExecutor extends XFsTestCase {
         return "uri:oozie-workflow:0.1";
     }
 
-    public void testJobStart() throws ActionExecutorException {
+    private WorkflowJobBean createBaseWorkflowJobBean() {
         String baseDir = getTestCaseDir();
         Path appPath = new Path(getNameNodeUri(), baseDir);
 
@@ -207,6 +207,11 @@ public class TestSshActionExecutor extends XFsTestCase {
         workflow.setAppPath(wfConf.get(OozieClient.APP_PATH));
         workflow.setProtoActionConf(protoConf.toXmlString());
         workflow.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.WORKFLOW));
+        return workflow;
+    }
+
+    public void testJobStart() throws ActionExecutorException {
+        WorkflowJobBean workflow = createBaseWorkflowJobBean();
 
         final WorkflowActionBean action = new WorkflowActionBean();
         action.setId("actionId");
@@ -233,21 +238,7 @@ public class TestSshActionExecutor extends XFsTestCase {
     }
 
     public void testJobRecover() throws ActionExecutorException, InterruptedException {
-        String baseDir = getTestCaseDir();
-        Path appPath = new Path(getNameNodeUri(), baseDir);
-
-        XConfiguration protoConf = new XConfiguration();
-        protoConf.setStrings(WorkflowAppService.HADOOP_USER, getTestUser());
-
-
-        XConfiguration wfConf = new XConfiguration();
-        wfConf.set(OozieClient.APP_PATH, appPath.toString());
-
-        WorkflowJobBean workflow = new WorkflowJobBean();
-        workflow.setConf(wfConf.toXmlString());
-        workflow.setAppPath(wfConf.get(OozieClient.APP_PATH));
-        workflow.setProtoActionConf(protoConf.toXmlString());
-        workflow.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.WORKFLOW));
+        WorkflowJobBean workflow = createBaseWorkflowJobBean();
 
         final WorkflowActionBean action = new WorkflowActionBean();
         action.setId("actionId");
@@ -335,21 +326,7 @@ public class TestSshActionExecutor extends XFsTestCase {
 //    }
 
     public void testConnectionErrors() throws ActionExecutorException {
-        String baseDir = getTestCaseDir();
-        Path appPath = new Path(getNameNodeUri(), baseDir);
-
-        XConfiguration protoConf = new XConfiguration();
-        protoConf.setStrings(WorkflowAppService.HADOOP_USER, getTestUser());
-
-
-        XConfiguration wfConf = new XConfiguration();
-        wfConf.set(OozieClient.APP_PATH, appPath.toString());
-
-        WorkflowJobBean workflow = new WorkflowJobBean();
-        workflow.setConf(wfConf.toXmlString());
-        workflow.setAppPath(wfConf.get(OozieClient.APP_PATH));
-        workflow.setProtoActionConf(protoConf.toXmlString());
-        workflow.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.WORKFLOW));
+        WorkflowJobBean workflow = createBaseWorkflowJobBean();
 
         final WorkflowActionBean action = new WorkflowActionBean();
         action.setId("actionId");
@@ -401,27 +378,14 @@ public class TestSshActionExecutor extends XFsTestCase {
     }
 
     public void testSpaceInArgs() throws Exception {
-        String baseDir = getTestCaseDir();
-        Path appPath = new Path(getNameNodeUri(), baseDir);
+        WorkflowJobBean workflow = createBaseWorkflowJobBean();
 
-        Path script = new Path(baseDir, "script.sh");
+        Path script = new Path(getTestCaseDir(), "script.sh");
         FileSystem fs = FileSystem.getLocal(createJobConf());
         Writer w = new OutputStreamWriter(fs.create(script));
         w.write(ECHO_ARGS_SCRIPT);
         w.close();
 
-        XConfiguration protoConf = new XConfiguration();
-        protoConf.setStrings(WorkflowAppService.HADOOP_USER, getTestUser());
-
-        XConfiguration wfConf = new XConfiguration();
-        wfConf.set(OozieClient.APP_PATH, appPath.toString());
-
-        WorkflowJobBean workflow = new WorkflowJobBean();
-        workflow.setConf(wfConf.toXmlString());
-        workflow.setAppPath(wfConf.get(OozieClient.APP_PATH));
-        workflow.setProtoActionConf(protoConf.toXmlString());
-        workflow.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.WORKFLOW));
-
         final WorkflowActionBean action = new WorkflowActionBean();
         action.setId("actionId");
         action.setConf("<ssh xmlns='" + getActionXMLSchema() + "'>" +
@@ -458,27 +422,14 @@ public class TestSshActionExecutor extends XFsTestCase {
     }
 
     public void testSpaceInArg() throws Exception {
-        String baseDir = getTestCaseDir();
-        Path appPath = new Path(getNameNodeUri(), baseDir);
+        WorkflowJobBean workflow = createBaseWorkflowJobBean();
 
-        Path script = new Path(baseDir, "script.sh");
+        Path script = new Path(getTestCaseDir(), "script.sh");
         FileSystem fs = FileSystem.getLocal(createJobConf());
         Writer w = new OutputStreamWriter(fs.create(script));
         w.write(ECHO_ARGS_SCRIPT);
         w.close();
 
-        XConfiguration protoConf = new XConfiguration();
-        protoConf.setStrings(WorkflowAppService.HADOOP_USER, getTestUser());
-
-        XConfiguration wfConf = new XConfiguration();
-        wfConf.set(OozieClient.APP_PATH, appPath.toString());
-
-        WorkflowJobBean workflow = new WorkflowJobBean();
-        workflow.setConf(wfConf.toXmlString());
-        workflow.setAppPath(wfConf.get(OozieClient.APP_PATH));
-        workflow.setProtoActionConf(protoConf.toXmlString());
-        workflow.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.WORKFLOW));
-
         final WorkflowActionBean action = new WorkflowActionBean();
         action.setId("actionId");
         action.setConf("<ssh xmlns='" + getActionXMLSchema() + "'>" +
@@ -512,27 +463,14 @@ public class TestSshActionExecutor extends XFsTestCase {
     }
 
     public void testNoArgsNorArg() throws Exception {
-        String baseDir = getTestCaseDir();
-        Path appPath = new Path(getNameNodeUri(), baseDir);
+        WorkflowJobBean workflow = createBaseWorkflowJobBean();
 
-        Path script = new Path(baseDir, "script.sh");
+        Path script = new Path(getTestCaseDir(), "script.sh");
         FileSystem fs = FileSystem.getLocal(createJobConf());
         Writer w = new OutputStreamWriter(fs.create(script));
         w.write("echo \"prop1=something\"");
         w.close();
 
-        XConfiguration protoConf = new XConfiguration();
-        protoConf.setStrings(WorkflowAppService.HADOOP_USER, getTestUser());
-
-        XConfiguration wfConf = new XConfiguration();
-        wfConf.set(OozieClient.APP_PATH, appPath.toString());
-
-        WorkflowJobBean workflow = new WorkflowJobBean();
-        workflow.setConf(wfConf.toXmlString());
-        workflow.setAppPath(wfConf.get(OozieClient.APP_PATH));
-        workflow.setProtoActionConf(protoConf.toXmlString());
-        workflow.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.WORKFLOW));
-
         final WorkflowActionBean action = new WorkflowActionBean();
         action.setId("actionId");
         action.setConf("<ssh xmlns='" + getActionXMLSchema() + "'>" +
@@ -558,24 +496,50 @@ public class TestSshActionExecutor extends XFsTestCase {
     }
 
     /**
-     * test {@code SshActionExecutor.check()} method with invalid
-     * xml configuration
+     * test {@code SshActionExecutor.check()} method with host connection
+     * failure.
      */
-    public void testSshCheckWithInvalidXml() throws Exception {
-        String baseDir = getTestCaseDir();
-        Path appPath = new Path(getNameNodeUri(), baseDir);
+    public void testSshCheckWithConnectionError() throws Exception {
+        WorkflowJobBean workflow = createBaseWorkflowJobBean();
 
-        XConfiguration protoConf = new XConfiguration();
-        protoConf.setStrings(WorkflowAppService.HADOOP_USER, getTestUser());
+        final WorkflowActionBean action = new WorkflowActionBean();
+        action.setId("actionId");
+        action.setConf("<ssh xmlns='" + getActionXMLSchema() + "'>" +
+                "<host>localhost</host>" +
+                "<command>echo</command>" +
+                "<args>\"prop1=something\"</args>" +
+                "</ssh>");
+        action.setName("ssh");
+        final SshActionExecutor ssh = new SshActionExecutor();
+        final Context context = new Context(workflow, action);
+        ssh.start(context, action);
 
-        XConfiguration wfConf = new XConfiguration();
-        wfConf.set(OozieClient.APP_PATH, appPath.toString());
+        String originTrackerUri = action.getTrackerUri();
+        action.setTrackerUri("dummy@dummyHost");
+        try {
+            ssh.check(context, action);
+            fail("testSshCheckWithConnectionError expected an ActionExecutorException");
+        } catch (ActionExecutorException e) {
+            assertEquals(SshActionExecutor.ERR_COULD_NOT_CONNECT, e.getErrorCode());
+        }
 
-        WorkflowJobBean workflow = new WorkflowJobBean();
-        workflow.setConf(wfConf.toXmlString());
-        workflow.setAppPath(wfConf.get(OozieClient.APP_PATH));
-        workflow.setProtoActionConf(protoConf.toXmlString());
-        workflow.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.WORKFLOW));
+        action.setTrackerUri(originTrackerUri);
+        waitFor(30 * 1000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                ssh.check(context, action);
+                return Status.DONE == action.getStatus();
+            }
+        });
+        ssh.end(context, action);
+        assertEquals(Status.OK, action.getStatus());
+    }
+
+    /**
+     * test {@code SshActionExecutor.check()} method with invalid
+     * xml configuration
+     */
+    public void testSshCheckWithInvalidXml() throws Exception {
+        WorkflowJobBean workflow = createBaseWorkflowJobBean();
 
         WorkflowActionBean action = new WorkflowActionBean();
         action.setId("actionId");
@@ -599,19 +563,7 @@ public class TestSshActionExecutor extends XFsTestCase {
      * xml configuration
      */
     public void testSshStartWithInvalidXml() throws Exception {
-        String baseDir = getTestCaseDir();
-        Path appPath = new Path(getNameNodeUri(), baseDir);
-        XConfiguration protoConf = new XConfiguration();
-        protoConf.setStrings(WorkflowAppService.HADOOP_USER, getTestUser());
-
-        XConfiguration wfConf = new XConfiguration();
-        wfConf.set(OozieClient.APP_PATH, appPath.toString());
-
-        WorkflowJobBean workflow = new WorkflowJobBean();
-        workflow.setConf(wfConf.toXmlString());
-        workflow.setAppPath(wfConf.get(OozieClient.APP_PATH));
-        workflow.setProtoActionConf(protoConf.toXmlString());
-        workflow.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.WORKFLOW));
+        WorkflowJobBean workflow = createBaseWorkflowJobBean();
 
         WorkflowActionBean action = new WorkflowActionBean();
         action.setId("actionId");
@@ -632,19 +584,7 @@ public class TestSshActionExecutor extends XFsTestCase {
      * sequence call
      */
     public void testJobStartAndKill() throws Exception {
-        String baseDir = getTestCaseDir();
-        Path appPath = new Path(getNameNodeUri(), baseDir);
-        XConfiguration protoConf = new XConfiguration();
-        protoConf.setStrings(WorkflowAppService.HADOOP_USER, getTestUser());
-
-        XConfiguration wfConf = new XConfiguration();
-        wfConf.set(OozieClient.APP_PATH, appPath.toString());
-
-        WorkflowJobBean workflow = new WorkflowJobBean();
-        workflow.setConf(wfConf.toXmlString());
-        workflow.setAppPath(wfConf.get(OozieClient.APP_PATH));
-        workflow.setProtoActionConf(protoConf.toXmlString());
-        workflow.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.WORKFLOW));
+        WorkflowJobBean workflow = createBaseWorkflowJobBean();
 
         WorkflowActionBean action = new WorkflowActionBean();
         action.setId("actionId");

http://git-wip-us.apache.org/repos/asf/oozie/blob/2fd0b66b/docs/src/site/twiki/DG_SshActionExtension.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/DG_SshActionExtension.twiki b/docs/src/site/twiki/DG_SshActionExtension.twiki
index 99b96f7..5a51d49 100644
--- a/docs/src/site/twiki/DG_SshActionExtension.twiki
+++ b/docs/src/site/twiki/DG_SshActionExtension.twiki
@@ -27,6 +27,11 @@ command must follow the following requirements:
 Note: Ssh Action will fail if any output is written to standard error / output upon login (e.g. .bashrc of
 the remote user contains ls -a).
 
+Note: Ssh Action will fail if Oozie cannot connect to the remote host via ssh for the action status check
+(e.g. the host is under heavy load or the network is unreliable), even after the number of retries set by =oozie.action.ssh.check.retries.max= (3 by default).
+The first retry waits the time set by =oozie.action.ssh.check.initial.retry.wait.time= (3000 milliseconds by default) before checking again;
+each subsequent retry waits twice as long as the previous one.
+
 *Syntax:*
 
 <verbatim>

http://git-wip-us.apache.org/repos/asf/oozie/blob/2fd0b66b/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 21dfe34..d07d459 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.1.0 release (trunk - unreleased)
 
+OOZIE-3156 Retry SSH action check when cannot connect to remote host (txsing via andras.piros)
 OOZIE-3227 Eliminate duplicate dependencies when using Hadoop 3 DistributedCache (dionusos via andras.piros)
 OOZIE-2097 Get rid of non-Javadoc comments (Jan Hentschel via andras.piros)
 OOZIE-3269 Flaky tests in TestCoordMaterializeTriggerService class (pbacsko via andras.piros)