Author: mona
Date: Tue Nov 26 20:14:33 2013
New Revision: 1545809
URL: http://svn.apache.org/r1545809
Log:
OOZIE-1474 Fix logging issues - latency, accurate job ids, coord Job UI to show
job logs (mona)
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/ShellActionExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
oozie/trunk/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java
oozie/trunk/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
oozie/trunk/core/src/main/java/org/apache/oozie/util/LogUtils.java
oozie/trunk/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java
oozie/trunk/core/src/main/java/org/apache/oozie/util/XLogStreamer.java
oozie/trunk/core/src/main/resources/oozie-default.xml
oozie/trunk/core/src/test/java/org/apache/oozie/util/TestLogStreamer.java
oozie/trunk/release-log.txt
oozie/trunk/webapp/src/main/webapp/oozie-console.js
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java?rev=1545809&r1=1545808&r2=1545809&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
Tue Nov 26 20:14:33 2013
@@ -65,7 +65,7 @@ import org.apache.oozie.service.URIHandl
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.servlet.CallbackServlet;
import org.apache.oozie.util.ELEvaluator;
-import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.PropertiesUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
@@ -109,7 +109,7 @@ public class JavaActionExecutor extends
private static final String FAILED = "FAILED";
private static final String FAILED_KILLED = "FAILED/KILLED";
private static final String RUNNING = "RUNNING";
- protected XLog log = XLog.getLog(getClass());
+ protected XLog LOG = XLog.getLog(getClass());
private static final Pattern heapPattern =
Pattern.compile("-Xmx(([0-9]+)[mMgG])");
static {
@@ -438,7 +438,7 @@ public class JavaActionExecutor extends
return conf;
}
catch (Exception ex) {
- XLog.getLog(getClass()).debug(
+ LOG.debug(
"Errors when add to DistributedCache. Path=" +
uri.toString() + ", archive=" + archive + ", conf="
+ XmlUtils.prettyPrint(conf).toString());
throw convertException(ex);
@@ -753,7 +753,7 @@ public class JavaActionExecutor extends
private void injectCallback(Context context, Configuration conf) {
String callback = context.getCallbackUrl("$jobStatus");
if (conf.get("job.end.notification.url") != null) {
- XLog.getLog(getClass()).warn("Overriding the action job end
notification URI");
+ LOG.warn("Overriding the action job end notification URI");
}
conf.set("job.end.notification.url", callback);
}
@@ -790,7 +790,7 @@ public class JavaActionExecutor extends
// action job configuration
Configuration actionConf = createBaseHadoopConf(context,
actionXml);
setupActionConf(actionConf, context, actionXml, appPathRoot);
- XLog.getLog(getClass()).debug("Setting LibFilesArchives ");
+ LOG.debug("Setting LibFilesArchives ");
setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
String jobName = actionConf.get(HADOOP_JOB_NAME);
@@ -831,7 +831,7 @@ public class JavaActionExecutor extends
JobConf launcherJobConf = createLauncherConf(actionFs, context,
action, actionXml, actionConf);
injectLauncherCallback(context, launcherJobConf);
- XLog.getLog(getClass()).debug("Creating Job Client for action " +
action.getId());
+ LOG.debug("Creating Job Client for action " + action.getId());
jobClient = createJobClient(context, launcherJobConf);
String launcherId =
LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(),
context
.getRecoveryId());
@@ -850,7 +850,7 @@ public class JavaActionExecutor extends
}
}
else {
- XLog.getLog(getClass()).debug("Submitting the job through Job
Client for action " + action.getId());
+ LOG.debug("Submitting the job through Job Client for action "
+ action.getId());
// setting up propagation of the delegation token.
HadoopAccessorService has =
Services.get().get(HadoopAccessorService.class);
@@ -861,12 +861,12 @@ public class JavaActionExecutor extends
// insert credentials tokens to launcher job conf if needed
if (needInjectCredentials()) {
for (Token<? extends TokenIdentifier> tk :
credentialsConf.getCredentials().getAllTokens()) {
- log.debug("ADDING TOKEN: " + tk.getKind().toString());
+ LOG.debug("ADDING TOKEN: " + tk.getKind().toString());
launcherJobConf.getCredentials().addToken(tk.getKind(), tk);
}
}
else {
- log.info("No need to inject credentials.");
+ LOG.info("No need to inject credentials.");
}
runningJob = jobClient.submitJob(launcherJobConf);
if (runningJob == null) {
@@ -874,7 +874,7 @@ public class JavaActionExecutor extends
"Error submitting launcher for action [{0}]",
action.getId());
}
launcherId = runningJob.getID().toString();
- XLog.getLog(getClass()).debug("After submission get the
launcherId " + launcherId);
+ LOG.debug("After submission get the launcherId " + launcherId);
}
String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER);
@@ -892,7 +892,7 @@ public class JavaActionExecutor extends
}
catch (Exception e) {
if (exception) {
- log.error("JobClient error: ", e);
+ LOG.error("JobClient error: ", e);
}
else {
throw convertException(e);
@@ -929,20 +929,20 @@ public class JavaActionExecutor extends
for (String key : credPropertiesMap.keySet()) {
CredentialsProperties prop = credPropertiesMap.get(key);
if (prop != null) {
- log.debug("Credential Properties set for action : " +
action.getId());
+ LOG.debug("Credential Properties set for action : " +
action.getId());
for (String property : prop.getProperties().keySet()) {
actionConf.set(property,
prop.getProperties().get(property));
- log.debug("property : '" + property + "', value :
'" + prop.getProperties().get(property) + "'");
+ LOG.debug("property : '" + property + "', value :
'" + prop.getProperties().get(property) + "'");
}
}
}
}
else {
- log.warn("No credential properties found for action : " +
action.getId() + ", cred : " + action.getCred());
+ LOG.warn("No credential properties found for action : " +
action.getId() + ", cred : " + action.getCred());
}
}
else {
- log.warn("context or action is null");
+ LOG.warn("context or action is null");
}
return credPropertiesMap;
}
@@ -959,10 +959,10 @@ public class JavaActionExecutor extends
Credentials credentialObject =
credProvider.createCredentialObject();
if (credentialObject != null) {
credentialObject.addtoJobConf(jobconf, credProps,
context);
- log.debug("Retrieved Credential '" + credName + "' for
action " + action.getId());
+ LOG.debug("Retrieved Credential '" + credName + "' for
action " + action.getId());
}
else {
- log.debug("Credentials object is null for name= " +
credName + ", type=" + credProps.getType());
+ LOG.debug("Credentials object is null for name= " +
credName + ", type=" + credProps.getType());
throw new
ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA020",
"Could not load credentials of type [{0}] with
name [{1}]]; perhaps it was not defined"
+ " in oozie-site.xml?", credProps.getType(),
credName);
@@ -978,7 +978,7 @@ public class JavaActionExecutor extends
HashMap<String, CredentialsProperties> props = new HashMap<String,
CredentialsProperties>();
if (context != null && action != null) {
String credsInAction = action.getCred();
- log.debug("Get credential '" + credsInAction + "' properties for
action : " + action.getId());
+ LOG.debug("Get credential '" + credsInAction + "' properties for
action : " + action.getId());
String[] credNames = credsInAction.split(",");
for (String credName : credNames) {
CredentialsProperties credProps = getCredProperties(context,
credName);
@@ -986,7 +986,7 @@ public class JavaActionExecutor extends
}
}
else {
- log.warn("context or action is null");
+ LOG.warn("context or action is null");
}
return props;
}
@@ -1003,7 +1003,7 @@ public class JavaActionExecutor extends
for (Element credential : (List<Element>)
credentials.getChildren("credential", credentials.getNamespace())) {
String name = credential.getAttributeValue("name");
String type = credential.getAttributeValue("type");
- log.debug("getCredProperties: Name: " + name + ", Type: " +
type);
+ LOG.debug("getCredProperties: Name: " + name + ", Type: " +
type);
if (name.equalsIgnoreCase(credName)) {
credProp = new CredentialsProperties(name, type);
for (Element property : (List<Element>)
credential.getChildren("property",
@@ -1018,29 +1018,31 @@ public class JavaActionExecutor extends
propertyValue = eval.evaluate(propertyValue,
String.class);
credProp.getProperties().put(propertyName,
propertyValue);
- log.debug("getCredProperties: Properties name :'" +
propertyName + "', Value : '"
+ LOG.debug("getCredProperties: Properties name :'" +
propertyName + "', Value : '"
+ propertyValue + "'");
}
}
}
} else {
- log.warn("credentials is null for the action");
+ LOG.debug("credentials is null for the action");
}
return credProp;
}
@Override
public void start(Context context, WorkflowAction action) throws
ActionExecutorException {
+ LOG = XLog.resetPrefix(LOG);
+ LogUtils.setLogInfo(action, new XLog.Info());
try {
- XLog.getLog(getClass()).debug("Starting action " + action.getId()
+ " getting Action File System");
+ LOG.debug("Starting action " + action.getId() + " getting Action
File System");
FileSystem actionFs = context.getAppFileSystem();
- XLog.getLog(getClass()).debug("Preparing action Dir through
copying " + context.getActionDir());
+ LOG.debug("Preparing action Dir through copying " +
context.getActionDir());
prepareActionDir(actionFs, context);
- XLog.getLog(getClass()).debug("Action Dir is ready. Submitting the
action ");
+ LOG.debug("Action Dir is ready. Submitting the action ");
submitLauncher(actionFs, context, action);
- XLog.getLog(getClass()).debug("Action submit completed. Performing
check ");
+ LOG.debug("Action submit completed. Performing check ");
check(context, action);
- XLog.getLog(getClass()).debug("Action check is done after
submission");
+ LOG.debug("Action check is done after submission");
}
catch (Exception ex) {
throw convertException(ex);
@@ -1120,14 +1122,14 @@ public class JavaActionExecutor extends
action.getId());
}
context.setExternalChildIDs(newId);
- XLog.getLog(getClass()).info(XLog.STD, "External ID swap,
old ID [{0}] new ID [{1}]", launcherId,
+ LOG.info(XLog.STD, "External ID swap, old ID [{0}] new ID
[{1}]", launcherId,
newId);
}
else {
String externalIDs =
actionData.get(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS);
if (externalIDs != null) {
context.setExternalChildIDs(externalIDs);
- XLog.getLog(getClass()).info(XLog.STD, "Hadoop Jobs
launched : [{0}]", externalIDs);
+ LOG.info(XLog.STD, "Hadoop Jobs launched : [{0}]",
externalIDs);
}
}
if (runningJob.isComplete()) {
@@ -1135,25 +1137,24 @@ public class JavaActionExecutor extends
if (newId != null) {
actionData =
LauncherMapperHelper.getActionData(actionFs, context.getActionDir(), jobConf);
}
- XLog.getLog(getClass()).info(XLog.STD, "action completed,
external ID [{0}]",
+ LOG.info(XLog.STD, "action completed, external ID [{0}]",
action.getExternalId());
if (LauncherMapperHelper.isMainSuccessful(runningJob)) {
if (getCaptureOutput(action) &&
LauncherMapperHelper.hasOutputData(actionData)) {
context.setExecutionData(SUCCEEDED,
PropertiesUtils.stringToProperties(actionData
.get(LauncherMapper.ACTION_DATA_OUTPUT_PROPS)));
- XLog.getLog(getClass()).info(XLog.STD, "action
produced output");
+ LOG.info(XLog.STD, "action produced output");
}
else {
context.setExecutionData(SUCCEEDED, null);
}
if (LauncherMapperHelper.hasStatsData(actionData)) {
context.setExecutionStats(actionData.get(LauncherMapper.ACTION_DATA_STATS));
- XLog.getLog(getClass()).info(XLog.STD, "action
produced stats");
+ LOG.info(XLog.STD, "action produced stats");
}
getActionData(actionFs, runningJob, action, context);
}
else {
- XLog log = XLog.getLog(getClass());
String errorReason;
if
(actionData.containsKey(LauncherMapper.ACTION_DATA_ERROR_PROPS)) {
Properties props =
PropertiesUtils.stringToProperties(actionData
@@ -1166,37 +1167,37 @@ public class JavaActionExecutor extends
errorCode = "JA019";
}
errorReason = props.getProperty("error.reason");
- log.warn("Launcher ERROR, reason: {0}",
errorReason);
+ LOG.warn("Launcher ERROR, reason: {0}",
errorReason);
String exMsg =
props.getProperty("exception.message");
String errorInfo = (exMsg != null) ? exMsg :
errorReason;
context.setErrorInfo(errorCode, errorInfo);
String exStackTrace =
props.getProperty("exception.stacktrace");
if (exMsg != null) {
- log.warn("Launcher exception: {0}{E}{1}",
exMsg, exStackTrace);
+ LOG.warn("Launcher exception: {0}{E}{1}",
exMsg, exStackTrace);
}
}
else {
- errorReason = XLog.format("LauncherMapper died,
check Hadoop log for job [{0}:{1}]", action
+ errorReason = XLog.format("LauncherMapper died,
check Hadoop LOG for job [{0}:{1}]", action
.getTrackerUri(), action.getExternalId());
- log.warn(errorReason);
+ LOG.warn(errorReason);
}
context.setExecutionData(FAILED_KILLED, null);
}
}
else {
context.setExternalStatus(RUNNING);
- XLog.getLog(getClass()).info(XLog.STD, "checking action,
external ID [{0}] status [{1}]",
+ LOG.info(XLog.STD, "checking action, external ID [{0}]
status [{1}]",
action.getExternalId(),
action.getExternalStatus());
}
}
else {
context.setExternalStatus(RUNNING);
- XLog.getLog(getClass()).info(XLog.STD, "checking action,
external ID [{0}] status [{1}]",
+ LOG.info(XLog.STD, "checking action, external ID [{0}] status
[{1}]",
action.getExternalId(), action.getExternalStatus());
}
}
catch (Exception ex) {
- XLog.getLog(getClass()).warn("Exception in check(). Message[{0}]",
ex.getMessage(), ex);
+ LOG.warn("Exception in check(). Message[{0}]", ex.getMessage(),
ex);
exception = true;
throw convertException(ex);
}
@@ -1207,7 +1208,7 @@ public class JavaActionExecutor extends
}
catch (Exception e) {
if (exception) {
- log.error("JobClient error: ", e);
+ LOG.error("JobClient error: ", e);
}
else {
throw convertException(e);
@@ -1267,7 +1268,7 @@ public class JavaActionExecutor extends
}
catch (Exception ex) {
if (exception) {
- log.error("Error: ", ex);
+ LOG.error("Error: ", ex);
}
else {
throw convertException(ex);
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/ShellActionExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/ShellActionExecutor.java?rev=1545809&r1=1545808&r2=1545809&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/ShellActionExecutor.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/ShellActionExecutor.java
Tue Nov 26 20:14:33 2013
@@ -147,7 +147,7 @@ public class ShellActionExecutor extends
}
val += appendValue;
conf.set(propertyName, val);
- log.debug("action conf is updated with default value for property
" + propertyName + ", old value :"
+ LOG.debug("action conf is updated with default value for property
" + propertyName + ", old value :"
+ conf.get(propertyName, "") + ", new value :" + val);
}
}
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java?rev=1545809&r1=1545808&r2=1545809&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
Tue Nov 26 20:14:33 2013
@@ -178,7 +178,7 @@ public class SqoopActionExecutor extends
OOZIE_ACTION_EXTERNAL_STATS_WRITE, "true"))
&& (statsJsonString.getBytes().length <=
getMaxExternalStatsSize())) {
context.setExecutionStats(statsJsonString);
- log.debug(
+ LOG.debug(
"Printing stats for sqoop action as a JSON string :
[{0}]", statsJsonString);
}
} else {
@@ -198,7 +198,7 @@ public class SqoopActionExecutor extends
}
catch (Exception e) {
if (exception) {
- log.error("JobClient error: ", e);
+ LOG.error("JobClient error: ", e);
}
else {
throw convertException(e);
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java?rev=1545809&r1=1545808&r2=1545809&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
Tue Nov 26 20:14:33 2013
@@ -90,7 +90,7 @@ public class CoordActionInputCheckXComma
@Override
protected Void execute() throws CommandException {
- LOG.info("[" + actionId + "]::ActionInputCheck:: Action is in WAITING
state.");
+ LOG.debug("[" + actionId + "]::ActionInputCheck:: Action is in WAITING
state.");
// this action should only get processed if current time > nominal
time;
// otherwise, requeue this action for delay execution;
@@ -421,7 +421,6 @@ public class CoordActionInputCheckXComma
*/
private boolean checkResolvedUris(Element eAction, StringBuilder
existList, StringBuilder nonExistList,
Configuration conf) throws IOException {
- LOG.info("[" + actionId + "]::ActionInputCheck:: In
checkResolvedUris...");
Element inputList = eAction.getChild("input-events",
eAction.getNamespace());
if (inputList != null) {
if (nonExistList.length() > 0) {
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java?rev=1545809&r1=1545808&r2=1545809&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
Tue Nov 26 20:14:33 2013
@@ -557,7 +557,7 @@ public class CallableQueueService implem
private synchronized boolean queue(CallableWrapper wrapper, boolean
ignoreQueueSize) {
if (!ignoreQueueSize && queue.size() >= queueSize) {
- log.warn("queue if full, ignoring queuing for [{0}]",
wrapper.getElement());
+ log.warn("queue full, ignoring queuing for [{0}]",
wrapper.getElement().getKey());
return false;
}
if (!executor.isShutdown()) {
@@ -573,7 +573,7 @@ public class CallableQueueService implem
}
}
else {
- log.warn("Executor shutting down, ignoring queueing of [{0}]",
wrapper.getElement());
+ log.warn("Executor shutting down, ignoring queueing of [{0}]",
wrapper.getElement().getKey());
}
return true;
}
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java?rev=1545809&r1=1545808&r2=1545809&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java
Tue Nov 26 20:14:33 2013
@@ -20,6 +20,7 @@ package org.apache.oozie.service;
import org.apache.oozie.util.Instrumentable;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.XLogStreamer;
+
import java.io.IOException;
import java.io.Writer;
import java.util.Map;
@@ -29,7 +30,9 @@ import java.util.Date;
* Service that performs streaming of log files over Web Services if enabled
in XLogService
*/
public class XLogStreamingService implements Service, Instrumentable {
-
+ private static final String CONF_PREFIX = Service.CONF_PREFIX +
"XLogStreamingService.";
+ public static final String STREAM_BUFFER_LEN = CONF_PREFIX + "buffer.len";
+ protected int bufferLen;
/**
* Initialize the log streaming service.
@@ -38,6 +41,7 @@ public class XLogStreamingService implem
* @throws ServiceException thrown if the log streaming service could not
be initialized.
*/
public void init(Services services) throws ServiceException {
+ bufferLen = services.getConf().getInt(STREAM_BUFFER_LEN, 4096);
}
/**
@@ -79,11 +83,15 @@ public class XLogStreamingService implem
XLogService xLogService = Services.get().get(XLogService.class);
if (xLogService.getLogOverWS()) {
new XLogStreamer(filter, xLogService.getOozieLogPath(),
xLogService.getOozieLogName(),
- xLogService.getOozieLogRotation()).streamLog(writer,
startTime, endTime);
+ xLogService.getOozieLogRotation()).streamLog(writer,
startTime, endTime, bufferLen);
}
else {
writer.write("Log streaming disabled!!");
}
}
+
+ public int getBufferLen() {
+ return bufferLen;
+ }
}
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java?rev=1545809&r1=1545808&r2=1545809&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
Tue Nov 26 20:14:33 2013
@@ -124,7 +124,7 @@ public class ZKXLogStreamingService exte
if (params.get(ALL_SERVERS_PARAM) != null &&
params.get(ALL_SERVERS_PARAM).length > 0
&& params.get(ALL_SERVERS_PARAM)[0].equals("false")) {
new XLogStreamer(filter, xLogService.getOozieLogPath(),
xLogService.getOozieLogName(),
- xLogService.getOozieLogRotation()).streamLog(writer,
startTime, endTime);
+ xLogService.getOozieLogRotation()).streamLog(writer,
startTime, endTime, bufferLen);
}
// Otherwise, we have to go collate relevant logs from the other
Oozie servers
else {
@@ -201,7 +201,7 @@ public class ZKXLogStreamingService exte
// If it's just the one server (this server), then we don't need
to do any more processing and can just copy it directly
if (parsers.size() == 1) {
TimestampedMessageParser parser = parsers.get(0);
- parser.processRemaining(writer);
+ parser.processRemaining(writer, bufferLen);
}
else {
// Now that we have a Reader for each server to get the logs
from that server, we have to collate them. Within each
@@ -215,11 +215,17 @@ public class ZKXLogStreamingService exte
timestampMap.put(parser.getLastTimestamp(), parser);
}
}
+ int bytesWritten = 0;
while (timestampMap.size() > 1) {
// The first entry will be the earliest based on the
timestamp (also removes it) from the map
TimestampedMessageParser earliestParser =
timestampMap.pollFirstEntry().getValue();
// Write the message from that parser at that timestamp
writer.write(earliestParser.getLastMessage());
+ bytesWritten = earliestParser.getLastMessage().length();
+ if (bytesWritten > bufferLen) {
+ writer.flush();
+ bytesWritten = 0;
+ }
// Increment that parser to read the next message
if (earliestParser.increment()) {
// If it still has messages left, put it back in the
map with the new last timestamp for it
@@ -230,7 +236,7 @@ public class ZKXLogStreamingService exte
if (timestampMap.size() == 1) {
TimestampedMessageParser parser =
timestampMap.values().iterator().next();
writer.write(parser.getLastMessage()); // don't forget
the last message read by the parser
- parser.processRemaining(writer);
+ parser.processRemaining(writer, bufferLen, bytesWritten +
parser.getLastMessage().length());
}
}
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/util/LogUtils.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/util/LogUtils.java?rev=1545809&r1=1545808&r2=1545809&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/util/LogUtils.java
(original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/util/LogUtils.java Tue Nov
26 20:14:33 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,6 +22,7 @@ import org.apache.oozie.CoordinatorActio
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.XLogService;
@@ -89,6 +90,13 @@ public class LogUtils {
XLog.Info.get().setParameters(logInfo);
}
+ public static void setLogInfo(WorkflowAction action, XLog.Info logInfo) {
+ String actionId = action.getId();
+ logInfo.setParameter(DagXLogInfoService.JOB, actionId.substring(0,
actionId.indexOf("@")));
+ logInfo.setParameter(DagXLogInfoService.ACTION, actionId);
+ XLog.Info.get().setParameters(logInfo);
+ }
+
/**
* Set the log info with the context of the given bundle bean.
*
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java?rev=1545809&r1=1545808&r2=1545809&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java
Tue Nov 26 20:14:33 2013
@@ -22,6 +22,9 @@ import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.XLogStreamingService;
+
/**
* Encapsulates the parsing and filtering of the log messages from a
BufferedReader. It makes sure not to read in the entire log
* into memory at the same time; at most, it will have two messages (which can
be multi-line in the case of exception stack traces).
@@ -29,7 +32,7 @@ import java.util.ArrayList;
* To use this class: Calling {@link TimestampedMessageParser#increment()}
will tell the parser to read the next message from the
* Reader. It will return true if there are more messages and false if not.
Calling
* {@link TimestampedMessageParser#getLastMessage()} and {@link
TimestampedMessageParser#getLastTimestamp()} will return the last
- * message and timestamp, respectively, that were parsed when {@link
TimestampedMessageParser#increment()} was called. Calling
+ * message and timestamp, respectively, that were parsed when {@link
TimestampedMessageParser#increment()} was called. Calling
* {@link TimestampedMessageParser#processRemaining(java.io.Writer)} will
write the remaining log messages to the given Writer.
*/
public class TimestampedMessageParser {
@@ -95,7 +98,6 @@ public class TimestampedMessageParser {
return true;
}
-
/**
* Returns the timestamp from the last message that was parsed.
*
@@ -160,14 +162,47 @@ public class TimestampedMessageParser {
}
/**
- * Writes the remaining log messages to the passed in Writer.
+ * Streams log messages to the passed in Writer. Flushes the log writing
+ * based on buffer len
*
* @param writer
+ * @param bufferLen maximum len of log buffer
+ * @param bytesWritten num bytes already written to writer
* @throws IOException
*/
- public void processRemaining(Writer writer) throws IOException {
- while(increment()) {
- writer.write(getLastMessage());
+ public void processRemaining(Writer writer, int bufferLen, int
bytesWritten) throws IOException {
+ while (increment()) {
+ writer.write(lastMessage);
+ bytesWritten += lastMessage.length();
+ if (bytesWritten > bufferLen) {
+ writer.flush();
+ bytesWritten = 0;
+ }
}
+ writer.flush();
+ }
+
+ /**
+ * Streams log messages to the passed in Writer, with zero bytes already
+ * written
+ *
+ * @param writer
+ * @param bufferLen maximum len of log buffer
+ * @throws IOException
+ */
+ public void processRemaining(Writer writer, int bufferLen) throws
IOException {
+ processRemaining(writer, bufferLen, 0);
+ }
+
+ /**
+ * Streams log messages to the passed in Writer, with default buffer len 4K
+ * and zero bytes already written
+ *
+ * @param writer
+ * @throws IOException
+ */
+ public void processRemaining(Writer writer) throws IOException {
+ processRemaining(writer,
Services.get().get(XLogStreamingService.class).getBufferLen());
}
+
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/util/XLogStreamer.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/util/XLogStreamer.java?rev=1545809&r1=1545808&r2=1545809&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/util/XLogStreamer.java
(original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/util/XLogStreamer.java Tue
Nov 26 20:14:33 2013
@@ -31,6 +31,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.google.common.annotations.VisibleForTesting;
+
import java.io.BufferedReader;
/**
@@ -204,17 +205,16 @@ public class XLogStreamer {
* @param endTime
* @throws IOException
*/
- public void streamLog(Writer writer, Date startTime, Date endTime) throws
IOException {
+ public void streamLog(Writer writer, Date startTime, Date endTime, int
bufferLen) throws IOException {
// Get a Reader for the log file(s)
BufferedReader reader = makeReader(startTime, endTime);
try {
// Process the entire logs from the reader using the logFilter
- new TimestampedMessageParser(reader,
logFilter).processRemaining(writer);
+ new TimestampedMessageParser(reader,
logFilter).processRemaining(writer, bufferLen);
}
finally {
reader.close();
}
- writer.flush();
}
/**
Modified: oozie/trunk/core/src/main/resources/oozie-default.xml
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/resources/oozie-default.xml?rev=1545809&r1=1545808&r2=1545809&view=diff
==============================================================================
--- oozie/trunk/core/src/main/resources/oozie-default.xml (original)
+++ oozie/trunk/core/src/main/resources/oozie-default.xml Tue Nov 26 20:14:33
2013
@@ -146,6 +146,12 @@
</description>
</property>
+ <property>
+ <name>oozie.service.XLogStreamingService.buffer.len</name>
+ <value>4096</value>
+ <description>4K buffer for streaming the logs
progressively</description>
+ </property>
+
<!-- HCatAccessorService -->
<property>
<name>oozie.service.HCatAccessorService.jmsconnections</name>
Modified:
oozie/trunk/core/src/test/java/org/apache/oozie/util/TestLogStreamer.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/util/TestLogStreamer.java?rev=1545809&r1=1545808&r2=1545809&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/util/TestLogStreamer.java
(original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/util/TestLogStreamer.java
Tue Nov 26 20:14:33 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -139,7 +139,7 @@ public class TestLogStreamer extends XTe
// respectively
StringWriter sw = new StringWriter();
XLogStreamer str = new XLogStreamer(xf, getTestCaseDir(), "oozie.log",
1);
- str.streamLog(sw, new Date(currTime - 10 * 3600000), new Date(currTime
- 5 * 3600000));
+ str.streamLog(sw, new Date(currTime - 10 * 3600000), new Date(currTime
- 5 * 3600000), 4096);
String[] out = sw.toString().split("\n");
// Check if the retrieved log content is of length seven lines after
filtering based on time window, file name
// pattern and parameters like JobId, Username etc. and/or based on
log level like INFO, DEBUG, etc.
@@ -157,7 +157,7 @@ public class TestLogStreamer extends XTe
// and corresponding log content is retrieved properly
StringWriter sw1 = new StringWriter();
XLogStreamer str1 = new XLogStreamer(xf, getTestCaseDir(),
"oozie.log", 1);
- str1.streamLog(sw1, null, null);
+ str1.streamLog(sw1, null, null, 4096);
out = sw1.toString().split("\n");
// Check if the retrieved log content is of length eight lines after
filtering based on time window, file name
// pattern and parameters like JobId, Username etc. and/or based on
log level like INFO, DEBUG, etc.
@@ -224,7 +224,7 @@ public class TestLogStreamer extends XTe
Calendar calendarEntry = Calendar.getInstance();
// Setting start-time to 2012-04-24-19 for log stream (month-1 passed
as parameter since 0=January), and end time is current time
calendarEntry.set(2012, 3, 24, 19, 0);
- str2.streamLog(sw2, calendarEntry.getTime(), new
Date(System.currentTimeMillis()));
+ str2.streamLog(sw2, calendarEntry.getTime(), new
Date(System.currentTimeMillis()), 4096);
String[] out = sw2.toString().split("\n");
// Check if the retrieved log content is of length five lines after
filtering based on time window, file name
@@ -273,7 +273,7 @@ public class TestLogStreamer extends XTe
StringWriter sw = new StringWriter();
XLogStreamer str = new XLogStreamer(xf, getTestCaseDir(), "oozie.log",
1);
- str.streamLog(sw, new Date(currTime - 5000), new Date(currTime +
5000));
+ str.streamLog(sw, new Date(currTime - 5000), new Date(currTime +
5000), 4096);
String[] out = sw.toString().split("\n");
// Check if the retrieved log content is of length five lines after
filtering; we expect the first five lines because the
// filtering shouldn't care whether or not there is a dash while the
last five lines don't pass the normal filtering
Modified: oozie/trunk/release-log.txt
URL:
http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1545809&r1=1545808&r2=1545809&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Tue Nov 26 20:14:33 2013
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1474 Fix logging issues - latency, accurate job ids, coord Job UI to
show job logs (mona)
OOZIE-1623 JPAService doesn't need to do reads in a transaction (rkanter)
OOZIE-1612 When printing Dates to log messages, we should make sure they are
in oozie.processing.timezone (gwenshap via rkanter)
OOZIE-1519 Admin command to update the sharelib (puru via ryota)
Modified: oozie/trunk/webapp/src/main/webapp/oozie-console.js
URL:
http://svn.apache.org/viewvc/oozie/trunk/webapp/src/main/webapp/oozie-console.js?rev=1545809&r1=1545808&r2=1545809&view=diff
==============================================================================
--- oozie/trunk/webapp/src/main/webapp/oozie-console.js (original)
+++ oozie/trunk/webapp/src/main/webapp/oozie-console.js Tue Nov 26 20:14:33 2013
@@ -423,7 +423,6 @@ function jobDetailsPopup(response, reque
});
function showActionContextMenu(thisGrid, rowIndex, cellIndex, e) {
- var jobContextMenu = new Ext.menu.Menu('taskContext');
var actionStatus = thisGrid.store.data.items[rowIndex].data;
actionDetailsGridWindow(actionStatus);
function actionDetailsGridWindow(actionStatus) {
@@ -739,9 +738,9 @@ function coordJobDetailsPopup(response,
emptyText: "Loading..."
});
var getLogButton = new Ext.Button({
- text: 'Retrieve log',
+ text: 'Retrieve coord action logs',
handler: function() {
- fetchLogs(coordJobId, actionsTextBox.getValue());
+ fetchLogs(coordJobId, actionsTextBox.getValue());
}
});
var actionsTextBox = new Ext.form.TextField({
@@ -789,7 +788,7 @@ function coordJobDetailsPopup(response,
var responseLength = response.length;
var twentyFiveMB = 25*1024*1024;
if(responseLength > twentyFiveMB) {
- response =
response.substring(responseLength-twentyFiveMB,responseLength)
+ response =
response.substring(responseLength-twentyFiveMB,responseLength);
response =
response.substring(response.indexOf("\n")+1,responseLength);
jobLogArea.setRawValue(response);
}
@@ -974,14 +973,12 @@ function coordJobDetailsPopup(response,
});
function showWorkflowPopup(thisGrid, rowIndex, cellIndex, e) {
- var jobContextMenu = new Ext.menu.Menu('taskContext');
var actionStatus = thisGrid.store.data.items[rowIndex].data;
var workflowId = actionStatus["externalId"];
jobDetailsGridWindow(workflowId);
}
// alert("Coordinator PopUP 4 inside coordDetailsPopup ");
function showCoordActionContextMenu(thisGrid, rowIndex, cellIndex, e) {
- var jobContextMenu = new Ext.menu.Menu('taskContext');
var actionStatus = thisGrid.store.data.items[rowIndex].data;
actionDetailsGridWindow(actionStatus);
function actionDetailsGridWindow(actionStatus) {
@@ -1130,8 +1127,17 @@ function coordJobDetailsPopup(response,
items: [jobLogArea, actionsTextBox, getLogButton],
tbar: [ {
text: " ",
- icon: 'ext-2.2/resources/images/default/grid/refresh.gif'
- }]
+ icon: 'ext-2.2/resources/images/default/grid/refresh.gif',
+ handler: function() {
+ var actionsText = actionsTextBox.getValue();
+ if (actionsText == 'Enter the action list here' ||
actionsText == '') {
+ fetchLogs(coordJobId, '');
+ }
+ else {
+ fetchLogs(coordJobId, actionsText);
+ }
+ }
+ }]
}]
});
@@ -1141,7 +1147,9 @@ function coordJobDetailsPopup(response,
return;
}
if (selectedTab.title == 'Coord Job Log') {
- actionsTextBox.setValue('Enter the action list here');
+ fetchLogs(coordJobId, '');
+ //actionsTextBox.position
+ actionsTextBox.setValue('Enter the action list here');
}
else if (selectedTab.title == 'Coord Job Definition') {
fetchDefinition(coordJobId);
@@ -1827,7 +1835,7 @@ var getSupportedVersions = new Ext.Actio
Ext.Msg.alert('Oozie Console Alert!', 'Server doesn\'t support
client version: v' + getOozieClientVersion());
}
- })
+ });
}
});
@@ -1860,7 +1868,7 @@ var serverVersion = new Ext.Action({
var ret = eval("(" + response.responseText + ")");
serverVersion.setText("<font size='2'>Server version [" +
ret['buildVersion'] + "]</font>");
}
- })
+ });
}
});
@@ -1954,20 +1962,17 @@ var timeZones_store = new Ext.data.JsonS
timeZones_store.proxy.conn.method = "GET";
function showCoordJobContextMenu(thisGrid, rowIndex, cellIndex, e) {
- var jobContextMenu = new Ext.menu.Menu('taskContext');
var coordJobId = thisGrid.store.data.items[rowIndex].data.coordJobId;
coordJobDetailsGridWindow(coordJobId);
}
function initConsole() {
function showJobContextMenu(thisGrid, rowIndex, cellIndex, e) {
- var jobContextMenu = new Ext.menu.Menu('taskContext');
var workflowId = thisGrid.store.data.items[rowIndex].data.id;
jobDetailsGridWindow(workflowId);
}
function showBundleJobContextMenu(thisGrid, rowIndex, cellIndex, e) {
- var jobContextMenu = new Ext.menu.Menu('taskContext');
var bundleJobId = thisGrid.store.data.items[rowIndex].data.bundleJobId;
bundleJobDetailsGridWindow(bundleJobId);
}