Repository: oozie
Updated Branches:
  refs/heads/master e7f002073 -> 402678730


OOZIE-1704 Add ability to use Bulk API with bundle ID (mona)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/40267873
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/40267873
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/40267873

Branch: refs/heads/master
Commit: 40267873071a5c039690a5550ec9e4444512f16d
Parents: e7f0020
Author: Mona Chitnis <[email protected]>
Authored: Fri Mar 21 00:56:11 2014 -0700
Committer: Mona Chitnis <[email protected]>
Committed: Fri Mar 21 00:56:11 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/oozie/cli/OozieCLI.java     |   4 +-
 .../java/org/apache/oozie/BundleEngine.java     |   4 +-
 .../java/org/apache/oozie/BundleJobBean.java    |   2 +-
 .../oozie/client/rest/BulkResponseImpl.java     |   8 +-
 .../oozie/executor/jpa/BulkJPAExecutor.java     | 199 ++++++++++++-------
 .../jpa/TestBulkMonitorJPAExecutor.java         |  36 +++-
 .../servlet/TestBulkMonitorWebServiceAPI.java   |  63 +++++-
 .../org/apache/oozie/test/XDataTestCase.java    |   3 +-
 release-log.txt                                 |   1 +
 9 files changed, 232 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/40267873/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/cli/OozieCLI.java 
