[ https://issues.apache.org/jira/browse/SPARK-29009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16926552#comment-16926552 ]

Tomasz Belina commented on SPARK-29009:
---------------------------------------

I've dug a little deeper into the source code and it looks like only Row and
simple types are supported. I consider this issue a bug because this piece
of code:
{code:java}
Dataset<Row> test = spark.createDataFrame(
        Arrays.asList(
                new Movie("movie1", 2323d, "1212"),
                new Movie("movie2", 2323d, "1212"),
                new Movie("movie3", 2323d, "1212"),
                new Movie("movie4", 2323d, "1212")),
        Movie.class);
{code}
works perfectly well, which means Spark is able to handle POJOs and convert
them into Rows in some cases. I was surprised that for UDFs the conversion
into Row is not applied automatically. Additionally, the documentation for
UDFs is not very extensive, so it is quite hard to distinguish what is a bug
and what is a feature.

A simple check of whether the type of the value returned by a UDF is
supported would be very helpful.
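For what it's worth, a workaround that avoids the failing conversion is to have the UDF return a Row built with RowFactory instead of the POJO; a Row matches the declared struct return type, so CatalystTypeConverters accepts it. A minimal sketch (not from the ticket; the class name and UDF name "parseStub" are made up for illustration):

{code:java}
import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;

public class RowUdfWorkaround {

    // Register a UDF that returns a Row (not a POJO) and apply it.
    // The field order in RowFactory.create must match the declared schema.
    public static Dataset<Row> run(SparkSession spark) {
        StructType resultSchema = new StructType()
                .add("id", DataTypes.IntegerType, false)
                .add("healthPointRatio", DataTypes.IntegerType, false);

        spark.udf().register("parseStub",
                (UDF2<String, String, Row>) (s, s2) -> RowFactory.create(1, 2),
                resultSchema);

        Dataset<Row> input = spark.createDataFrame(
                Arrays.asList(RowFactory.create("one", "two"),
                              RowFactory.create("3", "4")),
                new StructType()
                        .add("foe1", DataTypes.StringType, false)
                        .add("foe2", DataTypes.StringType, false));

        return input.select(callUDF("parseStub",
                col("foe1"), col("foe2")).as("stub"));
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[1]").appName("row-udf-workaround").getOrCreate();
        run(spark).show(false);
        spark.stop();
    }
}
{code}

The downside, of course, is losing the typed POJO at the UDF boundary, which is exactly why automatic bean conversion (as createDataFrame already does) would be nice.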

 

> Returning pojo from udf not working
> -----------------------------------
>
>                 Key: SPARK-29009
>                 URL: https://issues.apache.org/jira/browse/SPARK-29009
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.3
>            Reporter: Tomasz Belina
>            Priority: Major
>
>  It looks like Spark is unable to construct a Row from a POJO returned from a UDF.
> Given the POJO:
> {code:java}
> public class SegmentStub {
>     private int id;
>     private Date statusDateTime;
>     private int healthPointRatio;
> }
> {code}
> Registration of the UDF:
> {code:java}
> public class ParseResultsUdf {
>     public String registerUdf(SparkSession sparkSession) {
>         Encoder<SegmentStub> encoder = Encoders.bean(SegmentStub.class);
>         final StructType schema = encoder.schema();
>         sparkSession.udf().register(UDF_NAME,
>                 (UDF2<String, String, SegmentStub>) (s, s2) -> new SegmentStub(1, Date.valueOf(LocalDate.now()), 2),
>                 schema
>         );
>         return UDF_NAME;
>     }
> }
> {code}
> Test code:
> {code:java}
>         List<String[]> strings = Arrays.asList(
>                 new String[]{"one", "two"}, new String[]{"3", "4"});
>         JavaRDD<Row> rowJavaRDD = sparkContext.parallelize(strings).map(RowFactory::create);
>         StructType schema = DataTypes.createStructType(new StructField[]{
>                 DataTypes.createStructField("foe1", DataTypes.StringType, false),
>                 DataTypes.createStructField("foe2", DataTypes.StringType, false)});
>         Dataset<Row> dataFrame = sparkSession.sqlContext().createDataFrame(rowJavaRDD, schema);
>         Seq<Column> columnSeq = new Set.Set2<>(col("foe1"), col("foe2")).toSeq();
>         dataFrame.select(callUDF(udfName, columnSeq)).show();
> {code}
>  throws the exception:
>  throws exception: 
> {code:java}
> Caused by: java.lang.IllegalArgumentException: The value (SegmentStub(id=1, statusDateTime=2019-09-06, healthPointRatio=2)) of the type (udf.SegmentStub) cannot be converted to struct<healthPointRatio:int,id:int,statusDateTime:date>
>       at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:262)
>       at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:238)
>       at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
>       at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
>       ... 21 more
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
