[
https://issues.apache.org/jira/browse/SPARK-48361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848692#comment-17848692
]
Bruce Robbins commented on SPARK-48361:
---------------------------------------
Sorry for being dense. What would the correct answer be?
> Correctness: CSV corrupt record filter with aggregate ignored
> -------------------------------------------------------------
>
> Key: SPARK-48361
> URL: https://issues.apache.org/jira/browse/SPARK-48361
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.5.1
> Environment: Using spark shell 3.5.1 on M1 Mac
> Reporter: Ted Chester Jenks
> Priority: Major
>
> While using the corrupt record column in CSV parsing for some data cleaning
> logic, I came across a correctness bug.
>
> The following repro can be run with spark-shell 3.5.1.
> *Create test.csv with the following content:*
> {code:java}
> test,1,2,three
> four,5,6,seven
> 8,9
> ten,11,12,thirteen {code}
>
>
> *In spark-shell:*
> {code:java}
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.functions._
>
> // define a STRING, DOUBLE, DOUBLE, STRING schema for the data
> val schema = StructType(List(
>   StructField("column1", StringType, true),
>   StructField("column2", DoubleType, true),
>   StructField("column3", DoubleType, true),
>   StructField("column4", StringType, true)))
>
> // add a column for corrupt records to the schema
> val schemaWithCorrupt = StructType(schema.fields :+
>   StructField("_corrupt_record", StringType, true))
>
> // read the CSV with the schema, headers, permissive parsing, and the corrupt
> // record column
> val df = spark.read.option("header", "true").option("mode", "PERMISSIVE").
>   option("columnNameOfCorruptRecord", "_corrupt_record").
>   schema(schemaWithCorrupt).csv("test.csv")
>
> // define a UDF to count the commas in the corrupt record column
> // (returns -1 for a null input)
> val countCommas = udf((s: String) => if (s != null) s.count(_ == ',') else -1)
>
> // add a true/false column: true when the corrupt record is non-null and does
> // not contain exactly 3 commas
> val dfWithJagged = df.withColumn("__is_jagged",
>   when(col("_corrupt_record").isNull, false).
>     otherwise(countCommas(col("_corrupt_record")) =!= 3))
> dfWithJagged.show(){code}
> *Returns:*
> {code:java}
> +-------+-------+-------+--------+---------------+-----------+
> |column1|column2|column3| column4|_corrupt_record|__is_jagged|
> +-------+-------+-------+--------+---------------+-----------+
> |   four|    5.0|    6.0|   seven|           NULL|      false|
> |      8|    9.0|   NULL|    NULL|            8,9|       true|
> |    ten|   11.0|   12.0|thirteen|           NULL|      false|
> +-------+-------+-------+--------+---------------+-----------+ {code}
> So far so good...
>
> *BUT*
>
> *If we add an aggregate before we show:*
> {code:java}
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.functions._
>
> // define a STRING, DOUBLE, DOUBLE, STRING schema for the data
> val schema = StructType(List(
>   StructField("column1", StringType, true),
>   StructField("column2", DoubleType, true),
>   StructField("column3", DoubleType, true),
>   StructField("column4", StringType, true)))
>
> // add a column for corrupt records to the schema
> val schemaWithCorrupt = StructType(schema.fields :+
>   StructField("_corrupt_record", StringType, true))
>
> // read the CSV with the schema, headers, permissive parsing, and the corrupt
> // record column
> val df = spark.read.option("header", "true").option("mode", "PERMISSIVE").
>   option("columnNameOfCorruptRecord", "_corrupt_record").
>   schema(schemaWithCorrupt).csv("test.csv")
>
> // define a UDF to count the commas in the corrupt record column
> // (returns -1 for a null input)
> val countCommas = udf((s: String) => if (s != null) s.count(_ == ',') else -1)
>
> // add a true/false column: true when the corrupt record is non-null and does
> // not contain exactly 3 commas
> val dfWithJagged = df.withColumn("__is_jagged",
>   when(col("_corrupt_record").isNull, false).
>     otherwise(countCommas(col("_corrupt_record")) =!= 3))
>
> // sum column2, grouped by column1
> val groupedSum =
>   dfWithJagged.groupBy("column1").agg(sum("column2").alias("sum_column2"))
> groupedSum.show(){code}
> *We get:*
> {code:java}
> +-------+-----------+
> |column1|sum_column2|
> +-------+-----------+
> |      8|        9.0|
> |   four|        5.0|
> |    ten|       11.0|
> +-------+-----------+ {code}
>
> *Which is not correct*
>
> With the addition of the aggregate, the filter down to rows with 3 commas in
> the corrupt record column is ignored. This does not happen with any other
> operators I have tried - just aggregates so far.
>
>
>
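
For reference, the `__is_jagged` rule from the quoted repro can be restated in plain Scala, without Spark. This is only a sketch: `isJagged` and `rows` are names introduced here for illustration, and the `_corrupt_record` values are copied from the first `show()` output in the description. It does not attempt to say what the aggregate should return. It only shows what the flag evaluates to per row.

```scala
// Plain-Scala restatement of the repro's __is_jagged rule (no Spark required).

// Same logic as the countCommas UDF in the repro: -1 for a null input.
def countCommas(s: String): Int = if (s != null) s.count(_ == ',') else -1

// Hypothetical helper mirroring the when(...).otherwise(...) expression:
// false when there is no corrupt record, otherwise true unless the record
// has exactly 3 commas.
def isJagged(corruptRecord: String): Boolean =
  if (corruptRecord == null) false else countCommas(corruptRecord) != 3

// (column1, _corrupt_record) pairs as reported by the first show() output.
val rows: Seq[(String, String)] = Seq(("four", null), ("8", "8,9"), ("ten", null))

rows.foreach { case (column1, corrupt) =>
  println(s"$column1 -> ${isJagged(corrupt)}")
}
```

On those three rows only the `8` row is flagged. Note that the repro as quoted adds the `__is_jagged` column but never calls `filter` on it before the `groupBy`.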