This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0775ea732a38 [SPARK-48611][CORE] Log TID for input split in HadoopRDD and NewHadoopRDD
0775ea732a38 is described below

commit 0775ea732a38c668b8727f56929dad5745962f61
Author: Cheng Pan <[email protected]>
AuthorDate: Fri Jun 14 20:21:09 2024 -0700

    [SPARK-48611][CORE] Log TID for input split in HadoopRDD and NewHadoopRDD
    
    ### What changes were proposed in this pull request?
    
    Log `TID` for "input split" in `HadoopRDD` and `NewHadoopRDD`
    
    ### Why are the changes needed?
    
    This change should benefit both structured logging enabled/disabled cases.
    
    When structured logging is disabled and executor cores > 1, the logs of concurrent tasks are interleaved in stdout, for example:
    
    ```
    24/06/12 21:40:10 INFO Executor: Running task 26.0 in stage 2.0 (TID 10)
    24/06/12 21:40:10 INFO Executor: Running task 27.0 in stage 2.0 (TID 11)
    24/06/12 21:40:11 INFO HadoopRDD: Input split: hdfs://.../part-00025-53bc40ae-399f-4291-b5ac-617c980deb86-c000:0+124138257
    24/06/12 21:40:11 INFO HadoopRDD: Input split: hdfs://.../part-00045-53bc40ae-399f-4291-b5ac-617c980deb86-c000:0+121726684
    ```
    It is hard to tell which task reads which file because the tasks run in parallel.
    
    If something goes wrong, the log prints the `TID` and the exception stack trace. The error may be related to the input data. Sometimes the exception message is clear enough to show which file the input data comes from. When it is not, the current log does not let us identify the bad file quickly.
    
    ```
    24/06/12 21:40:18 ERROR Executor: Exception in task 27.0 in stage 2.0 (TID 11)
    (... exception message)
    (... stacktraces)
    ```
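    
    As an illustrative sketch (not part of this patch), once the input split line carries the `TID`, a failed task can be matched to its file by joining the two kinds of log lines on that ID. The object name `TidSplitIndex`, its helper methods, and the sample log lines are hypothetical. The plain-text message shape `Task (TID <id>) input split: <split>` is inferred from the log call in the diff and assumes structured logging is disabled.
    
    ```scala
    import scala.util.matching.Regex

    // Hypothetical helper, not part of Spark: joins log lines on the TID.
    object TidSplitIndex {
      // Assumed plain-text shape of the new message: "Task (TID <id>) input split: <split>"
      private val SplitLine: Regex =
        """Task \(TID (\d+)\) input split: (\S+)""".r.unanchored

      // Shape of the existing executor error line quoted in the commit message.
      private val ErrorLine: Regex =
        """ERROR Executor: Exception in task \S+ in stage \S+ \(TID (\d+)\)""".r.unanchored

      /** Builds a TID -> input split map from plain-text executor log lines. */
      def indexSplits(lines: Seq[String]): Map[Long, String] =
        lines.collect { case SplitLine(tid, split) => tid.toLong -> split }.toMap

      /** Returns (TID, input split) for every task that logged an exception. */
      def failedSplits(lines: Seq[String]): Seq[(Long, String)] = {
        val splits = indexSplits(lines)
        lines
          .collect { case ErrorLine(tid) => tid.toLong }
          .flatMap(tid => splits.get(tid).map(split => (tid, split)))
      }

      def main(args: Array[String]): Unit = {
        // Made-up sample lines; the paths are placeholders, not real data.
        val sample = Seq(
          "24/06/12 21:40:11 INFO HadoopRDD: Task (TID 10) input split: hdfs://.../part-a:0+100",
          "24/06/12 21:40:11 INFO HadoopRDD: Task (TID 11) input split: hdfs://.../part-b:0+200",
          "24/06/12 21:40:18 ERROR Executor: Exception in task 27.0 in stage 2.0 (TID 11)"
        )
        failedSplits(sample).foreach { case (tid, split) =>
          println(s"TID $tid read $split")
        }
      }
    }
    ```
    
    With the old message, which has no `TID`, no such join is possible and the split lines can only be matched to tasks by guessing from timestamps.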
    
    When structured logging is enabled, exposing `TID` as a `LogKey` makes it possible to filter log entries by task.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it supplies additional information in logs.
    
    ### How was this patch tested?
    
    Review, as it only touches log contents.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #46966 from pan3793/SPARK-48611.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
---
 core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala    | 3 ++-
 core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 3 ++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index cbfce378879e..545eafe7a444 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -270,7 +270,8 @@ class HadoopRDD[K, V](
     val iter = new NextIterator[(K, V)] {
 
       private val split = theSplit.asInstanceOf[HadoopPartition]
-      logInfo(log"Input split: ${MDC(INPUT_SPLIT, split.inputSplit)}")
+      logInfo(log"Task (TID ${MDC(TASK_ID, context.taskAttemptId())}) input split: " +
+        log"${MDC(INPUT_SPLIT, split.inputSplit)}")
       private val jobConf = getJobConf()
 
       private val inputMetrics = context.taskMetrics().inputMetrics
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 3a1ce4bd1dfd..bf539320b598 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -197,7 +197,8 @@ class NewHadoopRDD[K, V](
   override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
     val iter = new Iterator[(K, V)] {
       private val split = theSplit.asInstanceOf[NewHadoopPartition]
-      logInfo(log"Input split: ${MDC(INPUT_SPLIT, split.serializableHadoopSplit)}")
+      logInfo(log"Task (TID ${MDC(TASK_ID, context.taskAttemptId())}) input split: " +
+        log"${MDC(INPUT_SPLIT, split.serializableHadoopSplit)}")
       private val conf = getConf
 
       private val inputMetrics = context.taskMetrics().inputMetrics


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
