Repository: oozie Updated Branches: refs/heads/master 1122898e5 -> d5cd976a8
OOZIE-1950 Coordinator job info should support timestamp (nominal time) (shwethags) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/d5cd976a Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/d5cd976a Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/d5cd976a Branch: refs/heads/master Commit: d5cd976a80e17f258050c02bd03891e82820b9ac Parents: 1122898 Author: Shwetha GS <[email protected]> Authored: Thu Sep 11 11:16:34 2014 +0530 Committer: Shwetha GS <[email protected]> Committed: Thu Sep 11 11:16:34 2014 +0530 ---------------------------------------------------------------------- .../org/apache/oozie/client/OozieClient.java | 38 ++-- .../org/apache/oozie/CoordinatorEngine.java | 177 ++++++++++------- .../oozie/command/coord/CoordJobXCommand.java | 18 +- .../CoordJobGetActionsSubsetJPAExecutor.java | 113 +++++++---- .../main/java/org/apache/oozie/util/Pair.java | 69 +++++++ .../org/apache/oozie/TestCoordinatorEngine.java | 35 ++-- .../oozie/TestCoordinatorEngineSimple.java | 121 ++++++++++-- ...TestCoordJobGetActionsSubsetJPAExecutor.java | 197 ++++++++++++++++--- docs/src/site/twiki/DG_CommandLineTool.twiki | 59 +++--- release-log.txt | 1 + 10 files changed, 609 insertions(+), 219 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/d5cd976a/client/src/main/java/org/apache/oozie/client/OozieClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/oozie/client/OozieClient.java b/client/src/main/java/org/apache/oozie/client/OozieClient.java index 363ebd2..d6ff2d0 100644 --- a/client/src/main/java/org/apache/oozie/client/OozieClient.java +++ b/client/src/main/java/org/apache/oozie/client/OozieClient.java @@ -18,6 +18,21 @@ package org.apache.oozie.client; +import org.apache.oozie.BuildInfo; +import org.apache.oozie.client.rest.JsonTags; +import org.apache.oozie.client.rest.JsonToBean; +import org.apache.oozie.client.rest.RestConstants; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.JSONValue; +import org.w3c.dom.Document; +import org.w3c.dom.Element; + +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; @@ -38,22 +53,6 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.Callable; -import javax.xml.parsers.DocumentBuilderFactory; -import javax.xml.transform.Transformer; -import javax.xml.transform.TransformerFactory; -import javax.xml.transform.dom.DOMSource; -import javax.xml.transform.stream.StreamResult; - -import org.apache.oozie.BuildInfo; -import org.apache.oozie.client.rest.JsonTags; -import org.apache.oozie.client.rest.JsonToBean; -import org.apache.oozie.client.rest.RestConstants; -import org.json.simple.JSONArray; -import org.json.simple.JSONObject; -import org.json.simple.JSONValue; -import org.w3c.dom.Document; -import org.w3c.dom.Element; - /** * Client API to submit and manage Oozie workflow jobs against an Oozie intance. * <p/> @@ -123,6 +122,8 @@ public class OozieClient { public static final String FILTER_STATUS = "status"; + public static final String FILTER_NOMINAL_TIME = "nominaltime"; + public static final String FILTER_FREQUENCY = "frequency"; public static final String FILTER_ID = "id"; @@ -159,7 +160,7 @@ public class OozieClient { public static enum SYSTEM_MODE { NORMAL, NOWEBSERVICE, SAFEMODE - }; + } /** * debugMode =0 means no debugging. > 0 means debugging on. @@ -172,7 +173,7 @@ public class OozieClient { private JSONArray supportedVersions; private final Map<String, String> headers = new HashMap<String, String>(); - private static ThreadLocal<String> USER_NAME_TL = new ThreadLocal<String>(); + private static final ThreadLocal<String> USER_NAME_TL = new ThreadLocal<String>(); /** * Allows to impersonate other users in the Oozie server. The current user @@ -1047,7 +1048,6 @@ public class OozieClient { * * @param reader reader to read into a string. * @param maxLen max content length allowed, if -1 there is no limit. - * @param ps Printstream of command line interface * @throws IOException */ private void sendToOutputStream(Reader reader, int maxLen) throws IOException { http://git-wip-us.apache.org/repos/asf/oozie/blob/d5cd976a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java index 249ae68..c8bbbf7 100644 --- a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java +++ b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java @@ -18,21 +18,7 @@ package org.apache.oozie; -import java.io.IOException; -import java.io.Writer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.StringTokenizer; - +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.client.CoordinatorAction; @@ -42,7 +28,6 @@ import org.apache.oozie.client.WorkflowJob; import org.apache.oozie.client.rest.RestConstants; import org.apache.oozie.command.CommandException; import org.apache.oozie.command.coord.CoordActionInfoXCommand; -import org.apache.oozie.util.CoordActionsInDateRange; import org.apache.oozie.command.coord.CoordActionsIgnoreXCommand; import org.apache.oozie.command.coord.CoordActionsKillXCommand; import org.apache.oozie.command.coord.CoordChangeXCommand; @@ -60,26 +45,59 @@ import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; import org.apache.oozie.service.DagXLogInfoService; import org.apache.oozie.service.Services; import org.apache.oozie.service.XLogStreamingService; -import org.apache.oozie.util.XLogFilter; -import org.apache.oozie.util.XLogUserFilterParam; +import org.apache.oozie.util.CoordActionsInDateRange; +import org.apache.oozie.util.DateUtils; +import org.apache.oozie.util.Pair; import org.apache.oozie.util.ParamChecker; import org.apache.oozie.util.XLog; +import org.apache.oozie.util.XLogFilter; +import org.apache.oozie.util.XLogUserFilterParam; -import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.io.Writer; +import java.sql.Timestamp; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.StringTokenizer; public class CoordinatorEngine extends BaseEngine { - private static XLog LOG = XLog.getLog(CoordinatorEngine.class); + private static final XLog LOG = XLog.getLog(CoordinatorEngine.class); public final static String COORD_ACTIONS_LOG_MAX_COUNT = "oozie.coord.actions.log.max.count"; private final static int COORD_ACTIONS_LOG_MAX_COUNT_DEFAULT = 50; - private int maxNumActionsForLog; - public final static String POSITIVE_FILTER = "positive"; - public final static String NEGATIVE_FILTER = "negative"; + private final int maxNumActionsForLog; + + public enum FILTER_COMPARATORS { + //This ordering is important, dont change this + GREATER_EQUAL(">="), GREATER(">"), LESSTHAN_EQUAL("<="), LESSTHAN("<"), NOT_EQUALS("!="), EQUALS("="); + + private final String sign; + + FILTER_COMPARATORS(String sign) { + this.sign = sign; + } + + public String getSign() { + return sign; + } + } + + public static final String[] VALID_JOB_FILTERS = {OozieClient.FILTER_STATUS, OozieClient.FILTER_NOMINAL_TIME}; /** * Create a system Coordinator engine, with no user and no group. */ public CoordinatorEngine() { - if (Services.get().getConf().getBoolean(USE_XCOMMAND, true) == false) { + if (!Services.get().getConf().getBoolean(USE_XCOMMAND, true)) { LOG.debug("Oozie CoordinatorEngine is not using XCommands."); } else { @@ -161,10 +179,9 @@ public class CoordinatorEngine extends BaseEngine { @Override public CoordinatorJobBean getCoordJob(String jobId, String filter, int offset, int length, boolean desc) throws BaseEngineException { - Map<String, List<String>> filterMap = parseStatusFilter(filter); + Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap = parseJobFilter(filter); try { - return new CoordJobXCommand(jobId, filterMap, offset, length, desc) - .call(); + return new CoordJobXCommand(jobId, filterMap, offset, length, desc).call(); } catch (CommandException ex) { throw new BaseEngineException(ex); @@ -575,7 +592,7 @@ public class CoordinatorEngine extends BaseEngine { * @throws CoordinatorEngineException */ public CoordinatorJobInfo getCoordJobs(String filter, int start, int len) throws CoordinatorEngineException { - Map<String, List<String>> filterList = parseFilter(filter); + Map<String, List<String>> filterList = parseJobsFilter(filter); try { return new CoordJobsXCommand(filterList, start, len).call(); @@ -586,60 +603,68 @@ public class CoordinatorEngine extends BaseEngine { } // Parses the filter string (e.g status=RUNNING;status=WAITING) and returns a list of status values - private Map<String, List<String>> parseStatusFilter(String filter) throws CoordinatorEngineException { - Map<String, List<String>> filterMap = new HashMap<String, List<String>>(); + public Map<Pair<String, FILTER_COMPARATORS>, List<Object>> parseJobFilter(String filter) throws + CoordinatorEngineException { + Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap = new HashMap<Pair<String, + FILTER_COMPARATORS>, List<Object>>(); if (filter != null) { - //split name;value pairs + //split name value pairs StringTokenizer st = new StringTokenizer(filter, ";"); while (st.hasMoreTokens()) { - String token = st.nextToken(); - if (token.contains("=")) { - boolean negative = false; - String[] pair = null; - if(token.contains("!=")) { - negative = true; - pair = token.split("!="); - }else { - pair = token.split("="); - } - if (pair.length != 2) { - throw new CoordinatorEngineException(ErrorCode.E0421, token, - "elements must be name=value or name!=value pairs"); - } - if (pair[0].equalsIgnoreCase("status")) { - String statusValue = pair[1]; - try { - CoordinatorAction.Status.valueOf(statusValue); - } catch (IllegalArgumentException ex) { - StringBuilder validStatusList = new StringBuilder(); - for (CoordinatorAction.Status status : CoordinatorAction.Status.values()){ - validStatusList.append(status.toString()+" "); + String token = st.nextToken().trim(); + Pair<String, FILTER_COMPARATORS> pair = null; + for (FILTER_COMPARATORS comp : FILTER_COMPARATORS.values()) { + if (token.contains(comp.getSign())) { + int index = token.indexOf(comp.getSign()); + String key = token.substring(0, index); + String valueStr = token.substring(index + comp.getSign().length()); + Object value; + + if (key.equalsIgnoreCase(OozieClient.FILTER_STATUS)) { + value = valueStr.toUpperCase(); + try { + CoordinatorAction.Status.valueOf((String) value); + } catch (IllegalArgumentException ex) { + // Check for incorrect status value + throw new CoordinatorEngineException(ErrorCode.E0421, filter, + XLog.format("invalid status value [{0}]." + " Valid status values are: [{1}]", + valueStr, StringUtils.join(CoordinatorAction.Status.values(), ", "))); } - // Check for incorrect status value - throw new CoordinatorEngineException(ErrorCode.E0421, filter, XLog.format( - "invalid status value [{0}]." + " Valid status values are: [{1}]", statusValue, validStatusList)); - } - String filterType = negative ? NEGATIVE_FILTER : POSITIVE_FILTER; - String oppositeFilterType = negative ? POSITIVE_FILTER : NEGATIVE_FILTER; - List<String> filterList = filterMap.get(filterType); - if (filterList == null) { - filterList = new ArrayList<String>(); - filterMap.put(filterType, filterList); + + if (!(comp == FILTER_COMPARATORS.EQUALS || comp == FILTER_COMPARATORS.NOT_EQUALS)) { + throw new CoordinatorEngineException(ErrorCode.E0421, filter, + XLog.format("invalid comparator [{0}] for status." + " Valid are = and !=", + comp.getSign())); + } + + pair = Pair.of(OozieClient.FILTER_STATUS, comp); + } else if (key.equalsIgnoreCase(OozieClient.FILTER_NOMINAL_TIME)) { + try { + value = new Timestamp(DateUtils.parseDateUTC(valueStr).getTime()); + } catch (ParseException e) { + throw new CoordinatorEngineException(ErrorCode.E0421, filter, + XLog.format("invalid nominal time [{0}]." + " Valid format: " + + "[{1}]", valueStr, DateUtils.ISO8601_UTC_MASK)); + } + pair = Pair.of(OozieClient.FILTER_NOMINAL_TIME, comp); + } else { + // Check for incorrect filter option + throw new CoordinatorEngineException(ErrorCode.E0421, filter, + XLog.format("invalid filter [{0}]." + " Valid filters [{1}]", key, StringUtils.join + (VALID_JOB_FILTERS, ", "))); } - List<String> oFilterList = filterMap.get(oppositeFilterType); - if (oFilterList != null && oFilterList.contains(statusValue)) { - throw new CoordinatorEngineException(ErrorCode.E0421, filter, XLog.format( - "the status [{0}] specified in both positive and negative filters", statusValue)); + if (!filterMap.containsKey(pair)) { + filterMap.put(pair, new ArrayList<Object>()); } - filterList.add(statusValue); - } else { - // Check for incorrect filter option - throw new CoordinatorEngineException(ErrorCode.E0421, filter, XLog.format( - "invalid filter [{0}]." + " The only valid filter is \"status\"", pair[0])); + filterMap.get(pair).add(value); + break; } - } else { - throw new CoordinatorEngineException(ErrorCode.E0421, token, - "elements must be name=value or name!=value pairs"); + } + + if (pair == null) { + //token doesn't contain comparator + throw new CoordinatorEngineException(ErrorCode.E0421, filter, + "filter should be of format <key><comparator><value> pairs"); } } } @@ -652,7 +677,7 @@ public class CoordinatorEngine extends BaseEngine { * @throws CoordinatorEngineException */ @VisibleForTesting - Map<String, List<String>> parseFilter(String filter) throws CoordinatorEngineException { + Map<String, List<String>> parseJobsFilter(String filter) throws CoordinatorEngineException { Map<String, List<String>> map = new HashMap<String, List<String>>(); boolean isTimeUnitSpecified = false; String timeUnit = "MINUTE"; http://git-wip-us.apache.org/repos/asf/oozie/blob/d5cd976a/core/src/main/java/org/apache/oozie/command/coord/CoordJobXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordJobXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordJobXCommand.java index 4d5ba71..51b09a2 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordJobXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordJobXCommand.java @@ -18,11 +18,8 @@ package org.apache.oozie.command.coord; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - import org.apache.oozie.CoordinatorActionBean; +import org.apache.oozie.CoordinatorEngine.FILTER_COMPARATORS; import org.apache.oozie.CoordinatorJobBean; import org.apache.oozie.ErrorCode; import org.apache.oozie.XException; @@ -33,8 +30,13 @@ import org.apache.oozie.executor.jpa.CoordJobGetActionsSubsetJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; import org.apache.oozie.service.JPAService; import org.apache.oozie.service.Services; +import org.apache.oozie.util.Pair; import org.apache.oozie.util.ParamChecker; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + /** * Command for loading a coordinator job information */ @@ -44,7 +46,7 @@ public class CoordJobXCommand extends CoordinatorXCommand<CoordinatorJobBean> { private int offset = 1; private int len = Integer.MAX_VALUE; private boolean desc = false; - private Map<String, List<String>> filterMap; + private Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap; /** * Constructor for loading a coordinator job information @@ -57,14 +59,14 @@ public class CoordJobXCommand extends CoordinatorXCommand<CoordinatorJobBean> { /** * Constructor for loading a coordinator job information - * * @param id coord jobId + * @param filterMap * @param offset starting index in the list of actions belonging to the job * @param length number of actions to be returned - * @param filterMap * @param desc boolean for whether the actions returned are in descending order */ - public CoordJobXCommand(String id, Map<String, List<String>> filterMap, int offset, int length, boolean desc) { + public CoordJobXCommand(String id, Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap, int offset, + int length, boolean desc) { super("job.info", "job.info", 1); this.id = ParamChecker.notEmpty(id, "id"); this.getActionInfo = true; http://git-wip-us.apache.org/repos/asf/oozie/blob/d5cd976a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSubsetJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSubsetJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSubsetJPAExecutor.java index cdd5000..ffe91ec 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSubsetJPAExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSubsetJPAExecutor.java @@ -18,23 +18,26 @@ package org.apache.oozie.executor.jpa; -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import javax.persistence.EntityManager; -import javax.persistence.Query; - import org.apache.oozie.CoordinatorActionBean; -import org.apache.oozie.CoordinatorEngine; +import org.apache.oozie.CoordinatorEngine.FILTER_COMPARATORS; import org.apache.oozie.ErrorCode; import org.apache.oozie.StringBlob; import org.apache.oozie.client.CoordinatorAction; +import org.apache.oozie.client.OozieClient; import org.apache.oozie.service.Services; import org.apache.oozie.util.DateUtils; +import org.apache.oozie.util.Pair; import org.apache.oozie.util.ParamChecker; +import javax.persistence.EntityManager; +import javax.persistence.Query; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + /** * Load coordinator actions by offset and len (a subset) for a coordinator job. */ @@ -44,14 +47,14 @@ public class CoordJobGetActionsSubsetJPAExecutor implements JPAExecutor<List<Coo private int offset = 1; private int len = 50; private boolean desc = false; - private Map<String,List<String>> filterMap; + private Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap; public CoordJobGetActionsSubsetJPAExecutor(String coordJobId) { ParamChecker.notNull(coordJobId, "coordJobId"); this.coordJobId = coordJobId; } - public CoordJobGetActionsSubsetJPAExecutor(String coordJobId, Map<String, List<String>> filterMap, + public CoordJobGetActionsSubsetJPAExecutor(String coordJobId, Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap, int offset, int len, boolean desc) { this(coordJobId); this.filterMap = filterMap; @@ -93,6 +96,7 @@ public class CoordJobGetActionsSubsetJPAExecutor implements JPAExecutor<List<Coo } private Query setQueryParameters(Query q, EntityManager em){ + Map<String, Object> params = null; if (filterMap != null) { // Add the filter clause String query = q.toString(); @@ -100,8 +104,7 @@ public class CoordJobGetActionsSubsetJPAExecutor implements JPAExecutor<List<Coo int offset = query.lastIndexOf("order"); // Get the 'where' clause for status filters StringBuilder statusClause = new StringBuilder(); - getStatusClause(statusClause, filterMap.get(CoordinatorEngine.POSITIVE_FILTER), true); - getStatusClause(statusClause, filterMap.get(CoordinatorEngine.NEGATIVE_FILTER), false); + params = getWhereClause(statusClause, filterMap); // Insert 'where' before 'order by' sbTotal.insert(offset, statusClause); q = em.createQuery(sbTotal.toString()); @@ -109,6 +112,11 @@ public class CoordJobGetActionsSubsetJPAExecutor implements JPAExecutor<List<Coo if (desc) { q = em.createQuery(q.toString().concat(" desc")); } + if (params != null) { + for (String pname : params.keySet()) { + q.setParameter(pname, params.get(pname)); + } + } q.setParameter("jobId", coordJobId); q.setFirstResult(offset - 1); q.setMaxResults(len); @@ -116,29 +124,68 @@ public class CoordJobGetActionsSubsetJPAExecutor implements JPAExecutor<List<Coo } // Form the where clause to filter by status values - private StringBuilder getStatusClause(StringBuilder sb, List<String> filterList, boolean positive) { - if (sb == null) { - sb = new StringBuilder(); - } - boolean isStatus = false; - if (filterList != null && filterList.size() > 0) { - for (String statusVal : filterList) { - if (!isStatus) { - if (positive) { - sb.append(" and a.statusStr IN (\'").append(statusVal).append("\'"); - } - else { - sb.append(" and a.statusStr NOT IN (\'").append(statusVal).append("\'"); - } - isStatus = true; - } - else { - sb.append(",\'").append(statusVal).append("\'"); + private Map<String, Object> getWhereClause(StringBuilder sb, Map<Pair<String, FILTER_COMPARATORS>, + List<Object>> filterMap) { + Map<String, Object> params = new HashMap<String, Object>(); + int pcnt= 1; + for (Entry<Pair<String, FILTER_COMPARATORS>, List<Object>> filter : filterMap.entrySet()) { + String field = filter.getKey().getFist(); + FILTER_COMPARATORS comp = filter.getKey().getSecond(); + String sqlField; + if (field.equals(OozieClient.FILTER_STATUS)) { + sqlField = "a.statusStr"; + } else if (field.equals(OozieClient.FILTER_NOMINAL_TIME)) { + sqlField = "a.nominalTimestamp"; + } else { + throw new IllegalArgumentException("Invalid filter key " + field); + } + + sb.append(" and ").append(sqlField).append(" "); + switch (comp) { + case EQUALS: + sb.append("IN ("); + params.putAll(appendParams(sb, filter.getValue(), pcnt)); + sb.append(")"); + break; + + case NOT_EQUALS: + sb.append("NOT IN ("); + params.putAll(appendParams(sb, filter.getValue(), pcnt)); + sb.append(")"); + break; + + case GREATER: + case GREATER_EQUAL: + case LESSTHAN: + case LESSTHAN_EQUAL: + if (filter.getValue().size() != 1) { + throw new IllegalArgumentException(field + comp.getSign() + " can't have more than 1 values"); } + + sb.append(comp.getSign()).append(" "); + params.putAll(appendParams(sb, filter.getValue(), pcnt)); + break; + } + + pcnt += filter.getValue().size(); + } + sb.append(" "); + return params; + } + + private Map<String, Object> appendParams(StringBuilder sb, List<Object> value, int sindex) { + Map<String, Object> params = new HashMap<String, Object>(); + boolean first = true; + for (Object val : value) { + String pname = "p" + sindex++; + params.put(pname, val); + if (!first) { + sb.append(", "); } - sb.append(") "); + sb.append(':').append(pname); + first = false; } - return sb; + return params; } private CoordinatorActionBean getBeanForRunningCoordAction(Object arr[]) { http://git-wip-us.apache.org/repos/asf/oozie/blob/d5cd976a/core/src/main/java/org/apache/oozie/util/Pair.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/Pair.java b/core/src/main/java/org/apache/oozie/util/Pair.java new file mode 100644 index 0000000..1bf45b4 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/util/Pair.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.util; + +import org.apache.commons.lang.ObjectUtils; + +/** + * Utility class for holding a pair of data + * @param <T> type of first data + * @param <S> type of second data + */ +public class Pair<T, S> { + private T first; + private S second; + + public Pair(T first, S second) { + this.first = first; + this.second = second; + } + + + public static <T, S> Pair<T, S> of(T first, S second) { + return new Pair(first, second); + } + + public T getFist() { + return first; + } + + public S getSecond() { + return second; + } + + public int hashCode() + { + return (first == null ? 1 : first.hashCode()) * 17 + (second == null ? 1 : second.hashCode()) * 19; + } + + public boolean equals(Object other) + { + if (other == null) { + return false; + } + + if (!(other instanceof Pair)) { + return false; + } + + Pair otherPair = (Pair)other; + return (ObjectUtils.equals(first, otherPair.first) && ObjectUtils.equals(second, otherPair.second)); + } +} + http://git-wip-us.apache.org/repos/asf/oozie/blob/d5cd976a/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java b/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java index 93c9235..8b5bee0 100644 --- a/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java +++ b/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java @@ -18,13 +18,7 @@ package org.apache.oozie; -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.io.PrintWriter; -import java.net.URI; -import java.util.List; - +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.client.CoordinatorAction; import org.apache.oozie.client.CoordinatorJob; @@ -36,6 +30,13 @@ import org.apache.oozie.store.StoreException; import org.apache.oozie.test.XTestCase; import org.apache.oozie.util.XConfiguration; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.net.URI; +import java.util.List; + public class TestCoordinatorEngine extends XTestCase { private Services services; @@ -482,8 +483,8 @@ public class TestCoordinatorEngine extends XTestCase { } catch (CoordinatorEngineException ex) { assertEquals(ErrorCode.E0421, ex.getErrorCode()); - assertEquals("E0421: Invalid job filter [statusRUNNING], elements must be name=value or name!=value pairs", - ex.getMessage()); + assertEquals("E0421: Invalid job filter [statusRUNNING], " + + "filter should be of format <key><comparator><value> pairs", ex.getMessage()); } //Check for missing value after "=" @@ -492,7 +493,8 @@ public class TestCoordinatorEngine extends XTestCase { } catch (CoordinatorEngineException ex) { assertEquals(ErrorCode.E0421, ex.getErrorCode()); - assertEquals("E0421: Invalid job filter [status=], elements must be name=value or name!=value pairs", ex.getMessage()); + assertEquals("E0421: Invalid job filter [status=], invalid status value []. Valid status values are: [" + + StringUtils.join(CoordinatorAction.Status.values(), ", ") + "]", ex.getMessage()); } // Check for invalid status value @@ -502,8 +504,8 @@ public class TestCoordinatorEngine extends XTestCase { catch (CoordinatorEngineException ex) { assertEquals(ErrorCode.E0421, ex.getErrorCode()); assertEquals("E0421: Invalid job filter [status=blahblah], invalid status value [blahblah]." - + " Valid status values are: [WAITING READY SUBMITTED RUNNING SUSPENDED TIMEDOUT " - + "SUCCEEDED KILLED FAILED IGNORED SKIPPED ]", ex.getMessage()); + + " Valid status values are: [" + + StringUtils.join(CoordinatorAction.Status.values(), ", ") + "]", ex.getMessage()); } // Check for empty status value @@ -512,9 +514,9 @@ public class TestCoordinatorEngine extends XTestCase { } catch (CoordinatorEngineException ex) { assertEquals(ErrorCode.E0421, ex.getErrorCode()); - assertEquals("E0421: Invalid job filter [status=\"\"], invalid status value [\"\"]. " - + "Valid status values are: [WAITING READY SUBMITTED RUNNING SUSPENDED TIMEDOUT " - + "SUCCEEDED KILLED FAILED IGNORED SKIPPED ]", ex.getMessage()); + assertEquals("E0421: Invalid job filter [status=\"\"], invalid status value [\"\"]." + + " Valid status values are: [" + + StringUtils.join(CoordinatorAction.Status.values(), ", ") + "]", ex.getMessage()); } // Check for invalid filter option @@ -523,7 +525,8 @@ public class TestCoordinatorEngine extends XTestCase { } catch (CoordinatorEngineException ex) { assertEquals(ErrorCode.E0421, ex.getErrorCode()); - assertEquals("E0421: Invalid job filter [blahblah=blahblah], invalid filter [blahblah]. The only valid filter is \"status\"", ex.getMessage()); + assertEquals("E0421: Invalid job filter [blahblah=blahblah], invalid filter [blahblah]. " + + "Valid filters [" + StringUtils.join(CoordinatorEngine.VALID_JOB_FILTERS, ", ") + "]", ex.getMessage()); } } } http://git-wip-us.apache.org/repos/asf/oozie/blob/d5cd976a/core/src/test/java/org/apache/oozie/TestCoordinatorEngineSimple.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/TestCoordinatorEngineSimple.java b/core/src/test/java/org/apache/oozie/TestCoordinatorEngineSimple.java index 824eb80..4a12e42 100644 --- a/core/src/test/java/org/apache/oozie/TestCoordinatorEngineSimple.java +++ b/core/src/test/java/org/apache/oozie/TestCoordinatorEngineSimple.java @@ -18,12 +18,18 @@ package org.apache.oozie; -import java.util.List; -import java.util.Map; - +import org.apache.oozie.CoordinatorEngine.FILTER_COMPARATORS; +import org.apache.oozie.client.CoordinatorAction.Status; +import org.apache.oozie.client.OozieClient; import org.apache.oozie.service.Services; import org.apache.oozie.test.XTestCase; -import org.junit.Test; +import org.apache.oozie.util.DateUtils; +import org.apache.oozie.util.Pair; + +import java.sql.Timestamp; +import java.util.Date; +import java.util.List; +import java.util.Map; public class TestCoordinatorEngineSimple extends XTestCase { @@ -42,23 +48,107 @@ public class TestCoordinatorEngineSimple extends XTestCase { super.tearDown(); } - @Test + public void testParseJobFilter() throws CoordinatorEngineException { + final CoordinatorEngine ce = new CoordinatorEngine(); + + //valid status filter + Map<Pair<String, FILTER_COMPARATORS>, List<Object>> map = ce.parseJobFilter("staTus=succeeded; status=waiTing"); + assertNotNull(map); + assertEquals(1, map.size()); + Pair<String, FILTER_COMPARATORS> key = map.keySet().iterator().next(); + assertNotNull(key); + assertEquals(OozieClient.FILTER_STATUS, key.getFist()); + assertEquals(FILTER_COMPARATORS.EQUALS, key.getSecond()); + List<Object> list = map.get(key); + assertNotNull(list); + assertEquals(2, list.size()); + assertEquals(Status.SUCCEEDED.name(), (String) list.get(0)); + assertEquals(Status.WAITING.name(), (String)list.get(1)); + + //valid nominal time filter + map = ce.parseJobFilter("nominaltime>=2013-05-01T10:00Z"); + assertNotNull(map); + assertEquals(1, map.size()); + key = map.keySet().iterator().next(); + assertNotNull(key); + assertEquals(OozieClient.FILTER_NOMINAL_TIME, key.getFist()); + assertEquals(FILTER_COMPARATORS.GREATER_EQUAL, key.getSecond()); + list = map.get(key); + assertNotNull(list); + assertEquals(1, list.size()); + assertEquals("2013-05-01T10:00Z", DateUtils.formatDateOozieTZ(new Date(((Timestamp) list.get(0)).getTime()))); + + //invalid format + try { + ce.parseJobFilter("winniethepooh"); + fail("CoordinatorEngineException expected."); + } + catch (CoordinatorEngineException bee) { + assertEquals(ErrorCode.E0421, bee.getErrorCode()); + } + + //invalid key + try { + ce.parseJobFilter("stat=some"); + fail("CoordinatorEngineException expected."); + } + catch (CoordinatorEngineException bee) { + assertEquals(ErrorCode.E0421, bee.getErrorCode()); + } + + //invalid status value + try { + ce.parseJobFilter("status=some"); + fail("CoordinatorEngineException expected."); + } + catch (CoordinatorEngineException bee) { + assertEquals(ErrorCode.E0421, bee.getErrorCode()); + } + + //invalid comparator for status + try { + ce.parseJobFilter("status>=some"); + fail("CoordinatorEngineException expected."); + } + catch (CoordinatorEngineException bee) { + assertEquals(ErrorCode.E0421, bee.getErrorCode()); + } + + //invalid nominal time value + try { + ce.parseJobFilter("nominaltime=2013-13-01T00:00Z"); + fail("CoordinatorEngineException expected."); + } + catch (CoordinatorEngineException bee) { + assertEquals(ErrorCode.E0421, bee.getErrorCode()); + } + + //invalid comparator + try { + ce.parseJobFilter("nominaltime*2013-13-01T00:00Z"); + fail("CoordinatorEngineException expected."); + } + catch (CoordinatorEngineException bee) { + assertEquals(ErrorCode.E0421, bee.getErrorCode()); + } + } + public void testParseFilterNegative() throws CoordinatorEngineException { final CoordinatorEngine ce = new CoordinatorEngine(); // null argument: - Map<String, List<String>> map = ce.parseFilter(null); + Map<String, List<String>> map = ce.parseJobsFilter(null); assertNotNull(map); assertEquals(0, map.size()); // empty String: - map = ce.parseFilter(""); + map = ce.parseJobsFilter(""); assertNotNull(map); assertEquals(0, map.size()); // no eq sign in token: try { - ce.parseFilter("winniethepooh"); + ce.parseJobsFilter("winniethepooh"); fail("CoordinatorEngineException expected."); } catch (CoordinatorEngineException bee) { @@ -66,7 +156,7 @@ public class TestCoordinatorEngineSimple extends XTestCase { } // incorrect k=v: try { - map = ce.parseFilter("kk=vv=zz"); + ce.parseJobsFilter("kk=vv=zz"); fail("CoordinatorEngineException expected."); } catch (CoordinatorEngineException cee) { @@ -74,7 +164,7 @@ public class TestCoordinatorEngineSimple extends XTestCase { } // unknown key in key=value pair: try { - ce.parseFilter("foo=moo"); + ce.parseJobsFilter("foo=moo"); fail("CoordinatorEngineException expected."); } catch (CoordinatorEngineException bee) { @@ -82,7 +172,7 @@ public class TestCoordinatorEngineSimple extends XTestCase { } // incorrect "status" key value: try { - ce.parseFilter("status=foo"); + ce.parseJobsFilter("status=foo"); fail("CoordinatorEngineException expected."); } catch (CoordinatorEngineException bee) { @@ -90,7 +180,7 @@ public class TestCoordinatorEngineSimple extends XTestCase { } // unparseable "frequency" value: try { - ce.parseFilter("FreQuency=foo"); + ce.parseJobsFilter("FreQuency=foo"); fail("CoordinatorEngineException expected."); } catch (CoordinatorEngineException bee) { @@ -98,7 +188,7 @@ public class TestCoordinatorEngineSimple extends XTestCase { } // unparseable "unit" value: try { - ce.parseFilter("UniT=foo"); + ce.parseJobsFilter("UniT=foo"); fail("CoordinatorEngineException expected."); } catch (CoordinatorEngineException bee) { @@ -106,7 +196,7 @@ public class TestCoordinatorEngineSimple extends XTestCase { } // "unit" specified, but "frequency" is not: try { - ce.parseFilter("unit=minutes"); + ce.parseJobsFilter("unit=minutes"); fail("CoordinatorEngineException expected."); } catch (CoordinatorEngineException bee) { @@ -114,11 +204,10 @@ public class TestCoordinatorEngineSimple extends XTestCase { } } - @Test public void testParseFilterPositive() throws CoordinatorEngineException { final CoordinatorEngine ce = new CoordinatorEngine(); - Map<String, List<String>> map = ce.parseFilter("frequency=5;unit=hours;user=foo;status=FAILED"); + Map<String, List<String>> map = ce.parseJobsFilter("frequency=5;unit=hours;user=foo;status=FAILED"); assertEquals(4, map.size()); assertEquals("300", map.get("frequency").get(0)); assertEquals("MINUTE", map.get("unit").get(0)); http://git-wip-us.apache.org/repos/asf/oozie/blob/d5cd976a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetActionsSubsetJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetActionsSubsetJPAExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetActionsSubsetJPAExecutor.java index d3d29aa..9ccd62a 100644 --- a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetActionsSubsetJPAExecutor.java +++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetActionsSubsetJPAExecutor.java @@ -18,26 +18,34 @@ package org.apache.oozie.executor.jpa; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import org.apache.hadoop.fs.Path; import org.apache.oozie.CoordinatorActionBean; -import org.apache.oozie.CoordinatorEngine; +import org.apache.oozie.CoordinatorEngine.FILTER_COMPARATORS; import org.apache.oozie.CoordinatorJobBean; import org.apache.oozie.client.CoordinatorAction; +import org.apache.oozie.client.CoordinatorAction.Status; import org.apache.oozie.client.CoordinatorJob; -import org.apache.oozie.local.LocalOozie; +import org.apache.oozie.client.OozieClient; import org.apache.oozie.service.JPAService; import org.apache.oozie.service.Services; import org.apache.oozie.test.XDataTestCase; import org.apache.oozie.util.DateUtils; +import org.apache.oozie.util.Pair; +import org.junit.Assert; + +import java.sql.Timestamp; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; public class TestCoordJobGetActionsSubsetJPAExecutor extends XDataTestCase { + private static final Pair<String, FILTER_COMPARATORS> POSITIVE_STATUS_FILTER = + Pair.of(OozieClient.FILTER_STATUS, FILTER_COMPARATORS.EQUALS); + private static final Pair<String, FILTER_COMPARATORS> NEGATIVE_STATUS_FILTER = + Pair.of(OozieClient.FILTER_STATUS, FILTER_COMPARATORS.NOT_EQUALS); Services services; @Override @@ -53,6 +61,140 @@ public class TestCoordJobGetActionsSubsetJPAExecutor extends XDataTestCase { super.tearDown(); } + private Timestamp getSqlTime(String str) throws ParseException { + return new Timestamp(DateUtils.parseDateUTC(str).getTime()); + } + + private List<Object> getList(Object... objs) { + List<Object> list = new ArrayList<Object>(); + for (Object o : objs) { + list.add(o); + } + return list; + } + + public void testGetActionsWithNominalTimeFilter() throws Exception { + CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false); + String timeStr[] = {"2009-02-01T00:00Z", "2009-02-01T05:00Z", "2009-02-01T10:00Z", "2009-02-01T15:00Z"}; + Date ntime[] = {getSqlTime(timeStr[0]), getSqlTime(timeStr[1]), getSqlTime(timeStr[2]), getSqlTime(timeStr[3])}; + + List<String> actionIds = new ArrayList<String>(timeStr.length); + int startAction = 5; + for (Date time : ntime) { + CoordinatorActionBean action = + addRecordToCoordActionTable(job.getId(), startAction++, Status.WAITING, "coord-action-get.xml", 0, + time); + actionIds.add(action.getId()); + } + JPAService jpaService = Services.get().get(JPAService.class); + assertNotNull(jpaService); + + //Filter with nominalTime >=, asc + Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap = new HashMap<Pair<String, FILTER_COMPARATORS>, + List<Object>>(); + filterMap.put( + Pair.of(OozieClient.FILTER_NOMINAL_TIME, FILTER_COMPARATORS.GREATER_EQUAL), getList(ntime[2])); + CoordJobGetActionsSubsetJPAExecutor actionGetCmd = new CoordJobGetActionsSubsetJPAExecutor(job.getId(), + filterMap, 1, 10, false); + List<CoordinatorActionBean> actions = jpaService.execute(actionGetCmd); + Assert.assertEquals(2, actions.size()); + Assert.assertEquals(actionIds.get(2), actions.get(0).getId()); + Assert.assertEquals(actionIds.get(3), actions.get(1).getId()); + + //Filter with nominalTime >=, desc + actionGetCmd = new CoordJobGetActionsSubsetJPAExecutor(job.getId(), filterMap, 1, 10, true); + actions = jpaService.execute(actionGetCmd); + Assert.assertEquals(2, actions.size()); + Assert.assertEquals(actionIds.get(3), actions.get(0).getId()); + Assert.assertEquals(actionIds.get(2), actions.get(1).getId()); + + //Filter with nominalTime <, asc + filterMap.clear(); + filterMap.put( + Pair.of(OozieClient.FILTER_NOMINAL_TIME, FILTER_COMPARATORS.LESSTHAN), getList(ntime[2])); + actionGetCmd = new CoordJobGetActionsSubsetJPAExecutor(job.getId(), filterMap, 1, 10, false); + actions = jpaService.execute(actionGetCmd); + Assert.assertEquals(2, actions.size()); + Assert.assertEquals(actionIds.get(0), actions.get(0).getId()); + Assert.assertEquals(actionIds.get(1), actions.get(1).getId()); + + //Filter with nominalTime <, desc + actionGetCmd = new CoordJobGetActionsSubsetJPAExecutor(job.getId(), filterMap, 1, 10, true); + actions = jpaService.execute(actionGetCmd); + Assert.assertEquals(2, actions.size()); + Assert.assertEquals(actionIds.get(1), actions.get(0).getId()); + Assert.assertEquals(actionIds.get(0), actions.get(1).getId()); + + //Filter with nominalTime >=, nominalTime <, asc + filterMap.put( + Pair.of(OozieClient.FILTER_NOMINAL_TIME, FILTER_COMPARATORS.GREATER_EQUAL), getList(ntime[1])); + filterMap.put( + Pair.of(OozieClient.FILTER_NOMINAL_TIME, FILTER_COMPARATORS.LESSTHAN), getList(ntime[3])); + actionGetCmd = new CoordJobGetActionsSubsetJPAExecutor(job.getId(), filterMap, 1, 10, false); + actions = jpaService.execute(actionGetCmd); + Assert.assertEquals(2, actions.size()); + Assert.assertEquals(actionIds.get(1), actions.get(0).getId()); + Assert.assertEquals(actionIds.get(2), actions.get(1).getId()); + + //Filter with nominalTime >=, nominalTime <, desc + actionGetCmd = new CoordJobGetActionsSubsetJPAExecutor(job.getId(), filterMap, 1, 10, true); + actions = jpaService.execute(actionGetCmd); + Assert.assertEquals(2, actions.size()); + Assert.assertEquals(actionIds.get(2), actions.get(0).getId()); + Assert.assertEquals(actionIds.get(1), actions.get(1).getId()); + + //Filter with nominalTime >=, nominalTime <, desc, offset + actionGetCmd = new CoordJobGetActionsSubsetJPAExecutor(job.getId(), filterMap, 2, 10, true); + actions = jpaService.execute(actionGetCmd); + Assert.assertEquals(1, actions.size()); + Assert.assertEquals(actionIds.get(1), actions.get(0).getId()); + + //Filter with nominalTime >=, nominalTime <, asc, offset + actionGetCmd = new CoordJobGetActionsSubsetJPAExecutor(job.getId(), filterMap, 2, 10, false); + actions = jpaService.execute(actionGetCmd); + Assert.assertEquals(1, actions.size()); + Assert.assertEquals(actionIds.get(2), actions.get(0).getId()); + + //Filter with nominalTime >=, nominalTime <, desc, len + actionGetCmd = new CoordJobGetActionsSubsetJPAExecutor(job.getId(), filterMap, 1, 2, true); + actions = jpaService.execute(actionGetCmd); + Assert.assertEquals(2, actions.size()); + Assert.assertEquals(actionIds.get(2), actions.get(0).getId()); + Assert.assertEquals(actionIds.get(1), actions.get(1).getId()); + + //Filter with nominalTime >=, nominalTime <, asc, len + actionGetCmd = new CoordJobGetActionsSubsetJPAExecutor(job.getId(), filterMap, 1, 2, false); + actions = jpaService.execute(actionGetCmd); + Assert.assertEquals(2, actions.size()); + Assert.assertEquals(actionIds.get(1), actions.get(0).getId()); + Assert.assertEquals(actionIds.get(2), actions.get(1).getId()); + + //Filter with nominalTime >=, nominalTime <, asc, offset, len + filterMap.put( + Pair.of(OozieClient.FILTER_NOMINAL_TIME, FILTER_COMPARATORS.LESSTHAN), + getList(getSqlTime("2009-02-01T23:00Z"))); + actionGetCmd = new CoordJobGetActionsSubsetJPAExecutor(job.getId(), filterMap, 2, 2, false); + actions = jpaService.execute(actionGetCmd); + Assert.assertEquals(2, actions.size()); + Assert.assertEquals(actionIds.get(2), actions.get(0).getId()); + Assert.assertEquals(actionIds.get(3), actions.get(1).getId()); + + //Filter with nominalTime >=, nominalTime <, desc, offset, len + actionGetCmd = new CoordJobGetActionsSubsetJPAExecutor(job.getId(), filterMap, 2, 2, true); + actions = jpaService.execute(actionGetCmd); + Assert.assertEquals(2, actions.size()); + Assert.assertEquals(actionIds.get(2), actions.get(0).getId()); + Assert.assertEquals(actionIds.get(1), actions.get(1).getId()); + + //Filter with nominalTime and status + filterMap.put( + Pair.of(OozieClient.FILTER_STATUS, FILTER_COMPARATORS.EQUALS), getList(Status.SUCCEEDED.name())); + actionGetCmd = new CoordJobGetActionsSubsetJPAExecutor(job.getId(), filterMap, 1, 10, true); + actions = jpaService.execute(actionGetCmd); + Assert.assertEquals(0, actions.size()); + + } + public void testCoordActionGet() throws Exception { int actionNum = 1; String resourceXmlName = "coord-action-get.xml"; @@ -116,10 +258,10 @@ public class TestCoordJobGetActionsSubsetJPAExecutor extends XDataTestCase { public void testCoordActionOrderBy() throws Exception { CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false); // Add Coordinator action with nominal time: 2009-12-15T01:00Z - CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.WAITING, + addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0); // Add Coordinator action with nominal time: 2009-02-01T23:59Z - CoordinatorActionBean action1 = addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.WAITING, + addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.WAITING, "coord-action-for-action-input-check.xml", 0); // test for the expected action number List<CoordinatorActionBean> actions = _testGetActionsSubsetOrderBy(job.getId(), 1, 2, false); @@ -133,10 +275,10 @@ public class TestCoordJobGetActionsSubsetJPAExecutor extends XDataTestCase { public void testCoordActionOrderByDesc() throws Exception { CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false); // Add Coordinator action with nominal time: 2009-12-15T01:00Z - CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.WAITING, + addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0); // Add Coordinator action with nominal time: 2009-02-01T23:59Z - CoordinatorActionBean action1 = addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.WAITING, + addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.WAITING, "coord-action-for-action-input-check.xml", 0); // test for the expected action number List<CoordinatorActionBean> actions = _testGetActionsSubsetOrderBy(job.getId(), 1, 2, true); @@ -152,9 +294,7 @@ public class TestCoordJobGetActionsSubsetJPAExecutor extends XDataTestCase { assertNotNull(jpaService); CoordJobGetActionsSubsetJPAExecutor actionGetCmd = new CoordJobGetActionsSubsetJPAExecutor(jobId, null, start, len, order); - List<CoordinatorActionBean> actions = jpaService.execute(actionGetCmd); - return actions; - + return jpaService.execute(actionGetCmd); } // Check status filters for Coordinator actions @@ -165,49 +305,50 @@ public class TestCoordJobGetActionsSubsetJPAExecutor extends XDataTestCase { // Add Coordinator action with nominal time: 2009-02-01T23:59Z addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0); // Create lists for status filter to test positive filter - Map<String, List<String>> filterMap = new HashMap<String, List<String>>(); - List<String> positiveFilter = new ArrayList<String>(); + Map<Pair<String, FILTER_COMPARATORS>, List<Object>> + filterMap = new HashMap<Pair<String, FILTER_COMPARATORS>, List<Object>>(); + List<Object> positiveFilter = new ArrayList<Object>(); positiveFilter.add("RUNNING"); positiveFilter.add("KILLED"); - filterMap.put(CoordinatorEngine.POSITIVE_FILTER, positiveFilter); + filterMap.put(POSITIVE_STATUS_FILTER, positiveFilter); List<CoordinatorActionBean> actions = _testGetActionsSubsetFilter(job.getId(), 1, filterMap, 1, 2); assertEquals(actions.size(), 1); assertEquals(actions.get(0).getActionNumber(), 1); // Create lists for status filter to test negative filter filterMap.clear(); - List<String> negativeFilter = new ArrayList<String>(); + List<Object> negativeFilter = new ArrayList<Object>(); negativeFilter.add("WAITING"); negativeFilter.add("KILLED"); - filterMap.put(CoordinatorEngine.NEGATIVE_FILTER, negativeFilter); + filterMap.put(NEGATIVE_STATUS_FILTER, negativeFilter); actions = _testGetActionsSubsetFilter(job.getId(), 1, filterMap, 1, 2); assertEquals(actions.size(), 1); assertEquals(actions.get(0).getActionNumber(), 1); // Test Combination of include/exclude filters - no dup filterMap.clear(); - filterMap.put(CoordinatorEngine.POSITIVE_FILTER, positiveFilter); - filterMap.put(CoordinatorEngine.NEGATIVE_FILTER, negativeFilter); + filterMap.put(POSITIVE_STATUS_FILTER, positiveFilter); + filterMap.put(NEGATIVE_STATUS_FILTER, negativeFilter); actions = _testGetActionsSubsetFilter(job.getId(), 1, filterMap, 1, 2); assertEquals(actions.size(), 1); assertEquals(actions.get(0).getActionNumber(), 1); // Test Combination of include/exclude filters - dup --> no result filterMap.clear(); - filterMap.put(CoordinatorEngine.POSITIVE_FILTER, positiveFilter); - filterMap.put(CoordinatorEngine.NEGATIVE_FILTER, positiveFilter); + filterMap.put(POSITIVE_STATUS_FILTER, positiveFilter); + filterMap.put(NEGATIVE_STATUS_FILTER, positiveFilter); actions = _testGetActionsSubsetFilter(job.getId(), 1, filterMap, 1, 2); assertEquals(actions.size(), 0); } // Check whether actions are retrieved based on the filter values for status private List<CoordinatorActionBean> _testGetActionsSubsetFilter(String jobId, int actionNum, - Map<String, List<String>> filterMap, int start, int len) throws JPAExecutorException { + Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap, int start, + int len) throws JPAExecutorException { JPAService jpaService = Services.get().get(JPAService.class); assertNotNull(jpaService); CoordJobGetActionsSubsetJPAExecutor actionGetCmd = new CoordJobGetActionsSubsetJPAExecutor(jobId, filterMap, start, len, false); - List<CoordinatorActionBean> actions = jpaService.execute(actionGetCmd); - return actions; + return jpaService.execute(actionGetCmd); } public void testGetActionAllColumns() throws Exception { http://git-wip-us.apache.org/repos/asf/oozie/blob/d5cd976a/docs/src/site/twiki/DG_CommandLineTool.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/DG_CommandLineTool.twiki b/docs/src/site/twiki/DG_CommandLineTool.twiki index 4a07711..c65acbd 100644 --- a/docs/src/site/twiki/DG_CommandLineTool.twiki +++ b/docs/src/site/twiki/DG_CommandLineTool.twiki @@ -40,11 +40,15 @@ usage: -dryrun Dryrun a workflow (since 3.3.2) or coordinator (since 2.0) job without actually executing it -info <arg> info of a job -kill <arg> kill a job (coordinator requires -action or -date) - -len <arg> number of actions (default TOTAL ACTIONS, requires -info) - -filter <arg> status=<S1>[;status=<S2>]* or status!=<S1>[;status!=<S2>]* - All coordinator actions satisfying the status filters will be retrieved. - Positive filters '=' concatenated with 'OR' and negative filters '!=' with 'AND'. - Currently, only supported for coordinator job. + -len <arg> number of actions to be returned, used for pagination(default 1000, requires -info) + -filter <arg> All coordinator actions satisfying the filter will be retrieved. + Filter is of the format <key><comparator><value>[;<key><comparator><value>]* + key: status or nominaltime + comparator: =, !=, <, <=, >, >= + value: valid status like SUCCEEDED, KILLED, RUNNING etc. Only = and != apply for status + nominalTime is valid date of the format yyyy-MM-dd'T'HH:mm'Z' (like 2014-06-01T00:00Z) + Filter with '=' is concatenated with 'OR' and other filters are concatenated with 'AND'. + Currently, supported only for coordinator job. -order <arg> order to show coordinator actions (default ascending order, 'desc' for descending order, requires -info) Currently, only supported for coordinator job. -localtime use local time (same as passing your time zone to -timezone). @@ -52,7 +56,8 @@ usage: -log <arg> job log -nocleanup do not clean up output-events of the coordinator rerun actions (requires -rerun) - -offset <arg> job info offset of actions (default '1', requires -info) + -offset <arg> offset of actions returned relative to all actions matching the filter criteria, + used for pagination (default '1', requires -info) -oozie <arg> Oozie URL -refresh re-materialize the coordinator rerun actions (requires -rerun) -rerun <arg> rerun a job (coordinator requires -action or -date; bundle requires -coordinator or -date) @@ -189,14 +194,14 @@ If the option is not provided and the environment variable is not set, the =oozi ---+++ Time zone -The <code>-timezone TIME_ZONE_ID</code> option in the =job= and =jobs= sub-commands allows you to specify the time zone to use in +The <code>-timezone TIME_ZONE_ID</code> option in the =job= and =jobs= sub-commands allows you to specify the time zone to use in the output of those sub-commands. The <code>TIME_ZONE_ID</code> should be one of the standard Java Time Zone IDs. You can get a list of the available time zones with the command =oozie info -timezones=. -If the <code>-localtime</code> option is used, it will cause Oozie to use whatever the time zone is of the machine. If -both <code>-localtime</code> and <code>-timezone TIME_ZONE_ID</code> are used, the <code>-localtime</code> option will override -the <code>-timezone TIME_ZONE_ID</code> option. If neither option is given, Oozie will look for the =OOZIE_TIMEZONE= environment -variable and uses it if set. If neither option is given and the environment variable is not set, or if Oozie is given an invalid +If the <code>-localtime</code> option is used, it will cause Oozie to use whatever the time zone is of the machine. If +both <code>-localtime</code> and <code>-timezone TIME_ZONE_ID</code> are used, the <code>-localtime</code> option will override +the <code>-timezone TIME_ZONE_ID</code> option. If neither option is given, Oozie will look for the =OOZIE_TIMEZONE= environment +variable and uses it if set. If neither option is given and the environment variable is not set, or if Oozie is given an invalid time zone, it will use GMT. ---+++ Debug Mode @@ -480,20 +485,28 @@ hadoop1 map-reduce OK end job_200904281535_0254 The =info= option can display information about a workflow job or coordinator job or coordinator action. The =info= option for a Coordinator job will retrieve the Coordinator actions ordered by nominal time. However, the =info= command may timeout if the number of Coordinator actions are very high. In that case, =info= should be used with =offset= and =len= option. -The =offset= and =len= option specifies the offset and number of actions to display, if checking a workflow job or coordinator job. +The =offset= and =len= option should be used for pagination. offset determines the start offset of the action +returned among all the actions that matched the filter criteria. len determines number of actions to be returned. The =localtime= option displays times in local time, if not specified times are displayed in GMT. -The =verbose= option gives more detailed information for all the actions, if checking for workflow job or coordinator job. - -The =filter= option can be used to filter coordinator actions based on their status. -The filter option syntax is: <code>[status=VALUE][\;status=VALUE]* or [status!=VALUE][\;status!=VALUE]*</code>. +The =filter= option can be used to filter coordinator actions based on some criteria. +The filter option syntax is: <code><key><comparator><value>[;<key><comparator><value>]*</code>. (Note escape <code>\</code> needed before semicolon to specify multiple names for filter in shell) -Multiple values must be specified as different name value pairs. -When multiple positive filters <code>=</code> are specified, all Coordinator actions that satisfy any one of the filters will be retrieved.(The query will do an OR among all the positive filter values for the status) -When multiple negative filters <code>!=</code> are specified, all Coordinator actions that satisfy all of the filters will be retrieved.(The query will do an AND among all the negative filter values for the status) +key: status or nominalTime +comparator: =, !=, <, <=, >, >= +value: valid status like SUCCEEDED, KILLED, RUNNING etc. Only = and != apply for status +value for nominalTime is valid date of the format yyyy-MM-dd'T'HH:mm'Z' (like 2014-06-01T00:00Z) + +Multiple values must be specified as different name value pairs. The query is formed by doing AND of all conditions, +with the exception of = which uses OR if there are multiple values for the same key. For example, +filter 'status=RUNNING;status=WAITING;nominalTime>=2014-06-01T00:00Z' maps to query (status = RUNNING OR status = +WAITING) AND nominalTime >= 2014-06-01T00:00Z which returns all waiting or running actions with nominalTime >= +2014-06-01T00:00Z. + Currently, the filter option can be used only with an =info= option on Coordinator job. +The =verbose= option gives more detailed information for all the actions, if checking for workflow job or coordinator job. An example below shows how the =verbose= option can be used to gather action statistics information for a job: <verbatim> @@ -1360,7 +1373,7 @@ must be placed before the "-command" argument. ---++ Info Operations -The Info sub-command provides a convenient place for Oozie to display misc information. +The Info sub-command provides a convenient place for Oozie to display misc information. ---+++ Getting a list of time zones @@ -1381,12 +1394,12 @@ Available Time Zones : HAST (America/Adak) HAST (America/Atka) HST (HST) - ... + ... </verbatim> -The <code>-timezones</code> option will print out a (long) list of all available time zones. +The <code>-timezones</code> option will print out a (long) list of all available time zones. -These IDs (the text in the parentheses) are what should be used for the <code>-timezone TIME_ZONE_ID</code> option in the =job= +These IDs (the text in the parentheses) are what should be used for the <code>-timezone TIME_ZONE_ID</code> option in the =job= and =jobs= sub-commands ---++ Map-reduce Operations http://git-wip-us.apache.org/repos/asf/oozie/blob/d5cd976a/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 7ce842b..4b33e0a 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.2.0 release (trunk - unreleased) +OOZIE-1950 Coordinator job info should support timestamp (nominal time) (shwethags) OOZIE-1813 Add service to report/kill rogue bundles and coordinator jobs (puru) OOZIE-1847 HA - Oozie servers should shutdown (or go in safe mode) in case of ZK failure (puru) OOZIE-1957 Coord update command override group when oozie.service.AuthorizationService.default.group.as.acl is set and group/acl is not configured in job property (puru)
