This is an automated email from the ASF dual-hosted git repository.

dionusos pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/oozie.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d3530c  OOZIE-3646 Possible dead-lock in SignalXCommand (zuston via 
dionusos)
1d3530c is described below

commit 1d3530cb26642d460dabd373dcb99c8cf29e54a3
Author: Denes Bodo <[email protected]>
AuthorDate: Thu Feb 10 16:01:55 2022 +0100

    OOZIE-3646 Possible dead-lock in SignalXCommand (zuston via dionusos)
---
 .../apache/oozie/command/wf/SignalXCommand.java    | 55 +++++++++++--
 .../apache/oozie/service/CallableQueueService.java | 40 +++++++++
 .../oozie/command/wf/TestSignalXCommand.java       | 96 ++++++++++++++++++++++
 release-log.txt                                    |  1 +
 4 files changed, 183 insertions(+), 9 deletions(-)

diff --git a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java 
b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
index 22c791b..4db55cc 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
@@ -137,7 +137,8 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
                 this.wfJob = 
WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, 
jobId);
                 LogUtils.setLogInfo(wfJob);
                 if (actionId != null) {
-                    this.wfAction = 
WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_SIGNAL,
 actionId);
+                    this.wfAction = WorkflowActionQueryExecutor.getInstance()
+                            .get(WorkflowActionQuery.GET_ACTION_SIGNAL, 
actionId);
                     LogUtils.setLogInfo(wfAction);
                 }
             }