b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
index 3c780dd..87e2f27 100644
--- a/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
+++ b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
@@ -362,8 +362,8 @@ public class OozieCLI {
         Option doAs = new Option(DO_AS_OPTION, true, "doAs user, impersonates 
as the specified user");
         Option bulkMonitor = new Option(BULK_OPTION, true, "key-value pairs to 
filter bulk jobs response. e.g. bundle=<B>\\;" +
                 
"coordinators=<C>\\;actionstatus=<S>\\;startcreatedtime=<SC>\\;endcreatedtime=<EC>\\;"
 +
-                "startscheduledtime=<SS>\\;endscheduledtime=<ES>\\; 
coordinators and actionstatus can be multiple comma separated values" +
-                "bundle and coordinators are 'names' of those jobs. Bundle 
name is mandatory, other params are optional");
+                "startscheduledtime=<SS>\\;endscheduledtime=<ES>\\; bundle, 
coordinators and actionstatus can be multiple comma separated values" +
+                "bundle and coordinators can be id(s) or appName(s) of those 
jobs. Specifying bundle is mandatory, other params are optional");
         start.setType(Integer.class);
         len.setType(Integer.class);
         Options jobsOptions = new Options();

http://git-wip-us.apache.org/repos/asf/oozie/blob/40267873/core/src/main/java/org/apache/oozie/BundleEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/BundleEngine.java 
b/core/src/main/java/org/apache/oozie/BundleEngine.java
index ce7c9c4..5bf1538 100644
--- a/core/src/main/java/org/apache/oozie/BundleEngine.java
+++ b/core/src/main/java/org/apache/oozie/BundleEngine.java
@@ -455,8 +455,8 @@ public class BundleEngine extends BaseEngine {
                     throw new BundleEngineException(ErrorCode.E0420, token, 
"elements must be name=value pairs");
                 }
             }
-            
if(!bulkFilter.containsKey(BulkResponseImpl.BULK_FILTER_BUNDLE_NAME)) {
-                throw new BundleEngineException(ErrorCode.E0305, 
BulkResponseImpl.BULK_FILTER_BUNDLE_NAME);
+            if (!bulkFilter.containsKey(BulkResponseImpl.BULK_FILTER_BUNDLE)) {
+                throw new BundleEngineException(ErrorCode.E0305, 
BulkResponseImpl.BULK_FILTER_BUNDLE);
             }
         }
         return bulkFilter;

http://git-wip-us.apache.org/repos/asf/oozie/blob/40267873/core/src/main/java/org/apache/oozie/BundleJobBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/BundleJobBean.java 
b/core/src/main/java/org/apache/oozie/BundleJobBean.java
index 2c06035..4c37e57 100644
--- a/core/src/main/java/org/apache/oozie/BundleJobBean.java
+++ b/core/src/main/java/org/apache/oozie/BundleJobBean.java
@@ -96,7 +96,7 @@ import org.json.simple.JSONObject;
 
         @NamedQuery(name = "GET_COMPLETED_BUNDLE_JOBS_OLDER_THAN", query = 
"select w.id from BundleJobBean w where ( w.statusStr = 'SUCCEEDED' OR 
w.statusStr = 'FAILED' OR w.statusStr = 'KILLED' OR w.statusStr = 
'DONEWITHERROR') AND w.lastModifiedTimestamp <= :lastModTime order by 
w.lastModifiedTimestamp"),
 
-        @NamedQuery(name = "BULK_MONITOR_BUNDLE_QUERY", query = "SELECT b.id, 
b.statusStr, b.user FROM BundleJobBean b WHERE b.appName = :appName"),
+        @NamedQuery(name = "BULK_MONITOR_BUNDLE_QUERY", query = "SELECT b.id, 
b.appName, b.statusStr, b.user FROM BundleJobBean b"),
 
         // Join query
         @NamedQuery(name = "BULK_MONITOR_ACTIONS_QUERY", query = "SELECT a.id, 
a.actionNumber, a.errorCode, a.errorMessage, a.externalId, " +

http://git-wip-us.apache.org/repos/asf/oozie/blob/40267873/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java 
b/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java
index 4493ced..d9998c7 100644
--- a/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java
+++ b/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java
@@ -37,8 +37,8 @@ public class BulkResponseImpl implements BulkResponse, 
JsonBean {
     private CoordinatorJobBean coordinator;
     private CoordinatorActionBean action;
 
-    public static final String BULK_FILTER_BUNDLE_NAME = "bundle";
-    public static final String BULK_FILTER_COORD_NAME = "coordinators";
+    public static final String BULK_FILTER_BUNDLE = "bundle";
+    public static final String BULK_FILTER_COORD = "coordinators";
     public static final String BULK_FILTER_LEVEL = "filterlevel";
     public static final String BULK_FILTER_STATUS = "actionstatus";
     public static final String BULK_FILTER_START_CREATED_EPOCH = 
"startcreatedtime";
@@ -51,8 +51,8 @@ public class BulkResponseImpl implements BulkResponse, 
JsonBean {
 
     static {
 
-        BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_BUNDLE_NAME);
-        BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_COORD_NAME);
+        BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_BUNDLE);
+        BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_COORD);
         BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_LEVEL);
         BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_STATUS);
         
BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_START_CREATED_EPOCH);

http://git-wip-us.apache.org/repos/asf/oozie/blob/40267873/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java 
b/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java
index 930e8a5..60e5624 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java
@@ -26,6 +26,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import javax.persistence.EntityManager;
 import javax.persistence.Query;
@@ -52,6 +54,9 @@ public class BulkJPAExecutor implements 
JPAExecutor<BulkResponseInfo> {
     // defaults
     private int start = 1;
     private int len = 50;
+    private enum PARAM_TYPE {
+        ID, NAME
+    }
 
     public BulkJPAExecutor(Map<String, List<String>> bulkFilter, int start, 
int len) {
         ParamChecker.notNull(bulkFilter, "bulkFilter");
@@ -60,35 +65,27 @@ public class BulkJPAExecutor implements 
JPAExecutor<BulkResponseInfo> {
         this.len = len;
     }
 
-    /*
-     * (non-Javadoc)
-     * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
-     */
     @Override
     public String getName() {
         return "BulkJPAExecutor";
     }
 
-    /*
-     * (non-Javadoc)
-     * @see 
org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager)
-     */
     @Override
     public BulkResponseInfo execute(EntityManager em) throws 
JPAExecutorException {
         List<BulkResponseImpl> responseList = new 
ArrayList<BulkResponseImpl>();
         Map<String, Timestamp> actionTimes = new HashMap<String, Timestamp>();
 
         try {
-            // Lightweight Query 1 on Bundle level to fetch the bundle job
-            // corresponding to name
-            BundleJobBean bundleBean = bundleQuery(em);
+            // Lightweight Query 1 on Bundle level to fetch the bundle job(s)
+            // corresponding to names or ids
+            List<BundleJobBean> bundleBeans = bundleQuery(em);
 
             // Join query between coordinator job and coordinator action tables
             // to get entries for specific bundleId only
-            String conditions = actionQuery(em, bundleBean, actionTimes, 
responseList);
+            String conditions = actionQuery(em, bundleBeans, actionTimes, 
responseList);
 
             // Query to get the count of records
-            long total = countQuery(conditions, em, bundleBean, actionTimes);
+            long total = countQuery(conditions, em, bundleBeans);
 
             BulkResponseInfo bulk = new BulkResponseInfo(responseList, start, 
len, total);
             return bulk;
@@ -98,36 +95,99 @@ public class BulkJPAExecutor implements 
JPAExecutor<BulkResponseInfo> {
         }
     }
 
+    /**
+     * build the bundle level query to get bundle beans for the specified ids 
or appnames
+     * @param em
+     * @return List BundleJobBeans
+     * @throws JPAExecutorException
+     */
     @SuppressWarnings("unchecked")
-    private BundleJobBean bundleQuery(EntityManager em) throws 
JPAExecutorException {
-        BundleJobBean bundleBean = new BundleJobBean();
-        String bundleName = 
bulkFilter.get(BulkResponseImpl.BULK_FILTER_BUNDLE_NAME).get(0);
+    private List<BundleJobBean> bundleQuery(EntityManager em) throws 
JPAExecutorException {
         Query q = em.createNamedQuery("BULK_MONITOR_BUNDLE_QUERY");
-        q.setParameter("appName", bundleName);
-        List<Object[]> bundles = (List<Object[]>) q.getResultList();
-        if (bundles.isEmpty()) {
-            throw new JPAExecutorException(ErrorCode.E0603, "No bundle entries 
found for bundle name: "
-                    + bundleName);
-        }
-        if (bundles.size() > 1) { // more than one bundles running with same
-                                  // name - ERROR. Fail fast
-            throw new JPAExecutorException(ErrorCode.E0603, "Non-unique 
bundles present for same bundle name: "
-                    + bundleName);
-        }
-        bundleBean = getBeanForBundleJob(bundles.get(0), bundleName);
-        return bundleBean;
+        StringBuilder bundleQuery = new StringBuilder(q.toString());
+
+        StringBuilder whereClause = null;
+        List<String> bundles = 
bulkFilter.get(BulkResponseImpl.BULK_FILTER_BUNDLE);
+        if (bundles != null) {
+            PARAM_TYPE type = getParamType(bundles.get(0), 'B');
+            if (type == PARAM_TYPE.NAME) {
+                whereClause = inClause(bundles, "appName", 'b');
+            }
+            else if (type == PARAM_TYPE.ID) {
+                whereClause = inClause(bundles, "id", 'b');
+            }
+
+            // Query: select <columns> from BundleJobBean b where b.id IN 
(...) _or_ b.appName IN (...)
+            bundleQuery.append(whereClause.replace(whereClause.indexOf("AND"), 
whereClause.indexOf("AND") + 3, "WHERE"));
+            List<Object[]> bundleObjs = (List<Object[]>) 
em.createQuery(bundleQuery.toString()).getResultList();
+            if (bundleObjs.isEmpty()) {
+                throw new JPAExecutorException(ErrorCode.E0603, "No entries 
found for given bundle(s)");
+            }
+
+            List<BundleJobBean> bundleBeans = new ArrayList<BundleJobBean>();
+            for (Object[] bundleElem : bundleObjs) {
+                bundleBeans.add(constructBundleBean(bundleElem));
+            }
+            return bundleBeans;
+        }
+        return null;
+    }
+
+    /**
+     * Validate and determine whether passed param is job-id or appname
+     * @param id
+     * @param job
+     * @return PARAM_TYPE
+     */
+    private PARAM_TYPE getParamType(String id, char job) {
+        Pattern p = Pattern.compile("\\d{7}-\\d{15}-oozie-[a-z]{4}-" + job);
+        Matcher m = p.matcher(id);
+        if (m.matches()) {
+            return PARAM_TYPE.ID;
+        }
+        return PARAM_TYPE.NAME;
     }
 
+    /**
+     * Compose the coord action level query comprising bundle id/appname 
filter and coord action
+     * status filter (if specified) and start-time or nominal-time filter (if 
specified)
+     * @param em
+     * @param bundles
+     * @param times
+     * @param responseList
+     * @return Query string
+     * @throws ParseException
+     */
     @SuppressWarnings("unchecked")
-    private String actionQuery(EntityManager em, BundleJobBean bundleBean,
+    private String actionQuery(EntityManager em, List<BundleJobBean> bundles,
             Map<String, Timestamp> times, List<BulkResponseImpl> responseList) 
throws ParseException {
         Query q = em.createNamedQuery("BULK_MONITOR_ACTIONS_QUERY");
         StringBuilder getActions = new StringBuilder(q.toString());
+        int offset = getActions.indexOf("ORDER");
         StringBuilder conditionClause = new StringBuilder();
-        
conditionClause.append(coordNamesClause(bulkFilter.get(BulkResponseImpl.BULK_FILTER_COORD_NAME)));
+
+        List<String> coords = 
bulkFilter.get(BulkResponseImpl.BULK_FILTER_COORD);
+        // Query: Select <columns> from CoordinatorActionBean a, 
CoordinatorJobBean c WHERE a.jobId = c.id
+        // AND c.bundleId = :bundleId AND c.appName/id IN (...)
+        if (coords != null) {
+            PARAM_TYPE type = getParamType(coords.get(0), 'C');
+            if (type == PARAM_TYPE.NAME) {
+                
conditionClause.append(inClause(bulkFilter.get(BulkResponseImpl.BULK_FILTER_COORD),
 "appName", 'c'));
+            }
+            else if (type == PARAM_TYPE.ID) {
+                
conditionClause.append(inClause(bulkFilter.get(BulkResponseImpl.BULK_FILTER_COORD),
 "id", 'c'));
+            }
+        }
+        // Query: Select <columns> from CoordinatorActionBean a, 
CoordinatorJobBean c WHERE a.jobId = c.id
+        // AND c.bundleId = :bundleId AND c.appName/id IN (...) AND 
a.statusStr IN (...)
         
conditionClause.append(statusClause(bulkFilter.get(BulkResponseImpl.BULK_FILTER_STATUS)));
-        int offset = getActions.indexOf("ORDER");
+        offset = getActions.indexOf("ORDER");
         getActions.insert(offset - 1, conditionClause);
+
+        // Query: Select <columns> from CoordinatorActionBean a, 
CoordinatorJobBean c WHERE a.jobId = c.id
+        // AND c.bundleId = :bundleId AND c.appName/id IN (...) AND 
a.statusStr IN (...)
+        // AND a.createdTimestamp >= startCreated _or_ a.createdTimestamp <= 
endCreated
+        // AND a.nominalTimestamp >= startNominal _or_ a.nominalTimestamp <= 
endNominal
         timesClause(getActions, offset, times);
         q = em.createQuery(getActions.toString());
         Iterator<Entry<String, Timestamp>> iter = times.entrySet().iterator();
@@ -135,41 +195,55 @@ public class BulkJPAExecutor implements 
JPAExecutor<BulkResponseInfo> {
             Entry<String, Timestamp> time = iter.next();
             q.setParameter(time.getKey(), time.getValue());
         }
-        q.setParameter("bundleId", bundleBean.getId());
         // pagination
         q.setFirstResult(start - 1);
         q.setMaxResults(len);
-
-        List<Object[]> response = q.getResultList();
-        for (Object[] r : response) {
-            BulkResponseImpl br = getResponseFromObject(bundleBean, r);
-            responseList.add(br);
+        // repeatedly execute above query for each bundle
+        for (BundleJobBean bundle : bundles) {
+            q.setParameter("bundleId", bundle.getId());
+            List<Object[]> response = q.getResultList();
+            for (Object[] r : response) {
+                BulkResponseImpl br = getResponseFromObject(bundle, r);
+                responseList.add(br);
+            }
         }
         return q.toString();
     }
 
-    private long countQuery(String clause, EntityManager em, BundleJobBean 
bundleBean, Map<String, Timestamp> times) {
+    /**
+     * Get total number of records for use with offset and len in API
+     * @param clause
+     * @param em
+     * @param bundles
+     * @return total count of coord actions
+     */
+    private long countQuery(String clause, EntityManager em, 
List<BundleJobBean> bundles) {
         Query q = em.createNamedQuery("BULK_MONITOR_COUNT_QUERY");
         StringBuilder getTotal = new StringBuilder(q.toString() + " ");
+        // Query: select COUNT(a) from CoordinatorActionBean a, 
CoordinatorJobBean c
+        // get entire WHERE clause from above i.e. actionQuery() for all 
conditions on coordinator job
+        // and action status and times
         getTotal.append(clause.substring(clause.indexOf("WHERE"), 
clause.indexOf("ORDER")));
+        int offset = getTotal.indexOf("bundleId");
+        List<String> bundleIds = new ArrayList<String>();
+        for (BundleJobBean bundle : bundles) {
+            bundleIds.add(bundle.getId());
+        }
+        // Query: select COUNT(a) from CoordinatorActionBean a, 
CoordinatorJobBean c WHERE ...
+        // AND c.bundleId IN (... list of bundle ids) i.e. replace single 
:bundleId with list
+        getTotal = getTotal.replace(offset - 6, offset + 20, 
inClause(bundleIds, "bundleId", 'c').toString());
         q = em.createQuery(getTotal.toString());
-        q.setParameter("bundleId", bundleBean.getId());
-        Iterator<Entry<String, Timestamp>> iter = times.entrySet().iterator();
-        while (iter.hasNext()) {
-            Entry<String, Timestamp> time = iter.next();
-            q.setParameter(time.getKey(), time.getValue());
-        }
         long total = ((Long) q.getSingleResult()).longValue();
         return total;
     }
 
-    // Form the where clause to filter by coordinator names
-    private StringBuilder coordNamesClause(List<String> coordNames) {
+    // Form the where clause to filter by coordinator appname/id
+    private StringBuilder inClause(List<String> values, String col, char type) 
{
         StringBuilder sb = new StringBuilder();
         boolean firstVal = true;
-        for (String name : nullToEmpty(coordNames)) {
+        for (String name : nullToEmpty(values)) {
             if (firstVal) {
-                sb.append(" AND c.appName IN (\'" + name + "\'");
+                sb.append(" AND " + type + "." + col + " IN (\'" + name + 
"\'");
                 firstVal = false;
             }
             else {
@@ -184,21 +258,8 @@ public class BulkJPAExecutor implements 
JPAExecutor<BulkResponseInfo> {
 
     // Form the where clause to filter by coord action status
     private StringBuilder statusClause(List<String> statuses) {
-        StringBuilder sb = new StringBuilder();
-        boolean firstVal = true;
-        for (String status : nullToEmpty(statuses)) {
-            if (firstVal) {
-                sb.append(" AND a.statusStr IN (\'" + status + "\'");
-                firstVal = false;
-            }
-            else {
-                sb.append(",\'" + status + "\'");
-            }
-        }
-        if (!firstVal) {
-            sb.append(") ");
-        }
-        else { // statuses was null. adding default
+        StringBuilder sb = inClause(statuses, "statusStr", 'a');
+        if (sb.length() == 0) { // statuses was null. adding default
             sb.append(" AND a.statusStr IN ('KILLED', 'FAILED') ");
         }
         return sb;
@@ -282,7 +343,7 @@ public class BulkJPAExecutor implements 
JPAExecutor<BulkResponseInfo> {
         return bean;
     }
 
-    private BundleJobBean getBeanForBundleJob(Object[] barr, String name) 
throws JPAExecutorException {
+    private BundleJobBean constructBundleBean(Object[] barr) throws 
JPAExecutorException {
         BundleJobBean bean = new BundleJobBean();
         if (barr[0] != null) {
             bean.setId((String) barr[0]);
@@ -291,12 +352,14 @@ public class BulkJPAExecutor implements 
JPAExecutor<BulkResponseInfo> {
             throw new JPAExecutorException(ErrorCode.E0603,
                     "bundleId returned by query is null - cannot retrieve bulk 
results");
         }
-        bean.setAppName(name);
         if (barr[1] != null) {
-            bean.setStatus(BundleJob.Status.valueOf((String) barr[1]));
+            bean.setAppName((String) barr[1]);
         }
         if (barr[2] != null) {
-            bean.setUser((String) barr[2]);
+            bean.setStatus(BundleJob.Status.valueOf((String) barr[2]));
+        }
+        if (barr[3] != null) {
+            bean.setUser((String) barr[3]);
         }
         return bean;
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/40267873/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkMonitorJPAExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkMonitorJPAExecutor.java
 
b/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkMonitorJPAExecutor.java
index 202d05f..4536398 100644
--- 
a/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkMonitorJPAExecutor.java
+++ 
b/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkMonitorJPAExecutor.java
@@ -93,7 +93,7 @@ public class TestBulkMonitorJPAExecutor extends XDataTestCase 
{
                     // bundle
         }
         catch (JPAExecutorException jex) {
-            assertTrue(jex.getMessage().contains("No bundle entries found"));
+            assertTrue(jex.getMessage().contains("No entries found for given 
bundle(s)"));
         }
     }
 
@@ -134,14 +134,40 @@ public class TestBulkMonitorJPAExecutor extends 
XDataTestCase {
         BulkJPAExecutor bulkjpa = new 
BulkJPAExecutor(BundleEngine.parseBulkFilter(request), 1, 10);
         try {
             jpaService.execute(bulkjpa);
-            fail(); // exception expected due to >1 records found for same
-                    // bundle name
         }
         catch (JPAExecutorException jex) {
-            assertTrue(jex.getMessage().contains("Non-unique bundles present 
for same bundle name"));
+            fail(); // should not throw exception as this case is now supported
         }
     }
 
+    public void testBundleId() throws Exception {
+        String request = "bundle=" + bundleId + ";actionstatus=FAILED;"
+                + 
"startcreatedtime=2012-07-21T00:00Z;endcreatedtime=2012-07-22T02:00Z";
+
+        List<BulkResponseImpl> brList = _execQuery(request);
+        assertEquals(1, brList.size()); // only 1 action satisfies the
+                                        // conditions
+        BulkResponseImpl br = brList.get(0);
+        assertEquals(bundleId, br.getBundle().getId());
+        assertEquals("Coord1", br.getCoordinator().getAppName());
+        assertEquals(CoordinatorAction.Status.FAILED, 
br.getAction().getStatus());
+        assertEquals(DateUtils.parseDateUTC(CREATE_TIME).toString(), 
br.getAction().getCreatedTime().toString());
+    }
+
+    public void testBundleIdWithCoordId() throws Exception {
+        // fetching coord Ids
+        JPAService jpaService = Services.get().get(JPAService.class);
+        List<String> coordIds = jpaService.execute(new 
CoordJobsGetFromParentIdJPAExecutor(bundleId, 10));
+
+        // there are 3 coordinators but giving range as only two of them
+        String coordIdsStr = coordIds.get(0) + "," + coordIds.get(1);
+        String request = "bundle=" + bundleId + ";coordinators=" + coordIdsStr 
+ ";actionstatus=KILLED";
+        List<BulkResponseImpl> brList = _execQuery(request);
+        assertEquals(2, brList.size()); // 2 actions satisfy the conditions
+        assertEquals(brList.get(0).getAction().getId(), "Coord1@2");
+        assertEquals(brList.get(1).getAction().getId(), "Coord2@1");
+    }
+
     private List<BulkResponseImpl> _execQuery(String request) throws 
JPAExecutorException, BundleEngineException {
         BulkJPAExecutor bulkjpa = new 
BulkJPAExecutor(BundleEngine.parseBulkFilter(request), 1, 10);
         BulkResponseInfo response = jpaService.execute(bulkjpa);
@@ -149,4 +175,4 @@ public class TestBulkMonitorJPAExecutor extends 
XDataTestCase {
         return response.getResponses();
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/40267873/core/src/test/java/org/apache/oozie/servlet/TestBulkMonitorWebServiceAPI.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/servlet/TestBulkMonitorWebServiceAPI.java 
b/core/src/test/java/org/apache/oozie/servlet/TestBulkMonitorWebServiceAPI.java
index 0a486b3..515d599 100644
--- 
a/core/src/test/java/org/apache/oozie/servlet/TestBulkMonitorWebServiceAPI.java
+++ 
b/core/src/test/java/org/apache/oozie/servlet/TestBulkMonitorWebServiceAPI.java
@@ -24,12 +24,15 @@ import java.net.URLEncoder;
 import java.util.Calendar;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
+
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.oozie.executor.jpa.BundleJobInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobsGetFromParentIdJPAExecutor;
 import org.apache.oozie.local.LocalOozie;
 import org.apache.oozie.BundleJobBean;
 import org.apache.oozie.CoordinatorActionBean;
@@ -151,10 +154,6 @@ public class TestBulkMonitorWebServiceAPI extends 
XDataTestCase {
                 JSONObject jcoord = (JSONObject) ((JSONObject) 
array.get(0)).get(JsonTags.BULK_RESPONSE_COORDINATOR);
                 JSONObject jaction = (JSONObject) ((JSONObject) 
array.get(0)).get(JsonTags.BULK_RESPONSE_ACTION);
 
-                assertNotNull(jbundle);
-                assertNotNull(jcoord);
-                assertNotNull(jaction);
-
                 assertEquals(jbundle.get(JsonTags.BUNDLE_JOB_NAME), 
"BUNDLE-TEST");
                 assertEquals(jcoord.get(JsonTags.COORDINATOR_JOB_NAME), 
"Coord1");
                 assertEquals(jcoord.get(JsonTags.COORDINATOR_JOB_STATUS), 
"RUNNING");
@@ -286,7 +285,61 @@ public class TestBulkMonitorWebServiceAPI extends 
XDataTestCase {
                 HttpURLConnection conn = (HttpURLConnection) 
url.openConnection();
                 // WS call will throw BAD_REQUEST code 400 error because no
                 // records found for this bundle
-                assertEquals(HttpServletResponse.SC_BAD_REQUEST, 
conn.getResponseCode());
+                assertFalse(HttpServletResponse.SC_BAD_REQUEST == 
conn.getResponseCode());
+
+                return null;
+            }
+        });
+    }
+
+    public void testBundleId() throws Exception {
+        runTest("/v1/jobs", V1JobsServlet.class, false, new Callable<Void>() {
+            public Void call() throws Exception {
+
+                String bulkRequest = "bundle=" + bundleId + 
";coordinators=Coord1;"
+                        + 
"actionStatus=FAILED;startcreatedtime=2012-07-21T00:00Z";
+                JSONArray array = _requestToServer(bulkRequest);
+
+                assertEquals(1, array.size());
+                JSONObject jbundle = (JSONObject) ((JSONObject) 
array.get(0)).get(JsonTags.BULK_RESPONSE_BUNDLE);
+                JSONObject jcoord = (JSONObject) ((JSONObject) 
array.get(0)).get(JsonTags.BULK_RESPONSE_COORDINATOR);
+                JSONObject jaction = (JSONObject) ((JSONObject) 
array.get(0)).get(JsonTags.BULK_RESPONSE_ACTION);
+
+                assertNotNull(jbundle);
+                assertNotNull(jcoord);
+                assertNotNull(jaction);
+
+                assertEquals(jbundle.get(JsonTags.BUNDLE_JOB_ID), bundleId);
+                assertEquals(jcoord.get(JsonTags.COORDINATOR_JOB_NAME), 
"Coord1");
+                assertEquals(jcoord.get(JsonTags.COORDINATOR_JOB_STATUS), 
"RUNNING");
+                assertEquals(jaction.get(JsonTags.COORDINATOR_ACTION_STATUS), 
"FAILED");
+                
assertEquals((jaction.get(JsonTags.COORDINATOR_ACTION_CREATED_TIME).toString().split(",
 "))[1],
+                        DateUtils.parseDateUTC(CREATE_TIME).toGMTString());
+                return null;
+            }
+        });
+    }
+
+    public void testBundleIdWithCoordId() throws Exception {
+        // fetching coord Ids
+        JPAService jpaService = Services.get().get(JPAService.class);
+        List<String> coordIds = jpaService.execute(new 
CoordJobsGetFromParentIdJPAExecutor(bundleId, 10));
+        // there are 3 coordinators but giving range as only two of them
+        final String coordIdsStr = coordIds.get(0) + "," + coordIds.get(1);
+
+        runTest("/v1/jobs", V1JobsServlet.class, false, new Callable<Void>() {
+            public Void call() throws Exception {
+                // giving range as 2 of the total 3 coordinators
+                String bulkRequest = "bundle=" + bundleId + ";coordinators=" + 
coordIdsStr + ";actionstatus=KILLED";
+                JSONArray array = _requestToServer(bulkRequest);
+
+                assertEquals(2, array.size());
+                JSONObject jbundle = (JSONObject) ((JSONObject) 
array.get(0)).get(JsonTags.BULK_RESPONSE_BUNDLE);
+                JSONObject jaction1 = (JSONObject) ((JSONObject) 
array.get(0)).get(JsonTags.BULK_RESPONSE_ACTION);
+                JSONObject jaction2 = (JSONObject) ((JSONObject) 
array.get(1)).get(JsonTags.BULK_RESPONSE_ACTION);
+
+                assertEquals(jaction1.get(JsonTags.COORDINATOR_ACTION_ID), 
"Coord1@2");
+                assertEquals(jaction2.get(JsonTags.COORDINATOR_ACTION_ID), 
"Coord2@1");
 
                 return null;
             }

http://git-wip-us.apache.org/repos/asf/oozie/blob/40267873/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java 
b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
index 317885b..423944a 100644
--- a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
@@ -110,6 +110,7 @@ public abstract class XDataTestCase extends XHCatTestCase {
             + "</sla:info>";
 
     protected String bundleName;
+    protected String bundleId;
     protected String CREATE_TIME = "2012-07-22T00:00Z";
 
     public XDataTestCase() {
@@ -1420,7 +1421,7 @@ public abstract class XDataTestCase extends XHCatTestCase 
{
         assertNotNull(jpaService);
         // adding the bundle job
         BundleJobBean bundle = 
addRecordToBundleJobTable(BundleJob.Status.RUNNING, false);
-        String bundleId = bundle.getId();
+        bundleId = bundle.getId();
         bundleName = bundle.getAppName();
 
         // adding coordinator job(s) for this bundle

http://git-wip-us.apache.org/repos/asf/oozie/blob/40267873/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 5b3a2da..dae44b3 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1704 Add ability to use Bulk API with bundle ID (mona)
 OOZIE-1718 Coord Job Query UPDATE_COORD_JOB_CHANGE does not update last 
modified time (mona)
 OOZIE-1693 UI timeout while loading job table (puru via rohini)
 OOZIE-1698 Action sharelib configuration document lacks the "oozie." prefix 
(qwertymaniac via rohini)

Reply via email to