I meant using an HDFS command to check the directory, e.g. "bin/hadoop fs -ls /usr/local/hadoop/checkpoint". My hunch is that the default file system on the driver side is probably the local file system. Could you add the following line to your code to print the default file system?

println(org.apache.hadoop.fs.FileSystem.get(sc.hadoopConfiguration).getClass)
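If that prints LocalFileSystem rather than DistributedFileSystem, the checkpoint path is being resolved against the local disk of whichever machine runs the code, which would explain why nothing shows up under /usr/local/hadoop on your HDFS nodes. A slightly fuller sketch you could run in spark-shell (just a sketch -- it assumes `sc` is the active SparkContext, and you should substitute your own checkpoint path):

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
println(fs.getClass)  // e.g. org.apache.hadoop.fs.LocalFileSystem vs. org.apache.hadoop.hdfs.DistributedFileSystem
println(fs.getUri)    // which file system / namenode paths resolve against
println(fs.exists(new Path("/usr/local/hadoop/checkpoint")))  // whether the directory exists on that file system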
On Wed, May 24, 2017 at 5:59 PM, kant kodali <kanth...@gmail.com> wrote:

> Hi All,
>
> I specified hdfsCheckPointDir = /usr/local/hadoop/checkpoint as you can
> see below, however I don't see a checkpoint directory under my
> hadoop_home = /usr/local/hadoop on either the datanodes or the namenode.
> However, on the datanode machine there does seem to be some data under
>
> /usr/local/hadoop/hdfs/namenode/current/BP-1469808024-X.X.X.X-1495672725898/current/finalized/subdir0/subdir0
>
> I thought the checkpoint directory would be created by Spark once I
> specify the path, but do I need to manually create the checkpoint dir
> with mkdir on all the Spark worker machines? I am new to HDFS as well,
> so please let me know. I can try sending df.explain("true"), but I have
> 100 fields in my schema so the Project node looks really big; if that is
> not a problem for you guys I can send it as well.
>
> +- StreamingRelation
> DataSource(org.apache.spark.sql.SparkSession@21002393,kafka,List(),None,List(),None,Map(subscribe
> -> analytics2, failOnDataLoss -> false, kafka.bootstrap.servers ->
> X.X.X.X:9092, checkpointLocation -> /usr/local/hadoop/checkpoint,
> startingOffsets -> earliest),None), kafka,
> [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
>
> *Here is the stack trace*
>
> StructField(OptionalContext4,StringType,true),
> StructField(OptionalContext5,StringType,true)),true), cast(value#1 as string)) AS payload#15]
> +- StreamingExecutionRelation KafkaSource[Subscribe[analytics2]],
> [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
>
> at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:305)
> at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191)
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 2 in stage 3.0 failed 4 times, most recent failure: Lost task 2.3 in
> stage 3.0 (TID 222, 172.31.25.189, executor 0):
> java.lang.IllegalStateException: Error reading delta file
> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta of
> HDFSStateStoreProvider[id = (op=0, part=2), dir =
> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2]:
> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta does not exist
> at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:365)
> at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:317)
> at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:314)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:314)
> at
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:313) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:313) > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:220) > at > org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:186) > at > org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:99) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.FileNotFoundException: File does not exist: > /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta > at > org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71) > at > org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712) > at > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365) > at > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616) > at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982) > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049) > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) > at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043) > > at sun.reflect.GeneratedConstructorAccessor13.newInstance(Unknown Source) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106) > at > 
org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73) > at > org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1228) > at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213) > at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1201) > at > org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:306) > at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272) > at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264) > at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526) > at > org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304) > at > org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299) > at > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > at > org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312) > at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769) > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:362) > ... 21 more > Caused by: > org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File > does not exist: > /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta > at > org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71) > at > org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712) > at > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365) > at > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616) > at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982) > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049) > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) > at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043) > > at org.apache.hadoop.ipc.Client.call(Client.java:1475) > at org.apache.hadoop.ipc.Client.call(Client.java:1412) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229) > at com.sun.proxy.$Proxy15.getBlockLocations(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:255) > at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at 
java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source) > at > org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1226) > ... 33 more > > Driver stacktrace: > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) > at scala.Option.foreach(Option.scala:257) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965) > at > org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926) > at > org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) > at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924) > at > org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:554) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:554) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:554) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:278) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:49) > at > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:553) > at > 
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:273) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:262) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:262) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:278) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:49) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:262) > at > org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43) > at > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:257) > ... 1 more > Caused by: java.lang.IllegalStateException: Error reading delta file > /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta of > HDFSStateStoreProvider[id = (op=0, part=2), dir = > /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2]: > /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta does > not exist > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:365) > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:317) > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:314) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:314) > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:313) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:313) > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:220) > at > org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:186) > at > org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:99) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.FileNotFoundException: File does not exist: > /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta > at > org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71) > at > org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712) > at > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365) > at > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616) > at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982) > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049) > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) > at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043) > > at sun.reflect.GeneratedConstructorAccessor13.newInstance(Unknown Source) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106) > at > org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73) > at > org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1228) > at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213) > at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1201) > at > org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:306) > at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272) > at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264) > at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526) > at > org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304) > at > org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299) > at > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > at > 
org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312) > at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769) > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:362) > ... 21 more > Caused by: > org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File > does not exist: > /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta > at > org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71) > at > org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712) > at > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365) > at > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616) > at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982) > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049) > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) > at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043) > > at org.apache.hadoop.ipc.Client.call(Client.java:1475) > at org.apache.hadoop.ipc.Client.call(Client.java:1412) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229) > at com.sun.proxy.$Proxy15.getBlockLocations(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:255) > at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source) > at > org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1226) > > > On Wed, May 24, 2017 at 4:29 PM, Shixiong(Ryan) Zhu < > shixi...@databricks.com> wrote: > >> What's the value of "hdfsCheckPointDir"? Could you list this directory on >> HDFS and report the files there? >> >> On Wed, May 24, 2017 at 3:50 PM, Michael Armbrust <mich...@databricks.com >> > wrote: >> >>> -dev >>> >>> Have you tried clearing out the checkpoint directory? Can you also give >>> the full stack trace? 
>>>
>>> On Wed, May 24, 2017 at 3:45 PM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Even if I do a simple count aggregation like the one below, I get the
>>>> same error as https://issues.apache.org/jira/browse/SPARK-19268
>>>>
>>>> Dataset<Row> df2 = df1.groupBy(functions.window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).count();
>>>>
>>>> On Wed, May 24, 2017 at 3:35 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I am using Spark 2.1.1, running in standalone mode with HDFS and Kafka.
>>>>>
>>>>> I am running into the same problem as
>>>>> https://issues.apache.org/jira/browse/SPARK-19268 with my app
>>>>> (not KafkaWordCount).
>>>>>
>>>>> Here is my sample code.
>>>>>
>>>>> *Here is how I create the ReadStream*
>>>>>
>>>>> sparkSession.readStream()
>>>>>         .format("kafka")
>>>>>         .option("kafka.bootstrap.servers", config.getString("kafka.consumer.settings.bootstrapServers"))
>>>>>         .option("subscribe", config.getString("kafka.consumer.settings.topicName"))
>>>>>         .option("startingOffsets", "earliest")
>>>>>         .option("failOnDataLoss", "false")
>>>>>         .option("checkpointLocation", hdfsCheckPointDir)
>>>>>         .load();
>>>>>
>>>>> *The core logic*
>>>>>
>>>>> Dataset<Row> df = ds.select(from_json(new Column("value").cast("string"), client.getSchema()).as("payload"));
>>>>> Dataset<Row> df1 = df.selectExpr("payload.info.*", "payload.data.*");
>>>>> Dataset<Row> df2 = df1.groupBy(window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).agg(sum("Amount"));
>>>>> StreamingQuery query = df1.writeStream().foreach(new KafkaSink()).outputMode("update").start();
>>>>> query.awaitTermination();
>>>>>
>>>>> I can also provide any other information you may need.
>>>>>
>>>>> Thanks!
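One more thing worth double-checking, though it may not be your actual problem: in the quoted code the "checkpointLocation" option is set on readStream, but Structured Streaming takes that option from the writer (writeStream) or from the spark.sql.streaming.checkpointLocation conf. The /tmp/temporary-... directory in your stack trace looks like the temporary checkpoint location Spark falls back to when a foreach query has no checkpoint location configured. A rough Scala sketch of what I mean, mirroring the Java in your mail (the hdfs:// authority below is only a placeholder for your namenode; a fully qualified URI also removes any ambiguity about which file system the path resolves against):

// Sketch only: <namenode-host>:8020 is a placeholder, KafkaSink/df1 are from your code.
val query = df1.writeStream
  .foreach(new KafkaSink())   // your existing sink
  .outputMode("update")
  .option("checkpointLocation", "hdfs://<namenode-host>:8020/usr/local/hadoop/checkpoint")
  .start()
query.awaitTermination()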