This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new acd6df721661 [SPARK-54478][SPARK-54479][SPARK-54480][SPARK-54484]
Re-enable streaming tests for connect compat test CI
acd6df721661 is described below
commit acd6df721661fd2cfef657bcf708ce5b2be85713
Author: Jungtaek Lim <[email protected]>
AuthorDate: Mon Dec 1 17:00:28 2025 +0900
[SPARK-54478][SPARK-54479][SPARK-54480][SPARK-54484] Re-enable streaming
tests for connect compat test CI
### What changes were proposed in this pull request?
This PR proposes to re-enable streaming tests for connect compatibility
test CI.
### Why are the changes needed?
They were disabled due to failures, but I can't reproduce these failures
either locally or in CI after installing zstandard.
Code change to trigger the compatibility test CI against a test branch with
zstandard installed:
https://github.com/apache/spark/compare/master...HeartSaVioR:spark:WIP-investigate-ss-spark-connect-compat-test-failures-master-and-4.0
Code change to re-enable these tests while reproducing the issue:
https://github.com/apache/spark/compare/branch-4.0...HeartSaVioR:spark:branch-4.0-SC-213385
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA run with the above reproducer setup:
https://github.com/HeartSaVioR/spark/actions/runs/19807973545/job/56745231698
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53266 from HeartSaVioR/reenable-streaming-connect-tests.
Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit aecd932c34d03234dc2cdc80468d95dddb121d46)
Signed-off-by: Jungtaek Lim <[email protected]>
---
python/pyspark/sql/streaming/readwriter.py | 7 +++----
.../connect/pandas/test_parity_pandas_grouped_map_with_state.py | 4 ----
.../sql/tests/connect/streaming/test_parity_foreach_batch.py | 4 ----
python/pyspark/sql/tests/test_python_streaming_datasource.py | 4 ----
4 files changed, 3 insertions(+), 16 deletions(-)
diff --git a/python/pyspark/sql/streaming/readwriter.py
b/python/pyspark/sql/streaming/readwriter.py
index aae66994ce64..34af8cd9b070 100644
--- a/python/pyspark/sql/streaming/readwriter.py
+++ b/python/pyspark/sql/streaming/readwriter.py
@@ -1568,7 +1568,6 @@ class DataStreamWriter:
self._jwrite.foreach(jForeachWriter)
return self
- # SPARK-54478: Reenable doctest
def foreachBatch(self, func: Callable[["DataFrame", int], None]) ->
"DataStreamWriter":
"""
Sets the output of the streaming query to be processed using the
provided
@@ -1601,9 +1600,9 @@ class DataStreamWriter:
... my_value = 100
... batch_df.collect()
...
- >>> q = df.writeStream.foreachBatch(func).start() # doctest: +SKIP
- >>> time.sleep(3) # doctest: +SKIP
- >>> q.stop() # doctest: +SKIP
+ >>> q = df.writeStream.foreachBatch(func).start()
+ >>> time.sleep(3)
+ >>> q.stop()
>>> # if in Spark Connect, my_value = -1, else my_value = 100
"""
from py4j.java_gateway import java_import
diff --git
a/python/pyspark/sql/tests/connect/pandas/test_parity_pandas_grouped_map_with_state.py
b/python/pyspark/sql/tests/connect/pandas/test_parity_pandas_grouped_map_with_state.py
index f18a531329ba..2da8b4aa3be8 100644
---
a/python/pyspark/sql/tests/connect/pandas/test_parity_pandas_grouped_map_with_state.py
+++
b/python/pyspark/sql/tests/connect/pandas/test_parity_pandas_grouped_map_with_state.py
@@ -15,7 +15,6 @@
# limitations under the License.
#
import unittest
-import os
from pyspark.sql.tests.pandas.test_pandas_grouped_map_with_state import (
GroupedApplyInPandasWithStateTestsMixin,
@@ -23,9 +22,6 @@ from
pyspark.sql.tests.pandas.test_pandas_grouped_map_with_state import (
from pyspark.testing.connectutils import ReusedConnectTestCase
[email protected](
- os.environ.get("SPARK_SKIP_CONNECT_COMPAT_TESTS") == "1", "SPARK-54479: To
be reenabled"
-)
class GroupedApplyInPandasWithStateTests(
GroupedApplyInPandasWithStateTestsMixin, ReusedConnectTestCase
):
diff --git
a/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py
b/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py
index c2a747dd57f3..632fa4628d1b 100644
--- a/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py
+++ b/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py
@@ -16,7 +16,6 @@
#
import unittest
-import os
from pyspark.sql.tests.streaming.test_streaming_foreach_batch import
StreamingTestsForeachBatchMixin
from pyspark.testing.connectutils import ReusedConnectTestCase,
should_test_connect
@@ -26,9 +25,6 @@ if should_test_connect:
from pyspark.errors.exceptions.connect import
StreamingPythonRunnerInitializationException
[email protected](
- os.environ.get("SPARK_SKIP_CONNECT_COMPAT_TESTS") == "1", "SPARK-54480: To
be reenabled"
-)
class StreamingForeachBatchParityTests(StreamingTestsForeachBatchMixin,
ReusedConnectTestCase):
def test_streaming_foreach_batch_propagates_python_errors(self):
super().test_streaming_foreach_batch_propagates_python_errors()
diff --git a/python/pyspark/sql/tests/test_python_streaming_datasource.py
b/python/pyspark/sql/tests/test_python_streaming_datasource.py
index dd3669e021e7..fa14b37b57e6 100644
--- a/python/pyspark/sql/tests/test_python_streaming_datasource.py
+++ b/python/pyspark/sql/tests/test_python_streaming_datasource.py
@@ -138,9 +138,6 @@ class BasePythonStreamingDataSourceTestsMixin:
return TestDataSource
- @unittest.skipIf(
- os.environ.get("SPARK_SKIP_CONNECT_COMPAT_TESTS") == "1",
"SPARK-54484: To be reenabled"
- )
def test_stream_reader(self):
self.spark.dataSource.register(self._get_test_data_source())
df = self.spark.readStream.format("TestDataSource").load()
@@ -215,7 +212,6 @@ class BasePythonStreamingDataSourceTestsMixin:
assertDataFrameEqual(df, expected_data)
- @unittest.skipIf(os.environ.get("SPARK_SKIP_CONNECT_COMPAT_TESTS") == "1",
"To be reenabled")
def test_simple_stream_reader(self):
class SimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]