Updated Branches: refs/heads/master 222b01458 -> 040e5f4d4
OOZIE-1668 Coord log streaming start and end time should be of action list start and end time (puru via rohini) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/040e5f4d Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/040e5f4d Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/040e5f4d Branch: refs/heads/master Commit: 040e5f4d4ca84872d04ea8b01c3c5be710ed428d Parents: 222b014 Author: Rohini Palaniswamy <[email protected]> Authored: Sun Jan 26 19:54:30 2014 -0800 Committer: Rohini Palaniswamy <[email protected]> Committed: Sun Jan 26 19:54:30 2014 -0800 ---------------------------------------------------------------------- .../java/org/apache/oozie/BundleEngine.java | 3 +- .../org/apache/oozie/CoordinatorActionBean.java | 6 +- .../org/apache/oozie/CoordinatorEngine.java | 55 +++++- ...etActionModifiedDateForRangeJPAExecutor.java | 65 +++++++ ...etActionRunningCountForRangeJPAExecutor.java | 57 ++++++ .../oozie/util/CoordActionsInDateRange.java | 38 +++- .../oozie/TestCoordinatorEngineStreamLog.java | 186 +++++++++++-------- release-log.txt | 1 + 8 files changed, 333 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/040e5f4d/core/src/main/java/org/apache/oozie/BundleEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/BundleEngine.java b/core/src/main/java/org/apache/oozie/BundleEngine.java index e8c8f31..7f879be 100644 --- a/core/src/main/java/org/apache/oozie/BundleEngine.java +++ b/core/src/main/java/org/apache/oozie/BundleEngine.java @@ -253,7 +253,8 @@ public class BundleEngine extends BaseEngine { throw new BundleEngineException(ex); } - Services.get().get(XLogStreamingService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer, params); + Date endTime = job.getEndTime() == null ? new Date() : job.getEndTime(); + Services.get().get(XLogStreamingService.class).streamLog(filter, job.getCreatedTime(), endTime, writer, params); } /* (non-Javadoc) http://git-wip-us.apache.org/repos/asf/oozie/blob/040e5f4d/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java index 5d8d6df..03a7ed8 100644 --- a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java +++ b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java @@ -157,7 +157,11 @@ import org.json.simple.JSONObject; @NamedQuery(name = "GET_ACTIONS_BY_DATES_FOR_KILL", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr <> 'FAILED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 'SUCCEEDED' AND a.statusStr <> 'TIMEDOUT') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"), - @NamedQuery(name = "GET_COORD_ACTIONS_COUNT", query = "select count(w) from CoordinatorActionBean w")}) + @NamedQuery(name = "GET_COORD_ACTIONS_COUNT", query = "select count(w) from CoordinatorActionBean w"), + + @NamedQuery(name = "GET_COORD_ACTIONS_COUNT_RUNNING_FOR_RANGE", query = "select count(w) from CoordinatorActionBean w where w.statusStr = 'RUNNING' and w.jobId= :jobId and w.id >= :startAction AND w.id <= :endAction"), + + @NamedQuery(name = "GET_COORD_ACTIONS_MAX_MODIFIED_DATE_FOR_RANGE", query = "select max(w.lastModifiedTimestamp) from CoordinatorActionBean w where w.jobId= :jobId and w.id >= :startAction AND w.id <= :endAction")}) @NamedNativeQueries({ http://git-wip-us.apache.org/repos/asf/oozie/blob/040e5f4d/core/src/main/java/org/apache/oozie/CoordinatorEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java index 57587c2..8c5e80c 100644 --- a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java +++ b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java @@ -20,6 +20,8 @@ package org.apache.oozie; import java.io.IOException; import java.io.Writer; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -280,6 +282,9 @@ public class CoordinatorEngine extends BaseEngine { */ public void streamLog(String jobId, String logRetrievalScope, String logRetrievalType, Writer writer, Map<String, String[]> params) throws IOException, BaseEngineException, CommandException { + + Date startTime = null; + Date endTime = null; XLogStreamer.Filter filter = new XLogStreamer.Filter(); filter.setParameter(DagXLogInfoService.JOB, jobId); if (logRetrievalScope != null && logRetrievalType != null) { @@ -343,7 +348,26 @@ public class CoordinatorEngine extends BaseEngine { orSeparatedActions.insert(0, "("); orSeparatedActions.append(")"); } + filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString()); + if (actionSet != null && actionSet.size() == 1) { + CoordinatorActionBean actionBean = getCoordAction(actionSet.iterator().next()); + startTime = actionBean.getCreatedTime(); + endTime = actionBean.getStatus().equals(CoordinatorAction.Status.RUNNING) ? new Date() : actionBean + .getLastModifiedTime(); + } + else if (actionSet != null && actionSet.size() > 0) { + List<String> tempList = new ArrayList<String>(actionSet); + Collections.sort(tempList, new Comparator<String>() { + public int compare(String a, String b) { + return Integer.valueOf(a.substring(a.lastIndexOf("@") + 1)).compareTo( + Integer.valueOf(b.substring(b.lastIndexOf("@") + 1))); + } + }); + startTime = getCoordAction(tempList.get(0)).getCreatedTime(); + endTime = CoordActionsInDateRange.getCoordActionsLastModifiedDate(jobId, tempList.get(0), + tempList.get(tempList.size() - 1)); + } } // if coordinator action logs are to be retrieved based on date range // this block gets the corresponding list of coordinator actions to be used by the log filter @@ -369,10 +393,37 @@ public class CoordinatorEngine extends BaseEngine { orSeparatedActions.append(")"); } filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString()); + + if (coordActionIdList != null && coordActionIdList.size() == 1) { + CoordinatorActionBean actionBean = getCoordAction(coordActionIdList.get(0)); + startTime = actionBean.getCreatedTime(); + endTime = actionBean.getStatus().equals(CoordinatorAction.Status.RUNNING) ? new Date() : actionBean + .getLastModifiedTime(); + } + else if (coordActionIdList != null && coordActionIdList.size() > 0) { + Collections.sort(coordActionIdList, new Comparator<String>() { + public int compare(String a, String b) { + return Integer.valueOf(a.substring(a.lastIndexOf("@") + 1)).compareTo( + Integer.valueOf(b.substring(b.lastIndexOf("@") + 1))); + } + }); + startTime = getCoordAction(coordActionIdList.get(0)).getCreatedTime(); + endTime = CoordActionsInDateRange.getCoordActionsLastModifiedDate(jobId, coordActionIdList.get(0), + coordActionIdList.get(coordActionIdList.size() - 1)); + } } } - CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId); - Services.get().get(XLogStreamingService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer, params); + if (startTime == null || endTime == null) { + CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId); + if (startTime == null) { + startTime = job.getCreatedTime(); + } + if (endTime == null) { + endTime = job.getEndTime() == null ? new Date() : job.getEndTime(); + } + } + //job.getActions() + Services.get().get(XLogStreamingService.class).streamLog(filter, startTime, endTime, writer, params); } /* http://git-wip-us.apache.org/repos/asf/oozie/blob/040e5f4d/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionModifiedDateForRangeJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionModifiedDateForRangeJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionModifiedDateForRangeJPAExecutor.java new file mode 100644 index 0000000..dc4a313 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionModifiedDateForRangeJPAExecutor.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.executor.jpa; + +import java.sql.Timestamp; +import java.util.Date; +import java.util.List; + +import javax.persistence.EntityManager; +import javax.persistence.Query; + +import org.apache.oozie.ErrorCode; +import org.apache.oozie.util.DateUtils; +import org.apache.oozie.util.ParamChecker; + +public class CoordJobGetActionModifiedDateForRangeJPAExecutor implements JPAExecutor<Date> { + + private String jobId = null; + private String startAction, endAction; + + public CoordJobGetActionModifiedDateForRangeJPAExecutor(String jobId, String startAction, String endAction) { + ParamChecker.notNull(jobId, "jobId"); + this.jobId = jobId; + this.startAction = startAction; + this.endAction = endAction; + } + + @Override + public String getName() { + return "CoordJobGetActionIdsForDateRangeJPAExecutor"; + } + + @Override + @SuppressWarnings("unchecked") + public Date execute(EntityManager em) throws JPAExecutorException { + try { + Query q = em.createNamedQuery("GET_COORD_ACTIONS_MAX_MODIFIED_DATE_FOR_RANGE"); + q.setParameter("jobId", jobId); + q.setParameter("startAction", startAction); + q.setParameter("endAction", endAction); + List<Timestamp> coordActionIds = q.getResultList(); + return coordActionIds.isEmpty() ? new Date() : DateUtils.toDate(coordActionIds.get(0)); + } + catch (Exception e) { + throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e); + } + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/040e5f4d/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionRunningCountForRangeJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionRunningCountForRangeJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionRunningCountForRangeJPAExecutor.java new file mode 100644 index 0000000..cbcdb98 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionRunningCountForRangeJPAExecutor.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.executor.jpa; + +import javax.persistence.EntityManager; +import javax.persistence.Query; +import org.apache.oozie.ErrorCode; +import org.apache.oozie.util.ParamChecker; + +public class CoordJobGetActionRunningCountForRangeJPAExecutor implements JPAExecutor<Long> { + + private String jobId = null; + private String startAction, endAction; + + public CoordJobGetActionRunningCountForRangeJPAExecutor(String jobId, String startAction, String endAction) { + ParamChecker.notNull(jobId, "jobId"); + this.jobId = jobId; + this.startAction = startAction; + this.endAction = endAction; + } + + @Override + public String getName() { + return "CoordJobGetActionIdsForDateRangeJPAExecutor"; + } + + @Override + public Long execute(EntityManager em) throws JPAExecutorException { + try { + Query q = em.createNamedQuery("GET_COORD_ACTIONS_COUNT_RUNNING_FOR_RANGE"); + q.setParameter("jobId", jobId); + q.setParameter("startAction", startAction); + q.setParameter("endAction", endAction); + Long count = (Long) q.getSingleResult(); + return count; + } + catch (Exception e) { + throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e); + } + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/040e5f4d/core/src/main/java/org/apache/oozie/util/CoordActionsInDateRange.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/CoordActionsInDateRange.java b/core/src/main/java/org/apache/oozie/util/CoordActionsInDateRange.java index 6746922..fd21c45 100644 --- a/core/src/main/java/org/apache/oozie/util/CoordActionsInDateRange.java +++ b/core/src/main/java/org/apache/oozie/util/CoordActionsInDateRange.java @@ -28,9 +28,13 @@ import java.util.Set; import org.apache.oozie.CoordinatorActionBean; import org.apache.oozie.ErrorCode; import org.apache.oozie.XException; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.executor.jpa.CoordJobGetActionModifiedDateForRangeJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobGetActionIdsForDateRangeJPAExecutor; +import org.apache.oozie.executor.jpa.CoordJobGetActionRunningCountForRangeJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobGetActionsByDatesForKillJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobGetActionsForDatesJPAExecutor; +import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.service.JPAService; import org.apache.oozie.service.Services; @@ -140,7 +144,7 @@ public class CoordActionsInDateRange { return list; } - /* + /** * Get coordinator action ids between given start and end time * * @param jobId coordinator job id @@ -160,4 +164,36 @@ public class CoordActionsInDateRange { } return list; } + + /** + * Gets the coordinator actions last modified date for range, if any action is running it return new date + * + * @param jobId the job id + * @param startAction the start action + * @param endAction the end action + * @return the coordinator actions last modified date + * @throws CommandException the command exception + */ + public static Date getCoordActionsLastModifiedDate(String jobId, String startAction, String endAction) + throws CommandException { + JPAService jpaService = Services.get().get(JPAService.class); + ParamChecker.notEmpty(jobId, "jobId"); + ParamChecker.notEmpty(startAction, "startAction"); + ParamChecker.notEmpty(endAction, "endAction"); + + try { + long count = jpaService.execute(new CoordJobGetActionRunningCountForRangeJPAExecutor(jobId, startAction, + endAction)); + if (count == 0) { + return jpaService.execute(new CoordJobGetActionModifiedDateForRangeJPAExecutor(jobId, startAction, endAction)); + } + else { + return new Date(); + } + } + catch (JPAExecutorException je) { + throw new CommandException(je); + } + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/040e5f4d/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java b/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java index a29a07e..6f7403d 100644 --- a/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java +++ b/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java @@ -25,6 +25,8 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.io.Writer; import java.net.URI; +import java.util.Collections; +import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -80,11 +82,15 @@ public class TestCoordinatorEngineStreamLog extends XFsTestCase { static class DummyXLogStreamingService extends XLogStreamingService { Filter filter; + Date startTime; + Date endTime; @Override public void streamLog(Filter filter1, Date startTime, Date endTime, Writer writer, Map<String, String[]> params) throws IOException { filter = filter1; + this.startTime = startTime; + this.endTime = endTime; } } @@ -92,89 +98,121 @@ public class TestCoordinatorEngineStreamLog extends XFsTestCase { return new CoordinatorEngine(getTestUser()); } - /** - * The log streaming itself is tested in - * {@link org.apache.oozie.service.TestXLogService}. Here we test only the - * fields that are injected into - * {@link org.apache.oozie.util.XLogStreamer.Filter} upon - * {@link CoordinatorEngine#streamLog(String, Writer)} invocation. - */ - public void testStreamLog2() throws Exception { + public void testCoordLogStreaming() throws Exception { CoordinatorEngine ce = createCoordinatorEngine(); - String jobId = runJobsImpl(ce); - ce.streamLog(jobId, new StringWriter(), new HashMap<String, String[]>()); + final String jobId = runJobsImpl(ce, 6); - DummyXLogStreamingService service = (DummyXLogStreamingService) services.get(XLogStreamingService.class); - Filter filter = service.filter; - - assertEquals(filter.getFilterParams().get(DagXLogInfoService.JOB), jobId); - } + CoordinatorJobBean cjb = ce.getCoordJob(jobId); + Date createdDate = cjb.getCreatedTime(); + Date endDate = new Date(); + assertTrue(endDate.after(createdDate)); - /** - * Test method org.apache.oozie.CoordinatorEngine.streamLog(String, String, - * String, Writer) with null 2nd and 3rd arguments. - */ - public void testStreamLog4NullNull() throws Exception { - CoordinatorEngine ce = createCoordinatorEngine(); - String jobId = runJobsImpl(ce); - ce.streamLog(jobId, null, null, new StringWriter(), new HashMap<String, String[]>()); + List<CoordinatorAction> list = cjb.getActions(); + Collections.sort(list, new Comparator<CoordinatorAction>() { + public int compare(CoordinatorAction a, CoordinatorAction b) { + return a.getId().compareTo(b.getId()); + } + }); + // Test 1.to test if fields are injected + ce.streamLog(jobId, new StringWriter(), new HashMap<String, String[]>()); DummyXLogStreamingService service = (DummyXLogStreamingService) services.get(XLogStreamingService.class); Filter filter = service.filter; - assertEquals(filter.getFilterParams().get(DagXLogInfoService.JOB), jobId); - } - /** - * Test method org.apache.oozie.CoordinatorEngine.streamLog(String, String, - * String, Writer) with RestConstants.JOB_LOG_ACTION and non-null 2nd - * argument. - */ - public void testStreamLog4JobLogAction() throws Exception { - CoordinatorEngine ce = createCoordinatorEngine(); - String jobId = runJobsImpl(ce); + // Test2 + // * Test method org.apache.oozie.CoordinatorEngine.streamLog(String, + // String, + // * String, Writer) with null 2nd and 3rd arguments. + // */ + ce.streamLog(jobId, null, null, new StringWriter(), new HashMap<String, String[]>()); + service = (DummyXLogStreamingService) services.get(XLogStreamingService.class); + filter = service.filter; + assertEquals(filter.getFilterParams().get(DagXLogInfoService.JOB), jobId); - ce.streamLog(jobId, "678, 123-127, 946", RestConstants.JOB_LOG_ACTION, new StringWriter(), new HashMap<String, String[]>()); + // Test 3 + // * Test method org.apache.oozie.CoordinatorEngine.streamLog(String, + // String, + // * String, Writer) with RestConstants.JOB_LOG_ACTION and non-null 2nd + // * argument. - DummyXLogStreamingService service = (DummyXLogStreamingService) services.get(XLogStreamingService.class); - Filter filter = service.filter; + ce.streamLog(jobId, "1, 3-4, 6", RestConstants.JOB_LOG_ACTION, new StringWriter(), + new HashMap<String, String[]>()); + service = (DummyXLogStreamingService) services.get(XLogStreamingService.class); + filter = service.filter; assertEquals(jobId, filter.getFilterParams().get(DagXLogInfoService.JOB)); - assertEquals("(" + jobId + "@678|" + jobId + "@123|" + jobId + "@124|" + jobId + "@125|" + jobId + "@126|" + jobId - + "@127|" + jobId + "@946)", filter.getFilterParams().get(DagXLogInfoService.ACTION)); - } - - /** - * Test method org.apache.oozie.CoordinatorEngine.streamLog(String, String, - * String, Writer) with RestConstants.JOB_LOG_DATE. - */ - public void testStreamLog4JobLogDate() throws Exception { - CoordinatorEngine ce = createCoordinatorEngine(); - final String jobId = runJobsImpl(ce); - - CoordinatorJobBean cjb = ce.getCoordJob(jobId); - Date createdDate = cjb.getCreatedTime(); - Date endDate = new Date(); - assertTrue(endDate.after(createdDate)); + assertEquals("(" + jobId + "@1|" + jobId + "@3|" + jobId + "@4|" + jobId + "@6)", + filter.getFilterParams().get(DagXLogInfoService.ACTION)); + // Test 4. testing with date range long middle = (createdDate.getTime() + endDate.getTime()) / 2; Date middleDate = new Date(middle); - - ce.streamLog(jobId, DateUtils.formatDateOozieTZ(createdDate) + "::" + DateUtils.formatDateOozieTZ(middleDate) + "," - + DateUtils.formatDateOozieTZ(middleDate) + "::" + DateUtils.formatDateOozieTZ(endDate), + ce.streamLog(jobId, DateUtils.formatDateOozieTZ(createdDate) + "::" + DateUtils.formatDateOozieTZ(middleDate) + + "," + DateUtils.formatDateOozieTZ(middleDate) + "::" + DateUtils.formatDateOozieTZ(endDate), RestConstants.JOB_LOG_DATE, new StringWriter(), new HashMap<String, String[]>()); - - DummyXLogStreamingService service = (DummyXLogStreamingService) services.get(XLogStreamingService.class); - Filter filter = service.filter; - + service = (DummyXLogStreamingService) services.get(XLogStreamingService.class); + filter = service.filter; assertEquals(jobId, filter.getFilterParams().get(DagXLogInfoService.JOB)); final String action = filter.getFilterParams().get(DagXLogInfoService.ACTION); - assertEquals("(" + jobId + "@1|" + jobId + "@2)", action); + assertEquals("(" + jobId + "@1|" + jobId + "@2|" + jobId + "@3|" + jobId + "@4|" + jobId + "@5|" + jobId + + "@6)", action); + + // Test 5 testing with action list range + ce.streamLog(jobId, "2-4", RestConstants.JOB_LOG_ACTION, new StringWriter(), new HashMap<String, String[]>()); + service = (DummyXLogStreamingService) services.get(XLogStreamingService.class); + assertEquals(list.get(1).getCreatedTime(), service.startTime); + assertEquals(list.get(3).getLastModifiedTime(), service.endTime); + + // Test 6, testing with 1 action list + ce.streamLog(jobId, "5", RestConstants.JOB_LOG_ACTION, new StringWriter(), new HashMap<String, String[]>()); + service = (DummyXLogStreamingService) services.get(XLogStreamingService.class); + assertEquals(list.get(4).getCreatedTime(), service.startTime); + assertEquals(list.get(4).getLastModifiedTime(), service.endTime); + + // Test 7, testing with 1 action list + range + ce.streamLog(jobId, "1,2-4,5", RestConstants.JOB_LOG_ACTION, new StringWriter(), + new HashMap<String, String[]>()); + service = (DummyXLogStreamingService) services.get(XLogStreamingService.class); + assertEquals(list.get(0).getCreatedTime(), service.startTime); + assertEquals(list.get(4).getLastModifiedTime(), service.endTime); + + // Test 8, testing with out order range + ce.streamLog(jobId, "5,3-4,1", RestConstants.JOB_LOG_ACTION, new StringWriter(), + new HashMap<String, String[]>()); + service = (DummyXLogStreamingService) services.get(XLogStreamingService.class); + assertEquals(list.get(0).getCreatedTime(), service.startTime); + assertEquals(list.get(4).getLastModifiedTime(), service.endTime); + + + // Test 9, testing with date range + ce.streamLog( + jobId, + DateUtils.formatDateOozieTZ(list.get(1).getCreatedTime()) + "::" + + DateUtils.formatDateOozieTZ(list.get(4).getLastModifiedTime()) + ",", + RestConstants.JOB_LOG_DATE, new StringWriter(), new HashMap<String, String[]>()); + service = (DummyXLogStreamingService) services.get(XLogStreamingService.class); + assertEquals(list.get(1).getCreatedTime().toString(), service.startTime.toString()); + assertEquals(list.get(4).getLastModifiedTime().toString(), service.endTime.toString()); + + // Test 10, testing with multiple date range + ce.streamLog( + jobId, + DateUtils.formatDateOozieTZ(list.get(1).getCreatedTime()) + "::" + + DateUtils.formatDateOozieTZ(list.get(2).getLastModifiedTime()) + "," + + DateUtils.formatDateOozieTZ(list.get(3).getCreatedTime()) + "::" + + DateUtils.formatDateOozieTZ(list.get(5).getLastModifiedTime()), RestConstants.JOB_LOG_DATE, + new StringWriter(), new HashMap<String, String[]>()); + service = (DummyXLogStreamingService) services.get(XLogStreamingService.class); + assertEquals(list.get(1).getCreatedTime().toString(), service.startTime.toString()); + assertEquals(list.get(5).getLastModifiedTime().toString(), service.endTime.toString()); + } - private String runJobsImpl(final CoordinatorEngine ce) throws Exception { + private String runJobsImpl(final CoordinatorEngine ce, int count) throws Exception { services.setService(DummyXLogStreamingService.class); - // need to re-define the parameters that are cleared upon the service reset: + // need to re-define the parameters that are cleared upon the service + // reset: new DagXLogInfoService().init(services); Configuration conf = new XConfiguration(); @@ -182,25 +220,26 @@ public class TestCoordinatorEngineStreamLog extends XFsTestCase { final String appPath = getTestCaseFileUri("coordinator.xml"); final long now = System.currentTimeMillis(); final String start = DateUtils.formatDateOozieTZ(new Date(now)); - long e = now + 1000 * 119; + long e = now + 1000 * 60 * count; final String end = DateUtils.formatDateOozieTZ(new Date(e)); String wfXml = IOUtils.getResourceAsString("wf-no-op.xml", -1); writeToFile(wfXml, getFsTestCaseDir(), "workflow.xml"); - String appXml = "<coordinator-app name=\"NAME\" frequency=\"${coord:minutes(1)}\" start=\"" + start + "\" end=\"" + end - + "\" timezone=\"UTC\" " + "xmlns=\"uri:oozie:coordinator:0.1\"> " + "<controls> " + " <timeout>10</timeout> " - + " <concurrency>1</concurrency> " + " <execution>LIFO</execution> " + "</controls> " + "<action> " - + " <workflow> " + " <app-path>" + getFsTestCaseDir() + "/workflow.xml</app-path>" + String appXml = "<coordinator-app name=\"NAME\" frequency=\"${coord:minutes(1)}\" start=\"" + start + + "\" end=\"" + end + "\" timezone=\"UTC\" " + "xmlns=\"uri:oozie:coordinator:0.1\"> " + "<controls> " + + " <timeout>1</timeout> " + " <concurrency>1</concurrency> " + " <execution>LIFO</execution> " + + "</controls> " + "<action> " + " <workflow> " + " <app-path>" + getFsTestCaseDir() + + "/workflow.xml</app-path>" + " <configuration> <property> <name>inputA</name> <value>valueA</value> </property> " - + " <property> <name>inputB</name> <value>valueB</value> " + " </property></configuration> " + "</workflow>" - + "</action> " + "</coordinator-app>"; + + " <property> <name>inputB</name> <value>valueB</value> " + " </property></configuration> " + + "</workflow>" + "</action> " + "</coordinator-app>"; writeToFile(appXml, appPath); conf.set(OozieClient.COORDINATOR_APP_PATH, appPath); conf.set(OozieClient.USER_NAME, getTestUser()); final String jobId = ce.submitJob(conf, true); - waitFor(1000 * 119, new Predicate() { + waitFor(1000 * 60 * count, new Predicate() { @Override public boolean evaluate() throws Exception { try { @@ -222,10 +261,11 @@ public class TestCoordinatorEngineStreamLog extends XFsTestCase { } } }); - // Assert all the actions are succeeded (useful for waitFor() timeout case): + // Assert all the actions are succeeded (useful for waitFor() timeout + // case): final List<CoordinatorAction> actions = ce.getCoordJob(jobId).getActions(); - for (CoordinatorAction action: actions) { - assertEquals(CoordinatorAction.Status.SUCCEEDED, action.getStatus()); + for (CoordinatorAction action : actions) { + assertEquals(CoordinatorAction.Status.SUCCEEDED, action.getStatus()); } return jobId; } http://git-wip-us.apache.org/repos/asf/oozie/blob/040e5f4d/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 05cb661..d4643c7 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.1.0 release (trunk - unreleased) +OOZIE-1668 Coord log streaming start and end time should be of action list start and end time (puru via rohini) OOZIE-1674 DB upgrade from 3.3.0 to trunk fails on postgres (rkanter) OOZIE-1581 Workflow performance optimizations (mona) OOZIE-1663 Queuedump to display command type (shwethags via virag)
