This is an automated email from the ASF dual-hosted git repository.

dionusos pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/oozie.git


The following commit(s) were added to refs/heads/master by this push:
     new 603900735 OOZIE-3715 Fix fork out more than one transitions submit , 
one transition submit fail can't execute KillXCommand (chenhd via dionusos)
603900735 is described below

commit 603900735d8682bad3d0d1a62f7ca1db9fa1d2b3
Author: Denes Bodo <[email protected]>
AuthorDate: Mon Jul 10 18:50:09 2023 +0200

    OOZIE-3715 Fix fork out more than one transitions submit , one transition 
submit fail can't execute KillXCommand (chenhd via dionusos)
---
 .../apache/oozie/command/wf/SignalXCommand.java    |   3 +
 .../org/apache/oozie/ForTestingActionExecutor.java |   3 +
 .../oozie/command/wf/TestSignalXCommand.java       | 144 +++++++++++++++++++++
 release-log.txt                                    |   1 +
 4 files changed, 151 insertions(+)

diff --git a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java 
b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
index 37f55f820..67e449c4d 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
@@ -549,6 +549,9 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
                 if (context.getJobStatus() != null && 
context.getJobStatus().equals(Job.Status.FAILED)) {
                     LOG.warn("Action has failed, failing job" + 
context.getAction().getId());
                     new ActionStartXCommand(context.getAction().getId(), 
null).failJob(context);
+                    // Fork out more than one transitions, one should be 
transitions,
+                    // one submit fail can't execute KillXCommand
+                    queue(new KillXCommand(context.getWorkflow().getId()));
                     updateList.add(new 
UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START,
                             (WorkflowActionBean) context.getAction()));
                     if (context.isShouldEndWF()) {
diff --git a/core/src/test/java/org/apache/oozie/ForTestingActionExecutor.java 
b/core/src/test/java/org/apache/oozie/ForTestingActionExecutor.java
index 2479d5566..21a313df6 100644
--- a/core/src/test/java/org/apache/oozie/ForTestingActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/ForTestingActionExecutor.java
@@ -59,6 +59,9 @@ public class ForTestingActionExecutor extends ActionExecutor {
         if ("start.error".equals(error)) {
             throw new 
ActionExecutorException(ActionExecutorException.ErrorType.ERROR, TEST_ERROR, 
"start");
         }
+        if ("start.fail".equals(error)) {
+            throw new 
ActionExecutorException(ActionExecutorException.ErrorType.FAILED, TEST_ERROR, 
"start");
+        }
         String externalStatus = eConf.getChild("external-status", 
ns).getText().trim();
         Element externalChildIds = eConf.getChild("external-childIds", ns);
 
diff --git 
a/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java 
b/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
index bff2a0317..9ffe2d543 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
@@ -28,6 +28,7 @@ import java.io.Reader;
 import java.io.Writer;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
+import java.util.List;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
@@ -39,21 +40,31 @@ import org.apache.log4j.Logger;
 import org.apache.log4j.SimpleLayout;
 import org.apache.log4j.WriterAppender;
 import org.apache.oozie.DagEngine;
+import org.apache.oozie.ForTestingActionExecutor;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobsGetForPurgeJPAExecutor;
 import org.apache.oozie.local.LocalOozie;
 import org.apache.oozie.service.CallableQueueService;
 import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.ExtendedCallableQueueService;
 import org.apache.oozie.service.Services;
+import org.apache.oozie.service.SchemaService;
+import org.apache.oozie.service.LiteWorkflowStoreService;
+import org.apache.oozie.service.ActionService;
+import org.apache.oozie.service.JPAService;
 import org.apache.oozie.test.XDataTestCase;
 import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.workflow.lite.LiteWorkflowAppParser;
 
+import javax.persistence.EntityManager;
+
 public class TestSignalXCommand extends XDataTestCase {
 
     private Services services;
@@ -410,4 +421,137 @@ public class TestSignalXCommand extends XDataTestCase {
                         .getStatus(),
                 WorkflowJob.Status.SUCCEEDED);
     }
+
+    /**
+     * Test : fork parallel submit, one transition fail, and the job is failed 
but the other transition
+     *        always RUNNING or PREP.
+     *        verify the PreconditionException is thrown when action2 = 
RUNNING or PREP and job = FAIL
+     *
+     */
+    public void testForkParallelSubmitFail() throws Exception {
+        _testForkSubmitRunFail(true);
+    }
+
+    /**
+     * Test : fork serial submit, one transition fail, and the job is failed 
but the other transition
+     *        always RUNNING or PREP.
+     *        verify the PreconditionException is thrown when action2 = 
RUNNING or PREP and job = FAIL
+     *
+     */
+    public void testForkSerialSubmitFail() throws Exception {
+        _testForkSubmitRunFail(false);
+    }
+
+    /**
+     * Test : fork parallel submit, one transition fail, and the job is failed 
but the other transition
+     *        always RUNNING or PREP.
+     *        verify the PreconditionException is thrown when action2 = 
RUNNING or PREP and job = FAIL
+     *
+     * @param isForkParallelSubmit ("ture" or "fail")
+     */
+    private void _testForkSubmitRunFail(boolean isForkParallelSubmit) throws 
Exception {
+        services.destroy();
+        setSystemProperty(SchemaService.WF_CONF_EXT_SCHEMAS, 
"wf-ext-schema.xsd");
+        
setSystemProperty(LiteWorkflowStoreService.CONF_USER_RETRY_ERROR_CODE_EXT, 
ForTestingActionExecutor.TEST_ERROR);
+        services = new Services();
+        services.init();
+        
services.get(ActionService.class).registerAndInitExecutor(ForTestingActionExecutor.class);
+        
ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, 
isForkParallelSubmit);
+
+        String workflowUri = getTestCaseFileUri("workflow.xml");
+        //@formatter:off
+        String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:1.0\" 
name=\"wf-fork-submit\">\n" +
+                "    <start to=\"fork1\"/>\n" +
+                "     <fork name=\"fork1\">\n" +
+                "        <path start=\"action_to_be_failed\"/>\n" +
+                "        <path start=\"action_to_be_succeeded_or_killed\"/>\n" 
+
+                "    </fork>\n" +
+                "    <action name=\"action_to_be_failed\">\n" +
+                "        <test xmlns=\"uri:test\">\n" +
+                "            
<signal-value>${wf:conf('signal-value')}</signal-value>\n" +
+                "            
<external-status>${wf:conf('external-status')}</external-status>\n" +
+                "            <error>${wf:conf('error')}</error>\n" +
+                "            
<avoid-set-execution-data>${wf:conf('avoid-set-execution-data')}</avoid-set-execution-data>\n"
 +
+                "            
<avoid-set-end-data>${wf:conf('avoid-set-end-data')}</avoid-set-end-data>\n" +
+                "            
<running-mode>${wf:conf('running-mode')}</running-mode>\n" +
+                "        </test>\n" +
+                "        <ok to=\"join1\"/>\n" +
+                "        <error to=\"kill\"/>\n" +
+                "    </action>\n" +
+                "    <action name=\"action_to_be_succeeded_or_killed\">\n" +
+                "        <test xmlns=\"uri:test\">\n" +
+                "            
<signal-value>based_on_action_status</signal-value>\n" +
+                "            <external-status>ok</external-status>\n" +
+                "            <error>ok</error>\n" +
+                "            
<avoid-set-execution-data>true</avoid-set-execution-data>\n" +
+                "            <avoid-set-end-data>false</avoid-set-end-data>\n" 
+
+                "            <running-mode>async</running-mode>\n" +
+                "        </test>\n" +
+                "        <ok to=\"join1\"/>\n" +
+                "        <error to=\"kill\"/>\n" +
+                "    </action>\n" +
+                "    <join name=\"join1\" to=\"end\"/>\n" +
+                "    <kill name=\"kill\">\n" +
+                "        <message>killed</message>\n" +
+                "    </kill>\n" +
+                "    <end name=\"end\"/>\n" +
+                "</workflow-app>";
+        //@Formatter:on
+        writeToFile(appXml, workflowUri);
+
+        final DagEngine engine = new DagEngine("u");
+
+        Configuration conf = new Configuration();
+        conf.set(OozieClient.APP_PATH, workflowUri);
+        conf.set(OozieClient.USER_NAME, getTestUser());
+        conf.set(OozieClient.LOG_TOKEN, "t");
+        conf.set("error", "start.fail");
+        conf.set("external-status", "error");
+        conf.set("signal-value", "based_on_action_status");
+
+        final String jobId = engine.submitJob(conf, true);
+        final WorkflowActionsGetForJobJPAExecutor actionsGetExecutor = new 
WorkflowActionsGetForJobJPAExecutor(jobId);
+        final JPAService jpaService = Services.get().get(JPAService.class);
+
+        waitFor(30 * 1000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                return 
WorkflowJob.Status.FAILED.equals(engine.getJob(jobId).getStatus());
+            }
+        });
+
+        // wait for execute KillXCommand, and all actions has finished
+        waitFor(30 * 1000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                List<WorkflowActionBean> actions = 
jpaService.execute(actionsGetExecutor);
+                for (WorkflowActionBean action : actions) {
+                    if (WorkflowAction.Status.PREP.equals(action.getStatus()) 
||
+                            
WorkflowAction.Status.RUNNING.equals(action.getStatus()) ){
+                        return false;
+                    }
+                }
+                return true;
+            }
+        });
+
+        List<WorkflowActionBean> actions = 
jpaService.execute(actionsGetExecutor);
+        assertEquals("action size [" + actions.size() + "] had incorrect", 4, 
actions.size());
+
+        for (WorkflowActionBean action : actions) {
+            if ("action_to_be_failed".equals(action.getName())){
+                assertEquals("action [" + action.getName() + "] had incorrect 
status",
+                        WorkflowAction.Status.FAILED, action.getStatus());
+            }
+
+            if ("action_to_be_succeeded_or_killed".equals(action.getName())){
+                // 1.The "action_to_be_failed" action submit fail, and the 
"action_to_be_succeeded_or_killed" action
+                //   has finished, so the "action_to_be_succeeded_or_killed" 
action should be OK
+                // 2.The "action_to_be_failed" action submit fail, and the 
"action_to_be_succeeded_or_killed" action is
+                //   PREP or RUNNING, so the 
"action_to_be_succeeded_or_killed" action should be KILLED
+                if (!WorkflowAction.Status.KILLED.equals(action.getStatus()) &&
+                        !WorkflowAction.Status.OK.equals(action.getStatus())) {
+                    fail("Unexpected action [" + action.getName() + "] with 
status [" + action.getStatus() + "]");
+                }
+            }
+        }
+    }
 }
diff --git a/release-log.txt b/release-log.txt
index 776aaa8df..5e3b886dd 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.3.0 release (trunk - unreleased)
 
+OOZIE-3715 Fix fork out more than one transitions submit , one transition 
submit fail can't execute KillXCommand (chenhd via dionusos)
 OOZIE-3716 Invocation of Main class completed Message is skipped when 
LauncherSecurityManager calls system exit (khr9603 via dionusos)
 OOZIE-3695 [sharelib-hive2] Fix current SpotBugs discovered issues in Oozie's 
sharelib-hive2 module (jmakai via dionusos)
 OOZIE-3694 [sharelib-hive] Fix current SpotBugs discovered issues in Oozie's 
sharelib-hive module (jmakai via dionusos)

Reply via email to