Repository: oozie
Updated Branches:
  refs/heads/master 8c11f9c7a -> 350ce480e


OOZIE-1993 Rerun fails during join in certain condition (shwethags)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/350ce480
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/350ce480
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/350ce480

Branch: refs/heads/master
Commit: 350ce480edc2fb8abd6c55acd29f01fc05c89675
Parents: 8c11f9c
Author: Shwetha GS <[email protected]>
Authored: Wed May 20 11:58:42 2015 +0530
Committer: Shwetha GS <[email protected]>
Committed: Wed May 20 11:58:42 2015 +0530

----------------------------------------------------------------------
 .../org/apache/oozie/WorkflowActionBean.java    |   2 +-
 .../apache/oozie/command/wf/ReRunXCommand.java  |  10 +-
 .../apache/oozie/command/wf/SignalXCommand.java |   1 +
 .../jpa/WorkflowActionQueryExecutor.java        |   1 +
 .../oozie/service/LiteWorkflowStoreService.java |   5 +-
 .../org/apache/oozie/workflow/WorkflowLib.java  |   3 +-
 .../workflow/lite/LiteWorkflowInstance.java     | 190 ++++++-------------
 .../oozie/workflow/lite/LiteWorkflowLib.java    |   4 +-
 core/src/main/resources/oozie-log4j.properties  |   2 +-
 .../oozie/command/wf/TestReRunXCommand.java     |  19 +-
 core/src/test/resources/rerun-wf-fork.xml       |  14 +-
 release-log.txt                                 |   1 +
 12 files changed, 89 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/350ce480/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/WorkflowActionBean.java 
