Author: mona
Date: Tue Nov 26 20:14:33 2013
New Revision: 1545809

URL: http://svn.apache.org/r1545809
Log:
OOZIE-1474 Fix logging issues - latency, accurate job ids, coord Job UI to show 
job logs (mona)

Modified:
    
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
    
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/ShellActionExecutor.java
    
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
    
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
    
oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
    
oozie/trunk/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java
    
oozie/trunk/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
    oozie/trunk/core/src/main/java/org/apache/oozie/util/LogUtils.java
    
oozie/trunk/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java
    oozie/trunk/core/src/main/java/org/apache/oozie/util/XLogStreamer.java
    oozie/trunk/core/src/main/resources/oozie-default.xml
    oozie/trunk/core/src/test/java/org/apache/oozie/util/TestLogStreamer.java
    oozie/trunk/release-log.txt
    oozie/trunk/webapp/src/main/webapp/oozie-console.js

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java?rev=1545809&r1=1545808&r2=1545809&view=diff
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
 (original)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
 Tue Nov 26 20:14:33 2013
@@ -65,7 +65,7 @@ import org.apache.oozie.service.URIHandl
 import org.apache.oozie.service.WorkflowAppService;
 import org.apache.oozie.servlet.CallbackServlet;
 import org.apache.oozie.util.ELEvaluator;
-import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.LogUtils;
 import org.apache.oozie.util.PropertiesUtils;
 import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.util.XLog;
@@ -109,7 +109,7 @@ public class JavaActionExecutor extends 
     private static final String FAILED = "FAILED";
     private static final String FAILED_KILLED = "FAILED/KILLED";
     private static final String RUNNING = "RUNNING";
-    protected XLog log = XLog.getLog(getClass());
+    protected XLog LOG = XLog.getLog(getClass());
     private static final Pattern heapPattern = 
Pattern.compile("-Xmx(([0-9]+)[mMgG])");
 
     static {
@@ -438,7 +438,7 @@ public class JavaActionExecutor extends 
             return conf;
         }
         catch (Exception ex) {
-            XLog.getLog(getClass()).debug(
+            LOG.debug(
                     "Errors when add to DistributedCache. Path=" + 
uri.toString() + ", archive=" + archive + ", conf="
                             + XmlUtils.prettyPrint(conf).toString());
             throw convertException(ex);
@@ -753,7 +753,7 @@ public class JavaActionExecutor extends 
     private void injectCallback(Context context, Configuration conf) {
         String callback = context.getCallbackUrl("$jobStatus");
         if (conf.get("job.end.notification.url") != null) {
-            XLog.getLog(getClass()).warn("Overriding the action job end 
notification URI");
+            LOG.warn("Overriding the action job end notification URI");
         }
         conf.set("job.end.notification.url", callback);
     }
@@ -790,7 +790,7 @@ public class JavaActionExecutor extends 
             // action job configuration
             Configuration actionConf = createBaseHadoopConf(context, 
actionXml);
             setupActionConf(actionConf, context, actionXml, appPathRoot);
-            XLog.getLog(getClass()).debug("Setting LibFilesArchives ");
+            LOG.debug("Setting LibFilesArchives ");
             setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
 
             String jobName = actionConf.get(HADOOP_JOB_NAME);
@@ -831,7 +831,7 @@ public class JavaActionExecutor extends 
 
             JobConf launcherJobConf = createLauncherConf(actionFs, context, 
action, actionXml, actionConf);
             injectLauncherCallback(context, launcherJobConf);
-            XLog.getLog(getClass()).debug("Creating Job Client for action " + 
action.getId());
+            LOG.debug("Creating Job Client for action " + action.getId());
             jobClient = createJobClient(context, launcherJobConf);
             String launcherId = 
LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), 
context
                     .getRecoveryId());
@@ -850,7 +850,7 @@ public class JavaActionExecutor extends 
                 }
             }
             else {
-                XLog.getLog(getClass()).debug("Submitting the job through Job 
Client for action " + action.getId());
+                LOG.debug("Submitting the job through Job Client for action " 
+ action.getId());
 
                 // setting up propagation of the delegation token.
                 HadoopAccessorService has = 
Services.get().get(HadoopAccessorService.class);
@@ -861,12 +861,12 @@ public class JavaActionExecutor extends 
                 // insert credentials tokens to launcher job conf if needed
                 if (needInjectCredentials()) {
                     for (Token<? extends TokenIdentifier> tk : 
credentialsConf.getCredentials().getAllTokens()) {
-                        log.debug("ADDING TOKEN: " + tk.getKind().toString());
+                        LOG.debug("ADDING TOKEN: " + tk.getKind().toString());
                         
launcherJobConf.getCredentials().addToken(tk.getKind(), tk);
                     }
                 }
                 else {
-                    log.info("No need to inject credentials.");
+                    LOG.info("No need to inject credentials.");
                 }
                 runningJob = jobClient.submitJob(launcherJobConf);
                 if (runningJob == null) {
@@ -874,7 +874,7 @@ public class JavaActionExecutor extends 
                             "Error submitting launcher for action [{0}]", 
action.getId());
                 }
                 launcherId = runningJob.getID().toString();
-                XLog.getLog(getClass()).debug("After submission get the 
launcherId " + launcherId);
+                LOG.debug("After submission get the launcherId " + launcherId);
             }
 
             String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER);
