[ https://issues.apache.org/jira/browse/SPARK-53192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ruifeng Zheng reassigned SPARK-53192:
-------------------------------------

Assignee: Robert Dillitz

> Greatly improve SparkConnectPlanner performance for Read.DataSource
> -------------------------------------------------------------------
>
>                 Key: SPARK-53192
>                 URL: https://issues.apache.org/jira/browse/SPARK-53192
>             Project: Spark
>          Issue Type: Improvement
>          Components: Connect
>    Affects Versions: 3.4.4, 4.1.0, 3.5.6, 4.0.0
>            Reporter: Robert Dillitz
>            Assignee: Robert Dillitz
>            Priority: Major
>              Labels: pull-request-available
>
> I believe we can dramatically improve the performance of the
> {{SparkConnectPlanner}} for plans that use the same {{Read.DataSource}}
> ({{spark.read}}) multiple times within the same session by actively caching
> them in the [Spark Connect Plan Cache|https://github.com/apache/spark/commit/a1fc6d57b27d24b832b2f2580e6acd64c4488c62].
> At the moment, every occurrence of a {{Read.DataSource}} triggers a separate
> analysis of the {{DataSource}}, and each analysis kicks off a new Spark job
> if no explicit schema is provided. Plan translation is therefore very slow,
> because the (meta)data is fetched every time.
> For example, the following code, which unions the same CSV file N times, kicks
> off N+1 Spark jobs to analyze the final DataFrame in Spark Connect
> (compared to exactly 1 for Spark Classic):
> {code:scala}
> val df = spark.read.csv("abc.csv")
> (0 until N).foldLeft(df)((a, _) => a.union(df)).schema
> {code}
> I propose to adjust the Spark Connect Plan Cache to always cache a
> {{Read.DataSource}}, even when it is not the root of a relation. This
> reduces the Spark jobs required for analysis to at most 1 per *unique*
> {{DataSource}}.
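The caching idea can be illustrated with a minimal, self-contained Scala sketch (not the actual Spark Connect implementation): analysis results are memoized by the relation itself, so repeated occurrences of the same `Read.DataSource` pay for schema inference at most once. The `Relation` and `LogicalPlan` types here are hypothetical stand-ins for Spark Connect's protos and analyzed plans.

```scala
import scala.collection.mutable

object PlanCacheSketch {
  // Hypothetical stand-ins for Spark Connect's relation protos and analyzed plans.
  final case class Relation(kind: String, path: String)
  final case class LogicalPlan(schema: String)

  private val cache = mutable.Map.empty[Relation, LogicalPlan]
  var analysisCount = 0 // counts expensive analyses (each would launch a Spark job)

  // Expensive schema inference; performed at most once per unique relation.
  private def analyze(rel: Relation): LogicalPlan = {
    analysisCount += 1
    LogicalPlan(schema = s"inferred(${rel.path})")
  }

  // Planner entry point: consult the cache before analyzing.
  def transformRelation(rel: Relation): LogicalPlan =
    cache.getOrElseUpdate(rel, analyze(rel))

  def main(args: Array[String]): Unit = {
    val read = Relation("Read.DataSource", "abc.csv")
    // 100 occurrences of the same Read.DataSource in one session.
    (0 until 100).foreach(_ => transformRelation(read))
    println(analysisCount) // prints 1: every occurrence after the first hits the cache
  }
}
```

Keying the cache on the full relation (rather than only on root relations) is what makes the union example above cost a single job: all N+1 references resolve to the same cache entry.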
> This has the same effect as explicitly analyzing the base {{DataSource}}
> today to populate the Spark Connect Plan Cache with its base plan, which
> greatly speeds up subsequent queries that use this {{DataSource}}:
> {code:scala}
> val df = spark.read.csv("abc.csv")
> df.schema
> (0 until N).foldLeft(df)((a, _) => a.union(df)).schema
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)