This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 6f8b3e0 [SPARK-31287][PYTHON][SQL] Ignore type hints in
groupby.(cogroup.)applyInPandas and mapInPandas
6f8b3e0 is described below
commit 6f8b3e0bea3c6639ba7648b41580198ad925683a
Author: HyukjinKwon <[email protected]>
AuthorDate: Sun Mar 29 13:59:18 2020 +0900
[SPARK-31287][PYTHON][SQL] Ignore type hints in
groupby.(cogroup.)applyInPandas and mapInPandas
### What changes were proposed in this pull request?
This PR proposes to make the pandas function APIs
(`groupby.(cogroup.)applyInPandas` and `mapInPandas`) ignore Python type
hints.
### Why are the changes needed?
Python type hints are optional. They shouldn't affect APIs where pandas UDFs
are not used.
This also leaves room for future work in which these APIs support other type
hints. At the very least, we shouldn't throw an exception at this moment.
### Does this PR introduce any user-facing change?
No, it's a master-only change.
```python
import pandas as pd
def pandas_plus_one(pdf: pd.DataFrame) -> pd.DataFrame:
    return pdf + 1
spark.range(10).groupby('id').applyInPandas(pandas_plus_one, schema="id long").show()
```
```python
import pandas as pd
def pandas_plus_one(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    return left + 1
spark.range(10).groupby('id').cogroup(spark.range(10).groupby("id")).applyInPandas(pandas_plus_one, schema="id long").show()
```
```python
from typing import Iterator
import pandas as pd
def pandas_plus_one(iter: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    return map(lambda v: v + 1, iter)
spark.range(10).mapInPandas(pandas_plus_one, schema="id long").show()
```
**Before:**
Exception
**After:**
```
+---+
| id|
+---+
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
| 10|
+---+
```
### How was this patch tested?
Unit tests were added in `python/pyspark/sql/tests/test_pandas_udf_typehints.py`.
Closes #28052 from HyukjinKwon/SPARK-31287.
Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit 3165a95a04448546ae8955020566d718c6960223)
Signed-off-by: HyukjinKwon <[email protected]>
---
python/pyspark/sql/pandas/functions.py | 8 +++++
.../pyspark/sql/tests/test_pandas_udf_typehints.py | 42 ++++++++++++++++++++++
2 files changed, 50 insertions(+)
diff --git a/python/pyspark/sql/pandas/functions.py
b/python/pyspark/sql/pandas/functions.py
index 31aa321..f43ebf8 100644
--- a/python/pyspark/sql/pandas/functions.py
+++ b/python/pyspark/sql/pandas/functions.py
@@ -384,6 +384,14 @@ def _create_pandas_udf(f, returnType, evalType):
"In Python 3.6+ and Spark 3.0+, it is preferred to specify
type hints for "
"pandas UDF instead of specifying pandas UDF type which will
be deprecated "
"in the future releases. See SPARK-28264 for more details.",
UserWarning)
+ elif evalType in [PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
+ PythonEvalType.SQL_MAP_PANDAS_ITER_UDF,
+ PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF]:
+ # In case of 'SQL_GROUPED_MAP_PANDAS_UDF', deprecation warning is
being triggered
+ # at `apply` instead.
+ # In case of 'SQL_MAP_PANDAS_ITER_UDF' and
'SQL_COGROUPED_MAP_PANDAS_UDF', the
+ # evaluation type will always be set.
+ pass
elif len(argspec.annotations) > 0:
evalType = infer_eval_type(signature(f))
assert evalType is not None
diff --git a/python/pyspark/sql/tests/test_pandas_udf_typehints.py
b/python/pyspark/sql/tests/test_pandas_udf_typehints.py
index 7c83c78..2582080 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_typehints.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_typehints.py
@@ -261,6 +261,48 @@ class PandasUDFTypeHintsTests(ReusedSQLTestCase):
expected = df.groupby('id').agg(mean(df.v).alias('weighted_mean(v,
1.0)')).sort('id')
assert_frame_equal(expected.toPandas(), actual.toPandas())
+ def test_ignore_type_hint_in_group_apply_in_pandas(self):
+ df = self.spark.range(10)
+ exec(
+ "def pandas_plus_one(v: pd.DataFrame) -> pd.DataFrame:\n"
+ " return v + 1",
+ self.local)
+
+ pandas_plus_one = self.local["pandas_plus_one"]
+
+ actual = df.groupby('id').applyInPandas(pandas_plus_one,
schema=df.schema).sort('id')
+ expected = df.selectExpr("id + 1 as id")
+ assert_frame_equal(expected.toPandas(), actual.toPandas())
+
+ def test_ignore_type_hint_in_cogroup_apply_in_pandas(self):
+ df = self.spark.range(10)
+ exec(
+ "def pandas_plus_one(left: pd.DataFrame, right: pd.DataFrame) ->
pd.DataFrame:\n"
+ " return left + 1",
+ self.local)
+
+ pandas_plus_one = self.local["pandas_plus_one"]
+
+ actual = df.groupby('id').cogroup(
+ self.spark.range(10).groupby("id")
+ ).applyInPandas(pandas_plus_one, schema=df.schema).sort('id')
+ expected = df.selectExpr("id + 1 as id")
+ assert_frame_equal(expected.toPandas(), actual.toPandas())
+
+ def test_ignore_type_hint_in_map_in_pandas(self):
+ df = self.spark.range(10)
+ exec(
+ "from typing import Iterator\n"
+ "def pandas_plus_one(iter: Iterator[pd.DataFrame]) ->
Iterator[pd.DataFrame]:\n"
+ " return map(lambda v: v + 1, iter)",
+ self.local)
+
+ pandas_plus_one = self.local["pandas_plus_one"]
+
+ actual = df.mapInPandas(pandas_plus_one, schema=df.schema)
+ expected = df.selectExpr("id + 1 as id")
+ assert_frame_equal(expected.toPandas(), actual.toPandas())
+
if __name__ == "__main__":
from pyspark.sql.tests.test_pandas_udf_typehints import *
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]