OOZIE-1950 Coordinator job info should support timestamp (nominal time) 
(shwethags)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/d5cd976a
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/d5cd976a
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/d5cd976a

Branch: refs/remotes/trunk
Commit: d5cd976a80e17f258050c02bd03891e82820b9ac
Parents: 1122898
Author: Shwetha GS <[email protected]>
Authored: Thu Sep 11 11:16:34 2014 +0530
Committer: Shwetha GS <[email protected]>
Committed: Thu Sep 11 11:16:34 2014 +0530

----------------------------------------------------------------------
 .../org/apache/oozie/client/OozieClient.java    |  38 ++--
 .../org/apache/oozie/CoordinatorEngine.java     | 177 ++++++++++-------
 .../oozie/command/coord/CoordJobXCommand.java   |  18 +-
 .../CoordJobGetActionsSubsetJPAExecutor.java    | 113 +++++++----
 .../main/java/org/apache/oozie/util/Pair.java   |  69 +++++++
 .../org/apache/oozie/TestCoordinatorEngine.java |  35 ++--
 .../oozie/TestCoordinatorEngineSimple.java      | 121 ++++++++++--
 ...TestCoordJobGetActionsSubsetJPAExecutor.java | 197 ++++++++++++++++---
 docs/src/site/twiki/DG_CommandLineTool.twiki    |  59 +++---
 release-log.txt                                 |   1 +
 10 files changed, 609 insertions(+), 219 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/d5cd976a/client/src/main/java/org/apache/oozie/client/OozieClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/client/OozieClient.java 
b/client/src/main/java/org/apache/oozie/client/OozieClient.java
index 363ebd2..d6ff2d0 100644
--- a/client/src/main/java/org/apache/oozie/client/OozieClient.java
+++ b/client/src/main/java/org/apache/oozie/client/OozieClient.java
@@ -18,6 +18,21 @@
 
 package org.apache.oozie.client;
 
+import org.apache.oozie.BuildInfo;
+import org.apache.oozie.client.rest.JsonTags;
+import org.apache.oozie.client.rest.JsonToBean;
+import org.apache.oozie.client.rest.RestConstants;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
@@ -38,22 +53,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Callable;
 
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerFactory;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.stream.StreamResult;
-
-import org.apache.oozie.BuildInfo;
-import org.apache.oozie.client.rest.JsonTags;
-import org.apache.oozie.client.rest.JsonToBean;
-import org.apache.oozie.client.rest.RestConstants;
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
-import org.json.simple.JSONValue;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-
 /**
  * Client API to submit and manage Oozie workflow jobs against an Oozie 
intance.
  * <p/>
@@ -123,6 +122,8 @@ public class OozieClient {
 
     public static final String FILTER_STATUS = "status";
 
+    public static final String FILTER_NOMINAL_TIME = "nominaltime";
+
     public static final String FILTER_FREQUENCY = "frequency";
 
     public static final String FILTER_ID = "id";
@@ -159,7 +160,7 @@ public class OozieClient {
 
     public static enum SYSTEM_MODE {
         NORMAL, NOWEBSERVICE, SAFEMODE
-    };
+    }
 
     /**
      * debugMode =0 means no debugging. > 0 means debugging on.
@@ -172,7 +173,7 @@ public class OozieClient {
     private JSONArray supportedVersions;
     private final Map<String, String> headers = new HashMap<String, String>();
 
-    private static ThreadLocal<String> USER_NAME_TL = new 
ThreadLocal<String>();
+    private static final ThreadLocal<String> USER_NAME_TL = new 
ThreadLocal<String>();
 
     /**
      * Allows to impersonate other users in the Oozie server. The current user
@@ -1047,7 +1048,6 @@ public class OozieClient {
          *
          * @param reader reader to read into a string.
          * @param maxLen max content length allowed, if -1 there is no limit.
-         * @param ps Printstream of command line interface
          * @throws IOException
          */
         private void sendToOutputStream(Reader reader, int maxLen) throws 
