[ 
https://issues.apache.org/jira/browse/FLINK-17969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17125396#comment-17125396
 ] 

Bhagavan commented on FLINK-17969:
----------------------------------

PR: https://github.com/apache/flink/pull/12472

> Enhance Flink (Task) logging to include job name as context diagnostic 
> information
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-17969
>                 URL: https://issues.apache.org/jira/browse/FLINK-17969
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Task
>    Affects Versions: 1.10.0
>            Reporter: Bhagavan
>            Priority: Trivial
>              Labels: pull-request-available
>
> Problem statement:
> We use a shared session cluster (Standalone/Yarn) to execute jobs. All logs 
> from the cluster are shipped using log aggregation framework 
> (Logstash/Splunk) so that application diagnostic is easier.
> However, we are missing one vital information in the logline. i.e. Job name 
> so that we can filter the logs for a single job.
> Background
> Currently, Flink logging uses SLF4J as API to abstract away from concrete 
> logging implementation (log4j 1.x, Logback or log4j2) and configuration of 
> logging pattern and implementation can be configured at deployment, However, 
> there is no MDC info from framework indicating job context.
> Proposed improvement.
> Add jobName field to Task class so that we can add it as MDC when task thread 
> starts executing.
> Change is trivial and uses SLF4J MDC API.
> With this change, user can customise logging pattern to include MDC (e.g. in 
> Logback %X{jobName} )
> Change required.
> {code:java}
> @@ -319,6 +323,7 @@ public class Task implements Runnable, TaskSlotPayload, 
> TaskActions, PartitionPr
>  
>                 this.jobId = jobInformation.getJobId();
> +               this.jobName = jobInformation.getJobName();
>                 this.vertexId = taskInformation.getJobVertexId();
> @@ -530,8 +535,10 @@ public class Task implements Runnable, TaskSlotPayload, 
> TaskActions, PartitionPr
>         @Override
>         public void run() {
>                 try {
> +                       MDC.put("jobName", this.jobName);
>                         doRun();
>                 } finally {
> +                       MDC.remove("jobName");
>                         terminationFuture.complete(executionState);
>                 }
>         }
> {code}
> if we are in agreement for this small change. Will raise PR.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to