[
https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16968703#comment-16968703
]
Nasir Ali edited comment on SPARK-28502 at 11/6/19 9:17 PM:
------------------------------------------------------------
[~bryanc] If I perform any aggregation (e.g., _df.groupBy("id", F.window("ts", "15
days")).agg(\{"id": "avg"}).show()_) on grouped data, PySpark returns the
key (e.g., id, window) along with the avg for each group. However, in the above
example, when the UDF returns the struct, it does not automatically return the key.
I have to manually add the window to the returned dataframe. Is there a way to
automatically concatenate the results of the UDF with the key?
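A minimal sketch of the manual approach (assuming Spark 2.4; the names win_start, avg_id, and avg_with_key are illustrative, not from the original code): flatten the window key into a plain timestamp column, group on that, and echo it back from the UDF:
{code:python}
# Sketch only: carry the grouping key through the UDF by hand, since the
# grouped-map result contains only what the UDF itself returns.
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, DoubleType, TimestampType

out_schema = StructType([
    StructField("win_start", TimestampType()),  # the group key, echoed back
    StructField("avg_id", DoubleType()),
])

@pandas_udf(out_schema, PandasUDFType.GROUPED_MAP)
def avg_with_key(pdf):
    # pdf contains the flat win_start column, so the key can be returned
    return pd.DataFrame({"win_start": [pdf["win_start"].iloc[0]],
                         "avg_id": [pdf["id"].mean()]})

(df.withColumn("win_start", F.window("ts", "15 days").start)
   .groupby("win_start")
   .apply(avg_with_key)
   .show())
{code}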
> Error with struct conversion while using pandas_udf
> ---------------------------------------------------
>
> Key: SPARK-28502
> URL: https://issues.apache.org/jira/browse/SPARK-28502
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 2.4.3
> Environment: OS: Ubuntu
> Python: 3.6
> Reporter: Nasir Ali
> Priority: Minor
> Fix For: 3.0.0
>
>
> What I am trying to do: group data into time intervals (e.g., a 15-day
> window) and perform some operations on the dataframe using (pandas) UDFs. I don't
> know if there is a better/cleaner way to do it.
> Below is the sample code that I tried and the error message I am getting.
>
> {code:python}
> import pyspark.sql.functions as F
> from pyspark.sql.functions import pandas_udf, PandasUDFType
> from pyspark.sql.types import StructType, StructField, DoubleType, TimestampType
>
> df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"),
>                                    (13.00, "2018-03-11T12:27:18+00:00"),
>                                    (25.00, "2018-03-12T11:27:18+00:00"),
>                                    (20.00, "2018-03-13T15:27:18+00:00"),
>                                    (17.00, "2018-03-14T12:27:18+00:00"),
>                                    (99.00, "2018-03-15T11:27:18+00:00"),
>                                    (156.00, "2018-03-22T11:27:18+00:00"),
>                                    (17.00, "2018-03-31T11:27:18+00:00"),
>                                    (25.00, "2018-03-15T11:27:18+00:00"),
>                                    (25.00, "2018-03-16T11:27:18+00:00")],
>                                   ["id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
>
> # the "id" values are doubles, so the output schema must declare DoubleType
> schema = StructType([
>     StructField("id", DoubleType()),
>     StructField("ts", TimestampType())
> ])
>
> @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
> def some_udf(df):
>     # some computation
>     return df
>
> df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show()
> {code}
> This throws the following exception:
> {code}
> TypeError: Unsupported type in conversion from Arrow: struct<start: timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]>
> {code}
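> A possible workaround (a sketch, not part of the original report; the flat column names are made up): select the window's start/end fields as plain timestamp columns before grouping, so Arrow never has to convert a struct:
> {code:python}
> import pyspark.sql.functions as F
> from pyspark.sql.functions import pandas_udf, PandasUDFType
> from pyspark.sql.types import StructType, StructField, DoubleType, TimestampType
>
> flat_schema = StructType([
>     StructField("id", DoubleType()),
>     StructField("ts", TimestampType()),
>     StructField("win_start", TimestampType()),
>     StructField("win_end", TimestampType())
> ])
>
> @pandas_udf(flat_schema, PandasUDFType.GROUPED_MAP)
> def some_udf_flat(pdf):
>     # same pass-through as some_udf above, now over struct-free columns
>     return pdf
>
> (df.withColumn("win", F.window("ts", "15 days"))
>    .select("id", "ts",
>            F.col("win.start").alias("win_start"),
>            F.col("win.end").alias("win_end"))
>    .groupby("win_start", "win_end")
>    .apply(some_udf_flat)
>    .show())
> {code}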
>
> However, if I use a built-in agg method, it works fine. For example:
> {code:python}
> df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False)
> {code}
> Output:
> {code}
> +-----+------------------------------------------+-------+
> |id |window |avg(id)|
> +-----+------------------------------------------+-------+
> |13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0 |
> |17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0 |
> |156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0 |
> |99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0 |
> |20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0 |
> |17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0 |
> |25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0 |
> +-----+------------------------------------------+-------+
> {code}