[ https://issues.apache.org/jira/browse/SPARK-27593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847734#comment-16847734 ]

Ladislav Jech commented on SPARK-27593:
---------------------------------------

I cannot get this working. I have the following code:
{code:python}
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import (DoubleType, IntegerType, StringType,
                               StructField, StructType)


def main():
    customSchema = StructType([
        StructField("project", StringType(), True),
        StructField("article", StringType(), True),
        StructField("requests", IntegerType(), True),
        StructField("bytes_served", DoubleType(), True)
    ])

    spark = SparkSession \
        .builder \
        .appName("RandomForest") \
        .config("spark.executor.heartbeatInterval", "60s") \
        .getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("INFO")
    sqlContext = SQLContext(sc)
    dataframe = sqlContext.read.format("csv") \
        .option("schema", customSchema) \
        .option("quote", "") \
        .option("inferSchema", False) \
        .option("sep", ",") \
        .option("mode", "PERMISSIVE") \
        .option("columnNameOfCorruptRecord", "ErrorField") \
        .option("multiLine", False) \
        .option("path", "/home/zangetsu/data.csv") \
        .load()

    total_row_count = dataframe.count()
    dataframe.printSchema()
    dataframe.show()
    print("total_row_count = " + str(total_row_count))

    errors = dataframe.filter(col("ErrorField").isNotNull())
    errors.show()


if __name__ == "__main__":
    main()
{code}
And the following CSV content:
{code}
alfa,beta,gamma,delta
1,2,3,4
something, some
thing else, s,s{code}
So the first two rows are OK per the schema; the other two are not.

The first issue I noticed is that the schema is not reflected on the DataFrame:
{code}
+----------+-----+-----+-----+
|       _c0|  _c1|  _c2|  _c3|
+----------+-----+-----+-----+
|      alfa| beta|gamma|delta|
|         1|    2|    3|    4|
| something| some| null| null|
|thing else|    s|    s| null|
+----------+-----+-----+-----+{code}
So all the data is loaded, and the column names are not those provided by the 
custom schema. It also means the ErrorField column is not available.
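
For reference, a likely cause: reader options only carry string values, so {{option("schema", customSchema)}} is silently ignored, and the schema has to go through {{DataFrameReader.schema()}} instead. A minimal sketch of that variant (the extra ErrorField entry in the schema is my assumption, based on PERMISSIVE mode only populating the corrupt-record column when it is declared in the schema as a nullable StringType; names and the path come from the example above):
{code:python}
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import (DoubleType, IntegerType, StringType,
                               StructField, StructType)

# The corrupt-record column must be declared in the schema itself
# (nullable StringType) for PERMISSIVE mode to populate it.
customSchema = StructType([
    StructField("project", StringType(), True),
    StructField("article", StringType(), True),
    StructField("requests", IntegerType(), True),
    StructField("bytes_served", DoubleType(), True),
    StructField("ErrorField", StringType(), True),
])


def read_with_corrupt_column(spark: SparkSession, path: str) -> DataFrame:
    # schema() applies the StructType; option("schema", ...) does not,
    # because options only accept plain string values.
    return (spark.read
            .schema(customSchema)
            .option("mode", "PERMISSIVE")
            .option("columnNameOfCorruptRecord", "ErrorField")
            .csv(path))
{code}
With that in place, {{dataframe.filter(col("ErrorField").isNotNull())}} should expose the malformed rows; note that Spark may require caching the DataFrame before running a query that references only the corrupt-record column.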

> CSV Parser returns 2 DataFrame - Valid and Malformed DFs
> --------------------------------------------------------
>
>                 Key: SPARK-27593
>                 URL: https://issues.apache.org/jira/browse/SPARK-27593
>             Project: Spark
>          Issue Type: New Feature
>          Components: Spark Core
>    Affects Versions: 2.4.2
>            Reporter: Ladislav Jech
>            Priority: Major
>
> When we process CSV in any kind of data warehouse, it is common procedure to 
> report corrupted records for audit purposes and to feed them back to the 
> vendor so they can improve their process. CSV is no different from XSD in the 
> sense that it defines a schema, although in a very limited way (in some cases 
> only as a number of columns, without headers, and with no types). But when I 
> validate an XML document against an XSD file, I get an exact report of 
> whether the file is completely valid, and if not, an exact report of which 
> records do not follow the schema. 
> Such a feature would have big value in Spark for CSV: get the malformed 
> records into some DataFrame, with a line count (a pointer within the data 
> object), so I can log both the pointer and the actual data (line/row) and 
> trigger an action on this unfortunate event.
> The load() method could return an array of DataFrames (valid, invalid).
> PERMISSIVE mode isn't enough, because it fills missing fields with nulls, 
> which makes it even harder to detect what is really wrong. Another approach 
> at the moment is to read the file in both PERMISSIVE and DROPMALFORMED modes 
> into two DataFrames and compare them against each other.
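
The two-mode workaround described at the end of the quoted request could be sketched roughly like this (an illustration only; the helper name is hypothetical, and it assumes both reads see the same files and that subtract()'s duplicate-removing set semantics are acceptable):
{code:python}
from typing import Tuple

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import StructType


def split_valid_invalid(spark: SparkSession, path: str,
                        schema: StructType) -> Tuple[DataFrame, DataFrame]:
    # Read the same file twice: DROPMALFORMED keeps only rows that fully
    # match the schema, while PERMISSIVE keeps every row.
    valid = (spark.read.schema(schema)
             .option("mode", "DROPMALFORMED")
             .csv(path))
    everything = (spark.read.schema(schema)
                  .option("mode", "PERMISSIVE")
                  .csv(path))
    # Whatever PERMISSIVE kept but DROPMALFORMED dropped is malformed.
    invalid = everything.subtract(valid)
    return valid, invalid
{code}
This still loses the original line numbers and raw text of the bad rows, which is part of why a first-class valid/invalid split (or the corrupt-record column) is the more useful answer.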



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
