Repository: oozie Updated Branches: refs/heads/master 72bce837d -> 8bb40f3fa
OOZIE-2796 oozie.action.keep.action.dir not getting notice (zgengxb2005 via gezapeti) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/8bb40f3f Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/8bb40f3f Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/8bb40f3f Branch: refs/heads/master Commit: 8bb40f3fa88c1d17984108a8ec0ff56fd24800f6 Parents: 72bce83 Author: Gezapeti Cseh <[email protected]> Authored: Tue Jun 20 12:08:32 2017 +0200 Committer: Gezapeti Cseh <[email protected]> Committed: Tue Jun 20 12:08:32 2017 +0200 ---------------------------------------------------------------------- .../java/org/apache/oozie/WorkflowJobBean.java | 2 +- .../oozie/action/hadoop/FsActionExecutor.java | 3 +- .../oozie/action/hadoop/JavaActionExecutor.java | 3 +- .../oozie/command/wf/ActionKillXCommand.java | 2 +- .../apache/oozie/command/wf/WfEndXCommand.java | 45 +++++++++++++++----- .../oozie/command/wf/WorkflowXCommand.java | 4 ++ .../executor/jpa/WorkflowJobQueryExecutor.java | 1 + .../jpa/TestWorkflowJobQueryExecutor.java | 3 +- release-log.txt | 1 + 9 files changed, 48 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/8bb40f3f/core/src/main/java/org/apache/oozie/WorkflowJobBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/WorkflowJobBean.java b/core/src/main/java/org/apache/oozie/WorkflowJobBean.java index 2042063..028164d 100644 --- a/core/src/main/java/org/apache/oozie/WorkflowJobBean.java +++ b/core/src/main/java/org/apache/oozie/WorkflowJobBean.java @@ -101,7 +101,7 @@ import java.util.List; @NamedQuery(name = "GET_WORKFLOW_ACTION_OP", query = "select w.id, w.user, w.group, w.appName, w.appPath, w.statusStr, w.run, w.parentId, w.logToken, w.wfInstance, w.protoActionConf from WorkflowJobBean w where w.id = :id"), - @NamedQuery(name = "GET_WORKFLOW_KILL", query = "select w.id, w.user, w.group, w.appName, w.appPath, w.statusStr, w.parentId, w.startTimestamp, w.endTimestamp, w.logToken, w.wfInstance, w.slaXml from WorkflowJobBean w where w.id = :id"), + @NamedQuery(name = "GET_WORKFLOW_KILL", query = "select w.id, w.user, w.group, w.appName, w.appPath, w.statusStr, w.parentId, w.startTimestamp, w.endTimestamp, w.logToken, w.wfInstance, w.slaXml, w.protoActionConf from WorkflowJobBean w where w.id = :id"), @NamedQuery(name = "GET_WORKFLOW_RESUME", query = "select w.id, w.user, w.group, w.appName, w.appPath, w.statusStr, w.parentId, w.startTimestamp, w.endTimestamp, w.logToken, w.wfInstance, w.protoActionConf from WorkflowJobBean w where w.id = :id"), http://git-wip-us.apache.org/repos/asf/oozie/blob/8bb40f3f/core/src/main/java/org/apache/oozie/action/hadoop/FsActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/FsActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/FsActionExecutor.java index 85d3efa..0515db6 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/FsActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/FsActionExecutor.java @@ -41,6 +41,7 @@ import org.apache.oozie.action.ActionExecutor; import org.apache.oozie.action.ActionExecutorException; import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.WorkflowAction; +import org.apache.oozie.command.wf.WorkflowXCommand; import org.apache.oozie.dependency.FSURIHandler; import org.apache.oozie.dependency.URIHandler; import org.apache.oozie.service.ConfigurationService; @@ -630,7 +631,7 @@ public class FsActionExecutor extends ActionExecutor { WorkflowAction.Status status = externalStatus.equals("OK") ? WorkflowAction.Status.OK : WorkflowAction.Status.ERROR; context.setEndData(status, getActionSignal(status)); - if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)) { + if (!context.getProtoActionConf().getBoolean(WorkflowXCommand.KEEP_WF_ACTION_DIR, false)) { try { FileSystem fs = context.getAppFileSystem(); fs.delete(context.getActionDir(), true); http://git-wip-us.apache.org/repos/asf/oozie/blob/8bb40f3f/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index bf02397..465cd9e 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@ -84,6 +84,7 @@ import org.apache.oozie.action.ActionExecutorException; import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.command.coord.CoordActionStartXCommand; +import org.apache.oozie.command.wf.WorkflowXCommand; import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.HadoopAccessorException; import org.apache.oozie.service.HadoopAccessorService; @@ -511,7 +512,7 @@ public class JavaActionExecutor extends ActionExecutor { void cleanUpActionDir(FileSystem actionFs, Context context) throws ActionExecutorException { try { Path actionDir = context.getActionDir(); - if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false) + if (!context.getProtoActionConf().getBoolean(WorkflowXCommand.KEEP_WF_ACTION_DIR, false) && actionFs.exists(actionDir)) { actionFs.delete(actionDir, true); } http://git-wip-us.apache.org/repos/asf/oozie/blob/8bb40f3f/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java index ac096cc..61891b8 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java @@ -208,7 +208,7 @@ public class ActionKillXCommand extends ActionXCommand<Void> { FileSystem actionFs = context.getAppFileSystem(); Path actionDir = context.getActionDir(); Path jobDir = actionDir.getParent(); - if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false) + if (!context.getProtoActionConf().getBoolean(KEEP_WF_ACTION_DIR, false) && actionFs.exists(actionDir)) { actionFs.delete(actionDir, true); } http://git-wip-us.apache.org/repos/asf/oozie/blob/8bb40f3f/core/src/main/java/org/apache/oozie/command/wf/WfEndXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/WfEndXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/WfEndXCommand.java index e282d94..0531bb3 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/WfEndXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/WfEndXCommand.java @@ -19,6 +19,7 @@ package org.apache.oozie.command.wf; import java.io.IOException; +import java.io.StringReader; import java.net.URI; import java.net.URISyntaxException; @@ -26,12 +27,16 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.oozie.ErrorCode; +import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.client.WorkflowJob; import org.apache.oozie.command.CommandException; import org.apache.oozie.command.PreconditionException; import org.apache.oozie.service.HadoopAccessorException; import org.apache.oozie.service.HadoopAccessorService; import org.apache.oozie.service.Services; +import org.apache.oozie.util.XConfiguration; + +import com.google.common.annotations.VisibleForTesting; /** * This Command is expected to be called when a Workflow moves to any terminal @@ -40,40 +45,58 @@ import org.apache.oozie.service.Services; */ public class WfEndXCommand extends WorkflowXCommand<Void> { - private WorkflowJob job = null; + private WorkflowJobBean jobBean = null; - public WfEndXCommand(WorkflowJob job) { + public WfEndXCommand(WorkflowJobBean jobBean) { super("wf_end", "wf_end", 1); - this.job = job; + this.jobBean = jobBean; } @Override protected Void execute() throws CommandException { - LOG.debug("STARTED WFEndXCommand " + job.getId()); + LOG.debug("STARTED WFEndXCommand " + jobBean.getId()); deleteWFDir(); - LOG.debug("ENDED WFEndXCommand " + job.getId()); + LOG.debug("ENDED WFEndXCommand " + jobBean.getId()); return null; } private void deleteWFDir() throws CommandException { FileSystem fs; try { - fs = getAppFileSystem(job); - String wfDir = Services.get().getSystemId() + "/" + job.getId(); + fs = getAppFileSystem(jobBean); + String wfDir = Services.get().getSystemId() + "/" + jobBean.getId(); Path wfDirPath = new Path(fs.getHomeDirectory(), wfDir); + LOG.debug("WF tmp dir :" + wfDirPath); - if (fs.exists(wfDirPath)) { + boolean keepActionDir = keepWfActionDir(); + if (!keepActionDir && fs.exists(wfDirPath)) { fs.delete(wfDirPath, true); } + else if (keepActionDir) { + LOG.debug(KEEP_WF_ACTION_DIR + " is set to true"); + } else { LOG.debug("Tmp dir doesn't exist :" + wfDirPath); } } catch (Exception e) { - LOG.error("Unable to delete WF temp dir of wf id :" + job.getId(), e); - throw new CommandException(ErrorCode.E0819, job.getId(), e); + LOG.error("Unable to delete WF temp dir of wf id :" + jobBean.getId(), e); + throw new CommandException(ErrorCode.E0819, jobBean.getId(), e); } + } + + @VisibleForTesting + protected boolean keepWfActionDir() throws IOException { + if (jobBean.getProtoActionConf() == null) { + return false; + } + Configuration wfConf = getWfConfiguration(); + return wfConf.getBoolean(KEEP_WF_ACTION_DIR, false); + } + @VisibleForTesting + protected Configuration getWfConfiguration() throws IOException { + return new XConfiguration(new StringReader(jobBean.getProtoActionConf())); } protected FileSystem getAppFileSystem(WorkflowJob workflow) throws HadoopAccessorException, IOException, @@ -86,7 +109,7 @@ public class WfEndXCommand extends WorkflowXCommand<Void> { @Override public String getEntityKey() { - return job.getId(); + return jobBean.getId(); } @Override http://git-wip-us.apache.org/repos/asf/oozie/blob/8bb40f3f/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java index bc80dfe..87d7e77 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java @@ -34,6 +34,10 @@ import org.apache.oozie.event.WorkflowJobEvent; */ public abstract class WorkflowXCommand<T> extends XCommand<T> { + // Configuration on whether or not workflow and action directory will be deleted + // after workflow is done. + public static final String KEEP_WF_ACTION_DIR = "oozie.action.keep.action.dir"; + protected static final String INSTR_SUCCEEDED_JOBS_COUNTER_NAME = "succeeded"; protected static final String INSTR_KILLED_JOBS_COUNTER_NAME = "killed"; protected static final String INSTR_FAILED_JOBS_COUNTER_NAME = "failed"; http://git-wip-us.apache.org/repos/asf/oozie/blob/8bb40f3f/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java index 13fa54d..aa622d0 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java @@ -295,6 +295,7 @@ public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, Wor bean.setLogToken((String) arr[9]); bean.setWfInstanceBlob((BinaryBlob) (arr[10])); bean.setSlaXmlBlob((StringBlob) arr[11]); + bean.setProtoActionConfBlob((StringBlob) arr[12]); break; case GET_WORKFLOW_RESUME: bean = new WorkflowJobBean(); http://git-wip-us.apache.org/repos/asf/oozie/blob/8bb40f3f/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobQueryExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobQueryExecutor.java index c01d3d5..fadd8b9 100644 --- a/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobQueryExecutor.java +++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobQueryExecutor.java @@ -194,6 +194,7 @@ public class TestWorkflowJobQueryExecutor extends XDataTestCase { public void testGet() throws Exception { WorkflowJobBean bean = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); + assertNotNull(bean.getProtoActionConf()); bean.setStartTime(new Date(System.currentTimeMillis() - 10)); bean.setEndTime(new Date()); WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW, bean); @@ -301,7 +302,7 @@ public class TestWorkflowJobQueryExecutor extends XDataTestCase { assertEquals(ByteBuffer.wrap(bean.getWfInstanceBlob().getBytes()).getInt(), ByteBuffer.wrap(retBean.getWfInstanceBlob().getBytes()).getInt()); assertEquals(bean.getSlaXml(), retBean.getSlaXml()); - assertNull(retBean.getProtoActionConf()); + assertEquals(bean.getProtoActionConf(), retBean.getProtoActionConf()); assertNull(retBean.getConf()); // GET_WORKFLOW_RESUME http://git-wip-us.apache.org/repos/asf/oozie/blob/8bb40f3f/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 05ed49c..cfc94e9 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 5.0.0 release (trunk - unreleased) +OOZIE-2796 oozie.action.keep.action.dir not getting notice (zgengxb2005 via gezapeti) OOZIE-2769 Extend FS action to allow setrep on a file (Artem Ervits via gezapeti) OOZIE-2815 amend - Oozie not always display job log (andras.piros via gezapeti) OOZIE-2946 Include find-sec-bugs plugin (Jan Hentschel via rkanter)
