[ https://issues.apache.org/jira/browse/SPARK-24364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon reassigned SPARK-24364:
------------------------------------

    Assignee: Hyukjin Kwon

> Files deletion after globbing may fail StructuredStreaming jobs
> ---------------------------------------------------------------
>
>                 Key: SPARK-24364
>                 URL: https://issues.apache.org/jira/browse/SPARK-24364
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0, 2.4.0
>            Reporter: Hyukjin Kwon
>            Assignee: Hyukjin Kwon
>            Priority: Major
>             Fix For: 2.3.1, 2.4.0
>
>
> This is related to SPARK-17599. SPARK-17599 guarded against a deleted directory only, but the job can also fail on a subsequent FileSystem operation when an individual file is not found, e.g. when a file matched by globbing is deleted before the follow-up listing call. See the trace below for an example, and the sketch of the corresponding guard after it:
> {code}
> Error occurred while processing: File does not exist: /rel/00171151/input/PJ/part-00136-b6403bac-a240-44f8-a792-fc2e174682b7-c000.csv
>       at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>       at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2029)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:2000)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1913)
>       at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:700)
>       at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:377)
>       at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>       at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
>       at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>       at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
>       at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
>       at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)
> java.io.FileNotFoundException: File does not exist: /rel/00171151/input/PJ/part-00136-b6403bac-a240-44f8-a792-fc2e174682b7-c000.csv
>       at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>       at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2029)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:2000)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1913)
>       at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:700)
>       at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:377)
>       at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>       at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
>       at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>       at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
>       at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
>       at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)
>       at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>       at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>       at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>       at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>       at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
>       at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
>       at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1251)
>       at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1236)
>       at org.apache.hadoop.hdfs.DFSClient.getBlockLocations(DFSClient.java:1294)
>       at org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:242)
>       at org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:238)
>       at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>       at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:249)
>       at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:229)
>       at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles$3.apply(InMemoryFileIndex.scala:314)
>       at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles$3.apply(InMemoryFileIndex.scala:297)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>       at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>       at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles(InMemoryFileIndex.scala:297)
>       at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles$1.apply(InMemoryFileIndex.scala:174)
>       at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles$1.apply(InMemoryFileIndex.scala:173)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>       at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles(InMemoryFileIndex.scala:173)
>       at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:126)
>       at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:91)
>       at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:67)
>       at org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$lzycompute$1(DataSource.scala:161)
>       at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$tempFileIndex$1(DataSource.scala:152)
>       at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:166)
>       at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:261)
>       at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:94)
>       at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:94)
>       at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:33)
>       at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:196)
>       at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:206)
>       at com.hwx.StreamTest$.main(StreamTest.scala:97)
>       at com.hwx.StreamTest.main(StreamTest.scala)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
>       at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:906)
>       at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
>       at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
>       at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
>       at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /rel/00171151/input/PJ/part-00136-b6403bac-a240-44f8-a792-fc2e174682b7-c000.csv
>       at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>       at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2029)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:2000)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1913)
>       at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:700)
>       at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:377)
>       at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>       at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
>       at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>       at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
>       at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
>       at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)
>       at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1554)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1498)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1398)
>       at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
>       at com.sun.proxy.$Proxy12.getBlockLocations(Unknown Source)
>       at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:272)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:290)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:202)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:184)
>       at com.sun.proxy.$Proxy13.getBlockLocations(Unknown Source)
>       at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1249)
> {code}
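>
> A straightforward way to harden this path is to extend the SPARK-17599 guard from directories to individual files: catch FileNotFoundException around the per-file operation (here, getFileBlockLocations) and skip files that vanished between globbing and listing. The sketch below illustrates that pattern only; the MissingFileGuard object and its method are hypothetical names, not the actual InMemoryFileIndex code.
> {code}
> import java.io.FileNotFoundException
>
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.{FileStatus, Path}
>
> // Hypothetical helper illustrating the guard; not the real InMemoryFileIndex.
> object MissingFileGuard {
>   /** Recursively lists leaf files under `path`, skipping anything deleted
>    *  between the initial listing and the follow-up per-file calls. */
>   def listLeafFiles(path: Path, conf: Configuration): Seq[FileStatus] = {
>     val fs = path.getFileSystem(conf)
>     val children =
>       try fs.listStatus(path).toSeq
>       catch { case _: FileNotFoundException => Seq.empty } // SPARK-17599: directory vanished
>     children.flatMap { status =>
>       try {
>         if (status.isDirectory) {
>           listLeafFiles(status.getPath, conf)
>         } else {
>           // SPARK-24364: per-file calls such as getFileBlockLocations can also
>           // throw FileNotFoundException if the file was deleted after globbing.
>           fs.getFileBlockLocations(status, 0L, status.getLen)
>           Seq(status)
>         }
>       } catch {
>         case _: FileNotFoundException => Seq.empty // file vanished; skip it
>       }
>     }
>   }
> }
> {code}
> With a guard like this in place, a stream started over a glob whose matched files are being deleted concurrently would see the vanished files dropped from the index instead of the whole query failing.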