@@ -892,7 +892,7 @@ public class JavaActionExecutor extends 
                 }
                 catch (Exception e) {
                     if (exception) {
-                        log.error("JobClient error: ", e);
+                        LOG.error("JobClient error: ", e);
                     }
                     else {
                         throw convertException(e);
@@ -929,20 +929,20 @@ public class JavaActionExecutor extends 
                 for (String key : credPropertiesMap.keySet()) {
                     CredentialsProperties prop = credPropertiesMap.get(key);
                     if (prop != null) {
-                        log.debug("Credential Properties set for action : " + 
action.getId());
+                        LOG.debug("Credential Properties set for action : " + 
action.getId());
                         for (String property : prop.getProperties().keySet()) {
                             actionConf.set(property, 
prop.getProperties().get(property));
-                            log.debug("property : '" + property + "', value : 
'" + prop.getProperties().get(property) + "'");
+                            LOG.debug("property : '" + property + "', value : 
'" + prop.getProperties().get(property) + "'");
                         }
                     }
                 }
             }
             else {
-                log.warn("No credential properties found for action : " + 
action.getId() + ", cred : " + action.getCred());
+                LOG.warn("No credential properties found for action : " + 
action.getId() + ", cred : " + action.getCred());
             }
         }
         else {
-            log.warn("context or action is null");
+            LOG.warn("context or action is null");
         }
         return credPropertiesMap;
     }
@@ -959,10 +959,10 @@ public class JavaActionExecutor extends 
                     Credentials credentialObject = 
credProvider.createCredentialObject();
                     if (credentialObject != null) {
                         credentialObject.addtoJobConf(jobconf, credProps, 
context);
-                        log.debug("Retrieved Credential '" + credName + "' for 
action " + action.getId());
+                        LOG.debug("Retrieved Credential '" + credName + "' for 
action " + action.getId());
                     }
                     else {
-                        log.debug("Credentials object is null for name= " + 
credName + ", type=" + credProps.getType());
+                        LOG.debug("Credentials object is null for name= " + 
credName + ", type=" + credProps.getType());
                         throw new 
ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA020",
                             "Could not load credentials of type [{0}] with 
name [{1}]]; perhaps it was not defined"
                                 + " in oozie-site.xml?", credProps.getType(), 
credName);
@@ -978,7 +978,7 @@ public class JavaActionExecutor extends 
         HashMap<String, CredentialsProperties> props = new HashMap<String, 
CredentialsProperties>();
         if (context != null && action != null) {
             String credsInAction = action.getCred();
-            log.debug("Get credential '" + credsInAction + "' properties for 
action : " + action.getId());
+            LOG.debug("Get credential '" + credsInAction + "' properties for 
action : " + action.getId());
             String[] credNames = credsInAction.split(",");
             for (String credName : credNames) {
                 CredentialsProperties credProps = getCredProperties(context, 
credName);
@@ -986,7 +986,7 @@ public class JavaActionExecutor extends 
             }
         }
         else {
-            log.warn("context or action is null");
+            LOG.warn("context or action is null");
         }
         return props;
     }
@@ -1003,7 +1003,7 @@ public class JavaActionExecutor extends 
             for (Element credential : (List<Element>) 
credentials.getChildren("credential", credentials.getNamespace())) {
                 String name = credential.getAttributeValue("name");
                 String type = credential.getAttributeValue("type");
-                log.debug("getCredProperties: Name: " + name + ", Type: " + 
type);
+                LOG.debug("getCredProperties: Name: " + name + ", Type: " + 
type);
                 if (name.equalsIgnoreCase(credName)) {
                     credProp = new CredentialsProperties(name, type);
                     for (Element property : (List<Element>) 
credential.getChildren("property",
@@ -1018,29 +1018,31 @@ public class JavaActionExecutor extends 
                         propertyValue = eval.evaluate(propertyValue, 
String.class);
 
                         credProp.getProperties().put(propertyName, 
propertyValue);
-                        log.debug("getCredProperties: Properties name :'" + 
propertyName + "', Value : '"
+                        LOG.debug("getCredProperties: Properties name :'" + 
propertyName + "', Value : '"
                                 + propertyValue + "'");
                     }
                 }
             }
         } else {
-            log.warn("credentials is null for the action");
+            LOG.debug("credentials is null for the action");
         }
         return credProp;
     }
 
     @Override
     public void start(Context context, WorkflowAction action) throws 
ActionExecutorException {
+        LOG = XLog.resetPrefix(LOG);
+        LogUtils.setLogInfo(action, new XLog.Info());
         try {
-            XLog.getLog(getClass()).debug("Starting action " + action.getId() 
+ " getting Action File System");
+            LOG.debug("Starting action " + action.getId() + " getting Action 
File System");
             FileSystem actionFs = context.getAppFileSystem();
-            XLog.getLog(getClass()).debug("Preparing action Dir through 
copying " + context.getActionDir());
+            LOG.debug("Preparing action Dir through copying " + 
context.getActionDir());
             prepareActionDir(actionFs, context);
-            XLog.getLog(getClass()).debug("Action Dir is ready. Submitting the 
action ");
+            LOG.debug("Action Dir is ready. Submitting the action ");
             submitLauncher(actionFs, context, action);
-            XLog.getLog(getClass()).debug("Action submit completed. Performing 
check ");
+            LOG.debug("Action submit completed. Performing check ");
             check(context, action);
-            XLog.getLog(getClass()).debug("Action check is done after 
submission");
+            LOG.debug("Action check is done after submission");
         }
         catch (Exception ex) {
             throw convertException(ex);
@@ -1120,14 +1122,14 @@ public class JavaActionExecutor extends 
                                 action.getId());
                     }
                     context.setExternalChildIDs(newId);
-                    XLog.getLog(getClass()).info(XLog.STD, "External ID swap, 
old ID [{0}] new ID [{1}]", launcherId,
+                    LOG.info(XLog.STD, "External ID swap, old ID [{0}] new ID 
[{1}]", launcherId,
                             newId);
                 }
                 else {
                     String externalIDs = 
actionData.get(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS);
                     if (externalIDs != null) {
                         context.setExternalChildIDs(externalIDs);
-                        XLog.getLog(getClass()).info(XLog.STD, "Hadoop Jobs 
launched : [{0}]", externalIDs);
+                        LOG.info(XLog.STD, "Hadoop Jobs launched : [{0}]", 
externalIDs);
                     }
                 }
                 if (runningJob.isComplete()) {
@@ -1135,25 +1137,24 @@ public class JavaActionExecutor extends 
                     if (newId != null) {
                         actionData = 
LauncherMapperHelper.getActionData(actionFs, context.getActionDir(), jobConf);
                     }
-                    XLog.getLog(getClass()).info(XLog.STD, "action completed, 
external ID [{0}]",
+                    LOG.info(XLog.STD, "action completed, external ID [{0}]",
                             action.getExternalId());
                     if (LauncherMapperHelper.isMainSuccessful(runningJob)) {
                         if (getCaptureOutput(action) && 
LauncherMapperHelper.hasOutputData(actionData)) {
                             context.setExecutionData(SUCCEEDED, 
PropertiesUtils.stringToProperties(actionData
                                     
.get(LauncherMapper.ACTION_DATA_OUTPUT_PROPS)));
-                            XLog.getLog(getClass()).info(XLog.STD, "action 
produced output");
+                            LOG.info(XLog.STD, "action produced output");
                         }
                         else {
                             context.setExecutionData(SUCCEEDED, null);
                         }
                         if (LauncherMapperHelper.hasStatsData(actionData)) {
                             
context.setExecutionStats(actionData.get(LauncherMapper.ACTION_DATA_STATS));
-                            XLog.getLog(getClass()).info(XLog.STD, "action 
produced stats");
+                            LOG.info(XLog.STD, "action produced stats");
                         }
                         getActionData(actionFs, runningJob, action, context);
                     }
                     else {
-                        XLog log = XLog.getLog(getClass());
                         String errorReason;
                         if 
(actionData.containsKey(LauncherMapper.ACTION_DATA_ERROR_PROPS)) {
                             Properties props = 
PropertiesUtils.stringToProperties(actionData
@@ -1166,37 +1167,37 @@ public class JavaActionExecutor extends 
                                 errorCode = "JA019";
                             }
                             errorReason = props.getProperty("error.reason");
-                            log.warn("Launcher ERROR, reason: {0}", 
errorReason);
+                            LOG.warn("Launcher ERROR, reason: {0}", 
errorReason);
                             String exMsg = 
props.getProperty("exception.message");
                             String errorInfo = (exMsg != null) ? exMsg : 
errorReason;
                             context.setErrorInfo(errorCode, errorInfo);
                             String exStackTrace = 
props.getProperty("exception.stacktrace");
                             if (exMsg != null) {
-                                log.warn("Launcher exception: {0}{E}{1}", 
exMsg, exStackTrace);
+                                LOG.warn("Launcher exception: {0}{E}{1}", 
exMsg, exStackTrace);
                             }
                         }
                         else {
-                            errorReason = XLog.format("LauncherMapper died, 
check Hadoop log for job [{0}:{1}]", action
+                            errorReason = XLog.format("LauncherMapper died, 
check Hadoop log for job [{0}:{1}]", action
                                     .getTrackerUri(), action.getExternalId());
-                            log.warn(errorReason);
+                            LOG.warn(errorReason);
                         }
                         context.setExecutionData(FAILED_KILLED, null);
                     }
                 }
                 else {
                     context.setExternalStatus(RUNNING);
-                    XLog.getLog(getClass()).info(XLog.STD, "checking action, 
external ID [{0}] status [{1}]",
+                    LOG.info(XLog.STD, "checking action, external ID [{0}] 
status [{1}]",
                             action.getExternalId(), 
action.getExternalStatus());
                 }
             }
             else {
                 context.setExternalStatus(RUNNING);
-                XLog.getLog(getClass()).info(XLog.STD, "checking action, 
external ID [{0}] status [{1}]",
+                LOG.info(XLog.STD, "checking action, external ID [{0}] status 
[{1}]",
                         action.getExternalId(), action.getExternalStatus());
             }
         }
         catch (Exception ex) {
-            XLog.getLog(getClass()).warn("Exception in check(). Message[{0}]", 
ex.getMessage(), ex);
+            LOG.warn("Exception in check(). Message[{0}]", ex.getMessage(), 
ex);
             exception = true;
             throw convertException(ex);
         }
@@ -1207,7 +1208,7 @@ public class JavaActionExecutor extends 
                 }
                 catch (Exception e) {
                     if (exception) {
-                        log.error("JobClient error: ", e);
+                        LOG.error("JobClient error: ", e);
                     }
                     else {
                         throw convertException(e);
@@ -1267,7 +1268,7 @@ public class JavaActionExecutor extends 
             }
             catch (Exception ex) {
                 if (exception) {
-                    log.error("Error: ", ex);
+                    LOG.error("Error: ", ex);
                 }
                 else {
                     throw convertException(ex);

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/ShellActionExecutor.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/ShellActionExecutor.java?rev=1545809&r1=1545808&r2=1545809&view=diff
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/ShellActionExecutor.java
 (original)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/ShellActionExecutor.java
 Tue Nov 26 20:14:33 2013
@@ -147,7 +147,7 @@ public class ShellActionExecutor extends
             }
             val += appendValue;
             conf.set(propertyName, val);
-            log.debug("action conf is updated with default value for property 
" + propertyName + ", old value :"
+            LOG.debug("action conf is updated with default value for property 
" + propertyName + ", old value :"
                     + conf.get(propertyName, "") + ", new value :" + val);
         }
     }

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java?rev=1545809&r1=1545808&r2=1545809&view=diff
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
 (original)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
 Tue Nov 26 20:14:33 2013
@@ -178,7 +178,7 @@ public class SqoopActionExecutor extends
                             OOZIE_ACTION_EXTERNAL_STATS_WRITE, "true"))
                             && (statsJsonString.getBytes().length <= 
getMaxExternalStatsSize())) {
                         context.setExecutionStats(statsJsonString);
-                        log.debug(
+                        LOG.debug(
                           "Printing stats for sqoop action as a JSON string : 
[{0}]", statsJsonString);
                     }
                 } else {
@@ -198,7 +198,7 @@ public class SqoopActionExecutor extends
                 }
                 catch (Exception e) {
                     if (exception) {
-                        log.error("JobClient error: ", e);
+                        LOG.error("JobClient error: ", e);
                     }
                     else {
                         throw convertException(e);

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java?rev=1545809&r1=1545808&r2=1545809&view=diff
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
 (original)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
 Tue Nov 26 20:14:33 2013
@@ -90,7 +90,7 @@ public class CoordActionInputCheckXComma
 
     @Override
     protected Void execute() throws CommandException {
-        LOG.info("[" + actionId + "]::ActionInputCheck:: Action is in WAITING 
state.");
+        LOG.debug("[" + actionId + "]::ActionInputCheck:: Action is in WAITING 
state.");
 
         // this action should only get processed if current time > nominal 
time;
         // otherwise, requeue this action for delay execution;
@@ -421,7 +421,6 @@ public class CoordActionInputCheckXComma
      */
     private boolean checkResolvedUris(Element eAction, StringBuilder 
existList, StringBuilder nonExistList,
             Configuration conf) throws IOException {
-        LOG.info("[" + actionId + "]::ActionInputCheck:: In 
checkResolvedUris...");
         Element inputList = eAction.getChild("input-events", 
eAction.getNamespace());
         if (inputList != null) {
             if (nonExistList.length() > 0) {

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java?rev=1545809&r1=1545808&r2=1545809&view=diff
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
 (original)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
 Tue Nov 26 20:14:33 2013
@@ -557,7 +557,7 @@ public class CallableQueueService implem
 
     private synchronized boolean queue(CallableWrapper wrapper, boolean 
ignoreQueueSize) {
         if (!ignoreQueueSize && queue.size() >= queueSize) {
-            log.warn("queue if full, ignoring queuing for [{0}]", 
wrapper.getElement());
+            log.warn("queue full, ignoring queuing for [{0}]", 
wrapper.getElement().getKey());
             return false;
         }
         if (!executor.isShutdown()) {
@@ -573,7 +573,7 @@ public class CallableQueueService implem
             }
         }
         else {
-            log.warn("Executor shutting down, ignoring queueing of [{0}]", 
wrapper.getElement());
+            log.warn("Executor shutting down, ignoring queueing of [{0}]", 
wrapper.getElement().getKey());
         }
         return true;
     }

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java?rev=1545809&r1=1545808&r2=1545809&view=diff
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java
 (original)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java
 Tue Nov 26 20:14:33 2013
@@ -20,6 +20,7 @@ package org.apache.oozie.service;
 import org.apache.oozie.util.Instrumentable;
 import org.apache.oozie.util.Instrumentation;
 import org.apache.oozie.util.XLogStreamer;
+
 import java.io.IOException;
 import java.io.Writer;
 import java.util.Map;
@@ -29,7 +30,9 @@ import java.util.Date;
  * Service that performs streaming of log files over Web Services if enabled 
in XLogService
  */
 public class XLogStreamingService implements Service, Instrumentable {
-
+    private static final String CONF_PREFIX = Service.CONF_PREFIX + 
"XLogStreamingService.";
+    public static final String STREAM_BUFFER_LEN = CONF_PREFIX + "buffer.len";
+    protected int bufferLen;
 
     /**
      * Initialize the log streaming service.
@@ -38,6 +41,7 @@ public class XLogStreamingService implem
      * @throws ServiceException thrown if the log streaming service could not 
be initialized.
      */
     public void init(Services services) throws ServiceException {
+        bufferLen = services.getConf().getInt(STREAM_BUFFER_LEN, 4096);
     }
 
     /**
@@ -79,11 +83,15 @@ public class XLogStreamingService implem
         XLogService xLogService = Services.get().get(XLogService.class);
         if (xLogService.getLogOverWS()) {
             new XLogStreamer(filter, xLogService.getOozieLogPath(), 
xLogService.getOozieLogName(),
-                    xLogService.getOozieLogRotation()).streamLog(writer, 
startTime, endTime);
+                    xLogService.getOozieLogRotation()).streamLog(writer, 
startTime, endTime, bufferLen);
         }
         else {
             writer.write("Log streaming disabled!!");
         }
 
     }
+
+    public int getBufferLen() {
+        return bufferLen;
+    }
 }

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java?rev=1545809&r1=1545808&r2=1545809&view=diff
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
 (original)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
 Tue Nov 26 20:14:33 2013
@@ -124,7 +124,7 @@ public class ZKXLogStreamingService exte
             if (params.get(ALL_SERVERS_PARAM) != null && 
params.get(ALL_SERVERS_PARAM).length > 0
                     && params.get(ALL_SERVERS_PARAM)[0].equals("false")) {
                 new XLogStreamer(filter, xLogService.getOozieLogPath(), 
xLogService.getOozieLogName(),
-                        xLogService.getOozieLogRotation()).streamLog(writer, 
startTime, endTime);
+                        xLogService.getOozieLogRotation()).streamLog(writer, 
startTime, endTime, bufferLen);
             }
             // Otherwise, we have to go collate relevant logs from the other 
Oozie servers
             else {
@@ -201,7 +201,7 @@ public class ZKXLogStreamingService exte
             // If it's just the one server (this server), then we don't need 
to do any more processing and can just copy it directly
             if (parsers.size() == 1) {
                 TimestampedMessageParser parser = parsers.get(0);
-                parser.processRemaining(writer);
+                parser.processRemaining(writer, bufferLen);
             }
             else {
                 // Now that we have a Reader for each server to get the logs 
from that server, we have to collate them.  Within each
@@ -215,11 +215,17 @@ public class ZKXLogStreamingService exte
                         timestampMap.put(parser.getLastTimestamp(), parser);
                     }
                 }
+                int bytesWritten = 0;
                 while (timestampMap.size() > 1) {
                     // The first entry will be the earliest based on the 
timestamp (also removes it) from the map
                     TimestampedMessageParser earliestParser = 
timestampMap.pollFirstEntry().getValue();
                     // Write the message from that parser at that timestamp
                     writer.write(earliestParser.getLastMessage());
+                    bytesWritten += earliestParser.getLastMessage().length(); // running total since the last flush
+                    if (bytesWritten > bufferLen) { // flush once the accumulated output exceeds the configured buffer length
+                        writer.flush();
+                        bytesWritten = 0;
+                    }
                     // Increment that parser to read the next message
                     if (earliestParser.increment()) {
                         // If it still has messages left, put it back in the 
map with the new last timestamp for it
@@ -230,7 +236,7 @@ public class ZKXLogStreamingService exte
                 if (timestampMap.size() == 1) {
                     TimestampedMessageParser parser = 
timestampMap.values().iterator().next();
                     writer.write(parser.getLastMessage());  // don't forget 
the last message read by the parser
-                    parser.processRemaining(writer);
+                    parser.processRemaining(writer, bufferLen, bytesWritten + 
parser.getLastMessage().length());
                 }
             }
         }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/util/LogUtils.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/util/LogUtils.java?rev=1545809&r1=1545808&r2=1545809&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/util/LogUtils.java 
(original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/util/LogUtils.java Tue Nov 
26 20:14:33 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,6 +22,7 @@ import org.apache.oozie.CoordinatorActio
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.service.DagXLogInfoService;
 import org.apache.oozie.service.XLogService;
 
@@ -89,6 +90,13 @@ public class LogUtils {
         XLog.Info.get().setParameters(logInfo);
     }
 
+    public static void setLogInfo(WorkflowAction action, XLog.Info logInfo) {
+        String actionId = action.getId();
+        logInfo.setParameter(DagXLogInfoService.JOB, actionId.substring(0, 
actionId.indexOf("@")));
+        logInfo.setParameter(DagXLogInfoService.ACTION, actionId);
+        XLog.Info.get().setParameters(logInfo);
+    }
+
     /**
      * Set the log info with the context of the given bundle bean.
      *

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java?rev=1545809&r1=1545808&r2=1545809&view=diff
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java
 (original)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java
 Tue Nov 26 20:14:33 2013
@@ -22,6 +22,9 @@ import java.io.IOException;
 import java.io.Writer;
 import java.util.ArrayList;
 
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.XLogStreamingService;
+
   /**
  * Encapsulates the parsing and filtering of the log messages from a 
BufferedReader. It makes sure not to read in the entire log
  * into memory at the same time; at most, it will have two messages (which can 
be multi-line in the case of exception stack traces).
@@ -29,7 +32,7 @@ import java.util.ArrayList;
  * To use this class: Calling {@link TimestampedMessageParser#increment()} 
will tell the parser to read the next message from the
  * Reader. It will return true if there are more messages and false if not. 
Calling
  * {@link TimestampedMessageParser#getLastMessage()} and {@link 
TimestampedMessageParser#getLastTimestamp()} will return the last
- * message and timestamp, respectively, that were parsed when {@link 
TimestampedMessageParser#increment()} was called.  Calling
+ * message and timestamp, respectively, that were parsed when {@link 
TimestampedMessageParser#increment()} was called. Calling
  * {@link TimestampedMessageParser#processRemaining(java.io.Writer)} will 
write the remaining log messages to the given Writer.
  */
 public class TimestampedMessageParser {
@@ -95,7 +98,6 @@ public class TimestampedMessageParser {
         return true;
     }
 
-
     /**
      * Returns the timestamp from the last message that was parsed.
      *
@@ -160,14 +162,47 @@ public class TimestampedMessageParser {
     }
 
     /**
-     * Writes the remaining log messages to the passed in Writer.
+     * Streams the remaining log messages to the passed in Writer, flushing the
+     * Writer each time more than bufferLen characters have been written to it
      *
      * @param writer
+     * @param bufferLen maximum len of log buffer
+     * @param bytesWritten num bytes already written to writer
      * @throws IOException
      */
-    public void processRemaining(Writer writer) throws IOException {
-        while(increment()) {
-            writer.write(getLastMessage());
+    public void processRemaining(Writer writer, int bufferLen, int 
bytesWritten) throws IOException {
+        while (increment()) {
+            writer.write(lastMessage);
+            bytesWritten += lastMessage.length();
+            if (bytesWritten > bufferLen) {
+                writer.flush();
+                bytesWritten = 0;
+            }
         }
+        writer.flush();
+    }
+
+    /**
+     * Streams log messages to the passed in Writer, with zero bytes already
+     * written
+     *
+     * @param writer
+     * @param bufferLen maximum len of log buffer
+     * @throws IOException
+     */
+    public void processRemaining(Writer writer, int bufferLen) throws 
IOException {
+        processRemaining(writer, bufferLen, 0);
+    }
+
+    /**
+     * Streams log messages to the passed in Writer, using the buffer len configured
+     * in XLogStreamingService (4K by default) and zero bytes already written
+     *
+     * @param writer
+     * @throws IOException
+     */
+    public void processRemaining(Writer writer) throws IOException {
+        processRemaining(writer, 
Services.get().get(XLogStreamingService.class).getBufferLen());
     }
+
 }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/util/XLogStreamer.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/util/XLogStreamer.java?rev=1545809&r1=1545808&r2=1545809&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/util/XLogStreamer.java 
(original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/util/XLogStreamer.java Tue 
Nov 26 20:14:33 2013
@@ -31,6 +31,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import java.io.BufferedReader;
 
 /**
@@ -204,17 +205,16 @@ public class XLogStreamer {
      * @param endTime
      * @throws IOException
      */
-    public void streamLog(Writer writer, Date startTime, Date endTime) throws 
IOException {
+    public void streamLog(Writer writer, Date startTime, Date endTime, int 
bufferLen) throws IOException {
         // Get a Reader for the log file(s)
         BufferedReader reader = makeReader(startTime, endTime);
         try {
             // Process the entire logs from the reader using the logFilter
-            new TimestampedMessageParser(reader, 
logFilter).processRemaining(writer);
+            new TimestampedMessageParser(reader, 
logFilter).processRemaining(writer, bufferLen);
         }
         finally {
             reader.close();
         }
-        writer.flush();
     }
 
     /**

Modified: oozie/trunk/core/src/main/resources/oozie-default.xml
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/resources/oozie-default.xml?rev=1545809&r1=1545808&r2=1545809&view=diff
==============================================================================
--- oozie/trunk/core/src/main/resources/oozie-default.xml (original)
+++ oozie/trunk/core/src/main/resources/oozie-default.xml Tue Nov 26 20:14:33 
2013
@@ -146,6 +146,12 @@
         </description>
     </property>
 
+    <property>
+        <name>oozie.service.XLogStreamingService.buffer.len</name>
+        <value>4096</value>
+        <description>4K buffer for streaming the logs 
progressively</description>
+    </property>
+
  <!-- HCatAccessorService -->
    <property>
         <name>oozie.service.HCatAccessorService.jmsconnections</name>

Modified: 
oozie/trunk/core/src/test/java/org/apache/oozie/util/TestLogStreamer.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/util/TestLogStreamer.java?rev=1545809&r1=1545808&r2=1545809&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/util/TestLogStreamer.java 
(original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/util/TestLogStreamer.java 
Tue Nov 26 20:14:33 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -139,7 +139,7 @@ public class TestLogStreamer extends XTe
         // respectively
         StringWriter sw = new StringWriter();
         XLogStreamer str = new XLogStreamer(xf, getTestCaseDir(), "oozie.log", 
1);
-        str.streamLog(sw, new Date(currTime - 10 * 3600000), new Date(currTime 
- 5 * 3600000));
+        str.streamLog(sw, new Date(currTime - 10 * 3600000), new Date(currTime 
- 5 * 3600000), 4096);
         String[] out = sw.toString().split("\n");
         // Check if the retrieved log content is of length seven lines after 
filtering based on time window, file name
         // pattern and parameters like JobId, Username etc. and/or based on 
log level like INFO, DEBUG, etc.
@@ -157,7 +157,7 @@ public class TestLogStreamer extends XTe
         // and corresponding log content is retrieved properly
         StringWriter sw1 = new StringWriter();
         XLogStreamer str1 = new XLogStreamer(xf, getTestCaseDir(), 
"oozie.log", 1);
-        str1.streamLog(sw1, null, null);
+        str1.streamLog(sw1, null, null, 4096);
         out = sw1.toString().split("\n");
         // Check if the retrieved log content is of length eight lines after 
filtering based on time window, file name
         // pattern and parameters like JobId, Username etc. and/or based on 
log level like INFO, DEBUG, etc.
@@ -224,7 +224,7 @@ public class TestLogStreamer extends XTe
         Calendar calendarEntry = Calendar.getInstance();
         // Setting start-time to 2012-04-24-19 for log stream (month-1 passed 
as parameter since 0=January), and end time is current time
         calendarEntry.set(2012, 3, 24, 19, 0);
-        str2.streamLog(sw2, calendarEntry.getTime(), new 
Date(System.currentTimeMillis()));
+        str2.streamLog(sw2, calendarEntry.getTime(), new 
Date(System.currentTimeMillis()), 4096);
         String[] out = sw2.toString().split("\n");
 
         // Check if the retrieved log content is of length five lines after 
filtering based on time window, file name
@@ -273,7 +273,7 @@ public class TestLogStreamer extends XTe
 
         StringWriter sw = new StringWriter();
         XLogStreamer str = new XLogStreamer(xf, getTestCaseDir(), "oozie.log", 
1);
-        str.streamLog(sw, new Date(currTime - 5000), new Date(currTime + 
5000));
+        str.streamLog(sw, new Date(currTime - 5000), new Date(currTime + 
5000), 4096);
         String[] out = sw.toString().split("\n");
         // Check if the retrieved log content is of length five lines after 
filtering; we expect the first five lines because the
         // filtering shouldn't care whether or not there is a dash while the 
last five lines don't pass the normal filtering

Modified: oozie/trunk/release-log.txt
URL: 
http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1545809&r1=1545808&r2=1545809&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Tue Nov 26 20:14:33 2013
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1474 Fix logging issues - latency, accurate job ids, coord Job UI to 
show job logs (mona)
 OOZIE-1623 JPAService doesn't need to do reads in a transaction (rkanter)
 OOZIE-1612 When printing Dates to log messages, we should make sure they are 
in oozie.processing.timezone (gwenshap via rkanter)
 OOZIE-1519 Admin command to update the sharelib (puru via ryota)

Modified: oozie/trunk/webapp/src/main/webapp/oozie-console.js
URL: 
http://svn.apache.org/viewvc/oozie/trunk/webapp/src/main/webapp/oozie-console.js?rev=1545809&r1=1545808&r2=1545809&view=diff
==============================================================================
--- oozie/trunk/webapp/src/main/webapp/oozie-console.js (original)
+++ oozie/trunk/webapp/src/main/webapp/oozie-console.js Tue Nov 26 20:14:33 2013
@@ -423,7 +423,6 @@ function jobDetailsPopup(response, reque
 
     });
     function showActionContextMenu(thisGrid, rowIndex, cellIndex, e) {
-        var jobContextMenu = new Ext.menu.Menu('taskContext');
         var actionStatus = thisGrid.store.data.items[rowIndex].data;
         actionDetailsGridWindow(actionStatus);
         function actionDetailsGridWindow(actionStatus) {
@@ -739,9 +738,9 @@ function coordJobDetailsPopup(response, 
         emptyText: "Loading..."
     });
     var getLogButton = new Ext.Button({
-           text: 'Retrieve log',
+           text: 'Retrieve coord action logs',
            handler: function() {
-                    fetchLogs(coordJobId, actionsTextBox.getValue());
+            fetchLogs(coordJobId, actionsTextBox.getValue());
            }
     });
     var actionsTextBox = new Ext.form.TextField({
@@ -789,7 +788,7 @@ function coordJobDetailsPopup(response, 
        var responseLength = response.length;
        var twentyFiveMB = 25*1024*1024;
        if(responseLength > twentyFiveMB) {
-           response = 
response.substring(responseLength-twentyFiveMB,responseLength)
+           response = 
response.substring(responseLength-twentyFiveMB,responseLength);
            response = 
response.substring(response.indexOf("\n")+1,responseLength);
            jobLogArea.setRawValue(response);
        }
@@ -974,14 +973,12 @@ function coordJobDetailsPopup(response, 
 
     });
     function showWorkflowPopup(thisGrid, rowIndex, cellIndex, e) {
-        var jobContextMenu = new Ext.menu.Menu('taskContext');
         var actionStatus = thisGrid.store.data.items[rowIndex].data;
         var workflowId = actionStatus["externalId"];
         jobDetailsGridWindow(workflowId);
     }
     // alert("Coordinator PopUP 4 inside coordDetailsPopup ");
     function showCoordActionContextMenu(thisGrid, rowIndex, cellIndex, e) {
-        var jobContextMenu = new Ext.menu.Menu('taskContext');
         var actionStatus = thisGrid.store.data.items[rowIndex].data;
         actionDetailsGridWindow(actionStatus);
         function actionDetailsGridWindow(actionStatus) {
@@ -1130,8 +1127,17 @@ function coordJobDetailsPopup(response, 
           items: [jobLogArea, actionsTextBox, getLogButton],
            tbar: [ {
                 text: "&nbsp;&nbsp;&nbsp;",
-                icon: 'ext-2.2/resources/images/default/grid/refresh.gif'
-                 }]
+                icon: 'ext-2.2/resources/images/default/grid/refresh.gif',
+                handler: function() {
+                    var actionsText = actionsTextBox.getValue();
+                    if (actionsText == 'Enter the action list here' || 
actionsText == '') {
+                        fetchLogs(coordJobId, '');
+                    }
+                    else {
+                        fetchLogs(coordJobId, actionsText);
+                    }
+                }
+           }]
           }]
 });
 
@@ -1141,7 +1147,9 @@ function coordJobDetailsPopup(response, 
             return;
         }
         if (selectedTab.title == 'Coord Job Log') {
-           actionsTextBox.setValue('Enter the action list here');
+            fetchLogs(coordJobId, '');
+            //actionsTextBox.position
+               actionsTextBox.setValue('Enter the action list here');
         }
         else if (selectedTab.title == 'Coord Job Definition') {
             fetchDefinition(coordJobId);
@@ -1827,7 +1835,7 @@ var getSupportedVersions = new Ext.Actio
                 Ext.Msg.alert('Oozie Console Alert!', 'Server doesn\'t support 
client version: v' + getOozieClientVersion());
             }
 
-        })
+        });
     }
 
 });
@@ -1860,7 +1868,7 @@ var serverVersion = new Ext.Action({
                 var ret = eval("(" + response.responseText + ")");
                 serverVersion.setText("<font size='2'>Server version [" + 
ret['buildVersion'] + "]</font>");
             }
-        })
+        });
     }
 });
 
@@ -1954,20 +1962,17 @@ var timeZones_store = new Ext.data.JsonS
 timeZones_store.proxy.conn.method = "GET";
 
 function showCoordJobContextMenu(thisGrid, rowIndex, cellIndex, e) {
-    var jobContextMenu = new Ext.menu.Menu('taskContext');
     var coordJobId = thisGrid.store.data.items[rowIndex].data.coordJobId;
     coordJobDetailsGridWindow(coordJobId);
 }
 
 function initConsole() {
     function showJobContextMenu(thisGrid, rowIndex, cellIndex, e) {
-        var jobContextMenu = new Ext.menu.Menu('taskContext');
         var workflowId = thisGrid.store.data.items[rowIndex].data.id;
         jobDetailsGridWindow(workflowId);
     }
 
     function showBundleJobContextMenu(thisGrid, rowIndex, cellIndex, e) {
-        var jobContextMenu = new Ext.menu.Menu('taskContext');
         var bundleJobId = thisGrid.store.data.items[rowIndex].data.bundleJobId;
         bundleJobDetailsGridWindow(bundleJobId);
     }


Reply via email to