This is an automated email from the ASF dual-hosted git repository.
dionusos pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/oozie.git
The following commit(s) were added to refs/heads/master by this push:
new 603900735 OOZIE-3715 Fix fork out more than one transitions submit ,
one transition submit fail can't execute KillXCommand (chenhd via dionusos)
603900735 is described below
commit 603900735d8682bad3d0d1a62f7ca1db9fa1d2b3
Author: Denes Bodo <[email protected]>
AuthorDate: Mon Jul 10 18:50:09 2023 +0200
OOZIE-3715 Fix fork out more than one transitions submit , one transition
submit fail can't execute KillXCommand (chenhd via dionusos)
---
.../apache/oozie/command/wf/SignalXCommand.java | 3 +
.../org/apache/oozie/ForTestingActionExecutor.java | 3 +
.../oozie/command/wf/TestSignalXCommand.java | 144 +++++++++++++++++++++
release-log.txt | 1 +
4 files changed, 151 insertions(+)
diff --git a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
index 37f55f820..67e449c4d 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
@@ -549,6 +549,9 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
if (context.getJobStatus() != null &&
context.getJobStatus().equals(Job.Status.FAILED)) {
LOG.warn("Action has failed, failing job" +
context.getAction().getId());
new ActionStartXCommand(context.getAction().getId(),
null).failJob(context);
+ // Fork out more than one transitions, one should be
transitions,
+ // one submit fail can't execute KillXCommand
+ queue(new KillXCommand(context.getWorkflow().getId()));
updateList.add(new
UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START,
(WorkflowActionBean) context.getAction()));
if (context.isShouldEndWF()) {
diff --git a/core/src/test/java/org/apache/oozie/ForTestingActionExecutor.java
b/core/src/test/java/org/apache/oozie/ForTestingActionExecutor.java
index 2479d5566..21a313df6 100644
--- a/core/src/test/java/org/apache/oozie/ForTestingActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/ForTestingActionExecutor.java
@@ -59,6 +59,9 @@ public class ForTestingActionExecutor extends ActionExecutor {
if ("start.error".equals(error)) {
throw new
ActionExecutorException(ActionExecutorException.ErrorType.ERROR, TEST_ERROR,
"start");
}
+ if ("start.fail".equals(error)) {
+ throw new
ActionExecutorException(ActionExecutorException.ErrorType.FAILED, TEST_ERROR,
"start");
+ }
String externalStatus = eConf.getChild("external-status",
ns).getText().trim();
Element externalChildIds = eConf.getChild("external-childIds", ns);
diff --git
a/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
b/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
index bff2a0317..9ffe2d543 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
@@ -28,6 +28,7 @@ import java.io.Reader;
import java.io.Writer;
import java.net.URI;
import java.nio.charset.StandardCharsets;
+import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
@@ -39,21 +40,31 @@ import org.apache.log4j.Logger;
import org.apache.log4j.SimpleLayout;
import org.apache.log4j.WriterAppender;
import org.apache.oozie.DagEngine;
+import org.apache.oozie.ForTestingActionExecutor;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobsGetForPurgeJPAExecutor;
import org.apache.oozie.local.LocalOozie;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.ExtendedCallableQueueService;
import org.apache.oozie.service.Services;
+import org.apache.oozie.service.SchemaService;
+import org.apache.oozie.service.LiteWorkflowStoreService;
+import org.apache.oozie.service.ActionService;
+import org.apache.oozie.service.JPAService;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.workflow.lite.LiteWorkflowAppParser;
+import javax.persistence.EntityManager;
+
public class TestSignalXCommand extends XDataTestCase {
private Services services;
@@ -410,4 +421,137 @@ public class TestSignalXCommand extends XDataTestCase {
.getStatus(),
WorkflowJob.Status.SUCCEEDED);
}
+
+ /**
+ * Test : fork parallel submit, one transition fail, and the job is failed
but the other transition
+ * always RUNNING or PREP.
+ * verify the PreconditionException is thrown when action2 =
RUNNING or PREP and job = FAIL
+ *
+ */
+ public void testForkParallelSubmitFail() throws Exception {
+ _testForkSubmitRunFail(true);
+ }
+
+ /**
+ * Test : fork serial submit, one transition fail, and the job is failed
but the other transition
+ * always RUNNING or PREP.
+ * verify the PreconditionException is thrown when action2 =
RUNNING or PREP and job = FAIL
+ *
+ */
+ public void testForkSerialSubmitFail() throws Exception {
+ _testForkSubmitRunFail(false);
+ }
+
+ /**
+ * Test : fork parallel submit, one transition fail, and the job is failed
but the other transition
+ * always RUNNING or PREP.
+ * verify the PreconditionException is thrown when action2 =
RUNNING or PREP and job = FAIL
+ *
+ * @param isForkParallelSubmit ("ture" or "fail")
+ */
+ private void _testForkSubmitRunFail(boolean isForkParallelSubmit) throws
Exception {
+ services.destroy();
+ setSystemProperty(SchemaService.WF_CONF_EXT_SCHEMAS,
"wf-ext-schema.xsd");
+
setSystemProperty(LiteWorkflowStoreService.CONF_USER_RETRY_ERROR_CODE_EXT,
ForTestingActionExecutor.TEST_ERROR);
+ services = new Services();
+ services.init();
+
services.get(ActionService.class).registerAndInitExecutor(ForTestingActionExecutor.class);
+
ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION,
isForkParallelSubmit);
+
+ String workflowUri = getTestCaseFileUri("workflow.xml");
+ //@formatter:off
+ String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:1.0\"
name=\"wf-fork-submit\">\n" +
+ " <start to=\"fork1\"/>\n" +
+ " <fork name=\"fork1\">\n" +
+ " <path start=\"action_to_be_failed\"/>\n" +
+ " <path start=\"action_to_be_succeeded_or_killed\"/>\n"
+
+ " </fork>\n" +
+ " <action name=\"action_to_be_failed\">\n" +
+ " <test xmlns=\"uri:test\">\n" +
+ "
<signal-value>${wf:conf('signal-value')}</signal-value>\n" +
+ "
<external-status>${wf:conf('external-status')}</external-status>\n" +
+ " <error>${wf:conf('error')}</error>\n" +
+ "
<avoid-set-execution-data>${wf:conf('avoid-set-execution-data')}</avoid-set-execution-data>\n"
+
+ "
<avoid-set-end-data>${wf:conf('avoid-set-end-data')}</avoid-set-end-data>\n" +
+ "
<running-mode>${wf:conf('running-mode')}</running-mode>\n" +
+ " </test>\n" +
+ " <ok to=\"join1\"/>\n" +
+ " <error to=\"kill\"/>\n" +
+ " </action>\n" +
+ " <action name=\"action_to_be_succeeded_or_killed\">\n" +
+ " <test xmlns=\"uri:test\">\n" +
+ "
<signal-value>based_on_action_status</signal-value>\n" +
+ " <external-status>ok</external-status>\n" +
+ " <error>ok</error>\n" +
+ "
<avoid-set-execution-data>true</avoid-set-execution-data>\n" +
+ " <avoid-set-end-data>false</avoid-set-end-data>\n"
+
+ " <running-mode>async</running-mode>\n" +
+ " </test>\n" +
+ " <ok to=\"join1\"/>\n" +
+ " <error to=\"kill\"/>\n" +
+ " </action>\n" +
+ " <join name=\"join1\" to=\"end\"/>\n" +
+ " <kill name=\"kill\">\n" +
+ " <message>killed</message>\n" +
+ " </kill>\n" +
+ " <end name=\"end\"/>\n" +
+ "</workflow-app>";
+ //@Formatter:on
+ writeToFile(appXml, workflowUri);
+
+ final DagEngine engine = new DagEngine("u");
+
+ Configuration conf = new Configuration();
+ conf.set(OozieClient.APP_PATH, workflowUri);
+ conf.set(OozieClient.USER_NAME, getTestUser());
+ conf.set(OozieClient.LOG_TOKEN, "t");
+ conf.set("error", "start.fail");
+ conf.set("external-status", "error");
+ conf.set("signal-value", "based_on_action_status");
+
+ final String jobId = engine.submitJob(conf, true);
+ final WorkflowActionsGetForJobJPAExecutor actionsGetExecutor = new
WorkflowActionsGetForJobJPAExecutor(jobId);
+ final JPAService jpaService = Services.get().get(JPAService.class);
+
+ waitFor(30 * 1000, new Predicate() {
+ public boolean evaluate() throws Exception {
+ return
WorkflowJob.Status.FAILED.equals(engine.getJob(jobId).getStatus());
+ }
+ });
+
+ // wait for execute KillXCommand, and all actions has finished
+ waitFor(30 * 1000, new Predicate() {
+ public boolean evaluate() throws Exception {
+ List<WorkflowActionBean> actions =
jpaService.execute(actionsGetExecutor);
+ for (WorkflowActionBean action : actions) {
+ if (WorkflowAction.Status.PREP.equals(action.getStatus())
||
+
WorkflowAction.Status.RUNNING.equals(action.getStatus()) ){
+ return false;
+ }
+ }
+ return true;
+ }
+ });
+
+ List<WorkflowActionBean> actions =
jpaService.execute(actionsGetExecutor);
+ assertEquals("action size [" + actions.size() + "] had incorrect", 4,
actions.size());
+
+ for (WorkflowActionBean action : actions) {
+ if ("action_to_be_failed".equals(action.getName())){
+ assertEquals("action [" + action.getName() + "] had incorrect
status",
+ WorkflowAction.Status.FAILED, action.getStatus());
+ }
+
+ if ("action_to_be_succeeded_or_killed".equals(action.getName())){
+ // 1.The "action_to_be_failed" action submit fail, and the
"action_to_be_succeeded_or_killed" action
+ // has finished, so the "action_to_be_succeeded_or_killed"
action should be OK
+ // 2.The "action_to_be_failed" action submit fail, and the
"action_to_be_succeeded_or_killed" action is
+ // PREP or RUNNING, so the
"action_to_be_succeeded_or_killed" action should be KILLED
+ if (!WorkflowAction.Status.KILLED.equals(action.getStatus()) &&
+ !WorkflowAction.Status.OK.equals(action.getStatus())) {
+ fail("Unexpected action [" + action.getName() + "] with
status [" + action.getStatus() + "]");
+ }
+ }
+ }
+ }
}
diff --git a/release-log.txt b/release-log.txt
index 776aaa8df..5e3b886dd 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 5.3.0 release (trunk - unreleased)
+OOZIE-3715 Fix fork out more than one transitions submit , one transition
submit fail can't execute KillXCommand (chenhd via dionusos)
OOZIE-3716 Invocation of Main class completed Message is skipped when
LauncherSecurityManager calls system exit (khr9603 via dionusos)
OOZIE-3695 [sharelib-hive2] Fix current SpotBugs discovered issues in Oozie's
sharelib-hive2 module (jmakai via dionusos)
OOZIE-3694 [sharelib-hive] Fix current SpotBugs discovered issues in Oozie's
sharelib-hive module (jmakai via dionusos)