get partition locations in spark

2017-05-25 Thread girish hilage
Hi,
In order to get the preferred locations for partitions, I executed the statement below:

r1.preferredLocations(part)

but it returned an empty List(). How can I print the hostnames a partition is
likely to be on?

Regards,
Girish
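A minimal Scala sketch of one way to inspect this, assuming an RDD built from a
source that actually reports locality (e.g. a file on HDFS); for parallelized
in-memory collections the returned list is legitimately empty:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("preferred-locations"))
val rdd = sc.textFile("hdfs:///some/path")   // placeholder path

// preferredLocations(partition) returns the hostnames Spark believes hold the
// data for that partition; an empty Seq means "no locality information".
rdd.partitions.foreach { p =>
  println(s"partition ${p.index}: ${rdd.preferredLocations(p).mkString(", ")}")
}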


Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread Shixiong(Ryan) Zhu
I don't know what happened in your case, so I cannot suggest a workaround.
It would be great if you could provide the logs output by
HDFSBackedStateStoreProvider.

On Thu, May 25, 2017 at 4:05 PM, kant kodali  wrote:

>
> On Thu, May 25, 2017 at 3:41 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> bin/hadoop fs -ls /usr/local/hadoop/checkpoint/state/0/*
>>
>
> Hi,
>
> There are no files under bin/hadoop fs -ls
> /usr/local/hadoop/checkpoint/state/0/*
> but all the directories up to /usr/local/hadoop/checkpoint/state/0 do
> exist (they were created by Spark).
>
> Yes, I can attach the log, but it looks pretty much the same as what I
> sent on this thread.
>
> Is there any workaround for this for now? I will create a ticket shortly.
>
> Thanks!
>


Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread kant kodali
On Thu, May 25, 2017 at 3:41 PM, Shixiong(Ryan) Zhu  wrote:

> bin/hadoop fs -ls /usr/local/hadoop/checkpoint/state/0/*
>

Hi,

There are no files under bin/hadoop fs -ls
/usr/local/hadoop/checkpoint/state/0/*
but all the directories up to /usr/local/hadoop/checkpoint/state/0 do
exist (they were created by Spark).

Yes, I can attach the log, but it looks pretty much the same as what I
sent on this thread.

Is there any workaround for this for now? I will create a ticket shortly.

Thanks!


Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread Shixiong(Ryan) Zhu
Feel free to create a new ticket. Could you also include in the ticket the
files in "/usr/local/hadoop/checkpoint/state/0" (just run "bin/hadoop fs -ls
/usr/local/hadoop/checkpoint/state/0/*") and the Spark logs?

On Thu, May 25, 2017 at 2:53 PM, kant kodali  wrote:

> Should I file a ticket or should I try another version like Spark 2.2
> since I am currently using 2.1.1?
>
> On Thu, May 25, 2017 at 2:38 PM, kant kodali  wrote:
>
>> Hi Ryan,
>>
>> You are right, I was setting checkpointLocation for readStream. Now I have
>> set it for writeStream as well, as shown below:
>>
>> StreamingQuery query = df2.writeStream().foreach(new KafkaSink()).option(
>> "checkpointLocation","/usr/local/hadoop/checkpoint").outputMode("update"
>> ).start();
>>
>> query.awaitTermination();
>>
>> and now I can at the very least see that there are directories like
>>
>> -rw-r--r--   2 ubuntu supergroup 45 2017-05-25 21:29 
>> /usr/local/hadoop/checkpoint/metadata
>> drwxr-xr-x   - ubuntu supergroup  0 2017-05-25 21:30 
>> /usr/local/hadoop/checkpoint/offsets
>> drwxr-xr-x   - ubuntu supergroup  0 2017-05-25 21:29 
>> /usr/local/hadoop/checkpoint/sources
>> drwxr-xr-x   - ubuntu supergroup  0 2017-05-25 21:30 
>> /usr/local/hadoop/checkpoint/state
>>
>>
>> However it still fails with
>>
>> *org.apache.hadoop.ipc.RemoteException(java.io 
>> .FileNotFoundException): File does not exist: 
>> /usr/local/hadoop/checkpoint/state/0/1/1.delta*
>>
>>
>>
>> On Thu, May 25, 2017 at 2:31 PM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> I read your code again and found one issue: you set "checkpointLocation"
>>> in `readStream`. It should be set in `writeStream`. However, I still have
>>> no idea why using a temp checkpoint location would fail.
>>>
>>> On Thu, May 25, 2017 at 2:23 PM, kant kodali  wrote:
>>>
 I did the following

 *bin/hadoop fs -mkdir -p **/usr/local/hadoop/checkpoint* and did 
 *bin/hadoop
 fs -ls / *

 and I can actually see */tmp* and */usr* and inside of */usr *there is
 indeed *local/hadoop/checkpoint. *

 So until here it looks fine.

 I also cleared everything */tmp/** as @Michael suggested using *bin/hadoop
 fs -rmdir * such that when I do *bin/hadoop fs -ls /tmp *I don't see
 anything.

 Now I ran my spark driver program using spark-submit it failed with the
 following exception

 *File does not exist:
 /tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta*

 so I did *bin/hadoop fs -ls *
 */tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2 *

 and I did not see anything there like *1.delta(there are just no
 files)* however all these directories 
 */tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2
  *do exist.

 For my checkPointLocation I had passed   */usr/local/hadoop/checkpoint
 *and  *hdfs:///usr/local/hadoop/checkpoint  *so far and
 both didn't work for me. It is failing with the same error "*File does
 not exist:
 /tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta*"

 so what can be the problem? any ideas?

 Thanks!

 On Thu, May 25, 2017 at 1:31 AM, kant kodali 
 wrote:

> Executing this bin/hadoop fs -ls /usr/local/hadoop/checkpoint says
>
> ls: `/usr/local/hadoop/checkpoint': No such file or directory
>
> This is what I expected as well since I don't see any checkpoint
> directory under /usr/local/hadoop. Am I missing any configuration variable
> like HADOOP_CONF_DIR ? I am currently not setting that in
> conf/spark-env.sh and thats the only hadoop related environment variable I
> see. please let me know
>
> thanks!
>
>
>
> On Thu, May 25, 2017 at 1:19 AM, kant kodali 
> wrote:
>
>> Hi Ryan,
>>
>> I did add that print statement and here is what I got.
>>
>> class org.apache.hadoop.hdfs.DistributedFileSystem
>>
>> Thanks!
>>
>> On Wed, May 24, 2017 at 11:39 PM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> I meant using HDFS command to check the directory. Such as
>>> "bin/hadoop fs -ls /usr/local/hadoop/checkpoint". My hunch is the 
>>> default
>>> file system in driver probably is the local file system. Could you add 
>>> the
>>> following line into your code to print the default file system?
>>>
>>> println(org.apache.hadoop.fs.FileSystem.get(sc.hadoopConfigu
>>> ration).getClass)
>>>
>>> On Wed, May 24, 2017 at 5:59 PM, kant kodali 
>>> wrote:
>>>
 Hi All,

 I specified hdfsCheckPointDir = /usr/local/hadoop/checkpoint as
 you can see below however I dont see checkpoint directory under my
 

shuffle write is very slow

2017-05-25 Thread KhajaAsmath Mohammed
Hi,

I am converting a Hive job to a Spark job. I have tested on a small data set,
and the logic is correct in both Hive and Spark.

When I started testing on large data, Spark is very slow compared to Hive.

The shuffle write is taking a long time. Any suggestions?

I am creating a temporary table in Spark and overwriting a Hive table with
partitions from that temporary table created in Spark.

dataframe_transposed.registerTempTable(srcTable)
import sqlContext._
import sqlContext.implicits._
val query = s"INSERT OVERWRITE TABLE ${destTable} SELECT * from ${srcTable}"
println(s"INSERT OVERWRITE TABLE ${destTable} SELECT * from ${srcTable}")
logger.info(s"Executing Query ${query}")
sqlContext.sql(query)

The total size of the dataframe is around 190 GB, and in this case the Spark
job runs forever, while the Hive job completes in about 4 hours.

Thanks,
Asmath.


Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread kant kodali
Should I file a ticket or should I try another version like Spark 2.2 since
I am currently using 2.1.1?

On Thu, May 25, 2017 at 2:38 PM, kant kodali  wrote:

> Hi Ryan,
>
> You are right, I was setting checkpointLocation for readStream. Now I have
> set it for writeStream as well, as shown below:
>
> StreamingQuery query = df2.writeStream().foreach(new KafkaSink()).option("
> checkpointLocation","/usr/local/hadoop/checkpoint").outputMode("update"
> ).start();
>
> query.awaitTermination();
>
> and now I can at the very least see that there are directories like
>
> -rw-r--r--   2 ubuntu supergroup 45 2017-05-25 21:29 
> /usr/local/hadoop/checkpoint/metadata
> drwxr-xr-x   - ubuntu supergroup  0 2017-05-25 21:30 
> /usr/local/hadoop/checkpoint/offsets
> drwxr-xr-x   - ubuntu supergroup  0 2017-05-25 21:29 
> /usr/local/hadoop/checkpoint/sources
> drwxr-xr-x   - ubuntu supergroup  0 2017-05-25 21:30 
> /usr/local/hadoop/checkpoint/state
>
>
> However it still fails with
>
> *org.apache.hadoop.ipc.RemoteException(java.io 
> .FileNotFoundException): File does not exist: 
> /usr/local/hadoop/checkpoint/state/0/1/1.delta*
>
>
>
> On Thu, May 25, 2017 at 2:31 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> I read your code again and found one issue: you set "checkpointLocation"
>> in `readStream`. It should be set in `writeStream`. However, I still have
>> no idea why using a temp checkpoint location would fail.
>>
>> On Thu, May 25, 2017 at 2:23 PM, kant kodali  wrote:
>>
>>> I did the following
>>>
>>> *bin/hadoop fs -mkdir -p **/usr/local/hadoop/checkpoint* and did *bin/hadoop
>>> fs -ls / *
>>>
>>> and I can actually see */tmp* and */usr* and inside of */usr *there is
>>> indeed *local/hadoop/checkpoint. *
>>>
>>> So until here it looks fine.
>>>
>>> I also cleared everything */tmp/** as @Michael suggested using *bin/hadoop
>>> fs -rmdir * such that when I do *bin/hadoop fs -ls /tmp *I don't see
>>> anything.
>>>
>>> Now I ran my spark driver program using spark-submit it failed with the
>>> following exception
>>>
>>> *File does not exist:
>>> /tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta*
>>>
>>> so I did *bin/hadoop fs -ls *
>>> */tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2 *
>>>
>>> and I did not see anything there like *1.delta(there are just no files)* 
>>> however
>>> all these directories 
>>> */tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2
>>>  *do exist.
>>>
>>> For my checkPointLocation I had passed   */usr/local/hadoop/checkpoint *
>>> and  *hdfs:///usr/local/hadoop/checkpoint  *so far and
>>> both didn't work for me. It is failing with the same error "*File does
>>> not exist:
>>> /tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta*"
>>>
>>> so what can be the problem? any ideas?
>>>
>>> Thanks!
>>>
>>> On Thu, May 25, 2017 at 1:31 AM, kant kodali  wrote:
>>>
 Executing this bin/hadoop fs -ls /usr/local/hadoop/checkpoint says

 ls: `/usr/local/hadoop/checkpoint': No such file or directory

 This is what I expected as well since I don't see any checkpoint
 directory under /usr/local/hadoop. Am I missing any configuration variable
 like HADOOP_CONF_DIR ? I am currently not setting that in
 conf/spark-env.sh and thats the only hadoop related environment variable I
 see. please let me know

 thanks!



 On Thu, May 25, 2017 at 1:19 AM, kant kodali 
 wrote:

> Hi Ryan,
>
> I did add that print statement and here is what I got.
>
> class org.apache.hadoop.hdfs.DistributedFileSystem
>
> Thanks!
>
> On Wed, May 24, 2017 at 11:39 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> I meant using HDFS command to check the directory. Such as
>> "bin/hadoop fs -ls /usr/local/hadoop/checkpoint". My hunch is the default
>> file system in driver probably is the local file system. Could you add 
>> the
>> following line into your code to print the default file system?
>>
>> println(org.apache.hadoop.fs.FileSystem.get(sc.hadoopConfigu
>> ration).getClass)
>>
>> On Wed, May 24, 2017 at 5:59 PM, kant kodali 
>> wrote:
>>
>>> Hi All,
>>>
>>> I specified hdfsCheckPointDir = /usr/local/hadoop/checkpoint as you
>>> can see below however I dont see checkpoint directory under my 
>>> hadoop_home=
>>> /usr/local/hadoop in either datanodes or namenodes however in
>>> datanode machine there seems to be some data under
>>>
>>> /usr/local/hadoop/hdfs/namenode/current/BP-1469808024-X.X.X.
>>> X-1495672725898/current/finalized/subdir0/subdir0
>>>
>>> I thought the checkpoint directory will be created by spark once I
>>> specify the path but do I manually need to create checkpoint dir using

Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread kant kodali
Hi Ryan,

You are right, I was setting checkpointLocation for readStream. Now I have
set it for writeStream as well, as shown below:

StreamingQuery query = df2.writeStream()
    .foreach(new KafkaSink())
    .option("checkpointLocation", "/usr/local/hadoop/checkpoint")
    .outputMode("update")
    .start();

query.awaitTermination();

and now I can at the very least see that there are directories like

-rw-r--r--   2 ubuntu supergroup 45 2017-05-25 21:29
/usr/local/hadoop/checkpoint/metadata
drwxr-xr-x   - ubuntu supergroup  0 2017-05-25 21:30
/usr/local/hadoop/checkpoint/offsets
drwxr-xr-x   - ubuntu supergroup  0 2017-05-25 21:29
/usr/local/hadoop/checkpoint/sources
drwxr-xr-x   - ubuntu supergroup  0 2017-05-25 21:30
/usr/local/hadoop/checkpoint/state


However it still fails with

org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException):
File does not exist: /usr/local/hadoop/checkpoint/state/0/1/1.delta



On Thu, May 25, 2017 at 2:31 PM, Shixiong(Ryan) Zhu  wrote:

> I read your code again and found one issue: you set "checkpointLocation" in
> `readStream`. It should be set in `writeStream`. However, I still have no
> idea why using a temp checkpoint location would fail.
>
> On Thu, May 25, 2017 at 2:23 PM, kant kodali  wrote:
>
>> I did the following
>>
>> *bin/hadoop fs -mkdir -p **/usr/local/hadoop/checkpoint* and did *bin/hadoop
>> fs -ls / *
>>
>> and I can actually see */tmp* and */usr* and inside of */usr *there is
>> indeed *local/hadoop/checkpoint. *
>>
>> So until here it looks fine.
>>
>> I also cleared everything */tmp/** as @Michael suggested using *bin/hadoop
>> fs -rmdir * such that when I do *bin/hadoop fs -ls /tmp *I don't see
>> anything.
>>
>> Now I ran my spark driver program using spark-submit it failed with the
>> following exception
>>
>> *File does not exist:
>> /tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta*
>>
>> so I did *bin/hadoop fs -ls *
>> */tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2 *
>>
>> and I did not see anything there like *1.delta(there are just no files)* 
>> however
>> all these directories 
>> */tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2
>>  *do exist.
>>
>> For my checkPointLocation I had passed   */usr/local/hadoop/checkpoint *
>> and  *hdfs:///usr/local/hadoop/checkpoint  *so far and
>> both didn't work for me. It is failing with the same error "*File does
>> not exist:
>> /tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta*"
>>
>> so what can be the problem? any ideas?
>>
>> Thanks!
>>
>> On Thu, May 25, 2017 at 1:31 AM, kant kodali  wrote:
>>
>>> Executing this bin/hadoop fs -ls /usr/local/hadoop/checkpoint says
>>>
>>> ls: `/usr/local/hadoop/checkpoint': No such file or directory
>>>
>>> This is what I expected as well since I don't see any checkpoint
>>> directory under /usr/local/hadoop. Am I missing any configuration variable
>>> like HADOOP_CONF_DIR ? I am currently not setting that in
>>> conf/spark-env.sh and thats the only hadoop related environment variable I
>>> see. please let me know
>>>
>>> thanks!
>>>
>>>
>>>
>>> On Thu, May 25, 2017 at 1:19 AM, kant kodali  wrote:
>>>
 Hi Ryan,

 I did add that print statement and here is what I got.

 class org.apache.hadoop.hdfs.DistributedFileSystem

 Thanks!

 On Wed, May 24, 2017 at 11:39 PM, Shixiong(Ryan) Zhu <
 shixi...@databricks.com> wrote:

> I meant using HDFS command to check the directory. Such as "bin/hadoop
> fs -ls /usr/local/hadoop/checkpoint". My hunch is the default file system
> in driver probably is the local file system. Could you add the following
> line into your code to print the default file system?
>
> println(org.apache.hadoop.fs.FileSystem.get(sc.hadoopConfigu
> ration).getClass)
>
> On Wed, May 24, 2017 at 5:59 PM, kant kodali 
> wrote:
>
>> Hi All,
>>
>> I specified hdfsCheckPointDir = /usr/local/hadoop/checkpoint as you
>> can see below however I dont see checkpoint directory under my 
>> hadoop_home=
>> /usr/local/hadoop in either datanodes or namenodes however in
>> datanode machine there seems to be some data under
>>
>> /usr/local/hadoop/hdfs/namenode/current/BP-1469808024-X.X.X.
>> X-1495672725898/current/finalized/subdir0/subdir0
>>
>> I thought the checkpoint directory will be created by spark once I
>> specify the path but do I manually need to create checkpoint dir using
>> mkdir in all spark worker machines? I am new to HDFS as well so please 
>> let
>> me know. I can try sending df.explain("true") but I have 100 fields in my
>> schema so Project looks really big and if this is not a problem for you
>> guys I can send that as well.
>>
>>
>>   +- StreamingRelation 
>> 

Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread Shixiong(Ryan) Zhu
I read your code again and found one issue: you set "checkpointLocation" in
`readStream`. It should be set in `writeStream`. However, I still have no
idea why using a temp checkpoint location would fail.

On Thu, May 25, 2017 at 2:23 PM, kant kodali  wrote:

> I did the following
>
> *bin/hadoop fs -mkdir -p **/usr/local/hadoop/checkpoint* and did *bin/hadoop
> fs -ls / *
>
> and I can actually see */tmp* and */usr* and inside of */usr *there is
> indeed *local/hadoop/checkpoint. *
>
> So until here it looks fine.
>
> I also cleared everything */tmp/** as @Michael suggested using *bin/hadoop
> fs -rmdir * such that when I do *bin/hadoop fs -ls /tmp *I don't see
> anything.
>
> Now I ran my spark driver program using spark-submit it failed with the
> following exception
>
> *File does not exist:
> /tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta*
>
> so I did *bin/hadoop fs -ls *
> */tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2 *
>
> and I did not see anything there like *1.delta(there are just no files)* 
> however
> all these directories 
> */tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2
>  *do exist.
>
> For my checkPointLocation I had passed   */usr/local/hadoop/checkpoint *
> and  *hdfs:///usr/local/hadoop/checkpoint  *so far and
> both didn't work for me. It is failing with the same error "*File does
> not exist:
> /tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta*"
>
> so what can be the problem? any ideas?
>
> Thanks!
>
> On Thu, May 25, 2017 at 1:31 AM, kant kodali  wrote:
>
>> Executing this bin/hadoop fs -ls /usr/local/hadoop/checkpoint says
>>
>> ls: `/usr/local/hadoop/checkpoint': No such file or directory
>>
>> This is what I expected as well since I don't see any checkpoint
>> directory under /usr/local/hadoop. Am I missing any configuration variable
>> like HADOOP_CONF_DIR ? I am currently not setting that in
>> conf/spark-env.sh and thats the only hadoop related environment variable I
>> see. please let me know
>>
>> thanks!
>>
>>
>>
>> On Thu, May 25, 2017 at 1:19 AM, kant kodali  wrote:
>>
>>> Hi Ryan,
>>>
>>> I did add that print statement and here is what I got.
>>>
>>> class org.apache.hadoop.hdfs.DistributedFileSystem
>>>
>>> Thanks!
>>>
>>> On Wed, May 24, 2017 at 11:39 PM, Shixiong(Ryan) Zhu <
>>> shixi...@databricks.com> wrote:
>>>
 I meant using HDFS command to check the directory. Such as "bin/hadoop
 fs -ls /usr/local/hadoop/checkpoint". My hunch is the default file system
 in driver probably is the local file system. Could you add the following
 line into your code to print the default file system?

 println(org.apache.hadoop.fs.FileSystem.get(sc.hadoopConfigu
 ration).getClass)

 On Wed, May 24, 2017 at 5:59 PM, kant kodali 
 wrote:

> Hi All,
>
> I specified hdfsCheckPointDir = /usr/local/hadoop/checkpoint as you
> can see below however I dont see checkpoint directory under my 
> hadoop_home=
> /usr/local/hadoop in either datanodes or namenodes however in
> datanode machine there seems to be some data under
>
> /usr/local/hadoop/hdfs/namenode/current/BP-1469808024-X.X.X.
> X-1495672725898/current/finalized/subdir0/subdir0
>
> I thought the checkpoint directory will be created by spark once I
> specify the path but do I manually need to create checkpoint dir using
> mkdir in all spark worker machines? I am new to HDFS as well so please let
> me know. I can try sending df.explain("true") but I have 100 fields in my
> schema so Project looks really big and if this is not a problem for you
> guys I can send that as well.
>
>
>   +- StreamingRelation 
> DataSource(org.apache.spark.sql.SparkSession@21002393,kafka,List(),None,List(),None,Map(subscribe
>  -> analytics2, failOnDataLoss -> false, kafka.bootstrap.servers -> 
> X.X.X.X:9092 , checkpointLocation -> 
> /usr/local/hadoop/checkpoint, startingOffsets -> earliest),None), kafka, 
> [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, 
> timestampType#6]
>
> *Here is the stack trace*
>
> StructField(OptionalContext4,StringType,true), 
> StructField(OptionalContext5,StringType,true)),true), cast(value#1 as 
> string)) AS payload#15]
> +- StreamingExecutionRelation 
> KafkaSource[Subscribe[analytics2]], [key#0, value#1, topic#2, 
> partition#3, offset#4L, timestamp#5, timestampType#6]
>
> at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:305)
> at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191)
> Caused by: org.apache.spark.SparkException: Job aborted due to stage 
> 

Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread kant kodali
I did the following:

bin/hadoop fs -mkdir -p /usr/local/hadoop/checkpoint and then bin/hadoop fs -ls /

and I can actually see /tmp and /usr, and inside /usr there is indeed
local/hadoop/checkpoint.

So far this looks fine.

I also cleared everything under /tmp/* as @Michael suggested, using
bin/hadoop fs -rmdir, so that when I do bin/hadoop fs -ls /tmp I don't see
anything.

When I then ran my Spark driver program with spark-submit, it failed with the
following exception:

File does not exist:
/tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta

So I did bin/hadoop fs -ls
/tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2

and I did not see anything like 1.delta there (there are just no files),
although all of these directories, up to
/tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2, do exist.

For my checkpointLocation I have passed /usr/local/hadoop/checkpoint and
hdfs:///usr/local/hadoop/checkpoint so far, and both failed for me with the
same error: "File does not exist:
/tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta"

So what can be the problem? Any ideas?

Thanks!

On Thu, May 25, 2017 at 1:31 AM, kant kodali  wrote:

> Executing this bin/hadoop fs -ls /usr/local/hadoop/checkpoint says
>
> ls: `/usr/local/hadoop/checkpoint': No such file or directory
>
> This is what I expected as well since I don't see any checkpoint directory
> under /usr/local/hadoop. Am I missing any configuration variable like
> HADOOP_CONF_DIR ? I am currently not setting that in conf/spark-env.sh
> and thats the only hadoop related environment variable I see. please let me
> know
>
> thanks!
>
>
>
> On Thu, May 25, 2017 at 1:19 AM, kant kodali  wrote:
>
>> Hi Ryan,
>>
>> I did add that print statement and here is what I got.
>>
>> class org.apache.hadoop.hdfs.DistributedFileSystem
>>
>> Thanks!
>>
>> On Wed, May 24, 2017 at 11:39 PM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> I meant using HDFS command to check the directory. Such as "bin/hadoop
>>> fs -ls /usr/local/hadoop/checkpoint". My hunch is the default file system
>>> in driver probably is the local file system. Could you add the following
>>> line into your code to print the default file system?
>>>
>>> println(org.apache.hadoop.fs.FileSystem.get(sc.hadoopConfigu
>>> ration).getClass)
>>>
>>> On Wed, May 24, 2017 at 5:59 PM, kant kodali  wrote:
>>>
 Hi All,

 I specified hdfsCheckPointDir = /usr/local/hadoop/checkpoint as you
 can see below; however, I don't see a checkpoint directory under my
 hadoop_home=/usr/local/hadoop on either the datanodes or the namenode,
 although on the datanode machine there seems to be some data under

 /usr/local/hadoop/hdfs/namenode/current/BP-1469808024-X.X.X.
 X-1495672725898/current/finalized/subdir0/subdir0

 I thought the checkpoint directory would be created by Spark once I
 specify the path, but do I need to manually create the checkpoint dir with
 mkdir on all Spark worker machines? I am new to HDFS as well, so please let
 me know. I can try sending df.explain("true"), but I have 100 fields in my
 schema, so the Project node looks really big; if that is not a problem for
 you I can send it as well.


   +- StreamingRelation 
 DataSource(org.apache.spark.sql.SparkSession@21002393,kafka,List(),None,List(),None,Map(subscribe
  -> analytics2, failOnDataLoss -> false, kafka.bootstrap.servers -> 
 X.X.X.X:9092 , checkpointLocation -> 
 /usr/local/hadoop/checkpoint, startingOffsets -> earliest),None), kafka, 
 [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, 
 timestampType#6]

 *Here is the stack trace*

 StructField(OptionalContext4,StringType,true), 
 StructField(OptionalContext5,StringType,true)),true), cast(value#1 as 
 string)) AS payload#15]
 +- StreamingExecutionRelation 
 KafkaSource[Subscribe[analytics2]], [key#0, value#1, topic#2, partition#3, 
 offset#4L, timestamp#5, timestampType#6]

 at 
 org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:305)
 at 
 org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191)
 Caused by: org.apache.spark.SparkException: Job aborted due to stage 
 failure: Task 2 in stage 3.0 failed 4 times, most recent failure: Lost 
 task 2.3 in stage 3.0 (TID 222, 172.31.25.189, executor 0): 
 java.lang.IllegalStateException: Error reading delta file 
 /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta of 
 HDFSStateStoreProvider[id = (op=0, part=2), dir = 
 /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2]: 
 

Re: Structured Streaming from Parquet

2017-05-25 Thread upendra 1991
Paul,

Did you try writing to disk rather than keeping everything in memory? When
files are large, depending on whether you prioritize quality (performance) or
quantity, writing to disk would take the load off the executors and push it to
the stage where App2 formats your data.

Another option is to use a Kafka sink: write from Spark App1 to the sink, and
Spark App2 would then be able to process the data as it comes in. Performance
of App2 would also be better in this case.

Thanks,
Upendra.M
Data Platform Engineer
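A rough Scala sketch of the Kafka-sink option mentioned above. This assumes
Spark 2.2 or later, where Structured Streaming has a Kafka sink; the broker
address, topic name, and checkpoint path are placeholders, not values from
this thread:

import org.apache.spark.sql.functions.{col, struct, to_json}

// df is the cleaned streaming DataFrame produced by App1 (illustrative name).
val toKafka = df
  .select(to_json(struct(df.columns.map(col): _*)).as("value"))  // Kafka sink expects a string/binary "value" column
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")             // placeholder broker
  .option("topic", "app1-output")                                // placeholder topic
  .option("checkpointLocation", "/tmp/app1-kafka-checkpoint")    // placeholder path
  .start()

App2 would then read with spark.readStream.format("kafka") and process records
as they arrive.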
Sent from Yahoo Mail on Android 
 
On Thu, May 25, 2017 at 12:47 PM, Burak Yavuz wrote:

Hi Paul,

From what you're describing, it seems that stream1 is possibly generating tons
of small files and stream2 is OOMing because it tries to maintain an in-memory
list of files. Some notes/questions:

1. Parquet files are splittable, therefore having large parquet files
shouldn't be a problem. The larger a parquet file is, the longer the write
process will take, but the read path shouldn't be adversely affected.
2. How many partitions are you writing out to?
3. In order to reduce the number of files, you may call:
`repartition(partitionColumns).writeStream.partitionBy(partitionColumns)`
so that every trigger, you output only 1 file per partition. After some time,
you may want to compact files if you don't partition by date.

Best,
Burak


On Thu, May 25, 2017 at 7:13 AM, Paul Corley  
wrote:


I have a Spark Structured Streaming process that is implemented in 2 separate 
streaming apps.

 

First App reads .gz, which range in size from 1GB to 9GB compressed, files in 
from s3 filters out invalid records and repartitions the data and outputs to 
parquet on s3 partitioned the same as the stream is partitioned. This process 
produces thousands of files which other processes consume.  The thought on this 
approach was to:

1)  Break the file down to smaller more easily consumed sizes

2)  Allow a more parallelism in the processes that consume the data.

3)  Allow multiple downstream processes to consume data that has already

a.  Had bad records filtered out

b.  Not have to fully read in such large files

 

Second application reads in the files produced by the first app.  This process 
then reformats the data from a row that is:

 

12NDSIN|20170101:123313, 5467;20170115:987

 

into:

12NDSIN, 20170101, 123313

12NDSIN, 20170101, 5467

12NDSIN, 20170115, 987

 

App 1 runs no problems and churns through files in its source directory on s3.  
Total process time for a file is < 10min.  App2 is the one having issues.

 

The source is defined as

val rawReader = sparkSession
  .readStream
  .option("latestFirst", "true")
  .option("maxFilesPerTrigger", batchSize)
  .schema(rawSchema)
  .parquet(config.getString("aws.s3.sourcepath"))   // <=== Line 85

output is defined as

val query = output
  .writeStream
  .queryName("bk")
  .format("parquet")
  .partitionBy("expireDate")
  .trigger(ProcessingTime("10 seconds"))
  .option("checkpointLocation", config.getString("spark.app.checkpoint_dir") + "/bk")
  .option("path", config.getString("spark.app.s3.output"))
  .start()
  .awaitTermination()

 

If files exist from app 1, app 2 enters a cycle of just cycling through parquet
at ProcessFromSource.scala:85   3999/3999

 

If there are a few files output from app1 eventually it will enter the stage 
where it actually processes the data and begins to output, but the more files 
produced by app1 the longer it takes if it ever completes these steps.  With an 
extremely large number of files the app eventually throws a java OOM error. 
Additionally each cycle through this step takes successively longer.

Hopefully someone can lend some insight as to what is actually taking place in 
this step and how to alleviate it

 

 

 

Thanks,

 

Paul Corley| Principle Data Engineer




  


Re: Questions regarding Jobs, Stages and Caching

2017-05-25 Thread Ram Navan
Thank you, Steffen and Nicholas.


I specified the schema to spark.read.json(), and the time to execute this
instruction dropped from the original 8 minutes to 4 minutes! I also see
only two jobs created (instead of three when calling it with no schema).
Please refer to attachments job0 and job2 from the first message in the
thread.

Now, why are there two jobs to execute spark.read.json? Shouldn't there be
only one job? Each job has 1 stage, with 1 task at each stage. The
DAG visualization of the stage looks like Parallelize --> mapPartitions -->
map.

Thanks
Ram

On Thu, May 25, 2017 at 9:46 AM, Nicholas Hakobian <
nicholas.hakob...@rallyhealth.com> wrote:

> If you do not specify a schema, then the json() function will attempt to
> determine the schema, which requires a full scan of the file. Any
> subsequent actions will again have to read in the data. See the
> documentation at:
>
> http://spark.apache.org/docs/latest/api/python/pyspark.sql.
> html#pyspark.sql.DataFrameReader.json
>
> "If the schema parameter is not specified, this function goes through the
> input once to determine the input schema."
>
>
> Nicholas Szandor Hakobian, Ph.D.
> Senior Data Scientist
> Rally Health
> nicholas.hakob...@rallyhealth.com
>
>
> On Thu, May 25, 2017 at 9:24 AM, Steffen Schmitz <
> steffenschm...@hotmail.de> wrote:
>
>> Hi Ram,
>>
>> spark.read.json() should be evaluated on the first call of .count().
>> It should then be read into memory once and the rows are counted. After
>> this operation it will be in memory and access will be faster.
>> If you add println statements in between your function calls, you should
>> see that Spark starts to work only after the call of count.
>>
>> Regards,
>> Steffen
>>
>> On 25. May 2017, at 17:02, Ram Navan  wrote:
>>
>> Hi Steffen,
>>
>> Thanks for your response.
>>
>> Isn't spark.read.json() an action function? It reads the files from the
>> source directory, infers the schema and creates a dataframe right?
>> dataframe.cache() prints out this schema as well. I am not sure why
>> dataframe.count() will try to do the same thing again (reading files from
>> source). spark.read.json() and count() - both actions took 8 minutes each
>> in my scenario. I'd expect only one of the action should incur the expenses
>> of reading 19949 files from s3. Am I missing anything?
>>
>> Thank you!
>>
>> Ram
>>
>>
>> On Thu, May 25, 2017 at 1:34 AM, Steffen Schmitz <
>> steffenschm...@hotmail.de> wrote:
>>
>>> Hi Ram,
>>>
>>> Regarding your caching question:
>>> The data frame is evaluated lazy. That means it isn’t cached directly on
>>> invoking of .cache(), but on calling the first action on it (in your case
>>> count).
>>> Then it is loaded into memory and the rows are counted, not on the call
>>> of .cache().
>>> On the second call to count it is already in memory and cached and
>>> that’s why it’s faster.
>>>
>>> I do not know if it’s allowed to recommend resources here, but I really
>>> liked the Big Data Analysis with Spark Course by Heather Miller on Coursera.
>>> And the Spark documentation is also a good place to start.
>>>
>>> Regards,
>>> Steffen
>>>
>>> > On 25. May 2017, at 07:28, ramnavan  wrote:
>>> >
>>> > Hi,
>>> >
>>> > I’m new to Spark and trying to understand the inner workings of Spark
>>> in the
>>> > below mentioned scenarios. I’m using PySpark and Spark 2.1.1
>>> >
>>> > Spark.read.json():
>>> >
>>> > I am running executing this line
>>> > “spark.read.json(‘s3a:///*.json’)” and a cluster with
>>> three
>>> > worker nodes (AWS M4.xlarge instances). The bucket has about 19949 json
>>> > files and the total size is about 4.4 GB. The line created three spark
>>> jobs
>>> > first job with 1 tasks, second job with 19949 tasks and third job
>>> with
>>> > 1 tasks. Each of the jobs have one stage in it. Please refer to the
>>> > attached images job0, job1 and job2.jpg.   job0.jpg
>>> > >> 8708/job0.jpg>
>>> > job1.jpg
>>> > >> 8708/job1.jpg>
>>> > job2.jpg
>>> > >> 8708/job2.jpg>
>>> > I was expecting it to create 1 job with 19949 tasks.  I’d like to
>>> understand
>>> > why there are three jobs instead of just one and why reading json files
>>> > calls for map operation.
>>> >
>>> > Caching and Count():
>>> >
>>> > Once spark reads 19949 json files into a dataframe (let’s call it
>>> files_df),
>>> > I am calling these two operations files_df.createOrReplaceTempVi
>>> ew(“files)
>>> > and files_df.cache(). I am expecting files_df.cache() will cache the
>>> entire
>>> > dataframe in memory so any subsequent operation will be faster. My next
>>> > statement is files_df.count(). This operation took an entire 8.8
>>> minutes and
>>> > it looks like it read the files again from s3 and calculated the count.
>>> > Please refer to attached 

Re: Structured Streaming from Parquet

2017-05-25 Thread Burak Yavuz
Hi Paul,

From what you're describing, it seems that stream1 is possibly generating
tons of small files and stream2 is OOMing because it tries to maintain an
in-memory list of files. Some notes/questions:

 1. Parquet files are splittable, therefore having large parquet files
shouldn't be a problem. The larger a parquet file is, the longer the write
process will take, but the read path shouldn't be adversely affected.
 2. How many partitions are you writing out to?
 3. In order to reduce the number of files, you may call:
`repartition(partitionColumns).writeStream.partitionBy(partitionColumns)`
so that every trigger, you output only 1 file per partition. After some
time, you may want to compact files if you don't partition by date.
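
A minimal sketch of the pattern in point 3, assuming a streaming DataFrame
named `output` and a partition column such as "expireDate" (as in the code
quoted below); the paths are placeholders:

import org.apache.spark.sql.functions.col

val partitionColumns = Seq("expireDate")                  // assumed partition column
val query = output
  .repartition(partitionColumns.map(col): _*)             // at most one file per partition value per trigger
  .writeStream
  .format("parquet")
  .partitionBy(partitionColumns: _*)                      // output directory layout matches the repartitioning
  .option("checkpointLocation", "/tmp/bk-checkpoint")     // placeholder
  .option("path", "s3a://some-bucket/output")             // placeholder
  .start()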

Best,
Burak



On Thu, May 25, 2017 at 7:13 AM, Paul Corley 
wrote:

> I have a Spark Structured Streaming process that is implemented in 2
> separate streaming apps.
>
>
>
> First App reads .gz, which range in size from 1GB to 9GB compressed, files
> in from s3 filters out invalid records and repartitions the data and
> outputs to parquet on s3 partitioned the same as the stream is partitioned.
> This process produces thousands of files which other processes consume.
> The thought on this approach was to:
>
> 1)   Break the file down to smaller more easily consumed sizes
>
> 2)   Allow a more parallelism in the processes that consume the data.
>
> 3)   Allow multiple downstream processes to consume data that has
> already
>
> a.   Had bad records filtered out
>
> b.   Not have to fully read in such large files
>
>
>
> Second application reads in the files produced by the first app.  This
> process then reformats the data from a row that is:
>
>
>
> 12NDSIN|20170101:123313, 5467;20170115:987
>
>
>
> into:
>
> 12NDSIN, 20170101, 123313
>
> 12NDSIN, 20170101, 5467
>
> 12NDSIN, 20170115, 987
>
>
>
> App 1 runs no problems and churns through files in its source directory on
> s3.  Total process time for a file is < 10min.  App2 is the one having
> issues.
>
>
>
> The source is defined as
>
> val rawReader = sparkSession
>   .readStream
>   .option("latestFirst", "true")
>   .option("maxFilesPerTrigger", batchSize)
>   .schema(rawSchema)
>   .parquet(config.getString("aws.s3.sourcepath"))   // <=== Line 85
>
>
>
> output is defined as
>
> val query = output
>   .writeStream
>   .queryName("bk")
>   .format("parquet")
>   .partitionBy("expireDate")
>   .trigger(ProcessingTime("10 seconds"))
>   .option("checkpointLocation", config.getString("spark.app.checkpoint_dir") + "/bk")
>   .option("path", config.getString("spark.app.s3.output"))
>   .start()
>   .awaitTermination()
>
>
>
> If files exist from app 1 app 2 enters a cycle of just cycling through parquet
> at ProcessFromSource.scala:85
> 
>   3999/3999
>
>
>
> If there are a few files output from app1 eventually it will enter the
> stage where it actually processes the data and begins to output, but the
> more files produced by app1 the longer it takes if it ever completes these
> steps.  With an extremely large number of files the app eventually throws a
> java OOM error. Additionally each cycle through this step takes
> successively longer.
>
> Hopefully someone can lend some insight as to what is actually taking
> place in this step and how to alleviate it
>
>
>
>
>
>
>
> Thanks,
>
>
>
> *Paul Corley* | Principle Data Engineer
>
>


Re: Questions regarding Jobs, Stages and Caching

2017-05-25 Thread Nicholas Hakobian
If you do not specify a schema, then the json() function will attempt to
determine the schema, which requires a full scan of the file. Any
subsequent actions will again have to read in the data. See the
documentation at:

http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.json

"If the schema parameter is not specified, this function goes through the
input once to determine the input schema."
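
A minimal Scala sketch of supplying an explicit schema so that read.json skips
the inference scan; the field names and path below are placeholders, and the
same idea applies in PySpark via spark.read.schema(...).json(...):

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("id", StringType),
  StructField("value", LongType)
))

// With an explicit schema, Spark does not have to read the files once up front
// just to infer the schema.
val filesDf = spark.read.schema(schema).json("s3a://some-bucket/*.json")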


Nicholas Szandor Hakobian, Ph.D.
Senior Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com


On Thu, May 25, 2017 at 9:24 AM, Steffen Schmitz 
wrote:

> Hi Ram,
>
> spark.read.json() should be evaluated on the first call of .count().
> It should then be read into memory once and the rows are counted. After
> this operation it will be in memory and access will be faster.
> If you add println statements in between your function calls, you should
> see that Spark starts to work only after the call of count.
>
> Regards,
> Steffen
>
> On 25. May 2017, at 17:02, Ram Navan  wrote:
>
> Hi Steffen,
>
> Thanks for your response.
>
> Isn't spark.read.json() an action function? It reads the files from the
> source directory, infers the schema and creates a dataframe right?
> dataframe.cache() prints out this schema as well. I am not sure why
> dataframe.count() will try to do the same thing again (reading files from
> source). spark.read.json() and count() - both actions took 8 minutes each
> in my scenario. I'd expect only one of the action should incur the expenses
> of reading 19949 files from s3. Am I missing anything?
>
> Thank you!
>
> Ram
>
>
> On Thu, May 25, 2017 at 1:34 AM, Steffen Schmitz <
> steffenschm...@hotmail.de> wrote:
>
>> Hi Ram,
>>
>> Regarding your caching question:
>> The data frame is evaluated lazy. That means it isn’t cached directly on
>> invoking of .cache(), but on calling the first action on it (in your case
>> count).
>> Then it is loaded into memory and the rows are counted, not on the call
>> of .cache().
>> On the second call to count it is already in memory and cached and that’s
>> why it’s faster.
>>
>> I do not know if it’s allowed to recommend resources here, but I really
>> liked the Big Data Analysis with Spark Course by Heather Miller on Coursera.
>> And the Spark documentation is also a good place to start.
>>
>> Regards,
>> Steffen
>>
>> > On 25. May 2017, at 07:28, ramnavan  wrote:
>> >
>> > Hi,
>> >
>> > I’m new to Spark and trying to understand the inner workings of Spark
>> in the
>> > below mentioned scenarios. I’m using PySpark and Spark 2.1.1
>> >
>> > Spark.read.json():
>> >
>> > I am running executing this line
>> > “spark.read.json(‘s3a:///*.json’)” and a cluster with
>> three
>> > worker nodes (AWS M4.xlarge instances). The bucket has about 19949 json
>> > files and the total size is about 4.4 GB. The line created three spark
>> jobs
>> > first job with 1 tasks, second job with 19949 tasks and third job
>> with
>> > 1 tasks. Each of the jobs have one stage in it. Please refer to the
>> > attached images job0, job1 and job2.jpg.   job0.jpg
>> > > n28708/job0.jpg>
>> > job1.jpg
>> > > n28708/job1.jpg>
>> > job2.jpg
>> > > n28708/job2.jpg>
>> > I was expecting it to create 1 job with 19949 tasks.  I’d like to
>> understand
>> > why there are three jobs instead of just one and why reading json files
>> > calls for map operation.
>> >
>> > Caching and Count():
>> >
>> > Once spark reads 19949 json files into a dataframe (let’s call it
>> files_df),
>> > I am calling these two operations files_df.createOrReplaceTempVi
>> ew(“files)
>> > and files_df.cache(). I am expecting files_df.cache() will cache the
>> entire
>> > dataframe in memory so any subsequent operation will be faster. My next
>> > statement is files_df.count(). This operation took an entire 8.8
>> minutes and
>> > it looks like it read the files again from s3 and calculated the count.
>> > Please refer to attached count.jpg file for reference.   count.jpg
>> > > n28708/count.jpg>
>> > Why is this happening? If I call files_df.count() for the second time,
>> it
>> > comes back fast within few seconds. Can someone explain this?
>> >
>> > In general, I am looking for a good source to learn about Spark
>> Internals
>> > and try to understand what’s happening beneath the hood.
>> >
>> > Thanks in advance!
>> >
>> > Ram
>> >
>> >
>> >
>> > --
>> > View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Questions-regarding-Jobs-Stages-and-
>> Caching-tp28708.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com
>> .
>> >
>> > 

Re: Questions regarding Jobs, Stages and Caching

2017-05-25 Thread Steffen Schmitz
Hi Ram,

spark.read.json() should be evaluated on the first call of .count(). The data
should then be read into memory once and the rows counted. After this
operation it will be in memory and access will be faster.
If you add println statements in between your function calls, you should see
that Spark starts to work only after the call of count.

Regards,
Steffen

On 25. May 2017, at 17:02, Ram Navan 
> wrote:

Hi Steffen,

Thanks for your response.

Isn't spark.read.json() an action function? It reads the files from the source 
directory, infers the schema and creates a dataframe right? dataframe.cache() 
prints out this schema as well. I am not sure why dataframe.count() will try to 
do the same thing again (reading files from source). spark.read.json() and 
count() - both actions took 8 minutes each in my scenario. I'd expect only one 
of the action should incur the expenses of reading 19949 files from s3. Am I 
missing anything?

Thank you!

Ram


On Thu, May 25, 2017 at 1:34 AM, Steffen Schmitz 
> wrote:
Hi Ram,

Regarding your caching question:
The data frame is evaluated lazy. That means it isn’t cached directly on 
invoking of .cache(), but on calling the first action on it (in your case 
count).
Then it is loaded into memory and the rows are counted, not on the call of 
.cache().
On the second call to count it is already in memory and cached and that’s why 
it’s faster.

I do not know if it’s allowed to recommend resources here, but I really liked 
the Big Data Analysis with Spark Course by Heather Miller on Coursera.
And the Spark documentation is also a good place to start.

Regards,
Steffen

> On 25. May 2017, at 07:28, ramnavan 
> > wrote:
>
> Hi,
>
> I’m new to Spark and trying to understand the inner workings of Spark in the
> below mentioned scenarios. I’m using PySpark and Spark 2.1.1
>
> Spark.read.json():
>
> I am running executing this line
> “spark.read.json(‘s3a:///*.json’)” and a cluster with three
> worker nodes (AWS M4.xlarge instances). The bucket has about 19949 json
> files and the total size is about 4.4 GB. The line created three spark jobs
> first job with 1 tasks, second job with 19949 tasks and third job with
> 1 tasks. Each of the jobs have one stage in it. Please refer to the
> attached images job0, job1 and job2.jpg.   job0.jpg
> 
> job1.jpg
> 
> job2.jpg
> 
> I was expecting it to create 1 job with 19949 tasks.  I’d like to understand
> why there are three jobs instead of just one and why reading json files
> calls for map operation.
>
> Caching and Count():
>
> Once spark reads 19949 json files into a dataframe (let’s call it files_df),
> I am calling these two operations files_df.createOrReplaceTempView(“files)
> and files_df.cache(). I am expecting files_df.cache() will cache the entire
> dataframe in memory so any subsequent operation will be faster. My next
> statement is files_df.count(). This operation took an entire 8.8 minutes and
> it looks like it read the files again from s3 and calculated the count.
> Please refer to attached count.jpg file for reference.   count.jpg
> 
> Why is this happening? If I call files_df.count() for the second time, it
> comes back fast within few seconds. Can someone explain this?
>
> In general, I am looking for a good source to learn about Spark Internals
> and try to understand what’s happening beneath the hood.
>
> Thanks in advance!
>
> Ram
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Questions-regarding-Jobs-Stages-and-Caching-tp28708.html
> Sent from the Apache Spark User List mailing list archive at 
> Nabble.com.
>
> -
> To unsubscribe e-mail: 
> user-unsubscr...@spark.apache.org
>




--
Ram



Re: Questions regarding Jobs, Stages and Caching

2017-05-25 Thread Ram Navan
Hi Steffen,

Thanks for your response.

Isn't spark.read.json() an action function? It reads the files from the
source directory, infers the schema and creates a dataframe right?
dataframe.cache() prints out this schema as well. I am not sure why
dataframe.count() will try to do the same thing again (reading files from
source). spark.read.json() and count() - both actions took 8 minutes each
in my scenario. I'd expect only one of the action should incur the expenses
of reading 19949 files from s3. Am I missing anything?

Thank you!

Ram


On Thu, May 25, 2017 at 1:34 AM, Steffen Schmitz 
wrote:

> Hi Ram,
>
> Regarding your caching question:
> The data frame is evaluated lazy. That means it isn’t cached directly on
> invoking of .cache(), but on calling the first action on it (in your case
> count).
> Then it is loaded into memory and the rows are counted, not on the call of
> .cache().
> On the second call to count it is already in memory and cached and that’s
> why it’s faster.
>
> I do not know if it’s allowed to recommend resources here, but I really
> liked the Big Data Analysis with Spark Course by Heather Miller on Coursera.
> And the Spark documentation is also a good place to start.
>
> Regards,
> Steffen
>
> > On 25. May 2017, at 07:28, ramnavan  wrote:
> >
> > Hi,
> >
> > I’m new to Spark and trying to understand the inner workings of Spark in
> the
> > below mentioned scenarios. I’m using PySpark and Spark 2.1.1
> >
> > Spark.read.json():
> >
> > I am running executing this line
> > “spark.read.json(‘s3a:///*.json’)” and a cluster with three
> > worker nodes (AWS M4.xlarge instances). The bucket has about 19949 json
> > files and the total size is about 4.4 GB. The line created three spark
> jobs
> > first job with 1 tasks, second job with 19949 tasks and third job
> with
> > 1 tasks. Each of the jobs have one stage in it. Please refer to the
> > attached images job0, job1 and job2.jpg.   job0.jpg
> >  file/n28708/job0.jpg>
> > job1.jpg
> >  file/n28708/job1.jpg>
> > job2.jpg
> >  file/n28708/job2.jpg>
> > I was expecting it to create 1 job with 19949 tasks.  I’d like to
> understand
> > why there are three jobs instead of just one and why reading json files
> > calls for map operation.
> >
> > Caching and Count():
> >
> > Once spark reads 19949 json files into a dataframe (let’s call it
> files_df),
> > I am calling these two operations files_df.createOrReplaceTempView(“
> files)
> > and files_df.cache(). I am expecting files_df.cache() will cache the
> entire
> > dataframe in memory so any subsequent operation will be faster. My next
> > statement is files_df.count(). This operation took an entire 8.8 minutes
> and
> > it looks like it read the files again from s3 and calculated the count.
> > Please refer to attached count.jpg file for reference.   count.jpg
> >  file/n28708/count.jpg>
> > Why is this happening? If I call files_df.count() for the second time, it
> > comes back fast within few seconds. Can someone explain this?
> >
> > In general, I am looking for a good source to learn about Spark Internals
> > and try to understand what’s happening beneath the hood.
> >
> > Thanks in advance!
> >
> > Ram
> >
> >
> >
> > --
> > View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Questions-regarding-Jobs-Stages-and-Caching-tp28708.
> html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
>


-- 
Ram


access error while trying to run distcp from source cluster

2017-05-25 Thread nancy henry
Hi Team,

I am trying to copy data from cluster A to cluster B, with the same user on both.

I am running the distcp command on source cluster A,

but I am getting this error:

17/05/25 07:24:08 INFO mapreduce.Job: Running job: job_1492549627402_344485
17/05/25 07:24:17 INFO mapreduce.Job: Job job_1492549627402_344485 running in uber mode : false
17/05/25 07:24:17 INFO mapreduce.Job:  map 0% reduce 0%
17/05/25 07:24:26 INFO mapreduce.Job: Task Id : attempt_1492549627402_344485_m_00_0, Status : FAILED
Error: org.apache.hadoop.security.AccessControlException: User abcde (user id 50006054) has been denied access to create distcptest2
at com.mapr.fs.MapRFileSystem.makeDir(MapRFileSystem.java:1282)
at com.mapr.fs.MapRFileSystem.mkdirs(MapRFileSystem.java:1302)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1913)
at org.apache.hadoop.tools.mapred.CopyMapper.map(CopyMapper.java:272)
at org.apache.hadoop.tools.mapred.CopyMapper.map(CopyMapper.java:51)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:796)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:346)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1595)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

This is the error.


Structured Streaming from Parquet

2017-05-25 Thread Paul Corley
I have a Spark Structured Streaming process that is implemented in 2 separate 
streaming apps.

The first app reads .gz files from s3, which range in size from 1GB to 9GB
compressed, filters out invalid records, repartitions the data, and outputs
parquet to s3, partitioned the same way the stream is partitioned. This process
produces thousands of files which other processes consume.  The thought behind
this approach was to:

1)   Break the file down to smaller more easily consumed sizes

2)   Allow more parallelism in the processes that consume the data.

3)   Allow multiple downstream processes to consume data that has already

a.   Had bad records filtered out

b.   Not have to fully read in such large files

Second application reads in the files produced by the first app.  This process 
then reformats the data from a row that is:

12NDSIN|20170101:123313, 5467;20170115:987

into:
12NDSIN, 20170101, 123313
12NDSIN, 20170101, 5467
12NDSIN, 20170115, 987

App 1 runs no problems and churns through files in its source directory on s3.  
Total process time for a file is < 10min.  App2 is the one having issues.

The source is defined as
val rawReader = sparkSession
  .readStream
  .option("latestFirst", "true")
  .option("maxFilesPerTrigger", batchSize)
  .schema(rawSchema)
  .parquet(config.getString("aws.s3.sourcepath"))   <=Line85

output is defined as
val query = output
  .writeStream
  .queryName("bk")
  .format("parquet")
  .partitionBy("expireDate")
  .trigger(ProcessingTime("10 seconds"))
  .option("checkpointLocation",config.getString("spark.app.checkpoint_dir") + 
"/bk")
  .option("path", config.getString("spark.app.s3.output"))
  .start()
  .awaitTermination()

If files exist from app 1 app 2 enters a cycle of just cycling through parquet 
at 
ProcessFromSource.scala:85
   3999/3999

If there are a few files output from app1 eventually it will enter the stage 
where it actually processes the data and begins to output, but the more files 
produced by app1 the longer it takes if it ever completes these steps.  With an 
extremely large number of files the app eventually throws a java OOM error. 
Additionally each cycle through this step takes successively longer.

Hopefully someone can lend some insight as to what is actually taking place in 
this step and how to alleviate it



Thanks,

Paul Corley | Principle Data Engineer


user-unsubscr...@spark.apache.org

2017-05-25 Thread williamtellme123
 

 

From: Steffen Schmitz [mailto:steffenschm...@hotmail.de] 
Sent: Thursday, May 25, 2017 3:34 AM
To: ramnavan 
Cc: user@spark.apache.org
Subject: Re: Questions regarding Jobs, Stages and Caching

 

 



unsubscribe

2017-05-25 Thread 信息安全部
unsubscribe


RE: strange warning

2017-05-25 Thread Mendelson, Assaf
Some more info:
It seems this is caused due to complex data structure.
Consider the following simple example:

case class A(v: Int)
case class B(v: A)
val filename = "test"
val a = A(1)
val b = B(a)
val df1: DataFrame = Seq[B](b).toDF
df1.write.parquet(filename)
val df2 = spark.read.parquet(filename)
df2.show()

Any ideas?

Thanks,
  Assaf.

From: Mendelson, Assaf [mailto:assaf.mendel...@rsa.com]
Sent: Thursday, May 25, 2017 9:55 AM
To: user@spark.apache.org
Subject: strange warning

Hi all,

Today, I got the following warning:
[WARN] org.apache.parquet.hadoop.ParquetRecordReader: Can not initialize 
counter due to context is not a instance of TaskInputOutputContext, but is 
org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

This occurs on one of my tests but not on others (all use parquet). I found 
this https://issues.apache.org/jira/browse/PARQUET-220 but I am using spark 
2.1.0 which uses parquet 1.8 if I am not mistaken. I also found this: 
https://issues.apache.org/jira/browse/SPARK-8118 but again, it is very old. 
Also it only happens on one case where I save my parquet files and not others.

Does anyone know what it means and how to get rid of it?

Thanks,
  Assaf.



Re: Questions regarding Jobs, Stages and Caching

2017-05-25 Thread Steffen Schmitz
Hi Ram,

Regarding your caching question:
The data frame is evaluated lazily. That means it isn't cached directly on
invoking .cache(), but on calling the first action on it (in your case
count).
Only then is it loaded into memory and the rows counted, not on the call of
.cache().
On the second call to count it is already in memory and cached, and that's why
it's faster.
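
A minimal Scala sketch of this behaviour (filesDf stands for any DataFrame,
e.g. the one read from S3 earlier in the thread):

filesDf.cache()   // only marks the DataFrame for caching; nothing is materialized yet
filesDf.count()   // first action: reads the data, counts the rows, and populates the cache
filesDf.count()   // second action: served from the in-memory cache, so it is much faster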

I do not know if it’s allowed to recommend resources here, but I really liked 
the Big Data Analysis with Spark Course by Heather Miller on Coursera.
And the Spark documentation is also a good place to start.

Regards,
Steffen

> On 25. May 2017, at 07:28, ramnavan  wrote:
> 
> Hi,
> 
> I’m new to Spark and trying to understand the inner workings of Spark in the
> below mentioned scenarios. I’m using PySpark and Spark 2.1.1
> 
> Spark.read.json():
> 
> I am executing this line
> “spark.read.json(‘s3a:///*.json’)” and a cluster with three
> worker nodes (AWS M4.xlarge instances). The bucket has about 19949 json
> files and the total size is about 4.4 GB. The line created three spark jobs
> first job with 1 task, the second job with 19949 tasks and the third job with
> 1 task. Each of the jobs has one stage in it. Please refer to the
> attached images job0.jpg, job1.jpg and job2.jpg.
> 
> I was expecting it to create 1 job with 19949 tasks.  I’d like to understand
> why there are three jobs instead of just one and why reading json files
> calls for a map operation.
> 
> Caching and Count():
> 
> Once spark reads 19949 json files into a dataframe (let’s call it files_df),
> I am calling these two operations: files_df.createOrReplaceTempView("files")
> and files_df.cache(). I am expecting files_df.cache() will cache the entire
> dataframe in memory so any subsequent operation will be faster. My next
> statement is files_df.count(). This operation took an entire 8.8 minutes and
> it looks like it read the files again from s3 and calculated the count. 
> Please refer to the attached count.jpg file for reference.
> 
> Why is this happening? If I call files_df.count() for the second time, it
> comes back fast within few seconds. Can someone explain this?
> 
> In general, I am looking for a good source to learn about Spark Internals
> and try to understand what’s happening beneath the hood.
> 
> Thanks in advance!
> 
> Ram
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Questions-regarding-Jobs-Stages-and-Caching-tp28708.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 



Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread kant kodali
Executing "bin/hadoop fs -ls /usr/local/hadoop/checkpoint" says

ls: `/usr/local/hadoop/checkpoint': No such file or directory

This is what I expected as well, since I don't see any checkpoint directory
under /usr/local/hadoop. Am I missing a configuration variable like
HADOOP_CONF_DIR? I am currently not setting that in conf/spark-env.sh, and
that's the only Hadoop-related environment variable I see. Please let me know.

thanks!



On Thu, May 25, 2017 at 1:19 AM, kant kodali  wrote:

> Hi Ryan,
>
> I did add that print statement and here is what I got.
>
> class org.apache.hadoop.hdfs.DistributedFileSystem
>
> Thanks!
>
> On Wed, May 24, 2017 at 11:39 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> I meant using HDFS command to check the directory. Such as "bin/hadoop fs
>> -ls /usr/local/hadoop/checkpoint". My hunch is the default file system in
>> driver probably is the local file system. Could you add the following line
>> into your code to print the default file system?
>>
>> println(org.apache.hadoop.fs.FileSystem.get(sc.hadoopConfigu
>> ration).getClass)
>>
>> On Wed, May 24, 2017 at 5:59 PM, kant kodali  wrote:
>>
>>> Hi All,
>>>
>>> I specified hdfsCheckPointDir = /usr/local/hadoop/checkpoint as you can
>>> see below however I dont see checkpoint directory under my hadoop_home=
>>> /usr/local/hadoop in either datanodes or namenodes however in datanode
>>> machine there seems to be some data under
>>>
>>> /usr/local/hadoop/hdfs/namenode/current/BP-1469808024-X.X.X.
>>> X-1495672725898/current/finalized/subdir0/subdir0
>>>
>>> I thought the checkpoint directory will be created by spark once I
>>> specify the path but do I manually need to create checkpoint dir using
>>> mkdir in all spark worker machines? I am new to HDFS as well so please let
>>> me know. I can try sending df.explain("true") but I have 100 fields in my
>>> schema so Project looks really big and if this is not a problem for you
>>> guys I can send that as well.
>>>
>>>
>>>   +- StreamingRelation 
>>> DataSource(org.apache.spark.sql.SparkSession@21002393,kafka,List(),None,List(),None,Map(subscribe
>>>  -> analytics2, failOnDataLoss -> false, kafka.bootstrap.servers -> 
>>> X.X.X.X:9092 , checkpointLocation -> 
>>> /usr/local/hadoop/checkpoint, startingOffsets -> earliest),None), kafka, 
>>> [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, 
>>> timestampType#6]
>>>
>>> *Here is the stack trace*
>>>
>>> StructField(OptionalContext4,StringType,true), 
>>> StructField(OptionalContext5,StringType,true)),true), cast(value#1 as 
>>> string)) AS payload#15]
>>> +- StreamingExecutionRelation 
>>> KafkaSource[Subscribe[analytics2]], [key#0, value#1, topic#2, partition#3, 
>>> offset#4L, timestamp#5, timestampType#6]
>>>
>>> at 
>>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:305)
>>> at 
>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191)
>>> Caused by: org.apache.spark.SparkException: Job aborted due to stage 
>>> failure: Task 2 in stage 3.0 failed 4 times, most recent failure: Lost task 
>>> 2.3 in stage 3.0 (TID 222, 172.31.25.189, executor 0): 
>>> java.lang.IllegalStateException: Error reading delta file 
>>> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta of 
>>> HDFSStateStoreProvider[id = (op=0, part=2), dir = 
>>> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2]: 
>>> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta does 
>>> not exist
>>> at 
>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:365)
>>> at 
>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:317)
>>> at 
>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:314)
>>> at scala.Option.getOrElse(Option.scala:121)
>>> at 
>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:314)
>>> at 
>>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:313)
>>> at scala.Option.getOrElse(Option.scala:121)
>>> 

Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread kant kodali
Hi Ryan,

I did add that print statement and here is what I got.

class org.apache.hadoop.hdfs.DistributedFileSystem

Thanks!

On Wed, May 24, 2017 at 11:39 PM, Shixiong(Ryan) Zhu <
shixi...@databricks.com> wrote:

> I meant using HDFS command to check the directory. Such as "bin/hadoop fs
> -ls /usr/local/hadoop/checkpoint". My hunch is the default file system in
> driver probably is the local file system. Could you add the following line
> into your code to print the default file system?
>
> println(org.apache.hadoop.fs.FileSystem.get(sc.
> hadoopConfiguration).getClass)
>
> On Wed, May 24, 2017 at 5:59 PM, kant kodali  wrote:
>
>> Hi All,
>>
>> I specified hdfsCheckPointDir = /usr/local/hadoop/checkpoint as you can
>> see below however I dont see checkpoint directory under my hadoop_home=
>> /usr/local/hadoop in either datanodes or namenodes however in datanode
>> machine there seems to be some data under
>>
>> /usr/local/hadoop/hdfs/namenode/current/BP-1469808024-X.X.X.
>> X-1495672725898/current/finalized/subdir0/subdir0
>>
>> I thought the checkpoint directory will be created by spark once I
>> specify the path but do I manually need to create checkpoint dir using
>> mkdir in all spark worker machines? I am new to HDFS as well so please let
>> me know. I can try sending df.explain("true") but I have 100 fields in my
>> schema so Project looks really big and if this is not a problem for you
>> guys I can send that as well.
>>
>>
>>   +- StreamingRelation 
>> DataSource(org.apache.spark.sql.SparkSession@21002393,kafka,List(),None,List(),None,Map(subscribe
>>  -> analytics2, failOnDataLoss -> false, kafka.bootstrap.servers -> 
>> X.X.X.X:9092 , checkpointLocation -> 
>> /usr/local/hadoop/checkpoint, startingOffsets -> earliest),None), kafka, 
>> [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, 
>> timestampType#6]
>>
>> *Here is the stack trace*
>>
>> StructField(OptionalContext4,StringType,true), 
>> StructField(OptionalContext5,StringType,true)),true), cast(value#1 as 
>> string)) AS payload#15]
>> +- StreamingExecutionRelation 
>> KafkaSource[Subscribe[analytics2]], [key#0, value#1, topic#2, partition#3, 
>> offset#4L, timestamp#5, timestampType#6]
>>
>> at 
>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:305)
>> at 
>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191)
>> Caused by: org.apache.spark.SparkException: Job aborted due to stage 
>> failure: Task 2 in stage 3.0 failed 4 times, most recent failure: Lost task 
>> 2.3 in stage 3.0 (TID 222, 172.31.25.189, executor 0): 
>> java.lang.IllegalStateException: Error reading delta file 
>> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta of 
>> HDFSStateStoreProvider[id = (op=0, part=2), dir = 
>> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2]: 
>> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta does 
>> not exist
>> at 
>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:365)
>> at 
>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:317)
>> at 
>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:314)
>> at scala.Option.getOrElse(Option.scala:121)
>> at 
>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:314)
>> at 
>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:313)
>> at scala.Option.getOrElse(Option.scala:121)
>> at 
>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:313)
>> at 
>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:220)
>> at 
>> org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:186)
>> at 
>> org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61)
>> at 

Re: Sharing my DataFrame (DataSet) cheat sheet.

2017-05-25 Thread Yan Facai
Thanks, Yuhao.
Similarly, I wrote a 10-minuters-to-spark-dataframe doc to share the code
snippets I have collected myself.

+
https://github.com/facaiy/Spark-for-the-Impatient/blob/master/doc/10_minuters_to_spark_dataframe.md
+ https://facaiy.github.io/misc/2017/05/24/collection-of-spark-doc.html

I hope that is useful.


On Sun, Mar 5, 2017 at 4:55 AM, Yuhao Yang  wrote:

>
> Sharing some snippets I accumulated during developing with Apache Spark
> DataFrame (DataSet). Hope it can help you in some way.
>
> https://github.com/hhbyyh/DataFrameCheatSheet.
>
> [image: inline image 1]
>
>
>
>
>
> Regards,
> Yuhao Yang
>


strange warning

2017-05-25 Thread Mendelson, Assaf
Hi all,

Today, I got the following warning:
[WARN] org.apache.parquet.hadoop.ParquetRecordReader: Can not initialize 
counter due to context is not a instance of TaskInputOutputContext, but is 
org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

This occurs in one of my tests but not in the others (all of them use parquet). I found 
this: https://issues.apache.org/jira/browse/PARQUET-220, but I am using Spark 
2.1.0, which uses Parquet 1.8 if I am not mistaken. I also found this: 
https://issues.apache.org/jira/browse/SPARK-8118, but again, it is very old. 
Also, it only happens in the one case where I save my parquet files, not in the others.

Does anyone know what it means and how to get rid of it?

Thanks,
  Assaf.



Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread Shixiong(Ryan) Zhu
I meant using an HDFS command to check the directory, such as "bin/hadoop fs
-ls /usr/local/hadoop/checkpoint". My hunch is that the default file system on
the driver is probably the local file system. Could you add the following line
to your code to print the default file system?

println(org.apache.hadoop.fs.FileSystem.get(sc.hadoopConfiguration).getClass)
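
If it helps, here is a slightly longer sketch along the same lines (assuming "sc" is
your SparkContext and using the checkpoint path from your mail) that resolves and
lists that path against the driver's default file system:

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
val checkpointPath = new Path("/usr/local/hadoop/checkpoint")

println(fs.getUri)                  // e.g. hdfs://namenode:8020 vs. file:///
println(fs.exists(checkpointPath))  // does the path exist on that file system?
if (fs.exists(checkpointPath)) {
  fs.listStatus(checkpointPath).foreach(status => println(status.getPath))
}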

On Wed, May 24, 2017 at 5:59 PM, kant kodali  wrote:

> Hi All,
>
> I specified hdfsCheckPointDir = /usr/local/hadoop/checkpoint as you can
> see below; however, I don't see a checkpoint directory under my hadoop_home=
> /usr/local/hadoop on either the datanodes or the namenodes. However, on the datanode
> machine there seems to be some data under
>
> /usr/local/hadoop/hdfs/namenode/current/BP-1469808024-X.X.X.X-
> 1495672725898/current/finalized/subdir0/subdir0
>
> I thought the checkpoint directory would be created by Spark once I specify
> the path, but do I need to manually create the checkpoint dir with mkdir on all
> Spark worker machines? I am new to HDFS as well, so please let me know. I
> can try sending df.explain("true"), but I have 100 fields in my schema so the
> Project looks really big; if that is not a problem for you guys I can
> send that as well.
>
>
>   +- StreamingRelation 
> DataSource(org.apache.spark.sql.SparkSession@21002393,kafka,List(),None,List(),None,Map(subscribe
>  -> analytics2, failOnDataLoss -> false, kafka.bootstrap.servers -> 
> X.X.X.X:9092 , checkpointLocation -> 
> /usr/local/hadoop/checkpoint, startingOffsets -> earliest),None), kafka, 
> [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, 
> timestampType#6]
>
> *Here is the stack trace*
>
> StructField(OptionalContext4,StringType,true), 
> StructField(OptionalContext5,StringType,true)),true), cast(value#1 as 
> string)) AS payload#15]
> +- StreamingExecutionRelation KafkaSource[Subscribe[analytics2]], 
> [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, 
> timestampType#6]
>
> at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:305)
> at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191)
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 2 in stage 3.0 failed 4 times, most recent failure: Lost task 2.3 in 
> stage 3.0 (TID 222, 172.31.25.189, executor 0): 
> java.lang.IllegalStateException: Error reading delta file 
> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta of 
> HDFSStateStoreProvider[id = (op=0, part=2), dir = 
> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2]: 
> /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta does 
> not exist
> at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:365)
> at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:317)
> at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:314)
> at scala.Option.getOrElse(Option.scala:121)
> at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:314)
> at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:313)
> at scala.Option.getOrElse(Option.scala:121)
> at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:313)
> at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:220)
> at 
> org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:186)
> at 
> org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   

One question / kerberos, yarn-cluster -> connection to hbase

2017-05-25 Thread sudhir37
We are facing an issue with a Kerberos-enabled Hadoop/CDH cluster.
 
We are trying to run a streaming job on yarn-cluster, which interacts with
Kafka (direct stream) and HBase.
 
Somehow, we are not able to connect to HBase in cluster mode. We use a
keytab to log in to HBase.
 
This is what we do:
spark-submit --master yarn-cluster --keytab "dev.keytab" --principal
"d...@io-int.com"  --conf
"spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j_executor_conf.properties
-XX:+UseG1GC" --conf
"spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j_driver_conf.properties
-XX:+UseG1GC" --conf spark.yarn.stagingDir=hdfs:///tmp/spark/ --files
"job.properties,log4j_driver_conf.properties,log4j_executor_conf.properties"
service-0.0.1-SNAPSHOT.jar job.properties
 
To connect to HBase:

  def getHbaseConnection(properties: SerializedProperties): (Connection, UserGroupInformation) = {

    val config = HBaseConfiguration.create();
    config.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER_QUORUM_VALUE);
    config.set("hbase.zookeeper.property.clientPort", "2181");
    config.set("hadoop.security.authentication", "kerberos");
    config.set("hbase.security.authentication", "kerberos");
    config.set("hbase.cluster.distributed", "true");
    config.set("hbase.rpc.protection", "privacy");
    config.set("hbase.regionserver.kerberos.principal", "hbase/_h...@io-int.com");
    config.set("hbase.master.kerberos.principal", "hbase/_h...@io-int.com");

    UserGroupInformation.setConfiguration(config);

    var ugi: UserGroupInformation = null;
    if (SparkFiles.get(properties.keytab) != null
        && (new java.io.File(SparkFiles.get(properties.keytab)).exists)) {
      ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
        properties.kerberosPrincipal, SparkFiles.get(properties.keytab));
    } else {
      ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
        properties.kerberosPrincipal, properties.keytab);
    }

    val connection = ConnectionFactory.createConnection(config);
    return (connection, ugi);
  }
 
And we connect to HBase like this:

  ….foreachRDD { rdd =>
    if (!rdd.isEmpty()) {
      // var ugi: UserGroupInformation = Utils.getHbaseConnection(properties)._2
      rdd.foreachPartition { partition =>
        val connection = Utils.getHbaseConnection(propsObj)._1
        val table = …
        partition.foreach { json =>
          …
        }
        table.put(puts)
        table.close()
        connection.close()
      }
    }
  }
 
 
The keytab file is not getting copied to the YARN staging/temp directory, so we are
not getting it back from SparkFiles.get…, and if we pass the keytab with --files,
spark-submit fails because it is already passed with --keytab.
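
One thing we have not tried yet (just a sketch on our side, not a verified fix) is
wrapping the connection creation in ugi.doAs so that the HBase calls run as the
keytab-authenticated user; the principal and keytab path below are placeholders:

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}
import org.apache.hadoop.security.UserGroupInformation

// config is the HBaseConfiguration built as in getHbaseConnection above.
val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
  "user@REALM.COM", "/path/to/user.keytab")          // placeholders

val connection = ugi.doAs(new PrivilegedExceptionAction[Connection] {
  override def run(): Connection = ConnectionFactory.createConnection(config)
})
// ... Table gets/puts as before, then close ...
connection.close()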



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/One-question-kerberos-yarn-cluster-connection-to-hbase-tp28709.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org