IOException {

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5cd976a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java 
b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
index 249ae68..c8bbbf7 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
@@ -18,21 +18,7 @@
 
 package org.apache.oozie;
 
-import java.io.IOException;
-import java.io.Writer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.StringTokenizer;
-
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.client.CoordinatorAction;
@@ -42,7 +28,6 @@ import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.rest.RestConstants;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.coord.CoordActionInfoXCommand;
-import org.apache.oozie.util.CoordActionsInDateRange;
 import org.apache.oozie.command.coord.CoordActionsIgnoreXCommand;
 import org.apache.oozie.command.coord.CoordActionsKillXCommand;
 import org.apache.oozie.command.coord.CoordChangeXCommand;
@@ -60,26 +45,59 @@ import 
org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
 import org.apache.oozie.service.DagXLogInfoService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.XLogStreamingService;
-import org.apache.oozie.util.XLogFilter;
-import org.apache.oozie.util.XLogUserFilterParam;
+import org.apache.oozie.util.CoordActionsInDateRange;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.Pair;
 import org.apache.oozie.util.ParamChecker;
 import org.apache.oozie.util.XLog;
+import org.apache.oozie.util.XLogFilter;
+import org.apache.oozie.util.XLogUserFilterParam;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.io.Writer;
+import java.sql.Timestamp;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
 
 public class CoordinatorEngine extends BaseEngine {
-    private static XLog LOG = XLog.getLog(CoordinatorEngine.class);
+    private static final XLog LOG = XLog.getLog(CoordinatorEngine.class);
     public final static String COORD_ACTIONS_LOG_MAX_COUNT = 
"oozie.coord.actions.log.max.count";
     private final static int COORD_ACTIONS_LOG_MAX_COUNT_DEFAULT = 50;
-    private int maxNumActionsForLog;
-    public final static String POSITIVE_FILTER = "positive";
-    public final static String NEGATIVE_FILTER = "negative";
+    private final int maxNumActionsForLog;
+
+    public enum FILTER_COMPARATORS {
+        //This ordering is important, dont change this
+        GREATER_EQUAL(">="), GREATER(">"), LESSTHAN_EQUAL("<="), 
LESSTHAN("<"), NOT_EQUALS("!="), EQUALS("=");
+
+        private final String sign;
+
+        FILTER_COMPARATORS(String sign) {
+            this.sign = sign;
+        }
+
+        public String getSign() {
+            return sign;
+        }
+    }
+
+    public static final String[] VALID_JOB_FILTERS = 
{OozieClient.FILTER_STATUS, OozieClient.FILTER_NOMINAL_TIME};
 
     /**
      * Create a system Coordinator engine, with no user and no group.
      */
     public CoordinatorEngine() {
-        if (Services.get().getConf().getBoolean(USE_XCOMMAND, true) == false) {
+        if (!Services.get().getConf().getBoolean(USE_XCOMMAND, true)) {
             LOG.debug("Oozie CoordinatorEngine is not using XCommands.");
         }
         else {
@@ -161,10 +179,9 @@ public class CoordinatorEngine extends BaseEngine {
     @Override
     public CoordinatorJobBean getCoordJob(String jobId, String filter, int 
offset, int length, boolean desc)
             throws BaseEngineException {
-        Map<String, List<String>> filterMap = parseStatusFilter(filter);
+        Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap = 
parseJobFilter(filter);
         try {
-            return new CoordJobXCommand(jobId, filterMap, offset, length, desc)
-                    .call();
+            return new CoordJobXCommand(jobId, filterMap, offset, length, 
desc).call();
         }
         catch (CommandException ex) {
             throw new BaseEngineException(ex);
@@ -575,7 +592,7 @@ public class CoordinatorEngine extends BaseEngine {
      * @throws CoordinatorEngineException
      */
     public CoordinatorJobInfo getCoordJobs(String filter, int start, int len) 
throws CoordinatorEngineException {
-        Map<String, List<String>> filterList = parseFilter(filter);
+        Map<String, List<String>> filterList = parseJobsFilter(filter);
 
         try {
             return new CoordJobsXCommand(filterList, start, len).call();
@@ -586,60 +603,68 @@ public class CoordinatorEngine extends BaseEngine {
     }
 
     // Parses the filter string (e.g status=RUNNING;status=WAITING) and 
returns a list of status values
-    private Map<String, List<String>> parseStatusFilter(String filter) throws 
CoordinatorEngineException {
-        Map<String, List<String>> filterMap = new HashMap<String, 
List<String>>();
+    public Map<Pair<String, FILTER_COMPARATORS>, List<Object>> 
parseJobFilter(String filter) throws
+        CoordinatorEngineException {
+        Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap = new 
HashMap<Pair<String,
+            FILTER_COMPARATORS>, List<Object>>();
         if (filter != null) {
-            //split name;value pairs
+            //split name value pairs
             StringTokenizer st = new StringTokenizer(filter, ";");
             while (st.hasMoreTokens()) {
-                String token = st.nextToken();
-                if (token.contains("=")) {
-                    boolean negative = false;
-                    String[] pair = null;
-                    if(token.contains("!=")) {
-                        negative = true;
-                        pair = token.split("!=");
-                    }else {
-                        pair = token.split("=");
-                    }
-                    if (pair.length != 2) {
-                        throw new CoordinatorEngineException(ErrorCode.E0421, 
token,
-                                "elements must be name=value or name!=value 
pairs");
-                    }
-                    if (pair[0].equalsIgnoreCase("status")) {
-                        String statusValue = pair[1];
-                        try {
-                            CoordinatorAction.Status.valueOf(statusValue);
-                        } catch (IllegalArgumentException ex) {
-                            StringBuilder validStatusList = new 
StringBuilder();
-                            for (CoordinatorAction.Status status : 
CoordinatorAction.Status.values()){
-                                validStatusList.append(status.toString()+" ");
+                String token = st.nextToken().trim();
+                Pair<String, FILTER_COMPARATORS> pair = null;
+                for (FILTER_COMPARATORS comp : FILTER_COMPARATORS.values()) {
+                    if (token.contains(comp.getSign())) {
+                        int index = token.indexOf(comp.getSign());
+                        String key = token.substring(0, index);
+                        String valueStr = token.substring(index + 
comp.getSign().length());
+                        Object value;
+
+                        if (key.equalsIgnoreCase(OozieClient.FILTER_STATUS)) {
+                            value = valueStr.toUpperCase();
+                            try {
+                                CoordinatorAction.Status.valueOf((String) 
value);
+                            } catch (IllegalArgumentException ex) {
+                                // Check for incorrect status value
+                                throw new 
CoordinatorEngineException(ErrorCode.E0421, filter,
+                                    XLog.format("invalid status value [{0}]." 
+ " Valid status values are: [{1}]",
+                                        valueStr, 
StringUtils.join(CoordinatorAction.Status.values(), ", ")));
                             }
-                            // Check for incorrect status value
-                            throw new 
CoordinatorEngineException(ErrorCode.E0421, filter, XLog.format(
-                                "invalid status value [{0}]." + " Valid status 
values are: [{1}]", statusValue, validStatusList));
-                        }
-                        String filterType = negative ? NEGATIVE_FILTER : 
POSITIVE_FILTER;
-                        String oppositeFilterType = negative ? POSITIVE_FILTER 
: NEGATIVE_FILTER;
-                        List<String> filterList = filterMap.get(filterType);
-                        if (filterList == null) {
-                            filterList = new ArrayList<String>();
-                            filterMap.put(filterType, filterList);
+
+                            if (!(comp == FILTER_COMPARATORS.EQUALS || comp == 
FILTER_COMPARATORS.NOT_EQUALS)) {
+                                throw new 
CoordinatorEngineException(ErrorCode.E0421, filter,
+                                    XLog.format("invalid comparator [{0}] for 
status." + " Valid are = and !=",
+                                        comp.getSign()));
+                            }
+
+                            pair = Pair.of(OozieClient.FILTER_STATUS, comp);
+                        } else if 
(key.equalsIgnoreCase(OozieClient.FILTER_NOMINAL_TIME)) {
+                            try {
+                                value = new 
Timestamp(DateUtils.parseDateUTC(valueStr).getTime());
+                            } catch (ParseException e) {
+                                throw new 
CoordinatorEngineException(ErrorCode.E0421, filter,
+                                    XLog.format("invalid nominal time [{0}]." 
+ " Valid format: " +
+                                            "[{1}]", valueStr, 
DateUtils.ISO8601_UTC_MASK));
+                            }
+                            pair = Pair.of(OozieClient.FILTER_NOMINAL_TIME, 
comp);
+                        } else {
+                            // Check for incorrect filter option
+                            throw new 
CoordinatorEngineException(ErrorCode.E0421, filter,
+                                XLog.format("invalid filter [{0}]." + " Valid 
filters [{1}]", key, StringUtils.join
+                                    (VALID_JOB_FILTERS, ", ")));
                         }
-                        List<String> oFilterList = 
filterMap.get(oppositeFilterType);
-                        if (oFilterList != null && 
oFilterList.contains(statusValue)) {
-                            throw new 
CoordinatorEngineException(ErrorCode.E0421, filter, XLog.format(
-                                    "the status [{0}] specified in both 
positive and negative filters", statusValue));
+                        if (!filterMap.containsKey(pair)) {
+                            filterMap.put(pair, new ArrayList<Object>());
                         }
-                        filterList.add(statusValue);
-                    } else {
-                        // Check for incorrect filter option
-                        throw new CoordinatorEngineException(ErrorCode.E0421, 
filter, XLog.format(
-                                "invalid filter [{0}]." + " The only valid 
filter is \"status\"", pair[0]));
+                        filterMap.get(pair).add(value);
+                        break;
                     }
-                } else {
-                    throw new CoordinatorEngineException(ErrorCode.E0421, 
token,
-                             "elements must be name=value or name!=value 
pairs");
+                }
+
+                if (pair == null) {
+                    //token doesn't contain comparator
+                    throw new CoordinatorEngineException(ErrorCode.E0421, 
filter,
+                        "filter should be of format <key><comparator><value> 
pairs");
                 }
             }
         }
@@ -652,7 +677,7 @@ public class CoordinatorEngine extends BaseEngine {
      * @throws CoordinatorEngineException
      */
     @VisibleForTesting
-    Map<String, List<String>> parseFilter(String filter) throws 
CoordinatorEngineException {
+    Map<String, List<String>> parseJobsFilter(String filter) throws 
CoordinatorEngineException {
         Map<String, List<String>> map = new HashMap<String, List<String>>();
         boolean isTimeUnitSpecified = false;
         String timeUnit = "MINUTE";

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5cd976a/core/src/main/java/org/apache/oozie/command/coord/CoordJobXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/coord/CoordJobXCommand.java 
b/core/src/main/java/org/apache/oozie/command/coord/CoordJobXCommand.java
index 4d5ba71..51b09a2 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordJobXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordJobXCommand.java
@@ -18,11 +18,8 @@
 
 package org.apache.oozie.command.coord;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorEngine.FILTER_COMPARATORS;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.XException;
@@ -33,8 +30,13 @@ import 
org.apache.oozie.executor.jpa.CoordJobGetActionsSubsetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
+import org.apache.oozie.util.Pair;
 import org.apache.oozie.util.ParamChecker;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 /**
  * Command for loading a coordinator job information
  */
@@ -44,7 +46,7 @@ public class CoordJobXCommand extends 
CoordinatorXCommand<CoordinatorJobBean> {
     private int offset = 1;
     private int len = Integer.MAX_VALUE;
     private boolean desc = false;
-    private Map<String, List<String>> filterMap;
+    private Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap;
 
     /**
      * Constructor for loading a coordinator job information
@@ -57,14 +59,14 @@ public class CoordJobXCommand extends 
CoordinatorXCommand<CoordinatorJobBean> {
 
     /**
      * Constructor for loading a coordinator job information
-     *
      * @param id coord jobId
+     * @param filterMap
      * @param offset starting index in the list of actions belonging to the job
      * @param length number of actions to be returned
-     * @param filterMap
      * @param desc boolean for whether the actions returned are in descending 
order
      */
-    public CoordJobXCommand(String id, Map<String, List<String>> filterMap, 
int offset, int length, boolean desc) {
+    public CoordJobXCommand(String id, Map<Pair<String, FILTER_COMPARATORS>, 
List<Object>> filterMap, int offset,
+        int length, boolean desc) {
         super("job.info", "job.info", 1);
         this.id = ParamChecker.notEmpty(id, "id");
         this.getActionInfo = true;

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5cd976a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSubsetJPAExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSubsetJPAExecutor.java
 
b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSubsetJPAExecutor.java
index cdd5000..ffe91ec 100644
--- 
a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSubsetJPAExecutor.java
+++ 
b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSubsetJPAExecutor.java
@@ -18,23 +18,26 @@
 
 package org.apache.oozie.executor.jpa;
 
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
 import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.CoordinatorEngine;
+import org.apache.oozie.CoordinatorEngine.FILTER_COMPARATORS;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.StringBlob;
 import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.Pair;
 import org.apache.oozie.util.ParamChecker;
 
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
 /**
  * Load coordinator actions by offset and len (a subset) for a coordinator job.
  */
@@ -44,14 +47,14 @@ public class CoordJobGetActionsSubsetJPAExecutor implements 
JPAExecutor<List<Coo
     private int offset = 1;
     private int len = 50;
     private boolean desc = false;
-    private Map<String,List<String>> filterMap;
+    private Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap;
 
     public CoordJobGetActionsSubsetJPAExecutor(String coordJobId) {
         ParamChecker.notNull(coordJobId, "coordJobId");
         this.coordJobId = coordJobId;
     }
 
-    public CoordJobGetActionsSubsetJPAExecutor(String coordJobId, Map<String, 
List<String>> filterMap,
+    public CoordJobGetActionsSubsetJPAExecutor(String coordJobId, 
Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap,
             int offset, int len, boolean desc) {
         this(coordJobId);
         this.filterMap = filterMap;
@@ -93,6 +96,7 @@ public class CoordJobGetActionsSubsetJPAExecutor implements 
JPAExecutor<List<Coo
     }
 
     private Query setQueryParameters(Query q, EntityManager em){
+        Map<String, Object> params = null;
         if (filterMap != null) {
             // Add the filter clause
             String query = q.toString();
@@ -100,8 +104,7 @@ public class CoordJobGetActionsSubsetJPAExecutor implements 
JPAExecutor<List<Coo
             int offset = query.lastIndexOf("order");
             // Get the 'where' clause for status filters
             StringBuilder statusClause = new StringBuilder();
-            getStatusClause(statusClause, 
filterMap.get(CoordinatorEngine.POSITIVE_FILTER), true);
-            getStatusClause(statusClause, 
filterMap.get(CoordinatorEngine.NEGATIVE_FILTER), false);
+            params = getWhereClause(statusClause, filterMap);
             // Insert 'where' before 'order by'
             sbTotal.insert(offset, statusClause);
             q = em.createQuery(sbTotal.toString());
@@ -109,6 +112,11 @@ public class CoordJobGetActionsSubsetJPAExecutor 
implements JPAExecutor<List<Coo
         if (desc) {
             q = em.createQuery(q.toString().concat(" desc"));
         }
+        if (params != null) {
+            for (String pname : params.keySet()) {
+                q.setParameter(pname, params.get(pname));
+            }
+        }
         q.setParameter("jobId", coordJobId);
         q.setFirstResult(offset - 1);
         q.setMaxResults(len);
@@ -116,29 +124,68 @@ public class CoordJobGetActionsSubsetJPAExecutor 
implements JPAExecutor<List<Coo
     }
 
     // Form the where clause to filter by status values
-    private StringBuilder getStatusClause(StringBuilder sb, List<String> 
filterList, boolean positive) {
-        if (sb == null) {
-            sb = new StringBuilder();
-        }
-        boolean isStatus = false;
-        if (filterList != null && filterList.size() > 0) {
-            for (String statusVal : filterList) {
-                if (!isStatus) {
-                    if (positive) {
-                        sb.append(" and a.statusStr IN 
(\'").append(statusVal).append("\'");
-                    }
-                    else {
-                        sb.append(" and a.statusStr NOT IN 
(\'").append(statusVal).append("\'");
-                    }
-                    isStatus = true;
-                }
-                else {
-                    sb.append(",\'").append(statusVal).append("\'");
+    private Map<String, Object> getWhereClause(StringBuilder sb, 
Map<Pair<String, FILTER_COMPARATORS>,
+        List<Object>> filterMap) {
+        Map<String, Object> params = new HashMap<String, Object>();
+        int pcnt= 1;
+        for (Entry<Pair<String, FILTER_COMPARATORS>, List<Object>> filter : 
filterMap.entrySet()) {
+            String field = filter.getKey().getFist();
+            FILTER_COMPARATORS comp = filter.getKey().getSecond();
+            String sqlField;
+            if (field.equals(OozieClient.FILTER_STATUS)) {
+                sqlField = "a.statusStr";
+            } else if (field.equals(OozieClient.FILTER_NOMINAL_TIME)) {
+                sqlField = "a.nominalTimestamp";
+            } else {
+                throw new IllegalArgumentException("Invalid filter key " + 
field);
+            }
+
+            sb.append(" and ").append(sqlField).append(" ");
+            switch (comp) {
+            case EQUALS:
+                sb.append("IN (");
+                params.putAll(appendParams(sb, filter.getValue(), pcnt));
+                sb.append(")");
+                break;
+
+            case NOT_EQUALS:
+                sb.append("NOT IN (");
+                params.putAll(appendParams(sb, filter.getValue(), pcnt));
+                sb.append(")");
+                break;
+
+            case GREATER:
+            case GREATER_EQUAL:
+            case LESSTHAN:
+            case LESSTHAN_EQUAL:
+                if (filter.getValue().size() != 1) {
+                    throw new IllegalArgumentException(field + comp.getSign() 
+ " can't have more than 1 values");
                 }
+
+                sb.append(comp.getSign()).append(" ");
+                params.putAll(appendParams(sb, filter.getValue(), pcnt));
+                break;
+            }
+
+            pcnt += filter.getValue().size();
+        }
+        sb.append(" ");
+        return params;
+    }
+
+    private Map<String, Object> appendParams(StringBuilder sb, List<Object> 
value, int sindex) {
+        Map<String, Object> params = new HashMap<String, Object>();
+        boolean first = true;
+        for (Object val : value) {
+            String pname = "p" + sindex++;
+            params.put(pname, val);
+            if (!first) {
+                sb.append(", ");
             }
-            sb.append(") ");
+            sb.append(':').append(pname);
+            first = false;
         }
-        return sb;
+        return params;
     }
 
     private CoordinatorActionBean getBeanForRunningCoordAction(Object arr[]) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5cd976a/core/src/main/java/org/apache/oozie/util/Pair.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/Pair.java 
b/core/src/main/java/org/apache/oozie/util/Pair.java
new file mode 100644
index 0000000..1bf45b4
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/util/Pair.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util;
+
+import org.apache.commons.lang.ObjectUtils;
+
+/**
+ * Utility class for holding a pair of data
+ * @param <T> type of first data
+ * @param <S> type of second data
+ */
+public class Pair<T, S> {
+    private T first;
+    private S second;
+
+    public Pair(T first, S second) {
+        this.first = first;
+        this.second = second;
+    }
+
+
+    public static <T, S> Pair<T, S> of(T first, S second) {
+        return new Pair(first, second);
+    }
+
+    public T getFist() {
+        return first;
+    }
+
+    public S getSecond() {
+        return second;
+    }
+
+    public int hashCode()
+    {
+        return (first == null ? 1 : first.hashCode()) * 17 + (second == null ? 
1 : second.hashCode()) * 19;
+    }
+
+    public boolean equals(Object other)
+    {
+        if (other == null) {
+            return false;
+        }
+
+        if (!(other instanceof Pair)) {
+            return false;
+        }
+
+        Pair otherPair = (Pair)other;
+        return (ObjectUtils.equals(first, otherPair.first) && 
ObjectUtils.equals(second, otherPair.second));
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5cd976a/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java 
b/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java
index 93c9235..8b5bee0 100644
--- a/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java
+++ b/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java
@@ -18,13 +18,7 @@
 
 package org.apache.oozie;
 
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.net.URI;
-import java.util.List;
-
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.CoordinatorJob;
@@ -36,6 +30,13 @@ import org.apache.oozie.store.StoreException;
 import org.apache.oozie.test.XTestCase;
 import org.apache.oozie.util.XConfiguration;
 
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.util.List;
+
 public class TestCoordinatorEngine extends XTestCase {
     private Services services;
 
@@ -482,8 +483,8 @@ public class TestCoordinatorEngine extends XTestCase {
         }
         catch (CoordinatorEngineException ex) {
             assertEquals(ErrorCode.E0421, ex.getErrorCode());
-            assertEquals("E0421: Invalid job filter [statusRUNNING], elements 
must be name=value or name!=value pairs",
-                    ex.getMessage());
+            assertEquals("E0421: Invalid job filter [statusRUNNING], " +
+                    "filter should be of format <key><comparator><value> 
pairs", ex.getMessage());
         }
 
         //Check for missing value after "="
@@ -492,7 +493,8 @@ public class TestCoordinatorEngine extends XTestCase {
         }
         catch (CoordinatorEngineException ex) {
             assertEquals(ErrorCode.E0421, ex.getErrorCode());
-            assertEquals("E0421: Invalid job filter [status=], elements must 
be name=value or name!=value pairs", ex.getMessage());
+            assertEquals("E0421: Invalid job filter [status=], invalid status 
value []. Valid status values are: ["
+                + StringUtils.join(CoordinatorAction.Status.values(), ", ") + 
"]", ex.getMessage());
         }
 
         // Check for invalid status value
@@ -502,8 +504,8 @@ public class TestCoordinatorEngine extends XTestCase {
         catch (CoordinatorEngineException ex) {
             assertEquals(ErrorCode.E0421, ex.getErrorCode());
             assertEquals("E0421: Invalid job filter [status=blahblah], invalid 
status value [blahblah]."
-                    + " Valid status values are: [WAITING READY SUBMITTED 
RUNNING SUSPENDED TIMEDOUT "
-                    + "SUCCEEDED KILLED FAILED IGNORED SKIPPED ]", 
ex.getMessage());
+                + " Valid status values are: ["
+                + StringUtils.join(CoordinatorAction.Status.values(), ", ") + 
"]", ex.getMessage());
         }
 
         // Check for empty status value
@@ -512,9 +514,9 @@ public class TestCoordinatorEngine extends XTestCase {
         }
         catch (CoordinatorEngineException ex) {
             assertEquals(ErrorCode.E0421, ex.getErrorCode());
-            assertEquals("E0421: Invalid job filter [status=\"\"], invalid 
status value [\"\"]. "
-                    + "Valid status values are: [WAITING READY SUBMITTED 
RUNNING SUSPENDED TIMEDOUT "
-                    + "SUCCEEDED KILLED FAILED IGNORED SKIPPED ]", 
ex.getMessage());
+            assertEquals("E0421: Invalid job filter [status=\"\"], invalid 
status value [\"\"]."
+                + " Valid status values are: ["
+                + StringUtils.join(CoordinatorAction.Status.values(), ", ") + 
"]", ex.getMessage());
         }
 
         // Check for invalid filter option
@@ -523,7 +525,8 @@ public class TestCoordinatorEngine extends XTestCase {
         }
         catch (CoordinatorEngineException ex) {
             assertEquals(ErrorCode.E0421, ex.getErrorCode());
-            assertEquals("E0421: Invalid job filter [blahblah=blahblah], 
invalid filter [blahblah]. The only valid filter is \"status\"", 
ex.getMessage());
+            assertEquals("E0421: Invalid job filter [blahblah=blahblah], 
invalid filter [blahblah]. " +
+                "Valid filters [" + 
StringUtils.join(CoordinatorEngine.VALID_JOB_FILTERS, ", ") + "]", 
ex.getMessage());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5cd976a/core/src/test/java/org/apache/oozie/TestCoordinatorEngineSimple.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/TestCoordinatorEngineSimple.java 
b/core/src/test/java/org/apache/oozie/TestCoordinatorEngineSimple.java
index 824eb80..4a12e42 100644
--- a/core/src/test/java/org/apache/oozie/TestCoordinatorEngineSimple.java
+++ b/core/src/test/java/org/apache/oozie/TestCoordinatorEngineSimple.java
@@ -18,12 +18,18 @@
 
 package org.apache.oozie;
 
-import java.util.List;
-import java.util.Map;
-
+import org.apache.oozie.CoordinatorEngine.FILTER_COMPARATORS;
+import org.apache.oozie.client.CoordinatorAction.Status;
+import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.test.XTestCase;
-import org.junit.Test;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.Pair;
+
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
 
 public class TestCoordinatorEngineSimple extends XTestCase {
 
@@ -42,23 +48,107 @@ public class TestCoordinatorEngineSimple extends XTestCase 
{
         super.tearDown();
     }
 
-    @Test
+    public void testParseJobFilter() throws CoordinatorEngineException {
+        final CoordinatorEngine ce = new CoordinatorEngine();
+
+        //valid status filter
+        Map<Pair<String, FILTER_COMPARATORS>, List<Object>> map = 
ce.parseJobFilter("staTus=succeeded; status=waiTing");
+        assertNotNull(map);
+        assertEquals(1, map.size());
+        Pair<String, FILTER_COMPARATORS> key = map.keySet().iterator().next();
+        assertNotNull(key);
+        assertEquals(OozieClient.FILTER_STATUS, key.getFist());
+        assertEquals(FILTER_COMPARATORS.EQUALS, key.getSecond());
+        List<Object> list = map.get(key);
+        assertNotNull(list);
+        assertEquals(2, list.size());
+        assertEquals(Status.SUCCEEDED.name(), (String) list.get(0));
+        assertEquals(Status.WAITING.name(), (String)list.get(1));
+
+        //valid nominal time filter
+        map = ce.parseJobFilter("nominaltime>=2013-05-01T10:00Z");
+        assertNotNull(map);
+        assertEquals(1, map.size());
+        key = map.keySet().iterator().next();
+        assertNotNull(key);
+        assertEquals(OozieClient.FILTER_NOMINAL_TIME, key.getFist());
+        assertEquals(FILTER_COMPARATORS.GREATER_EQUAL, key.getSecond());
+        list = map.get(key);
+        assertNotNull(list);
+        assertEquals(1, list.size());
+        assertEquals("2013-05-01T10:00Z", DateUtils.formatDateOozieTZ(new 
Date(((Timestamp) list.get(0)).getTime())));
+
+        //invalid format
+        try {
+            ce.parseJobFilter("winniethepooh");
+            fail("CoordinatorEngineException expected.");
+        }
+        catch (CoordinatorEngineException bee) {
+            assertEquals(ErrorCode.E0421, bee.getErrorCode());
+        }
+
+        //invalid key
+        try {
+            ce.parseJobFilter("stat=some");
+            fail("CoordinatorEngineException expected.");
+        }
+        catch (CoordinatorEngineException bee) {
+            assertEquals(ErrorCode.E0421, bee.getErrorCode());
+        }
+
+        //invalid status value
+        try {
+            ce.parseJobFilter("status=some");
+            fail("CoordinatorEngineException expected.");
+        }
+        catch (CoordinatorEngineException bee) {
+            assertEquals(ErrorCode.E0421, bee.getErrorCode());
+        }
+
+        //invalid comparator for status
+        try {
+            ce.parseJobFilter("status>=some");
+            fail("CoordinatorEngineException expected.");
+        }
+        catch (CoordinatorEngineException bee) {
+            assertEquals(ErrorCode.E0421, bee.getErrorCode());
+        }
+
+        //invalid nominal time value
+        try {
+            ce.parseJobFilter("nominaltime=2013-13-01T00:00Z");
+            fail("CoordinatorEngineException expected.");
+        }
+        catch (CoordinatorEngineException bee) {
+            assertEquals(ErrorCode.E0421, bee.getErrorCode());
+        }
+
+        //invalid comparator
+        try {
+            ce.parseJobFilter("nominaltime*2013-13-01T00:00Z");
+            fail("CoordinatorEngineException expected.");
+        }
+        catch (CoordinatorEngineException bee) {
+            assertEquals(ErrorCode.E0421, bee.getErrorCode());
+        }
+    }
+
     public void testParseFilterNegative() throws CoordinatorEngineException {
         final CoordinatorEngine ce = new CoordinatorEngine();
 
         // null argument:
-        Map<String, List<String>> map = ce.parseFilter(null);
+        Map<String, List<String>> map = ce.parseJobsFilter(null);
         assertNotNull(map);
         assertEquals(0, map.size());
 
         // empty String:
-        map = ce.parseFilter("");
+        map = ce.parseJobsFilter("");
         assertNotNull(map);
         assertEquals(0, map.size());
 
         // no eq sign in token:
         try {
-            ce.parseFilter("winniethepooh");
+            ce.parseJobsFilter("winniethepooh");
             fail("CoordinatorEngineException expected.");
         }
         catch (CoordinatorEngineException bee) {
@@ -66,7 +156,7 @@ public class TestCoordinatorEngineSimple extends XTestCase {
         }
         // incorrect k=v:
         try {
-            map = ce.parseFilter("kk=vv=zz");
+            ce.parseJobsFilter("kk=vv=zz");
             fail("CoordinatorEngineException expected.");
         }
         catch (CoordinatorEngineException cee) {
@@ -74,7 +164,7 @@ public class TestCoordinatorEngineSimple extends XTestCase {
         }
         // unknown key in key=value pair:
         try {
-            ce.parseFilter("foo=moo");
+            ce.parseJobsFilter("foo=moo");
             fail("CoordinatorEngineException expected.");
         }
         catch (CoordinatorEngineException bee) {
@@ -82,7 +172,7 @@ public class TestCoordinatorEngineSimple extends XTestCase {
         }
         // incorrect "status" key value:
         try {
-            ce.parseFilter("status=foo");
+            ce.parseJobsFilter("status=foo");
             fail("CoordinatorEngineException expected.");
         }
         catch (CoordinatorEngineException bee) {
@@ -90,7 +180,7 @@ public class TestCoordinatorEngineSimple extends XTestCase {
         }
         // unparseable "frequency" value:
         try {
-            ce.parseFilter("FreQuency=foo");
+            ce.parseJobsFilter("FreQuency=foo");
             fail("CoordinatorEngineException expected.");
         }
         catch (CoordinatorEngineException bee) {
@@ -98,7 +188,7 @@ public class TestCoordinatorEngineSimple extends XTestCase {
         }
         // unparseable "unit" value:
         try {
-            ce.parseFilter("UniT=foo");
+            ce.parseJobsFilter("UniT=foo");
             fail("CoordinatorEngineException expected.");
         }
         catch (CoordinatorEngineException bee) {
@@ -106,7 +196,7 @@ public class TestCoordinatorEngineSimple extends XTestCase {
         }
         // "unit" specified, but "frequency" is not:
         try {
-            ce.parseFilter("unit=minutes");
+            ce.parseJobsFilter("unit=minutes");
             fail("CoordinatorEngineException expected.");
         }
         catch (CoordinatorEngineException bee) {
@@ -114,11 +204,10 @@ public class TestCoordinatorEngineSimple extends 
XTestCase {
         }
     }
 
-    @Test
     public void testParseFilterPositive() throws CoordinatorEngineException {
         final CoordinatorEngine ce = new CoordinatorEngine();
 
-        Map<String, List<String>> map = 
ce.parseFilter("frequency=5;unit=hours;user=foo;status=FAILED");
+        Map<String, List<String>> map = 
ce.parseJobsFilter("frequency=5;unit=hours;user=foo;status=FAILED");
         assertEquals(4, map.size());
         assertEquals("300", map.get("frequency").get(0));
         assertEquals("MINUTE", map.get("unit").get(0));

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5cd976a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetActionsSubsetJPAExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetActionsSubsetJPAExecutor.java
 
b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetActionsSubsetJPAExecutor.java
index d3d29aa..9ccd62a 100644
--- 
a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetActionsSubsetJPAExecutor.java
+++ 
b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetActionsSubsetJPAExecutor.java
@@ -18,26 +18,34 @@
 
 package org.apache.oozie.executor.jpa;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.CoordinatorEngine;
+import org.apache.oozie.CoordinatorEngine.FILTER_COMPARATORS;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorAction.Status;
 import org.apache.oozie.client.CoordinatorJob;
-import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.test.XDataTestCase;
 import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.Pair;
+import org.junit.Assert;
+
+import java.sql.Timestamp;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 public class TestCoordJobGetActionsSubsetJPAExecutor extends XDataTestCase {
+    private static final Pair<String, FILTER_COMPARATORS> 
POSITIVE_STATUS_FILTER =
+        Pair.of(OozieClient.FILTER_STATUS, FILTER_COMPARATORS.EQUALS);
+    private static final Pair<String, FILTER_COMPARATORS> 
NEGATIVE_STATUS_FILTER =
+        Pair.of(OozieClient.FILTER_STATUS, FILTER_COMPARATORS.NOT_EQUALS);
     Services services;
 
     @Override
@@ -53,6 +61,140 @@ public class TestCoordJobGetActionsSubsetJPAExecutor 
extends XDataTestCase {
         super.tearDown();
     }
 
+    private Timestamp getSqlTime(String str) throws ParseException {
+        return new Timestamp(DateUtils.parseDateUTC(str).getTime());
+    }
+
+    private List<Object> getList(Object... objs) {
+        List<Object> list = new ArrayList<Object>();
+        for (Object o : objs) {
+            list.add(o);
+        }
+        return list;
+    }
+
+    public void testGetActionsWithNominalTimeFilter() throws Exception {
+        CoordinatorJobBean job = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+        String timeStr[] = {"2009-02-01T00:00Z", "2009-02-01T05:00Z", 
"2009-02-01T10:00Z", "2009-02-01T15:00Z"};
+        Date ntime[] = {getSqlTime(timeStr[0]), getSqlTime(timeStr[1]), 
getSqlTime(timeStr[2]), getSqlTime(timeStr[3])};
+
+        List<String> actionIds = new ArrayList<String>(timeStr.length);
+        int startAction = 5;
+        for (Date time : ntime) {
+            CoordinatorActionBean action =
+                addRecordToCoordActionTable(job.getId(), startAction++, 
Status.WAITING, "coord-action-get.xml", 0,
+                    time);
+            actionIds.add(action.getId());
+        }
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        //Filter with nominalTime >=, asc
+        Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap = new 
HashMap<Pair<String, FILTER_COMPARATORS>,
+            List<Object>>();
+        filterMap.put(
+            Pair.of(OozieClient.FILTER_NOMINAL_TIME, 
FILTER_COMPARATORS.GREATER_EQUAL), getList(ntime[2]));
+        CoordJobGetActionsSubsetJPAExecutor actionGetCmd = new 
CoordJobGetActionsSubsetJPAExecutor(job.getId(),
+            filterMap, 1, 10, false);
+        List<CoordinatorActionBean> actions = jpaService.execute(actionGetCmd);
+        Assert.assertEquals(2, actions.size());
+        Assert.assertEquals(actionIds.get(2), actions.get(0).getId());
+        Assert.assertEquals(actionIds.get(3), actions.get(1).getId());
+
+        //Filter with nominalTime >=, desc
+        actionGetCmd = new CoordJobGetActionsSubsetJPAExecutor(job.getId(), 
filterMap, 1, 10, true);
+        actions = jpaService.execute(actionGetCmd);
+        Assert.assertEquals(2, actions.size());
+        Assert.assertEquals(actionIds.get(3), actions.get(0).getId());
+        Assert.assertEquals(actionIds.get(2), actions.get(1).getId());
+
+        //Filter with nominalTime <, asc
+        filterMap.clear();
+        filterMap.put(
+            Pair.of(OozieClient.FILTER_NOMINAL_TIME, 
FILTER_COMPARATORS.LESSTHAN), getList(ntime[2]));
+        actionGetCmd = new CoordJobGetActionsSubsetJPAExecutor(job.getId(), 
filterMap, 1, 10, false);
+        actions = jpaService.execute(actionGetCmd);
+        Assert.assertEquals(2, actions.size());
+        Assert.assertEquals(actionIds.get(0), actions.get(0).getId());
+        Assert.assertEquals(actionIds.get(1), actions.get(1).getId());
+
+        //Filter with nominalTime <, desc
+        actionGetCmd = new CoordJobGetActionsSubsetJPAExecutor(job.getId(), 
filterMap, 1, 10, true);
+        actions = jpaService.execute(actionGetCmd);
+        Assert.assertEquals(2, actions.size());
+        Assert.assertEquals(actionIds.get(1), actions.get(0).getId());
+        Assert.assertEquals(actionIds.get(0), actions.get(1).getId());
+
+        //Filter with nominalTime >=, nominalTime <, asc
+        filterMap.put(
+           Pair.of(OozieClient.FILTER_NOMINAL_TIME, 
FILTER_COMPARATORS.GREATER_EQUAL), getList(ntime[1]));
+        filterMap.put(
+            Pair.of(OozieClient.FILTER_NOMINAL_TIME, 
FILTER_COMPARATORS.LESSTHAN), getList(ntime[3]));
+        actionGetCmd = new CoordJobGetActionsSubsetJPAExecutor(job.getId(), 
filterMap, 1, 10, false);
+        actions = jpaService.execute(actionGetCmd);
+        Assert.assertEquals(2, actions.size());
+        Assert.assertEquals(actionIds.get(1), actions.get(0).getId());
+        Assert.assertEquals(actionIds.get(2), actions.get(1).getId());
+
+        //Filter with nominalTime >=, nominalTime <, desc
+        actionGetCmd = new CoordJobGetActionsSubsetJPAExecutor(job.getId(), 
filterMap, 1, 10, true);
+        actions = jpaService.execute(actionGetCmd);
+        Assert.assertEquals(2, actions.size());
+        Assert.assertEquals(actionIds.get(2), actions.get(0).getId());
+        Assert.assertEquals(actionIds.get(1), actions.get(1).getId());
+
+        //Filter with nominalTime >=, nominalTime <, desc, offset
+        actionGetCmd = new CoordJobGetActionsSubsetJPAExecutor(job.getId(), 
filterMap, 2, 10, true);
+        actions = jpaService.execute(actionGetCmd);
+        Assert.assertEquals(1, actions.size());
+        Assert.assertEquals(actionIds.get(1), actions.get(0).getId());
+
+        //Filter with nominalTime >=, nominalTime <, asc, offset
+        actionGetCmd = new CoordJobGetActionsSubsetJPAExecutor(job.getId(), 
filterMap, 2, 10, false);
+        actions = jpaService.execute(actionGetCmd);
+        Assert.assertEquals(1, actions.size());
+        Assert.assertEquals(actionIds.get(2), actions.get(0).getId());
+
+        //Filter with nominalTime >=, nominalTime <, desc, len
+        actionGetCmd = new CoordJobGetActionsSubsetJPAExecutor(job.getId(), 
filterMap, 1, 2, true);
+        actions = jpaService.execute(actionGetCmd);
+        Assert.assertEquals(2, actions.size());
+        Assert.assertEquals(actionIds.get(2), actions.get(0).getId());
+        Assert.assertEquals(actionIds.get(1), actions.get(1).getId());
+
+        //Filter with nominalTime >=, nominalTime <, asc, len
+        actionGetCmd = new CoordJobGetActionsSubsetJPAExecutor(job.getId(), 
filterMap, 1, 2, false);
+        actions = jpaService.execute(actionGetCmd);
+        Assert.assertEquals(2, actions.size());
+        Assert.assertEquals(actionIds.get(1), actions.get(0).getId());
+        Assert.assertEquals(actionIds.get(2), actions.get(1).getId());
+
+        //Filter with nominalTime >=, nominalTime <, asc, offset, len
+        filterMap.put(
+            Pair.of(OozieClient.FILTER_NOMINAL_TIME, 
FILTER_COMPARATORS.LESSTHAN),
+            getList(getSqlTime("2009-02-01T23:00Z")));
+        actionGetCmd = new CoordJobGetActionsSubsetJPAExecutor(job.getId(), 
filterMap, 2, 2, false);
+        actions = jpaService.execute(actionGetCmd);
+        Assert.assertEquals(2, actions.size());
+        Assert.assertEquals(actionIds.get(2), actions.get(0).getId());
+        Assert.assertEquals(actionIds.get(3), actions.get(1).getId());
+
+        //Filter with nominalTime >=, nominalTime <, desc, offset, len
+        actionGetCmd = new CoordJobGetActionsSubsetJPAExecutor(job.getId(), 
filterMap, 2, 2, true);
+        actions = jpaService.execute(actionGetCmd);
+        Assert.assertEquals(2, actions.size());
+        Assert.assertEquals(actionIds.get(2), actions.get(0).getId());
+        Assert.assertEquals(actionIds.get(1), actions.get(1).getId());
+
+        //Filter with nominalTime and status
+        filterMap.put(
+            Pair.of(OozieClient.FILTER_STATUS, FILTER_COMPARATORS.EQUALS), 
getList(Status.SUCCEEDED.name()));
+        actionGetCmd = new CoordJobGetActionsSubsetJPAExecutor(job.getId(), 
filterMap, 1, 10, true);
+        actions = jpaService.execute(actionGetCmd);
+        Assert.assertEquals(0, actions.size());
+
+    }
+
     public void testCoordActionGet() throws Exception {
         int actionNum = 1;
         String resourceXmlName = "coord-action-get.xml";
@@ -116,10 +258,10 @@ public class TestCoordJobGetActionsSubsetJPAExecutor 
extends XDataTestCase {
     public void testCoordActionOrderBy() throws Exception {
         CoordinatorJobBean job = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
         // Add Coordinator action with nominal time: 2009-12-15T01:00Z
-        CoordinatorActionBean action = 
addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.WAITING,
+        addRecordToCoordActionTable(job.getId(), 1, 
CoordinatorAction.Status.WAITING,
                 "coord-action-get.xml", 0);
         // Add Coordinator action with nominal time: 2009-02-01T23:59Z
-        CoordinatorActionBean action1 = 
addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.WAITING,
+        addRecordToCoordActionTable(job.getId(), 2, 
CoordinatorAction.Status.WAITING,
                 "coord-action-for-action-input-check.xml", 0);
         // test for the expected action number
         List<CoordinatorActionBean> actions = 
_testGetActionsSubsetOrderBy(job.getId(), 1, 2, false);
@@ -133,10 +275,10 @@ public class TestCoordJobGetActionsSubsetJPAExecutor 
extends XDataTestCase {
     public void testCoordActionOrderByDesc() throws Exception {
         CoordinatorJobBean job = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
         // Add Coordinator action with nominal time: 2009-12-15T01:00Z
-        CoordinatorActionBean action = 
addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.WAITING,
+        addRecordToCoordActionTable(job.getId(), 1, 
CoordinatorAction.Status.WAITING,
                 "coord-action-get.xml", 0);
         // Add Coordinator action with nominal time: 2009-02-01T23:59Z
-        CoordinatorActionBean action1 = 
addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.WAITING,
+        addRecordToCoordActionTable(job.getId(), 2, 
CoordinatorAction.Status.WAITING,
                 "coord-action-for-action-input-check.xml", 0);
         // test for the expected action number
         List<CoordinatorActionBean> actions = 
_testGetActionsSubsetOrderBy(job.getId(), 1, 2, true);
@@ -152,9 +294,7 @@ public class TestCoordJobGetActionsSubsetJPAExecutor 
extends XDataTestCase {
         assertNotNull(jpaService);
         CoordJobGetActionsSubsetJPAExecutor actionGetCmd = new 
CoordJobGetActionsSubsetJPAExecutor(jobId, null, start,
                 len, order);
-        List<CoordinatorActionBean> actions = jpaService.execute(actionGetCmd);
-        return actions;
-
+        return jpaService.execute(actionGetCmd);
     }
 
     // Check status filters for Coordinator actions
@@ -165,49 +305,50 @@ public class TestCoordJobGetActionsSubsetJPAExecutor 
extends XDataTestCase {
         // Add Coordinator action with nominal time: 2009-02-01T23:59Z
         addRecordToCoordActionTable(job.getId(), 2, 
CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0);
         // Create lists for status filter to test positive filter
-        Map<String, List<String>> filterMap = new HashMap<String, 
List<String>>();
-        List<String> positiveFilter = new ArrayList<String>();
+        Map<Pair<String, FILTER_COMPARATORS>, List<Object>>
+            filterMap = new HashMap<Pair<String, FILTER_COMPARATORS>, 
List<Object>>();
+        List<Object> positiveFilter = new ArrayList<Object>();
         positiveFilter.add("RUNNING");
         positiveFilter.add("KILLED");
-        filterMap.put(CoordinatorEngine.POSITIVE_FILTER, positiveFilter);
+        filterMap.put(POSITIVE_STATUS_FILTER, positiveFilter);
         List<CoordinatorActionBean> actions = 
_testGetActionsSubsetFilter(job.getId(), 1, filterMap, 1, 2);
         assertEquals(actions.size(), 1);
         assertEquals(actions.get(0).getActionNumber(), 1);
 
         // Create lists for status filter to test negative filter
         filterMap.clear();
-        List<String> negativeFilter = new ArrayList<String>();
+        List<Object> negativeFilter = new ArrayList<Object>();
         negativeFilter.add("WAITING");
         negativeFilter.add("KILLED");
-        filterMap.put(CoordinatorEngine.NEGATIVE_FILTER, negativeFilter);
+        filterMap.put(NEGATIVE_STATUS_FILTER, negativeFilter);
         actions = _testGetActionsSubsetFilter(job.getId(), 1, filterMap, 1, 2);
         assertEquals(actions.size(), 1);
         assertEquals(actions.get(0).getActionNumber(), 1);
 
         // Test Combination of include/exclude filters - no dup
         filterMap.clear();
-        filterMap.put(CoordinatorEngine.POSITIVE_FILTER, positiveFilter);
-        filterMap.put(CoordinatorEngine.NEGATIVE_FILTER, negativeFilter);
+        filterMap.put(POSITIVE_STATUS_FILTER, positiveFilter);
+        filterMap.put(NEGATIVE_STATUS_FILTER, negativeFilter);
         actions = _testGetActionsSubsetFilter(job.getId(), 1, filterMap, 1, 2);
         assertEquals(actions.size(), 1);
         assertEquals(actions.get(0).getActionNumber(), 1);
 
         // Test Combination of include/exclude filters - dup --> no result
         filterMap.clear();
-        filterMap.put(CoordinatorEngine.POSITIVE_FILTER, positiveFilter);
-        filterMap.put(CoordinatorEngine.NEGATIVE_FILTER, positiveFilter);
+        filterMap.put(POSITIVE_STATUS_FILTER, positiveFilter);
+        filterMap.put(NEGATIVE_STATUS_FILTER, positiveFilter);
         actions = _testGetActionsSubsetFilter(job.getId(), 1, filterMap, 1, 2);
         assertEquals(actions.size(), 0);
     }
     // Check whether actions are retrieved based on the filter values for 
status
     private List<CoordinatorActionBean> _testGetActionsSubsetFilter(String 
jobId, int actionNum,
-            Map<String, List<String>> filterMap, int start, int len) throws 
JPAExecutorException {
+            Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap, int 
start,
+        int len) throws JPAExecutorException {
         JPAService jpaService = Services.get().get(JPAService.class);
         assertNotNull(jpaService);
         CoordJobGetActionsSubsetJPAExecutor actionGetCmd = new 
CoordJobGetActionsSubsetJPAExecutor(jobId, filterMap, start,
                 len, false);
-        List<CoordinatorActionBean> actions = jpaService.execute(actionGetCmd);
-        return actions;
+        return jpaService.execute(actionGetCmd);
     }
 
     public void testGetActionAllColumns() throws Exception {

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5cd976a/docs/src/site/twiki/DG_CommandLineTool.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/DG_CommandLineTool.twiki 
b/docs/src/site/twiki/DG_CommandLineTool.twiki
index 4a07711..c65acbd 100644
--- a/docs/src/site/twiki/DG_CommandLineTool.twiki
+++ b/docs/src/site/twiki/DG_CommandLineTool.twiki
@@ -40,11 +40,15 @@ usage:
                 -dryrun               Dryrun a workflow (since 3.3.2) or 
coordinator (since 2.0) job without actually executing it
                 -info <arg>           info of a job
                 -kill <arg>           kill a job (coordinator requires -action 
or -date)
-                -len <arg>            number of actions (default TOTAL 
ACTIONS, requires -info)
-                -filter <arg>         status=<S1>[;status=<S2>]* or 
status!=<S1>[;status!=<S2>]*
-                                      All coordinator actions satisfying the 
status filters will be retrieved.
-                                      Positive filters '=' concatenated with 
'OR' and negative filters '!=' with 'AND'.
-                                      Currently, only supported for 
coordinator job.
+                -len <arg>            number of actions to be returned, used 
for pagination(default 1000, requires -info)
+                -filter <arg>         All coordinator actions satisfying the 
filter will be retrieved.
+                                      Filter is of the format 
<key><comparator><value>[;<key><comparator><value>]*
+                                      key: status or nominaltime
+                                      comparator: =, !=, <, <=, >, >=
+                                      value: valid status like SUCCEEDED, 
KILLED, RUNNING etc. Only = and != apply for status
+                                      nominalTime is valid date of the format 
yyyy-MM-dd'T'HH:mm'Z' (like 2014-06-01T00:00Z)
+                                      Filter with '=' is concatenated with 
'OR' and other filters are concatenated with 'AND'.
+                                      Currently, supported only for 
coordinator job.
                 -order <arg>          order to show coordinator actions 
(default ascending order, 'desc' for descending order, requires -info)
                                       Currently, only supported for 
coordinator job.
                 -localtime            use local time (same as passing your 
time zone to -timezone).
@@ -52,7 +56,8 @@ usage:
                 -log <arg>            job log
                 -nocleanup            do not clean up output-events of the 
coordinator rerun actions
                                       (requires -rerun)
-                -offset <arg>         job info offset of actions (default '1', 
requires -info)
+                -offset <arg>         offset of actions returned relative to 
all actions matching the filter criteria,
+                                      used for pagination (default '1', 
requires -info)
                 -oozie <arg>          Oozie URL
                 -refresh              re-materialize the coordinator rerun 
actions (requires -rerun)
                 -rerun <arg>          rerun a job  (coordinator requires 
-action or -date; bundle requires -coordinator or -date)
@@ -189,14 +194,14 @@ If the option is not provided and the environment 
variable is not set, the =oozi
 
 ---+++ Time zone
 
-The <code>-timezone TIME_ZONE_ID</code> option in the =job= and =jobs= 
sub-commands allows you to specify the time zone to use in 
+The <code>-timezone TIME_ZONE_ID</code> option in the =job= and =jobs= 
sub-commands allows you to specify the time zone to use in
 the output of those sub-commands. The <code>TIME_ZONE_ID</code> should be one 
of the standard Java Time Zone IDs.  You can get a
 list of the available time zones with the command =oozie info -timezones=.
 
-If the <code>-localtime</code> option is used, it will cause Oozie to use 
whatever the time zone is of the machine. If 
-both <code>-localtime</code> and <code>-timezone TIME_ZONE_ID</code> are used, 
the <code>-localtime</code> option will override 
-the <code>-timezone TIME_ZONE_ID</code> option.  If neither option is given, 
Oozie will look for the =OOZIE_TIMEZONE= environment 
-variable and uses it if set.  If neither option is given and the environment 
variable is not set, or if Oozie is given an invalid 
+If the <code>-localtime</code> option is used, it will cause Oozie to use 
whatever the time zone is of the machine. If
+both <code>-localtime</code> and <code>-timezone TIME_ZONE_ID</code> are used, 
the <code>-localtime</code> option will override
+the <code>-timezone TIME_ZONE_ID</code> option.  If neither option is given, 
Oozie will look for the =OOZIE_TIMEZONE= environment
+variable and uses it if set.  If neither option is given and the environment 
variable is not set, or if Oozie is given an invalid
 time zone, it will use GMT.
 
 ---+++ Debug Mode
@@ -480,20 +485,28 @@ hadoop1                 map-reduce  OK         end        
 job_200904281535_0254
 The =info= option can display information about a workflow job or coordinator 
job or coordinator action.
 The =info= option for a Coordinator job will retrieve the Coordinator actions 
ordered by nominal time. However, the =info= command may timeout if the number 
of Coordinator actions are very high. In that case, =info= should be used with 
=offset= and =len= option.
 
-The =offset= and =len= option specifies the offset and number of actions to 
display, if checking a workflow job or coordinator job.
+The =offset= and =len= option should be used for pagination. offset determines 
the start offset of the action
+returned among all the actions that matched the filter criteria. len 
determines number of actions to be returned.
 
 The =localtime= option displays times in local time, if not specified times 
are displayed in GMT.
 
-The =verbose= option gives more detailed information for all the actions, if 
checking for workflow job or coordinator job.
-
-The =filter= option can be used to filter coordinator actions based on their 
status.
-The filter option syntax is: <code>[status=VALUE][\;status=VALUE]* or 
[status!=VALUE][\;status!=VALUE]*</code>.
+The =filter= option can be used to filter coordinator actions based on some 
criteria.
+The filter option syntax is: 
<code><key><comparator><value>[;<key><comparator><value>]*</code>.
 (Note escape <code>\</code> needed before semicolon to specify multiple names 
for filter in shell)
-Multiple values must be specified as different name value pairs. 
-When multiple positive filters <code>=</code> are specified, all Coordinator 
actions that satisfy any one of the filters will be retrieved.(The query will 
do an OR among all the positive filter values for the status)
-When multiple negative filters <code>!=</code> are specified, all Coordinator 
actions that satisfy all of the filters will be retrieved.(The query will do an 
AND among all the negative filter values for the status)
+key: status or nominalTime
+comparator: =, !=, <, <=, >, >=
+value: valid status like SUCCEEDED, KILLED, RUNNING etc. Only = and != apply 
for status
+value for nominalTime is valid date of the format yyyy-MM-dd'T'HH:mm'Z' (like 
2014-06-01T00:00Z)
+
+Multiple values must be specified as different name value pairs. The query is 
formed by doing AND of all conditions,
+with the exception of = which uses OR if there are multiple values for the 
same key. For example,
+filter 'status=RUNNING;status=WAITING;nominalTime>=2014-06-01T00:00Z' maps to 
query (status = RUNNING OR status =
+WAITING) AND nominalTime >= 2014-06-01T00:00Z which returns all waiting or 
running actions with nominalTime >=
+2014-06-01T00:00Z.
+
 Currently, the filter option can be used only with an =info= option on 
Coordinator job.
 
+The =verbose= option gives more detailed information for all the actions, if 
checking for workflow job or coordinator job.
 An example below shows how the =verbose= option can be used to gather action 
statistics information for a job:
 
 <verbatim>
@@ -1360,7 +1373,7 @@ must be placed before the "-command" argument.
 
 ---++ Info Operations
 
-The Info sub-command provides a convenient place for Oozie to display misc 
information. 
+The Info sub-command provides a convenient place for Oozie to display misc 
information.
 
 ---+++ Getting a list of time zones
 
@@ -1381,12 +1394,12 @@ Available Time Zones :
       HAST (America/Adak)
       HAST (America/Atka)
       HST (HST)
-      ...      
+      ...
 </verbatim>
 
-The <code>-timezones</code> option will print out a (long) list of all 
available time zones. 
+The <code>-timezones</code> option will print out a (long) list of all 
available time zones.
 
-These IDs (the text in the parentheses) are what should be used for the 
<code>-timezone TIME_ZONE_ID</code> option in the =job= 
+These IDs (the text in the parentheses) are what should be used for the 
<code>-timezone TIME_ZONE_ID</code> option in the =job=
 and =jobs= sub-commands
 
 ---++ Map-reduce Operations

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5cd976a/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 7ce842b..4b33e0a 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.2.0 release (trunk - unreleased)
 
+OOZIE-1950 Coordinator job info should support timestamp (nominal time) 
(shwethags)
 OOZIE-1813 Add service to report/kill rogue bundles and coordinator jobs (puru)
 OOZIE-1847 HA - Oozie servers should shutdown (or go in safe mode) in case of 
ZK failure (puru)
 OOZIE-1957 Coord update command override group when 
oozie.service.AuthorizationService.default.group.as.acl is set and group/acl is 
not configured in job property (puru)

Reply via email to