Updated Branches:
  refs/heads/master 222b01458 -> 040e5f4d4

OOZIE-1668 Coord log streaming start and end time should be of action list 
start and end time (puru via rohini)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/040e5f4d
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/040e5f4d
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/040e5f4d

Branch: refs/heads/master
Commit: 040e5f4d4ca84872d04ea8b01c3c5be710ed428d
Parents: 222b014
Author: Rohini Palaniswamy <[email protected]>
Authored: Sun Jan 26 19:54:30 2014 -0800
Committer: Rohini Palaniswamy <[email protected]>
Committed: Sun Jan 26 19:54:30 2014 -0800

----------------------------------------------------------------------
 .../java/org/apache/oozie/BundleEngine.java     |   3 +-
 .../org/apache/oozie/CoordinatorActionBean.java |   6 +-
 .../org/apache/oozie/CoordinatorEngine.java     |  55 +++++-
 ...etActionModifiedDateForRangeJPAExecutor.java |  65 +++++++
 ...etActionRunningCountForRangeJPAExecutor.java |  57 ++++++
 .../oozie/util/CoordActionsInDateRange.java     |  38 +++-
 .../oozie/TestCoordinatorEngineStreamLog.java   | 186 +++++++++++--------
 release-log.txt                                 |   1 +
 8 files changed, 333 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/040e5f4d/core/src/main/java/org/apache/oozie/BundleEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/BundleEngine.java 
b/core/src/main/java/org/apache/oozie/BundleEngine.java
index e8c8f31..7f879be 100644
--- a/core/src/main/java/org/apache/oozie/BundleEngine.java
+++ b/core/src/main/java/org/apache/oozie/BundleEngine.java
@@ -253,7 +253,8 @@ public class BundleEngine extends BaseEngine {
             throw new BundleEngineException(ex);
         }
 
-        Services.get().get(XLogStreamingService.class).streamLog(filter, 
job.getCreatedTime(), new Date(), writer, params);
+        Date endTime = job.getEndTime() == null ? new Date() : 
job.getEndTime();
+        Services.get().get(XLogStreamingService.class).streamLog(filter, 
job.getCreatedTime(), endTime, writer, params);
     }
 
     /* (non-Javadoc)

http://git-wip-us.apache.org/repos/asf/oozie/blob/040e5f4d/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java 
b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
index 5d8d6df..03a7ed8 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
@@ -157,7 +157,11 @@ import org.json.simple.JSONObject;
 
         @NamedQuery(name = "GET_ACTIONS_BY_DATES_FOR_KILL", query = "select 
a.id, a.jobId, a.statusStr, a.externalId, a.pending, a.nominalTimestamp, 
a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND 
(a.statusStr <> 'FAILED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 
'SUCCEEDED' AND a.statusStr <> 'TIMEDOUT') AND a.nominalTimestamp >= :startTime 
AND a.nominalTimestamp <= :endTime"),
 
-        @NamedQuery(name = "GET_COORD_ACTIONS_COUNT", query = "select count(w) 
from CoordinatorActionBean w")})
+        @NamedQuery(name = "GET_COORD_ACTIONS_COUNT", query = "select count(w) 
from CoordinatorActionBean w"),
+
+        @NamedQuery(name = "GET_COORD_ACTIONS_COUNT_RUNNING_FOR_RANGE", query 
= "select count(w) from CoordinatorActionBean w where w.statusStr = 'RUNNING' 
and w.jobId= :jobId and w.id >= :startAction AND w.id <= :endAction"),
+
+        @NamedQuery(name = "GET_COORD_ACTIONS_MAX_MODIFIED_DATE_FOR_RANGE", 
query = "select max(w.lastModifiedTimestamp) from CoordinatorActionBean w where 
w.jobId= :jobId and w.id >= :startAction AND w.id <= :endAction")})
 
 @NamedNativeQueries({
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/040e5f4d/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java 
b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
index 57587c2..8c5e80c 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
@@ -20,6 +20,8 @@ package org.apache.oozie;
 import java.io.IOException;
 import java.io.Writer;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -280,6 +282,9 @@ public class CoordinatorEngine extends BaseEngine {
      */
     public void streamLog(String jobId, String logRetrievalScope, String 
logRetrievalType, Writer writer,
             Map<String, String[]> params) throws IOException, 
