Hi, I need to implement a MeanImputor that imputes missing values with the column mean. If I set missing values to null, then DataFrame aggregation works properly, but a UDF treats null values as 0.0. Here's an example:

val df = sc.parallelize(Array(1.0, 2.0, null, 3.0, 5.0, null)).toDF
df.agg(avg("_1")).first
//res45: org.apache.spark.sql.Row = [2.75]

df.withColumn("d2", callUDF({(value: Double) => value}, DoubleType, df("d"))).show()

d    d2
1.0  1.0
2.0  2.0
null 0.0
3.0  3.0
5.0  5.0
null 0.0

val df = sc.parallelize(Array(1.0, 2.0, Double.NaN, 3.0, 5.0, Double.NaN)).toDF
df.agg(avg("_1")).first
//res46: org.apache.spark.sql.Row = [Double.NaN]
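For what it's worth, the 0.0 seems explainable without Spark at all: if the UDF wrapper casts a null cell to the UDF's primitive Double parameter, Scala unboxes it to the type's default value. A minimal pure-Scala sketch (the "Spark casts null to the primitive" part is my assumption about the internals):

```scala
// Sketch (no Spark needed): casting null to a primitive Double in Scala
// yields the default value 0.0 rather than failing, which would explain
// why the UDF sees 0.0 for null cells. (Assumption about Spark internals.)
object NullVsNaN extends App {
  val fromNull = null.asInstanceOf[Double]
  println(fromNull)                     // prints 0.0

  // NaN behaves the opposite way: it reaches the UDF and is testable
  // with isNaN, but it poisons arithmetic, so avg over NaN becomes NaN.
  val xs = Seq(1.0, 2.0, Double.NaN)
  println(xs.sum / xs.size)             // prints NaN
  println(Double.NaN == Double.NaN)     // prints false; use isNaN instead
}
```

So with null the missing value is indistinguishable from a genuine 0.0 inside the UDF, while with NaN the aggregate itself is lost.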

In a UDF I cannot compare Scala's Double to null:

[warn] comparing values of types Double and Null using `==' will always yield false
[warn]   if (value == null) meanValue else value

With Double.NaN instead of null I can compare in the UDF, but then the aggregation doesn't work properly (avg returns NaN). Maybe it's related to: https://issues.apache.org/jira/browse/SPARK-6573
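One workaround I'd expect to help (an untested sketch, assuming the Scala `udf` API accepts a boxed parameter type) is to declare the UDF argument as java.lang.Double, a reference type, so null arrives as null instead of being unboxed to 0.0:

```scala
// Untested sketch of a workaround: take java.lang.Double (a reference
// type) as the UDF parameter so null survives, then substitute the mean.
// Assumes a DataFrame `df` with a nullable Double column "d".
import org.apache.spark.sql.functions.{avg, col, udf}

val meanValue = df.agg(avg("d")).first.getDouble(0)  // avg skips nulls

val imputeMean = udf { (value: java.lang.Double) =>
  if (value == null) meanValue else value.doubleValue
}

df.withColumn("d2", imputeMean(col("d"))).show()
```

If DataFrameNaFunctions is available in the Spark version at hand, `df.na.fill(meanValue, Seq("d"))` might do the same replacement without a UDF at all.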

Thanks,
Peter Rudenko

