[
https://issues.apache.org/jira/browse/SPARK-26691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Vikash Kumar updated SPARK-26691:
---------------------------------
Summary: WholeStageCodegen after InMemoryTableScan task takes significant
time and time increases based on the input size (was: WholeStageCodegen after
InMemoryTableScan task takes more time and time increases based on the input
size)
> WholeStageCodegen after InMemoryTableScan task takes significant time and
> time increases based on the input size
> ----------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-26691
> URL: https://issues.apache.org/jira/browse/SPARK-26691
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.3.1
> Reporter: Vikash Kumar
> Priority: Major
> Attachments: DataScale_LkpPolicy_FirstRow_SF1_JobDetail.png,
> DataScale_LkpPolicy_FirstRow_SF50_JobDetail.png, WholeStageCodegen.PNG
>
>
> Scenario: I am doing a left outer join between a streaming dataframe and a
> static dataframe and writing the result to a Kafka target. The static
> dataframe is created from a Hive source and the streaming dataframe is
> created from a Kafka source, and the two are joined on an equality condition.
> Here is a sample program:
>
> {code:java}
> package com.spark.exec;
> import org.apache.spark._
> import org.apache.spark.rdd._
> import org.apache.spark.storage.StorageLevel
> import org.apache.spark.storage.StorageLevel._
> import org.apache.spark.sql._
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.functions.{ broadcast => infabroadcast }
> import org.apache.spark.sql.streaming._
> import org.apache.spark.sql.streaming.Trigger._
> import java.io._
> import java.sql.Timestamp
> import java.util.UUID.randomUUID
> import scala.reflect.ClassTag
> import scala.collection.JavaConversions._
> 
> object Spark0 {
>   def main(s: Array[String]) {
>     val sqlContext = SparkSession.builder().enableHiveSupport().getOrCreate()
>     import sqlContext.implicits._
> 
>     // Streaming side: Kafka source, value parsed as JSON.
>     val v1 = sqlContext.readStream.format("kafka")
>       .option("kafka.bootstrap.servers", "localhost:49092")
>       .option("subscribe", "source")
>       .load().toDF()
>     val schema = StructType(List(
>       StructField("id", IntegerType, true),
>       StructField("name", StringType, true)))
>     val stream = v1.selectExpr("cast (value as string) as json")
>       .select(from_json($"json", schema = schema) as "data")
>       .select("data.*")
> 
>     // Static side: Hive source, repartitioned and persisted.
>     val static = sqlContext.sql(
>         "SELECT hive_lookup.c0 as id, hive_lookup.c1 as name FROM default.hive_lookup")
>       .repartition(18)
>       .persist(StorageLevel.MEMORY_AND_DISK)
>       .toDF
> 
>     val joinDF = stream.join(static,
>       stream.col("id").equalTo(static.col("id")), "left_outer")
>     val result = joinDF.selectExpr("to_json(struct(*)) AS value")
> 
>     val checkpoint = "/tmp/" + randomUUID().toString
>     result.writeStream.format("kafka")
>       .option("kafka.bootstrap.servers", "localhost:49092")
>       .option("topic", "target")
>       .options(Map("batch.size" -> "16384",
>         "metadata.fetch.timeout.ms" -> "10000", "linger.ms" -> "1000"))
>       .option("checkpointLocation", checkpoint)
>       .trigger(Trigger.ProcessingTime(20000L))
>       .start()
> 
>     sqlContext.streams.active.foreach(stream => stream.awaitTermination())
>   }
> }
> {code}
>
> Repartition and persist are applied to the static dataframe:
> {code:java}
> val static = sqlContext.sql(
>     "SELECT hive_lookup.c0 as id, hive_lookup.c1 as name FROM default.hive_lookup")
>   .repartition(18)
>   .persist(StorageLevel.MEMORY_AND_DISK)
>   .toDF
> {code}
> What I observed in the Spark UI is that the WholeStageCodegen task after
> InMemoryTableScan takes a significant amount of time in every batch, which
> degrades performance. The time also increases with the amount of data in the
> Hive source (the static dataframe), even though we have already persisted the
> data after repartitioning. What is WholeStageCodegen doing here that takes a
> significant amount of time proportional to the Hive source dataset? Is this
> happening by design?
> The expectation is that once the dataframe has been repartitioned and
> persisted in memory or on disk, we should only need to read the data from
> memory and pass it to the join.
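> One way to see which operators the WholeStageCodegen stage actually covers (a
> hedged sketch, using the joinDF from the sample program above) is to print
> the physical plan; in Spark 2.x explain output, operators prefixed with `*`
> run inside a generated whole-stage-codegen stage:
> {code:java}
> // Hedged sketch: print the logical and physical plans for the join.
> // Look for the operators above InMemoryTableScan that carry the "*"
> // whole-stage-codegen marker; decoding the columnar cache format back
> > // into rows happens there and scales with the cached input size.
> joinDF.explain(true)
> {code}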
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]