This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d933dc58cfd [SPARK-43298][PYTHON][ML] predict_batch_udf with scalar input fails with batch size of one
d933dc58cfd is described below
commit d933dc58cfdd9812c8ad266f1baae7b341c793d1
Author: Lee Yang <[email protected]>
AuthorDate: Thu Apr 27 11:29:11 2023 -0700
[SPARK-43298][PYTHON][ML] predict_batch_udf with scalar input fails with batch size of one
### What changes were proposed in this pull request?
This is a follow-up to #39817 that handles another error condition, where the input batch is a single scalar value (the previous fix addressed a single scalar value in the output).
### Why are the changes needed?
Using `predict_batch_udf` with a scalar input column fails when the batch size is one.
```
import numpy as np
from pyspark.ml.functions import predict_batch_udf
from pyspark.sql.types import DoubleType
df = spark.createDataFrame([[1.0],[2.0]], schema=["a"])
def make_predict_fn():
    def predict(inputs):
        return inputs

    return predict

identity = predict_batch_udf(make_predict_fn, return_type=DoubleType(), batch_size=1)
preds = df.withColumn("preds", identity("a")).show()
```
fails with:
```
File "/.../spark/python/pyspark/worker.py", line 869, in main
process()
File "/.../spark/python/pyspark/worker.py", line 861, in process
serializer.dump_stream(out_iter, outfile)
File "/.../spark/python/pyspark/sql/pandas/serializers.py", line 354, in
dump_stream
return ArrowStreamSerializer.dump_stream(self,
init_stream_yield_batches(), stream)
File "/.../spark/python/pyspark/sql/pandas/serializers.py", line 86, in
dump_stream
for batch in iterator:
File "/.../spark/python/pyspark/sql/pandas/serializers.py", line 347, in
init_stream_yield_batches
for series in iterator:
File "/.../spark/python/pyspark/worker.py", line 555, in func
for result_batch, result_type in result_iter:
File "/.../spark/python/pyspark/ml/functions.py", line 818, in predict
yield _validate_and_transform_prediction_result(
File "/.../spark/python/pyspark/ml/functions.py", line 339, in
_validate_and_transform_prediction_result
if len(preds_array) != num_input_rows:
TypeError: len() of unsized object
```
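For context, a minimal NumPy-only sketch of the failure mechanism (the `batch` variable here is illustrative, not Spark's internal name): with a batch of one scalar value, `np.squeeze` strips every size-1 axis and returns a 0-d array, and calling `len()` on a 0-d array raises exactly this `TypeError`:
```
import numpy as np

# A single-row, single-column batch, as produced by batch_size=1 on a scalar column.
batch = np.array([[1.0]])        # shape (1, 1)
squeezed = np.squeeze(batch)     # removes *all* size-1 axes -> shape (), a 0-d array
print(squeezed.shape)            # ()
len(squeezed)                    # TypeError: len() of unsized object
```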
After the fix:
```
+---+-----+
| a|preds|
+---+-----+
|1.0| 1.0|
|2.0| 2.0|
+---+-----+
```
### Does this PR introduce _any_ user-facing change?
This fixes a bug in the feature that was released in Spark 3.4.0.
### How was this patch tested?
Unit test was added.
Closes #40967 from leewyang/SPARK-43298.
Authored-by: Lee Yang <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/ml/functions.py | 5 +++--
python/pyspark/ml/tests/test_functions.py | 5 +++++
2 files changed, 8 insertions(+), 2 deletions(-)
diff --git a/python/pyspark/ml/functions.py b/python/pyspark/ml/functions.py
index 4ad239cb5f0..bcd7fc2f47b 100644
--- a/python/pyspark/ml/functions.py
+++ b/python/pyspark/ml/functions.py
@@ -236,7 +236,8 @@ def _validate_and_transform_single_input(
     # scalar columns
     if len(batch.columns) == 1:
         # single scalar column, remove extra dim
-        single_input = np.squeeze(batch.to_numpy())
+        np_batch = batch.to_numpy()
+        single_input = np.squeeze(np_batch, -1) if len(np_batch.shape) > 1 else np_batch
         if input_shapes and input_shapes[0] not in [None, [], [1]]:
             raise ValueError("Invalid input_tensor_shape for scalar column.")
     elif not has_tuple:
@@ -344,7 +345,7 @@ def _validate_and_transform_prediction_result(
         ):
             raise ValueError("Invalid shape for scalar prediction result.")
 
-        output = np.squeeze(preds)  # type: ignore[arg-type]
+        output = np.squeeze(preds, -1) if len(preds.shape) > 1 else preds  # type: ignore[arg-type]
         return pd.Series(output).astype(output.dtype)
     else:
         raise ValueError("Unsupported return type")
diff --git a/python/pyspark/ml/tests/test_functions.py b/python/pyspark/ml/tests/test_functions.py
index 6c2268b0968..894db2f8a7d 100644
--- a/python/pyspark/ml/tests/test_functions.py
+++ b/python/pyspark/ml/tests/test_functions.py
@@ -72,6 +72,11 @@ class PredictBatchUDFTests(SparkSessionTestCase):
         with self.assertRaisesRegex(Exception, "Multiple input columns found, but model expected"):
             preds = self.df.withColumn("preds", identity("a", "b")).toPandas()
 
+        # batch_size 1
+        identity = predict_batch_udf(make_predict_fn, return_type=DoubleType(), batch_size=1)
+        preds = self.df.withColumn("preds", identity("a")).toPandas()
+        self.assertTrue(preds["a"].equals(preds["preds"]))
+
     def test_identity_multi(self):
         # single input model
         def make_predict_fn():