This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 34026d1f54ed [SPARK-42929][CONNECT][PYTHON][TEST] test barrier mode
for mapInPandas/mapInArrow
34026d1f54ed is described below
commit 34026d1f54edd7710a87ffff2e27b5c90e27015d
Author: Bobby Wang <[email protected]>
AuthorDate: Wed Feb 28 17:01:05 2024 +0900
[SPARK-42929][CONNECT][PYTHON][TEST] test barrier mode for
mapInPandas/mapInArrow
### What changes were proposed in this pull request?
Add barrier mode tests for mapInPandas and mapInArrow.
### Why are the changes needed?
This is a follow-up to https://github.com/apache/spark/pull/40559
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
The newly added tests pass in CI.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #45310 from wbo4958/barrier-test.
Authored-by: Bobby Wang <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/sql/tests/pandas/test_pandas_map.py | 25 ++++++++++++++++++++++
python/pyspark/sql/tests/test_arrow_map.py | 25 ++++++++++++++++++++++
2 files changed, 50 insertions(+)
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_map.py b/python/pyspark/sql/tests/pandas/test_pandas_map.py
index 8f7229e1d74f..b8dfe8314a72 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_map.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_map.py
@@ -401,6 +401,31 @@ class MapInPandasTestsMixin:
finally:
shutil.rmtree(path)
+ def test_map_in_pandas_with_barrier_mode(self):
+ df = self.spark.range(10)
+
+ def func1(iterator):
+ from pyspark import TaskContext, BarrierTaskContext
+
+ tc = TaskContext.get()
+ assert tc is not None
+ assert not isinstance(tc, BarrierTaskContext)
+ for batch in iterator:
+ yield batch
+
+ df.mapInPandas(func1, "id long", False).collect()
+
+ def func2(iterator):
+ from pyspark import TaskContext, BarrierTaskContext
+
+ tc = TaskContext.get()
+ assert tc is not None
+ assert isinstance(tc, BarrierTaskContext)
+ for batch in iterator:
+ yield batch
+
+ df.mapInPandas(func2, "id long", True).collect()
+
class MapInPandasTests(ReusedSQLTestCase, MapInPandasTestsMixin):
@classmethod
diff --git a/python/pyspark/sql/tests/test_arrow_map.py b/python/pyspark/sql/tests/test_arrow_map.py
index 15367743585e..0c2d5f36d0f2 100644
--- a/python/pyspark/sql/tests/test_arrow_map.py
+++ b/python/pyspark/sql/tests/test_arrow_map.py
@@ -146,6 +146,31 @@ class MapInArrowTestsMixin(object):
expected = df1.join(df1).collect()
self.assertEqual(sorted(actual), sorted(expected))
+ def test_map_in_arrow_with_barrier_mode(self):
+ df = self.spark.range(10)
+
+ def func1(iterator):
+ from pyspark import TaskContext, BarrierTaskContext
+
+ tc = TaskContext.get()
+ assert tc is not None
+ assert not isinstance(tc, BarrierTaskContext)
+ for batch in iterator:
+ yield batch
+
+ df.mapInArrow(func1, "id long", False).collect()
+
+ def func2(iterator):
+ from pyspark import TaskContext, BarrierTaskContext
+
+ tc = TaskContext.get()
+ assert tc is not None
+ assert isinstance(tc, BarrierTaskContext)
+ for batch in iterator:
+ yield batch
+
+ df.mapInArrow(func2, "id long", True).collect()
+
class MapInArrowTests(MapInArrowTestsMixin, ReusedSQLTestCase):
@classmethod
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]