Repository: oozie Updated Branches: refs/heads/master e7f002073 -> 402678730
OOZIE-1704 Add ability to use Bulk API with bundle ID (mona) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/40267873 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/40267873 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/40267873 Branch: refs/heads/master Commit: 40267873071a5c039690a5550ec9e4444512f16d Parents: e7f0020 Author: Mona Chitnis <[email protected]> Authored: Fri Mar 21 00:56:11 2014 -0700 Committer: Mona Chitnis <[email protected]> Committed: Fri Mar 21 00:56:11 2014 -0700 ---------------------------------------------------------------------- .../java/org/apache/oozie/cli/OozieCLI.java | 4 +- .../java/org/apache/oozie/BundleEngine.java | 4 +- .../java/org/apache/oozie/BundleJobBean.java | 2 +- .../oozie/client/rest/BulkResponseImpl.java | 8 +- .../oozie/executor/jpa/BulkJPAExecutor.java | 199 ++++++++++++------- .../jpa/TestBulkMonitorJPAExecutor.java | 36 +++- .../servlet/TestBulkMonitorWebServiceAPI.java | 63 +++++- .../org/apache/oozie/test/XDataTestCase.java | 3 +- release-log.txt | 1 + 9 files changed, 232 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/40267873/client/src/main/java/org/apache/oozie/cli/OozieCLI.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/oozie/cli/OozieCLI.java b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java index 3c780dd..87e2f27 100644 --- a/client/src/main/java/org/apache/oozie/cli/OozieCLI.java +++ b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java @@ -362,8 +362,8 @@ public class OozieCLI { Option doAs = new Option(DO_AS_OPTION, true, "doAs user, impersonates as the specified user"); Option bulkMonitor = new Option(BULK_OPTION, true, "key-value pairs to filter bulk jobs response. e.g. bundle=<B>\\;" + "coordinators=<C>\\;actionstatus=<S>\\;startcreatedtime=<SC>\\;endcreatedtime=<EC>\\;" + - "startscheduledtime=<SS>\\;endscheduledtime=<ES>\\; coordinators and actionstatus can be multiple comma separated values" + - "bundle and coordinators are 'names' of those jobs. Bundle name is mandatory, other params are optional"); + "startscheduledtime=<SS>\\;endscheduledtime=<ES>\\; bundle, coordinators and actionstatus can be multiple comma separated values" + + "bundle and coordinators can be id(s) or appName(s) of those jobs. Specifying bundle is mandatory, other params are optional"); start.setType(Integer.class); len.setType(Integer.class); Options jobsOptions = new Options(); http://git-wip-us.apache.org/repos/asf/oozie/blob/40267873/core/src/main/java/org/apache/oozie/BundleEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/BundleEngine.java b/core/src/main/java/org/apache/oozie/BundleEngine.java index ce7c9c4..5bf1538 100644 --- a/core/src/main/java/org/apache/oozie/BundleEngine.java +++ b/core/src/main/java/org/apache/oozie/BundleEngine.java @@ -455,8 +455,8 @@ public class BundleEngine extends BaseEngine { throw new BundleEngineException(ErrorCode.E0420, token, "elements must be name=value pairs"); } } - if(!bulkFilter.containsKey(BulkResponseImpl.BULK_FILTER_BUNDLE_NAME)) { - throw new BundleEngineException(ErrorCode.E0305, BulkResponseImpl.BULK_FILTER_BUNDLE_NAME); + if (!bulkFilter.containsKey(BulkResponseImpl.BULK_FILTER_BUNDLE)) { + throw new BundleEngineException(ErrorCode.E0305, BulkResponseImpl.BULK_FILTER_BUNDLE); } } return bulkFilter; http://git-wip-us.apache.org/repos/asf/oozie/blob/40267873/core/src/main/java/org/apache/oozie/BundleJobBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/BundleJobBean.java b/core/src/main/java/org/apache/oozie/BundleJobBean.java index 2c06035..4c37e57 100644 --- a/core/src/main/java/org/apache/oozie/BundleJobBean.java +++ b/core/src/main/java/org/apache/oozie/BundleJobBean.java @@ -96,7 +96,7 @@ import org.json.simple.JSONObject; @NamedQuery(name = "GET_COMPLETED_BUNDLE_JOBS_OLDER_THAN", query = "select w.id from BundleJobBean w where ( w.statusStr = 'SUCCEEDED' OR w.statusStr = 'FAILED' OR w.statusStr = 'KILLED' OR w.statusStr = 'DONEWITHERROR') AND w.lastModifiedTimestamp <= :lastModTime order by w.lastModifiedTimestamp"), - @NamedQuery(name = "BULK_MONITOR_BUNDLE_QUERY", query = "SELECT b.id, b.statusStr, b.user FROM BundleJobBean b WHERE b.appName = :appName"), + @NamedQuery(name = "BULK_MONITOR_BUNDLE_QUERY", query = "SELECT b.id, b.appName, b.statusStr, b.user FROM BundleJobBean b"), // Join query @NamedQuery(name = "BULK_MONITOR_ACTIONS_QUERY", query = "SELECT a.id, a.actionNumber, a.errorCode, a.errorMessage, a.externalId, " + http://git-wip-us.apache.org/repos/asf/oozie/blob/40267873/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java b/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java index 4493ced..d9998c7 100644 --- a/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java +++ b/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java @@ -37,8 +37,8 @@ public class BulkResponseImpl implements BulkResponse, JsonBean { private CoordinatorJobBean coordinator; private CoordinatorActionBean action; - public static final String BULK_FILTER_BUNDLE_NAME = "bundle"; - public static final String BULK_FILTER_COORD_NAME = "coordinators"; + public static final String BULK_FILTER_BUNDLE = "bundle"; + public static final String BULK_FILTER_COORD = "coordinators"; public static final String BULK_FILTER_LEVEL = "filterlevel"; public static final String BULK_FILTER_STATUS = "actionstatus"; public static final String BULK_FILTER_START_CREATED_EPOCH = "startcreatedtime"; @@ -51,8 +51,8 @@ public class BulkResponseImpl implements BulkResponse, JsonBean { static { - BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_BUNDLE_NAME); - BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_COORD_NAME); + BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_BUNDLE); + BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_COORD); BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_LEVEL); BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_STATUS); BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_START_CREATED_EPOCH); http://git-wip-us.apache.org/repos/asf/oozie/blob/40267873/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java index 930e8a5..60e5624 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java @@ -26,6 +26,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import javax.persistence.EntityManager; import javax.persistence.Query; @@ -52,6 +54,9 @@ public class BulkJPAExecutor implements JPAExecutor<BulkResponseInfo> { // defaults private int start = 1; private int len = 50; + private enum PARAM_TYPE { + ID, NAME + } public BulkJPAExecutor(Map<String, List<String>> bulkFilter, int start, int len) { ParamChecker.notNull(bulkFilter, "bulkFilter"); @@ -60,35 +65,27 @@ public class BulkJPAExecutor implements JPAExecutor<BulkResponseInfo> { this.len = len; } - /* - * (non-Javadoc) - * @see org.apache.oozie.executor.jpa.JPAExecutor#getName() - */ @Override public String getName() { return "BulkJPAExecutor"; } - /* - * (non-Javadoc) - * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager) - */ @Override public BulkResponseInfo execute(EntityManager em) throws JPAExecutorException { List<BulkResponseImpl> responseList = new ArrayList<BulkResponseImpl>(); Map<String, Timestamp> actionTimes = new HashMap<String, Timestamp>(); try { - // Lightweight Query 1 on Bundle level to fetch the bundle job - // corresponding to name - BundleJobBean bundleBean = bundleQuery(em); + // Lightweight Query 1 on Bundle level to fetch the bundle job(s) + // corresponding to names or ids + List<BundleJobBean> bundleBeans = bundleQuery(em); // Join query between coordinator job and coordinator action tables // to get entries for specific bundleId only - String conditions = actionQuery(em, bundleBean, actionTimes, responseList); + String conditions = actionQuery(em, bundleBeans, actionTimes, responseList); // Query to get the count of records - long total = countQuery(conditions, em, bundleBean, actionTimes); + long total = countQuery(conditions, em, bundleBeans); BulkResponseInfo bulk = new BulkResponseInfo(responseList, start, len, total); return bulk; @@ -98,36 +95,99 @@ public class BulkJPAExecutor implements JPAExecutor<BulkResponseInfo> { } } + /** + * build the bundle level query to get bundle beans for the specified ids or appnames + * @param em + * @return List BundleJobBeans + * @throws JPAExecutorException + */ @SuppressWarnings("unchecked") - private BundleJobBean bundleQuery(EntityManager em) throws JPAExecutorException { - BundleJobBean bundleBean = new BundleJobBean(); - String bundleName = bulkFilter.get(BulkResponseImpl.BULK_FILTER_BUNDLE_NAME).get(0); + private List<BundleJobBean> bundleQuery(EntityManager em) throws JPAExecutorException { Query q = em.createNamedQuery("BULK_MONITOR_BUNDLE_QUERY"); - q.setParameter("appName", bundleName); - List<Object[]> bundles = (List<Object[]>) q.getResultList(); - if (bundles.isEmpty()) { - throw new JPAExecutorException(ErrorCode.E0603, "No bundle entries found for bundle name: " - + bundleName); - } - if (bundles.size() > 1) { // more than one bundles running with same - // name - ERROR. Fail fast - throw new JPAExecutorException(ErrorCode.E0603, "Non-unique bundles present for same bundle name: " - + bundleName); - } - bundleBean = getBeanForBundleJob(bundles.get(0), bundleName); - return bundleBean; + StringBuilder bundleQuery = new StringBuilder(q.toString()); + + StringBuilder whereClause = null; + List<String> bundles = bulkFilter.get(BulkResponseImpl.BULK_FILTER_BUNDLE); + if (bundles != null) { + PARAM_TYPE type = getParamType(bundles.get(0), 'B'); + if (type == PARAM_TYPE.NAME) { + whereClause = inClause(bundles, "appName", 'b'); + } + else if (type == PARAM_TYPE.ID) { + whereClause = inClause(bundles, "id", 'b'); + } + + // Query: select <columns> from BundleJobBean b where b.id IN (...) _or_ b.appName IN (...) + bundleQuery.append(whereClause.replace(whereClause.indexOf("AND"), whereClause.indexOf("AND") + 3, "WHERE")); + List<Object[]> bundleObjs = (List<Object[]>) em.createQuery(bundleQuery.toString()).getResultList(); + if (bundleObjs.isEmpty()) { + throw new JPAExecutorException(ErrorCode.E0603, "No entries found for given bundle(s)"); + } + + List<BundleJobBean> bundleBeans = new ArrayList<BundleJobBean>(); + for (Object[] bundleElem : bundleObjs) { + bundleBeans.add(constructBundleBean(bundleElem)); + } + return bundleBeans; + } + return null; + } + + /** + * Validate and determine whether passed param is job-id or appname + * @param id + * @param job + * @return PARAM_TYPE + */ + private PARAM_TYPE getParamType(String id, char job) { + Pattern p = Pattern.compile("\\d{7}-\\d{15}-oozie-[a-z]{4}-" + job); + Matcher m = p.matcher(id); + if (m.matches()) { + return PARAM_TYPE.ID; + } + return PARAM_TYPE.NAME; } + /** + * Compose the coord action level query comprising bundle id/appname filter and coord action + * status filter (if specified) and start-time or nominal-time filter (if specified) + * @param em + * @param bundles + * @param times + * @param responseList + * @return Query string + * @throws ParseException + */ @SuppressWarnings("unchecked") - private String actionQuery(EntityManager em, BundleJobBean bundleBean, + private String actionQuery(EntityManager em, List<BundleJobBean> bundles, Map<String, Timestamp> times, List<BulkResponseImpl> responseList) throws ParseException { Query q = em.createNamedQuery("BULK_MONITOR_ACTIONS_QUERY"); StringBuilder getActions = new StringBuilder(q.toString()); + int offset = getActions.indexOf("ORDER"); StringBuilder conditionClause = new StringBuilder(); - conditionClause.append(coordNamesClause(bulkFilter.get(BulkResponseImpl.BULK_FILTER_COORD_NAME))); + + List<String> coords = bulkFilter.get(BulkResponseImpl.BULK_FILTER_COORD); + // Query: Select <columns> from CoordinatorActionBean a, CoordinatorJobBean c WHERE a.jobId = c.id + // AND c.bundleId = :bundleId AND c.appName/id IN (...) + if (coords != null) { + PARAM_TYPE type = getParamType(coords.get(0), 'C'); + if (type == PARAM_TYPE.NAME) { + conditionClause.append(inClause(bulkFilter.get(BulkResponseImpl.BULK_FILTER_COORD), "appName", 'c')); + } + else if (type == PARAM_TYPE.ID) { + conditionClause.append(inClause(bulkFilter.get(BulkResponseImpl.BULK_FILTER_COORD), "id", 'c')); + } + } + // Query: Select <columns> from CoordinatorActionBean a, CoordinatorJobBean c WHERE a.jobId = c.id + // AND c.bundleId = :bundleId AND c.appName/id IN (...) AND a.statusStr IN (...) conditionClause.append(statusClause(bulkFilter.get(BulkResponseImpl.BULK_FILTER_STATUS))); - int offset = getActions.indexOf("ORDER"); + offset = getActions.indexOf("ORDER"); getActions.insert(offset - 1, conditionClause); + + // Query: Select <columns> from CoordinatorActionBean a, CoordinatorJobBean c WHERE a.jobId = c.id + // AND c.bundleId = :bundleId AND c.appName/id IN (...) AND a.statusStr IN (...) + // AND a.createdTimestamp >= startCreated _or_ a.createdTimestamp <= endCreated + // AND a.nominalTimestamp >= startNominal _or_ a.nominalTimestamp <= endNominal timesClause(getActions, offset, times); q = em.createQuery(getActions.toString()); Iterator<Entry<String, Timestamp>> iter = times.entrySet().iterator(); @@ -135,41 +195,55 @@ public class BulkJPAExecutor implements JPAExecutor<BulkResponseInfo> { Entry<String, Timestamp> time = iter.next(); q.setParameter(time.getKey(), time.getValue()); } - q.setParameter("bundleId", bundleBean.getId()); // pagination q.setFirstResult(start - 1); q.setMaxResults(len); - - List<Object[]> response = q.getResultList(); - for (Object[] r : response) { - BulkResponseImpl br = getResponseFromObject(bundleBean, r); - responseList.add(br); + // repeatedly execute above query for each bundle + for (BundleJobBean bundle : bundles) { + q.setParameter("bundleId", bundle.getId()); + List<Object[]> response = q.getResultList(); + for (Object[] r : response) { + BulkResponseImpl br = getResponseFromObject(bundle, r); + responseList.add(br); + } } return q.toString(); } - private long countQuery(String clause, EntityManager em, BundleJobBean bundleBean, Map<String, Timestamp> times) { + /** + * Get total number of records for use with offset and len in API + * @param clause + * @param em + * @param bundles + * @return total count of coord actions + */ + private long countQuery(String clause, EntityManager em, List<BundleJobBean> bundles) { Query q = em.createNamedQuery("BULK_MONITOR_COUNT_QUERY"); StringBuilder getTotal = new StringBuilder(q.toString() + " "); + // Query: select COUNT(a) from CoordinatorActionBean a, CoordinatorJobBean c + // get entire WHERE clause from above i.e. actionQuery() for all conditions on coordinator job + // and action status and times getTotal.append(clause.substring(clause.indexOf("WHERE"), clause.indexOf("ORDER"))); + int offset = getTotal.indexOf("bundleId"); + List<String> bundleIds = new ArrayList<String>(); + for (BundleJobBean bundle : bundles) { + bundleIds.add(bundle.getId()); + } + // Query: select COUNT(a) from CoordinatorActionBean a, CoordinatorJobBean c WHERE ... + // AND c.bundleId IN (... list of bundle ids) i.e. replace single :bundleId with list + getTotal = getTotal.replace(offset - 6, offset + 20, inClause(bundleIds, "bundleId", 'c').toString()); q = em.createQuery(getTotal.toString()); - q.setParameter("bundleId", bundleBean.getId()); - Iterator<Entry<String, Timestamp>> iter = times.entrySet().iterator(); - while (iter.hasNext()) { - Entry<String, Timestamp> time = iter.next(); - q.setParameter(time.getKey(), time.getValue()); - } long total = ((Long) q.getSingleResult()).longValue(); return total; } - // Form the where clause to filter by coordinator names - private StringBuilder coordNamesClause(List<String> coordNames) { + // Form the where clause to filter by coordinator appname/id + private StringBuilder inClause(List<String> values, String col, char type) { StringBuilder sb = new StringBuilder(); boolean firstVal = true; - for (String name : nullToEmpty(coordNames)) { + for (String name : nullToEmpty(values)) { if (firstVal) { - sb.append(" AND c.appName IN (\'" + name + "\'"); + sb.append(" AND " + type + "." + col + " IN (\'" + name + "\'"); firstVal = false; } else { @@ -184,21 +258,8 @@ public class BulkJPAExecutor implements JPAExecutor<BulkResponseInfo> { // Form the where clause to filter by coord action status private StringBuilder statusClause(List<String> statuses) { - StringBuilder sb = new StringBuilder(); - boolean firstVal = true; - for (String status : nullToEmpty(statuses)) { - if (firstVal) { - sb.append(" AND a.statusStr IN (\'" + status + "\'"); - firstVal = false; - } - else { - sb.append(",\'" + status + "\'"); - } - } - if (!firstVal) { - sb.append(") "); - } - else { // statuses was null. adding default + StringBuilder sb = inClause(statuses, "statusStr", 'a'); + if (sb.length() == 0) { // statuses was null. adding default sb.append(" AND a.statusStr IN ('KILLED', 'FAILED') "); } return sb; @@ -282,7 +343,7 @@ public class BulkJPAExecutor implements JPAExecutor<BulkResponseInfo> { return bean; } - private BundleJobBean getBeanForBundleJob(Object[] barr, String name) throws JPAExecutorException { + private BundleJobBean constructBundleBean(Object[] barr) throws JPAExecutorException { BundleJobBean bean = new BundleJobBean(); if (barr[0] != null) { bean.setId((String) barr[0]); @@ -291,12 +352,14 @@ public class BulkJPAExecutor implements JPAExecutor<BulkResponseInfo> { throw new JPAExecutorException(ErrorCode.E0603, "bundleId returned by query is null - cannot retrieve bulk results"); } - bean.setAppName(name); if (barr[1] != null) { - bean.setStatus(BundleJob.Status.valueOf((String) barr[1])); + bean.setAppName((String) barr[1]); } if (barr[2] != null) { - bean.setUser((String) barr[2]); + bean.setStatus(BundleJob.Status.valueOf((String) barr[2])); + } + if (barr[3] != null) { + bean.setUser((String) barr[3]); } return bean; } http://git-wip-us.apache.org/repos/asf/oozie/blob/40267873/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkMonitorJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkMonitorJPAExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkMonitorJPAExecutor.java index 202d05f..4536398 100644 --- a/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkMonitorJPAExecutor.java +++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkMonitorJPAExecutor.java @@ -93,7 +93,7 @@ public class TestBulkMonitorJPAExecutor extends XDataTestCase { // bundle } catch (JPAExecutorException jex) { - assertTrue(jex.getMessage().contains("No bundle entries found")); + assertTrue(jex.getMessage().contains("No entries found for given bundle(s)")); } } @@ -134,14 +134,40 @@ public class TestBulkMonitorJPAExecutor extends XDataTestCase { BulkJPAExecutor bulkjpa = new BulkJPAExecutor(BundleEngine.parseBulkFilter(request), 1, 10); try { jpaService.execute(bulkjpa); - fail(); // exception expected due to >1 records found for same - // bundle name } catch (JPAExecutorException jex) { - assertTrue(jex.getMessage().contains("Non-unique bundles present for same bundle name")); + fail(); // should not throw exception as this case is now supported } } + public void testBundleId() throws Exception { + String request = "bundle=" + bundleId + ";actionstatus=FAILED;" + + "startcreatedtime=2012-07-21T00:00Z;endcreatedtime=2012-07-22T02:00Z"; + + List<BulkResponseImpl> brList = _execQuery(request); + assertEquals(1, brList.size()); // only 1 action satisfies the + // conditions + BulkResponseImpl br = brList.get(0); + assertEquals(bundleId, br.getBundle().getId()); + assertEquals("Coord1", br.getCoordinator().getAppName()); + assertEquals(CoordinatorAction.Status.FAILED, br.getAction().getStatus()); + assertEquals(DateUtils.parseDateUTC(CREATE_TIME).toString(), br.getAction().getCreatedTime().toString()); + } + + public void testBundleIdWithCoordId() throws Exception { + // fetching coord Ids + JPAService jpaService = Services.get().get(JPAService.class); + List<String> coordIds = jpaService.execute(new CoordJobsGetFromParentIdJPAExecutor(bundleId, 10)); + + // there are 3 coordinators but giving range as only two of them + String coordIdsStr = coordIds.get(0) + "," + coordIds.get(1); + String request = "bundle=" + bundleId + ";coordinators=" + coordIdsStr + ";actionstatus=KILLED"; + List<BulkResponseImpl> brList = _execQuery(request); + assertEquals(2, brList.size()); // 2 actions satisfy the conditions + assertEquals(brList.get(0).getAction().getId(), "Coord1@2"); + assertEquals(brList.get(1).getAction().getId(), "Coord2@1"); + } + private List<BulkResponseImpl> _execQuery(String request) throws JPAExecutorException, BundleEngineException { BulkJPAExecutor bulkjpa = new BulkJPAExecutor(BundleEngine.parseBulkFilter(request), 1, 10); BulkResponseInfo response = jpaService.execute(bulkjpa); @@ -149,4 +175,4 @@ public class TestBulkMonitorJPAExecutor extends XDataTestCase { return response.getResponses(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/oozie/blob/40267873/core/src/test/java/org/apache/oozie/servlet/TestBulkMonitorWebServiceAPI.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/servlet/TestBulkMonitorWebServiceAPI.java b/core/src/test/java/org/apache/oozie/servlet/TestBulkMonitorWebServiceAPI.java index 0a486b3..515d599 100644 --- a/core/src/test/java/org/apache/oozie/servlet/TestBulkMonitorWebServiceAPI.java +++ b/core/src/test/java/org/apache/oozie/servlet/TestBulkMonitorWebServiceAPI.java @@ -24,12 +24,15 @@ import java.net.URLEncoder; import java.util.Calendar; import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.Callable; + import javax.servlet.http.HttpServletResponse; import org.apache.oozie.executor.jpa.BundleJobInsertJPAExecutor; import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor; +import org.apache.oozie.executor.jpa.CoordJobsGetFromParentIdJPAExecutor; import org.apache.oozie.local.LocalOozie; import org.apache.oozie.BundleJobBean; import org.apache.oozie.CoordinatorActionBean; @@ -151,10 +154,6 @@ public class TestBulkMonitorWebServiceAPI extends XDataTestCase { JSONObject jcoord = (JSONObject) ((JSONObject) array.get(0)).get(JsonTags.BULK_RESPONSE_COORDINATOR); JSONObject jaction = (JSONObject) ((JSONObject) array.get(0)).get(JsonTags.BULK_RESPONSE_ACTION); - assertNotNull(jbundle); - assertNotNull(jcoord); - assertNotNull(jaction); - assertEquals(jbundle.get(JsonTags.BUNDLE_JOB_NAME), "BUNDLE-TEST"); assertEquals(jcoord.get(JsonTags.COORDINATOR_JOB_NAME), "Coord1"); assertEquals(jcoord.get(JsonTags.COORDINATOR_JOB_STATUS), "RUNNING"); @@ -286,7 +285,61 @@ public class TestBulkMonitorWebServiceAPI extends XDataTestCase { HttpURLConnection conn = (HttpURLConnection) url.openConnection(); // WS call will throw BAD_REQUEST code 400 error because no // records found for this bundle - assertEquals(HttpServletResponse.SC_BAD_REQUEST, conn.getResponseCode()); + assertFalse(HttpServletResponse.SC_BAD_REQUEST == conn.getResponseCode()); + + return null; + } + }); + } + + public void testBundleId() throws Exception { + runTest("/v1/jobs", V1JobsServlet.class, false, new Callable<Void>() { + public Void call() throws Exception { + + String bulkRequest = "bundle=" + bundleId + ";coordinators=Coord1;" + + "actionStatus=FAILED;startcreatedtime=2012-07-21T00:00Z"; + JSONArray array = _requestToServer(bulkRequest); + + assertEquals(1, array.size()); + JSONObject jbundle = (JSONObject) ((JSONObject) array.get(0)).get(JsonTags.BULK_RESPONSE_BUNDLE); + JSONObject jcoord = (JSONObject) ((JSONObject) array.get(0)).get(JsonTags.BULK_RESPONSE_COORDINATOR); + JSONObject jaction = (JSONObject) ((JSONObject) array.get(0)).get(JsonTags.BULK_RESPONSE_ACTION); + + assertNotNull(jbundle); + assertNotNull(jcoord); + assertNotNull(jaction); + + assertEquals(jbundle.get(JsonTags.BUNDLE_JOB_ID), bundleId); + assertEquals(jcoord.get(JsonTags.COORDINATOR_JOB_NAME), "Coord1"); + assertEquals(jcoord.get(JsonTags.COORDINATOR_JOB_STATUS), "RUNNING"); + assertEquals(jaction.get(JsonTags.COORDINATOR_ACTION_STATUS), "FAILED"); + assertEquals((jaction.get(JsonTags.COORDINATOR_ACTION_CREATED_TIME).toString().split(", "))[1], + DateUtils.parseDateUTC(CREATE_TIME).toGMTString()); + return null; + } + }); + } + + public void testBundleIdWithCoordId() throws Exception { + // fetching coord Ids + JPAService jpaService = Services.get().get(JPAService.class); + List<String> coordIds = jpaService.execute(new CoordJobsGetFromParentIdJPAExecutor(bundleId, 10)); + // there are 3 coordinators but giving range as only two of them + final String coordIdsStr = coordIds.get(0) + "," + coordIds.get(1); + + runTest("/v1/jobs", V1JobsServlet.class, false, new Callable<Void>() { + public Void call() throws Exception { + // giving range as 2 of the total 3 coordinators + String bulkRequest = "bundle=" + bundleId + ";coordinators=" + coordIdsStr + ";actionstatus=KILLED"; + JSONArray array = _requestToServer(bulkRequest); + + assertEquals(2, array.size()); + JSONObject jbundle = (JSONObject) ((JSONObject) array.get(0)).get(JsonTags.BULK_RESPONSE_BUNDLE); + JSONObject jaction1 = (JSONObject) ((JSONObject) array.get(0)).get(JsonTags.BULK_RESPONSE_ACTION); + JSONObject jaction2 = (JSONObject) ((JSONObject) array.get(1)).get(JsonTags.BULK_RESPONSE_ACTION); + + assertEquals(jaction1.get(JsonTags.COORDINATOR_ACTION_ID), "Coord1@2"); + assertEquals(jaction2.get(JsonTags.COORDINATOR_ACTION_ID), "Coord2@1"); return null; } http://git-wip-us.apache.org/repos/asf/oozie/blob/40267873/core/src/test/java/org/apache/oozie/test/XDataTestCase.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java index 317885b..423944a 100644 --- a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java +++ b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java @@ -110,6 +110,7 @@ public abstract class XDataTestCase extends XHCatTestCase { + "</sla:info>"; protected String bundleName; + protected String bundleId; protected String CREATE_TIME = "2012-07-22T00:00Z"; public XDataTestCase() { @@ -1420,7 +1421,7 @@ public abstract class XDataTestCase extends XHCatTestCase { assertNotNull(jpaService); // adding the bundle job BundleJobBean bundle = addRecordToBundleJobTable(BundleJob.Status.RUNNING, false); - String bundleId = bundle.getId(); + bundleId = bundle.getId(); bundleName = bundle.getAppName(); // adding coordinator job(s) for this bundle http://git-wip-us.apache.org/repos/asf/oozie/blob/40267873/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 5b3a2da..dae44b3 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.1.0 release (trunk - unreleased) +OOZIE-1704 Add ability to use Bulk API with bundle ID (mona) OOZIE-1718 Coord Job Query UPDATE_COORD_JOB_CHANGE does not update last modified time (mona) OOZIE-1693 UI timeout while loading job table (puru via rohini) OOZIE-1698 Action sharelib configuration document lacks the "oozie." prefix (qwertymaniac via rohini)
