This is an automated email from the ASF dual-hosted git repository.
dionusos pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/oozie.git
The following commit(s) were added to refs/heads/master by this push:
new 13837d9b1 OOZIE-3670 Actions can stuck while running in a Fork-Join
workflow (jmakai via dionusos)
13837d9b1 is described below
commit 13837d9b16c793eea2d9ce40052d956417102450
Author: Denes Bodo <[email protected]>
AuthorDate: Wed Nov 30 09:10:07 2022 +0100
OOZIE-3670 Actions can stuck while running in a Fork-Join workflow (jmakai
via dionusos)
---
.../java/org/apache/oozie/WorkflowActionBean.java | 7 +-
.../java/org/apache/oozie/command/XCommand.java | 34 ++++++++-
.../oozie/command/wf/ActionCheckXCommand.java | 18 ++++-
.../apache/oozie/command/wf/ActionEndXCommand.java | 33 ++++++---
.../apache/oozie/command/wf/SignalXCommand.java | 15 ++++
...utsideOfProvidedActionGetForJobJPAExecutor.java | 65 +++++++++++++++++
.../oozie/command/wf/TestActionCheckXCommand.java | 82 ++++++++++------------
.../oozie/command/wf/TestActionEndXCommand.java | 67 ++++++++++++++++++
.../java/org/apache/oozie/test/XDataTestCase.java | 76 ++++++++++++++++++++
release-log.txt | 1 +
10 files changed, 340 insertions(+), 58 deletions(-)
diff --git a/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
b/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
index 2d75e1670..f47969c9b 100644
--- a/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
+++ b/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
@@ -155,7 +155,12 @@ import org.json.simple.JSONObject;
+ "= 'END_MANUAL')"),
@NamedQuery(name = "GET_ACTIONS_FOR_WORKFLOW_RERUN", query = "select a.id,
a.name, a.statusStr, a.endTimestamp, a.type "
- + "from WorkflowActionBean a where a.wfId = :wfId order by
a.startTimestamp") })
+ + "from WorkflowActionBean a where a.wfId = :wfId order by
a.startTimestamp"),
+
+ @NamedQuery(name = "GET_ACTIONS_FAILED_OUTSIDE_OF_PROVIDED_ACTION", query
= "select OBJECT(a) from "
+ + "WorkflowActionBean a where a.wfId = :wfId AND a.id <> :actionId
AND a.statusStr = 'FAILED' order by "
+ + "a.startTimestamp")})
+
@Table(name = "WF_ACTIONS")
public class WorkflowActionBean implements Writable, WorkflowAction, JsonBean {
@Id
diff --git a/core/src/main/java/org/apache/oozie/command/XCommand.java
b/core/src/main/java/org/apache/oozie/command/XCommand.java
index 28918b6f7..1919f95d4 100644
--- a/core/src/main/java/org/apache/oozie/command/XCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/XCommand.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,11 +20,16 @@ package org.apache.oozie.command;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.FaultInjection;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.XException;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import
org.apache.oozie.executor.jpa.WorkflowActionsFailedOutsideOfProvidedActionGetForJobJPAExecutor;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.InstrumentationService;
+import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.MemoryLocksService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.Instrumentation;
@@ -547,4 +552,31 @@ public abstract class XCommand<T> implements XCallable<T> {
public String toString() {
return getKey();
}
+
+ /**
+ * Checks whether the given workflow job contains at least one failed
action except for the action which this
+ * function was called with
+ *
+ * @param wfJob the workflow job
+ * @param wfAction the workflow action
+ * @return true if there is a failed action outside of the action which
this function was called with
+ * @throws CommandException in case a missing JPAService
+ */
+ public boolean isOtherActionFailedUnderJob(WorkflowJobBean wfJob,
WorkflowActionBean wfAction) throws CommandException {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ if (jpaService == null) {
+ throw new CommandException(ErrorCode.E0610);
+ }
+
+ List<WorkflowActionBean> actionList = null;
+ try {
+ // Getting the failed actions of the the given job outside of the
given action
+ actionList = jpaService.execute(new
WorkflowActionsFailedOutsideOfProvidedActionGetForJobJPAExecutor(
+ wfJob.getId(), wfAction.getId()));
+ } catch (JPAExecutorException e) {
+ LOG.error("Could not get the actions of job [{0}]", wfJob.getId(),
e);
+ }
+
+ return actionList != null && actionList.size() > 0;
+ }
}
diff --git
a/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
b/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
index 335527d65..0cdfa5aa0 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -159,6 +159,22 @@ public class ActionCheckXCommand extends
ActionXCommand<Void> {
catch (JPAExecutorException e) {
throw new CommandException(e);
}
+
+ // In case of forked actions there might be a case when an action
- running in parallel - fails.
+ // In that case in the same fork, an other running action would
not pass the precondition
+ // check, as the workflow job itself gets failed as well because
of the other action's failure.
+ // This behaviour leads to the incidence that the action will
stick in RUNNING phase.
+ // Hence the below method is responsible for recognizing those
scenarios.
+
+ // If there is an (other) action which's status is FAILED in the
same workflow job of this action
+ // to be checked, then it means this action was launched in
parallel (with that other action),
+ // because otherwise the workflow job would not have transitioned
to this action due to the
+ // other workflow's failure.
+ if (isOtherActionFailedUnderJob(wfJob, wfAction)) {
+ // Skipping throwing exception, therefore preventing this
action to be stuck in RUNNING phase
+ return;
+ }
+
throw new PreconditionException(ErrorCode.E0818, wfAction.getId(),
wfJob.getId(), wfJob.getStatus());
}
}
diff --git
a/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
b/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
index 8bf4fc75e..08f4f8f4a 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
@@ -128,23 +128,38 @@ public class ActionEndXCommand extends
ActionXCommand<Void> {
if (wfAction == null) {
throw new PreconditionException(ErrorCode.E0605, actionId);
}
- if (wfAction.isPending()
- && (wfAction.getStatus() == WorkflowActionBean.Status.DONE
- || wfAction.getStatus() ==
WorkflowActionBean.Status.END_RETRY || wfAction.getStatus()
- == WorkflowActionBean.Status.END_MANUAL)) {
+
+ executor =
Services.get().get(ActionService.class).getExecutor(wfAction.getType());
+ if (executor == null) {
+ throw new CommandException(ErrorCode.E0802, wfAction.getType());
+ }
+
+ if (wfAction.isPending() && (wfAction.getStatus() ==
WorkflowActionBean.Status.DONE
+ || wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY
+ || wfAction.getStatus() ==
WorkflowActionBean.Status.END_MANUAL)) {
if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) {
+ // In case of forked actions there might be a case when an
action - running in parallel - fails.
+ // In that case in the same fork, an other running action
would not pass the precondition
+ // check, as the workflow job itself gets failed as well
because of the other action's failure.
+ // This behaviour leads to the incidence that the action will
stick in RUNNING phase.
+ // Hence the below method is responsible for recognizing those
scenarios.
+
+ // If there is an (other) action which's status is FAILED in
the same workflow job of this action
+ // to be checked, then it means this action was launched in
parallel (with that other action),
+ // because otherwise the workflow job would not have
transitioned to this action due to the
+ // other workflow's failure.
+ if (isOtherActionFailedUnderJob(wfJob, wfAction)) {
+ // Skipping throwing exception, therefore preventing this
action to be stuck in RUNNING phase
+ return;
+ }
+
throw new PreconditionException(ErrorCode.E0811,
WorkflowJob.Status.RUNNING.toString());
}
}
else {
throw new PreconditionException(ErrorCode.E0812,
wfAction.isPending(), wfAction.getStatusStr());
}
-
- executor =
Services.get().get(ActionService.class).getExecutor(wfAction.getType());
- if (executor == null) {
- throw new CommandException(ErrorCode.E0802, wfAction.getType());
- }
}
@Override
diff --git a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
index 4db55ccde..37f55f820 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
@@ -155,6 +155,21 @@ public class SignalXCommand extends WorkflowXCommand<Void>
{
protected void verifyPrecondition() throws CommandException,
PreconditionException {
if ((wfAction == null) || (wfAction.isComplete() &&
wfAction.isPending())) {
if (wfJob.getStatus() != WorkflowJob.Status.RUNNING &&
wfJob.getStatus() != WorkflowJob.Status.PREP) {
+ // In case of forked actions there might be a case when an
action - running in parallel - fails.
+ // In that case in the same fork, an other running action
would not pass the precondition
+ // check, as the workflow job itself gets failed as well
because of the other action's failure.
+ // This behaviour leads to the incidence that the action will
stick in RUNNING phase.
+ // Hence the below method is responsible for recognizing those
scenarios.
+
+ // If there is an (other) action which's status is FAILED n in
the same workflow job of this action
+ // to be checked, then it means this action was launched in
parallel (with that other action),
+ // because otherwise the workflow job would not have
transitioned to this action due to the
+ // other workflow's failure.
+ if (isOtherActionFailedUnderJob(wfJob, wfAction)) {
+ // Skipping throwing exception, therefore preventing this
action to be stuck in RUNNING phase
+ return;
+ }
+
throw new PreconditionException(ErrorCode.E0813,
wfJob.getStatusStr());
}
}
diff --git
a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionsFailedOutsideOfProvidedActionGetForJobJPAExecutor.java
b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionsFailedOutsideOfProvidedActionGetForJobJPAExecutor.java
new file mode 100644
index 000000000..eb7d98a75
--- /dev/null
+++
b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionsFailedOutsideOfProvidedActionGetForJobJPAExecutor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.executor.jpa;
+
+import java.util.List;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * Load the list of WorkflowAction for a WorkflowJob and return the list.
+ */
+public class WorkflowActionsFailedOutsideOfProvidedActionGetForJobJPAExecutor
implements JPAExecutor<List<WorkflowActionBean>> {
+
+ private String wfJobId = null;
+ private String wfActionId = null;
+
+ public
WorkflowActionsFailedOutsideOfProvidedActionGetForJobJPAExecutor(String
wfJobId, String wfActionId) {
+ ParamChecker.notEmpty(wfJobId, "wfJobId");
+ ParamChecker.notEmpty(wfActionId, "wfActionId");
+ this.wfJobId = wfJobId;
+ this.wfActionId = wfActionId;
+ }
+
+ @Override
+ public String getName() {
+ return
"WorkflowActionsFailedOutsideOfProvidedActionGetForJobJPAExecutor";
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public List<WorkflowActionBean> execute(EntityManager em) throws
JPAExecutorException {
+ List<WorkflowActionBean> actions;
+ try {
+ Query q =
em.createNamedQuery("GET_ACTIONS_FAILED_OUTSIDE_OF_PROVIDED_ACTION");
+ q.setParameter("wfId", wfJobId);
+ q.setParameter("actionId", wfActionId);
+ actions = q.getResultList();
+ }
+ catch (Exception e) {
+ throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+ }
+ return actions;
+ }
+}
diff --git
a/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
index 7fca19401..5c612ac70 100644
---
a/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
+++
b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -34,6 +34,7 @@ import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
+import org.apache.oozie.action.control.ForkActionExecutor;
import org.apache.oozie.action.hadoop.LauncherHelper;
import org.apache.oozie.action.hadoop.MapReduceActionExecutor;
import org.apache.oozie.action.hadoop.MapperReducerForTest;
@@ -254,6 +255,39 @@ public class TestActionCheckXCommand extends XDataTestCase
{
assertEquals(3L, counterVal);
}
+ /**
+ * Test : verify that ActionCheckXCommand failed the precondition
verification phase when job != RUNNING and
+ * there is an other action which status' is FAILED.
+ *
+ * @throws Exception
+ */
+ public void testParalellyFailedActionInJobContainingFork() throws
Exception {
+ Instrumentation inst =
Services.get().get(InstrumentationService.class).get();
+
+ WorkflowJobBean job =
this.addRecordToWfJobTable(WorkflowJob.Status.FAILED,
WorkflowInstance.Status.FAILED);
+
+ WorkflowActionBean forkAction =
this.addRecordToWfActionTableWithType(job.getId(), "forkAction",
+ WorkflowAction.Status.OK, ForkActionExecutor.TYPE);
+
+ WorkflowActionBean failedAction =
this.addRecordToWfActionTable(job.getId(), "failedAction",
+ WorkflowAction.Status.FAILED);
+
+ WorkflowActionBean greenAction =
this.addRecordToWfActionTable(job.getId(), "greenAction",
+ WorkflowAction.Status.RUNNING);
+
+ ActionCheckXCommand checkCmd = new
ActionCheckXCommand(greenAction.getId());
+
+ checkCmd.call();
+
+ try {
+ // this is supposed to throw NullPointerException
+
inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP).get(checkCmd.getName() +
".preconditionfailed").getValue();
+ fail("A NullPointerException should have been thrown");
+ } catch (NullPointerException expect) {
+ // we should get here
+ }
+ }
+
public void testActionCheck() throws Exception {
JPAService jpaService = Services.get().get(JPAService.class);
WorkflowJobBean job =
this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING,
WorkflowInstance.Status.RUNNING);
@@ -608,7 +642,7 @@ public class TestActionCheckXCommand extends XDataTestCase {
@Override
protected WorkflowActionBean addRecordToWfActionTable(
String wfId, String actionName, WorkflowAction.Status status)
throws Exception {
- WorkflowActionBean action = createWorkflowActionSetPending(wfId,
status);
+ WorkflowActionBean action = createWorkflowActionSetPending(wfId,
actionName, status);
try {
JPAService jpaService = Services.get().get(JPAService.class);
assertNotNull(jpaService);
@@ -622,48 +656,4 @@ public class TestActionCheckXCommand extends XDataTestCase
{
}
return action;
}
-
- protected WorkflowActionBean createWorkflowActionSetPending(String wfId,
WorkflowAction.Status status) throws Exception {
- WorkflowActionBean action = new WorkflowActionBean();
- String actionname = "testAction";
- action.setName(actionname);
-
action.setId(Services.get().get(UUIDService.class).generateChildId(wfId,
actionname));
- action.setJobId(wfId);
- action.setType("map-reduce");
- action.setTransition("transition");
- action.setStatus(status);
- action.setStartTime(new Date());
- action.setEndTime(new Date());
- action.setLastCheckTime(new Date());
- action.setPending();
- action.setExecutionPath("/");
- action.setUserRetryMax(2);
-
- Path inputDir = new Path(getFsTestCaseDir(), "input");
- Path outputDir = new Path(getFsTestCaseDir(), "output");
-
- FileSystem fs = getFileSystem();
- Writer w = new OutputStreamWriter(fs.create(new Path(inputDir,
"data.txt")), StandardCharsets.UTF_8);
- w.write("dummy\n");
- w.write("dummy\n");
- w.close();
-
- String actionXml = "<map-reduce>" +
- "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
- "<name-node>" + getNameNodeUri() + "</name-node>" +
- "<prepare><delete path=\"" + outputDir.toString() + "\"/></prepare>" +
- "<configuration>" +
- "<property><name>mapred.mapper.class</name><value>" +
MapperReducerForTest.class.getName() +
- "</value></property>" +
- "<property><name>mapred.reducer.class</name><value>" +
MapperReducerForTest.class.getName() +
- "</value></property>" +
-
"<property><name>mapred.input.dir</name><value>"+inputDir.toString()+"</value></property>"
+
-
"<property><name>mapred.output.dir</name><value>"+outputDir.toString()+"</value></property>"
+
- "</configuration>" +
- "</map-reduce>";
- action.setConf(actionXml);
-
- return action;
- }
-
}
diff --git
a/core/src/test/java/org/apache/oozie/command/wf/TestActionEndXCommand.java
b/core/src/test/java/org/apache/oozie/command/wf/TestActionEndXCommand.java
new file mode 100644
index 000000000..0b212c17a
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestActionEndXCommand.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.wf;
+
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.action.control.ForkActionExecutor;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.command.XCommand;
+import org.apache.oozie.service.InstrumentationService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.Instrumentation;
+import org.apache.oozie.workflow.WorkflowInstance;
+
+public class TestActionEndXCommand extends XDataTestCase {
+
+ /**
+ * Test : verify that ActionEndXCommand failed the precondition
verification phase when job != RUNNING and
+ * there is an other action which status' is FAILED.
+ *
+ * @throws Exception
+ */
+ public void testParalellyFailedActionInJobContainingFork() throws
Exception {
+ Instrumentation inst =
Services.get().get(InstrumentationService.class).get();
+
+ WorkflowJobBean job =
this.addRecordToWfJobTable(WorkflowJob.Status.FAILED,
WorkflowInstance.Status.FAILED);
+
+ WorkflowActionBean forkAction =
this.addRecordToWfActionTableWithType(job.getId(), "forkAction",
+ WorkflowAction.Status.OK, ForkActionExecutor.TYPE);
+
+ WorkflowActionBean failedAction =
this.addRecordToWfActionTable(job.getId(), "failedAction",
+ WorkflowAction.Status.FAILED);
+
+ WorkflowActionBean greenAction =
this.addRecordToWfActionTable(job.getId(), "greenAction",
+ WorkflowAction.Status.DONE, "/", true);
+
+ ActionEndXCommand checkCmd = new
ActionEndXCommand(greenAction.getId(), greenAction.getType());
+
+ checkCmd.call();
+
+ try {
+ // this is supposed to throw NullPointerException
+
inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP).get(checkCmd.getName() +
".preconditionfailed").getValue();
+ fail("A NullPointerException should have been thrown");
+ } catch (NullPointerException expect) {
+ // we should get here
+ }
+ }
+}
diff --git a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
index db22103eb..8e5d69aa0 100644
--- a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
@@ -1972,4 +1972,80 @@ public abstract class XDataTestCase extends
XHCatTestCase {
assertEquals(stat, action.getStatus());
return action;
}
+
+ protected WorkflowActionBean addRecordToWfActionTableWithType(String wfId,
String actionName, WorkflowAction.Status status,
+ String type)
throws Exception {
+ WorkflowActionBean action = createWorkflowActionSetPending(wfId,
actionName, status);
+ action.setType(type);
+ try {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+ WorkflowActionInsertJPAExecutor actionInsertCmd = new
WorkflowActionInsertJPAExecutor(action);
+ jpaService.execute(actionInsertCmd);
+ }
+ catch (JPAExecutorException ce) {
+ ce.printStackTrace();
+ fail("Unable to insert the test wf action record to table");
+ throw ce;
+ }
+ return action;
+ }
+
+ protected WorkflowActionBean createWorkflowActionSetPending(String wfId,
WorkflowAction.Status status) throws Exception {
+ return createWorkflowActionSetPending(wfId, "actionName", status);
+ }
+
+ protected WorkflowActionBean createWorkflowActionSetPending(String wfId,
String actionName, WorkflowAction.Status status)
+ throws Exception {
+ WorkflowActionBean action = new WorkflowActionBean();
+ action.setName(actionName);
+
action.setId(Services.get().get(UUIDService.class).generateChildId(wfId,
actionName));
+ action.setJobId(wfId);
+ action.setType("map-reduce");
+ action.setTransition("transition");
+ action.setStatus(status);
+ action.setStartTime(new Date());
+ action.setEndTime(new Date());
+ action.setLastCheckTime(new Date());
+ action.setPending();
+ action.setExecutionPath("/");
+ action.setUserRetryMax(2);
+
+ Path inputDir = new Path(getFsTestCaseDir(), "input");
+ Path outputDir = new Path(getFsTestCaseDir(), "output");
+
+ FileSystem fs = getFileSystem();
+ Writer w = new OutputStreamWriter(fs.create(new Path(inputDir,
"data.txt")), StandardCharsets.UTF_8);
+ w.write("dummy\n");
+ w.write("dummy\n");
+ w.close();
+
+ String actionXml =
+ " <map-reduce>"
+ + " <job-tracker>" + getJobTrackerUri() +
"</job-tracker>"
+ + " <name-node>" + getNameNodeUri() + "</name-node>"
+ + " <prepare><delete path=\"" + outputDir.toString()
+ "\"/></prepare>"
+ + " <configuration>"
+ + " <property>"
+ + " <name>mapred.mapper.class</name>"
+ + " <value>" +
MapperReducerForTest.class.getName() + "</value>"
+ + " </property>"
+ + " <property>"
+ + " <name>mapred.reducer.class</name>"
+ + " <value>" +
MapperReducerForTest.class.getName() + "</value>"
+ + " </property>"
+ + " <property>"
+ + " <name>mapred.input.dir</name>"
+ + " <value>" + inputDir.toString() + "</value>"
+ + " </property>"
+ + " <property>"
+ + " <name>mapred.output.dir</name>"
+ + " <value>" + outputDir.toString() + "</value>"
+ + " </property>"
+ + " </configuration>"
+ + "</map-reduce>";
+ action.setConf(actionXml);
+
+ return action;
+ }
}
diff --git a/release-log.txt b/release-log.txt
index ffee90849..4cfb75bd5 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 5.3.0 release (trunk - unreleased)
+OOZIE-3670 Actions can stuck while running in a Fork-Join workflow (jmakai via
dionusos)
OOZIE-3676 Remove all non FIPS compliant encoding algorithms (jmakai via
dionusos)
OOZIE-3674 Add a --insecure like parameter to Oozie client so it can ignore
certificate errors (jmakai via dionusos)
OOZIE-3673 Add possibility to configure custom SSL/TLS protocols when
executing an email action (jmakai via dionusos)