[ https://issues.apache.org/jira/browse/SPARK-26691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vikash Kumar updated SPARK-26691:
---------------------------------
    Summary: WholeStageCodegen after InMemoryTableScan task takes significant 
time and time increases based on the input size  (was: WholeStageCodegen after 
InMemoryTableScan task takes more time and time increases based on the input 
size)

> WholeStageCodegen after InMemoryTableScan task takes significant time and 
> time increases based on the input size
> ----------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-26691
>                 URL: https://issues.apache.org/jira/browse/SPARK-26691
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.1
>            Reporter: Vikash Kumar
>            Priority: Major
>         Attachments: DataScale_LkpPolicy_FirstRow_SF1_JobDetail.png, 
> DataScale_LkpPolicy_FirstRow_SF50_JobDetail.png, WholeStageCodegen.PNG
>
>
> Scenario: I am doing a left outer join between a streaming dataframe and a 
> static dataframe and writing the result to a Kafka target. The static 
> dataframe is created from a Hive source and the streaming dataframe from a 
> Kafka source, and the two are joined on an equality condition. Here is a 
> sample program.
>  
> {code:scala}
> package com.spark.exec
>
> import java.util.UUID.randomUUID
>
> import org.apache.spark.sql._
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.streaming.Trigger
> import org.apache.spark.sql.types._
> import org.apache.spark.storage.StorageLevel
>
> object Spark0 {
>   def main(s: Array[String]): Unit = {
>     val sqlContext = SparkSession.builder().enableHiveSupport().getOrCreate()
>     import sqlContext.implicits._
>
>     // Streaming dataframe from a Kafka source.
>     val v1 = sqlContext.readStream.format("kafka")
>       .option("kafka.bootstrap.servers", "localhost:49092")
>       .option("subscribe", "source")
>       .load().toDF()
>
>     val schema = StructType(List(
>       StructField("id", IntegerType, true),
>       StructField("name", StringType, true)))
>
>     // Parse the Kafka value as JSON into (id, name) columns.
>     val stream = v1.selectExpr("cast (value as string) as json")
>       .select(from_json($"json", schema = schema) as "data")
>       .select("data.*")
>
>     // Static dataframe from a Hive source, repartitioned and persisted.
>     val static = sqlContext
>       .sql("SELECT hive_lookup.c0 as id, hive_lookup.c1 as name FROM default.hive_lookup")
>       .repartition(18)
>       .persist(StorageLevel.MEMORY_AND_DISK)
>       .toDF
>
>     // Stream-static left outer join on the id column.
>     val joinDF = stream.join(static, stream.col("id").equalTo(static.col("id")), "left_outer")
>     val result = joinDF.selectExpr("to_json(struct(*)) AS value")
>
>     val uuid = randomUUID().toString
>     val checkpoint = "/tmp/" + uuid
>
>     // Write the joined rows to Kafka with a 20-second processing-time trigger.
>     result.writeStream.format("kafka")
>       .option("kafka.bootstrap.servers", "localhost:49092")
>       .option("topic", "target")
>       .options(Map("batch.size" -> "16384", "metadata.fetch.timeout.ms" -> "10000", "linger.ms" -> "1000"))
>       .option("checkpointLocation", checkpoint)
>       .trigger(Trigger.ProcessingTime(20000L))
>       .start()
>
>     val activeStreams = sqlContext.streams.active
>     activeStreams.foreach(stream => stream.awaitTermination())
>   }
> }
> {code}
>  
> The static dataframe has the repartition and persist functions applied:
> {code:scala}
> val static = sqlContext
>   .sql("SELECT hive_lookup.c0 as id, hive_lookup.c1 as name FROM default.hive_lookup")
>   .repartition(18)
>   .persist(StorageLevel.MEMORY_AND_DISK)
>   .toDF
> {code}
> What I observed in the Spark UI is that the WholeStageCodegen task after 
> InMemoryTableScan takes a significant amount of time in every batch, which 
> degrades performance. The time also increases with the size of the dataset 
> in the Hive source (the static dataframe), even though we have already 
> persisted the data after repartitioning. What is WholeStageCodegen doing 
> here that takes a significant amount of time proportional to the Hive source 
> dataset? Is this happening by design?
> The expectation is that once we have repartitioned and persisted the 
> dataframe in memory or on disk, we should only need to read the data from 
> memory and pass it to the join operator.
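> For reference, one way to see what that stage contains is to print the 
> physical plans; the operators sitting between InMemoryTableScan and the join 
> (for example the columnar-to-row conversion of the cached batches) are what 
> the WholeStageCodegen stage re-runs in every batch. A diagnostic sketch 
> using the standard Dataset.explain() API:
> {code:scala}
> // Dump the extended physical plans. InMemoryTableScan should appear on the
> // static side; everything above it in that stage executes per micro-batch.
> static.explain(true)
> joinDF.explain(true)
> {code}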
>  



