OOZIE-2815 Oozie does not always display job log
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/993f06df Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/993f06df Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/993f06df Branch: refs/heads/oya Commit: 993f06df44b05ece89afd85cd662bb325ed5f8a3 Parents: 2237bbd Author: puru <[email protected]> Authored: Wed Apr 12 14:15:36 2017 -0700 Committer: puru <[email protected]> Committed: Wed Apr 12 14:15:36 2017 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/oozie/BaseEngine.java | 85 +++++++----- .../java/org/apache/oozie/BundleEngine.java | 50 ++----- .../org/apache/oozie/CoordinatorEngine.java | 65 ++------- .../main/java/org/apache/oozie/DagEngine.java | 83 ++--------- .../org/apache/oozie/service/XLogService.java | 12 +- .../oozie/service/XLogStreamingService.java | 80 +++-------- .../oozie/service/ZKXLogStreamingService.java | 138 +++++-------------- .../org/apache/oozie/servlet/V0JobServlet.java | 3 +- .../org/apache/oozie/servlet/V1JobServlet.java | 4 +- .../org/apache/oozie/util/AuthUrlClient.java | 3 +- .../oozie/util/TimestampedMessageParser.java | 36 +---- .../org/apache/oozie/util/XLogAuditFilter.java | 16 --- .../apache/oozie/util/XLogAuditStreamer.java | 79 +++++++++++ .../apache/oozie/util/XLogErrorStreamer.java | 67 +++++++++ .../java/org/apache/oozie/util/XLogFilter.java | 76 ++++++---- .../org/apache/oozie/util/XLogStreamer.java | 117 ++++++++++++++-- core/src/main/resources/oozie-default.xml | 26 +++- .../oozie/TestCoordinatorEngineStreamLog.java | 9 +- .../oozie/service/TestConfigurationService.java | 3 +- .../oozie/service/TestXLogStreamingService.java | 57 ++++++-- .../service/TestZKXLogStreamingService.java | 46 ++++++- .../org/apache/oozie/util/TestLogStreamer.java | 47 ++++++- .../TestSimplifiedTimestampedMessageParser.java | 15 +- .../util/TestTimestampedMessageParser.java | 9 +- 
.../oozie/util/TestXLogUserFilterParam.java | 34 +---- release-log.txt | 1 + webapp/src/main/webapp/oozie-console.js | 4 +- 27 files changed, 639 insertions(+), 526 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/BaseEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/BaseEngine.java b/core/src/main/java/org/apache/oozie/BaseEngine.java index 50df897..2780ec2 100644 --- a/core/src/main/java/org/apache/oozie/BaseEngine.java +++ b/core/src/main/java/org/apache/oozie/BaseEngine.java @@ -20,25 +20,22 @@ package org.apache.oozie; import java.io.IOException; import java.io.Writer; -import java.util.Date; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.client.CoordinatorJob; import org.apache.oozie.client.WorkflowJob; +import org.apache.oozie.command.CommandException; import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.service.JMSTopicService; import org.apache.oozie.service.Services; -import org.apache.oozie.service.XLogStreamingService; -import org.apache.oozie.util.XLogFilter; +import org.apache.oozie.util.XLogAuditStreamer; +import org.apache.oozie.util.XLogErrorStreamer; +import org.apache.oozie.util.XLogStreamer; public abstract class BaseEngine { public static final String USE_XCOMMAND = "oozie.useXCommand"; - public enum LOG_TYPE { - LOG, ERROR_LOG, AUDIT_LOG - } - protected String user; /** @@ -169,39 +166,60 @@ public abstract class BaseEngine { * * @param jobId job Id. * @param writer writer to stream the log to. - * @param params additional parameters from the request + * @param requestParameters additional parameters from the request * @throws IOException thrown if the log cannot be streamed. 
* @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for * jobId. */ - public abstract void streamLog(String jobId, Writer writer, Map<String, String[]> params) - throws IOException, BaseEngineException; + public void streamLog(String jobId, Writer writer, Map<String, String[]> requestParameters) throws IOException, + BaseEngineException { + try { + + streamJobLog(new XLogStreamer(requestParameters), jobId, writer); + } + catch (CommandException e) { + throw new IOException(e); + } + } /** * Stream error log of a job. * * @param jobId job Id. * @param writer writer to stream the log to. - * @param params additional parameters from the request + * @param requestParameters additional parameters from the request * @throws IOException thrown if the log cannot be streamed. * @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for * jobId. */ - public abstract void streamErrorLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException, - BaseEngineException; - /** + public void streamErrorLog(String jobId, Writer writer, Map<String, String[]> requestParameters) throws IOException, + BaseEngineException { + try { + + streamJobLog(new XLogErrorStreamer(requestParameters), jobId, writer); + } + catch (CommandException e) { + throw new IOException(e); + } + } /** * Stream Audit log of a job. * * @param jobId job Id. * @param writer writer to stream the log to. - * @param params additional parameters from the request + * @param requestParameters additional parameters from the request * @throws IOException thrown if the log cannot be streamed. * @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for * jobId. 
*/ - public abstract void streamAuditLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException, - BaseEngineException; - + public void streamAuditLog(String jobId, Writer writer, Map<String, String[]> requestParameters) throws IOException, + BaseEngineException { + try { + streamJobLog(new XLogAuditStreamer(requestParameters), jobId, writer); + } + catch (CommandException e) { + throw new IOException(e); + } + } /** * Return the workflow Job ID for an external ID. @@ -299,25 +317,16 @@ public abstract class BaseEngine { public abstract void changeSLA(String id, String actions, String dates, String childIds, String newParams) throws BaseEngineException; - protected void fetchLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, - Map<String, String[]> params, LOG_TYPE logType) throws IOException { - - switch (logType) { - case LOG: - Services.get().get(XLogStreamingService.class).streamLog(filter, startTime, endTime, writer, params); - break; - case ERROR_LOG: - Services.get().get(XLogStreamingService.class) - .streamErrorLog(filter, startTime, endTime, writer, params); - break; - case AUDIT_LOG: - Services.get().get(XLogStreamingService.class) - .streamAuditLog(filter, startTime, endTime, writer, params); - break; - default: - throw new IOException("Unsupported log Type"); - } - } - + /** + * Stream job log. + * + * @param xLogStreamer the log streamer + * @param jobId the job id + * @param writer the writer + * @throws IOException Signals that an I/O exception has occurred. 
+ * @throws BaseEngineException the base engine exception + */ + protected abstract void streamJobLog(XLogStreamer logStreamer, String jobId, Writer writer) throws IOException, + BaseEngineException; } http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/BundleEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/BundleEngine.java b/core/src/main/java/org/apache/oozie/BundleEngine.java index d0099b4..e994bff 100644 --- a/core/src/main/java/org/apache/oozie/BundleEngine.java +++ b/core/src/main/java/org/apache/oozie/BundleEngine.java @@ -54,14 +54,14 @@ import org.apache.oozie.command.bundle.BundleSubmitXCommand; import org.apache.oozie.executor.jpa.BundleJobQueryExecutor; import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.service.DagXLogInfoService; +import org.apache.oozie.service.Services; +import org.apache.oozie.service.XLogStreamingService; import org.apache.oozie.util.DateUtils; import org.apache.oozie.util.JobUtils; import org.apache.oozie.util.JobsFilterUtils; import org.apache.oozie.util.ParamChecker; import org.apache.oozie.util.XLog; -import org.apache.oozie.util.XLogAuditFilter; -import org.apache.oozie.util.XLogFilter; -import org.apache.oozie.util.XLogUserFilterParam; +import org.apache.oozie.util.XLogStreamer; import com.google.common.annotations.VisibleForTesting; @@ -245,44 +245,11 @@ public class BundleEngine extends BaseEngine { } } - @Override - public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException, - BundleEngineException { - streamJobLog(jobId, writer, params, LOG_TYPE.LOG); - } - - @Override - public void streamErrorLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException, + protected void streamJobLog(XLogStreamer logStreamer, String jobId, Writer writer) throws IOException, BundleEngineException { - streamJobLog(jobId, 
writer, params, LOG_TYPE.ERROR_LOG); - } - - @Override - public void streamAuditLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException, - BundleEngineException { - try { - streamJobLog(new XLogAuditFilter(new XLogUserFilterParam(params)), jobId, writer, params, LOG_TYPE.AUDIT_LOG); - } - catch (CommandException e) { - throw new IOException(e); - } - } - - private void streamJobLog(String jobId, Writer writer, Map<String, String[]> params, LOG_TYPE logType) - throws IOException, BundleEngineException { - try { - streamJobLog(new XLogFilter(new XLogUserFilterParam(params)), jobId, writer, params, logType); - } - catch (Exception e) { - throw new IOException(e); - } - } - - private void streamJobLog(XLogFilter filter, String jobId, Writer writer, Map<String, String[]> params, LOG_TYPE logType) - throws IOException, BundleEngineException { - try { + try { BundleJobBean job; - filter.setParameter(DagXLogInfoService.JOB, jobId); + logStreamer.getXLogFilter().setParameter(DagXLogInfoService.JOB, jobId); job = new BundleJobXCommand(jobId).call(); Date lastTime = null; if (job.isTerminalStatus()) { @@ -291,9 +258,10 @@ public class BundleEngine extends BaseEngine { if (lastTime == null) { lastTime = new Date(); } - fetchLog(filter, job.getCreatedTime(), lastTime, writer, params, logType); + Services.get().get(XLogStreamingService.class) + .streamLog(logStreamer, job.getCreatedTime(), lastTime, writer); } - catch (Exception ex) { + catch (CommandException ex) { throw new IOException(ex); } } http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/CoordinatorEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java index 2f9f822..2c04bea 100644 --- a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java +++ 
b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java @@ -76,8 +76,8 @@ import org.apache.oozie.util.JobUtils; import org.apache.oozie.util.Pair; import org.apache.oozie.util.ParamChecker; import org.apache.oozie.util.XLog; -import org.apache.oozie.util.XLogAuditFilter; import org.apache.oozie.util.XLogFilter; +import org.apache.oozie.util.XLogStreamer; import org.apache.oozie.util.XLogUserFilterParam; import com.google.common.annotations.VisibleForTesting; @@ -303,56 +303,20 @@ public class CoordinatorEngine extends BaseEngine { throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of start")); } - @Override - public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException, - BaseEngineException { - streamJobLog(jobId, writer, params, LOG_TYPE.LOG); - } - - @Override - public void streamErrorLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException, - BaseEngineException { - streamJobLog(jobId, writer, params, LOG_TYPE.ERROR_LOG); - } @Override - public void streamAuditLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException, - BaseEngineException { - try { - streamJobLog(new XLogAuditFilter(new XLogUserFilterParam(params)), jobId, writer, params, LOG_TYPE.AUDIT_LOG); - } - catch (CommandException e) { - throw new IOException(e); - } - } - - private void streamJobLog(String jobId, Writer writer, Map<String, String[]> params, LOG_TYPE logType) + protected void streamJobLog(XLogStreamer logStreamer, String jobId, Writer writer) throws IOException, BaseEngineException { - try { - streamJobLog(new XLogFilter(new XLogUserFilterParam(params)), jobId, writer, params, logType); - } - catch (Exception e) { - throw new IOException(e); - } - } - - private void streamJobLog(XLogFilter filter, String jobId, Writer writer, Map<String, String[]> params, LOG_TYPE logType) - throws IOException, BaseEngineException { - try { - filter.setParameter(DagXLogInfoService.JOB, 
jobId); - Date lastTime = null; - CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId); - if (job.isTerminalStatus()) { - lastTime = job.getLastModifiedTime(); - } - if (lastTime == null) { - lastTime = new Date(); - } - fetchLog(filter, job.getCreatedTime(), lastTime, writer, params, logType); + logStreamer.getXLogFilter().setParameter(DagXLogInfoService.JOB, jobId); + Date lastTime = null; + CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId); + if (job.isTerminalStatus()) { + lastTime = job.getLastModifiedTime(); } - catch (Exception e) { - throw new IOException(e); + if (lastTime == null) { + lastTime = new Date(); } + Services.get().get(XLogStreamingService.class).streamLog(logStreamer, job.getCreatedTime(), lastTime, writer); } /** @@ -362,17 +326,17 @@ public class CoordinatorEngine extends BaseEngine { * @param logRetrievalScope Value for the retrieval type * @param logRetrievalType Based on which filter criteria the log is retrieved * @param writer writer to stream the log to - * @param params additional parameters from the request + * @param requestParameters additional parameters from the request * @throws IOException * @throws BaseEngineException * @throws CommandException */ public void streamLog(String jobId, String logRetrievalScope, String logRetrievalType, Writer writer, - Map<String, String[]> params) throws IOException, BaseEngineException, CommandException { + Map<String, String[]> requestParameters) throws IOException, BaseEngineException, CommandException { Date startTime = null; Date endTime = null; - XLogFilter filter = new XLogFilter(new XLogUserFilterParam(params)); + XLogFilter filter = new XLogFilter(new XLogUserFilterParam(requestParameters)); filter.setParameter(DagXLogInfoService.JOB, jobId); if (logRetrievalScope != null && logRetrievalType != null) { @@ -528,7 +492,8 @@ public class CoordinatorEngine extends BaseEngine { } } } - Services.get().get(XLogStreamingService.class).streamLog(filter, startTime, endTime, 
writer, params); + Services.get().get(XLogStreamingService.class).streamLog(new XLogStreamer(filter, requestParameters), startTime, + endTime, writer); } /* http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/DagEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/DagEngine.java b/core/src/main/java/org/apache/oozie/DagEngine.java index 57d2761..36198b5 100644 --- a/core/src/main/java/org/apache/oozie/DagEngine.java +++ b/core/src/main/java/org/apache/oozie/DagEngine.java @@ -63,13 +63,12 @@ import org.apache.oozie.service.CallableQueueService; import org.apache.oozie.service.DagXLogInfoService; import org.apache.oozie.service.Services; import org.apache.oozie.service.XLogService; +import org.apache.oozie.service.XLogStreamingService; import org.apache.oozie.util.ParamChecker; import org.apache.oozie.util.XCallable; import org.apache.oozie.util.XConfiguration; import org.apache.oozie.util.XLog; -import org.apache.oozie.util.XLogAuditFilter; -import org.apache.oozie.util.XLogFilter; -import org.apache.oozie.util.XLogUserFilterParam; +import org.apache.oozie.util.XLogStreamer; /** * The DagEngine provides all the DAG engine functionality for WS calls. @@ -389,80 +388,16 @@ public class DagEngine extends BaseEngine { } } - /** - * Stream the log of a job. - * - * @param jobId job Id. - * @param writer writer to stream the log to. - * @param params additional parameters from the request - * @throws IOException thrown if the log cannot be streamed. - * @throws DagEngineException thrown if there is error in getting the Workflow Information for jobId. - */ @Override - public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException, - DagEngineException { - streamJobLog(jobId, writer, params, LOG_TYPE.LOG); - } - - /** - * Stream the error log of a job. - * - * @param jobId job Id. 
- * @param writer writer to stream the log to. - * @param params additional parameters from the request - * @throws IOException thrown if the log cannot be streamed. - * @throws DagEngineException thrown if there is error in getting the Workflow Information for jobId. - */ - @Override - public void streamErrorLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException, - DagEngineException { - streamJobLog(jobId, writer, params, LOG_TYPE.ERROR_LOG); - } - - /** - * Stream the audit log of a job. - * - * @param jobId job Id. - * @param writer writer to stream the log to. - * @param params additional parameters from the request - * @throws IOException thrown if the log cannot be streamed. - * @throws DagEngineException thrown if there is error in getting the Workflow Information for jobId. - */ - @Override - public void streamAuditLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException, - DagEngineException { - try { - streamJobLog(new XLogAuditFilter(new XLogUserFilterParam(params)),jobId, writer, params, LOG_TYPE.AUDIT_LOG); - } - catch (CommandException e) { - throw new IOException(e); - } - } - - private void streamJobLog(String jobId, Writer writer, Map<String, String[]> params, LOG_TYPE logType) - throws IOException, DagEngineException { - try { - streamJobLog(new XLogFilter(new XLogUserFilterParam(params)), jobId, writer, params, logType); - } - catch (Exception e) { - throw new IOException(e); - } - } - - private void streamJobLog(XLogFilter filter, String jobId, Writer writer, Map<String, String[]> params, LOG_TYPE logType) + protected void streamJobLog(XLogStreamer logStreamer, String jobId, Writer writer) throws IOException, DagEngineException { - try { - filter.setParameter(DagXLogInfoService.JOB, jobId); - WorkflowJob job = getJob(jobId); - Date lastTime = job.getEndTime(); - if (lastTime == null) { - lastTime = job.getLastModifiedTime(); - } - fetchLog(filter, job.getCreatedTime(), lastTime, writer, 
params, logType); - } - catch (Exception e) { - throw new IOException(e); + logStreamer.getXLogFilter().setParameter(DagXLogInfoService.JOB, jobId); + WorkflowJob job = getJob(jobId); + Date lastTime = job.getEndTime(); + if (lastTime == null) { + lastTime = job.getLastModifiedTime(); } + Services.get().get(XLogStreamingService.class).streamLog(logStreamer, job.getCreatedTime(), lastTime, writer); } private static final Set<String> FILTER_NAMES = new HashSet<String>(); http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/service/XLogService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/XLogService.java b/core/src/main/java/org/apache/oozie/service/XLogService.java index 04f04f4..0b43722 100644 --- a/core/src/main/java/org/apache/oozie/service/XLogService.java +++ b/core/src/main/java/org/apache/oozie/service/XLogService.java @@ -295,23 +295,23 @@ public class XLogService implements Service, Instrumentable { }); } - boolean getLogOverWS() { + public boolean getLogOverWS() { return logOverWS; } - boolean isErrorLogEnabled(){ + public boolean isErrorLogEnabled(){ return errorLogEnabled; } - int getOozieLogRotation() { + public int getOozieLogRotation() { return oozieLogRotation; } - int getOozieErrorLogRotation() { + public int getOozieErrorLogRotation() { return oozieErrorLogRotation; } - int getOozieAuditLogRotation() { + public int getOozieAuditLogRotation() { return oozieAuditLogRotation; } @@ -323,7 +323,7 @@ public class XLogService implements Service, Instrumentable { return oozieAuditLogName; } - boolean isAuditLogEnabled() { + public boolean isAuditLogEnabled() { return auditLogEnabled; } http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java b/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java index c15c4c1..40ba46e 100644 --- a/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java +++ b/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java @@ -18,24 +18,19 @@ package org.apache.oozie.service; -import org.apache.oozie.util.XLogFilter; +import org.apache.commons.lang.StringUtils; import org.apache.oozie.util.Instrumentable; import org.apache.oozie.util.Instrumentation; import org.apache.oozie.util.XLogStreamer; - import java.io.IOException; import java.io.Writer; -import java.util.Map; import java.util.Date; /** * Service that performs streaming of log files over Web Services if enabled in XLogService */ public class XLogStreamingService implements Service, Instrumentable { - private static final String CONF_PREFIX = Service.CONF_PREFIX + "XLogStreamingService."; - public static final String STREAM_BUFFER_LEN = CONF_PREFIX + "buffer.len"; - protected int bufferLen; /** * Initialize the log streaming service. @@ -44,7 +39,6 @@ public class XLogStreamingService implements Service, Instrumentable { * @throws ServiceException thrown if the log streaming service could not be initialized. */ public void init(Services services) throws ServiceException { - bufferLen = ConfigurationService.getInt(services.getConf(), STREAM_BUFFER_LEN); } /** @@ -71,75 +65,37 @@ public class XLogStreamingService implements Service, Instrumentable { // nothing to instrument } - /** - * Stream the log of a job. - * - * @param filter log streamer filter. - * @param startTime start time for log events to filter. - * @param endTime end time for log events to filter. - * @param writer writer to stream the log to. - * @param params additional parameters from the request - * @throws IOException thrown if the log cannot be streamed. 
- */ - public void streamLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params) - throws IOException { - XLogService xLogService = Services.get().get(XLogService.class); - if (xLogService.getLogOverWS()) { - new XLogStreamer(filter, xLogService.getOozieLogPath(), xLogService.getOozieLogName(), - xLogService.getOozieLogRotation()).streamLog(writer, startTime, endTime, bufferLen); - } - else { - writer.write("Log streaming disabled!!"); - } - } /** - * Stream the error log of a job. + * Stream the log of a job. * + * @param logStreamer the log streamer * @param filter log streamer filter. * @param startTime start time for log events to filter. * @param endTime end time for log events to filter. * @param writer writer to stream the log to. - * @param params additional parameters from the request - * @throws IOException thrown if the log cannot be streamed. + * @throws IOException Signals that an I/O exception has occurred. */ - public void streamErrorLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params) - throws IOException { - XLogService xLogService = Services.get().get(XLogService.class); - if (xLogService.isErrorLogEnabled()) { - new XLogStreamer(filter, xLogService.getOozieErrorLogPath(), xLogService.getOozieErrorLogName(), - xLogService.getOozieErrorLogRotation()).streamLog(writer, startTime, endTime, bufferLen); - } - else { - writer.write("Error Log is disabled!!"); - } + public void streamLog(XLogStreamer logStreamer, Date startTime, Date endTime, Writer writer) throws IOException { + streamLog(logStreamer, startTime, endTime, writer, true); } /** - * Stream the audit log of a job. + * Stream log. * - * @param filter log streamer filter. - * @param startTime start time for log events to filter. - * @param endTime end time for log events to filter. - * @param writer writer to stream the log to. 
- * @param params additional parameters from the request - * @throws IOException thrown if the log cannot be streamed. + * @param logStreamer the log streamer + * @param startTime the start time + * @param endTime the end time + * @param writer the writer + * @param appendDebug the append debug + * @throws IOException Signals that an I/O exception has occurred. */ - public void streamAuditLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params) + protected void streamLog(XLogStreamer logStreamer, Date startTime, Date endTime, Writer writer, boolean appendDebug) throws IOException { - XLogService xLogService = Services.get().get(XLogService.class); - if (xLogService.isAuditLogEnabled()) { - new XLogStreamer(filter, xLogService.getOozieAuditLogPath(), xLogService.getOozieAuditLogName(), - xLogService.getOozieAuditLogRotation()).streamLog(writer, startTime, endTime, bufferLen); - } - else { - writer.write("Audit Log is disabled!!"); + if (!logStreamer.isLogEnabled()) { + writer.write(logStreamer.getLogDisableMessage()); + return; } - } - - - - public int getBufferLen() { - return bufferLen; + logStreamer.streamLog(writer, startTime, endTime, appendDebug); } } http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java b/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java index 97771ad..3a5081c 100644 --- a/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java +++ b/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; +import org.apache.commons.lang.StringUtils; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.oozie.ErrorCode; import 
org.apache.oozie.client.OozieClient; @@ -37,7 +38,6 @@ import org.apache.oozie.util.Instrumentation; import org.apache.oozie.util.SimpleTimestampedMessageParser; import org.apache.oozie.util.TimestampedMessageParser; import org.apache.oozie.util.XLog; -import org.apache.oozie.util.XLogFilter; import org.apache.oozie.util.XLogStreamer; import org.apache.oozie.util.ZKUtils; @@ -93,119 +93,42 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv /** * Stream the log of a job. It contacts any other running Oozie servers to collate relevant logs while streaming. * - * @param filter log streamer filter. + * @param logStreamer the log streamer * @param startTime start time for log events to filter. * @param endTime end time for log events to filter. * @param writer writer to stream the log to. - * @param params additional parameters from the request * @throws IOException thrown if the log cannot be streamed. */ @Override - public void streamLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params) - throws IOException { - XLogService xLogService = Services.get().get(XLogService.class); - if (xLogService.getLogOverWS()) { - // If ALL_SERVERS_PARAM is set to false, then only stream our log - if (!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(params)) { - new XLogStreamer(filter, xLogService.getOozieLogPath(), xLogService.getOozieLogName(), - xLogService.getOozieLogRotation()).streamLog(writer, startTime, endTime, bufferLen); - } - // Otherwise, we have to go collate relevant logs from the other Oozie servers - else { - collateLogs(filter, startTime, endTime, writer, params, xLogService.getOozieLogPath(), - xLogService.getOozieLogName(), xLogService.getOozieLogRotation(), RestConstants.JOB_SHOW_LOG); - } - } - else { - writer.write("Log streaming disabled!!"); - } - } + public void streamLog(XLogStreamer logStreamer, Date startTime, Date endTime, Writer writer) throws IOException { - 
/** - * Stream the error log of a job. It contacts any other running Oozie servers to collate relevant error logs while streaming. - * - * @param filter log streamer filter. - * @param startTime start time for log events to filter. - * @param endTime end time for log events to filter. - * @param writer writer to stream the log to. - * @param params additional parameters from the request - * @throws IOException thrown if the log cannot be streamed. - */ - public void streamErrorLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params) - throws IOException { - XLogService xLogService = Services.get().get(XLogService.class); - if (xLogService.isErrorLogEnabled()) { - // If ALL_SERVERS_PARAM is set to false, then only stream our log - if (!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(params)) { - new XLogStreamer(filter, xLogService.getOozieErrorLogPath(), xLogService.getOozieErrorLogName(), - xLogService.getOozieErrorLogRotation()).streamLog(writer, startTime, endTime, bufferLen); - } - // Otherwise, we have to go collate relevant logs from the other Oozie servers - else { - collateLogs(filter, startTime, endTime, writer, params, xLogService.getOozieLogPath(), - xLogService.getOozieErrorLogName(), xLogService.getOozieErrorLogRotation(), - RestConstants.JOB_SHOW_ERROR_LOG); - } - } - else { - writer.write("Error Log streaming disabled!!"); + if (!logStreamer.isLogEnabled()) { + writer.write(logStreamer.getLogDisableMessage()); + return; } - } - - /** - * Stream the audit log of a job. It contacts any other running Oozie servers to collate relevant audit logs while streaming. - * - * @param filter log streamer filter. - * @param startTime start time for log events to filter. - * @param endTime end time for log events to filter. - * @param writer writer to stream the log to. - * @param params additional parameters from the request - * @throws IOException thrown if the log cannot be streamed. 
- */ - @Override - public void streamAuditLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params) - throws IOException { - XLogService xLogService = Services.get().get(XLogService.class); - if (xLogService.isAuditLogEnabled()) { - // If ALL_SERVERS_PARAM is set to false, then only stream our log - if (!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(params)) { - new XLogStreamer(filter, xLogService.getOozieAuditLogPath(), xLogService.getOozieAuditLogName(), - xLogService.getOozieAuditLogRotation()).streamLog(writer, startTime, endTime, bufferLen); - } - // Otherwise, we have to go collate relevant logs from the other Oozie servers - else { - collateLogs(filter, startTime, endTime, writer, params, xLogService.getOozieAuditLogPath(), - xLogService.getOozieAuditLogName(), xLogService.getOozieAuditLogRotation(), - RestConstants.JOB_SHOW_AUDIT_LOG); - } + // If ALL_SERVERS_PARAM is set to false, then only stream our log + if (!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(logStreamer.getRequestParam())) { + super.streamLog(logStreamer, startTime, endTime, writer, false); } + // Otherwise, we have to go collate relevant logs from the other Oozie servers else { - writer.write("Audit Log streaming disabled!!"); + collateLogs(logStreamer, startTime, endTime, writer); } } - - /** * Contacts each of the other Oozie servers, gets their logs for the job, collates them, and sends them to the user via the * Writer. It will make sure to not read all of the log messages into memory at the same time to not use up the heap. If there * is a problem talking to one of the other servers, it will ignore that server and prepend a message to the Writer about it. * For getting the logs from this server, it won't use the REST API and instead get them directly to be more efficient. 
* - * @param filter the job filter + * @param logStreamer the XLogStreamer * @param startTime the job start time * @param endTime the job end time * @param writer the writer - * @param params the params - * @param logPath the log path - * @param logName the log name - * @param rotation the rotation - * @param logType the log type * @throws IOException Signals that an I/O exception has occurred. */ - private void collateLogs(XLogFilter filter, Date startTime, Date endTime, Writer writer, - Map<String, String[]> params, String logPath, String logName, int rotation, final String logType) throws IOException { - XLogService xLogService = Services.get().get(XLogService.class); + private void collateLogs(XLogStreamer logStreamer, Date startTime, Date endTime, Writer writer) throws IOException { List<String> badOozies = new ArrayList<String>(); List<ServiceInstance<Map>> oozies = null; try { @@ -222,24 +145,28 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv String otherId = oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_ID); // If it's this server, we can just get them directly if (otherId.equals(zk.getZKId())) { - BufferedReader reader = new XLogStreamer(filter, logPath, logName, rotation).makeReader(startTime, + BufferedReader reader = logStreamer.makeReader(startTime, endTime); - parsers.add(new TimestampedMessageParser(reader, filter)); + parsers.add(new TimestampedMessageParser(reader, logStreamer.getXLogFilter())); } // If it's another server, we'll have to use the REST API else { String otherUrl = oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_URL); - String jobId = filter.getFilterParams().get(DagXLogInfoService.JOB); + String jobId = logStreamer.getXLogFilter().getFilterParams().get(DagXLogInfoService.JOB); try { // It's important that we specify ALL_SERVERS_PARAM=false in the GET request to prevent the other Oozie // Server from trying aggregate logs from the other Oozie servers (and creating an infinite recursion) final String url = 
otherUrl + "/v" + OozieClient.WS_PROTOCOL_VERSION + "/" + RestConstants.JOB - + "/" + jobId + "?" + RestConstants.JOB_SHOW_PARAM + "=" + logType - + "&" + RestConstants.ALL_SERVER_REQUEST + "=false" + AuthUrlClient.getQueryParamString(params); + + "/" + jobId + "?" + RestConstants.JOB_SHOW_PARAM + "=" + logStreamer.getLogType() + + "&" + RestConstants.ALL_SERVER_REQUEST + "=false" + + AuthUrlClient.getQueryParamString(logStreamer.getRequestParam()); // remove doAs from url to avoid failure while fetching // logs in case of HA mode String key = "doAs"; - String[] value = params.get(key); + String[] value = null; + if (logStreamer.getRequestParam() != null) { + value = logStreamer.getRequestParam().get(key); + } String urlWithoutdoAs = null; if (value != null && value.length > 0 && value[0] != null && value[0].length() > 0) { urlWithoutdoAs = url.replace("&" + key + "=" + URLEncoder.encode(value[0], "UTF-8"), ""); @@ -248,7 +175,7 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv urlWithoutdoAs = url; } BufferedReader reader = AuthUrlClient.callServer(urlWithoutdoAs); - parsers.add(new SimpleTimestampedMessageParser(reader, filter)); + parsers.add(new SimpleTimestampedMessageParser(reader, logStreamer.getXLogFilter())); } catch(IOException ioe) { log.warn("Failed to retrieve logs for job [" + jobId + "] from Oozie server with ID [" + otherId @@ -259,10 +186,13 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv } //If log param debug is set, we need to write start date and end date to outputstream. 
- if(filter.isDebugMode()){ - writer.write(filter.getDebugMessage()); + if(!StringUtils.isEmpty(logStreamer.getXLogFilter().getTruncatedMessage())){ + writer.write(logStreamer.getXLogFilter().getTruncatedMessage()); } + if (logStreamer.getXLogFilter().isDebugMode()) { + writer.write(logStreamer.getXLogFilter().getDebugMessage()); + } // Add a message about any servers we couldn't contact if (!badOozies.isEmpty()) { writer.write("Unable to contact the following Oozie Servers for logs (log information may be incomplete):\n"); @@ -278,7 +208,7 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv // If it's just the one server (this server), then we don't need to do any more processing and can just copy it directly if (parsers.size() == 1) { TimestampedMessageParser parser = parsers.get(0); - parser.processRemaining(writer, bufferLen); + parser.processRemaining(writer, logStreamer); } else { // Now that we have a Reader for each server to get the logs from that server, we have to collate them. 
Within each @@ -292,16 +222,13 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv timestampMap.put(parser.getLastTimestamp(), parser); } } - int bytesWritten = 0; while (timestampMap.size() > 1) { // The first entry will be the earliest based on the timestamp (also removes it) from the map TimestampedMessageParser earliestParser = timestampMap.pollFirstEntry().getValue(); // Write the message from that parser at that timestamp writer.write(earliestParser.getLastMessage()); - bytesWritten = earliestParser.getLastMessage().length(); - if (bytesWritten > bufferLen) { + if (logStreamer.shouldFlushOutput(earliestParser.getLastMessage().length())) { writer.flush(); - bytesWritten = 0; } // Increment that parser to read the next message if (earliestParser.increment()) { @@ -313,7 +240,7 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv if (timestampMap.size() == 1) { TimestampedMessageParser parser = timestampMap.values().iterator().next(); writer.write(parser.getLastMessage()); // don't forget the last message read by the parser - parser.processRemaining(writer, bufferLen, bytesWritten + parser.getLastMessage().length()); + parser.processRemaining(writer, logStreamer); } } } @@ -321,7 +248,6 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv for (TimestampedMessageParser parser : parsers) { parser.closeReader(); } - writer.flush(); } } } http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java b/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java index d3b4689..39c7b85 100644 --- a/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java +++ b/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java @@ -24,6 +24,7 @@ import 
javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.hadoop.conf.Configuration; +import org.apache.oozie.BaseEngineException; import org.apache.oozie.DagEngine; import org.apache.oozie.DagEngineException; import org.apache.oozie.client.rest.JsonBean; @@ -192,7 +193,7 @@ public class V0JobServlet extends BaseJobServlet { try { dagEngine.streamLog(jobId, response.getWriter(), request.getParameterMap()); } - catch (DagEngineException ex) { + catch (BaseEngineException ex) { throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); } } http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java b/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java index 9356768..10812c6 100644 --- a/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java +++ b/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java @@ -990,7 +990,7 @@ public class V1JobServlet extends BaseJobServlet { try { dagEngine.streamLog(jobId, response.getWriter(), request.getParameterMap()); } - catch (DagEngineException ex) { + catch (BaseEngineException ex) { throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); } } @@ -1009,7 +1009,7 @@ public class V1JobServlet extends BaseJobServlet { try { bundleEngine.streamLog(jobId, response.getWriter(), request.getParameterMap()); } - catch (BundleEngineException ex) { + catch (BaseEngineException ex) { throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); } } http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/util/AuthUrlClient.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/AuthUrlClient.java 
b/core/src/main/java/org/apache/oozie/util/AuthUrlClient.java index b45a96a..4fc8f57 100644 --- a/core/src/main/java/org/apache/oozie/util/AuthUrlClient.java +++ b/core/src/main/java/org/apache/oozie/util/AuthUrlClient.java @@ -37,11 +37,11 @@ import org.apache.hadoop.security.authentication.client.Authenticator; import org.apache.hadoop.security.authentication.client.KerberosAuthenticator; import org.apache.hadoop.security.authentication.client.PseudoAuthenticator; import org.apache.oozie.service.ConfigurationService; -import org.apache.oozie.service.Services; public class AuthUrlClient { public static final String SERVER_SERVER_AUTH_TYPE = "oozie.server.authentication.type"; + public static final String SERVER_SERVER_CONNECTION_TIMEOUT_SECONDS = "oozie.server.connection.timeout.seconds"; private static XLog LOG = XLog.getLog(AuthUrlClient.class); @@ -129,6 +129,7 @@ public class AuthUrlClient { @Override public BufferedReader run() throws IOException { HttpURLConnection conn = getConnection(url); + conn.setConnectTimeout(ConfigurationService.getInt(SERVER_SERVER_CONNECTION_TIMEOUT_SECONDS, 180)); BufferedReader reader = null; if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { InputStream is = conn.getInputStream(); http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java b/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java index 830f287..7fa8a60 100644 --- a/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java +++ b/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java @@ -194,50 +194,24 @@ public class TimestampedMessageParser { } /** - * Streams log messages to the passed in Writer. 
Flushes the log writing - * based on buffer len + * Streams log messages to the passed in Writer, with zero bytes already + * written * * @param writer - * @param bufferLen maximum len of log buffer - * @param bytesWritten num bytes already written to writer + * @param logStreamer the log streamer * @throws IOException */ - public void processRemaining(Writer writer, int bufferLen, int bytesWritten) throws IOException { + public void processRemaining(Writer writer, XLogStreamer logStreamer) throws IOException { while (increment()) { writer.write(lastMessage); - bytesWritten += lastMessage.length(); - if (bytesWritten > bufferLen) { + if (logStreamer.shouldFlushOutput(lastMessage.length())) { writer.flush(); - bytesWritten = 0; } } writer.flush(); } /** - * Streams log messages to the passed in Writer, with zero bytes already - * written - * - * @param writer - * @param bufferLen maximum len of log buffer - * @throws IOException - */ - public void processRemaining(Writer writer, int bufferLen) throws IOException { - processRemaining(writer, bufferLen, 0); - } - - /** - * Streams log messages to the passed in Writer, with default buffer len 4K - * and zero bytes already written - * - * @param writer - * @throws IOException - */ - public void processRemaining(Writer writer) throws IOException { - processRemaining(writer, Services.get().get(XLogStreamingService.class).getBufferLen()); - } - - /** * Splits the log message into parts * * @param line http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/util/XLogAuditFilter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/XLogAuditFilter.java b/core/src/main/java/org/apache/oozie/util/XLogAuditFilter.java index c377db5..465d808 100644 --- a/core/src/main/java/org/apache/oozie/util/XLogAuditFilter.java +++ b/core/src/main/java/org/apache/oozie/util/XLogAuditFilter.java @@ -17,8 +17,6 @@ */ package 
org.apache.oozie.util; -import java.io.IOException; -import java.util.Date; import java.util.regex.Pattern; import org.apache.oozie.service.DagXLogInfoService; @@ -29,20 +27,6 @@ public class XLogAuditFilter extends XLogFilter { super(xLogUserFilterParam); } - @Override - public void calculateAndValidateDateRange(Date jobStartTime, Date jobEndTime) throws IOException { - // for testcase, otherwise jobStartTime and jobEndTime will be always set - if (jobStartTime == null || jobEndTime == null) { - return; - } - - if (jobStartTime.after(jobEndTime)) { - throw new IOException("Start time should be less than end time. startTime = " + jobStartTime - + " endtime = " + jobEndTime); - } - - } - public void constructPattern() { // audit log will only need to scan on jobID StringBuilder sb = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/util/XLogAuditStreamer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/XLogAuditStreamer.java b/core/src/main/java/org/apache/oozie/util/XLogAuditStreamer.java new file mode 100644 index 0000000..6b27ae3 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/util/XLogAuditStreamer.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.util; + +import java.io.IOException; +import java.util.Date; +import java.util.Map; + +import org.apache.oozie.client.rest.RestConstants; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.service.ConfigurationService; +import org.apache.oozie.service.Services; +import org.apache.oozie.service.XLogService; + +public class XLogAuditStreamer extends XLogStreamer { + public static final String STREAM_BUFFER_LEN = CONF_PREFIX + "audit.buffer.len"; + + public XLogAuditStreamer(XLogFilter logFilter, Map<String, String[]> requestParameters) { + super(logFilter, Services.get().get(XLogService.class).getOozieAuditLogPath(), + Services.get().get(XLogService.class).getOozieAuditLogName(), + Services.get().get(XLogService.class).getOozieAuditLogRotation()); + this.requestParam = requestParameters; + bufferLen = ConfigurationService.getInt(STREAM_BUFFER_LEN, 3); + } + + public XLogAuditStreamer(Map<String, String[]> requestParameters) throws CommandException { + this(new XLogAuditFilter(new XLogUserFilterParam(requestParameters)), requestParameters); + + } + + @Override + protected void calculateAndValidateDateRange(Date startTime, Date endTime) throws IOException { + logFilter.calculateAndCheckDates(startTime, endTime); + // no validate + } + + @Override + public boolean isLogEnabled() { + return Services.get().get(XLogService.class).isAuditLogEnabled(); + } + + @Override + public String getLogType() { + return RestConstants.JOB_SHOW_AUDIT_LOG; + } + + @Override + public String getLogDisableMessage() { + 
return "Audit Log is disabled!!"; + } + + public boolean shouldFlushOutput(int byteCountIgnored) { + // audit flush is done on number of lines written, not byte + this.totalDataWritten += 1; + if (this.totalDataWritten > getBufferLen()) { + this.totalDataWritten = 0; + return true; + } + else { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/util/XLogErrorStreamer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/XLogErrorStreamer.java b/core/src/main/java/org/apache/oozie/util/XLogErrorStreamer.java new file mode 100644 index 0000000..3395925 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/util/XLogErrorStreamer.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.util; + +import java.io.IOException; +import java.util.Date; +import java.util.Map; + +import org.apache.oozie.client.rest.RestConstants; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.service.ConfigurationService; +import org.apache.oozie.service.Services; +import org.apache.oozie.service.XLogService; + +public class XLogErrorStreamer extends XLogStreamer { + + public static final String STREAM_BUFFER_LEN = CONF_PREFIX + "error.buffer.len"; + + public XLogErrorStreamer(XLogFilter logFilter, Map<String, String[]> requestParameters) { + super(logFilter, Services.get().get(XLogService.class).getOozieErrorLogPath(), Services.get() + .get(XLogService.class).getOozieErrorLogName(), Services.get().get(XLogService.class) + .getOozieErrorLogRotation()); + this.requestParam = requestParameters; + bufferLen = ConfigurationService.getInt(STREAM_BUFFER_LEN, 2048); + } + + public XLogErrorStreamer(Map<String, String[]> requestParameters) throws CommandException { + this(new XLogFilter(new XLogUserFilterParam(requestParameters)), requestParameters); + } + + @Override + protected void calculateAndValidateDateRange(Date startTime, Date endTime) throws IOException { + logFilter.calculateAndCheckDates(startTime, endTime); + // no validate and truncating + } + + @Override + public boolean isLogEnabled() { + return Services.get().get(XLogService.class).isErrorLogEnabled(); + } + + @Override + public String getLogType() { + return RestConstants.JOB_SHOW_ERROR_LOG; + } + + @Override + public String getLogDisableMessage() { + return "Error Log is disabled!!"; + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/util/XLogFilter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/XLogFilter.java b/core/src/main/java/org/apache/oozie/util/XLogFilter.java index cdf172b..505f945 100644 --- 
a/core/src/main/java/org/apache/oozie/util/XLogFilter.java +++ b/core/src/main/java/org/apache/oozie/util/XLogFilter.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.oozie.util; import java.io.IOException; @@ -56,6 +55,7 @@ public class XLogFilter { private boolean isActionList = false; private String formattedEndDate; private String formattedStartDate; + private String truncatedMessage; // TODO Patterns to be read from config file private static final String DEFAULT_REGEX = "[^\\]]*"; @@ -290,8 +290,9 @@ public class XLogFilter { } public String getDebugMessage() { - return "Log start time = " + getStartDate() + ". Log end time = " + getEndDate() + ". User Log Filter = " - + getUserLogFilter() + System.getProperty("line.separator"); + return new StringBuilder("Log start time = ").append(getStartDate()).append(". Log end time = ") + .append(getEndDate()).append(". User Log Filter = ").append(getUserLogFilter()) + .append(System.getProperty("line.separator")).toString(); } public boolean isActionList() { @@ -302,7 +303,20 @@ public class XLogFilter { this.isActionList = isActionList; } - private void calculateScanDate(Date jobStartTime, Date jobEndTime) throws IOException { + /** + * Calculate scan date + * + * @param jobStartTime the job start time + * @param jobEndTime the job end time + * @throws IOException Signals that an I/O exception has occurred. 
+ */ + public void calculateAndCheckDates(Date jobStartTime, Date jobEndTime) throws IOException { + + // for testcase, otherwise jobStartTime and jobEndTime will be always + // set + if (jobStartTime == null || jobEndTime == null) { + return; + } if (userLogFilter.getStartDate() != null) { startDate = userLogFilter.getStartDate(); @@ -311,14 +325,15 @@ public class XLogFilter { startDate = adjustOffset(jobStartTime, userLogFilter.getStartOffset()); } else { - startDate = jobStartTime; + startDate = new Date(jobStartTime.getTime()); } if (userLogFilter.getEndDate() != null) { endDate = userLogFilter.getEndDate(); } else if (userLogFilter.getEndOffset() != -1) { - // If user has specified startdate as absolute then end offset will be on user start date, + // If user has specified startdate as absolute then end offset will + // be on user start date, // else end offset will be calculated on job startdate. if (userLogFilter.getStartDate() != null) { endDate = adjustOffset(startDate, userLogFilter.getEndOffset()); @@ -328,14 +343,14 @@ public class XLogFilter { } } else { - endDate = jobEndTime; + endDate = new Date(jobEndTime.getTime()); } // if recent offset is specified then start time = endtime - offset if (getUserLogFilter().getRecent() != -1) { startDate = adjustOffset(endDate, userLogFilter.getRecent() * -1); } - //add buffer iff dates are not asbsolute + // add buffer if dates are not absolute if (userLogFilter.getStartDate() == null) { startDate = adjustOffset(startDate, -LOG_TIME_BUFFER); } @@ -345,26 +360,27 @@ public class XLogFilter { formattedEndDate = XLogUserFilterParam.dt.get().format(getEndDate()); formattedStartDate = XLogUserFilterParam.dt.get().format(getStartDate()); + + if (startDate.after(endDate)) { + throw new IOException( + "Start time should be less than end time. startTime = " + startDate + " endTime = " + endDate); + } } /** - * Calculate and validate date range. + * validate date range. 
* * @param jobStartTime the job start time * @param jobEndTime the job end time * @throws IOException Signals that an I/O exception has occurred. */ - public void calculateAndValidateDateRange(Date jobStartTime, Date jobEndTime) throws IOException { - // for testcase, otherwise jobStartTime and jobEndTime will be always set + public void validateDateRange(Date jobStartTime, Date jobEndTime) throws IOException { + // for testcase, otherwise jobStartTime and jobEndTime will be always + // set if (jobStartTime == null || jobEndTime == null) { return; } - calculateScanDate(jobStartTime, jobEndTime); - if (startDate.after(endDate)) { - throw new IOException("Start time should be less than end time. startTime = " + startDate + " endtime = " - + endDate); - } long diffHours = (endDate.getTime() - startDate.getTime()) / (60 * 60 * 1000); if (isActionList) { int actionLogDuration = ConfigurationService.getInt(MAX_ACTIONLIST_SCAN_DURATION); @@ -372,10 +388,9 @@ public class XLogFilter { return; } if (diffHours > actionLogDuration) { - throw new IOException( - "Request log streaming time range with action list is higher than configured. Please reduce the scan " - + "time range. Input range (hours) = " + diffHours - + " system allowed (hours) with action list = " + actionLogDuration); + setTruncatedMessage("Truncated logs to max log scan duration " + actionLogDuration + " hrs"); + startDate = adjustOffset(endDate, -1 * actionLogDuration * 60); + startDate = adjustOffset(startDate, -1 * LOG_TIME_BUFFER); } } else { @@ -384,14 +399,27 @@ public class XLogFilter { return; } if (diffHours > logDuration) { - throw new IOException( - "Request log streaming time range is higher than configured. Please reduce the scan time range. For coord" - + " jobs you can provide action list to reduce log scan time range. 
Input range (hours) = " - + diffHours + " system allowed (hours) = " + logDuration); + setTruncatedMessage("Truncated logs to max log scan duration " + logDuration + " hrs"); + startDate = adjustOffset(endDate, -1 * logDuration * 60); + startDate = adjustOffset(startDate, -1 * LOG_TIME_BUFFER); } } } + protected void setTruncatedMessage(String message) { + truncatedMessage = message; + + } + + public String getTruncatedMessage() { + if (StringUtils.isEmpty(truncatedMessage)) { + return truncatedMessage; + } + else { + return truncatedMessage + System.getProperty("line.separator"); + } + } + /** * Adjust offset, offset will always be in min. * http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/util/XLogStreamer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/XLogStreamer.java b/core/src/main/java/org/apache/oozie/util/XLogStreamer.java index 19f1fee..012fbef 100644 --- a/core/src/main/java/org/apache/oozie/util/XLogStreamer.java +++ b/core/src/main/java/org/apache/oozie/util/XLogStreamer.java @@ -25,28 +25,64 @@ import java.util.ArrayList; import java.util.Calendar; import java.util.Collections; import java.util.Date; +import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.io.BufferedReader; +import org.apache.commons.lang.StringUtils; +import org.apache.oozie.client.rest.RestConstants; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.service.ConfigurationService; +import org.apache.oozie.service.Service; +import org.apache.oozie.service.Services; +import org.apache.oozie.service.XLogService; + /** * XLogStreamer streams the given log file to writer after applying the given filter. 
*/ public class XLogStreamer { private static XLog LOG = XLog.getLog(XLogStreamer.class); + protected static final String CONF_PREFIX = Service.CONF_PREFIX + "XLogStreamingService."; + public static final String STREAM_BUFFER_LEN = CONF_PREFIX + "buffer.len"; + private String logFile; private String logPath; - private XLogFilter logFilter; + protected XLogFilter logFilter; private long logRotation; + Map<String, String[]> requestParam; + protected int totalDataWritten; + protected int bufferLen; public XLogStreamer(XLogFilter logFilter, String logPath, String logFile, long logRotationSecs) { - this.logFilter = logFilter; if (logFile == null) { logFile = "oozie-app.log"; } + + this.logFilter = logFilter; this.logFile = logFile; this.logPath = logPath; this.logRotation = logRotationSecs * 1000l; + bufferLen = ConfigurationService.getInt(STREAM_BUFFER_LEN, 4096); + } + + public XLogStreamer(XLogFilter logFilter) { + this(logFilter, Services.get().get(XLogService.class).getOozieLogPath(), Services.get().get(XLogService.class) + .getOozieLogName(), Services.get().get(XLogService.class).getOozieLogRotation()); + + } + + public XLogStreamer(XLogFilter logFilter, Map<String, String[]> params) { + this(logFilter, Services.get().get(XLogService.class).getOozieLogPath(), Services.get().get(XLogService.class) + .getOozieLogName(), Services.get().get(XLogService.class).getOozieLogRotation()); + this.requestParam = params; + + } + + + public XLogStreamer(Map<String, String[]> params) throws CommandException { + this(new XLogFilter(new XLogUserFilterParam(params))); + this.requestParam = params; } /** @@ -58,15 +94,34 @@ public class XLogStreamer { * @param endTime * @throws IOException */ - public void streamLog(Writer writer, Date startTime, Date endTime, int bufferLen) throws IOException { + public void streamLog(Writer writer, Date startTime, Date endTime) throws IOException { + streamLog(writer, startTime, endTime, true); + } + + /** + * Gets the files that are modified 
between startTime and endTime in the given logPath and streams the log after + * applying the filters + * + * @param writer the writer + * @param startTime the start time + * @param endTime the end time + * @param appendDebug the append debug + * @throws IOException Signals that an I/O exception has occurred. + */ + public void streamLog(Writer writer, Date startTime, Date endTime, boolean appendDebug) throws IOException { // Get a Reader for the log file(s) BufferedReader reader = new BufferedReader(getReader(startTime, endTime)); try { - if(logFilter.isDebugMode()){ - writer.write(logFilter.getDebugMessage()); + if (appendDebug) { + if (!StringUtils.isEmpty(logFilter.getTruncatedMessage())) { + writer.write(logFilter.getTruncatedMessage()); + } + if (logFilter.isDebugMode()) { + writer.write(logFilter.getDebugMessage()); + } } // Process the entire logs from the reader using the logFilter - new TimestampedMessageParser(reader, logFilter).processRemaining(writer, bufferLen); + new TimestampedMessageParser(reader, logFilter).processRemaining(writer, this); } finally { reader.close(); @@ -83,12 +138,17 @@ public class XLogStreamer { */ private MultiFileReader getReader(Date startTime, Date endTime) throws IOException { - logFilter.calculateAndValidateDateRange(startTime, endTime); + calculateAndValidateDateRange(startTime, endTime); return new MultiFileReader(getFileList(logFilter.getStartDate(), logFilter.getEndDate())); } + protected void calculateAndValidateDateRange(Date startTime, Date endTime) throws IOException { + logFilter.calculateAndCheckDates(startTime, endTime); + logFilter.validateDateRange(startTime, endTime); + } + public BufferedReader makeReader(Date startTime, Date endTime) throws IOException { - return new BufferedReader(getReader(startTime,endTime)); + return new BufferedReader(getReader(startTime, endTime)); } /** @@ -220,7 +280,8 @@ public class XLogStreamer { LOG.warn("oozie.log has been GZipped, which is unexpected"); // Return a value other 
than -1 to include the file in list returnVal = 0; - } else { + } + else { Matcher m = gzTimePattern.matcher(fileName); if (m.matches() && m.groupCount() == 4) { int year = Integer.parseInt(m.group(1)); @@ -242,11 +303,47 @@ public class XLogStreamer { || (startTime <= logFileStartTime && endTime >= logFileEndTime)) { returnVal = logFileStartTime; } - } else { + } + else { LOG.debug("Filename " + fileName + " does not match the expected format"); returnVal = -1; } } return returnVal; } + + public boolean isLogEnabled() { + return Services.get().get(XLogService.class).getLogOverWS(); + } + + public String getLogType() { + return RestConstants.JOB_SHOW_LOG; + } + + public XLogFilter getXLogFilter() { + return logFilter; + } + + public String getLogDisableMessage() { + return "Log streaming disabled!!"; + } + + public Map<String, String[]> getRequestParam() { + return requestParam; + } + + public boolean shouldFlushOutput(int writtenBytes) { + this.totalDataWritten += writtenBytes; + if (this.totalDataWritten > getBufferLen()) { + this.totalDataWritten = 0; + return true; + } + else { + return false; + } + } + + public int getBufferLen() { + return bufferLen; + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index ae384b4..e7a48a0 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -192,7 +192,23 @@ <property> <name>oozie.service.XLogStreamingService.buffer.len</name> <value>4096</value> - <description>4K buffer for streaming the logs progressively</description> + <description>4K buffer for streaming the logs progressively + </description> + </property> + <property> + <name>oozie.service.XLogStreamingService.error.buffer.len</name> + <value>2048</value> + <description>2K buffer for streaming 
the error logs + progressively + </description> + </property> + + <property> + <name>oozie.service.XLogStreamingService.audit.buffer.len</name> + <value>3</value> + <description>Number of lines for streaming the audit logs + progressively + </description> </property> <!-- HCatAccessorService --> @@ -2223,6 +2239,14 @@ will be the requeue interval for the actions which are waiting for a long time w </property> <property> + <name>oozie.server.connection.timeout.seconds</name> + <value>180</value> + <description> + Defines connection timeout used for Oozie server communicating to other Oozie server over HTTP(s). Default is 3 min. + </description> + </property> + + <property> <name>oozie.authentication.token.validity</name> <value>36000</value> <description> http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java b/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java index 3eb1016..d6f1aef 100644 --- a/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java +++ b/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java @@ -26,13 +26,9 @@ import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.List; -import java.util.Map; - import org.apache.oozie.client.CoordinatorAction; import org.apache.oozie.client.CoordinatorJob; -import org.apache.oozie.client.Job; import org.apache.oozie.client.rest.RestConstants; -import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; import org.apache.oozie.service.DagXLogInfoService; import org.apache.oozie.service.Services; @@ -40,6 +36,7 @@ import org.apache.oozie.service.XLogStreamingService; import org.apache.oozie.test.XDataTestCase; import 
org.apache.oozie.util.DateUtils; import org.apache.oozie.util.XLogFilter; +import org.apache.oozie.util.XLogStreamer; public class TestCoordinatorEngineStreamLog extends XDataTestCase { private Services services; @@ -63,9 +60,9 @@ public class TestCoordinatorEngineStreamLog extends XDataTestCase { Date endTime; @Override - public void streamLog(XLogFilter filter1, Date startTime, Date endTime, Writer writer, Map<String, String[]> params) + public void streamLog(XLogStreamer logStreamer, Date startTime, Date endTime, Writer writer) throws IOException { - filter = filter1; + filter = logStreamer.getXLogFilter(); this.startTime = startTime; this.endTime = endTime; } http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java b/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java index 2a3d3d3..3c6525d 100644 --- a/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java +++ b/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java @@ -36,6 +36,7 @@ import org.apache.oozie.test.XTestCase; import org.apache.oozie.util.ConfigUtils; import org.apache.oozie.util.IOUtils; import org.apache.oozie.util.XLogFilter; +import org.apache.oozie.util.XLogStreamer; import org.apache.oozie.workflow.lite.LiteWorkflowAppParser; import java.io.File; @@ -176,7 +177,7 @@ public class TestConfigurationService extends XTestCase { assertEquals("http://0.0.0.0:11000/oozie/callback", ConfigurationService.get(CallbackService.CONF_BASE_URL)); assertEquals(5, ConfigurationService.getInt(CallbackService.CONF_EARLY_REQUEUE_MAX_RETRIES)); assertEquals("gz", ConfigurationService.get(CodecFactory.COMPRESSION_OUTPUT_CODEC)); - assertEquals(4096, ConfigurationService.getInt(XLogStreamingService.STREAM_BUFFER_LEN)); + 
assertEquals(4096, ConfigurationService.getInt(XLogStreamer.STREAM_BUFFER_LEN)); assertEquals(10000, ConfigurationService.getLong(JvmPauseMonitorService.WARN_THRESHOLD_KEY)); assertEquals(60, ConfigurationService.getInt(InstrumentationService.CONF_LOGGING_INTERVAL)); assertEquals(30, ConfigurationService.getInt(PurgeService.CONF_OLDER_THAN)); http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/test/java/org/apache/oozie/service/TestXLogStreamingService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestXLogStreamingService.java b/core/src/test/java/org/apache/oozie/service/TestXLogStreamingService.java index bebb678..1921f1b 100644 --- a/core/src/test/java/org/apache/oozie/service/TestXLogStreamingService.java +++ b/core/src/test/java/org/apache/oozie/service/TestXLogStreamingService.java @@ -23,18 +23,23 @@ import org.apache.oozie.command.CommandException; import org.apache.oozie.test.XTestCase; import org.apache.oozie.util.XLog; import org.apache.oozie.util.XLogAuditFilter; +import org.apache.oozie.util.XLogAuditStreamer; +import org.apache.oozie.util.XLogErrorStreamer; import org.apache.oozie.util.XLogFilter; +import org.apache.oozie.util.XLogStreamer; import org.apache.oozie.util.XLogUserFilterParam; import java.io.File; import java.io.FileOutputStream; import java.io.InputStream; import java.io.StringWriter; -import java.util.HashMap; +import java.util.Date; import java.util.Properties; public class TestXLogStreamingService extends XTestCase { + final int FIFTEEN_HOURS = 60 * 60 * 1000 * 15; + @Override protected void setUp() throws Exception { super.setUp(); @@ -386,6 +391,28 @@ public class TestXLogStreamingService extends XTestCase { } } + public void testTuncateLog() throws Exception { + File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties"); + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + InputStream is = 
cl.getResourceAsStream("test-no-dash-log4j.properties"); + Properties log4jProps = new Properties(); + log4jProps.load(is); + log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log"); + log4jProps.store(new FileOutputStream(log4jFile), ""); + setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName()); + + new Services().init(); + ConfigurationService.set(XLogFilter.MAX_SCAN_DURATION, "10"); + Date startDate = new Date(); + Date endDate = new Date(startDate.getTime() + FIFTEEN_HOURS); + String log = doStreamLog(new XLogFilter(), startDate, endDate); + assertTrue(log.contains("Truncated logs to max log scan duration")); + log = doStreamErrorLog(new XLogFilter(), startDate, endDate); + assertFalse(log.contains("Truncated logs to max log scan duration")); + log = doStreamAuditLog(new XLogFilter(), startDate, endDate); + assertFalse(log.contains("Truncated logs to max log scan duration")); + } + private boolean doStreamDisabledCheckWithServices() throws Exception { boolean result = false; try { @@ -405,21 +432,35 @@ public class TestXLogStreamingService extends XTestCase { } private String doStreamLog(XLogFilter xf) throws Exception { - StringWriter w = new StringWriter(); - Services.get().get(XLogStreamingService.class).streamLog(xf, null, null, w, new HashMap<String, String[]>()); - return w.toString(); + return doStreamLog(new XLogStreamer(xf), null, null); } + private String doStreamErrorLog(XLogFilter xf) throws Exception { - StringWriter w = new StringWriter(); - Services.get().get(XLogStreamingService.class).streamErrorLog(xf, null, null, w, new HashMap<String, String[]>()); - return w.toString(); + return doStreamLog(new XLogErrorStreamer(xf, null), null, null); } + private String doStreamAuditLog(XLogFilter xf) throws Exception { + return doStreamLog(new XLogAuditStreamer(xf, null), null, null); + } + + private String doStreamLog(XLogStreamer logStreamer, Date startDate, Date endDate) throws Exception { StringWriter w = new 
StringWriter(); - Services.get().get(XLogStreamingService.class).streamAuditLog(xf, null, null, w, new HashMap<String, String[]>()); + Services.get().get(XLogStreamingService.class) + .streamLog(logStreamer, startDate, endDate, w); return w.toString(); } + private String doStreamLog(XLogFilter xf, Date startDate, Date endDate) throws Exception { + return doStreamLog(new XLogStreamer(xf, null), startDate, endDate); + } + + private String doStreamErrorLog(XLogFilter xf, Date startDate, Date endDate) throws Exception { + return doStreamLog(new XLogErrorStreamer(xf, null), startDate, endDate); + } + + private String doStreamAuditLog(XLogFilter xf, Date startDate, Date endDate) throws Exception { + return doStreamLog(new XLogAuditStreamer(xf, null), startDate, endDate); + } private boolean doErrorStreamDisabledCheck() throws Exception { XLogFilter xf = new XLogFilter(new XLogUserFilterParam(null)); http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java b/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java index fca8d84..df097fd 100644 --- a/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java +++ b/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java @@ -32,7 +32,9 @@ import org.apache.oozie.client.rest.RestConstants; import org.apache.oozie.test.EmbeddedServletContainer; import org.apache.oozie.test.ZKXTestCase; import org.apache.oozie.util.DateUtils; +import org.apache.oozie.util.XLogErrorStreamer; import org.apache.oozie.util.XLogFilter; +import org.apache.oozie.util.XLogStreamer; import org.apache.oozie.util.ZKUtils; public class TestZKXLogStreamingService extends ZKXTestCase { @@ -198,15 +200,29 @@ public class TestZKXLogStreamingService extends ZKXTestCase { return
doStreamLog(xf, new HashMap<String, String[]>()); } + protected String doStreamLog(XLogFilter xf, Date startTime, Date endTime) throws Exception { + return doStreamLog(xf, new HashMap<String, String[]>(), false, startTime, endTime); + } + protected String doStreamErrorLog(XLogFilter xf) throws Exception { return doStreamLog(xf, new HashMap<String, String[]>(), true); } + private String doStreamErrorLog(XLogFilter xf, Date startDate, Date endDate) throws Exception { + return doStreamLog(xf, new HashMap<String, String[]>(), true, startDate, endDate); + } + + protected String doStreamLog(XLogFilter xf, Map<String, String[]> param) throws Exception { return doStreamLog(xf, param, false); } protected String doStreamLog(XLogFilter xf, Map<String, String[]> param, boolean isErrorLog) throws Exception { + return doStreamLog(xf, param, isErrorLog, null, null); + } + + protected String doStreamLog(XLogFilter xf, Map<String, String[]> param, boolean isErrorLog, Date startTime, + Date endTime) throws Exception { StringWriter w = new StringWriter(); ZKXLogStreamingService zkxlss = new ZKXLogStreamingService(); try { @@ -215,10 +231,10 @@ public class TestZKXLogStreamingService extends ZKXTestCase { zkxlss.init(services); sleep(1000); // Sleep to allow ZKUtils ServiceCache to update if (isErrorLog) { - zkxlss.streamErrorLog(xf, null, null, w, param); + zkxlss.streamLog(new XLogErrorStreamer(xf, param), startTime, endTime, w); } else { - zkxlss.streamLog(xf, null, null, w, param); + zkxlss.streamLog(new XLogStreamer(xf, param), startTime, endTime, w); } } finally { @@ -583,4 +599,30 @@ public class TestZKXLogStreamingService extends ZKXTestCase { } } + public void testTuncateLog() throws Exception { + XLogFilter.reset(); + File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties"); + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + InputStream is = cl.getResourceAsStream("test-no-dash-log4j.properties"); + Properties log4jProps = new
Properties(); + log4jProps.load(is); + // prevent conflicts with other tests by changing the log file location + log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log"); + log4jProps.store(new FileOutputStream(log4jFile), ""); + setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName()); + assertFalse(doStreamDisabledCheck()); + File logFile = new File(Services.get().get(XLogService.class).getOozieLogPath(), + Services.get().get(XLogService.class).getOozieLogName()); + logFile.getParentFile().mkdirs(); + + ConfigurationService.set(XLogFilter.MAX_SCAN_DURATION, "1"); + Date startDate = new Date(); + Date endDate = new Date(startDate.getTime() + 60 * 60 * 1000 * 15); + + String log = doStreamLog(new XLogFilter(), startDate, endDate); + assertTrue(log.contains("Truncated logs to max log scan duration")); + String logError = doStreamErrorLog(new XLogFilter(), startDate, endDate); + assertFalse(logError.contains("Truncated logs to max log scan duration")); + } + }
