[ https://issues.apache.org/jira/browse/SPARK-38792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17518998#comment-17518998 ]

Danny Guinther commented on SPARK-38792:
----------------------------------------

I added yet another kind of dummy job that aims to measure:
 # Time between the driver preparing a DataFrame to read and adding a transform to that DataFrame
 # Time between the driver adding a literal column to the DataFrame and the executor seeing that literal column
 # Time between the executor seeing the literal column and adding another column via UDF
 # Time between the executor adding another column via UDF and control being returned to the driver
 # Time between the driver calling {{first()}} on the DataFrame and control returning to the driver

 

The code looks something like this:
{code:scala}
import java.time.Instant

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.{lit, udf}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val RecordSchema = StructType(Seq(
  StructField("driverReadEpochMillis", LongType, nullable = false)
))

// Single-row, single-partition DataFrame stamped with the driver's current time.
def mkSourceDataFrame(sparkSession: SparkSession): DataFrame = {
  sparkSession.createDataFrame(
    sparkSession.sparkContext.makeRDD(Seq[Row](Row(Instant.now.toEpochMilli)), 1),
    RecordSchema
  )
}

val df = mkSourceDataFrame(sparkSession)
  .withColumn("driverTransformEpochMillis", lit(Instant.now.toEpochMilli))
  .withColumn("executorTransformEpochMillis", (udf { () => Instant.now.toEpochMilli })())
  .withColumn("executorTransformAgainEpochMillis", (udf { () => Instant.now.toEpochMilli })())

val count = df.count // force execution before timing first()
val beforeFirstEpochMillis = Instant.now.toEpochMilli
val row = df.first
val afterFirstEpochMillis = Instant.now.toEpochMilli

val driverReadEpochMillis = row.getAs[Long]("driverReadEpochMillis")
val driverTransformEpochMillis = row.getAs[Long]("driverTransformEpochMillis")
val executorTransformEpochMillis = row.getAs[Long]("executorTransformEpochMillis")
val executorTransformAgainEpochMillis = row.getAs[Long]("executorTransformAgainEpochMillis")

val statsEvent = Map[String, Any](
  "event" -> "executor-timing-debug",
  "driverReadEpochMillis" -> driverReadEpochMillis,
  "driverTransformEpochMillis" -> driverTransformEpochMillis,
  "driverReadToDriverTransformDeltaMillis" -> (driverTransformEpochMillis - driverReadEpochMillis), // #1
  "executorTransformEpochMillis" -> executorTransformEpochMillis,
  "driverTransformToExecutorTransformDeltaMillis" -> (executorTransformEpochMillis - driverTransformEpochMillis), // #2
  "executorTransformAgainEpochMillis" -> executorTransformAgainEpochMillis,
  "executorTransformToExecutorTransformAgainDeltaMillis" -> (executorTransformAgainEpochMillis - executorTransformEpochMillis), // #3
  "driverBeforeFirstEpochMillis" -> beforeFirstEpochMillis,
  "executorTransformAgainToDriverBeforeFirstDeltaMillis" -> (beforeFirstEpochMillis - executorTransformAgainEpochMillis), // #4
  "driverAfterFirstEpochMillis" -> afterFirstEpochMillis,
  "driverBeforeFirstEpochMillisToDriverAfterFirstDeltaMillis" -> (afterFirstEpochMillis - beforeFirstEpochMillis) // #5
)
{code}
 

I think the results of running this job on Spark 3.0.1 vs. Spark 3.2.1 help narrow down the source of the problem:
 # Time between the driver preparing a DataFrame to read and adding a transform to that DataFrame
 ## NO CHANGE
 # Time between the driver adding a literal column to the DataFrame and the executor seeing that literal column
 ## DEFINITE REGRESSION: See attached screenshot named executor-timing-debug-number-2.jpg
 # Time between the executor seeing the literal column and adding another column via UDF
 ## NO CHANGE
 # Time between the executor adding another column via UDF and control being returned to the driver
 ## DEFINITE REGRESSION: See attached screenshot named executor-timing-debug-number-4.jpg
 ## Note: the metric names in this screenshot are slightly inconsistent with the example code above, but I assure you this is the right data. I added executorTransformAgainEpochMillis after the fact and had to rename this metric at that point; the screenshot predates the rename.
 # Time between the driver calling {{first()}} on the DataFrame and control returning to the driver
 ## DEFINITE REGRESSION: See attached screenshot named executor-timing-debug-number-5.jpg

 

The commonality between all the metrics that suffered regressions is that they are all measured at points where control transfers from the driver to the executor and/or from the executor back to the driver.
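The delta arithmetic behind those metrics is just pairwise differences over an ordered list of timestamps; the sketch below (Spark-free, with an object name, helper, and sample millisecond values that are mine and purely illustrative) shows which intervals stay on one side of the driver/executor boundary and which cross it:

```scala
// Pure-Scala sketch of the delta computation used by the dummy job above.
object DeltaSketch {
  // Given ordered (label, epochMillis) samples, compute consecutive deltas.
  def deltas(samples: Seq[(String, Long)]): Seq[(String, Long)] =
    samples.sliding(2).map { case Seq((a, ta), (b, tb)) =>
      (s"$a->$b", tb - ta)
    }.toSeq

  def main(args: Array[String]): Unit = {
    // Hypothetical example values, not real measurements.
    val samples = Seq(
      "driverRead"             -> 1000L, // driver side
      "driverTransform"        -> 1005L, // driver side: delta #1 stays on the driver
      "executorTransform"      -> 1250L, // delta #2 crosses driver -> executor
      "executorTransformAgain" -> 1252L, // delta #3 stays on the executor
      "driverBeforeFirst"      -> 1500L  // delta #4 crosses executor -> driver
    )
    deltas(samples).foreach { case (name, d) => println(s"$name: ${d}ms") }
  }
}
```

Only the boundary-crossing deltas (#2, #4, and the {{first()}} round trip in #5) regressed, which is what points at the driver/executor hand-off rather than the work done on either side.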

I wonder whether I can replicate this on versions of Spark between 3.0.1 and 3.2.1 despite the bottleneck caused by https://issues.apache.org/jira/browse/SPARK-37391; I'll give it a try.

 

Is there anybody out there?

 

> Regression in time executor takes to do work sometime after v3.0.1 ?
> --------------------------------------------------------------------
>
>                 Key: SPARK-38792
>                 URL: https://issues.apache.org/jira/browse/SPARK-38792
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.2.1
>            Reporter: Danny Guinther
>            Priority: Major
>         Attachments: dummy-job-job.jpg, dummy-job-query.png, 
> executor-timing-debug-number-2.jpg, executor-timing-debug-number-4.jpg, 
> executor-timing-debug-number-5.jpg, min-time-way-up.jpg, 
> what-s-up-with-exec-actions.jpg
>
>
> Hello!
> I'm sorry to trouble you with this, but I'm seeing a noticeable regression in 
> performance when upgrading from 3.0.1 to 3.2.1 and I can't pin down why. I 
> don't believe it is specific to my application since the upgrade from 3.0.1 to 
> 3.2.1 is purely a configuration change. I'd guess it presents itself in my 
> application due to the high volume of work my application does, but I could 
> be mistaken.
> The gist is that it seems like the executor actions I'm running suddenly 
> appear to take a lot longer on Spark 3.2.1. I don't have any ability to test 
> versions between 3.0.1 and 3.2.1 because my application was previously 
> blocked from upgrading beyond Spark 3.0.1 by 
> https://issues.apache.org/jira/browse/SPARK-37391 (which I helped to fix).
> Any ideas what might cause this or metrics I might try to gather to pinpoint 
> the problem? I've tried a bunch of the suggestions from 
> [https://spark.apache.org/docs/latest/tuning.html] to see if any of those 
> help, but none of the adjustments I've tried have been fruitful. I also tried 
> to look in [https://spark.apache.org/docs/latest/sql-migration-guide.html] 
> for ideas as to what might have changed to cause this behavior, but haven't 
> seen anything that sticks out as being a possible source of the problem.
> I have attached a graph that shows the drastic change in time taken by 
> executor actions. In the image the blue and purple lines are different kinds 
> of reads using the built-in JDBC data reader and the green line is writes 
> using a custom-built data writer. The deploy to switch from 3.0.1 to 3.2.1 
> occurred at 9AM on the graph. The graph data comes from timing blocks that 
> surround only the calls to dataframe actions, so there shouldn't be anything 
> specific to my application that is suddenly inflating these numbers. The 
> specific actions I'm invoking are: count() (but there's some transforming and 
> caching going on, so it's really more than that); first(); and write().
> The driver process does seem to be seeing more GC churn than with Spark 
> 3.0.1, but I don't think that explains this behavior. The executors don't 
> seem to have any problem with memory or GC and are not overutilized (our 
> pipeline is very read and write heavy, less heavy on transformations, so 
> executors tend to be idle while waiting for various network I/O).
>  
> Thanks in advance for any help!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
