[ https://issues.apache.org/jira/browse/SPARK-19885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904747#comment-15904747 ]
Hyukjin Kwon commented on SPARK-19885:
--------------------------------------
Thank you for cc'ing me. To my knowledge, {{LineReader}} itself assumes the
input is UTF-8. According to
[MAPREDUCE-232|https://issues.apache.org/jira/browse/MAPREDUCE-232], it looks like
[{{TextInputFormat}}|https://github.com/apache/hadoop/blob/master/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java]
does not guarantee all encodings but officially supports only UTF-8 (as commented
in
[{{LineRecordReader#L147}}|https://github.com/apache/hadoop/blob/master/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java#L147]).
So, it seems it only works if the encoding is compatible with UTF-8. IMHO,
this is an incorrect implementation which we should remove. We should support
this when we actually start to read (not relying on {{TextInputFormat}}), but I
guess this is a big job.
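As a rough illustration of what I mean by handling the encoding when we actually
start to read, a minimal sketch under the assumption that we read the raw bytes
ourselves (the path and charset below are placeholders, not the actual Spark code path):
{code}
import java.nio.charset.Charset

// Hypothetical sketch: read the raw bytes of each file and decode them with an
// explicit charset, instead of relying on TextInputFormat's UTF-8 assumption.
// `spark` is the usual spark-shell SparkSession.
val charset = Charset.forName("ISO-8859-1")
val lines = spark.sparkContext
  .binaryFiles("/path/to/csv")                       // RDD[(file, PortableDataStream)]
  .flatMap { case (_, stream) =>
    new String(stream.toArray, charset).split("\n")  // decode, then split into lines
  }
{code}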
BTW, I think we use {{FileScanRDD}} in schema inference only in the CSV datasource,
and only when the encoding is the default, in favour of SPARK-18362, and I guess this
probably would not work for JSON either (let me maybe open a PR for the JSON one soon).
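To make that concrete, something like this is what I have in mind (paths are
placeholders, and this reflects my reading of master after SPARK-18362, not
necessarily 2.1.0):
{code}
// Default (UTF-8) encoding: on master, CSV schema inference can go through the
// FileScanRDD-based text reader added by SPARK-18362.
spark.read.option("header", "true").csv("/path/to/data")

// Explicit non-UTF-8 encoding: inference still goes through a plain Hadoop RDD,
// which is the path in the stack trace below, where the SQL
// spark.sql.files.ignoreCorruptFiles flag has no effect.
spark.read
  .option("encoding", "ISO-8859-1")
  .option("header", "true")
  .csv("/path/to/data")
{code}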
> The config ignoreCorruptFiles doesn't work for CSV
> --------------------------------------------------
>
> Key: SPARK-19885
> URL: https://issues.apache.org/jira/browse/SPARK-19885
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.1.0
> Reporter: Shixiong Zhu
>
> CSVFileFormat.inferSchema doesn't use FileScanRDD, so the SQL config
> "spark.sql.files.ignoreCorruptFiles" doesn't work.
> {code}
> java.io.EOFException: Unexpected end of input stream
> at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:145)
> at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
> at java.io.InputStream.read(InputStream.java:101)
> at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
> at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
> at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
> at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:248)
> at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:48)
> at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:266)
> at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:211)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
> at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214)
> at scala.collection.AbstractIterator.aggregate(Iterator.scala:1336)
> at org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$22.apply(RDD.scala:1112)
> at org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$22.apply(RDD.scala:1112)
> at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1980)
> at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1980)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
> at scala.Option.foreach(Option.scala:257)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1981)
> at org.apache.spark.rdd.RDD$$anonfun$aggregate$1.apply(RDD.scala:1114)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
> at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1107)
> at org.apache.spark.sql.execution.datasources.csv.CSVInferSchema$.infer(CSVInferSchema.scala:47)
> at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.inferSchema(CSVFileFormat.scala:67)
> at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:174)
> at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:174)
> at scala.Option.orElse(Option.scala:289)
> at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:173)
> at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:377)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:135)
> {code}
> Right now, a workaround is to also set "spark.files.ignoreCorruptFiles" to
> true.
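> A minimal sketch of that workaround (the path is a placeholder):
> {code}
> import org.apache.spark.sql.SparkSession
>
> // Set both flags: the SQL one covers the FileScanRDD read path, while the core
> // one covers the Hadoop RDD path used by CSV schema inference.
> val spark = SparkSession.builder()
>   .config("spark.files.ignoreCorruptFiles", "true")      // core / RDD reads
>   .config("spark.sql.files.ignoreCorruptFiles", "true")  // SQL / FileScanRDD reads
>   .getOrCreate()
>
> val df = spark.read.option("header", "true").csv("/path/with/corrupt/files")
> {code}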