BaseEngineException, CommandException {
+
+        Date startTime = null;
+        Date endTime = null;
         XLogStreamer.Filter filter = new XLogStreamer.Filter();
         filter.setParameter(DagXLogInfoService.JOB, jobId);
         if (logRetrievalScope != null && logRetrievalType != null) {
@@ -343,7 +348,26 @@ public class CoordinatorEngine extends BaseEngine {
                     orSeparatedActions.insert(0, "(");
                     orSeparatedActions.append(")");
                 }
+
                 filter.setParameter(DagXLogInfoService.ACTION, 
orSeparatedActions.toString());
+                if (actionSet != null && actionSet.size() == 1) {
+                    CoordinatorActionBean actionBean = 
getCoordAction(actionSet.iterator().next());
+                    startTime = actionBean.getCreatedTime();
+                    endTime = 
actionBean.getStatus().equals(CoordinatorAction.Status.RUNNING) ? new Date() : 
actionBean
+                            .getLastModifiedTime();
+                }
+                else if (actionSet != null && actionSet.size() > 0) {
+                    List<String> tempList = new ArrayList<String>(actionSet);
+                    Collections.sort(tempList, new Comparator<String>() {
+                        public int compare(String a, String b) {
+                            return 
Integer.valueOf(a.substring(a.lastIndexOf("@") + 1)).compareTo(
+                                    
Integer.valueOf(b.substring(b.lastIndexOf("@") + 1)));
+                        }
+                    });
+                    startTime = 
getCoordAction(tempList.get(0)).getCreatedTime();
+                    endTime = 
CoordActionsInDateRange.getCoordActionsLastModifiedDate(jobId, tempList.get(0),
+                            tempList.get(tempList.size() - 1));
+                }
             }
             // if coordinator action logs are to be retrieved based on date 
range
             // this block gets the corresponding list of coordinator actions 
to be used by the log filter
@@ -369,10 +393,37 @@ public class CoordinatorEngine extends BaseEngine {
                     orSeparatedActions.append(")");
                 }
                 filter.setParameter(DagXLogInfoService.ACTION, 
orSeparatedActions.toString());
+
+                if (coordActionIdList != null && coordActionIdList.size() == 
1) {
+                    CoordinatorActionBean actionBean = 
getCoordAction(coordActionIdList.get(0));
+                    startTime = actionBean.getCreatedTime();
+                    endTime = 
actionBean.getStatus().equals(CoordinatorAction.Status.RUNNING) ? new Date() : 
actionBean
+                            .getLastModifiedTime();
+                }
+                else if (coordActionIdList != null && coordActionIdList.size() 
> 0) {
+                    Collections.sort(coordActionIdList, new 
Comparator<String>() {
+                        public int compare(String a, String b) {
+                            return 
Integer.valueOf(a.substring(a.lastIndexOf("@") + 1)).compareTo(
+                                    
Integer.valueOf(b.substring(b.lastIndexOf("@") + 1)));
+                        }
+                    });
+                    startTime = 
getCoordAction(coordActionIdList.get(0)).getCreatedTime();
+                    endTime = 
CoordActionsInDateRange.getCoordActionsLastModifiedDate(jobId, 
coordActionIdList.get(0),
+                            coordActionIdList.get(coordActionIdList.size() - 
1));
+                }
             }
         }
