[
https://issues.apache.org/jira/browse/SPARK-53972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Zifei Feng updated SPARK-53972:
-------------------------------
Description:
We have identified a significant performance regression in Apache Spark's
streaming recentProgress method when called from a Python notebook, starting
from version 4.0.0. The time required to fetch recentProgress increases
substantially as the number of progress records grows, suggesting linear or
worse scaling per call.
The following code charts the time it takes to fetch recentProgress, before
and after the changes in [this
commit|https://github.com/apache/spark/commit/22eb6c4b0a82b9fcf84fc9952b1f6c41dde9bd8d#diff-4d4ed29d139877b160de444add7ee63cfa7a7577d849ab2686f1aa2d5b4aae64]:
```
%python
from datetime import datetime
import time

df = spark.readStream.format("rate").option("rowsPerSecond", 10).load()
q = df.writeStream.format("noop").start()
print("begin waiting for progress")

progress_list = []
time_diff_list = []
numProgress = len(q.recentProgress)
while numProgress < 70 and q.exception() is None:
    time.sleep(1)
    beforeTime = datetime.now()
    print(beforeTime.strftime("%Y-%m-%d %H:%M:%S") + ": before we got those progress: " + str(numProgress))
    rep = q.recentProgress
    numProgress = len(rep)
    afterTime = datetime.now()
    print(afterTime.strftime("%Y-%m-%d %H:%M:%S") + ": after we got those progress: " + str(numProgress))
    time_diff = (afterTime - beforeTime).total_seconds()
    print("Total Time: " + str(time_diff) + " seconds")
    progress_list.append(numProgress)
    time_diff_list.append(time_diff)

q.stop()
q.awaitTermination()
assert q.exception() is None

import pandas as pd
plot_df = pd.DataFrame({'numProgress': progress_list, 'time_diff': time_diff_list})
display(spark.createDataFrame(plot_df).orderBy("numProgress").toPandas().plot.line(x="numProgress", y="time_diff"))
```
See the attached screenshots for the generated graphs.
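For intuition about the shape of the chart (this is a standalone illustration, not a claim about what the linked commit actually changed): if each recentProgress access re-deserializes every accumulated progress record instead of returning cached objects, per-call cost grows linearly with the number of records, so polling in a loop costs quadratic time overall. The `make_progress` helper and the record count below are hypothetical.

```python
import json
import time

# Hypothetical stand-in for one StreamingQueryProgress JSON payload.
def make_progress(i):
    return json.dumps({"id": "q", "batchId": i, "numInputRows": 10 * i})

progress_json = [make_progress(i) for i in range(2000)]

# Re-parse on every access: one call pays O(n) JSON parsing for n records.
start = time.perf_counter()
parsed = [json.loads(p) for p in progress_json]
per_call = time.perf_counter() - start

# Cached access: the records were parsed once and are merely returned.
cached = parsed
start = time.perf_counter()
_ = list(cached)
cached_call = time.perf_counter() - start

# Re-parsing dominates as the list grows, and a polling loop pays it every iteration.
assert cached_call < per_call
```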
> Fix recentProgress performance regression
> -----------------------------------------
>
> Key: SPARK-53972
> URL: https://issues.apache.org/jira/browse/SPARK-53972
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 4.0.0, 4.0.1
> Reporter: Zifei Feng
> Priority: Major
> Attachments: Screenshot 2025-10-21 at 9.26.23 AM.png, Screenshot
> 2025-10-21 at 9.26.29 AM.png
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]