Repository: oozie Updated Branches: refs/heads/master 48c1128d3 -> 0c90b8c36
OOZIE-2126 SSH action can be too fast for Oozie sometimes (rkanter) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/0c90b8c3 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/0c90b8c3 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/0c90b8c3 Branch: refs/heads/master Commit: 0c90b8c3629ffbf98fdafa5081979a7d42aaf056 Parents: 48c1128 Author: Robert Kanter <[email protected]> Authored: Mon Mar 9 14:44:12 2015 -0700 Committer: Robert Kanter <[email protected]> Committed: Mon Mar 9 14:44:12 2015 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/oozie/ErrorCode.java | 2 + .../command/wf/CompletedActionXCommand.java | 38 +++- .../apache/oozie/service/CallbackService.java | 7 + core/src/main/resources/oozie-default.xml | 9 + .../command/wf/TestCompletedActionXCommand.java | 200 +++++++++++++++++++ .../oozie/service/TestConfigurationService.java | 1 + release-log.txt | 1 + 7 files changed, 251 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/0c90b8c3/core/src/main/java/org/apache/oozie/ErrorCode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/ErrorCode.java b/core/src/main/java/org/apache/oozie/ErrorCode.java index 7630c2f..2fd2e99 100644 --- a/core/src/main/java/org/apache/oozie/ErrorCode.java +++ b/core/src/main/java/org/apache/oozie/ErrorCode.java @@ -174,6 +174,8 @@ public enum ErrorCode { E0818(XLog.STD, "Action [{0}] status is running but WF Job [{1}] status is [{2}]. Expected status is RUNNING or SUSPENDED."), E0819(XLog.STD, "Unable to delete the temp dir of job WF Job [{0}]."), E0820(XLog.STD, "Action user retry max [{0}] is over system defined max [{1}], re-assign to use system max."), + E0821(XLog.STD, "Received early callback for action still in PREP state; will wait [{0}]ms and requeue up to [{1}] more times"), + E0822(XLog.STD, "Received early callback for action [{0}] while still in PREP state and exhausted all requeues"), E0900(XLog.OPS, "JobTracker [{0}] not allowed, not in Oozie's whitelist. Allowed values are: {1}"), E0901(XLog.OPS, "NameNode [{0}] not allowed, not in Oozie's whitelist. Allowed values are: {1}"), http://git-wip-us.apache.org/repos/asf/oozie/blob/0c90b8c3/core/src/main/java/org/apache/oozie/command/wf/CompletedActionXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/CompletedActionXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/CompletedActionXCommand.java index b1226cc..bc39bce 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/CompletedActionXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/CompletedActionXCommand.java @@ -28,6 +28,7 @@ import org.apache.oozie.command.PreconditionException; import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery; import org.apache.oozie.service.ActionService; +import org.apache.oozie.service.CallbackService; import org.apache.oozie.service.Services; import org.apache.oozie.util.LogUtils; import org.apache.oozie.util.ParamChecker; @@ -39,11 +40,18 @@ public class CompletedActionXCommand extends WorkflowXCommand<Void> { private final String actionId; private final String externalStatus; private WorkflowActionBean wfactionBean; + private int earlyRequeueCount; - public CompletedActionXCommand(String actionId, String externalStatus, Properties actionData, int priority) { + public CompletedActionXCommand(String actionId, String externalStatus, Properties actionData, int priority, + int earlyRequeueCount) { super("callback", "callback", priority); this.actionId = ParamChecker.notEmpty(actionId, "actionId"); this.externalStatus = ParamChecker.notEmpty(externalStatus, "externalStatus"); + this.earlyRequeueCount = earlyRequeueCount; + } + + public CompletedActionXCommand(String actionId, String externalStatus, Properties actionData, int priority) { + this(actionId, externalStatus, actionData, 1, 0); } public CompletedActionXCommand(String actionId, String externalStatus, Properties actionData) { @@ -79,7 +87,8 @@ public class CompletedActionXCommand extends WorkflowXCommand<Void> { */ @Override protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { - if (this.wfactionBean.getStatus() != WorkflowActionBean.Status.RUNNING) { + if (this.wfactionBean.getStatus() != WorkflowActionBean.Status.RUNNING + && this.wfactionBean.getStatus() != WorkflowActionBean.Status.PREP) { throw new CommandException(ErrorCode.E0800, actionId, this.wfactionBean.getStatus()); } } @@ -91,11 +100,26 @@ public class CompletedActionXCommand extends WorkflowXCommand<Void> { */ @Override protected Void execute() throws CommandException { - ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(this.wfactionBean.getType()); - // this is done because oozie notifications (of sub-wfs) is send - // every status change, not only on completion. - if (executor.isCompleted(externalStatus)) { - queue(new ActionCheckXCommand(this.wfactionBean.getId(), getPriority(), -1)); + // If the action is still in PREP, we probably received a callback before Oozie was able to update from PREP to RUNNING; + // we'll requeue this command a few times and hope that it switches to RUNNING before giving up + if (this.wfactionBean.getStatus() == WorkflowActionBean.Status.PREP) { + int maxEarlyRequeueCount = Services.get().get(CallbackService.class).getEarlyRequeueMaxRetries(); + if (this.earlyRequeueCount < maxEarlyRequeueCount) { + long delay = getRequeueDelay(); + LOG.warn("Received early callback for action still in PREP state; will wait [{0}]ms and requeue up to [{1}] more" + + " times", delay, (maxEarlyRequeueCount - earlyRequeueCount)); + queue(new CompletedActionXCommand(this.actionId, this.externalStatus, null, this.getPriority(), + this.earlyRequeueCount + 1), delay); + } else { + throw new CommandException(ErrorCode.E0822, actionId); + } + } else { // RUNNING + ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(this.wfactionBean.getType()); + // this is done because oozie notifications (of sub-wfs) is send + // every status change, not only on completion. + if (executor.isCompleted(externalStatus)) { + queue(new ActionCheckXCommand(this.wfactionBean.getId(), getPriority(), -1)); + } } return null; } http://git-wip-us.apache.org/repos/asf/oozie/blob/0c90b8c3/core/src/main/java/org/apache/oozie/service/CallbackService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/CallbackService.java b/core/src/main/java/org/apache/oozie/service/CallbackService.java index 7fa07f1..405701d 100644 --- a/core/src/main/java/org/apache/oozie/service/CallbackService.java +++ b/core/src/main/java/org/apache/oozie/service/CallbackService.java @@ -36,7 +36,10 @@ public class CallbackService implements Service { public static final String CONF_BASE_URL = CONF_PREFIX + "base.url"; + public static final String CONF_EARLY_REQUEUE_MAX_RETRIES = CONF_PREFIX + "early.requeue.max.retries"; + private Configuration oozieConf; + private int earlyRequeueMaxRetries; /** * Initialize the service. @@ -45,6 +48,7 @@ public class CallbackService implements Service { */ public void init(Services services) { oozieConf = services.getConf(); + earlyRequeueMaxRetries = ConfigurationService.getInt(CONF_EARLY_REQUEUE_MAX_RETRIES); } /** @@ -132,4 +136,7 @@ public class CallbackService implements Service { } } + public int getEarlyRequeueMaxRetries() { + return earlyRequeueMaxRetries; + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/0c90b8c3/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index 9843330..cb65502 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -1518,6 +1518,15 @@ </description> </property> + <property> + <name>oozie.service.CallbackService.early.requeue.max.retries</name> + <value>5</value> + <description> + If Oozie receives a callback too early (while the action is in PREP state), it will requeue the command this many times + to give the action time to transition to RUNNING. + </description> + </property> + <!-- CallbackServlet --> <property> http://git-wip-us.apache.org/repos/asf/oozie/blob/0c90b8c3/core/src/test/java/org/apache/oozie/command/wf/TestCompletedActionXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestCompletedActionXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestCompletedActionXCommand.java new file mode 100644 index 0000000..a4f0e83 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/command/wf/TestCompletedActionXCommand.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.command.wf; + +import org.apache.oozie.WorkflowActionBean; +import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.client.WorkflowAction; +import org.apache.oozie.client.WorkflowJob; +import org.apache.oozie.command.XCommand; +import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; +import org.apache.oozie.service.InstrumentationService; +import org.apache.oozie.service.Services; +import org.apache.oozie.test.XDataTestCase; +import org.apache.oozie.util.Instrumentation; +import org.apache.oozie.workflow.WorkflowInstance; + +public class TestCompletedActionXCommand extends XDataTestCase { + private Services services; + + @Override + protected void setUp() throws Exception { + super.setUp(); + services = new Services(); + services.init(); + } + + @Override + protected void tearDown() throws Exception { + services.destroy(); + super.tearDown(); + } + + public void testEarlyCallbackTimeout() throws Exception { + final Instrumentation inst = Services.get().get(InstrumentationService.class).get(); + + WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); + WorkflowActionBean action = addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.PREP); + final CompletedActionXCommand cmd = new CompletedActionXCommand(action.getId(), "SUCCEEDED", null); + + long xexceptionCount; + try { + xexceptionCount = inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP).get(cmd.getName() + + ".xexceptions").getValue(); + } catch (NullPointerException npe){ + //counter might be null + xexceptionCount = 0L; + } + assertEquals(0L, xexceptionCount); + + long executionsCount; + try { + executionsCount = inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP).get(cmd.getName() + + ".executions").getValue(); + } catch (NullPointerException npe){ + //counter might be null + executionsCount = 0L; + } + assertEquals(0L, executionsCount); + + long executionCount; + try { + executionCount = inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP).get(cmd.getName() + + ".execution").getValue(); + } catch (NullPointerException npe){ + //counter might be null + executionCount = 0L; + } + assertEquals(0L, executionCount); + + cmd.call(); + int timeout = 10000 * 5 * 2; + waitFor(timeout, new Predicate() { + @Override + public boolean evaluate() throws Exception { + long xexceptionCount; + try { + xexceptionCount = inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP).get(cmd.getName() + + ".xexceptions").getValue(); + } catch(NullPointerException npe) { + //counter might be null + xexceptionCount = 0L; + } + return (xexceptionCount == 1L); + } + }); + executionsCount = inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP).get(cmd.getName() + + ".executions").getValue(); + assertEquals(6L, executionsCount); + try { + executionCount = inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP).get(cmd.getName() + + ".execution").getValue(); + } catch (NullPointerException npe){ + //counter might be null + executionCount = 0L; + } + assertEquals(0L, executionCount); + xexceptionCount = inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP).get(cmd.getName() + + ".xexceptions").getValue(); + assertEquals(1L, xexceptionCount); + } + + public void testEarlyCallbackTransitionToRunning() throws Exception { + final Instrumentation inst = Services.get().get(InstrumentationService.class).get(); + + WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); + final WorkflowActionBean action = addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.PREP); + final CompletedActionXCommand cmd = new CompletedActionXCommand(action.getId(), "SUCCEEDED", null); + + long xexceptionCount; + try { + xexceptionCount = inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP).get(cmd.getName() + + ".xexceptions").getValue(); + } catch (NullPointerException npe){ + //counter might be null + xexceptionCount = 0L; + } + assertEquals(0L, xexceptionCount); + + long executionsCount; + try { + executionsCount = inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP).get(cmd.getName() + + ".executions").getValue(); + } catch (NullPointerException npe){ + //counter might be null + executionsCount = 0L; + } + assertEquals(0L, executionsCount); + + long checkXCommandExecutionsCount; + try { + checkXCommandExecutionsCount = inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP).get( + "action.check.executions").getValue(); + } catch (NullPointerException npe){ + //counter might be null + checkXCommandExecutionsCount = 0L; + } + assertEquals(0L, checkXCommandExecutionsCount); + + cmd.call(); + int timeout = 100000 * 5 * 2; + waitFor(timeout, new Predicate() { + @Override + public boolean evaluate() throws Exception { + long executionsCount; + try { + executionsCount = inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP).get(cmd.getName() + + ".executions").getValue(); + } catch (NullPointerException npe){ + //counter might be null + executionsCount = 0L; + } + if (executionsCount == 3 && !action.getStatus().equals(WorkflowAction.Status.RUNNING)) { + // Transition the action to RUNNING + action.setStatus(WorkflowAction.Status.RUNNING); + WorkflowActionQueryExecutor.getInstance().executeUpdate( + WorkflowActionQueryExecutor.WorkflowActionQuery.UPDATE_ACTION, action); + } + long checkXCommandExecutionsCount; + try { + checkXCommandExecutionsCount = inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP).get( + "action.check.executions").getValue(); + } catch (NullPointerException npe){ + //counter might be null + checkXCommandExecutionsCount = 0L; + } + return (checkXCommandExecutionsCount == 1L); + } + }); + executionsCount = inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP).get(cmd.getName() + + ".executions").getValue(); + assertTrue("expected a value greater than 3L, but found " + executionsCount, executionsCount >= 3L); + checkXCommandExecutionsCount = inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP).get( + "action.check.executions").getValue(); + assertEquals(1L, checkXCommandExecutionsCount); + try { + xexceptionCount = inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP).get(cmd.getName() + + ".xexceptions").getValue(); + } catch (NullPointerException npe){ + //counter might be null + xexceptionCount = 0L; + } + assertEquals(0L, xexceptionCount); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/0c90b8c3/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java b/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java index b1dde2c..ddb3d58 100644 --- a/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java +++ b/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java @@ -174,6 +174,7 @@ public class TestConfigurationService extends XTestCase { assertEquals(ConfigUtils.STRING_DEFAULT, ConfigurationService.get(testConf, "test.nonexist")); assertEquals("http://localhost:8080/oozie/callback", ConfigurationService.get(CallbackService.CONF_BASE_URL)); + assertEquals(5, ConfigurationService.getInt(CallbackService.CONF_EARLY_REQUEUE_MAX_RETRIES)); assertEquals("gz", ConfigurationService.get(CodecFactory.COMPRESSION_OUTPUT_CODEC)); assertEquals(4096, ConfigurationService.getInt(XLogStreamingService.STREAM_BUFFER_LEN)); assertEquals(10000, ConfigurationService.getLong(JvmPauseMonitorService.WARN_THRESHOLD_KEY)); http://git-wip-us.apache.org/repos/asf/oozie/blob/0c90b8c3/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 1e6e101..6ab5737 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.2.0 release (trunk - unreleased) +OOZIE-2126 SSH action can be too fast for Oozie sometimes (rkanter) OOZIE-2142 Changing the JT whitelist causes running Workflows to stay RUNNING forever (rkanter) OOZIE-2164 make master parameterizable in Spark action example (wypoon via rkanter) OOZIE-2155 Incorrect DST Shifts are occurring based on the Database timezone (rkanter)
