[ 
https://issues.apache.org/jira/browse/SPARK-38402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17502058#comment-17502058
 ] 

Hyukjin Kwon commented on SPARK-38402:
--------------------------------------

{quote}
we currently cache the data frame before using it; however, it's not convenient
and we would like to see a better user experience.
{quote}

{code}
spark.createDataFrame(df.rdd, df.schema)
{code}

This should work instead of caching.
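A minimal sketch of how that suggestion could slot into the reporter's pipeline (the schema and path are taken from the reproduction quoted below; the variable names and the explicit PERMISSIVE option are my additions). Re-creating the DataFrame from the parsed RDD means the filter no longer runs against the raw CSV scan, which is presumably what the Spark 2.3 check objects to:

{code:scala}
import org.apache.spark.sql.types._
import spark.implicits._

// Same schema as in the issue: _corrupt_record captures rows that fail to parse.
val schema = new StructType()
  .add("id", IntegerType, true)
  .add("weight", IntegerType, true)
  .add("price", IntegerType, true)
  .add("_corrupt_record", StringType, true)

val raw = spark.read.format("csv")
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .schema(schema)
  .load("/FileStore/tables/csv_corrupt_record.csv")

// Materialize a new DataFrame over the already-parsed rows instead of caching.
val parsed = spark.createDataFrame(raw.rdd, raw.schema)

// Predicates on _corrupt_record now work without an AnalysisException.
val badRows  = parsed.filter($"_corrupt_record".isNotNull)
val goodRows = parsed.filter($"_corrupt_record".isNull).drop("_corrupt_record")
{code}

One trade-off worth noting: going through df.rdd drops back to the RDD API, so it is not free either; the sketch only shows that the query shape becomes legal.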


> Improve user experience when working on data frames created from CSV and JSON 
> in PERMISSIVE mode.
> -------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-38402
>                 URL: https://issues.apache.org/jira/browse/SPARK-38402
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.2.1
>            Reporter: Dilip Biswal
>            Priority: Major
>
> In our data processing pipeline, we first process the user-supplied data and 
> eliminate invalid/corrupt records. So we parse JSON and CSV files in 
> PERMISSIVE mode, where all invalid records are captured in 
> "_corrupt_record". We then apply predicates on "_corrupt_record" to eliminate 
> the bad records before sending the good records further down the processing 
> pipeline.
> We encountered two issues.
> 1. The introduction of "predicate pushdown" for CSV does not take into 
> account this system-generated "_corrupt_record" column and tries to push it 
> down to the scan, resulting in an exception because the column is not part of 
> the base schema. 
> 2. Applying predicates on "_corrupt_record" results in an AnalysisException 
> like the following.
> {code:scala}
> import org.apache.spark.sql.types._
> import spark.implicits._
>
> val schema = new StructType()
>   .add("id", IntegerType, true)
>   // The weight field is defined wrongly: the actual data contains floating
>   // point numbers, while the schema specifies an integer.
>   .add("weight", IntegerType, true)
>   .add("price", IntegerType, true)
>   // The schema contains a special column _corrupt_record, which does not exist
>   // in the data. This column captures rows that did not parse correctly.
>   .add("_corrupt_record", StringType, true)
>
> val csv_with_wrong_schema = spark.read.format("csv")
>   .option("header", "true")
>   .schema(schema)
>   .load("/FileStore/tables/csv_corrupt_record.csv")
>
> val badRows = csv_with_wrong_schema.filter($"_corrupt_record".isNotNull)
> val numBadRows = badRows.count()
>
> // org.apache.spark.sql.AnalysisException:
> // Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the
> // referenced columns only include the internal corrupt record column
> // (named _corrupt_record by default). For example:
> // spark.read.schema(schema).csv(file).filter($"_corrupt_record".isNotNull).count()
> // and spark.read.schema(schema).csv(file).select("_corrupt_record").show().
> // Instead, you can cache or save the parsed results and then send the same
> // query. For example, val df = spark.read.schema(schema).csv(file).cache() and
> // then df.filter($"_corrupt_record".isNotNull).count().
> {code}
> For (1), we have disabled predicate pushdown.
> For (2), we currently cache the data frame before using it; however, it's not 
> convenient and we would like to see a better user experience.  
>  
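For reference, a rough sketch of the two workarounds described at the end of the quoted issue, under my assumptions about the reporter's setup (spark.sql.csv.filterPushdown.enabled is the standard switch for CSV filter pushdown, with spark.sql.json.filterPushdown.enabled as the JSON analogue; the exact settings here are illustrative, not an excerpt from their pipeline):

{code:scala}
import org.apache.spark.sql.types._
import spark.implicits._

// (1) Disable CSV filter pushdown so the system-generated _corrupt_record
//     column is never pushed down to the scan.
spark.conf.set("spark.sql.csv.filterPushdown.enabled", "false")

// Same schema as in the reproduction above.
val schema = new StructType()
  .add("id", IntegerType, true)
  .add("weight", IntegerType, true)
  .add("price", IntegerType, true)
  .add("_corrupt_record", StringType, true)

// (2) Cache the parsed result before filtering on _corrupt_record, as the
//     AnalysisException message itself suggests.
val df = spark.read
  .option("header", "true")
  .schema(schema)
  .csv("/FileStore/tables/csv_corrupt_record.csv")
  .cache()

val numBadRows = df.filter($"_corrupt_record".isNotNull).count()
{code}

Both of these work, but they are exactly the inconvenience the issue asks to remove; the createDataFrame(df.rdd, df.schema) suggestion above avoids the explicit cache.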



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
