This is an automated email from the ASF dual-hosted git repository.
dionusos pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/oozie.git
The following commit(s) were added to refs/heads/master by this push:
new 1d3530c OOZIE-3646 Possible dead-lock in SignalXCommand (zuston via
dionusos)
1d3530c is described below
commit 1d3530cb26642d460dabd373dcb99c8cf29e54a3
Author: Denes Bodo <[email protected]>
AuthorDate: Thu Feb 10 16:01:55 2022 +0100
OOZIE-3646 Possible dead-lock in SignalXCommand (zuston via dionusos)
---
.../apache/oozie/command/wf/SignalXCommand.java | 55 +++++++++++--
.../apache/oozie/service/CallableQueueService.java | 40 +++++++++
.../oozie/command/wf/TestSignalXCommand.java | 96 ++++++++++++++++++++++
release-log.txt | 1 +
4 files changed, 183 insertions(+), 9 deletions(-)
diff --git a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
index 22c791b..4db55cc 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
@@ -137,7 +137,8 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
this.wfJob =
WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW,
jobId);
LogUtils.setLogInfo(wfJob);
if (actionId != null) {
- this.wfAction =
WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_SIGNAL,
actionId);
+ this.wfAction = WorkflowActionQueryExecutor.getInstance()
+ .get(WorkflowActionQuery.GET_ACTION_SIGNAL,
actionId);
LogUtils.setLogInfo(wfAction);
}
}
@@ -468,21 +469,57 @@ public class SignalXCommand extends
WorkflowXCommand<Void> {
public void startForkedActions(List<WorkflowActionBean>
workflowActionBeanListForForked) throws CommandException {
- List<CallableWrapper<ActionExecutorContext>> tasks = new
ArrayList<CallableWrapper<ActionExecutorContext>>();
List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
List<JsonBean> insertList = new ArrayList<JsonBean>();
boolean endWorkflow = false;
boolean submitJobByQueuing = false;
- for (WorkflowActionBean workflowActionBean :
workflowActionBeanListForForked) {
- LOG.debug("Starting forked actions parallely : " +
workflowActionBean.getId());
- tasks.add(Services.get().get(CallableQueueService.class).new
CallableWrapper<ActionExecutorContext>(
- new ForkedActionStartXCommand(wfJob,
workflowActionBean.getId(), workflowActionBean.getType()), 0));
- }
try {
- List<Future<ActionExecutorContext>> futures =
Services.get().get(CallableQueueService.class)
- .invokeAll(tasks);
+ /*
+ * The limited thread execution mechanism aims to prevent the
dead-lock that occurs when all active threads are
+ * executing the SignalXCommand's invokeAll method.
+ *
+ * Solution
+ * 1. Do not invoke all tasks directly when the number of remaining
threads is less than the number of tasks.
+ * 2. To obtain the correct number of active threads in callableQueue,
the SignalXCommand.class lock is needed.
+ *
+ */
+ CallableQueueService callableQueueService =
Services.get().get(CallableQueueService.class);
+ List<Future<ActionExecutorContext>> futures = new ArrayList<>();
+
+ synchronized (SignalXCommand.class) {
+ long limitedRestThreadNum =
+ callableQueueService.getQueueThreadsNumber() -
callableQueueService.getThreadActiveCount();
+ if (limitedRestThreadNum <
workflowActionBeanListForForked.size()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Limited callable queue rest threads number:
"
+ + limitedRestThreadNum + ", needed forked task
size: "
+ + workflowActionBeanListForForked.size()
+ + ", tasks will be submitted to queue by async
mode.");
+ }
+ submitJobByQueuing = true;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Starting forked actions parallely: " +
workflowActionBeanListForForked);
+ }
+ for (WorkflowActionBean workflowActionBean :
workflowActionBeanListForForked) {
+ futures.add(
+
callableQueueService.submit(callableQueueService.new
CallableWrapper<ActionExecutorContext>(
+ new ForkedActionStartXCommand(wfJob,
+ workflowActionBean.getId(),
workflowActionBean.getType()), 0))
+ );
+ }
+
+ long startTime = System.currentTimeMillis();
+ callableQueueService.blockingWait(futures);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Execution time of forked actions parallely:
"
+ + (System.currentTimeMillis() - startTime) /
1000 + " sec");
+ }
+ }
+ }
+
for (Future<ActionExecutorContext> result : futures) {
if (result == null) {
submitJobByQueuing = true;
diff --git
a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
index d1222bc..f158a15 100644
--- a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
+++ b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
@@ -30,7 +30,9 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
@@ -114,6 +116,8 @@ public class CallableQueueService implements Service,
Instrumentable {
private int queueAwaitTerminationTimeoutSeconds;
+ private long queueThreadsNumber;
+
private boolean callableBegin(XCallable<?> callable) {
synchronized (activeCallables) {
AtomicInteger counter = activeCallables.get(callable.getType());
@@ -584,6 +588,8 @@ public class CallableQueueService implements Service,
Instrumentable {
}
maxCallableConcurrency = ConfigurationService.getInt(conf,
CONF_CALLABLE_CONCURRENCY);
+
+ queueThreadsNumber = threads;
}
/**
@@ -872,4 +878,38 @@ public class CallableQueueService implements Service,
Instrumentable {
public Set<String> getInterruptTypes() {
return interruptTypes;
}
+
+ public synchronized long getThreadActiveCount() {
+ return executor.getActiveCount();
+ }
+
+ public long getQueueThreadsNumber() {
+ return queueThreadsNumber;
+ }
+
+ public <T> Future<T> submit(CallableWrapper<T> task) throws
InterruptedException {
+ return executor.submit((Callable<T>) task);
+ }
+
+ /*
+ * See AbstractExecutorService.invokeAll of the JDK; this method waits
until all futures have finished.
+ */
+ public <T> void blockingWait(List<Future<T>> futures) throws
InterruptedException {
+ try {
+ for(Future<T> future : futures) {
+ if (!future.isDone()) {
+ try {
+ future.get();
+ } catch (ExecutionException | CancellationException e) {
+ // no-op
+ }
+ }
+ }
+ } catch (Throwable throwable) {
+ for(Future<T> future : futures) {
+ future.cancel(true);
+ }
+ throw throwable;
+ }
+ }
}
diff --git
a/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
b/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
index bff7e80..bff2a03 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
@@ -19,12 +19,18 @@
package org.apache.oozie.command.wf;
import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
import java.io.Reader;
import java.io.Writer;
+import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Appender;
@@ -37,8 +43,11 @@ import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.ConfigurationService;
+import org.apache.oozie.service.ExtendedCallableQueueService;
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.IOUtils;
@@ -314,4 +323,91 @@ public class TestSignalXCommand extends XDataTestCase {
assertEquals(prepActions.length, numPrep);
assertEquals(okActions.length, numOK);
}
+
+ private void writeToFile(String appXml, String appPath) throws IOException
{
+ File wf = new File(URI.create(appPath));
+ try(PrintWriter out = new PrintWriter(new OutputStreamWriter(new
FileOutputStream(wf), StandardCharsets.UTF_8))) {
+ out.println(appXml);
+ }
+ }
+
+ /*
+ * This test case checks for a possible dead-lock when
+ * the oozie.workflow.parallel.fork.action.start configuration
+ * is enabled.
+ *
+ * For details, see OOZIE-3646.
+ */
+ public void testPossibleDeadLock() throws Exception {
+ setSystemProperty(Services.CONF_SERVICE_EXT_CLASSES,
ExtendedCallableQueueService.class.getName());
+
+ services = new Services();
+ Configuration servicesConf = services.getConf();
+ servicesConf.setInt(CallableQueueService.CONF_THREADS, 1);
+ services.init();
+
+
ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION,
true);
+
+ Configuration conf = new XConfiguration();
+ String workflowUri = getTestCaseFileUri("workflow.xml");
+ //@formatter:off
+ String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.4\"
name=\"wf-fork\">"
+ + "<start to=\"fork1\"/>"
+ + "<fork name=\"fork1\">"
+ + "<path start=\"action1\"/>"
+ + "<path start=\"action2\"/>"
+ + "<path start=\"action3\"/>"
+ + "<path start=\"action4\"/>"
+ + "<path start=\"action5\"/>"
+ + "</fork>"
+ + "<action name=\"action1\">"
+ + "<fs></fs>"
+ + "<ok to=\"join1\"/>"
+ + "<error to=\"kill\"/>"
+ + "</action>"
+ + "<action name=\"action2\">"
+ + "<fs></fs><ok to=\"join1\"/>"
+ + "<error to=\"kill\"/>"
+ + "</action>"
+ + "<action name=\"action3\">"
+ + "<fs></fs><ok to=\"join1\"/>"
+ + "<error to=\"kill\"/>"
+ + "</action>"
+ + "<action name=\"action4\">"
+ + "<fs></fs><ok to=\"join1\"/>"
+ + "<error to=\"kill\"/>"
+ + "</action>"
+ + "<action name=\"action5\">"
+ + "<fs></fs><ok to=\"join1\"/>"
+ + "<error to=\"kill\"/>"
+ + "</action>"
+ + "<join name=\"join1\" to=\"end\"/>"
+ + "<kill name=\"kill\"><message>killed</message>"
+ + "</kill><"
+ + "end name=\"end\"/>"
+ + "</workflow-app>";
+ //@formatter:on
+
+ writeToFile(appXml, workflowUri);
+ conf.set(OozieClient.APP_PATH, workflowUri);
+ conf.set(OozieClient.USER_NAME, getTestUser());
+
+ SubmitXCommand sc = new SubmitXCommand(conf);
+ final String jobId = sc.call();
+ new StartXCommand(jobId).call();
+
+ waitFor(20 * 1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return WorkflowJobQueryExecutor.getInstance()
+
.get(WorkflowJobQueryExecutor.WorkflowJobQuery.GET_WORKFLOW, jobId)
+ .getStatus() == WorkflowJob.Status.SUCCEEDED;
+ }
+ });
+
+ assertEquals(WorkflowJobQueryExecutor.getInstance()
+
.get(WorkflowJobQueryExecutor.WorkflowJobQuery.GET_WORKFLOW, jobId)
+ .getStatus(),
+ WorkflowJob.Status.SUCCEEDED);
+ }
}
diff --git a/release-log.txt b/release-log.txt
index 930fd31..220f21c 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 5.3.0 release (trunk - unreleased)
+OOZIE-3646 Possible dead-lock in SignalXCommand (zuston via dionusos)
OOZIE-3652 Oozie launcher should retry directory listing when
NoSuchFileException occurs (aajisaka via dionusos)
OOZIE-3655 upgrade jdom to jdom2 2.0.6.1 (pj.fanning via dionusos)
OOZIE-3653 Upgrade commons-io to 2.11.0 (groot via dionusos)