[
https://issues.apache.org/jira/browse/SPARK-50908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hyukjin Kwon resolved SPARK-50908.
----------------------------------
Fix Version/s: 4.1.0
Resolution: Fixed
Issue resolved by pull request 49634
[https://github.com/apache/spark/pull/49634]
> Fix flaky
> pyspark.sql.tests.pandas.test_pandas_transform_with_state.TransformWithStateInPandasTests
> ----------------------------------------------------------------------------------------------------
>
> Key: SPARK-50908
> URL: https://issues.apache.org/jira/browse/SPARK-50908
> Project: Spark
> Issue Type: Sub-task
> Components: PySpark, Structured Streaming
> Affects Versions: 4.0.0
> Reporter: Hyukjin Kwon
> Assignee: Hyukjin Kwon
> Priority: Major
> Labels: pull-request-available
> Fix For: 4.1.0
>
>
> https://github.com/apache/spark/actions/runs/12883552117/job/35918143550
> {code}
> ======================================================================
> ERROR [10.759s]: test_value_state_ttl_expiration
> (pyspark.sql.tests.pandas.test_pandas_transform_with_state.TransformWithStateInPandasTests)
> ----------------------------------------------------------------------
> Traceback (most recent call last):
> File
> "/__w/spark/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py",
> line 422, in test_value_state_ttl_expiration
> q.processAllAvailable()
> File "/__w/spark/spark/python/pyspark/sql/streaming/query.py", line 351, in
> processAllAvailable
> return self._jsq.processAllAvailable()
> File
> "/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/java_gateway.py",
> line 1362, in __call__
> return_value = get_return_value(
> File "/__w/spark/spark/python/pyspark/errors/exceptions/captured.py", line
> 253, in deco
> raise converted from None
> pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED]
> Query [id = 8e860141-b4c1-445b-be17-7f46f4a8e1dc, runId =
> 8b11487b-67b9-4cfb-9176-a421032ff236] terminated with exception:
> [FOREACH_BATCH_USER_FUNCTION_ERROR] An error occurred in the user provided
> function in foreach batch sink. Reason: An exception was raised by the Python
> Proxy. Return Message: Traceback (most recent call last):
> File
> "/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py",
> line 644, in _call_proxy
> return_value = getattr(self.pool[obj_id], method)(*params)
> File "/__w/spark/spark/python/pyspark/sql/utils.py", line 165, in call
> raise e
> File "/__w/spark/spark/python/pyspark/sql/utils.py", line 162, in call
> self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
> File
> "/__w/spark/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py",
> line 374, in check_results
> assertDataFrameEqual(
> File "/__w/spark/spark/python/pyspark/testing/utils.py", line 1039, in
> assertDataFrameEqual
> assert_rows_equal(actual_list, expected_list, maxErrors=maxErrors,
> showOnlyDiff=showOnlyDiff)
> File "/__w/spark/spark/python/pyspark/testing/utils.py", line 995, in
> assert_rows_equal
> raise PySparkAssertionError(
> pyspark.errors.exceptions.base.PySparkAssertionError: [DIFFERENT_ROWS]
> Results do not match: ( 37.50000 % )
> *** actual ***
> Row(id='count-0', count=3)
> Row(id='count-1', count=3)
> ! Row(id='ttl-count-0', count=2)
> Row(id='ttl-count-1', count=3)
> ! Row(id='ttl-list-state-count-0', count=3)
> Row(id='ttl-list-state-count-1', count=7)
> ! Row(id='ttl-map-state-count-0', count=2)
> Row(id='ttl-map-state-count-1', count=3)
> *** expected ***
> Row(id='count-0', count=3)
> Row(id='count-1', count=3)
> ! Row(id='ttl-count-0', count=1)
> Row(id='ttl-count-1', count=3)
> ! Row(id='ttl-list-state-count-0', count=1)
> Row(id='ttl-list-state-count-1', count=7)
> ! Row(id='ttl-map-state-count-0', count=1)
> Row(id='ttl-map-state-count-1', count=3)
> SQLSTATE: 39000 SQLSTATE: XXKST
> === Streaming Query ===
> Identifier: [id = 8e860141-b4c1-445b-be17-7f46f4a8e1dc, runId =
> 8b11487b-67b9-4cfb-9176-a421032ff236]
> Current Committed Offsets:
> {FileStreamSource[file:/__w/spark/spark/python/target/c80337ee-345b-4c00-b9b5-bfaf49d18854/tmpg_utvcka]:
> {"logOffset":1}}
> Current Available Offsets:
> {FileStreamSource[file:/__w/spark/spark/python/target/c80337ee-345b-4c00-b9b5-bfaf49d18854/tmpg_utvcka]:
> {"logOffset":2}}
> Current State: ACTIVE
> Thread State: RUNNABLE
> Logical Plan:
> ~WriteToMicroBatchDataSourceV1 ForeachBatchSink,
> 8e860141-b4c1-445b-be17-7f46f4a8e1dc, Update
> +- ~TransformWithStateInPandas transformWithStateUDF(id#15428,
> temperature#15429)#15430, 1, [id#15431, count#15432], Update, ProcessingTime,
> false, 0
> :- ~Project [id#15428, id#15428, temperature#15429]
> : +- ~Project [cast(split_values#15425[0] as string) AS id#15428,
> cast(split_values#15425[1] as int) AS temperature#15429]
> : +- ~Project [value#15423, split(value#15423, ,, -1) AS
> split_values#15425]
> : +- ~StreamingExecutionRelation
> FileStreamSource[file:/__w/spark/spark/python/target/c80337ee-345b-4c00-b9b5-bfaf49d18854/tmpg_utvcka],
> [value#15423]
> +- LocalRelation <empty>
> JVM stacktrace:
> org.apache.spark.sql.streaming.StreamingQueryException: An exception was
> raised by the Python Proxy. Return Message: Traceback (most recent call last):
> File
> "/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py",
> line 644, in _call_proxy
> at
> org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:105)
> at
> org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:110)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:124)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:291)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:123)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:791)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:77)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:233)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:875)
> at
> org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:185)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:875)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$2(MicroBatchExecution.scala:393)
> at
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
> at
> org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:185)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:363)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:343)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1$adapted(MicroBatchExecution.scala:343)
> at
> org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch(TriggerExecutor.scala:39)
> at
> org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch$(TriggerExecutor.scala:37)
> at
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.runOneBatch(TriggerExecutor.scala:70)
> at
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:82)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:343)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:337)
> at
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:791)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:311)
> ... 1 more
> {code}
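The diff in the trace above shows the three TTL-backed counters for key 0 at a higher count than expected (1), while the non-TTL counters match. One way such a mismatch can arise in a wall-clock-based TTL test is that the second batch is processed before the TTL has elapsed, so the state is still alive and is incremented instead of reset. The following is a minimal, self-contained sketch of that timing sensitivity only. It is an assumption for illustration, not the confirmed root cause and not the change made in pull request 49634; `TtlCountState`, `FakeClock`, and `run_two_batches` are hypothetical names and are not part of PySpark.

```python
from typing import Callable, Dict, Tuple


class TtlCountState:
    """Hypothetical per-key counter whose value expires ttl_ms after the last write."""

    def __init__(self, ttl_ms: int, now_ms: Callable[[], int]) -> None:
        self._ttl_ms = ttl_ms
        self._now_ms = now_ms
        # key -> (count, expires_at_ms)
        self._store: Dict[str, Tuple[int, int]] = {}

    def increment(self, key: str) -> int:
        now = self._now_ms()
        entry = self._store.get(key)
        if entry is not None and now < entry[1]:
            # State is still alive: keep counting.
            count = entry[0] + 1
        else:
            # State is missing or expired: start over.
            count = 1
        self._store[key] = (count, now + self._ttl_ms)
        return count


class FakeClock:
    """Injectable clock so the example does not depend on real elapsed time."""

    def __init__(self) -> None:
        self.t = 0

    def advance(self, ms: int) -> None:
        self.t += ms

    def __call__(self) -> int:
        return self.t


def run_two_batches(gap_ms: int, ttl_ms: int = 1000) -> int:
    """Return the count seen in the second batch for a single key."""
    clock = FakeClock()
    state = TtlCountState(ttl_ms, clock)
    state.increment("ttl-count-0")  # first batch
    clock.advance(gap_ms)           # time between batches
    return state.increment("ttl-count-0")  # second batch


if __name__ == "__main__":
    print(run_two_batches(gap_ms=1500))  # gap longer than TTL: state expired
    print(run_two_batches(gap_ms=500))   # gap shorter than TTL: state survived
```

With an injected clock the outcome depends only on the configured gap, which is why a test that relies on real elapsed time between batches can pass or fail depending on scheduling.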