This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new acd6df721661 [SPARK-54478][SPARK-54479][SPARK-54480][SPARK-54484] 
Re-enable streaming tests for connect compat test CI
acd6df721661 is described below

commit acd6df721661fd2cfef657bcf708ce5b2be85713
Author: Jungtaek Lim <[email protected]>
AuthorDate: Mon Dec 1 17:00:28 2025 +0900

    [SPARK-54478][SPARK-54479][SPARK-54480][SPARK-54484] Re-enable streaming 
tests for connect compat test CI
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to re-enable streaming tests for connect compatibility 
test CI.
    
    ### Why are the changes needed?
    
    They were disabled due to failures, but I can't reproduce those failures 
either locally or in CI after installing zstandard.
    
    Code change to trigger compatibility test CI against test branch + install 
zstandard:
    
https://github.com/apache/spark/compare/master...HeartSaVioR:spark:WIP-investigate-ss-spark-connect-compat-test-failures-master-and-4.0
    
    Code change to re-enable these tests while reproducing:
    
https://github.com/apache/spark/compare/branch-4.0...HeartSaVioR:spark:branch-4.0-SC-213385
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    GA run with the above reproducer setup:
    
https://github.com/HeartSaVioR/spark/actions/runs/19807973545/job/56745231698
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #53266 from HeartSaVioR/reenable-streaming-connect-tests.
    
    Authored-by: Jungtaek Lim <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
    (cherry picked from commit aecd932c34d03234dc2cdc80468d95dddb121d46)
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 python/pyspark/sql/streaming/readwriter.py                         | 7 +++----
 .../connect/pandas/test_parity_pandas_grouped_map_with_state.py    | 4 ----
 .../sql/tests/connect/streaming/test_parity_foreach_batch.py       | 4 ----
 python/pyspark/sql/tests/test_python_streaming_datasource.py       | 4 ----
 4 files changed, 3 insertions(+), 16 deletions(-)

diff --git a/python/pyspark/sql/streaming/readwriter.py 
b/python/pyspark/sql/streaming/readwriter.py
index aae66994ce64..34af8cd9b070 100644
--- a/python/pyspark/sql/streaming/readwriter.py
+++ b/python/pyspark/sql/streaming/readwriter.py
@@ -1568,7 +1568,6 @@ class DataStreamWriter:
         self._jwrite.foreach(jForeachWriter)
         return self
 
-    # SPARK-54478: Reenable doctest
     def foreachBatch(self, func: Callable[["DataFrame", int], None]) -> 
"DataStreamWriter":
         """
         Sets the output of the streaming query to be processed using the 
provided
@@ -1601,9 +1600,9 @@ class DataStreamWriter:
         ...     my_value = 100
         ...     batch_df.collect()
         ...
-        >>> q = df.writeStream.foreachBatch(func).start()  # doctest: +SKIP
-        >>> time.sleep(3)  # doctest: +SKIP
-        >>> q.stop()  # doctest: +SKIP
+        >>> q = df.writeStream.foreachBatch(func).start()
+        >>> time.sleep(3)
+        >>> q.stop()
         >>> # if in Spark Connect, my_value = -1, else my_value = 100
         """
         from py4j.java_gateway import java_import
diff --git 
a/python/pyspark/sql/tests/connect/pandas/test_parity_pandas_grouped_map_with_state.py
 
b/python/pyspark/sql/tests/connect/pandas/test_parity_pandas_grouped_map_with_state.py
index f18a531329ba..2da8b4aa3be8 100644
--- 
a/python/pyspark/sql/tests/connect/pandas/test_parity_pandas_grouped_map_with_state.py
+++ 
b/python/pyspark/sql/tests/connect/pandas/test_parity_pandas_grouped_map_with_state.py
@@ -15,7 +15,6 @@
 # limitations under the License.
 #
 import unittest
-import os
 
 from pyspark.sql.tests.pandas.test_pandas_grouped_map_with_state import (
     GroupedApplyInPandasWithStateTestsMixin,
@@ -23,9 +22,6 @@ from 
pyspark.sql.tests.pandas.test_pandas_grouped_map_with_state import (
 from pyspark.testing.connectutils import ReusedConnectTestCase
 
 
-@unittest.skipIf(
-    os.environ.get("SPARK_SKIP_CONNECT_COMPAT_TESTS") == "1", "SPARK-54479: To 
be reenabled"
-)
 class GroupedApplyInPandasWithStateTests(
     GroupedApplyInPandasWithStateTestsMixin, ReusedConnectTestCase
 ):
diff --git 
a/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py 
b/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py
index c2a747dd57f3..632fa4628d1b 100644
--- a/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py
+++ b/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py
@@ -16,7 +16,6 @@
 #
 
 import unittest
-import os
 
 from pyspark.sql.tests.streaming.test_streaming_foreach_batch import 
StreamingTestsForeachBatchMixin
 from pyspark.testing.connectutils import ReusedConnectTestCase, 
should_test_connect
@@ -26,9 +25,6 @@ if should_test_connect:
     from pyspark.errors.exceptions.connect import 
StreamingPythonRunnerInitializationException
 
 
-@unittest.skipIf(
-    os.environ.get("SPARK_SKIP_CONNECT_COMPAT_TESTS") == "1", "SPARK-54480: To 
be reenabled"
-)
 class StreamingForeachBatchParityTests(StreamingTestsForeachBatchMixin, 
ReusedConnectTestCase):
     def test_streaming_foreach_batch_propagates_python_errors(self):
         super().test_streaming_foreach_batch_propagates_python_errors()
diff --git a/python/pyspark/sql/tests/test_python_streaming_datasource.py 
b/python/pyspark/sql/tests/test_python_streaming_datasource.py
index dd3669e021e7..fa14b37b57e6 100644
--- a/python/pyspark/sql/tests/test_python_streaming_datasource.py
+++ b/python/pyspark/sql/tests/test_python_streaming_datasource.py
@@ -138,9 +138,6 @@ class BasePythonStreamingDataSourceTestsMixin:
 
         return TestDataSource
 
-    @unittest.skipIf(
-        os.environ.get("SPARK_SKIP_CONNECT_COMPAT_TESTS") == "1", 
"SPARK-54484: To be reenabled"
-    )
     def test_stream_reader(self):
         self.spark.dataSource.register(self._get_test_data_source())
         df = self.spark.readStream.format("TestDataSource").load()
@@ -215,7 +212,6 @@ class BasePythonStreamingDataSourceTestsMixin:
 
         assertDataFrameEqual(df, expected_data)
 
-    @unittest.skipIf(os.environ.get("SPARK_SKIP_CONNECT_COMPAT_TESTS") == "1", 
"To be reenabled")
     def test_simple_stream_reader(self):
         class SimpleStreamReader(SimpleDataSourceStreamReader):
             def initialOffset(self):


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to