This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b7cb9a69dc7 [SPARK-53562][PYTHON][TESTS][FOLLOW-UP] Add more tests for `maxBytesPerBatch`
7b7cb9a69dc7 is described below

commit 7b7cb9a69dc7f732dc6f67d37b410bf1cb65bf29
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Fri Oct 10 12:03:21 2025 +0800

    [SPARK-53562][PYTHON][TESTS][FOLLOW-UP] Add more tests for `maxBytesPerBatch`
    
    ### What changes were proposed in this pull request?
    Add more tests for `maxBytesPerBatch`
    
    ### Why are the changes needed?
    To verify that `maxBytesPerBatch` works as expected.
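
    For context, a minimal sketch (not part of this commit) of the two session configs these tests combine, assuming an active `SparkSession` named `spark`; values are illustrative only:

    ```python
    # Row-count cap and byte-size cap on Arrow batches used by applyInArrow/applyInPandas;
    # the new tests exercise each limit separately and both together.
    spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 1000)
    spark.conf.set("spark.sql.execution.arrow.maxBytesPerBatch", 1024 * 1024)  # 1 MiB
    ```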
    
    ### Does this PR introduce _any_ user-facing change?
    No, test-only.
    
    ### How was this patch tested?
    CI.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #52567 from zhengruifeng/test_bytes.
    
    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 .../sql/tests/arrow/test_arrow_grouped_map.py      | 52 ++++++++++++---------
 .../sql/tests/pandas/test_pandas_grouped_map.py    | 54 +++++++++++++---------
 2 files changed, 61 insertions(+), 45 deletions(-)

diff --git a/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py b/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py
index e1cd507737cf..765bc7ba6fe1 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py
@@ -353,33 +353,41 @@ class ApplyInArrowTestsMixin:
         self.assertEqual(df2.join(df2).count(), 1)
 
     def test_arrow_batch_slicing(self):
-        with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 1000}):
-            df = self.spark.range(10000000).select(
-                (sf.col("id") % 2).alias("key"), sf.col("id").alias("v")
+        df = self.spark.range(10000000).select(
+            (sf.col("id") % 2).alias("key"), sf.col("id").alias("v")
+        )
+        cols = {f"col_{i}": sf.col("v") + i for i in range(20)}
+        df = df.withColumns(cols)
+
+        def min_max_v(table):
+            return pa.Table.from_pydict(
+                {
+                    "key": [table.column("key")[0].as_py()],
+                    "min": [pc.min(table.column("v")).as_py()],
+                    "max": [pc.max(table.column("v")).as_py()],
+                }
             )
-            cols = {f"col_{i}": sf.col("v") + i for i in range(20)}
-            df = df.withColumns(cols)
 
-            def min_max_v(table):
-                return pa.Table.from_pydict(
+        expected = (
+            df.groupby("key").agg(sf.min("v").alias("min"), sf.max("v").alias("max")).sort("key")
+        ).collect()
+
+        int_max = 2147483647
+        for maxRecords, maxBytes in [(1000, int_max), (0, 1048576), (1000, 1048576)]:
+            with self.subTest(maxRecords=maxRecords, maxBytes=maxBytes):
+                with self.sql_conf(
                     {
-                        "key": [table.column("key")[0].as_py()],
-                        "min": [pc.min(table.column("v")).as_py()],
-                        "max": [pc.max(table.column("v")).as_py()],
+                        "spark.sql.execution.arrow.maxRecordsPerBatch": maxRecords,
+                        "spark.sql.execution.arrow.maxBytesPerBatch": maxBytes,
                     }
-                )
+                ):
+                    result = (
+                        df.groupBy("key")
+                        .applyInArrow(min_max_v, "key long, min long, max long")
+                        .sort("key")
+                    ).collect()
 
-            result = (
-                df.groupBy("key")
-                .applyInArrow(min_max_v, "key long, min long, max long")
-                .sort("key")
-            )
-            expected = (
-                df.groupby("key")
-                .agg(sf.min("v").alias("min"), sf.max("v").alias("max"))
-                .sort("key")
-            )
-            self.assertEqual(expected.collect(), result.collect())
+                    self.assertEqual(expected, result)
 
     def test_negative_and_zero_batch_size(self):
         for batch_size in [0, -1]:
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
index 8756f824a56c..f81c774c0e91 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
@@ -947,33 +947,41 @@ class ApplyInPandasTestsMixin:
                 self.assertEqual(row[1], 123)
 
     def test_arrow_batch_slicing(self):
-        with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 1000}):
-            df = self.spark.range(10000000).select(
-                (sf.col("id") % 2).alias("key"), sf.col("id").alias("v")
+        df = self.spark.range(10000000).select(
+            (sf.col("id") % 2).alias("key"), sf.col("id").alias("v")
+        )
+        cols = {f"col_{i}": sf.col("v") + i for i in range(20)}
+        df = df.withColumns(cols)
+
+        def min_max_v(pdf):
+            return pd.DataFrame(
+                {
+                    "key": [pdf.key.iloc[0]],
+                    "min": [pdf.v.min()],
+                    "max": [pdf.v.max()],
+                }
             )
-            cols = {f"col_{i}": sf.col("v") + i for i in range(20)}
-            df = df.withColumns(cols)
 
-            def min_max_v(pdf):
-                return pd.DataFrame(
+        expected = (
+            df.groupby("key").agg(sf.min("v").alias("min"), sf.max("v").alias("max")).sort("key")
+        ).collect()
+
+        int_max = 2147483647
+        for maxRecords, maxBytes in [(1000, int_max), (0, 1048576), (1000, 1048576)]:
+            with self.subTest(maxRecords=maxRecords, maxBytes=maxBytes):
+                with self.sql_conf(
                     {
-                        "key": [pdf.key.iloc[0]],
-                        "min": [pdf.v.min()],
-                        "max": [pdf.v.max()],
+                        "spark.sql.execution.arrow.maxRecordsPerBatch": maxRecords,
+                        "spark.sql.execution.arrow.maxBytesPerBatch": maxBytes,
                     }
-                )
-
-            result = (
-                df.groupBy("key")
-                .applyInPandas(min_max_v, "key long, min long, max long")
-                .sort("key")
-            )
-            expected = (
-                df.groupby("key")
-                .agg(sf.min("v").alias("min"), sf.max("v").alias("max"))
-                .sort("key")
-            )
-            self.assertEqual(expected.collect(), result.collect())
+                ):
+                    result = (
+                        df.groupBy("key")
+                        .applyInPandas(min_max_v, "key long, min long, max long")
+                        .sort("key")
+                    ).collect()
+
+                    self.assertEqual(expected, result)
 
     def test_negative_and_zero_batch_size(self):
         for batch_size in [0, -1]:
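
For reference, a condensed, standalone sketch of the pattern the new Arrow test exercises (not part of this commit); it assumes a local `SparkSession` and an installed `pyarrow`, and the pandas variant is analogous:

```python
import pyarrow as pa
import pyarrow.compute as pc

from pyspark.sql import SparkSession
from pyspark.sql import functions as sf

spark = SparkSession.builder.master("local[2]").getOrCreate()
# Batch-size limits under test: rows per batch and bytes per batch (values illustrative).
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 1000)
spark.conf.set("spark.sql.execution.arrow.maxBytesPerBatch", 1024 * 1024)

df = spark.range(100000).select((sf.col("id") % 2).alias("key"), sf.col("id").alias("v"))


def min_max_v(table: pa.Table) -> pa.Table:
    # Each call receives one whole group as a pyarrow.Table; reduce it to a single row.
    return pa.Table.from_pydict(
        {
            "key": [table.column("key")[0].as_py()],
            "min": [pc.min(table.column("v")).as_py()],
            "max": [pc.max(table.column("v")).as_py()],
        }
    )


# The batch limits only affect how rows are shipped to the Python worker;
# the per-group result should be identical regardless of the settings above.
result = df.groupBy("key").applyInArrow(min_max_v, "key long, min long, max long").sort("key")
result.show()
```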


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
