Feel free to create a new ticket. Could you also include in the ticket a listing of the files under "/usr/local/hadoop/checkpoint/state/0" (just run "bin/hadoop fs -ls /usr/local/hadoop/checkpoint/state/0/*"), along with the Spark logs?
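In case it helps when reading that listing: judging only from the paths quoted in this thread, the state files appear to be laid out as <checkpoint>/state/<operator id>/<partition id>/<version>.delta (the trace reports op=0, part=2 for .../state/0/2/1.delta). Below is a minimal Java sketch, not Spark code, that splits such a path into those parts. The class name StateDeltaPath and its methods are made up for illustration, and the layout itself is an assumption inferred from the error messages here.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; it is not part of Spark or Hadoop.
public final class StateDeltaPath {
    // ASSUMED layout, inferred from the paths quoted in this thread:
    //   <checkpointRoot>/state/<operatorId>/<partitionId>/<version>.delta
    private static final Pattern LAYOUT =
            Pattern.compile("^(.*)/state/(\\d+)/(\\d+)/(\\d+)\\.delta$");

    public final String checkpointRoot;
    public final int operatorId;
    public final int partitionId;
    public final long version;

    private StateDeltaPath(String checkpointRoot, int operatorId, int partitionId, long version) {
        this.checkpointRoot = checkpointRoot;
        this.operatorId = operatorId;
        this.partitionId = partitionId;
        this.version = version;
    }

    // Splits a delta-file path into its parts; rejects paths that do not fit the assumed layout.
    public static StateDeltaPath parse(String path) {
        Matcher m = LAYOUT.matcher(path);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a state delta path: " + path);
        }
        return new StateDeltaPath(
                m.group(1),
                Integer.parseInt(m.group(2)),
                Integer.parseInt(m.group(3)),
                Long.parseLong(m.group(4)));
    }

    // True when the root looks like the "/tmp/temporary-<uuid>" directories seen in this thread.
    public boolean isTemporaryCheckpoint() {
        return checkpointRoot.startsWith("/tmp/temporary-");
    }

    public static void main(String[] args) {
        StateDeltaPath p = parse("/usr/local/hadoop/checkpoint/state/0/1/1.delta");
        System.out.println("root=" + p.checkpointRoot + " op=" + p.operatorId
                + " part=" + p.partitionId + " version=" + p.version
                + " temporary=" + p.isTemporaryCheckpoint());
    }
}
```

Applied to the latest error, the missing file would be version 1 of partition 1 for operator 0 under /usr/local/hadoop/checkpoint, which is the directory the requested listing should cover.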
On Thu, May 25, 2017 at 2:53 PM, kant kodali <kanth...@gmail.com> wrote:

> Should I file a ticket, or should I try another version like Spark 2.2,
> since I am currently using 2.1.1?
>
> On Thu, May 25, 2017 at 2:38 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> Hi Ryan,
>>
>> You are right, I was setting checkpointLocation for readStream. Now I
>> have set it for writeStream as well, like below:
>>
>> StreamingQuery query = df2.writeStream()
>>     .foreach(new KafkaSink())
>>     .option("checkpointLocation", "/usr/local/hadoop/checkpoint")
>>     .outputMode("update")
>>     .start();
>>
>> query.awaitTermination();
>>
>> and now I can at the very least see that there are entries like
>>
>> -rw-r--r--   2 ubuntu supergroup  45 2017-05-25 21:29 /usr/local/hadoop/checkpoint/metadata
>> drwxr-xr-x   - ubuntu supergroup   0 2017-05-25 21:30 /usr/local/hadoop/checkpoint/offsets
>> drwxr-xr-x   - ubuntu supergroup   0 2017-05-25 21:29 /usr/local/hadoop/checkpoint/sources
>> drwxr-xr-x   - ubuntu supergroup   0 2017-05-25 21:30 /usr/local/hadoop/checkpoint/state
>>
>> However, it still fails with
>>
>> org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException):
>> File does not exist: /usr/local/hadoop/checkpoint/state/0/1/1.delta
>>
>> On Thu, May 25, 2017 at 2:31 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>>
>>> I read your code again and found one issue: you set "checkpointLocation"
>>> in `readStream`. It should be set in `writeStream`. However, I still have
>>> no idea why using a temp checkpoint location would fail.
>>>
>>> On Thu, May 25, 2017 at 2:23 PM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> I did the following:
>>>>
>>>> bin/hadoop fs -mkdir -p /usr/local/hadoop/checkpoint
>>>> bin/hadoop fs -ls /
>>>>
>>>> and I can actually see /tmp and /usr, and inside /usr there is
>>>> indeed local/hadoop/checkpoint.
>>>>
>>>> So up to here it looks fine.
>>>>
>>>> I also cleared everything under /tmp/*, as @Michael suggested, using
>>>> bin/hadoop fs -rmdir, so that when I run bin/hadoop fs -ls /tmp I don't
>>>> see anything.
>>>>
>>>> Then I ran my Spark driver program using spark-submit, and it failed
>>>> with the following exception:
>>>>
>>>> File does not exist:
>>>> /tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta
>>>>
>>>> So I ran
>>>>
>>>> bin/hadoop fs -ls /tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2
>>>>
>>>> and I did not see anything there like 1.delta (there are just no
>>>> files); however, all the directories in
>>>> /tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2 do exist.
>>>>
>>>> For my checkpointLocation I have so far passed
>>>> /usr/local/hadoop/checkpoint and
>>>> hdfs://<namenode:port>/usr/local/hadoop/checkpoint, and neither worked
>>>> for me. It is failing with the same error: "File does not exist:
>>>> /tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta"
>>>>
>>>> So what can be the problem? Any ideas?
>>>>
>>>> Thanks!
>>>>
>>>> On Thu, May 25, 2017 at 1:31 AM, kant kodali <kanth...@gmail.com> wrote:
>>>>
>>>>> Executing bin/hadoop fs -ls /usr/local/hadoop/checkpoint says
>>>>>
>>>>> ls: `/usr/local/hadoop/checkpoint': No such file or directory
>>>>>
>>>>> This is what I expected as well, since I don't see any checkpoint
>>>>> directory under /usr/local/hadoop. Am I missing any configuration
>>>>> variable like HADOOP_CONF_DIR? I am currently not setting that in
>>>>> conf/spark-env.sh, and that's the only Hadoop-related environment
>>>>> variable I see. Please let me know.
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On Thu, May 25, 2017 at 1:19 AM, kant kodali <kanth...@gmail.com> wrote:
>>>>>
>>>>>> Hi Ryan,
>>>>>>
>>>>>> I did add that print statement and here is what I got.
>>>>>>
>>>>>> class org.apache.hadoop.hdfs.DistributedFileSystem
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Wed, May 24, 2017 at 11:39 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>>>>>>
>>>>>>> I meant using an HDFS command to check the directory, such as
>>>>>>> "bin/hadoop fs -ls /usr/local/hadoop/checkpoint". My hunch is that
>>>>>>> the default file system in the driver is probably the local file
>>>>>>> system. Could you add the following line to your code to print the
>>>>>>> default file system?
>>>>>>>
>>>>>>> println(org.apache.hadoop.fs.FileSystem.get(sc.hadoopConfiguration).getClass)
>>>>>>>
>>>>>>> On Wed, May 24, 2017 at 5:59 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I specified hdfsCheckPointDir = /usr/local/hadoop/checkpoint, as
>>>>>>>> you can see below; however, I don't see a checkpoint directory under
>>>>>>>> my hadoop_home=/usr/local/hadoop on either the datanodes or the
>>>>>>>> namenodes. On the datanode machine, though, there seems to be some
>>>>>>>> data under
>>>>>>>>
>>>>>>>> /usr/local/hadoop/hdfs/namenode/current/BP-1469808024-X.X.X.X-1495672725898/current/finalized/subdir0/subdir0
>>>>>>>>
>>>>>>>> I thought the checkpoint directory would be created by Spark once I
>>>>>>>> specify the path, but do I need to manually create the checkpoint dir
>>>>>>>> using mkdir on all Spark worker machines? I am new to HDFS as well,
>>>>>>>> so please let me know. I can try sending df.explain("true"), but I
>>>>>>>> have 100 fields in my schema, so the Project looks really big; if
>>>>>>>> this is not a problem for you guys, I can send that as well.
>>>>>>>> >>>>>>>> >>>>>>>> +- StreamingRelation >>>>>>>> DataSource(org.apache.spark.sql.SparkSession@21002393,kafka,List(),None,List(),None,Map(subscribe >>>>>>>> -> analytics2, failOnDataLoss -> false, kafka.bootstrap.servers -> >>>>>>>> X.X.X.X:9092 <http://172.31.41.30:9092>, checkpointLocation -> >>>>>>>> /usr/local/hadoop/checkpoint, startingOffsets -> earliest),None), >>>>>>>> kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, >>>>>>>> timestampType#6] >>>>>>>> >>>>>>>> *Here is the stack trace* >>>>>>>> >>>>>>>> StructField(OptionalContext4,StringType,true), >>>>>>>> StructField(OptionalContext5,StringType,true)),true), cast(value#1 as >>>>>>>> string)) AS payload#15] >>>>>>>> +- StreamingExecutionRelation >>>>>>>> KafkaSource[Subscribe[analytics2]], [key#0, value#1, topic#2, >>>>>>>> partition#3, offset#4L, timestamp#5, timestampType#6] >>>>>>>> >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:305) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191) >>>>>>>> Caused by: org.apache.spark.SparkException: Job aborted due to stage >>>>>>>> failure: Task 2 in stage 3.0 failed 4 times, most recent failure: Lost >>>>>>>> task 2.3 in stage 3.0 (TID 222, 172.31.25.189, executor 0): >>>>>>>> java.lang.IllegalStateException: Error reading delta file >>>>>>>> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta >>>>>>>> of HDFSStateStoreProvider[id = (op=0, part=2), dir = >>>>>>>> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2]: >>>>>>>> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta >>>>>>>> does not exist >>>>>>>> at >>>>>>>> 
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:365) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:317) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:314) >>>>>>>> at scala.Option.getOrElse(Option.scala:121) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:314) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:313) >>>>>>>> at scala.Option.getOrElse(Option.scala:121) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:313) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:220) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:186) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61) >>>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) >>>>>>>> at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:287) >>>>>>>> at >>>>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) >>>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) >>>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) >>>>>>>> at >>>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) >>>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:99) >>>>>>>> at >>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) >>>>>>>> at >>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >>>>>>>> at >>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >>>>>>>> at java.lang.Thread.run(Thread.java:745) >>>>>>>> Caused by: java.io.FileNotFoundException: File does not exist: >>>>>>>> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) >>>>>>>> at >>>>>>>> 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616) >>>>>>>> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982) >>>>>>>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049) >>>>>>>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045) >>>>>>>> at java.security.AccessController.doPrivileged(Native Method) >>>>>>>> at javax.security.auth.Subject.doAs(Subject.java:422) >>>>>>>> at >>>>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) >>>>>>>> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043) >>>>>>>> >>>>>>>> at sun.reflect.GeneratedConstructorAccessor13.newInstance(Unknown >>>>>>>> Source) >>>>>>>> at >>>>>>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) >>>>>>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423) >>>>>>>> at >>>>>>>> org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106) >>>>>>>> at >>>>>>>> org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1228) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1201) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:306) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264) >>>>>>>> at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304) >>>>>>>> at >>>>>>>> 
org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299) >>>>>>>> at >>>>>>>> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312) >>>>>>>> at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:362) >>>>>>>> ... 21 more >>>>>>>> Caused by: >>>>>>>> org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): >>>>>>>> File does not exist: >>>>>>>> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) >>>>>>>> at >>>>>>>> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616) >>>>>>>> at 
org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982) >>>>>>>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049) >>>>>>>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045) >>>>>>>> at java.security.AccessController.doPrivileged(Native Method) >>>>>>>> at javax.security.auth.Subject.doAs(Subject.java:422) >>>>>>>> at >>>>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) >>>>>>>> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043) >>>>>>>> >>>>>>>> at org.apache.hadoop.ipc.Client.call(Client.java:1475) >>>>>>>> at org.apache.hadoop.ipc.Client.call(Client.java:1412) >>>>>>>> at >>>>>>>> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229) >>>>>>>> at com.sun.proxy.$Proxy15.getBlockLocations(Unknown Source) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:255) >>>>>>>> at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source) >>>>>>>> at >>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498) >>>>>>>> at >>>>>>>> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191) >>>>>>>> at >>>>>>>> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) >>>>>>>> at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1226) >>>>>>>> ... 
33 more >>>>>>>> >>>>>>>> Driver stacktrace: >>>>>>>> at >>>>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) >>>>>>>> at >>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) >>>>>>>> at >>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) >>>>>>>> at >>>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) >>>>>>>> at >>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) >>>>>>>> at >>>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) >>>>>>>> at >>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) >>>>>>>> at >>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) >>>>>>>> at scala.Option.foreach(Option.scala:257) >>>>>>>> at >>>>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) >>>>>>>> at >>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) >>>>>>>> at >>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) >>>>>>>> at >>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) >>>>>>>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) >>>>>>>> at >>>>>>>> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) >>>>>>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925) >>>>>>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938) >>>>>>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951) >>>>>>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965) >>>>>>>> at >>>>>>>> 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926) >>>>>>>> at >>>>>>>> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924) >>>>>>>> at >>>>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) >>>>>>>> at >>>>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) >>>>>>>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) >>>>>>>> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:554) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:554) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:554) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:278) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:49) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:553) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:273) >>>>>>>> at >>>>>>>> 
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:262) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:262) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:278) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:49) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:262) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:257) >>>>>>>> ... 
1 more >>>>>>>> Caused by: java.lang.IllegalStateException: Error reading delta file >>>>>>>> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta >>>>>>>> of HDFSStateStoreProvider[id = (op=0, part=2), dir = >>>>>>>> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2]: >>>>>>>> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta >>>>>>>> does not exist >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:365) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:317) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:314) >>>>>>>> at scala.Option.getOrElse(Option.scala:121) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:314) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:313) >>>>>>>> at scala.Option.getOrElse(Option.scala:121) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:313) >>>>>>>> at >>>>>>>> 
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:220) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:186) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61) >>>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) >>>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) >>>>>>>> at >>>>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) >>>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) >>>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) >>>>>>>> at >>>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) >>>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:99) >>>>>>>> at >>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) >>>>>>>> at >>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >>>>>>>> at >>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >>>>>>>> at java.lang.Thread.run(Thread.java:745) >>>>>>>> Caused by: java.io.FileNotFoundException: File does not exist: >>>>>>>> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712) >>>>>>>> at >>>>>>>> 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) >>>>>>>> at >>>>>>>> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616) >>>>>>>> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982) >>>>>>>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049) >>>>>>>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045) >>>>>>>> at java.security.AccessController.doPrivileged(Native Method) >>>>>>>> at javax.security.auth.Subject.doAs(Subject.java:422) >>>>>>>> at >>>>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) >>>>>>>> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043) >>>>>>>> >>>>>>>> at sun.reflect.GeneratedConstructorAccessor13.newInstance(Unknown >>>>>>>> Source) >>>>>>>> at >>>>>>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) >>>>>>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423) >>>>>>>> at >>>>>>>> org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106) >>>>>>>> at >>>>>>>> org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1228) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1201) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:306) 
>>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264) >>>>>>>> at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299) >>>>>>>> at >>>>>>>> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312) >>>>>>>> at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769) >>>>>>>> at >>>>>>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:362) >>>>>>>> ... 21 more >>>>>>>> Caused by: >>>>>>>> org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): >>>>>>>> File does not exist: >>>>>>>> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588) >>>>>>>> at >>>>>>>> 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) >>>>>>>> at >>>>>>>> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616) >>>>>>>> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982) >>>>>>>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049) >>>>>>>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045) >>>>>>>> at java.security.AccessController.doPrivileged(Native Method) >>>>>>>> at javax.security.auth.Subject.doAs(Subject.java:422) >>>>>>>> at >>>>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) >>>>>>>> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043) >>>>>>>> >>>>>>>> at org.apache.hadoop.ipc.Client.call(Client.java:1475) >>>>>>>> at org.apache.hadoop.ipc.Client.call(Client.java:1412) >>>>>>>> at >>>>>>>> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229) >>>>>>>> at com.sun.proxy.$Proxy15.getBlockLocations(Unknown Source) >>>>>>>> at >>>>>>>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:255) >>>>>>>> at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source) >>>>>>>> at >>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498) >>>>>>>> at >>>>>>>> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191) >>>>>>>> at >>>>>>>> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) >>>>>>>> at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source) >>>>>>>> at >>>>>>>> 
org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1226)
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, May 24, 2017 at 4:29 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>>>>>>>>
>>>>>>>>> What's the value of "hdfsCheckPointDir"? Could you list this
>>>>>>>>> directory on HDFS and report the files there?
>>>>>>>>>
>>>>>>>>> On Wed, May 24, 2017 at 3:50 PM, Michael Armbrust <mich...@databricks.com> wrote:
>>>>>>>>>
>>>>>>>>>> -dev
>>>>>>>>>>
>>>>>>>>>> Have you tried clearing out the checkpoint directory? Can you
>>>>>>>>>> also give the full stack trace?
>>>>>>>>>>
>>>>>>>>>> On Wed, May 24, 2017 at 3:45 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Even if I do simple count aggregation like below I get the same
>>>>>>>>>>> error as https://issues.apache.org/jira/browse/SPARK-19268
>>>>>>>>>>>
>>>>>>>>>>> Dataset<Row> df2 = df1.groupBy(
>>>>>>>>>>>     functions.window(df1.col("Timestamp5"), "24 hours", "24 hours"),
>>>>>>>>>>>     df1.col("AppName")).count();
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, May 24, 2017 at 3:35 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>
>>>>>>>>>>>> I am using Spark 2.1.1 and running in a Standalone mode using
>>>>>>>>>>>> HDFS and Kafka
>>>>>>>>>>>>
>>>>>>>>>>>> I am running into the same problem as
>>>>>>>>>>>> https://issues.apache.org/jira/browse/SPARK-19268 with my
>>>>>>>>>>>> app(not KafkaWordCount).
>>>>>>>>>>>>
>>>>>>>>>>>> Here is my sample code
>>>>>>>>>>>>
>>>>>>>>>>>> Here is how I create ReadStream
>>>>>>>>>>>>
>>>>>>>>>>>> sparkSession.readStream()
>>>>>>>>>>>>     .format("kafka")
>>>>>>>>>>>>     .option("kafka.bootstrap.servers", config.getString("kafka.consumer.settings.bootstrapServers"))
>>>>>>>>>>>>     .option("subscribe", config.getString("kafka.consumer.settings.topicName"))
>>>>>>>>>>>>     .option("startingOffsets", "earliest")
>>>>>>>>>>>>     .option("failOnDataLoss", "false")
>>>>>>>>>>>>     .option("checkpointLocation", hdfsCheckPointDir)
>>>>>>>>>>>>     .load();
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> The core logic
>>>>>>>>>>>>
>>>>>>>>>>>> Dataset<Row> df = ds.select(from_json(new Column("value").cast("string"), client.getSchema()).as("payload"));
>>>>>>>>>>>> Dataset<Row> df1 = df.selectExpr("payload.info.*", "payload.data.*");
>>>>>>>>>>>> Dataset<Row> df2 = df1.groupBy(window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).agg(sum("Amount"));
>>>>>>>>>>>> StreamingQuery query = df1.writeStream().foreach(new KafkaSink()).outputMode("update").start();
>>>>>>>>>>>> query.awaitTermination();
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> I can also provide any other information you may need.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>