This is an automated email from the ASF dual-hosted git repository.
ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 65336a0a1769 [SPARK-53810][SS][TESTS] Split large TWS python tests
into multiple small tests to speedup the CI
65336a0a1769 is described below
commit 65336a0a17695b2b67082431efdd2ea7e179962e
Author: huanliwang-db <[email protected]>
AuthorDate: Wed Oct 8 10:38:28 2025 -0700
[SPARK-53810][SS][TESTS] Split large TWS python tests into multiple small
tests to speedup the CI
### What changes were proposed in this pull request?
Split large TWS python tests into multiple small tests to speed up the CI
### Why are the changes needed?
CI is slow now; splitting the tests helps speed it up and makes failures
easier to debug
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tests pass.
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #52531 from huanliwang-db/huanliwang-db/split-test.
Authored-by: huanliwang-db <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
---
dev/sparktestsupport/modules.py | 2 +
.../test_parity_pandas_transform_with_state.py | 31 -----------
.../test_parity_transform_with_state_pyspark.py} | 33 +----------
.../pandas/test_pandas_transform_with_state.py | 28 ----------
...st_pandas_transform_with_state_checkpoint_v2.py | 64 ++++++++++++++++++++++
5 files changed, 67 insertions(+), 91 deletions(-)
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index bc30c7740e39..2397bc9c7962 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -565,6 +565,7 @@ pyspark_sql = Module(
"pyspark.sql.tests.pandas.test_pandas_grouped_map_with_state",
"pyspark.sql.tests.pandas.test_pandas_map",
"pyspark.sql.tests.pandas.test_pandas_transform_with_state",
+
"pyspark.sql.tests.pandas.test_pandas_transform_with_state_checkpoint_v2",
"pyspark.sql.tests.pandas.test_pandas_udf",
"pyspark.sql.tests.pandas.test_pandas_udf_grouped_agg",
"pyspark.sql.tests.pandas.test_pandas_udf_scalar",
@@ -1125,6 +1126,7 @@ pyspark_connect = Module(
"pyspark.sql.tests.connect.streaming.test_parity_listener",
"pyspark.sql.tests.connect.streaming.test_parity_foreach",
"pyspark.sql.tests.connect.streaming.test_parity_foreach_batch",
+
"pyspark.sql.tests.connect.streaming.test_parity_transform_with_state_pyspark",
"pyspark.sql.tests.connect.test_resources",
"pyspark.sql.tests.connect.shell.test_progress",
"pyspark.sql.tests.connect.test_df_debug",
diff --git
a/python/pyspark/sql/tests/connect/pandas/test_parity_pandas_transform_with_state.py
b/python/pyspark/sql/tests/connect/pandas/test_parity_pandas_transform_with_state.py
index e772c2139326..334031ec362f 100644
---
a/python/pyspark/sql/tests/connect/pandas/test_parity_pandas_transform_with_state.py
+++
b/python/pyspark/sql/tests/connect/pandas/test_parity_pandas_transform_with_state.py
@@ -18,7 +18,6 @@ import unittest
from pyspark.sql.tests.pandas.test_pandas_transform_with_state import (
TransformWithStateInPandasTestsMixin,
- TransformWithStateInPySparkTestsMixin,
)
from pyspark import SparkConf
from pyspark.testing.connectutils import ReusedConnectTestCase
@@ -54,36 +53,6 @@ class TransformWithStateInPandasParityTests(
pass
-class TransformWithStateInPySparkParityTests(
- TransformWithStateInPySparkTestsMixin, ReusedConnectTestCase
-):
- """
- Spark connect parity tests for TransformWithStateInPySpark. Run every test
case in
- `TransformWithStateInPySparkTestsMixin` in spark connect mode.
- """
-
- @classmethod
- def conf(cls):
- # Due to multiple inheritance from the same level, we need to
explicitly setting configs in
- # both TransformWithStateInPySparkTestsMixin and ReusedConnectTestCase
here
- cfg = SparkConf(loadDefaults=False)
- for base in cls.__bases__:
- if hasattr(base, "conf"):
- parent_cfg = base.conf()
- for k, v in parent_cfg.getAll():
- cfg.set(k, v)
-
- # Extra removing config for connect suites
- if cfg._jconf is not None:
- cfg._jconf.remove("spark.master")
-
- return cfg
-
- @unittest.skip("Flaky in spark connect on CI. Skip for now. See
SPARK-51368 for details.")
- def test_schema_evolution_scenarios(self):
- pass
-
-
if __name__ == "__main__":
from
pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state import
* # noqa: F401,E501
diff --git
a/python/pyspark/sql/tests/connect/pandas/test_parity_pandas_transform_with_state.py
b/python/pyspark/sql/tests/connect/streaming/test_parity_transform_with_state_pyspark.py
similarity index 65%
copy from
python/pyspark/sql/tests/connect/pandas/test_parity_pandas_transform_with_state.py
copy to
python/pyspark/sql/tests/connect/streaming/test_parity_transform_with_state_pyspark.py
index e772c2139326..7209b94a5060 100644
---
a/python/pyspark/sql/tests/connect/pandas/test_parity_pandas_transform_with_state.py
+++
b/python/pyspark/sql/tests/connect/streaming/test_parity_transform_with_state_pyspark.py
@@ -17,43 +17,12 @@
import unittest
from pyspark.sql.tests.pandas.test_pandas_transform_with_state import (
- TransformWithStateInPandasTestsMixin,
TransformWithStateInPySparkTestsMixin,
)
from pyspark import SparkConf
from pyspark.testing.connectutils import ReusedConnectTestCase
-class TransformWithStateInPandasParityTests(
- TransformWithStateInPandasTestsMixin, ReusedConnectTestCase
-):
- """
- Spark connect parity tests for TransformWithStateInPandas. Run every test
case in
- `TransformWithStateInPandasTestsMixin` in spark connect mode.
- """
-
- @classmethod
- def conf(cls):
- # Due to multiple inheritance from the same level, we need to
explicitly setting configs in
- # both TransformWithStateInPandasTestsMixin and ReusedConnectTestCase
here
- cfg = SparkConf(loadDefaults=False)
- for base in cls.__bases__:
- if hasattr(base, "conf"):
- parent_cfg = base.conf()
- for k, v in parent_cfg.getAll():
- cfg.set(k, v)
-
- # Extra removing config for connect suites
- if cfg._jconf is not None:
- cfg._jconf.remove("spark.master")
-
- return cfg
-
- @unittest.skip("Flaky in spark connect on CI. Skip for now. See
SPARK-51368 for details.")
- def test_schema_evolution_scenarios(self):
- pass
-
-
class TransformWithStateInPySparkParityTests(
TransformWithStateInPySparkTestsMixin, ReusedConnectTestCase
):
@@ -85,7 +54,7 @@ class TransformWithStateInPySparkParityTests(
if __name__ == "__main__":
- from
pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state import
* # noqa: F401,E501
+ from
pyspark.sql.tests.connect.streaming.test_parity_transform_with_state_pyspark
import * # noqa: F401,E501
try:
import xmlrunner
diff --git
a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
index 576c0cf6e6e1..d5b1c737386a 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
@@ -2011,22 +2011,6 @@ class
TransformWithStateInPandasTestsMixin(TransformWithStateTestsMixin):
return cfg
-class
TransformWithStateInPandasWithCheckpointV2TestsMixin(TransformWithStateInPandasTestsMixin):
- @classmethod
- def conf(cls):
- cfg = super().conf()
- cfg.set("spark.sql.streaming.stateStore.checkpointFormatVersion", "2")
- return cfg
-
-
-class
TransformWithStateInPySparkWithCheckpointV2TestsMixin(TransformWithStateInPySparkTestsMixin):
- @classmethod
- def conf(cls):
- cfg = super().conf()
- cfg.set("spark.sql.streaming.stateStore.checkpointFormatVersion", "2")
- return cfg
-
-
class TransformWithStateInPandasTests(TransformWithStateInPandasTestsMixin,
ReusedSQLTestCase):
pass
@@ -2035,18 +2019,6 @@ class
TransformWithStateInPySparkTests(TransformWithStateInPySparkTestsMixin, Re
pass
-class TransformWithStateInPandasWithCheckpointV2Tests(
- TransformWithStateInPandasWithCheckpointV2TestsMixin, ReusedSQLTestCase
-):
- pass
-
-
-class TransformWithStateInPySparkWithCheckpointV2Tests(
- TransformWithStateInPySparkWithCheckpointV2TestsMixin, ReusedSQLTestCase
-):
- pass
-
-
if __name__ == "__main__":
from pyspark.sql.tests.pandas.test_pandas_transform_with_state import * #
noqa: F401
diff --git
a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state_checkpoint_v2.py
b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state_checkpoint_v2.py
new file mode 100644
index 000000000000..d6609c44db62
--- /dev/null
+++
b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state_checkpoint_v2.py
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from pyspark.testing.sqlutils import ReusedSQLTestCase
+from pyspark.sql.tests.pandas.test_pandas_transform_with_state import (
+ TransformWithStateInPandasTestsMixin,
+ TransformWithStateInPySparkTestsMixin,
+)
+
+
+class
TransformWithStateInPandasWithCheckpointV2TestsMixin(TransformWithStateInPandasTestsMixin):
+ @classmethod
+ def conf(cls):
+ cfg = super().conf()
+ cfg.set("spark.sql.streaming.stateStore.checkpointFormatVersion", "2")
+ return cfg
+
+
+class
TransformWithStateInPySparkWithCheckpointV2TestsMixin(TransformWithStateInPySparkTestsMixin):
+ @classmethod
+ def conf(cls):
+ cfg = super().conf()
+ cfg.set("spark.sql.streaming.stateStore.checkpointFormatVersion", "2")
+ return cfg
+
+
+class TransformWithStateInPandasWithCheckpointV2Tests(
+ TransformWithStateInPandasWithCheckpointV2TestsMixin, ReusedSQLTestCase
+):
+ pass
+
+
+class TransformWithStateInPySparkWithCheckpointV2Tests(
+ TransformWithStateInPySparkWithCheckpointV2TestsMixin, ReusedSQLTestCase
+):
+ pass
+
+
+if __name__ == "__main__":
+ from
pyspark.sql.tests.pandas.test_pandas_transform_with_state_checkpoint_v2 import
* # noqa: F401,E501
+
+ try:
+ import xmlrunner
+
+ testRunner = xmlrunner.XMLTestRunner(output="target/test-reports",
verbosity=2)
+ except ImportError:
+ testRunner = None
+ unittest.main(testRunner=testRunner, verbosity=2)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]