[ https://issues.apache.org/jira/browse/SPARK-38402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17502058#comment-17502058 ]
Hyukjin Kwon commented on SPARK-38402:
--------------------------------------

{quote}
we currently cache the data frame before using it; however, it is not convenient and we would like to see a better user experience.
{quote}

{code}
spark.createDataFrame(df.rdd, df.schema)
{code}

this should work instead of caching

> Improve user experience when working on data frames created from CSV and JSON in PERMISSIVE mode.
> --------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-38402
>                 URL: https://issues.apache.org/jira/browse/SPARK-38402
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.2.1
>            Reporter: Dilip Biswal
>            Priority: Major
>
> In our data processing pipeline, we first process the user-supplied data and eliminate invalid/corrupt records. To do this, we parse JSON and CSV files in PERMISSIVE mode, where all invalid records are captured in "_corrupt_record". We then apply predicates on "_corrupt_record" to eliminate the bad records before passing the good records further along the processing pipeline.
>
> We encountered two issues:
>
> 1. The introduction of predicate pushdown for CSV does not take into account the system-generated "_corrupt_record" column and tries to push predicates on it down to the scan, resulting in an exception because the column is not part of the base schema.
>
> 2. Applying predicates on "_corrupt_record" results in an AnalysisException like the following:
>
> {code:java}
> val schema = new StructType()
>   .add("id", IntegerType, true)
>   .add("weight", IntegerType, true)  // The weight field is defined wrongly. The actual data contains floating point numbers, while the schema specifies an integer.
>   .add("price", IntegerType, true)
>   .add("_corrupt_record", StringType, true)  // The schema contains a special column _corrupt_record, which does not exist in the data. This column captures rows that did not parse correctly.
>
> val csv_with_wrong_schema = spark.read.format("csv")
>   .option("header", "true")
>   .schema(schema)
>   .load("/FileStore/tables/csv_corrupt_record.csv")
>
> val badRows = csv_with_wrong_schema.filter($"_corrupt_record".isNotNull)
> val numBadRows = badRows.count()
>
> // AnalysisException:
> // Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the
> // referenced columns only include the internal corrupt record column
> // (named _corrupt_record by default). For example:
> // spark.read.schema(schema).csv(file).filter($"_corrupt_record".isNotNull).count()
> // and spark.read.schema(schema).csv(file).select("_corrupt_record").show().
> // Instead, you can cache or save the parsed results and then send the same query.
> // For example, val df = spark.read.schema(schema).csv(file).cache() and then
> // df.filter($"_corrupt_record".isNotNull).count().
> {code}
>
> For (1), we have disabled predicate pushdown.
>
> For (2), we currently cache the data frame before using it; however, it is not convenient and we would like to see a better user experience.
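A minimal sketch of how the suggested workaround could be applied to the reporter's example (the schema, PERMISSIVE mode, and the /FileStore/tables/csv_corrupt_record.csv path are taken from the description above; this is illustrative, not a verified fix). Rebuilding the DataFrame from its parsed RDD breaks the lineage back to the raw CSV scan, so the filter on _corrupt_record can be evaluated without an explicit cache():

{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("corrupt-record-workaround").getOrCreate()
import spark.implicits._

// Schema from the issue description: _corrupt_record is not in the data,
// it only collects rows that fail to parse in PERMISSIVE mode.
val schema = new StructType()
  .add("id", IntegerType, true)
  .add("weight", IntegerType, true)
  .add("price", IntegerType, true)
  .add("_corrupt_record", StringType, true)

val parsed = spark.read.format("csv")
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .schema(schema)
  .load("/FileStore/tables/csv_corrupt_record.csv")  // example path from the description

// Workaround from the comment: recreate the DataFrame from the parsed rows,
// so the query no longer refers directly to the raw CSV source.
val materialized = spark.createDataFrame(parsed.rdd, parsed.schema)

val badRows = materialized.filter($"_corrupt_record".isNotNull)
val numBadRows = badRows.count()
{code}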