-        CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
-        Services.get().get(XLogStreamingService.class).streamLog(filter, 
job.getCreatedTime(), new Date(), writer, params);
+        if (startTime == null || endTime == null) {
+            CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
+            if (startTime == null) {
+                startTime = job.getCreatedTime();
+            }
+            if (endTime == null) {
+                endTime = job.getEndTime() == null ? new Date() : 
job.getEndTime();
+            }
+        }
+        //job.getActions()
+        Services.get().get(XLogStreamingService.class).streamLog(filter, 
startTime, endTime, writer, params);
     }
 
     /*

http://git-wip-us.apache.org/repos/asf/oozie/blob/040e5f4d/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionModifiedDateForRangeJPAExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionModifiedDateForRangeJPAExecutor.java
 
b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionModifiedDateForRangeJPAExecutor.java
new file mode 100644
index 0000000..dc4a313
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionModifiedDateForRangeJPAExecutor.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.executor.jpa;
+
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.List;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.ParamChecker;
+
+public class CoordJobGetActionModifiedDateForRangeJPAExecutor implements 
JPAExecutor<Date> {
+
+    private String jobId = null;
+    private String startAction, endAction;
+
+    public CoordJobGetActionModifiedDateForRangeJPAExecutor(String jobId, 
String startAction, String endAction) {
+        ParamChecker.notNull(jobId, "jobId");
+        this.jobId = jobId;
+        this.startAction = startAction;
+        this.endAction = endAction;
+    }
+
+    @Override
+    public String getName() {
+        return "CoordJobGetActionIdsForDateRangeJPAExecutor";
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Date execute(EntityManager em) throws JPAExecutorException {
+        try {
+            Query q = 
em.createNamedQuery("GET_COORD_ACTIONS_MAX_MODIFIED_DATE_FOR_RANGE");
+            q.setParameter("jobId", jobId);
+            q.setParameter("startAction", startAction);
+            q.setParameter("endAction", endAction);
+            List<Timestamp> coordActionIds = q.getResultList();
+            return coordActionIds.isEmpty() ? new Date() : 
DateUtils.toDate(coordActionIds.get(0));
+        }
+        catch (Exception e) {
+            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/040e5f4d/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionRunningCountForRangeJPAExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionRunningCountForRangeJPAExecutor.java
 
b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionRunningCountForRangeJPAExecutor.java
new file mode 100644
index 0000000..cbcdb98
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionRunningCountForRangeJPAExecutor.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.util.ParamChecker;
+
+public class CoordJobGetActionRunningCountForRangeJPAExecutor implements 
JPAExecutor<Long> {
+
+    private String jobId = null;
+    private String startAction, endAction;
+
+    public CoordJobGetActionRunningCountForRangeJPAExecutor(String jobId, 
String startAction, String endAction) {
+        ParamChecker.notNull(jobId, "jobId");
+        this.jobId = jobId;
+        this.startAction = startAction;
+        this.endAction = endAction;
+    }
+
+    @Override
+    public String getName() {
+        return "CoordJobGetActionIdsForDateRangeJPAExecutor";
+    }
+
+    @Override
+    public Long execute(EntityManager em) throws JPAExecutorException {
+        try {
+            Query q = 
em.createNamedQuery("GET_COORD_ACTIONS_COUNT_RUNNING_FOR_RANGE");
+            q.setParameter("jobId", jobId);
+            q.setParameter("startAction", startAction);
+            q.setParameter("endAction", endAction);
+            Long count = (Long) q.getSingleResult();
+            return count;
+        }
+        catch (Exception e) {
+            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/040e5f4d/core/src/main/java/org/apache/oozie/util/CoordActionsInDateRange.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/util/CoordActionsInDateRange.java 
b/core/src/main/java/org/apache/oozie/util/CoordActionsInDateRange.java
index 6746922..fd21c45 100644
--- a/core/src/main/java/org/apache/oozie/util/CoordActionsInDateRange.java
+++ b/core/src/main/java/org/apache/oozie/util/CoordActionsInDateRange.java
@@ -28,9 +28,13 @@ import java.util.Set;
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.XException;
+import org.apache.oozie.command.CommandException;
+import 
org.apache.oozie.executor.jpa.CoordJobGetActionModifiedDateForRangeJPAExecutor;
 import 
org.apache.oozie.executor.jpa.CoordJobGetActionIdsForDateRangeJPAExecutor;
+import 
org.apache.oozie.executor.jpa.CoordJobGetActionRunningCountForRangeJPAExecutor;
 import 
org.apache.oozie.executor.jpa.CoordJobGetActionsByDatesForKillJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetActionsForDatesJPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 
@@ -140,7 +144,7 @@ public class CoordActionsInDateRange {
             return list;
     }
 
-    /*
+    /**
      * Get coordinator action ids between given start and end time
      *
      * @param jobId coordinator job id
@@ -160,4 +164,36 @@ public class CoordActionsInDateRange {
         }
         return list;
     }
+
+    /**
+     * Gets the latest last-modified date among the coordinator actions in the given range; if any action in the range is running, it returns the current date instead
+     *
+     * @param jobId the job id
+     * @param startAction the start action
+     * @param endAction the end action
+     * @return the coordinator actions last modified date
+     * @throws CommandException the command exception
+     */
+    public static Date getCoordActionsLastModifiedDate(String jobId, String 
startAction, String endAction)
+            throws CommandException {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        ParamChecker.notEmpty(jobId, "jobId");
+        ParamChecker.notEmpty(startAction, "startAction");
+        ParamChecker.notEmpty(endAction, "endAction");
+
+        try {
+            long count = jpaService.execute(new 
CoordJobGetActionRunningCountForRangeJPAExecutor(jobId, startAction,
+                    endAction));
+            if (count == 0) {
+                return jpaService.execute(new 
CoordJobGetActionModifiedDateForRangeJPAExecutor(jobId, startAction, 
endAction));
+            }
+            else {
+                return new Date();
+            }
+        }
+        catch (JPAExecutorException je) {
+            throw new CommandException(je);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/040e5f4d/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java 
b/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java
index a29a07e..6f7403d 100644
--- a/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java
+++ b/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java
@@ -25,6 +25,8 @@ import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.io.Writer;
 import java.net.URI;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -80,11 +82,15 @@ public class TestCoordinatorEngineStreamLog extends 
XFsTestCase {
 
     static class DummyXLogStreamingService extends XLogStreamingService {
         Filter filter;
+        Date startTime;
+        Date endTime;
 
         @Override
         public void streamLog(Filter filter1, Date startTime, Date endTime, 
Writer writer, Map<String, String[]> params)
                 throws IOException {
             filter = filter1;
+            this.startTime = startTime;
+            this.endTime = endTime;
         }
     }
 
@@ -92,89 +98,121 @@ public class TestCoordinatorEngineStreamLog extends 
XFsTestCase {
         return new CoordinatorEngine(getTestUser());
     }
 
-    /**
-     * The log streaming itself is tested in
-     * {@link org.apache.oozie.service.TestXLogService}. Here we test only the
-     * fields that are injected into
-     * {@link org.apache.oozie.util.XLogStreamer.Filter} upon
-     * {@link CoordinatorEngine#streamLog(String, Writer)} invocation.
-     */
-    public void testStreamLog2() throws Exception {
+    public void testCoordLogStreaming() throws Exception {
         CoordinatorEngine ce = createCoordinatorEngine();
-        String jobId = runJobsImpl(ce);
-        ce.streamLog(jobId, new StringWriter(), new HashMap<String, 
String[]>());
+        final String jobId = runJobsImpl(ce, 6);
 
-        DummyXLogStreamingService service = (DummyXLogStreamingService) 
services.get(XLogStreamingService.class);
-        Filter filter = service.filter;
-
-        assertEquals(filter.getFilterParams().get(DagXLogInfoService.JOB), 
jobId);
-    }
+        CoordinatorJobBean cjb = ce.getCoordJob(jobId);
+        Date createdDate = cjb.getCreatedTime();
+        Date endDate = new Date();
+        assertTrue(endDate.after(createdDate));
 
-    /**
-     * Test method org.apache.oozie.CoordinatorEngine.streamLog(String, String,
-     * String, Writer) with null 2nd and 3rd arguments.
-     */
-    public void testStreamLog4NullNull() throws Exception {
-        CoordinatorEngine ce = createCoordinatorEngine();
-        String jobId = runJobsImpl(ce);
-        ce.streamLog(jobId, null, null, new StringWriter(), new 
HashMap<String, String[]>());
+        List<CoordinatorAction> list = cjb.getActions();
+        Collections.sort(list, new Comparator<CoordinatorAction>() {
+            public int compare(CoordinatorAction a, CoordinatorAction b) {
+                return a.getId().compareTo(b.getId());
+            }
+        });
 
+        // Test 1, testing that the fields are injected into the filter
+        ce.streamLog(jobId, new StringWriter(), new HashMap<String, 
String[]>());
         DummyXLogStreamingService service = (DummyXLogStreamingService) 
services.get(XLogStreamingService.class);
         Filter filter = service.filter;
-
         assertEquals(filter.getFilterParams().get(DagXLogInfoService.JOB), 
jobId);
-    }
 
-    /**
-     * Test method org.apache.oozie.CoordinatorEngine.streamLog(String, String,
-     * String, Writer) with RestConstants.JOB_LOG_ACTION and non-null 2nd
-     * argument.
-     */
-    public void testStreamLog4JobLogAction() throws Exception {
-        CoordinatorEngine ce = createCoordinatorEngine();
-        String jobId = runJobsImpl(ce);
+        // Test2
+        // * Test method org.apache.oozie.CoordinatorEngine.streamLog(String,
+        // String,
+        // * String, Writer) with null 2nd and 3rd arguments.
+        // */
+        ce.streamLog(jobId, null, null, new StringWriter(), new 
HashMap<String, String[]>());
+        service = (DummyXLogStreamingService) 
services.get(XLogStreamingService.class);
+        filter = service.filter;
+        assertEquals(filter.getFilterParams().get(DagXLogInfoService.JOB), 
jobId);
 
-        ce.streamLog(jobId, "678, 123-127, 946", RestConstants.JOB_LOG_ACTION, 
new StringWriter(), new HashMap<String, String[]>());
+        // Test 3
+        // * Test method org.apache.oozie.CoordinatorEngine.streamLog(String,
+        // String,
+        // * String, Writer) with RestConstants.JOB_LOG_ACTION and non-null 2nd
+        // * argument.
 
-        DummyXLogStreamingService service = (DummyXLogStreamingService) 
services.get(XLogStreamingService.class);
-        Filter filter = service.filter;
+        ce.streamLog(jobId, "1, 3-4, 6", RestConstants.JOB_LOG_ACTION, new 
StringWriter(),
+                new HashMap<String, String[]>());
 
+        service = (DummyXLogStreamingService) 
services.get(XLogStreamingService.class);
+        filter = service.filter;
         assertEquals(jobId, 
filter.getFilterParams().get(DagXLogInfoService.JOB));
-        assertEquals("(" + jobId + "@678|" + jobId + "@123|" + jobId + "@124|" 
+ jobId + "@125|" + jobId + "@126|" + jobId
-                + "@127|" + jobId + "@946)", 
filter.getFilterParams().get(DagXLogInfoService.ACTION));
-    }
-
-    /**
-     * Test method org.apache.oozie.CoordinatorEngine.streamLog(String, String,
-     * String, Writer) with RestConstants.JOB_LOG_DATE.
-     */
-    public void testStreamLog4JobLogDate() throws Exception {
-        CoordinatorEngine ce = createCoordinatorEngine();
-        final String jobId = runJobsImpl(ce);
-
-        CoordinatorJobBean cjb = ce.getCoordJob(jobId);
-        Date createdDate = cjb.getCreatedTime();
-        Date endDate = new Date();
-        assertTrue(endDate.after(createdDate));
+        assertEquals("(" + jobId + "@1|" + jobId + "@3|" + jobId + "@4|" + 
jobId + "@6)",
+                filter.getFilterParams().get(DagXLogInfoService.ACTION));
 
+        // Test 4. testing with date range
         long middle = (createdDate.getTime() + endDate.getTime()) / 2;
         Date middleDate = new Date(middle);
-
-        ce.streamLog(jobId, DateUtils.formatDateOozieTZ(createdDate) + "::" + 
DateUtils.formatDateOozieTZ(middleDate) + ","
-                + DateUtils.formatDateOozieTZ(middleDate) + "::" + 
DateUtils.formatDateOozieTZ(endDate),
+        ce.streamLog(jobId, DateUtils.formatDateOozieTZ(createdDate) + "::" + 
DateUtils.formatDateOozieTZ(middleDate)
+                + "," + DateUtils.formatDateOozieTZ(middleDate) + "::" + 
DateUtils.formatDateOozieTZ(endDate),
                 RestConstants.JOB_LOG_DATE, new StringWriter(), new 
HashMap<String, String[]>());
-
-        DummyXLogStreamingService service = (DummyXLogStreamingService) 
services.get(XLogStreamingService.class);
-        Filter filter = service.filter;
-
+        service = (DummyXLogStreamingService) 
services.get(XLogStreamingService.class);
+        filter = service.filter;
         assertEquals(jobId, 
filter.getFilterParams().get(DagXLogInfoService.JOB));
         final String action = 
filter.getFilterParams().get(DagXLogInfoService.ACTION);
-        assertEquals("(" + jobId + "@1|" + jobId + "@2)", action);
+        assertEquals("(" + jobId + "@1|" + jobId + "@2|" + jobId + "@3|" + 
jobId + "@4|" + jobId + "@5|" + jobId
+                + "@6)", action);
+
+        // Test 5 testing with action list range
+        ce.streamLog(jobId, "2-4", RestConstants.JOB_LOG_ACTION, new 
StringWriter(), new HashMap<String, String[]>());
+        service = (DummyXLogStreamingService) 
services.get(XLogStreamingService.class);
+        assertEquals(list.get(1).getCreatedTime(), service.startTime);
+        assertEquals(list.get(3).getLastModifiedTime(), service.endTime);
+
+        // Test 6, testing with 1 action list
+        ce.streamLog(jobId, "5", RestConstants.JOB_LOG_ACTION, new 
StringWriter(), new HashMap<String, String[]>());
+        service = (DummyXLogStreamingService) 
services.get(XLogStreamingService.class);
+        assertEquals(list.get(4).getCreatedTime(), service.startTime);
+        assertEquals(list.get(4).getLastModifiedTime(), service.endTime);
+
+        // Test 7, testing with 1 action list + range
+        ce.streamLog(jobId, "1,2-4,5", RestConstants.JOB_LOG_ACTION, new 
StringWriter(),
+                new HashMap<String, String[]>());
+        service = (DummyXLogStreamingService) 
services.get(XLogStreamingService.class);
+        assertEquals(list.get(0).getCreatedTime(), service.startTime);
+        assertEquals(list.get(4).getLastModifiedTime(), service.endTime);
+
+        // Test 8, testing with out-of-order range
+        ce.streamLog(jobId, "5,3-4,1", RestConstants.JOB_LOG_ACTION, new 
StringWriter(),
+                new HashMap<String, String[]>());
+        service = (DummyXLogStreamingService) 
services.get(XLogStreamingService.class);
+        assertEquals(list.get(0).getCreatedTime(), service.startTime);
+        assertEquals(list.get(4).getLastModifiedTime(), service.endTime);
+
+
+        // Test 9, testing with date range
+        ce.streamLog(
+                jobId,
+                DateUtils.formatDateOozieTZ(list.get(1).getCreatedTime()) + 
"::"
+                        + 
DateUtils.formatDateOozieTZ(list.get(4).getLastModifiedTime()) + ",",
+                RestConstants.JOB_LOG_DATE, new StringWriter(), new 
HashMap<String, String[]>());
+        service = (DummyXLogStreamingService) 
services.get(XLogStreamingService.class);
+        assertEquals(list.get(1).getCreatedTime().toString(), 
service.startTime.toString());
+        assertEquals(list.get(4).getLastModifiedTime().toString(), 
service.endTime.toString());
+
+        // Test 10, testing with multiple date range
+        ce.streamLog(
+                jobId,
+                DateUtils.formatDateOozieTZ(list.get(1).getCreatedTime()) + 
"::"
+                        + 
DateUtils.formatDateOozieTZ(list.get(2).getLastModifiedTime()) + ","
+                        + 
DateUtils.formatDateOozieTZ(list.get(3).getCreatedTime()) + "::"
+                        + 
DateUtils.formatDateOozieTZ(list.get(5).getLastModifiedTime()), 
RestConstants.JOB_LOG_DATE,
+                new StringWriter(), new HashMap<String, String[]>());
+        service = (DummyXLogStreamingService) 
services.get(XLogStreamingService.class);
+        assertEquals(list.get(1).getCreatedTime().toString(), 
service.startTime.toString());
+        assertEquals(list.get(5).getLastModifiedTime().toString(), 
service.endTime.toString());
+
     }
 
-    private String runJobsImpl(final CoordinatorEngine ce) throws Exception {
+    private String runJobsImpl(final CoordinatorEngine ce, int count) throws 
Exception {
         services.setService(DummyXLogStreamingService.class);
-        // need to re-define the parameters that are cleared upon the service 
reset:
+        // need to re-define the parameters that are cleared upon the service
+        // reset:
         new DagXLogInfoService().init(services);
 
         Configuration conf = new XConfiguration();
@@ -182,25 +220,26 @@ public class TestCoordinatorEngineStreamLog extends 
XFsTestCase {
         final String appPath = getTestCaseFileUri("coordinator.xml");
         final long now = System.currentTimeMillis();
         final String start = DateUtils.formatDateOozieTZ(new Date(now));
-        long e = now + 1000 * 119;
+        long e = now + 1000 * 60 * count;
         final String end = DateUtils.formatDateOozieTZ(new Date(e));
 
         String wfXml = IOUtils.getResourceAsString("wf-no-op.xml", -1);
         writeToFile(wfXml, getFsTestCaseDir(), "workflow.xml");
 
-        String appXml = "<coordinator-app name=\"NAME\" 
frequency=\"${coord:minutes(1)}\" start=\"" + start + "\" end=\"" + end
-                + "\" timezone=\"UTC\" " + 
"xmlns=\"uri:oozie:coordinator:0.1\"> " + "<controls> " + "  
<timeout>10</timeout> "
-                + "  <concurrency>1</concurrency> " + "  
<execution>LIFO</execution> " + "</controls> " + "<action> "
-                + "  <workflow> " + "  <app-path>" + getFsTestCaseDir() + 
"/workflow.xml</app-path>"
+        String appXml = "<coordinator-app name=\"NAME\" 
frequency=\"${coord:minutes(1)}\" start=\"" + start
+                + "\" end=\"" + end + "\" timezone=\"UTC\" " + 
"xmlns=\"uri:oozie:coordinator:0.1\"> " + "<controls> "
+                + "  <timeout>1</timeout> " + "  <concurrency>1</concurrency> 
" + "  <execution>LIFO</execution> "
+                + "</controls> " + "<action> " + "  <workflow> " + "  
<app-path>" + getFsTestCaseDir()
+                + "/workflow.xml</app-path>"
                 + "  <configuration> <property> <name>inputA</name> 
<value>valueA</value> </property> "
-                + "  <property> <name>inputB</name> <value>valueB</value> " + 
"  </property></configuration> " + "</workflow>"
-                + "</action> " + "</coordinator-app>";
+                + "  <property> <name>inputB</name> <value>valueB</value> " + 
"  </property></configuration> "
+                + "</workflow>" + "</action> " + "</coordinator-app>";
         writeToFile(appXml, appPath);
         conf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
         conf.set(OozieClient.USER_NAME, getTestUser());
 
         final String jobId = ce.submitJob(conf, true);
-        waitFor(1000 * 119, new Predicate() {
+        waitFor(1000 * 60 * count, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
                 try {
@@ -222,10 +261,11 @@ public class TestCoordinatorEngineStreamLog extends 
XFsTestCase {
                 }
             }
         });
-        // Assert all the actions are succeeded (useful for waitFor() timeout 
case):
+        // Assert all the actions are succeeded (useful for waitFor() timeout
+        // case):
         final List<CoordinatorAction> actions = 
ce.getCoordJob(jobId).getActions();
-        for (CoordinatorAction action: actions) {
-          assertEquals(CoordinatorAction.Status.SUCCEEDED, action.getStatus());
+        for (CoordinatorAction action : actions) {
+            assertEquals(CoordinatorAction.Status.SUCCEEDED, 
action.getStatus());
         }
         return jobId;
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/040e5f4d/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 05cb661..d4643c7 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1668 Coord log streaming start and end time should be of action list 
start and end time (puru via rohini)
 OOZIE-1674 DB upgrade from 3.3.0 to trunk fails on postgres (rkanter)
 OOZIE-1581 Workflow performance optimizations (mona)
 OOZIE-1663 Queuedump to display command type (shwethags via virag)

Reply via email to