b/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
index 6289158..7f00278 100644
--- a/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
+++ b/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
@@ -68,7 +68,7 @@ import org.json.simple.JSONObject;
 
     @NamedQuery(name = "UPDATE_ACTION_END", query = "update WorkflowActionBean 
a set a.stats = :stats, a.errorCode = :errorCode, a.errorMessage = 
:errorMessage, a.retries = :retries, a.endTimestamp = :endTime, a.statusStr = 
:status, a.pending = :pending, a.pendingAgeTimestamp = :pendingAge, 
a.signalValue = :signalValue, a.userRetryCount = :userRetryCount, 
a.externalStatus = :externalStatus where a.id = :id"),
 
-    @NamedQuery(name = "UPDATE_ACTION_PENDING", query = "update 
WorkflowActionBean a set a.pending = :pending, a.pendingAgeTimestamp = 
:pendingAge where a.id = :id"),
+    @NamedQuery(name = "UPDATE_ACTION_PENDING", query = "update 
WorkflowActionBean a set a.pending = :pending, a.pendingAgeTimestamp = 
:pendingAge, a.executionPath = :executionPath where a.id = :id"),
 
     @NamedQuery(name = "UPDATE_ACTION_STATUS_PENDING", query = "update 
WorkflowActionBean a set a.statusStr = :status, a.pending = :pending, 
a.pendingAgeTimestamp = :pendingAge where a.id = :id"),
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/350ce480/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java 
b/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
index 6138278..bb6d4e7 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
@@ -178,16 +178,8 @@ public class ReRunXCommand extends WorkflowXCommand<Void> {
             // necessary to ensure propagation of Oozie properties to Hadoop 
calls downstream
             conf = ((XConfiguration) conf).resolve();
 
-            // Prepare the action endtimes map
-            Map<String, Date> actionEndTimes = new HashMap<String, Date>();
-            for (WorkflowActionBean action : actions) {
-                if (action.getEndTime() != null) {
-                    actionEndTimes.put(action.getName(), action.getEndTime());
-                }
-            }
-
             try {
-                newWfInstance = workflowLib.createInstance(app, conf, jobId, 
actionEndTimes);
+                newWfInstance = workflowLib.createInstance(app, conf, jobId);
             }
             catch (WorkflowException e) {
                 throw new CommandException(e);

http://git-wip-us.apache.org/repos/asf/oozie/blob/350ce480/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java 
b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
index bccac51..1b4b0b6 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
@@ -350,6 +350,7 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
                     WorkflowActionBean oldAction = new WorkflowActionBean();
                     oldAction.setId(newAction.getId());
                     oldAction.setPending();
+                    oldAction.setExecutionPath(newAction.getExecutionPath());
                     updateList.add(new 
UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_PENDING,
                             oldAction));
                     queue(new SignalXCommand(jobId, oldAction.getId()));

http://git-wip-us.apache.org/repos/asf/oozie/blob/350ce480/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
 
b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
index 0e99ae2..4dec9da 100644
--- 
a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
+++ 
b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
@@ -110,6 +110,7 @@ public class WorkflowActionQueryExecutor extends
             case UPDATE_ACTION_PENDING:
                 query.setParameter("pending", actionBean.getPending());
                 query.setParameter("pendingAge", 
actionBean.getPendingAgeTimestamp());
+                query.setParameter("executionPath", 
actionBean.getExecutionPath());
                 query.setParameter("id", actionBean.getId());
                 break;
             case UPDATE_ACTION_STATUS_PENDING:

http://git-wip-us.apache.org/repos/asf/oozie/blob/350ce480/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java 
b/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java
index d661d08..f1f26ec 100644
--- a/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java
+++ b/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java
@@ -93,7 +93,6 @@ public abstract class LiteWorkflowStoreService extends 
WorkflowStoreService {
 
         if (!skipAction) {
             String nodeConf = context.getNodeDef().getConf();
-            String executionPath = context.getExecutionPath();
 
             if (actionType == null) {
                 try {
@@ -107,12 +106,14 @@ public abstract class LiteWorkflowStoreService extends 
WorkflowStoreService {
             }
             log.debug(" Creating action for node [{0}]", nodeName);
             action.setType(actionType);
-            action.setExecutionPath(executionPath);
             action.setConf(nodeConf);
             action.setLogToken(((WorkflowJobBean) 
context.getTransientVar(WORKFLOW_BEAN)).getLogToken());
             action.setStatus(WorkflowAction.Status.PREP);
             action.setJobId(jobId);
         }
+
+        String executionPath = context.getExecutionPath();
+        action.setExecutionPath(executionPath);
         action.setCred(context.getNodeDef().getCred());
         log.debug("Setting action for cred: '"+context.getNodeDef().getCred() +
                        "', name: '"+ context.getNodeDef().getName() + "'");

http://git-wip-us.apache.org/repos/asf/oozie/blob/350ce480/core/src/main/java/org/apache/oozie/workflow/WorkflowLib.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/workflow/WorkflowLib.java 
b/core/src/main/java/org/apache/oozie/workflow/WorkflowLib.java
index ba5b6b1..ccbca62 100644
--- a/core/src/main/java/org/apache/oozie/workflow/WorkflowLib.java
+++ b/core/src/main/java/org/apache/oozie/workflow/WorkflowLib.java
@@ -58,11 +58,10 @@ public interface WorkflowLib {
      * @param app application to create a workflow instance of.
      * @param conf job configuration.
      * @param wfId Workflow ID.
-     * @param actionEndTimes A map of the actions to their endtimes; actions 
with no endtime should be omitted
      * @return the newly created workflow instance.
      * @throws WorkflowException thrown if the instance could not be created.
      */
-    public WorkflowInstance createInstance(WorkflowApp app, Configuration 
conf, String wfId, Map<String, Date> actionEndTimes)
+    public WorkflowInstance createInstance(WorkflowApp app, Configuration 
conf, String wfId)
             throws WorkflowException;
 
     /**

http://git-wip-us.apache.org/repos/asf/oozie/blob/350ce480/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java 
b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java
index 919c95a..2b13e67 100644
--- 
a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java
+++ 
b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java
@@ -18,30 +18,26 @@
 
 package org.apache.oozie.workflow.lite;
 
-import org.apache.oozie.service.XLogService;
-import org.apache.oozie.service.DagXLogInfoService;
-import org.apache.oozie.client.OozieClient;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.service.DagXLogInfoService;
+import org.apache.oozie.service.XLogService;
+import org.apache.oozie.util.ParamChecker;
+import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XLog;
 import org.apache.oozie.workflow.WorkflowApp;
 import org.apache.oozie.workflow.WorkflowException;
 import org.apache.oozie.workflow.WorkflowInstance;
-import org.apache.oozie.util.ParamChecker;
-import org.apache.oozie.util.XLog;
-import org.apache.oozie.util.XConfiguration;
-import org.apache.oozie.ErrorCode;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.io.ByteArrayOutputStream;
-import java.io.ByteArrayInputStream;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -50,9 +46,7 @@ import java.util.Map;
 public class LiteWorkflowInstance implements Writable, WorkflowInstance {
     private static final String TRANSITION_TO = "transition.to";
 
-    private final Date FAR_INTO_THE_FUTURE = new Date(Long.MAX_VALUE);
-
-    private XLog log;
+    private XLog log = XLog.getLog(getClass());
 
     private static String PATH_SEPARATOR = "/";
     private static String ROOT = PATH_SEPARATOR;
@@ -161,7 +155,6 @@ public class LiteWorkflowInstance implements Writable, 
WorkflowInstance {
     private Map<String, NodeInstance> executionPaths = new HashMap<String, 
NodeInstance>();
     private Map<String, String> persistentVars = new HashMap<String, String>();
     private Map<String, Object> transientVars = new HashMap<String, Object>();
-    private ActionEndTimesComparator actionEndTimesComparator = null;
 
     protected LiteWorkflowInstance() {
         log = XLog.getLog(getClass());
@@ -176,11 +169,6 @@ public class LiteWorkflowInstance implements Writable, 
WorkflowInstance {
         status = Status.PREP;
     }
 
-    public LiteWorkflowInstance(LiteWorkflowApp def, Configuration conf, 
String instanceId, Map<String, Date> actionEndTimes) {
-        this(def, conf, instanceId);
-        actionEndTimesComparator = new 
ActionEndTimesComparator(actionEndTimes);
-    }
-
     public synchronized boolean start() throws WorkflowException {
         if (status != Status.PREP) {
             throw new WorkflowException(ErrorCode.E0719);
@@ -196,15 +184,19 @@ public class LiteWorkflowInstance implements Writable, 
WorkflowInstance {
     public synchronized boolean signal(String executionPath, String 
signalValue) throws WorkflowException {
         ParamChecker.notEmpty(executionPath, "executionPath");
         ParamChecker.notNull(signalValue, "signalValue");
-        log.debug(XLog.STD, "Signaling job execution path [{0}] signal value 
[{1}]", executionPath, signalValue);
+
         if (status != Status.RUNNING) {
             throw new WorkflowException(ErrorCode.E0716);
         }
+
         NodeInstance nodeJob = executionPaths.get(executionPath);
+        log.debug(XLog.STD, "Signaling job execution path [{0}] signal value 
[{1}] for node [{2}]", executionPath,
+                signalValue, (nodeJob == null ? null : nodeJob.nodeName));
         if (nodeJob == null) {
             status = Status.FAILED;
             log.error("invalid execution path [{0}]", executionPath);
         }
+
         NodeDef nodeDef = null;
         if (!status.isEndState()) {
             nodeDef = def.getNode(nodeJob.nodeName);
@@ -213,6 +205,7 @@ public class LiteWorkflowInstance implements Writable, 
WorkflowInstance {
                 log.error("invalid transition [{0}]", nodeJob.nodeName);
             }
         }
+
         if (!status.isEndState()) {
             NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
             boolean exiting = true;
@@ -260,72 +253,49 @@ public class LiteWorkflowInstance implements Writable, 
WorkflowInstance {
                 if (context.status == Status.KILLED) {
                     status = Status.KILLED;
                     log.debug(XLog.STD, "Completing job, kill node [{0}]", 
nodeJob.nodeName);
-                }
-                else {
-                    if (context.status == Status.FAILED) {
-                        status = Status.FAILED;
-                        log.debug(XLog.STD, "Completing job, fail node [{0}]", 
nodeJob.nodeName);
-                    }
-                    else {
-                        if (context.status == Status.SUCCEEDED) {
-                            status = Status.SUCCEEDED;
-                            log.debug(XLog.STD, "Completing job, end node 
[{0}]", nodeJob.nodeName);
-                        }
-/*
-                else if (context.status == Status.SUSPENDED) {
-                    status = Status.SUSPENDED;
+                } else if (context.status == Status.FAILED) {
+                    status = Status.FAILED;
+                    log.debug(XLog.STD, "Completing job, fail node [{0}]", 
nodeJob.nodeName);
+                } else if (context.status == Status.SUCCEEDED) {
+                    status = Status.SUCCEEDED;
                     log.debug(XLog.STD, "Completing job, end node [{0}]", 
nodeJob.nodeName);
-                }
-*/
-                        else {
-                            for (String fullTransition : fullTransitions) {
-                                // this is the whole trick for forking, we 
need the
-                                // executionpath and the transition
-                                // in the case of no forking last element of
-                                // executionpath is different from transition
-                                // in the case of forking they are the same
-
-                                log.debug(XLog.STD, "Exiting node [{0}] with 
transition[{1}]", nodeJob.nodeName,
-                                          fullTransition);
-
-                                String execPathFromTransition = 
getExecutionPath(fullTransition);
-                                String transition = 
getTransitionNode(fullTransition);
-                                def.validateTransition(nodeJob.nodeName, 
transition);
-
-                                NodeInstance nodeJobInPath = 
executionPaths.get(execPathFromTransition);
-                                if ((nodeJobInPath == null) || 
(!transition.equals(nodeJobInPath.nodeName))) {
-                                    // TODO explain this IF better
-                                    // If the WfJob is signaled with the parent
-                                    // execution executionPath again
-                                    // The Fork node will execute again.. and 
replace
-                                    // the Node WorkflowJobBean
-                                    // so this is required to prevent that..
-                                    // Question : Should we throw an error in 
this case
-                                    // ??
-                                    executionPaths.put(execPathFromTransition, 
new NodeInstance(transition));
-                                    pathsToStart.add(execPathFromTransition);
-                                }
-
-                            }
+                } else {
+                    for (String fullTransition : fullTransitions) {
+                        //this is the whole trick for forking, we need the 
executionpath and the transition.
+                        //in case of no forking, last element of executionpath 
is different from transition.
+                        //in case of forking, they are the same
+
+                        log.debug(XLog.STD, "Exiting node [{0}] with 
transition[{1}]", nodeJob.nodeName,
+                                fullTransition);
+
+                        String execPathFromTransition = 
getExecutionPath(fullTransition);
+                        String transition = getTransitionNode(fullTransition);
+                        def.validateTransition(nodeJob.nodeName, transition);
+
+                        NodeInstance nodeJobInPath = 
executionPaths.get(execPathFromTransition);
+                        if ((nodeJobInPath == null) || 
(!transition.equals(nodeJobInPath.nodeName))) {
+                            // TODO explain this IF better
+                            // If the WfJob is signaled with the parent
+                            // execution executionPath again
+                            // The Fork node will execute again.. and replace
+                            // the Node WorkflowJobBean
+                            // so this is required to prevent that..
+                            // Question : Should we throw an error in this case
+                            // ??
+                            executionPaths.put(execPathFromTransition, new 
NodeInstance(transition));
+                            pathsToStart.add(execPathFromTransition);
+                        }
 
-                            // If we're doing a rerun, then we need to make 
sure to put the actions in pathToStart into the order
-                            // that they ended in.  Otherwise, it could result 
in an error later on in some edge cases.
-                            // e.g. You have a fork with two nodes, A and B, 
that both succeeded, followed by a join and some more
-                            // nodes, some of which failed.  If you do the 
rerun, it will always signal A and then B, even if in the
-                            // original run B signaled first and then A.  By 
sorting this, we maintain the proper signal ordering.
-                            if (actionEndTimesComparator != null && 
pathsToStart.size() > 1) {
-                                Collections.sort(pathsToStart, 
actionEndTimesComparator);
-                            }
+                    }
 
-                            // signal all new synch transitions
-                            for (String pathToStart : pathsToStart) {
-                                signal(pathToStart, "::synch::");
-                            }
-                        }
+                    // signal all new synch transitions
+                    for (String pathToStart : pathsToStart) {
+                        signal(pathToStart, "::synch::");
                     }
                 }
             }
         }
+
         if (status.isEndState()) {
             if (status == Status.FAILED) {
                 List<String> failedNodes = terminateNodes(status);
@@ -341,6 +311,7 @@ public class LiteWorkflowInstance implements Writable, 
WorkflowInstance {
                 }
             }
         }
+
         return status.isEndState();
     }
 
@@ -608,14 +579,6 @@ public class LiteWorkflowInstance implements Writable, 
WorkflowInstance {
             dOut.writeUTF(entry.getKey());
             writeStringAsBytes(entry.getValue(), dOut);
         }
-        if (actionEndTimesComparator != null) {
-            Map<String, Date> actionEndTimes = 
actionEndTimesComparator.getActionEndTimes();
-            dOut.writeInt(actionEndTimes.size());
-            for (Map.Entry<String, Date> entry : actionEndTimes.entrySet()) {
-                dOut.writeUTF(entry.getKey());
-                dOut.writeLong(entry.getValue().getTime());
-            }
-        }
     }
 
     @Override
@@ -647,21 +610,6 @@ public class LiteWorkflowInstance implements Writable, 
WorkflowInstance {
             String vVal = readBytesAsString(dIn);
             persistentVars.put(vName, vVal);
         }
-        int numActionEndTimes = -1;
-        try {
-            numActionEndTimes = dIn.readInt();
-        } catch (IOException ioe) {
-            // This means that there isn't an actionEndTimes, so just ignore
-        }
-        if (numActionEndTimes > 0) {
-            Map<String, Date> actionEndTimes = new HashMap<String, 
Date>(numActionEndTimes);
-            for (int x = 0; x < numActionEndTimes; x++) {
-                String name = dIn.readUTF();
-                long endTime = dIn.readLong();
-                actionEndTimes.put(name, new Date(endTime));
-            }
-            actionEndTimesComparator = new 
ActionEndTimesComparator(actionEndTimes);
-        }
         refreshLog();
     }
 
@@ -716,34 +664,4 @@ public class LiteWorkflowInstance implements Writable, 
WorkflowInstance {
     public int hashCode() {
         return instanceId.hashCode();
     }
-
-    private class ActionEndTimesComparator implements Comparator<String> {
-
-        private final Map<String, Date> actionEndTimes;
-
-        public ActionEndTimesComparator(Map<String, Date> actionEndTimes) {
-            this.actionEndTimes = actionEndTimes;
-        }
-
-        @Override
-        public int compare(String node1, String node2) {
-            Date date1 = null;
-            Date date2 = null;
-            NodeInstance node1Instance = executionPaths.get(node1);
-            if (node1Instance != null) {
-                date1 = this.actionEndTimes.get(node1Instance.nodeName);
-            }
-            NodeInstance node2Instance = executionPaths.get(node2);
-            if (node2Instance != null) {
-                date2 = this.actionEndTimes.get(node2Instance.nodeName);
-            }
-            date1 = (date1 == null) ? FAR_INTO_THE_FUTURE : date1;
-            date2 = (date2 == null) ? FAR_INTO_THE_FUTURE : date2;
-            return date1.compareTo(date2);
-        }
-
-        public Map<String, Date> getActionEndTimes() {
-            return actionEndTimes;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/350ce480/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java 
b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java
index 706cc2a..23df086 100644
--- a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java
+++ b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java
@@ -66,10 +66,10 @@ public abstract class LiteWorkflowLib implements 
WorkflowLib {
     }
 
     @Override
-    public WorkflowInstance createInstance(WorkflowApp app, Configuration 
conf, String wfId, Map<String, Date> actionEndTimes)
+    public WorkflowInstance createInstance(WorkflowApp app, Configuration 
conf, String wfId)
             throws WorkflowException {
         ParamChecker.notNull(app, "app");
         ParamChecker.notNull(wfId, "wfId");
-        return new LiteWorkflowInstance((LiteWorkflowApp) app, conf, wfId, 
actionEndTimes);
+        return new LiteWorkflowInstance((LiteWorkflowApp) app, conf, wfId);
     }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/350ce480/core/src/main/resources/oozie-log4j.properties
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-log4j.properties 
b/core/src/main/resources/oozie-log4j.properties
index 4b532ca..c86b301 100644
--- a/core/src/main/resources/oozie-log4j.properties
+++ b/core/src/main/resources/oozie-log4j.properties
@@ -47,7 +47,7 @@ log4j.logger.org.apache.oozie.command=DEBUG, test
 log4j.logger.org.apache.oozie.wf.service=INFO, test
 log4j.logger.org.apache.oozie.wf.servlet=INFO, test
 log4j.logger.org.apache.oozie.store=DEBUG, test
-log4j.logger.org.apache.oozie.workflow=INFO, test
+log4j.logger.org.apache.oozie.workflow=DEBUG, test
 log4j.logger.org.apache.oozie.service=DEBUG, test
 log4j.logger.org.apache.oozie.servlet=INFO, test
 log4j.logger.org.apache.oozie.sla=DEBUG, test

http://git-wip-us.apache.org/repos/asf/oozie/blob/350ce480/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java 
b/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java
index ce32e51..c9bbd00 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java
@@ -119,7 +119,7 @@ public class TestReRunXCommand extends XDataTestCase {
      * This tests a specific edge case where rerun can fail when there's a 
fork, the actions in the fork succeed, but an action
      * after the fork fails.  Previously, the rerun would step through the 
forked actions in the order they were listed in the
      * fork action's XML; if they happened to finish in a different order, 
this would cause an error during rerun.  This is fixed by
-     * enforcing the same order in LiteWorkflowInstance#signal, which this 
test verifies.
+     * using the new execution path for LiteWorkflowInstance#signal, which 
this test verifies.
      *
      * @throws Exception
      */
@@ -139,7 +139,7 @@ public class TestReRunXCommand extends XDataTestCase {
         conf.setProperty("jobTracker", getJobTrackerUri());
         conf.setProperty(OozieClient.APP_PATH, 
getTestCaseFileUri("workflow.xml"));
         conf.setProperty(OozieClient.USER_NAME, getTestUser());
-        conf.setProperty("cmd3", "echo1");      // expected to fail
+        conf.setProperty("cmd4", "echo1");      //expected to fail
 
         final String jobId1 = wfClient.submit(conf);
         wfClient.start(jobId1);
@@ -149,17 +149,21 @@ public class TestReRunXCommand extends XDataTestCase {
                 return wfClient.getJobInfo(jobId1).getStatus() == 
WorkflowJob.Status.KILLED;
             }
         });
+        wfClient.kill(jobId1);
+
         assertEquals(WorkflowJob.Status.KILLED, 
wfClient.getJobInfo(jobId1).getStatus());
+
         List<WorkflowAction> actions = 
wfClient.getJobInfo(jobId1).getActions();
         assertEquals(WorkflowAction.Status.OK, actions.get(1).getStatus());    
 // fork
         assertEquals(WorkflowAction.Status.OK, actions.get(2).getStatus());    
 // sh1
         assertEquals(WorkflowAction.Status.OK, actions.get(3).getStatus());    
 // sh2
-        assertEquals(WorkflowAction.Status.OK, actions.get(4).getStatus());    
 // join
-        assertEquals(WorkflowAction.Status.ERROR, actions.get(5).getStatus()); 
 // sh3
+        assertEquals(WorkflowAction.Status.OK, actions.get(4).getStatus());    
 // sh3
+        assertEquals(WorkflowAction.Status.OK, actions.get(5).getStatus());    
 // j
+        assertEquals(WorkflowAction.Status.ERROR, actions.get(6).getStatus()); 
 // sh4
 
         // rerun failed node, which is after the fork
         conf.setProperty(OozieClient.RERUN_FAIL_NODES, "true");
-        conf.setProperty("cmd3", "echo");      // expected to succeed
+        conf.setProperty("cmd4", "echo");
 
         wfClient.reRun(jobId1, conf);
         waitFor(200 * 1000, new Predicate() {
@@ -173,8 +177,9 @@ public class TestReRunXCommand extends XDataTestCase {
         assertEquals(WorkflowAction.Status.OK, actions.get(1).getStatus());    
 // fork
         assertEquals(WorkflowAction.Status.OK, actions.get(2).getStatus());    
 // sh1
         assertEquals(WorkflowAction.Status.OK, actions.get(3).getStatus());    
 // sh2
-        assertEquals(WorkflowAction.Status.OK, actions.get(4).getStatus());    
 // join
-        assertEquals(WorkflowAction.Status.OK, actions.get(5).getStatus());    
 // sh3
+        assertEquals(WorkflowAction.Status.OK, actions.get(4).getStatus());    
 // sh3
+        assertEquals(WorkflowAction.Status.OK, actions.get(5).getStatus());    
 // join
+        assertEquals(WorkflowAction.Status.OK, actions.get(6).getStatus());    
 // sh4
     }
 
     /*

http://git-wip-us.apache.org/repos/asf/oozie/blob/350ce480/core/src/test/resources/rerun-wf-fork.xml
----------------------------------------------------------------------
diff --git a/core/src/test/resources/rerun-wf-fork.xml 
b/core/src/test/resources/rerun-wf-fork.xml
index 8fa8f34..9172207 100644
--- a/core/src/test/resources/rerun-wf-fork.xml
+++ b/core/src/test/resources/rerun-wf-fork.xml
@@ -41,15 +41,23 @@
         <shell xmlns="uri:oozie:shell-action:0.3">
             <exec>echo</exec>
         </shell>
+        <ok to="sh3"/>
+        <error to="k"/>
+    </action>
+
+    <action name="sh3">
+        <shell xmlns="uri:oozie:shell-action:0.3">
+            <exec>echo</exec>
+        </shell>
         <ok to="j"/>
         <error to="k"/>
     </action>
 
-    <join name="j" to="sh3"/>
+    <join name="j" to="sh4"/>
 
-    <action name="sh3">
+    <action name="sh4">
         <shell xmlns="uri:oozie:shell-action:0.3">
-            <exec>${cmd3}</exec>
+            <exec>${cmd4}</exec>
         </shell>
         <ok to="end"/>
         <error to="k"/>

http://git-wip-us.apache.org/repos/asf/oozie/blob/350ce480/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index e8df674..23a8601 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -2,6 +2,7 @@
 
 -- Oozie 4.2.0 release (unreleased)
 
+OOZIE-1993 Rerun fails during join in certain condition (shwethags)
 OOZIE-2236 Need to package hive-hcatalog-server-extensions.jar in the hcatalog 
sharelib (venkatnrangan via bzhang)
 OOZIE-2232 Oozie should invalidate bulk write command when "-filter" is 
missing (venkatnrangan via bzhang)
 OOZIE-2224 Add example worklfow.xml for hive in secure mode (venkatnrangan via 
bzhang)

Reply via email to