Re: Spark 2.0 with Hadoop 3.0?

2016-10-28 Thread Zoltán Zvara
Worked for me 2 weeks ago with a 3.0.0-alpha2 snapshot. Just changed
hadoop.version while building.

On Fri, Oct 28, 2016, 11:50 Sean Owen  wrote:

> I don't think it works, but there is no Hadoop 3.0 right now either. As
> the version implies, it's going to be somewhat different API-wise.
>
> On Thu, Oct 27, 2016 at 11:04 PM adam kramer  wrote:
>
> Is the version of Spark built for Hadoop 2.7 and later only for 2.x
> releases?
>
> Is there any reason why Hadoop 3.0 is a non-starter for use with Spark
> 2.0? The version of aws-sdk in 3.0 actually works for DynamoDB which
> would resolve our driver dependency issues.
>
> Thanks,
> Adam
>


Re: Spark Streaming updateStateByKey Implementation

2015-11-08 Thread Zoltán Zvara
It is implemented with cogroup. Basically, it stores the states in a separate
RDD and cogroups the target RDD with the state RDD, which is then hidden
from you. See StateDStream.scala; it contains everything you need to know.
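
As a rough illustration of the idea (a sketch only, not Spark's actual
StateDStream code; it assumes String keys and Int states that are simply
summed with the new values):

import org.apache.spark.rdd.RDD

// a sketch: align the previous state with the new batch per key, then fold
// the batch's values into the old state (missing state defaults to 0)
def updateState(state: RDD[(String, Int)], batch: RDD[(String, Int)]): RDD[(String, Int)] =
  state.cogroup(batch).mapValues {
    case (oldState, newValues) => oldState.headOption.getOrElse(0) + newValues.sum
  }

As far as I know, the real implementation also keeps the state RDD partitioned
with the same partitioner across batches, so the cogroup stays cheap.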

On Fri, Nov 6, 2015 at 6:25 PM Hien Luu  wrote:

> Hi,
>
> I am interested in learning about the implementation of updateStateByKey.
> Does anyone know of a JIRA or design doc I can read?
>
> I did a quick search and couldn't find much info on the implementation.
>
> Thanks in advance,
>
> Hien
>


Re: Shuffle Write v/s Shuffle Read

2015-10-02 Thread Zoltán Zvara
Hi,

Shuffle output goes to local disk each time, as far as I know, never to
memory.

On Fri, Oct 2, 2015 at 1:26 PM Adrian Tanase  wrote:

> I’m not sure this is related to memory management. The shuffle is the
> central act of moving data around nodes when the computations need the data
> on another node (e.g. group by, sort, etc.).
>
> Shuffle read and shuffle write should be mirrored on the left/right side
> of a shuffle between 2 stages.
>
> -adrian
>
> From: Kartik Mathur
> Date: Thursday, October 1, 2015 at 10:36 PM
> To: user
> Subject: Shuffle Write v/s Shuffle Read
>
> Hi
>
> I am trying to better understand shuffle in spark .
>
> Based on my understanding thus far ,
>
> *Shuffle Write*: writes an intermediate stage's output to local
> disk if memory is not sufficient.
> Example: if each worker has 200 MB of memory for intermediate results and
> the results are 300 MB, then each executor will keep 200 MB in memory
> and will write the remaining 100 MB to local disk.
>
> *Shuffle Read*: each executor will read from the other executors' memory +
> disk, so the total read in the above case will be 300 (200 from memory and 100 from
> disk) * number of executors?
>
> Is my understanding correct ?
>
> Thanks,
> Kartik
>


Re: OutOfMemory error with Spark ML 1.5 logreg example

2015-09-07 Thread Zoltán Zvara
Hey, I'd try to debug and profile ResolvedDataSource. As far as I know, your
write will be performed by the JVM.

On Mon, Sep 7, 2015 at 4:11 PM Tóth Zoltán  wrote:

> Unfortunately I'm getting the same error:
> The other interesting things are that:
>  - the parquet files actually got written to HDFS (also with
> .write.parquet())
>  - the application gets stuck in the RUNNING state for good even after the
> error is thrown
>
> 15/09/07 10:01:10 INFO spark.ContextCleaner: Cleaned accumulator 19
> 15/09/07 10:01:10 INFO spark.ContextCleaner: Cleaned accumulator 5
> 15/09/07 10:01:12 INFO spark.ContextCleaner: Cleaned accumulator 20
> Exception in thread "Thread-7"
> Exception: java.lang.OutOfMemoryError thrown from the 
> UncaughtExceptionHandler in thread "Thread-7"
> Exception in thread "org.apache.hadoop.hdfs.PeerCache@4070d501"
> Exception: java.lang.OutOfMemoryError thrown from the 
> UncaughtExceptionHandler in thread "org.apache.hadoop.hdfs.PeerCache@4070d501"
> Exception in thread "LeaseRenewer:r...@docker.rapidminer.com:8020"
> Exception: java.lang.OutOfMemoryError thrown from the 
> UncaughtExceptionHandler in thread 
> "LeaseRenewer:r...@docker.rapidminer.com:8020"
> Exception in thread "Reporter"
> Exception: java.lang.OutOfMemoryError thrown from the 
> UncaughtExceptionHandler in thread "Reporter"
> Exception in thread "qtp2134582502-46"
> Exception: java.lang.OutOfMemoryError thrown from the 
> UncaughtExceptionHandler in thread "qtp2134582502-46"
>
>
>
>
> On Mon, Sep 7, 2015 at 3:48 PM, boci  wrote:
>
>> Hi,
>>
> >> Can you try using the save method instead of write?
>>
>> ex: out_df.save("path","parquet")
>>
>> b0c1
>>
>>
>> --
>> Skype: boci13, Hangout: boci.b...@gmail.com
>>
>> On Mon, Sep 7, 2015 at 3:35 PM, Zoltán Tóth 
>> wrote:
>>
>>> Aaand, the error! :)
>>>
>>> Exception in thread "org.apache.hadoop.hdfs.PeerCache@4e000abf"
>>> Exception: java.lang.OutOfMemoryError thrown from the 
>>> UncaughtExceptionHandler in thread 
>>> "org.apache.hadoop.hdfs.PeerCache@4e000abf"
>>> Exception in thread "Thread-7"
>>> Exception: java.lang.OutOfMemoryError thrown from the 
>>> UncaughtExceptionHandler in thread "Thread-7"
>>> Exception in thread "LeaseRenewer:r...@docker.rapidminer.com:8020"
>>> Exception: java.lang.OutOfMemoryError thrown from the 
>>> UncaughtExceptionHandler in thread 
>>> "LeaseRenewer:r...@docker.rapidminer.com:8020"
>>> Exception in thread "Reporter"
>>> Exception: java.lang.OutOfMemoryError thrown from the 
>>> UncaughtExceptionHandler in thread "Reporter"
>>> Exception in thread "qtp2115718813-47"
>>> Exception: java.lang.OutOfMemoryError thrown from the 
>>> UncaughtExceptionHandler in thread "qtp2115718813-47"
>>>
>>> Exception: java.lang.OutOfMemoryError thrown from the 
>>> UncaughtExceptionHandler in thread "sparkDriver-scheduler-1"
>>>
>>> Log Type: stdout
>>>
>>> Log Upload Time: Mon Sep 07 09:03:01 -0400 2015
>>>
>>> Log Length: 986
>>>
>>> Traceback (most recent call last):
>>>   File "spark-ml.py", line 33, in 
>>> out_df.write.parquet("/tmp/logparquet")
>>>   File 
>>> "/var/lib/hadoop-yarn/cache/yarn/nm-local-dir/usercache/root/appcache/application_1441224592929_0022/container_1441224592929_0022_01_01/pyspark.zip/pyspark/sql/readwriter.py",
>>>  line 422, in parquet
>>>   File 
>>> "/var/lib/hadoop-yarn/cache/yarn/nm-local-dir/usercache/root/appcache/application_1441224592929_0022/container_1441224592929_0022_01_01/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
>>>  line 538, in __call__
>>>   File 
>>> "/var/lib/hadoop-yarn/cache/yarn/nm-local-dir/usercache/root/appcache/application_1441224592929_0022/container_1441224592929_0022_01_01/pyspark.zip/pyspark/sql/utils.py",
>>>  line 36, in deco
>>>   File 
>>> "/var/lib/hadoop-yarn/cache/yarn/nm-local-dir/usercache/root/appcache/application_1441224592929_0022/container_1441224592929_0022_01_01/py4j-0.8.2.1-src.zip/py4j/protocol.py",
>>>  line 300, in get_return_value
>>> py4j.protocol.Py4JJavaError
>>>
>>>
>>>
>>> On Mon, Sep 7, 2015 at 3:27 PM, Zoltán Tóth 
>>> wrote:
>>>
 Hi,

 When I execute the Spark ML Logistic Regression example in pyspark I
 run into an OutOfMemory exception. I'm wondering if any of you experienced
 the same or have a hint about how to fix this.

 The interesting bit is that I only get the exception when I try to
 write the result DataFrame into a file. If I only "print" any of the
 results, it all works fine.

 My Setup:
 Spark 1.5.0-SNAPSHOT built for Hadoop 2.6.0 (I'm working with the
 latest nightly build)
 Build flags: -Psparkr -Phadoop-2.6 -Phive -Phive-thriftserver -Pyarn
 -DzincPort=3034

 I'm using the default resource setup
 15/09/07 08:49:04 INFO yarn.YarnAllocator: Will request 2 

Re: What's the best practice for developing new features for spark ?

2015-08-19 Thread Zoltán Zvara
I personally build with SBT and run Spark on YARN with IntelliJ. You need
to connect to the remote JVMs with a remote debugger. You also need to do
something similar if you use Python, because it will launch a JVM on the driver
as well.
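
A minimal sketch of wiring up the debugger via Spark properties (the port,
the suspend flag and which JVMs to target are just example choices):

import org.apache.spark.SparkConf

// a sketch: make the driver and executor JVMs listen for a remote debugger
// on port 5005; suspend=y would instead block each JVM until a debugger attaches
val jdwp = "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"
val conf = new SparkConf()
  .set("spark.driver.extraJavaOptions", jdwp)
  .set("spark.executor.extraJavaOptions", jdwp)

In practice the same two properties can be passed as --conf options to
spark-submit, and then you attach IntelliJ's remote debug configuration to the
host and port of the container you want to inspect.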

On Wed, Aug 19, 2015 at 2:10 PM canan chen ccn...@gmail.com wrote:

 Thanks Ted.  I noticed another thread about running Spark programmatically
 (client mode for standalone and YARN). Would it be much easier to debug
 Spark if it is possible? Hasn't anyone thought about it?

 On Wed, Aug 19, 2015 at 5:50 PM, Ted Yu yuzhih...@gmail.com wrote:

 See this thread:


 http://search-hadoop.com/m/q3RTtdZv0d1btRHl/Spark+build+modulesubj=Building+Spark+Building+just+one+module+



  On Aug 19, 2015, at 1:44 AM, canan chen ccn...@gmail.com wrote:
 
  I want to work on one JIRA, but it is not easy to unit test, because
 it involves different components, especially the UI. Building Spark is pretty
 slow, and I don't want to build it each time to test my code change. I am
 wondering how other people do this? Is there any experience you can share? Thanks
 
 





Re: Always two tasks slower than others, and then job fails

2015-08-14 Thread Zoltán Zvara
Data skew is still a problem with Spark.

- If you use groupByKey, try to express your logic without groupByKey.
- If you really need groupByKey, all you can do is scale vertically.
- If you can, repartition with a finer HashPartitioner. You will have many
tasks for each stage, but tasks are lightweight in Spark, so this should not
introduce heavy overhead. If you have your own domain partitioner, try to
rewrite it by introducing a secondary key (see the sketch below).

I hope this gives some insight and helps.
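
For example, replacing a groupByKey-based count with reduceByKey, and salting
a hot key with a secondary key, look roughly like this (a sketch; the String
keys, Int values and the salt range of 10 are arbitrary assumptions):

import org.apache.spark.rdd.RDD
import scala.util.Random

// a sketch: reduceByKey combines map-side, so a hot key never has to
// materialize all of its values on a single task
def counts(pairs: RDD[(String, Int)]): RDD[(String, Int)] =
  pairs.reduceByKey(_ + _)

// a sketch of a secondary ("salt") key: spread each key over 10 sub-keys,
// aggregate the sub-keys, then merge the partial results per original key
def saltedCounts(pairs: RDD[(String, Int)]): RDD[(String, Int)] =
  pairs
    .map { case (k, v) => ((k, Random.nextInt(10)), v) }
    .reduceByKey(_ + _)
    .map { case ((k, _), v) => (k, v) }
    .reduceByKey(_ + _)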

On Fri, Aug 14, 2015 at 9:37 AM Jeff Zhang zjf...@gmail.com wrote:

 Data skew? Maybe your partition key has some special value like null or
 an empty string.

 On Fri, Aug 14, 2015 at 11:01 AM, randylu randyl...@gmail.com wrote:

   It is strange that there are always two tasks slower than the others, and the
 corresponding partitions' data is larger, no matter how many partitions there are.


 Executor ID   Address                   Task Time   Shuffle Read Size / Records
 1             slave129.vsvs.com:56691   16 s   1    99.5 MB / 18865432
 *10           slave317.vsvs.com:59281   0 ms   0    413.5 MB / 311001318*
 100           slave290.vsvs.com:60241   19 s   1    110.8 MB / 27075926
 101           slave323.vsvs.com:36246   14 s   1    126.1 MB / 25052808

   The task time and records of Executor 10 seem strange, and the CPUs on the
 node are all 100% busy.

   Has anyone met the same problem? Thanks in advance for any answer!









 --
 Best Regards

 Jeff Zhang



Re: What is the Effect of Serialization within Stages?

2015-08-13 Thread Zoltán Zvara
Within a stage, serialization only occurs when you are using Python, and as
far as I know only in the first stage, when the data is read and passed to
the Python interpreter for the first time.

Multiple operations are just chains of simple *map* and *flatMap* operators
at the task level on a plain Scala *Iterator* of type T, where T is the
element type of the RDD.
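
Conceptually, a task within one stage behaves like composed functions over the
partition's iterator (a sketch of the idea, not Spark's actual code):

// a sketch: two chained narrow transformations run as one lazy pass over the
// partition iterator, element by element, with nothing serialized in between
def pipelinedTask(partition: Iterator[Int]): Iterator[String] =
  partition
    .map(_ * 2)              // first map
    .map(n => s"value=$n")   // second map consumes the first's output lazily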

On Thu, Aug 13, 2015 at 4:09 PM Hemant Bhanawat hemant9...@gmail.com
wrote:

 A chain of map and flatMap does not cause any
 serialization/deserialization.



 On Wed, Aug 12, 2015 at 4:02 PM, Mark Heimann mark.heim...@kard.info
 wrote:

 Hello everyone,

 I am wondering what the effect of serialization is within a stage.

 My understanding of Spark as an execution engine is that the data flow
 graph is divided into stages, and a new stage always starts after an
 operation/transformation that cannot be pipelined (such as groupBy or join),
 because it can only be completed after the whole data set has been taken
 care of. At the end of a stage, shuffle files are written, and at the
 beginning of the next stage they are read.

 Within a stage my understanding is that pipelining is used, therefore I
 wonder whether there is any serialization overhead involved when there is
 no shuffling taking place. I am also assuming that my data set fits into
 memory and must not be spilled to disk.

 So if I chain multiple *map* or *flatMap* operations and they end
 up in the same stage, will there be any serialization overhead for piping
 the result of the first *map* operation as a parameter into the
 following *map* operation?

 Any ideas and feedback appreciated, thanks a lot.

 Best regards,
 Mark





Re: YARN mode startup takes too long (10+ secs)

2015-05-08 Thread Zoltán Zvara
So does this sleep occur before allocating resources for the first few
executors to start the job?

On Fri, May 8, 2015 at 6:23 AM Taeyun Kim taeyun@innowireless.com
wrote:

 I think I’ve found the (maybe partial, but major) reason.



 It’s between the following lines (it’s newly captured, but essentially
 the same place that Zoltán Zvara picked):



 15/05/08 11:36:32 INFO BlockManagerMaster: Registered BlockManager

 15/05/08 11:36:38 INFO YarnClientSchedulerBackend: Registered executor:
 Actor[akka.tcp://sparkExecutor@cluster04:55237/user/Executor#-149550753]
 with ID 1



 When I read the logs on the cluster side, the following lines were found (the
 exact time differs from the line above, but that's just the clock difference
 between machines):



 15/05/08 11:36:23 INFO yarn.ApplicationMaster: Started progress reporter
 thread - sleep time : 5000

 15/05/08 11:36:28 INFO impl.AMRMClientImpl: Received new token for :
 cluster04:45454



 It seemed that Spark deliberately sleeps 5 secs.

 I’ve read the Spark source code, and in
 org.apache.spark.deploy.yarn.ApplicationMaster.scala, launchReporterThread()
 had the code for that.

 It loops calling allocator.allocateResources() and Thread.sleep().

 For sleep, it reads the configuration variable
 spark.yarn.scheduler.heartbeat.interval-ms (the default value is 5000,
 which is 5 secs).

 According to the comment, “we want to be reasonably responsive without
 causing too many requests to RM”.

 So, unless YARN immediately fulfills the allocation request, it seems that
 5 secs will be wasted.



 When I modified the configuration variable to 1000, it only waited for 1
 sec.



 Here is the log lines after the change:



 15/05/08 11:47:21 INFO yarn.ApplicationMaster: Started progress reporter
 thread - sleep time : 1000

 15/05/08 11:47:22 INFO impl.AMRMClientImpl: Received new token for :
 cluster04:45454



 4 secs saved.

 So, when one does not want to wait 5 secs, one can change the
 spark.yarn.scheduler.heartbeat.interval-ms.

 I hope that the additional overhead it incurs would be negligible.
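
 For example (a minimal sketch; the app name is just a placeholder), the
 property can be set on the SparkConf before the context is created, or passed
 as a --conf option at submit time:

 import org.apache.spark.{SparkConf, SparkContext}

 // a sketch: lower the AM heartbeat from the default 5000 ms to 1000 ms
 val conf = new SparkConf()
   .setAppName("example-app") // placeholder name
   .set("spark.yarn.scheduler.heartbeat.interval-ms", "1000")
 val sc = new SparkContext(conf)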





 *From:* Zoltán Zvara [mailto:zoltan.zv...@gmail.com]
 *Sent:* Thursday, May 07, 2015 10:05 PM
 *To:* Taeyun Kim; user@spark.apache.org
 *Subject:* Re: YARN mode startup takes too long (10+ secs)



 Without considering everything, just a few hints:

 You are running on YARN. From 09:18:34 to 09:18:37 your application is in
 state ACCEPTED. There is a noticeable overhead introduced by
 communicating with YARN's ResourceManager and NodeManager, and by the fact
 that the YARN scheduler needs time to make a decision. I guess somewhere
 from 09:18:38 to 09:18:43 your application JAR gets copied to another
 container requested by the Spark ApplicationMaster deployed on YARN's
 container 0. Deploying an executor usually needs further resource negotiations
 with the ResourceManager. Also, as I said, your JAR and the executor's code
 have to be copied to the container's local directory; execution is blocked
 until that is complete.



 On Thu, May 7, 2015 at 3:09 AM Taeyun Kim taeyun@innowireless.com
 wrote:

 Hi,



 I’m running a Spark application in YARN-client or YARN-cluster mode.

 But it seems to take too long to start up.

 It takes 10+ seconds to initialize the Spark context.

 Is this normal? Or can it be optimized?



 The environment is as follows:

 - Hadoop: Hortonworks HDP 2.2 (Hadoop 2.6)

 - Spark: 1.3.1

 - Client: Windows 7, but similar result on CentOS 6.6



 The following is the startup part of the application log. (Some private
 information was edited)

 ‘Main: Initializing context’ on the first line and ‘MainProcessor:
 Deleting previous output files’ on the last line are logs from the
 application. The others in between are from Spark itself. The application
 logic is executed after this log is displayed.



 ---



 15/05/07 09:18:31 INFO Main: Initializing context

 15/05/07 09:18:31 INFO SparkContext: Running Spark version 1.3.1

 15/05/07 09:18:31 INFO SecurityManager: Changing view acls to: myuser,myapp

 15/05/07 09:18:31 INFO SecurityManager: Changing modify acls to:
 myuser,myapp

 15/05/07 09:18:31 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions: Set(myuser,
 myapp); users with modify permissions: Set(myuser, myapp)

 15/05/07 09:18:31 INFO Slf4jLogger: Slf4jLogger started

 15/05/07 09:18:31 INFO Remoting: Starting remoting

 15/05/07 09:18:31 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://sparkDriver@mymachine:54449]

 15/05/07 09:18:31 INFO Utils: Successfully started service 'sparkDriver'
 on port 54449.

 15/05/07 09:18:31 INFO SparkEnv: Registering MapOutputTracker

 15/05/07 09:18:32 INFO SparkEnv: Registering BlockManagerMaster

 15/05/07 09:18:32 INFO DiskBlockManager: Created local directory at
 C:\Users\myuser\AppData\Local\Temp\spark-2d3db9d6-ea78-438e-956f-be9c1dcf3a9d\blockmgr-e9ade223-a4b8-4d9f-b038-efd66adf9772

 15/05/07 09:18:32 INFO MemoryStore

Re: JAVA for SPARK certification

2015-05-05 Thread Zoltán Zvara
I might join this conversation with a request: would someone point me to
a decent exercise that would approximate the level of this exam (from
above)? Thanks!

On Tue, May 5, 2015 at 3:37 PM Kartik Mehta kartik.meht...@gmail.com
wrote:

 Production - not a whole lot of companies have implemented Spark in
 production, so though it is good to have, it is not a must.

 If you are on LinkedIn, a group of folks including myself are preparing
 for Spark certification; learning in a group makes learning easy and fun.

 Kartik
 On May 5, 2015 7:31 AM, ayan guha guha.a...@gmail.com wrote:

 And how important is it to have a production environment?
 On 5 May 2015 20:51, Stephen Boesch java...@gmail.com wrote:

 There are questions in all three languages.

 2015-05-05 3:49 GMT-07:00 Kartik Mehta kartik.meht...@gmail.com:

 I too have a similar question.

 My understanding is that since Spark is written in Scala, having done it in
 Scala will be OK for certification.

 It would be great if someone who has done the certification could confirm.

 Thanks,

 Kartik
 On May 5, 2015 5:57 AM, Gourav Sengupta gourav.sengu...@gmail.com
 wrote:

 Hi,

 how important is JAVA for Spark certification? Will learning only
 Python and Scala not work?


 Regards,
 Gourav





Re: spark-defaults.conf

2015-04-27 Thread Zoltán Zvara
You should distribute your configuration file to workers and set the
appropriate environment variables, like HADOOP_HOME, SPARK_HOME,
HADOOP_CONF_DIR, SPARK_CONF_DIR.

On Mon, Apr 27, 2015 at 12:56 PM James King jakwebin...@gmail.com wrote:

 I renamed spark-defaults.conf.template to spark-defaults.conf
 and invoked

 spark-1.3.0-bin-hadoop2.4/sbin/start-slave.sh

 But I still get

 failed to launch org.apache.spark.deploy.worker.Worker:
 --properties-file FILE   Path to a custom Spark properties file.
  Default is conf/spark-defaults.conf.

 But I'm thinking it should pick up the default spark-defaults.conf from
 the conf dir.

 Am I expecting or doing something wrong?

 Regards
 jk





Re: How to debug Spark on Yarn?

2015-04-27 Thread Zoltán Zvara
You can check container logs from the RM web UI or, when log aggregation is
enabled, with the yarn command. There are other, but less convenient, options.

On Mon, Apr 27, 2015 at 8:53 AM ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Spark 1.3

 1. View stderr/stdout from the executor in the Web UI: when the job is running I
 figured out the executor that I am supposed to see, and those two links show 4
 special characters in the browser.

 2. Tail on Yarn logs:

 /apache/hadoop/bin/yarn logs -applicationId
 application_1429087638744_151059 | less
 Threw me: Application has not completed. Logs are only available after an
 application completes


 Any other ideas that I can try?



 On Sat, Apr 25, 2015 at 12:07 AM, Sven Krasser kras...@gmail.com wrote:

 On Fri, Apr 24, 2015 at 11:31 AM, Marcelo Vanzin van...@cloudera.com
 wrote:


 Spark 1.3 should have links to the executor logs in the UI while the
 application is running. Not yet in the history server, though.


 You're absolutely correct -- didn't notice it until now. This is a great
 addition!

 --
 www.skrasser.com http://www.skrasser.com/?utm_source=sig




 --
 Deepak




Re: Complexity of transformations in Spark

2015-04-26 Thread Zoltán Zvara
You can estimate the complexity of these operators by looking at RDD.scala,
basically. There you will find, for example, what happens when you call map
on an RDD: it is a simple Scala map function on a plain Iterator of type T.
Distinct has been implemented with mapping and grouping on the iterator, as I
recall.
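
For instance, a distinct-style deduplication can be expressed with those same
primitives (a sketch of the idea for String elements, not the exact RDD.scala
code):

import org.apache.spark.rdd.RDD

// a sketch: key each element, keep one value per key across the shuffle,
// then drop the dummy values; roughly one map-side pass plus one shuffle
def distinctLike(rdd: RDD[String]): RDD[String] =
  rdd.map(x => (x, null)).reduceByKey((a, _) => a).map(_._1)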

Zoltán

On Sun, Apr 26, 2015 at 7:43 PM Vijayasarathy Kannan kvi...@vt.edu wrote:

 What is the complexity of transformations and actions in Spark, such as
 groupBy(), flatMap(), collect(), etc.?

 What attributes (such as the number of partitions) do we need to factor in
 while analyzing code that uses these operations?