[ 
https://issues.apache.org/jira/browse/SPARK-48361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Chester Jenks updated SPARK-48361:
--------------------------------------
    Description: 
While using the corrupt record column in CSV parsing for some data cleaning 
logic, I came across a correctness bug.

The following repro can be run with spark-shell 3.5.1.

*Create test.csv with the following content:*
{code:java}
test,1,2,three
four,5,6,seven
8,9
ten,11,12,thirteen {code}
 

 

*In spark-shell:*
{code:scala}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// define a STRING, DOUBLE, DOUBLE, STRING schema for the data
val schema = StructType(List(
  StructField("column1", StringType, true),
  StructField("column2", DoubleType, true),
  StructField("column3", DoubleType, true),
  StructField("column4", StringType, true)))

// add a column for corrupt records to the schema
val schemaWithCorrupt = StructType(schema.fields :+
  StructField("_corrupt_record", StringType, true))

// read the CSV with the schema, headers, permissive parsing, and the
// corrupt record column
val df = (spark.read
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .schema(schemaWithCorrupt)
  .csv("test.csv"))

// define a UDF to count the commas in the corrupt record column
val countCommas = udf((s: String) => if (s != null) s.count(_ == ',') else -1)

// add a true/false column: true when a corrupt record does not contain
// exactly 3 commas, false when there is no corrupt record
val dfWithJagged = df.withColumn("__is_jagged",
  when(col("_corrupt_record").isNull, false)
    .otherwise(countCommas(col("_corrupt_record")) =!= 3))

dfWithJagged.show()
{code}
*Returns:*
{code:java}
+-------+-------+-------+--------+---------------+-----------+
|column1|column2|column3| column4|_corrupt_record|__is_jagged|
+-------+-------+-------+--------+---------------+-----------+
|   four|    5.0|    6.0|   seven|           NULL|      false|
|      8|    9.0|   NULL|    NULL|            8,9|       true|
|    ten|   11.0|   12.0|thirteen|           NULL|      false|
+-------+-------+-------+--------+---------------+-----------+ {code}
So far so good...

 

*BUT*

 

*If we add an aggregate before we show:*
{code:scala}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// define a STRING, DOUBLE, DOUBLE, STRING schema for the data
val schema = StructType(List(
  StructField("column1", StringType, true),
  StructField("column2", DoubleType, true),
  StructField("column3", DoubleType, true),
  StructField("column4", StringType, true)))

// add a column for corrupt records to the schema
val schemaWithCorrupt = StructType(schema.fields :+
  StructField("_corrupt_record", StringType, true))

// read the CSV with the schema, headers, permissive parsing, and the
// corrupt record column
val df = (spark.read
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .schema(schemaWithCorrupt)
  .csv("test.csv"))

// define a UDF to count the commas in the corrupt record column
val countCommas = udf((s: String) => if (s != null) s.count(_ == ',') else -1)

// add a true/false column: true when a corrupt record does not contain
// exactly 3 commas, false when there is no corrupt record
val dfWithJagged = df.withColumn("__is_jagged",
  when(col("_corrupt_record").isNull, false)
    .otherwise(countCommas(col("_corrupt_record")) =!= 3))

// sum column2 for each value of column1
val groupedSum =
  dfWithJagged.groupBy("column1").agg(sum("column2").alias("sum_column2"))

groupedSum.show()
{code}
*We get:*
{code:java}
+-------+-----------+
|column1|sum_column2|
+-------+-----------+
|      8|        9.0|
|   four|        5.0|
|    ten|       11.0|
+-------+-----------+ {code}
 

*Which is not correct*

 

With the addition of the aggregate, the filter down to rows with 3 commas in 
the corrupt record column is ignored. This does not happen with any other 
operators I have tried - just aggregates so far.
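
As a point of comparison, the expected result can be modelled without Spark. The block below is only a sketch in plain Scala, under the assumption that rows flagged as jagged are meant to be dropped before the aggregate runs. The names `JaggedSketch`, `rows`, `wellFormed` and `sums` are made up for illustration and are not part of the repro.

```scala
// Plain-Scala model of the repro data; no Spark involved.
// Assumption: a row is "jagged" when it does not have exactly 4 fields,
// and jagged rows are dropped before aggregating.
object JaggedSketch {
  val lines: List[String] = List(
    "test,1,2,three",     // consumed as the header row (header=true)
    "four,5,6,seven",
    "8,9",                // only 2 fields: the jagged row
    "ten,11,12,thirteen"
  )

  val expectedFields = 4

  // Drop the header, then split each line; limit -1 keeps trailing empty fields.
  val rows: List[Array[String]] = lines.tail.map(_.split(",", -1))

  // Keep only rows with the expected number of fields.
  val wellFormed: List[Array[String]] = rows.filter(_.length == expectedFields)

  // Group by column1 and sum column2.
  val sums: Map[String, Double] =
    wellFormed.groupBy(_(0)).map { case (k, rs) => k -> rs.map(_(1).toDouble).sum }
}
```

Under that assumption only the `four` and `ten` groups survive, so an aggregate over the filtered rows would not be expected to contain the `8` group.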

 

 

 


> Correctness: CSV corrupt record filter with aggregate ignored
> -------------------------------------------------------------
>
>                 Key: SPARK-48361
>                 URL: https://issues.apache.org/jira/browse/SPARK-48361
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.5.1
>         Environment: Using spark shell 3.5.1 on M1 Mac
>            Reporter: Ted Chester Jenks
>            Priority: Major
>


