Re: Running into the same problem as JIRA SPARK-19268

2017-05-26 Thread kant kodali
https://issues.apache.org/jira/browse/SPARK-20894


Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread Shixiong(Ryan) Zhu
I don't know what happened in your case, so I cannot provide a workaround.
It would be great if you could provide the logs output by
HDFSBackedStateStoreProvider.
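
A minimal sketch of one way to collect those logs, assuming the stock
Spark 2.1 conf/log4j.properties template (Spark 2.1 uses log4j 1.x; the
exact template is an assumption):

# Enable debug-level output from HDFSBackedStateStoreProvider and the
# rest of the state store package.
log4j.logger.org.apache.spark.sql.execution.streaming.state=DEBUG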


Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread kant kodali
On Thu, May 25, 2017 at 3:41 PM, Shixiong(Ryan) Zhu  wrote:

> bin/hadoop fs -ls /usr/local/hadoop/checkpoint/state/0/*
>

Hi,

Running bin/hadoop fs -ls /usr/local/hadoop/checkpoint/state/0/* shows no
files, but all of the directories up to /usr/local/hadoop/checkpoint/state/0
do exist (they were created by Spark).

Yes, I can attach the log, but it looks pretty much the same as what I
already sent on this thread.

Is there any workaround for this for now? I will create a ticket shortly.

Thanks!


Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread Shixiong(Ryan) Zhu
Feel free to create a new ticket. Could you also attach the files in
"/usr/local/hadoop/checkpoint/state/0" (just run "bin/hadoop fs -ls
/usr/local/hadoop/checkpoint/state/0/*") to the ticket, along with the
Spark logs?

Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread kant kodali
Should I file a ticket or should I try another version like Spark 2.2 since
I am currently using 2.1.1?

Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread kant kodali
Hi Ryan,

You are right: I was setting checkpointLocation for readStream. I have now
set it for writeStream as well, like below:

StreamingQuery query = df2.writeStream()
    .foreach(new KafkaSink())
    .option("checkpointLocation", "/usr/local/hadoop/checkpoint")
    .outputMode("update")
    .start();

query.awaitTermination();

and now I can at least see that there are directories like:

-rw-r--r--   2 ubuntu supergroup 45 2017-05-25 21:29 /usr/local/hadoop/checkpoint/metadata
drwxr-xr-x   - ubuntu supergroup  0 2017-05-25 21:30 /usr/local/hadoop/checkpoint/offsets
drwxr-xr-x   - ubuntu supergroup  0 2017-05-25 21:29 /usr/local/hadoop/checkpoint/sources
drwxr-xr-x   - ubuntu supergroup  0 2017-05-25 21:30 /usr/local/hadoop/checkpoint/state

However, it still fails with:

org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException):
File does not exist: /usr/local/hadoop/checkpoint/state/0/1/1.delta
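
A hedged diagnostic sketch, not from the code in this thread: the state
delta files are written by executor tasks rather than the driver, so it may
also help to print the FileSystem that the executors resolve, mirroring the
driver-side check suggested earlier. The JavaSparkContext jsc below is an
assumption.

// Run a trivial job that prints, from each executor, which FileSystem a
// fresh Hadoop Configuration resolves to. If executors print
// LocalFileSystem while the driver prints DistributedFileSystem (or vice
// versa), state is being written and read on different file systems.
jsc.parallelize(java.util.Arrays.asList(1, 2, 3), 3)
   .foreachPartition(it -> {
       org.apache.hadoop.conf.Configuration hadoopConf =
           new org.apache.hadoop.conf.Configuration();
       System.out.println(
           org.apache.hadoop.fs.FileSystem.get(hadoopConf).getClass());
   });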



Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread Shixiong(Ryan) Zhu
I read your code again and found one issue: you set "checkpointLocation" in
`readStream`. It should be set in `writeStream`. However, I still have no
idea why using a temp checkpoint location would fail.

Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread kant kodali
I did the following:

I ran bin/hadoop fs -mkdir -p /usr/local/hadoop/checkpoint and then
bin/hadoop fs -ls /, and I can actually see /tmp and /usr, and inside /usr
there is indeed local/hadoop/checkpoint.

So up to here it looks fine.

I also cleared everything under /tmp/* as @Michael suggested, using
bin/hadoop fs -rmdir, so that when I do bin/hadoop fs -ls /tmp I don't see
anything.

I then ran my Spark driver program using spark-submit, and it failed with
the following exception:

File does not exist:
/tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta

So I did bin/hadoop fs -ls
/tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2

and I did not see anything there like 1.delta (there are just no files);
however, all of these directories,
/tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2, do exist.

For my checkpointLocation I have passed /usr/local/hadoop/checkpoint and
hdfs:///usr/local/hadoop/checkpoint so far, and both didn't work for me. It
is failing with the same error: "File does not exist:
/tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta"

So what could the problem be? Any ideas?

Thanks!

Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread kant kodali
Executing bin/hadoop fs -ls /usr/local/hadoop/checkpoint says:

ls: `/usr/local/hadoop/checkpoint': No such file or directory

This is what I expected as well, since I don't see any checkpoint directory
under /usr/local/hadoop. Am I missing any configuration variable such as
HADOOP_CONF_DIR? I am currently not setting that in conf/spark-env.sh, and
that's the only Hadoop-related environment variable I see. Please let me
know.

Thanks!
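
P.S. A hedged sketch of setting that variable, in case it is relevant,
assuming a standard Hadoop 2.x layout under /usr/local/hadoop (the exact
path is a guess): without HADOOP_CONF_DIR, Spark may not pick up
core-site.xml, in which case fs.defaultFS falls back to the local file
system.

# conf/spark-env.sh: point Spark at the directory containing core-site.xml
# and hdfs-site.xml so that unqualified paths resolve against HDFS.
export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop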



Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread kant kodali
Hi Ryan,

I did add that print statement, and here is what I got:

class org.apache.hadoop.hdfs.DistributedFileSystem

Thanks!

Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread Shixiong(Ryan) Zhu
I meant using an HDFS command to check the directory, such as "bin/hadoop fs
-ls /usr/local/hadoop/checkpoint". My hunch is that the default file system
in the driver is probably the local file system. Could you add the following
line to your code to print the default file system?

println(org.apache.hadoop.fs.FileSystem.get(sc.hadoopConfiguration).getClass)
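
If that line prints LocalFileSystem, here is a hedged sketch of one way to
take default-FS resolution out of the picture entirely: fully qualify the
checkpoint URI. The namenode host and port below are placeholders, not
values from this thread.

// Qualify the checkpoint path with an explicit scheme and authority so it
// no longer depends on the fs.defaultFS that the driver and executors
// happen to load.
StreamingQuery query = df.writeStream()
    .foreach(new KafkaSink())
    .option("checkpointLocation",
            "hdfs://namenode-host:8020/usr/local/hadoop/checkpoint")
    .outputMode("update")
    .start();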


Re: Running into the same problem as JIRA SPARK-19268

2017-05-24 Thread kant kodali
Hi All,

I specified hdfsCheckPointDir = /usr/local/hadoop/checkpoint, as you can see
below; however, I don't see a checkpoint directory under my hadoop_home =
/usr/local/hadoop on either the datanodes or the namenodes, although on the
datanode machine there does seem to be some data under

/usr/local/hadoop/hdfs/namenode/current/BP-1469808024-X.X.X.X-1495672725898/current/finalized/subdir0/subdir0

I thought the checkpoint directory would be created by Spark once I specify
the path, but do I need to manually create the checkpoint dir with mkdir on
all Spark worker machines? I am new to HDFS as well, so please let me know.
I can try sending df.explain("true"), but I have 100 fields in my schema so
the Project looks really big; if that is not a problem for you guys, I can
send that as well.


  +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@21002393,kafka,List(),None,List(),None,Map(subscribe -> analytics2, failOnDataLoss -> false, kafka.bootstrap.servers -> X.X.X.X:9092, checkpointLocation -> /usr/local/hadoop/checkpoint, startingOffsets -> earliest),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]

Here is the stack trace:

StructField(OptionalContext4,StringType,true),
StructField(OptionalContext5,StringType,true)),true), cast(value#1 as
string)) AS payload#15]
+- StreamingExecutionRelation
KafkaSource[Subscribe[analytics2]], [key#0, value#1, topic#2,
partition#3, offset#4L, timestamp#5, timestampType#6]

at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:305)
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191)
Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 2 in stage 3.0 failed 4 times, most recent failure: Lost
task 2.3 in stage 3.0 (TID 222, 172.31.25.189, executor 0):
java.lang.IllegalStateException: Error reading delta file
/tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
of HDFSStateStoreProvider[id = (op=0, part=2), dir =
/tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2]:
/tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
does not exist
at 
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:365)
at 
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:317)
at 
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:314)
at scala.Option.getOrElse(Option.scala:121)
at 
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:314)
at 
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:313)
at scala.Option.getOrElse(Option.scala:121)
at 
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:313)
at 
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:220)
at 
org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:186)
at 
org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: File does not exist:
/tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
at 

Re: Running into the same problem as JIRA SPARK-19268

2017-05-24 Thread Shixiong(Ryan) Zhu
What's the value of "hdfsCheckPointDir"? Could you list this directory on
HDFS and report the files there?


Re: Running into the same problem as JIRA SPARK-19268

2017-05-24 Thread Michael Armbrust
-dev

Have you tried clearing out the checkpoint directory? Can you also give
the full stack trace?
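
A hedged example of what clearing it out might look like, assuming the
checkpoint path used elsewhere in this thread:

bin/hadoop fs -rm -r /usr/local/hadoop/checkpoint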


Re: Running into the same problem as JIRA SPARK-19268

2017-05-24 Thread kant kodali
Even if I do a simple count aggregation like the one below, I get the same
error as https://issues.apache.org/jira/browse/SPARK-19268:

Dataset<Row> df2 = df1.groupBy(functions.window(df1.col("Timestamp5"),
    "24 hours", "24 hours"), df1.col("AppName")).count();


On Wed, May 24, 2017 at 3:35 PM, kant kodali  wrote:

> Hi All,
>
> I am using Spark 2.1.1, running in standalone mode with HDFS and Kafka.
>
> I am running into the same problem as
> https://issues.apache.org/jira/browse/SPARK-19268 with my app (not
> KafkaWordCount).
>
> Here is my sample code.
>
> Here is how I create the ReadStream:
>
> sparkSession.readStream()
>     .format("kafka")
>     .option("kafka.bootstrap.servers", config.getString("kafka.consumer.settings.bootstrapServers"))
>     .option("subscribe", config.getString("kafka.consumer.settings.topicName"))
>     .option("startingOffsets", "earliest")
>     .option("failOnDataLoss", "false")
>     .option("checkpointLocation", hdfsCheckPointDir)
>     .load();
>
> The core logic:
>
> Dataset<Row> df = ds.select(from_json(new Column("value").cast("string"),
>     client.getSchema()).as("payload"));
> Dataset<Row> df1 = df.selectExpr("payload.info.*", "payload.data.*");
> Dataset<Row> df2 = df1.groupBy(window(df1.col("Timestamp5"), "24 hours",
>     "24 hours"), df1.col("AppName")).agg(sum("Amount"));
> StreamingQuery query = df1.writeStream().foreach(new
>     KafkaSink()).outputMode("update").start();
> query.awaitTermination();
>
> I can also provide any other information you may need.
>
> Thanks!