@@ -468,21 +469,57 @@ public class SignalXCommand extends 
WorkflowXCommand<Void> {
 
     public void startForkedActions(List<WorkflowActionBean> 
workflowActionBeanListForForked) throws CommandException {
 
-        List<CallableWrapper<ActionExecutorContext>> tasks = new 
ArrayList<CallableWrapper<ActionExecutorContext>>();
         List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
         List<JsonBean> insertList = new ArrayList<JsonBean>();
 
         boolean endWorkflow = false;
         boolean submitJobByQueuing = false;
-        for (WorkflowActionBean workflowActionBean : 
workflowActionBeanListForForked) {
-            LOG.debug("Starting forked actions parallely : " + 
workflowActionBean.getId());
-            tasks.add(Services.get().get(CallableQueueService.class).new 
CallableWrapper<ActionExecutorContext>(
-                    new ForkedActionStartXCommand(wfJob, 
workflowActionBean.getId(), workflowActionBean.getType()), 0));
-        }
 
         try {
-            List<Future<ActionExecutorContext>> futures = 
Services.get().get(CallableQueueService.class)
-                    .invokeAll(tasks);
+            /*
+             * Limiting thread usage here avoids the dead-lock that occurs 
when every active thread is
+             * executing SignalXCommand's invokeAll call.
+             *
+             * Solution
+             * 1. Do not start and wait for the forked tasks directly when the 
number of idle threads is less than the number of tasks.
+             * 2. Hold the SignalXCommand.class lock so that the active thread 
count read from the callable queue is correct.
+             *
+             */
+            CallableQueueService callableQueueService = 
Services.get().get(CallableQueueService.class);
+            List<Future<ActionExecutorContext>> futures = new ArrayList<>();
+
+            synchronized (SignalXCommand.class) {
+                long limitedRestThreadNum =
+                        callableQueueService.getQueueThreadsNumber() - 
callableQueueService.getThreadActiveCount();
+                if (limitedRestThreadNum < 
workflowActionBeanListForForked.size()) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Limited callable queue rest threads number: 
"
+                                + limitedRestThreadNum + ", needed forked task 
size: "
+                                + workflowActionBeanListForForked.size()
+                                + ", tasks will be submitted to queue by async 
mode.");
+                    }
+                    submitJobByQueuing = true;
+                } else {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Starting forked actions parallely: " + 
workflowActionBeanListForForked);
+                    }
+                    for (WorkflowActionBean workflowActionBean : 
workflowActionBeanListForForked) {
+                        futures.add(
+                                
callableQueueService.submit(callableQueueService.new 
CallableWrapper<ActionExecutorContext>(
+                                        new ForkedActionStartXCommand(wfJob,
+                                                workflowActionBean.getId(), 
workflowActionBean.getType()), 0))
+                        );
+                    }
+
+                    long startTime = System.currentTimeMillis();
+                    callableQueueService.blockingWait(futures);
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Execution time of forked actions parallely: 
"
+                                + (System.currentTimeMillis() - startTime) / 
1000 + " sec");
+                    }
+                }
+            }
+
             for (Future<ActionExecutorContext> result : futures) {
                 if (result == null) {
                     submitJobByQueuing = true;
diff --git 
a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java 
b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
index d1222bc..f158a15 100644
--- a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
+++ b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
@@ -30,7 +30,9 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -114,6 +116,8 @@ public class CallableQueueService implements Service, 
Instrumentable {
 
     private int queueAwaitTerminationTimeoutSeconds;
 
+    private long queueThreadsNumber;
+
     private boolean callableBegin(XCallable<?> callable) {
         synchronized (activeCallables) {
             AtomicInteger counter = activeCallables.get(callable.getType());
@@ -584,6 +588,8 @@ public class CallableQueueService implements Service, 
Instrumentable {
         }
 
         maxCallableConcurrency = ConfigurationService.getInt(conf, 
CONF_CALLABLE_CONCURRENCY);
+
+        queueThreadsNumber = threads;
     }
 
     /**
@@ -872,4 +878,38 @@ public class CallableQueueService implements Service, 
Instrumentable {
     public Set<String> getInterruptTypes() {
         return interruptTypes;
     }
+
+    public synchronized long getThreadActiveCount() {
+        return executor.getActiveCount();
+    }
+
+    public long getQueueThreadsNumber() {
+        return queueThreadsNumber;
+    }
+
+    public <T> Future<T> submit(CallableWrapper<T> task) throws 
InterruptedException {
+        return executor.submit((Callable<T>) task);
+    }
+
+    /*
+     * Modeled on the JDK's AbstractExecutorService.invokeAll: waits 
until all of the given futures have finished.
+     */
+    public <T> void blockingWait(List<Future<T>> futures) throws 
InterruptedException {
+        try {
+            for(Future<T> future : futures) {
+                if (!future.isDone()) {
+                    try {
+                        future.get();
+                    } catch (ExecutionException | CancellationException e) {
+                        // no-op
+                    }
+                }
+            }
+        } catch (Throwable throwable) {
+            for(Future<T> future : futures) {
+                future.cancel(true);
+            }
+            throw throwable;
+        }
+    }
 }
diff --git 
a/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java 
b/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
index bff7e80..bff2a03 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
@@ -19,12 +19,18 @@
 package org.apache.oozie.command.wf;
 
 import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
 import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
 import java.io.Reader;
 import java.io.Writer;
+import java.net.URI;
 import java.nio.charset.StandardCharsets;
 import java.util.Properties;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Appender;
@@ -37,8 +43,11 @@ import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
 import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.service.CallableQueueService;
 import org.apache.oozie.service.ConfigurationService;
+import org.apache.oozie.service.ExtendedCallableQueueService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.test.XDataTestCase;
 import org.apache.oozie.util.IOUtils;
@@ -314,4 +323,91 @@ public class TestSignalXCommand extends XDataTestCase {
         assertEquals(prepActions.length, numPrep);
         assertEquals(okActions.length, numOK);
     }
+
+    private void writeToFile(String appXml, String appPath) throws IOException 
{
+        File wf = new File(URI.create(appPath));
+        try(PrintWriter out = new PrintWriter(new OutputStreamWriter(new 
FileOutputStream(wf), StandardCharsets.UTF_8))) {
+            out.println(appXml);
+        }
+    }
+
+    /*
+     *  This test case checks for a possible dead-lock when
+     *  the configuration property oozie.workflow.parallel.fork.action.start
+     *  is enabled.
+     *
+     *  See OOZIE-3646 for details.
+     */
+    public void testPossibleDeadLock() throws Exception {
+        setSystemProperty(Services.CONF_SERVICE_EXT_CLASSES, 
ExtendedCallableQueueService.class.getName());
+
+        services = new Services();
+        Configuration servicesConf = services.getConf();
+        servicesConf.setInt(CallableQueueService.CONF_THREADS, 1);
+        services.init();
+
+        
ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, 
true);
+
+        Configuration conf = new XConfiguration();
+        String workflowUri = getTestCaseFileUri("workflow.xml");
+        //@formatter:off
+        String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.4\" 
name=\"wf-fork\">"
+                + "<start to=\"fork1\"/>"
+                + "<fork name=\"fork1\">"
+                + "<path start=\"action1\"/>"
+                + "<path start=\"action2\"/>"
+                + "<path start=\"action3\"/>"
+                + "<path start=\"action4\"/>"
+                + "<path start=\"action5\"/>"
+                + "</fork>"
+                + "<action name=\"action1\">"
+                + "<fs></fs>"
+                + "<ok to=\"join1\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action>"
+                + "<action name=\"action2\">"
+                + "<fs></fs><ok to=\"join1\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action>"
+                + "<action name=\"action3\">"
+                + "<fs></fs><ok to=\"join1\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action>"
+                + "<action name=\"action4\">"
+                + "<fs></fs><ok to=\"join1\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action>"
+                + "<action name=\"action5\">"
+                + "<fs></fs><ok to=\"join1\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action>"
+                + "<join name=\"join1\" to=\"end\"/>"
+                + "<kill name=\"kill\"><message>killed</message>"
+                + "</kill><"
+                + "end name=\"end\"/>"
+                + "</workflow-app>";
+        //@formatter:on
+
+        writeToFile(appXml, workflowUri);
+        conf.set(OozieClient.APP_PATH, workflowUri);
+        conf.set(OozieClient.USER_NAME, getTestUser());
+
+        SubmitXCommand sc = new SubmitXCommand(conf);
+        final String jobId = sc.call();
+        new StartXCommand(jobId).call();
+
+        waitFor(20 * 1000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return WorkflowJobQueryExecutor.getInstance()
+                        
.get(WorkflowJobQueryExecutor.WorkflowJobQuery.GET_WORKFLOW, jobId)
+                        .getStatus() == WorkflowJob.Status.SUCCEEDED;
+            }
+        });
+
+        assertEquals(WorkflowJobQueryExecutor.getInstance()
+                        
.get(WorkflowJobQueryExecutor.WorkflowJobQuery.GET_WORKFLOW, jobId)
+                        .getStatus(),
+                WorkflowJob.Status.SUCCEEDED);
+    }
 }
diff --git a/release-log.txt b/release-log.txt
index 930fd31..220f21c 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.3.0 release (trunk - unreleased)
 
+OOZIE-3646 Possible dead-lock in SignalXCommand (zuston via dionusos)
 OOZIE-3652 Oozie launcher should retry directory listing when 
NoSuchFileException occurs (aajisaka via dionusos)
 OOZIE-3655 upgrade jdom to jdom2 2.0.6.1 (pj.fanning via dionusos)
 OOZIE-3653 Upgrade commons-io to 2.11.0 (groot via dionusos)

Reply via email to