[ https://issues.apache.org/jira/browse/SPARK-4314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14205477#comment-14205477 ]
Sean Owen commented on SPARK-4314:
----------------------------------
To clarify, the suffix is _COPYING_, right? Yes, that's the standard naming for
a destination file during a copy; it is renamed on completion. I think you're
seeing that the file was present when the directory was listed but no longer
there when it was subsequently read. The file could also have been encountered
in an intermediate, partly written state.
Normally I'd say "just don't do that," but the point of streaming is to read a
data source that is being written to with new files. I think this and other
streaming processes that read HDFS should filter out "*._COPYING_" files, yes.
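In the meantime, a user-level workaround is to use fileStream with a path filter instead of textFileStream. A minimal sketch, assuming the Spark 1.x fileStream signature that takes a directory, a Path => Boolean filter, and a newFilesOnly flag; the filter predicate is illustrative:
{code}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Same semantics as ssc.textFileStream(args(0)), but skips files that are
// still in the intermediate "<name>._COPYING_" state during an HDFS put.
val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
  args(0),
  (path: Path) => !path.getName.endsWith("_COPYING_"),
  newFilesOnly = true
).map(_._2.toString)
{code}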
> Exception thrown when new files such as intermediate results (_COPYING_
> files) are found through the HDFS interface
> -------------------------------------------------------------------------------------------------------
>
> Key: SPARK-4314
> URL: https://issues.apache.org/jira/browse/SPARK-4314
> Project: Spark
> Issue Type: Bug
> Reporter: maji2014
>
> [Reproduce]
> 1. Run the HdfsWordCount example, which uses "ssc.textFileStream(args(0))" (see the sketch below).
> 2. Upload a file to HDFS (see [Reason] below).
> 3. The exception below is thrown.
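> A minimal driver sketch for step 1, assuming the Spark 1.x streaming API and modeled on the HdfsWordCount example mentioned above (class name and batch interval are illustrative):
> {code}
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.StreamingContext._
>
> object HdfsWordCount {
>   def main(args: Array[String]): Unit = {
>     val sparkConf = new SparkConf().setAppName("HdfsWordCount")
>     val ssc = new StreamingContext(sparkConf, Seconds(2))
>     // Watches the directory given as args(0) for new files.
>     val lines = ssc.textFileStream(args(0))
>     lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
> {code}
> While this runs, step 2 is e.g. "hdfs dfs -put 200 /user/spark/", which first writes an intermediate destination file and renames it when the copy completes; that rename can race with the batch that listed the directory.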
> [Exception stack]
> 14/11/10 01:21:19 DEBUG Client: IPC Client (842425021) connection to master/192.168.84.142:9000 from ocdc sending #13
> 14/11/10 01:21:19 ERROR JobScheduler: Error generating jobs for time 1415611274000 ms
> org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://master:9000/user/spark/200.COPYING
> at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:285)
> at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:340)
> at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:95)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
> at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD$1.apply(FileInputDStream.scala:125)
> at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD$1.apply(FileInputDStream.scala:124)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:124)
> at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:83)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
> at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
> at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
> at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:40)
> at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:40)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:40)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
> at org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
> at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
> at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
> at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
> at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:115)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
> at scala.util.Try$.apply(Try.scala:161)
> at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:221)
> at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Exception in thread "main" 14/11/10 01:21:19 DEBUG Client: IPC Client (842425021) connection to master/192.168.84.142:9000 from ocdc got value #13
> org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://master:9000/user/spark/200.COPYING
> (stack trace identical to the one above)
> 14/11/10 01:21:19 DEBUG ProtobufRpcEngine: Call: getListing took 3ms
> [Reason]
> The intermediate file 200.COPYING created by the upload is found by the
> FileInputDStream interface, and the exception is thrown when NewHadoopRDD
> tries to handle the now non-existent 200.COPYING file, because 200.COPYING
> is renamed to 200 once the upload finishes.
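> Until such a filter exists in the streaming input, one workaround is to upload into a staging directory on the same filesystem and rename into the watched directory once the copy is complete; an HDFS rename within one filesystem is atomic, so the stream never sees a partially written file. A sketch using the Hadoop FileSystem API (paths are illustrative):
> {code}
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.{FileSystem, Path}
>
> val fs = FileSystem.get(new Configuration())
> val staging = new Path("/user/spark/_staging/200")
> val visible = new Path("/user/spark/200")
> // ... write the file completely to `staging` first ...
> // The watched directory only ever sees the finished file.
> if (!fs.rename(staging, visible)) {
>   sys.error(s"rename $staging -> $visible failed")
> }
> {code}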