This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 7b7cb9a69dc7  [SPARK-53562][PYTHON][TESTS][FOLLOW-UP] Add more tests for `maxBytesPerBatch`
7b7cb9a69dc7 is described below

commit 7b7cb9a69dc7f732dc6f67d37b410bf1cb65bf29
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Fri Oct 10 12:03:21 2025 +0800

    [SPARK-53562][PYTHON][TESTS][FOLLOW-UP] Add more tests for `maxBytesPerBatch`

    ### What changes were proposed in this pull request?
    Add more tests for `maxBytesPerBatch`

    ### Why are the changes needed?
    to make sure `maxBytesPerBatch` works

    ### Does this PR introduce _any_ user-facing change?
    no, test-only

    ### How was this patch tested?
    ci

    ### Was this patch authored or co-authored using generative AI tooling?
    no

    Closes #52567 from zhengruifeng/test_bytes.

    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 .../sql/tests/arrow/test_arrow_grouped_map.py    | 52 ++++++++++++---------
 .../sql/tests/pandas/test_pandas_grouped_map.py  | 54 +++++++++++++---------
 2 files changed, 61 insertions(+), 45 deletions(-)

diff --git a/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py b/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py
index e1cd507737cf..765bc7ba6fe1 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py
@@ -353,33 +353,41 @@ class ApplyInArrowTestsMixin:
         self.assertEqual(df2.join(df2).count(), 1)
 
     def test_arrow_batch_slicing(self):
-        with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 1000}):
-            df = self.spark.range(10000000).select(
-                (sf.col("id") % 2).alias("key"), sf.col("id").alias("v")
+        df = self.spark.range(10000000).select(
+            (sf.col("id") % 2).alias("key"), sf.col("id").alias("v")
+        )
+        cols = {f"col_{i}": sf.col("v") + i for i in range(20)}
+        df = df.withColumns(cols)
+
+        def min_max_v(table):
+            return pa.Table.from_pydict(
+                {
+                    "key": [table.column("key")[0].as_py()],
+                    "min": [pc.min(table.column("v")).as_py()],
+                    "max": [pc.max(table.column("v")).as_py()],
+                }
             )
-            cols = {f"col_{i}": sf.col("v") + i for i in range(20)}
-            df = df.withColumns(cols)
 
-            def min_max_v(table):
-                return pa.Table.from_pydict(
+        expected = (
+            df.groupby("key").agg(sf.min("v").alias("min"), sf.max("v").alias("max")).sort("key")
+        ).collect()
+
+        int_max = 2147483647
+        for maxRecords, maxBytes in [(1000, int_max), (0, 1048576), (1000, 1048576)]:
+            with self.subTest(maxRecords=maxRecords, maxBytes=maxBytes):
+                with self.sql_conf(
                     {
-                        "key": [table.column("key")[0].as_py()],
-                        "min": [pc.min(table.column("v")).as_py()],
-                        "max": [pc.max(table.column("v")).as_py()],
+                        "spark.sql.execution.arrow.maxRecordsPerBatch": maxRecords,
+                        "spark.sql.execution.arrow.maxBytesPerBatch": maxBytes,
                     }
-                )
+                ):
+                    result = (
+                        df.groupBy("key")
+                        .applyInArrow(min_max_v, "key long, min long, max long")
+                        .sort("key")
+                    ).collect()
 
-            result = (
-                df.groupBy("key")
-                .applyInArrow(min_max_v, "key long, min long, max long")
-                .sort("key")
-            )
-            expected = (
-                df.groupby("key")
-                .agg(sf.min("v").alias("min"), sf.max("v").alias("max"))
-                .sort("key")
-            )
-            self.assertEqual(expected.collect(), result.collect())
+                    self.assertEqual(expected, result)
 
     def test_negative_and_zero_batch_size(self):
         for batch_size in [0, -1]:
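For context, the behaviour the updated Arrow test exercises can be reproduced outside the test suite roughly as follows. This is a minimal sketch, not part of the commit: the local session setup, the smaller range(100000), and the plain assert loop are illustrative assumptions, while the two config keys, the min_max_v UDF, and the applyInArrow call mirror the diff above.

import pyarrow as pa
import pyarrow.compute as pc

from pyspark.sql import SparkSession
from pyspark.sql import functions as sf

spark = SparkSession.builder.appName("max-bytes-per-batch-sketch").getOrCreate()

# Two groups of wide-ish rows, shaped like the test data but smaller.
df = spark.range(100000).select((sf.col("id") % 2).alias("key"), sf.col("id").alias("v"))
df = df.withColumns({f"col_{i}": sf.col("v") + i for i in range(20)})


def min_max_v(table):
    # applyInArrow hands the UDF the whole group as one pyarrow.Table, however the
    # input was sliced into Arrow batches, so min/max over "v" must match the SQL agg.
    return pa.Table.from_pydict(
        {
            "key": [table.column("key")[0].as_py()],
            "min": [pc.min(table.column("v")).as_py()],
            "max": [pc.max(table.column("v")).as_py()],
        }
    )


expected = (
    df.groupBy("key").agg(sf.min("v").alias("min"), sf.max("v").alias("max")).sort("key").collect()
)

# Record-based slicing only, byte-based slicing only, then both together.
for max_records, max_bytes in [(1000, 2147483647), (0, 1048576), (1000, 1048576)]:
    spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", max_records)
    spark.conf.set("spark.sql.execution.arrow.maxBytesPerBatch", max_bytes)
    result = (
        df.groupBy("key")
        .applyInArrow(min_max_v, "key long, min long, max long")
        .sort("key")
        .collect()
    )
    assert result == expected, (max_records, max_bytes)

spark.stop()

Setting `maxBytesPerBatch` to INT_MAX effectively leaves only record-based slicing, and `maxRecordsPerBatch` set to zero leaves only byte-based slicing, so the three pairs cover the same record-only, byte-only, and combined cases that the subTest loop in the test sweeps.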
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
index 8756f824a56c..f81c774c0e91 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
@@ -947,33 +947,41 @@ class ApplyInPandasTestsMixin:
         self.assertEqual(row[1], 123)
 
     def test_arrow_batch_slicing(self):
-        with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 1000}):
-            df = self.spark.range(10000000).select(
-                (sf.col("id") % 2).alias("key"), sf.col("id").alias("v")
+        df = self.spark.range(10000000).select(
+            (sf.col("id") % 2).alias("key"), sf.col("id").alias("v")
+        )
+        cols = {f"col_{i}": sf.col("v") + i for i in range(20)}
+        df = df.withColumns(cols)
+
+        def min_max_v(pdf):
+            return pd.DataFrame(
+                {
+                    "key": [pdf.key.iloc[0]],
+                    "min": [pdf.v.min()],
+                    "max": [pdf.v.max()],
+                }
             )
-            cols = {f"col_{i}": sf.col("v") + i for i in range(20)}
-            df = df.withColumns(cols)
 
-            def min_max_v(pdf):
-                return pd.DataFrame(
+        expected = (
+            df.groupby("key").agg(sf.min("v").alias("min"), sf.max("v").alias("max")).sort("key")
+        ).collect()
+
+        int_max = 2147483647
+        for maxRecords, maxBytes in [(1000, int_max), (0, 1048576), (1000, 1048576)]:
+            with self.subTest(maxRecords=maxRecords, maxBytes=maxBytes):
+                with self.sql_conf(
                     {
-                        "key": [pdf.key.iloc[0]],
-                        "min": [pdf.v.min()],
-                        "max": [pdf.v.max()],
+                        "spark.sql.execution.arrow.maxRecordsPerBatch": maxRecords,
+                        "spark.sql.execution.arrow.maxBytesPerBatch": maxBytes,
                     }
-                )
-
-            result = (
-                df.groupBy("key")
-                .applyInPandas(min_max_v, "key long, min long, max long")
-                .sort("key")
-            )
-            expected = (
-                df.groupby("key")
-                .agg(sf.min("v").alias("min"), sf.max("v").alias("max"))
-                .sort("key")
-            )
-            self.assertEqual(expected.collect(), result.collect())
+                ):
+                    result = (
+                        df.groupBy("key")
+                        .applyInPandas(min_max_v, "key long, min long, max long")
+                        .sort("key")
+                    ).collect()
+
+                    self.assertEqual(expected, result)
 
     def test_negative_and_zero_batch_size(self):
         for batch_size in [0, -1]:
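The pandas-side test mirrors the Arrow one, swapping the pyarrow UDF and applyInArrow for a pandas UDF and applyInPandas. A condensed sketch under the same assumptions as the previous one (local session, smaller data, plain asserts):

import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql import functions as sf

spark = SparkSession.builder.getOrCreate()

df = spark.range(100000).select((sf.col("id") % 2).alias("key"), sf.col("id").alias("v"))
df = df.withColumns({f"col_{i}": sf.col("v") + i for i in range(20)})


def min_max_v(pdf):
    # One pandas DataFrame per group, regardless of how the Arrow input was sliced.
    return pd.DataFrame({"key": [pdf.key.iloc[0]], "min": [pdf.v.min()], "max": [pdf.v.max()]})


expected = (
    df.groupBy("key").agg(sf.min("v").alias("min"), sf.max("v").alias("max")).sort("key").collect()
)

for max_records, max_bytes in [(1000, 2147483647), (0, 1048576), (1000, 1048576)]:
    spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", max_records)
    spark.conf.set("spark.sql.execution.arrow.maxBytesPerBatch", max_bytes)
    result = (
        df.groupBy("key")
        .applyInPandas(min_max_v, "key long, min long, max long")
        .sort("key")
        .collect()
    )
    assert result == expected, (max_records, max_bytes)

spark.stop()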