Feel free to create a new ticket. Could you also provide the files in
"/usr/local/hadoop/checkpoint/state/0" (Just run "bin/hadoop fs -ls
/usr/local/hadoop/checkpoint/state/0/*") in the ticket and the Spark logs?

On Thu, May 25, 2017 at 2:53 PM, kant kodali <kanth...@gmail.com> wrote:

> Should I file a ticket or should I try another version like Spark 2.2
> since I am currently using 2.1.1?
>
> On Thu, May 25, 2017 at 2:38 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> Hi Ryan,
>>
>> You are right I was setting checkpointLocation for readStream. Now I did
>> set if for writeStream as well  like below
>>
>> StreamingQuery query = df2.writeStream().foreach(new KafkaSink()).option(
>> "checkpointLocation","/usr/local/hadoop/checkpoint").outputMode("update"
>> ).start();
>>
>> query.awaitTermination();
>>
>> *and now I can at very least see there are directories like*
>>
>> -rw-r--r--   2 ubuntu supergroup         45 2017-05-25 21:29 
>> /usr/local/hadoop/checkpoint/metadata
>> drwxr-xr-x   - ubuntu supergroup          0 2017-05-25 21:30 
>> /usr/local/hadoop/checkpoint/offsets
>> drwxr-xr-x   - ubuntu supergroup          0 2017-05-25 21:29 
>> /usr/local/hadoop/checkpoint/sources
>> drwxr-xr-x   - ubuntu supergroup          0 2017-05-25 21:30 
>> /usr/local/hadoop/checkpoint/state
>>
>>
>> However it still fails with
>>
>> *org.apache.hadoop.ipc.RemoteException(java.io 
>> <http://java.io>.FileNotFoundException): File does not exist: 
>> /usr/local/hadoop/checkpoint/state/0/1/1.delta*
>>
>>
>>
>> On Thu, May 25, 2017 at 2:31 PM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> Read your codes again and found one issue: you set "checkpointLocation"
>>> in `readStream`. It should be set in `writeStream`. However, I still have
>>> no idea why use a temp checkpoint location will fail.
>>>
>>> On Thu, May 25, 2017 at 2:23 PM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> I did the following
>>>>
>>>> *bin/hadoop fs -mkdir -p **/usr/local/hadoop/checkpoint* and did 
>>>> *bin/hadoop
>>>> fs -ls / *
>>>>
>>>> and I can actually see */tmp* and */usr* and inside of */usr *there is
>>>> indeed *local/hadoop/checkpoint. *
>>>>
>>>> So until here it looks fine.
>>>>
>>>> I also cleared everything */tmp/** as @Michael suggested using *bin/hadoop
>>>> fs -rmdir * such that when I do *bin/hadoop fs -ls /tmp *I don't see
>>>> anything.
>>>>
>>>> Now I ran my spark driver program using spark-submit it failed with the
>>>> following exception
>>>>
>>>> *File does not exist:
>>>> /tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta*
>>>>
>>>> so I did *bin/hadoop fs -ls *
>>>> */tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2 *
>>>>
>>>> and I did not see anything there like *1.delta(there are just no
>>>> files)* however all these directories 
>>>> */tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2
>>>>  *do exist.
>>>>
>>>> For my checkPointLocation I had passed   */usr/local/hadoop/checkpoint
>>>> *and  *hdfs://<namenode:port>/usr/local/hadoop/checkpoint  *so far and
>>>> both didn't work for me. It is failing with the same error "*File does
>>>> not exist:
>>>> /tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta*"
>>>>
>>>> so what can be the problem? any ideas?
>>>>
>>>> Thanks!
>>>>
>>>> On Thu, May 25, 2017 at 1:31 AM, kant kodali <kanth...@gmail.com>
>>>> wrote:
>>>>
>>>>> Executing this bin/hadoop fs -ls /usr/local/hadoop/checkpoint says
>>>>>
>>>>> ls: `/usr/local/hadoop/checkpoint': No such file or directory
>>>>>
>>>>> This is what I expected as well since I don't see any checkpoint
>>>>> directory under /usr/local/hadoop. Am I missing any configuration variable
>>>>> like HADOOP_CONF_DIR ? I am currently not setting that in
>>>>> conf/spark-env.sh and thats the only hadoop related environment variable I
>>>>> see. please let me know
>>>>>
>>>>> thanks!
>>>>>
>>>>>
>>>>>
>>>>> On Thu, May 25, 2017 at 1:19 AM, kant kodali <kanth...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Ryan,
>>>>>>
>>>>>> I did add that print statement and here is what I got.
>>>>>>
>>>>>> class org.apache.hadoop.hdfs.DistributedFileSystem
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Wed, May 24, 2017 at 11:39 PM, Shixiong(Ryan) Zhu <
>>>>>> shixi...@databricks.com> wrote:
>>>>>>
>>>>>>> I meant using HDFS command to check the directory. Such as
>>>>>>> "bin/hadoop fs -ls /usr/local/hadoop/checkpoint". My hunch is the 
>>>>>>> default
>>>>>>> file system in driver probably is the local file system. Could you add 
>>>>>>> the
>>>>>>> following line into your code to print the default file system?
>>>>>>>
>>>>>>> println(org.apache.hadoop.fs.FileSystem.get(sc.hadoopConfigu
>>>>>>> ration).getClass)
>>>>>>>
>>>>>>> On Wed, May 24, 2017 at 5:59 PM, kant kodali <kanth...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I specified hdfsCheckPointDir = /usr/local/hadoop/checkpoint as
>>>>>>>> you can see below however I dont see checkpoint directory under my
>>>>>>>> hadoop_home=/usr/local/hadoop in either datanodes or namenodes
>>>>>>>> however in datanode machine there seems to be some data under
>>>>>>>>
>>>>>>>> /usr/local/hadoop/hdfs/namenode/current/BP-1469808024-X.X.X.
>>>>>>>> X-1495672725898/current/finalized/subdir0/subdir0
>>>>>>>>
>>>>>>>> I thought the checkpoint directory will be created by spark once I
>>>>>>>> specify the path but do I manually need to create checkpoint dir using
>>>>>>>> mkdir in all spark worker machines? I am new to HDFS as well so please 
>>>>>>>> let
>>>>>>>> me know. I can try sending df.explain("true") but I have 100 fields in 
>>>>>>>> my
>>>>>>>> schema so Project looks really big and if this is not a problem for you
>>>>>>>> guys I can send that as well.
>>>>>>>>
>>>>>>>>
>>>>>>>>   +- StreamingRelation 
>>>>>>>> DataSource(org.apache.spark.sql.SparkSession@21002393,kafka,List(),None,List(),None,Map(subscribe
>>>>>>>>  -> analytics2, failOnDataLoss -> false, kafka.bootstrap.servers -> 
>>>>>>>> X.X.X.X:9092 <http://172.31.41.30:9092>, checkpointLocation -> 
>>>>>>>> /usr/local/hadoop/checkpoint, startingOffsets -> earliest),None), 
>>>>>>>> kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, 
>>>>>>>> timestampType#6]
>>>>>>>>
>>>>>>>> *Here is the stack trace*
>>>>>>>>
>>>>>>>> StructField(OptionalContext4,StringType,true), 
>>>>>>>> StructField(OptionalContext5,StringType,true)),true), cast(value#1 as 
>>>>>>>> string)) AS payload#15]
>>>>>>>>             +- StreamingExecutionRelation 
>>>>>>>> KafkaSource[Subscribe[analytics2]], [key#0, value#1, topic#2, 
>>>>>>>> partition#3, offset#4L, timestamp#5, timestampType#6]
>>>>>>>>
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:305)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191)
>>>>>>>> Caused by: org.apache.spark.SparkException: Job aborted due to stage 
>>>>>>>> failure: Task 2 in stage 3.0 failed 4 times, most recent failure: Lost 
>>>>>>>> task 2.3 in stage 3.0 (TID 222, 172.31.25.189, executor 0): 
>>>>>>>> java.lang.IllegalStateException: Error reading delta file 
>>>>>>>> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta 
>>>>>>>> of HDFSStateStoreProvider[id = (op=0, part=2), dir = 
>>>>>>>> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2]: 
>>>>>>>> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta 
>>>>>>>> does not exist
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:365)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:317)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:314)
>>>>>>>>     at scala.Option.getOrElse(Option.scala:121)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:314)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:313)
>>>>>>>>     at scala.Option.getOrElse(Option.scala:121)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:313)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:220)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:186)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61)
>>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>>>>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:99)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
>>>>>>>>     at 
>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>>     at 
>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>> Caused by: java.io.FileNotFoundException: File does not exist: 
>>>>>>>> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>>>>>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>>>>>>>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>>>>>>>>
>>>>>>>>     at sun.reflect.GeneratedConstructorAccessor13.newInstance(Unknown 
>>>>>>>> Source)
>>>>>>>>     at 
>>>>>>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1228)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1201)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:306)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264)
>>>>>>>>     at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
>>>>>>>>     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:362)
>>>>>>>>     ... 21 more
>>>>>>>> Caused by: 
>>>>>>>> org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): 
>>>>>>>> File does not exist: 
>>>>>>>> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>>>>>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>>>>>>>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>>>>>>>>
>>>>>>>>     at org.apache.hadoop.ipc.Client.call(Client.java:1475)
>>>>>>>>     at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>>>>>>>>     at com.sun.proxy.$Proxy15.getBlockLocations(Unknown Source)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:255)
>>>>>>>>     at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source)
>>>>>>>>     at 
>>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>>>>>>>     at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1226)
>>>>>>>>     ... 33 more
>>>>>>>>
>>>>>>>> Driver stacktrace:
>>>>>>>>     at 
>>>>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
>>>>>>>>     at 
>>>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>>>     at 
>>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>>>>>>>>     at scala.Option.foreach(Option.scala:257)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
>>>>>>>>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
>>>>>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
>>>>>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
>>>>>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
>>>>>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>>>>>>>>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>>>>>>>>     at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:554)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:554)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:554)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:278)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:49)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:553)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:273)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:262)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:262)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:278)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:49)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:262)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:257)
>>>>>>>>     ... 1 more
>>>>>>>> Caused by: java.lang.IllegalStateException: Error reading delta file 
>>>>>>>> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta 
>>>>>>>> of HDFSStateStoreProvider[id = (op=0, part=2), dir = 
>>>>>>>> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2]: 
>>>>>>>> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta 
>>>>>>>> does not exist
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:365)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:317)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:314)
>>>>>>>>     at scala.Option.getOrElse(Option.scala:121)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:314)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:313)
>>>>>>>>     at scala.Option.getOrElse(Option.scala:121)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:313)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:220)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:186)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61)
>>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>>>>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:99)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
>>>>>>>>     at 
>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>>     at 
>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>> Caused by: java.io.FileNotFoundException: File does not exist: 
>>>>>>>> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>>>>>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>>>>>>>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>>>>>>>>
>>>>>>>>     at sun.reflect.GeneratedConstructorAccessor13.newInstance(Unknown 
>>>>>>>> Source)
>>>>>>>>     at 
>>>>>>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1228)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1201)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:306)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264)
>>>>>>>>     at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
>>>>>>>>     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
>>>>>>>>     at 
>>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:362)
>>>>>>>>     ... 21 more
>>>>>>>> Caused by: 
>>>>>>>> org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): 
>>>>>>>> File does not exist: 
>>>>>>>> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>>>>>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>>>>>>>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>>>>>>>>
>>>>>>>>     at org.apache.hadoop.ipc.Client.call(Client.java:1475)
>>>>>>>>     at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>>>>>>>>     at com.sun.proxy.$Proxy15.getBlockLocations(Unknown Source)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:255)
>>>>>>>>     at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source)
>>>>>>>>     at 
>>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>>>>>>>     at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source)
>>>>>>>>     at 
>>>>>>>> org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1226)
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, May 24, 2017 at 4:29 PM, Shixiong(Ryan) Zhu <
>>>>>>>> shixi...@databricks.com> wrote:
>>>>>>>>
>>>>>>>>> What's the value of "hdfsCheckPointDir"? Could you list this
>>>>>>>>> directory on HDFS and report the files there?
>>>>>>>>>
>>>>>>>>> On Wed, May 24, 2017 at 3:50 PM, Michael Armbrust <
>>>>>>>>> mich...@databricks.com> wrote:
>>>>>>>>>
>>>>>>>>>> -dev
>>>>>>>>>>
>>>>>>>>>> Have you tried clearing out the checkpoint directory?  Can you
>>>>>>>>>> also give the full stack trace?
>>>>>>>>>>
>>>>>>>>>> On Wed, May 24, 2017 at 3:45 PM, kant kodali <kanth...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Even if I do simple count aggregation like below I get the same
>>>>>>>>>>> error as https://issues.apache.org/jira/browse/SPARK-19268
>>>>>>>>>>>
>>>>>>>>>>> Dataset<Row> df2 = 
>>>>>>>>>>> df1.groupBy(functions.window(df1.col("Timestamp5"), "24 hours", "24 
>>>>>>>>>>> hours"), df1.col("AppName")).count();
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, May 24, 2017 at 3:35 PM, kant kodali <kanth...@gmail.com
>>>>>>>>>>> > wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>
>>>>>>>>>>>> I am using Spark 2.1.1 and running in a Standalone mode using
>>>>>>>>>>>> HDFS and Kafka
>>>>>>>>>>>>
>>>>>>>>>>>> I am running into the same problem as
>>>>>>>>>>>> https://issues.apache.org/jira/browse/SPARK-19268 with my
>>>>>>>>>>>> app(not KafkaWordCount).
>>>>>>>>>>>>
>>>>>>>>>>>> Here is my sample code
>>>>>>>>>>>>
>>>>>>>>>>>> *Here is how I create ReadStream*
>>>>>>>>>>>>
>>>>>>>>>>>> sparkSession.readStream()
>>>>>>>>>>>>                 .format("kafka")
>>>>>>>>>>>>                 .option("kafka.bootstrap.servers", 
>>>>>>>>>>>> config.getString("kafka.consumer.settings.bootstrapServers"))
>>>>>>>>>>>>                 .option("subscribe", 
>>>>>>>>>>>> config.getString("kafka.consumer.settings.topicName"))
>>>>>>>>>>>>                 .option("startingOffsets", "earliest")
>>>>>>>>>>>>                 .option("failOnDataLoss", "false")
>>>>>>>>>>>>                 .option("checkpointLocation", hdfsCheckPointDir)
>>>>>>>>>>>>                 .load();
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> *The core logic*
>>>>>>>>>>>>
>>>>>>>>>>>> Dataset<Row> df = ds.select(from_json(new 
>>>>>>>>>>>> Column("value").cast("string"), client.getSchema()).as("payload"));
>>>>>>>>>>>> Dataset<Row> df1 = df.selectExpr("payload.info.*", 
>>>>>>>>>>>> "payload.data.*");
>>>>>>>>>>>> Dataset<Row> df2 = df1.groupBy(window(df1.col("Timestamp5"), "24 
>>>>>>>>>>>> hours", "24 hours"), df1.col("AppName")).agg(sum("Amount"));
>>>>>>>>>>>> StreamingQuery query = df1.writeStream().foreach(new 
>>>>>>>>>>>> KafkaSink()).outputMode("update").start();
>>>>>>>>>>>> query.awaitTermination();
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> I can also provide any other information you may need.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to