Matt Hawes created SPARK-28818:
----------------------------------
Summary: FreqItems applies an incorrect schema to the resulting dataframe when nulls are present
Key: SPARK-28818
URL: https://issues.apache.org/jira/browse/SPARK-28818
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 2.4.3
Reporter: Matt Hawes
This is a trivially reproducible bug in the code for `FrequentItems`. The schema
for the resulting arrays of frequent items is hard coded (in
`FrequentItems.scala`, line 122) to have non-nullable array elements:
{code:scala}
val outputCols = colInfo.map { v =>
  StructField(v._1 + "_freqItems", ArrayType(v._2, false))
}
val schema = StructType(outputCols).toAttributes
Dataset.ofRows(df.sparkSession, LocalRelation.fromExternalRows(schema, Seq(resultRow)))
{code}
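For reference, a minimal sketch of what the fix might look like if the nulls are meant to stay in the output; the `containsNull = true` flag is my assumption, not the current code:
{code:scala}
// Sketch only: declare the array elements nullable so that frequent nulls
// survive the unsafe projection instead of triggering an NPE. colInfo is the
// existing Seq[(String, DataType)] from the surrounding code.
val outputCols = colInfo.map { v =>
  StructField(v._1 + "_freqItems", ArrayType(v._2, containsNull = true))
}
{code}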
However, if a column contains frequent nulls, those nulls are included in the
frequent-items array, violating the declared non-nullable schema. Any
subsequent attempt to `collect()` then fails with a null pointer exception:
{code:python}
from pyspark.sql import SparkSession
spark = SparkSession.Builder().getOrCreate()
df = spark.createDataFrame([
    (1, 'a'),
    (2, None),
    (3, 'b'),
], schema="id INTEGER, val STRING")
rows = df.freqItems(df.columns).collect()
{code}
Results in:
{code:java}
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/bin/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/dataframe.py", line 533, in collect
    sock_info = self._jdf.collectToPython()
  File "/usr/local/bin/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/local/bin/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/local/bin/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o40.collectToPython.
: java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:109)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:44)
	at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:44)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:296)
	at org.apache.spark.sql.execution.LocalTableScanExec.unsafeRows$lzycompute(LocalTableScanExec.scala:44)
	at org.apache.spark.sql.execution.LocalTableScanExec.unsafeRows(LocalTableScanExec.scala:39)
	at org.apache.spark.sql.execution.LocalTableScanExec.executeCollect(LocalTableScanExec.scala:70)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3257)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3254)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3254)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
{code}
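Until this is resolved, dropping the null rows before calling `freqItems` avoids the NPE. A sketch against the Scala API, assuming it is acceptable to lose the fact that null itself is frequent:
{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Same data as the Python reproduction above.
val df = Seq((1, Some("a")), (2, None), (3, Some("b"))).toDF("id", "val")

// Drop rows where "val" is null before computing frequent items, so that
// null can never appear in the result arrays. Note this also removes those
// rows from the "id" column's computation.
val freq = df.na.drop(Seq("val")).stat.freqItems(df.columns)
freq.collect() // no NullPointerException
{code}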
It is unclear whether the hard coding is at fault, or whether the algorithm is
actually designed not to return nulls even when they are frequent, in which
case the hard coding would be appropriate and the nulls should instead be
filtered out of the result. I'll put a PR in that assumes the hard coding is
the bug, unless people know otherwise.
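Conversely, if the algorithm is meant to exclude nulls, the fix would presumably filter them out of each result array before the row is built; a rough sketch (the `justItems` name is assumed, not verified against the current source):
{code:scala}
// Alternative sketch, assuming nulls are *not* supposed to be returned:
// strip null keys from each column's frequent-item array so the existing
// non-nullable schema stays valid. `justItems: Seq[Array[Any]]` is assumed.
val justItemsWithoutNulls = justItems.map(_.filter(_ != null))
{code}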