    [ https://issues.apache.org/jira/browse/SPARK-31427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17317874#comment-17317874 ]
Nick Hryhoriev commented on SPARK-31427:
----------------------------------------
Thanks [~kabhwan]. Yes, my only concern was reading the data twice,
and at my load, caching is not a practical option.
In any case, I found the cause in my specific case: Sort and Repartition by
range do sampling, which is actually an additional job.
For the other cases, I can explain why my way of debugging gave weird results:
mostly because Spark SQL is too smart and does unexpected optimizations on `show`.
So there is no issue, in Spark 3 at least.
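
To illustrate the sampling point above, here is a minimal, Spark-free sketch of why range partitioning needs an extra pass over the input: the split points have to be derived from a sample of the keys before any row can be assigned to a partition. This is a simplified model and not Spark's actual RangePartitioner; the names `RangeBoundsSketch`, `readSource`, `rangeBounds`, `partitionOf` and the every-Nth-key sampling are all assumptions made for illustration.

```scala
// Hypothetical, simplified model of range partitioning (not Spark code).
object RangeBoundsSketch {
  // Counts how many times the source is scanned.
  var scans = 0

  // Stand-in for an expensive read, e.g. a Kafka offset range.
  def readSource(): Iterator[Long] = {
    scans += 1
    (1L to 1000L).iterator
  }

  // Pass 1: sample every `step`-th key and derive (numPartitions - 1) split points.
  def rangeBounds(numPartitions: Int, step: Int): Vector[Long] = {
    val sample = readSource().zipWithIndex
      .collect { case (key, i) if i % step == 0 => key }
      .toVector
      .sorted
    (1 until numPartitions)
      .map(i => sample(i * sample.length / numPartitions))
      .toVector
  }

  // Pass 2 helper: a key goes to the first range whose upper bound is >= key,
  // or to the last partition if it is above every bound.
  def partitionOf(key: Long, bounds: Vector[Long]): Int = {
    val idx = bounds.indexWhere(key <= _)
    if (idx == -1) bounds.length else idx
  }

  def main(args: Array[String]): Unit = {
    val bounds = rangeBounds(numPartitions = 4, step = 10) // first scan
    val sizes = readSource().toSeq                         // second scan
      .groupBy(partitionOf(_, bounds))
      .map { case (p, keys) => p -> keys.size }
    println(s"bounds=$bounds scans=$scans sizes=${sizes.toSeq.sorted}")
  }
}
```

In this model the source is scanned once to compute the bounds and once more to do the actual partitioning, which matches the two jobs per micro-batch described in the issue.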
> Spark Structure streaming read data twice per every micro-batch.
> ----------------------------------------------------------------
>
> Key: SPARK-31427
> URL: https://issues.apache.org/jira/browse/SPARK-31427
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.4.3, 2.4.7, 3.0.1, 3.0.2
> Reporter: Nick Hryhoriev
> Priority: Major
>
> I have a very strange issue with Spark Structured Streaming. It creates
> two Spark jobs for every micro-batch and, as a result, reads data from
> Kafka twice. Here is a simple code snippet.
>
> {code:java}
> import org.apache.hadoop.fs.{FileSystem, Path}
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.streaming.Trigger
>
> import scala.concurrent.duration._ // needed for 10.seconds
>
> object CheckHowSparkReadFromKafka {
>   def main(args: Array[String]): Unit = {
>     val session = SparkSession.builder()
>       .config(new SparkConf()
>         .setAppName("simple read from kafka with repartition")
>         .setMaster("local[*]")
>         .set("spark.driver.host", "localhost"))
>       .getOrCreate()
>     // Start from a clean output and checkpoint directory.
>     val testPath = "/tmp/spark-test"
>     FileSystem.get(session.sparkContext.hadoopConfiguration)
>       .delete(new Path(testPath), true)
>     import session.implicits._
>     val stream = session
>       .readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", "kafka-20002-prod:9092")
>       .option("subscribe", "topic")
>       .option("maxOffsetsPerTrigger", 1000)
>       .option("failOnDataLoss", false)
>       .option("startingOffsets", "latest")
>       .load()
>       .repartitionByRange($"offset") // the line that triggers the extra job
>       .writeStream
>       .option("path", testPath + "/data")
>       .option("checkpointLocation", testPath + "/checkpoint")
>       .format("parquet")
>       .trigger(Trigger.ProcessingTime(10.seconds))
>       .start()
>     stream.processAllAvailable()
>   }
> }
> {code}
> This happens because of {{.repartitionByRange($"offset")}}; if I remove this
> line, all is good. But with it, Spark creates two jobs: one with 1 stage
> that just reads from Kafka, and a second with 3 stages (read -> shuffle ->
> write). So the result of the first job is never used.
> This has a significant impact on performance. Some of my Kafka topics have
> 1550 partitions, so reading them twice is a big deal. If I add cache,
> things get better, but that is not an option for me. In local mode, the
> first job in a batch takes less than 0.1 ms, except for the batch with
> index 0. But on a YARN cluster and on Mesos, both jobs are fully executed
> and on my topics take about 1.2 min.
>
>