Re: Spark 2.0 with Hadoop 3.0?
Worked for me 2 weeks ago with a 3.0.0-alpha2 snapshot. Just changed hadoop.version while building. On Fri, Oct 28, 2016, 11:50 Sean Owen wrote: > I don't think it works, but there is no Hadoop 3.0 right now either. As > the version implies, it's going to be somewhat different API-wise. > > On Thu, Oct 27, 2016 at 11:04 PM adam kramer wrote: > > Is the version of Spark built for Hadoop 2.7 and later only for 2.x > releases? > > Is there any reason why Hadoop 3.0 is a non-starter for use with Spark > 2.0? The version of aws-sdk in 3.0 actually works for DynamoDB, which > would resolve our driver dependency issues. > > Thanks, > Adam > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > >
Re: Spark Streaming updateStateByKey Implementation
It is implemented with cogroup. Basically it stores the states in a separate RDD and cogroups the target RDD with that state RDD, which is then hidden from you. See StateDStream.scala; there is everything you need to know. On Fri, Nov 6, 2015 at 6:25 PM Hien Luu wrote: > Hi, > > I am interested in learning about the implementation of updateStateByKey. > Does anyone know of a JIRA or design doc I could read? > > I did a quick search and couldn't find much info on the implementation. > > Thanks in advance, > > Hien >
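The cogroup-based state update described above can be sketched in plain Python; all names here (cogroup, update_state_by_key) are invented for the example and this is not Spark's API, only an illustration of the mechanism in StateDStream.scala.

```python
from collections import defaultdict

def cogroup(state, batch):
    """Group old state values and new batch values by key."""
    grouped = defaultdict(lambda: ([], []))
    for k, v in state.items():
        grouped[k][0].append(v)
    for k, v in batch:
        grouped[k][1].append(v)
    return grouped

def update_state_by_key(state, batch, update_fn):
    """Build the next state "RDD" by cogrouping old state with new data."""
    next_state = {}
    for k, (old, new) in cogroup(state, batch).items():
        updated = update_fn(new, old[0] if old else None)
        if updated is not None:  # returning None drops the key
            next_state[k] = updated
    return next_state

# Running count, as in the classic streaming word-count example.
counts = update_state_by_key({}, [("a", 1), ("b", 1), ("a", 1)],
                             lambda new, old: (old or 0) + sum(new))
```

In Spark the new state RDD produced on each batch is itself cogrouped with the next batch, which is why the state lineage is checkpointed periodically.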
Re: Shuffle Write v/s Shuffle Read
Hi, Shuffle output goes to local disk each time, as far as I know, never to memory. On Fri, Oct 2, 2015 at 1:26 PM Adrian Tanase wrote: > I’m not sure this is related to memory management; the shuffle is the > central act of moving data between nodes when a computation needs the data > on another node (e.g. group by, sort, etc.) > > Shuffle read and shuffle write should be mirrored on the left/right side > of a shuffle between 2 stages. > > -adrian > > From: Kartik Mathur > Date: Thursday, October 1, 2015 at 10:36 PM > To: user > Subject: Shuffle Write v/s Shuffle Read > > Hi > > I am trying to better understand shuffle in Spark. > > Based on my understanding thus far: > > *Shuffle Write*: writes the intermediate stage output to local > disk if memory is not sufficient. > For example, if each worker has 200 MB of memory for intermediate results and > the results are 300 MB, then each executor *will keep 200 MB in memory > and will write the remaining 100 MB to local disk.* > > *Shuffle Read:* each executor will read from other executors' *memory + > disk, so the total read in the above case will be 300 (200 from memory and 100 from > disk) * num of executors?* > > Is my understanding correct? > > Thanks, > Kartik >
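A toy sketch of the mechanism under discussion (not Spark code; the function names are invented): each map task hash-partitions its output into one bucket per reducer and writes those buckets to local disk, and a reducer's shuffle read is the union of its bucket across all map outputs.

```python
def shuffle_write(map_output, num_reducers):
    """Hash-partition one map task's output into per-reducer buckets.
    In Spark these buckets land in local shuffle files on disk."""
    buckets = [[] for _ in range(num_reducers)]
    for k, v in map_output:
        buckets[hash(k) % num_reducers].append((k, v))
    return buckets

def shuffle_read(all_map_buckets, reducer_id):
    """A reducer fetches its own bucket from every map task's output."""
    return [kv for buckets in all_map_buckets for kv in buckets[reducer_id]]

# Two map tasks, two reducers; integer keys keep the hashing deterministic.
map_outputs = [shuffle_write([(0, "x"), (1, "y")], 2),
               shuffle_write([(2, "z")], 2)]
```

This also shows why shuffle read totals mirror shuffle write totals across a stage boundary: every written record is read exactly once by its target reducer.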
Re: OutOfMemory error with Spark ML 1.5 logreg example
Hey, I'd try to debug and profile ResolvedDataSource. As far as I know, your write will be performed by the JVM. On Mon, Sep 7, 2015 at 4:11 PM Tóth Zoltán wrote: > Unfortunately I'm getting the same error: > The other interesting things are that: > - the parquet files actually got written to HDFS (also with > .write.parquet() ) > - the application gets stuck in the RUNNING state for good even after the > error is thrown > > 15/09/07 10:01:10 INFO spark.ContextCleaner: Cleaned accumulator 19 > 15/09/07 10:01:10 INFO spark.ContextCleaner: Cleaned accumulator 5 > 15/09/07 10:01:12 INFO spark.ContextCleaner: Cleaned accumulator 20 > Exception in thread "Thread-7" > Exception: java.lang.OutOfMemoryError thrown from the > UncaughtExceptionHandler in thread "Thread-7" > Exception in thread "org.apache.hadoop.hdfs.PeerCache@4070d501" > Exception: java.lang.OutOfMemoryError thrown from the > UncaughtExceptionHandler in thread "org.apache.hadoop.hdfs.PeerCache@4070d501" > Exception in thread "LeaseRenewer:r...@docker.rapidminer.com:8020" > Exception: java.lang.OutOfMemoryError thrown from the > UncaughtExceptionHandler in thread > "LeaseRenewer:r...@docker.rapidminer.com:8020" > Exception in thread "Reporter" > Exception: java.lang.OutOfMemoryError thrown from the > UncaughtExceptionHandler in thread "Reporter" > Exception in thread "qtp2134582502-46" > Exception: java.lang.OutOfMemoryError thrown from the > UncaughtExceptionHandler in thread "qtp2134582502-46" > > > > > On Mon, Sep 7, 2015 at 3:48 PM, boci wrote: > >> Hi, >> >> Can you try using the save method instead of write? >> >> ex: out_df.save("path","parquet") >> >> b0c1 >> >> >> -- >> Skype: boci13, Hangout: boci.b...@gmail.com >> >> On Mon, Sep 7, 2015 at 3:35 PM, Zoltán Tóth >> wrote: >> >>> Aaand, the error! 
:) >>> >>> Exception in thread "org.apache.hadoop.hdfs.PeerCache@4e000abf" >>> Exception: java.lang.OutOfMemoryError thrown from the >>> UncaughtExceptionHandler in thread >>> "org.apache.hadoop.hdfs.PeerCache@4e000abf" >>> Exception in thread "Thread-7" >>> Exception: java.lang.OutOfMemoryError thrown from the >>> UncaughtExceptionHandler in thread "Thread-7" >>> Exception in thread "LeaseRenewer:r...@docker.rapidminer.com:8020" >>> Exception: java.lang.OutOfMemoryError thrown from the >>> UncaughtExceptionHandler in thread >>> "LeaseRenewer:r...@docker.rapidminer.com:8020" >>> Exception in thread "Reporter" >>> Exception: java.lang.OutOfMemoryError thrown from the >>> UncaughtExceptionHandler in thread "Reporter" >>> Exception in thread "qtp2115718813-47" >>> Exception: java.lang.OutOfMemoryError thrown from the >>> UncaughtExceptionHandler in thread "qtp2115718813-47" >>> >>> Exception: java.lang.OutOfMemoryError thrown from the >>> UncaughtExceptionHandler in thread "sparkDriver-scheduler-1" >>> >>> Log Type: stdout >>> >>> Log Upload Time: Mon Sep 07 09:03:01 -0400 2015 >>> >>> Log Length: 986 >>> >>> Traceback (most recent call last): >>> File "spark-ml.py", line 33, in >>> out_df.write.parquet("/tmp/logparquet") >>> File >>> "/var/lib/hadoop-yarn/cache/yarn/nm-local-dir/usercache/root/appcache/application_1441224592929_0022/container_1441224592929_0022_01_01/pyspark.zip/pyspark/sql/readwriter.py", >>> line 422, in parquet >>> File >>> "/var/lib/hadoop-yarn/cache/yarn/nm-local-dir/usercache/root/appcache/application_1441224592929_0022/container_1441224592929_0022_01_01/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", >>> line 538, in __call__ >>> File >>> "/var/lib/hadoop-yarn/cache/yarn/nm-local-dir/usercache/root/appcache/application_1441224592929_0022/container_1441224592929_0022_01_01/pyspark.zip/pyspark/sql/utils.py", >>> line 36, in deco >>> File >>> 
"/var/lib/hadoop-yarn/cache/yarn/nm-local-dir/usercache/root/appcache/application_1441224592929_0022/container_1441224592929_0022_01_01/py4j-0.8.2.1-src.zip/py4j/protocol.py", >>> line 300, in get_return_value >>> py4j.protocol.Py4JJavaError >>> >>> >>> >>> On Mon, Sep 7, 2015 at 3:27 PM, Zoltán Tóth >>> wrote: Hi, When I execute the Spark ML Logistic Regression example in pyspark I run into an OutOfMemory exception. I'm wondering if any of you experienced the same, or has a hint about how to fix this. The interesting bit is that I only get the exception when I try to write the result DataFrame into a file. If I only "print" any of the results, it all works fine. My Setup: Spark 1.5.0-SNAPSHOT built for Hadoop 2.6.0 (I'm working with the latest nightly build) Build flags: -Psparkr -Phadoop-2.6 -Phive -Phive-thriftserver -Pyarn -DzincPort=3034 I'm using the default resource setup 15/09/07 08:49:04 INFO yarn.YarnAllocator: Will request 2
Re: What's the best practice for developing new features for spark ?
I personally build with SBT and run Spark on YARN with IntelliJ. You need to connect to the remote JVMs with a remote debugger. You also need to do something similar if you use Python, because it will launch a JVM on the driver as well. On Wed, Aug 19, 2015 at 2:10 PM canan chen ccn...@gmail.com wrote: Thanks Ted. I noticed another thread about running Spark programmatically (client mode for standalone and YARN). Would it be much easier to debug Spark if that were possible? Has anyone thought about it? On Wed, Aug 19, 2015 at 5:50 PM, Ted Yu yuzhih...@gmail.com wrote: See this thread: http://search-hadoop.com/m/q3RTtdZv0d1btRHl/Spark+build+modulesubj=Building+Spark+Building+just+one+module+ On Aug 19, 2015, at 1:44 AM, canan chen ccn...@gmail.com wrote: I want to work on one JIRA, but it is not easy to unit test, because it involves different components, especially the UI. Building Spark is pretty slow, and I don't want to rebuild it each time to test my code change. I am wondering how other people do it? Is there any experience you can share? Thanks
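A hedged sketch of the remote-debugging setup described above: the JDWP agent string is standard JVM syntax and spark.driver.extraJavaOptions is a real Spark configuration key, but the port and the application jar name are placeholders.

```shell
# Start the driver JVM with a debug agent, then attach a "Remote JVM
# Debug" run configuration in IntelliJ to port 5005. suspend=y makes the
# JVM wait for the debugger to attach before running the application.
spark-submit \
  --master yarn \
  --conf "spark.driver.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005" \
  myapp.jar
```

The same agent string can be set in spark.executor.extraJavaOptions to debug executor JVMs, with the caveat that you must then reach the port on whichever node YARN places the executor.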
Re: Always two tasks slower than others, and then job fails
Data skew is still a problem with Spark. - If you use groupByKey, try to express your logic without groupByKey. - If you need groupByKey, all you can do is scale vertically. - If you can, repartition with a finer HashPartitioner. You will have many tasks for each stage, but tasks are lightweight in Spark, so this should not introduce heavy overhead. If you have your own domain partitioner, try to rewrite it by introducing a secondary key. I hope this gives some insight and helps. On Fri, Aug 14, 2015 at 9:37 AM Jeff Zhang zjf...@gmail.com wrote: Data skew? Maybe your partition key has some special value like null or empty string. On Fri, Aug 14, 2015 at 11:01 AM, randylu randyl...@gmail.com wrote: It is strange that there are always two tasks slower than the others, and the corresponding partitions' data are larger, no matter how many partitions there are. Executor ID Address Task Time Shuffle Read Size / Records 1 slave129.vsvs.com:56691 16 s 99.5 MB / 18865432 *10 slave317.vsvs.com:59281 0 ms 413.5 MB / 311001318* 100 slave290.vsvs.com:60241 19 s 110.8 MB / 27075926 101 slave323.vsvs.com:36246 14 s 126.1 MB / 25052808 Task time and records of Executor 10 seem strange, and the CPUs on that node are all 100% busy. Has anyone met the same problem? Thanks in advance for any answer! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Always-two-tasks-slower-than-others-and-then-job-fails-tp24257.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Best Regards Jeff Zhang
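The secondary-key idea above is usually called "salting". A minimal sketch in plain Python (all names are invented for the example; this is the technique, not Spark code): append a random secondary component so one hot key spreads over several partitions, aggregate once per salted key, then strip the salt and combine.

```python
import random

SALT_BUCKETS = 4  # how many sub-keys one hot key is split into

def partial_counts(pairs):
    """First aggregation, keyed by (key, salt): a single hot key now
    spreads across up to SALT_BUCKETS partitions instead of one."""
    acc = {}
    for k, v in pairs:
        salted = (k, random.randrange(SALT_BUCKETS))
        acc[salted] = acc.get(salted, 0) + v
    return acc

def final_counts(partial):
    """Second aggregation: strip the salt and merge the partial results."""
    acc = {}
    for (k, _salt), v in partial.items():
        acc[k] = acc.get(k, 0) + v
    return acc
```

The trade-off is a second aggregation pass, but the second pass runs over at most SALT_BUCKETS records per original key, so it is cheap compared to one straggler task holding the entire hot key.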
Re: What is the Effect of Serialization within Stages?
Serialization only occurs within a stage when you are using Python, and as far as I know, only in the first stage, when reading the data and passing it to the Python interpreter for the first time. Multiple operations are just chains of simple *map* and *flatMap* operators at task level on a plain Scala *Iterator* of type T, where T is the element type of the RDD. On Thu, Aug 13, 2015 at 4:09 PM Hemant Bhanawat hemant9...@gmail.com wrote: A chain of map and flatMap does not cause any serialization-deserialization. On Wed, Aug 12, 2015 at 4:02 PM, Mark Heimann mark.heim...@kard.info wrote: Hello everyone, I am wondering what the effect of serialization is within a stage. My understanding of Spark as an execution engine is that the data flow graph is divided into stages, and a new stage always starts after an operation/transformation that cannot be pipelined (such as groupBy or join), because it can only be completed after the whole data set has been taken care of. At the end of a stage, shuffle files are written, and at the beginning of the next stage they are read. Within a stage my understanding is that pipelining is used; therefore I wonder whether there is any serialization overhead involved when no shuffling takes place. I am also assuming that my data set fits into memory and need not be spilled to disk. So if I chain multiple *map* or *flatMap* operations and they end up in the same stage, will there be any serialization overhead for piping the result of the first *map* operation as a parameter into the following *map* operation? Any ideas and feedback appreciated, thanks a lot. Best regards, Mark
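The iterator chaining described above can be sketched with plain Python generators standing in for Spark's Scala iterators (the helper names are invented): each operator wraps its parent's iterator, so records stream through the whole chain lazily in one pass with no serialization between operators.

```python
def map_iter(f, it):
    """A map operator: wraps the parent iterator, applies f per record."""
    return (f(x) for x in it)

def flat_map_iter(f, it):
    """A flatMap operator: wraps the parent, expands each record."""
    return (y for x in it for y in f(x))

partition = iter([1, 2, 3])  # one task's slice of the RDD
pipeline = flat_map_iter(lambda x: [x, -x],
                         map_iter(lambda x: x * 10, partition))
result = list(pipeline)  # the whole chain runs in a single pass
```

Nothing is materialized between the two operators; each input record flows through both functions before the next record is touched, which is exactly why no serialization overhead arises inside a stage.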
Re: YARN mode startup takes too long (10+ secs)
So does this sleep occur before allocating resources for the first few executors to start the job? On Fri, May 8, 2015 at 6:23 AM Taeyun Kim taeyun@innowireless.com wrote: I think I’ve found the (maybe partial, but major) reason. It’s between the following lines (newly captured, but essentially the same place that Zoltán Zvara picked): 15/05/08 11:36:32 INFO BlockManagerMaster: Registered BlockManager 15/05/08 11:36:38 INFO YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@cluster04:55237/user/Executor#-149550753] with ID 1 When I read the logs on the cluster side, the following lines were found (the exact time differs from the lines above, by the clock difference between machines): 15/05/08 11:36:23 INFO yarn.ApplicationMaster: Started progress reporter thread - sleep time : 5000 15/05/08 11:36:28 INFO impl.AMRMClientImpl: Received new token for : cluster04:45454 It seems that Spark deliberately sleeps 5 secs. I’ve read the Spark source code: in org.apache.spark.deploy.yarn.ApplicationMaster.scala, launchReporterThread() has the code for that. It loops, calling allocator.allocateResources() and Thread.sleep(). For the sleep, it reads the configuration variable spark.yarn.scheduler.heartbeat.interval-ms (the default value is 5000, i.e. 5 secs). According to the comment, “we want to be reasonably responsive without causing too many requests to RM”. So, unless YARN immediately fulfills the allocation request, 5 secs will be wasted. When I modified the configuration variable to 1000, it only waited 1 sec. Here are the log lines after the change: 15/05/08 11:47:21 INFO yarn.ApplicationMaster: Started progress reporter thread - sleep time : 1000 15/05/08 11:47:22 INFO impl.AMRMClientImpl: Received new token for : cluster04:45454 4 secs saved. So, when one does not want to wait 5 secs, one can change spark.yarn.scheduler.heartbeat.interval-ms. I hope the additional overhead it incurs is negligible. 
*From:* Zoltán Zvara [mailto:zoltan.zv...@gmail.com] *Sent:* Thursday, May 07, 2015 10:05 PM *To:* Taeyun Kim; user@spark.apache.org *Subject:* Re: YARN mode startup takes too long (10+ secs) Without considering everything, just a few hints: You are running on YARN. From 09:18:34 to 09:18:37 your application is in state ACCEPTED. There is a noticeable overhead introduced due to communicating with YARN's ResourceManager, NodeManager and given that the YARN scheduler needs time to make a decision. I guess somewhere from 09:18:38 to 09:18:43 your application JAR gets copied to another container requested by the Spark ApplicationMaster deployed on YARN's container 0. Deploying an executor needs further resource negotiations with the ResourceManager usually. Also, as I said, your JAR and Executor's code requires copying to the container's local directory - execution blocked until that is complete. On Thu, May 7, 2015 at 3:09 AM Taeyun Kim taeyun@innowireless.com wrote: Hi, I’m running a spark application with YARN-client or YARN-cluster mode. But it seems to take too long to startup. It takes 10+ seconds to initialize the spark context. Is this normal? Or can it be optimized? The environment is as follows: - Hadoop: Hortonworks HDP 2.2 (Hadoop 2.6) - Spark: 1.3.1 - Client: Windows 7, but similar result on CentOS 6.6 The following is the startup part of the application log. (Some private information was edited) ‘Main: Initializing context’ at the first line and ‘MainProcessor: Deleting previous output files’ at the last line are the logs by the application. Others in between are from Spark itself. Application logic is executed after this log is displayed. 
--- 15/05/07 09:18:31 INFO Main: Initializing context 15/05/07 09:18:31 INFO SparkContext: Running Spark version 1.3.1 15/05/07 09:18:31 INFO SecurityManager: Changing view acls to: myuser,myapp 15/05/07 09:18:31 INFO SecurityManager: Changing modify acls to: myuser,myapp 15/05/07 09:18:31 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(myuser, myapp); users with modify permissions: Set(myuser, myapp) 15/05/07 09:18:31 INFO Slf4jLogger: Slf4jLogger started 15/05/07 09:18:31 INFO Remoting: Starting remoting 15/05/07 09:18:31 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@mymachine:54449] 15/05/07 09:18:31 INFO Utils: Successfully started service 'sparkDriver' on port 54449. 15/05/07 09:18:31 INFO SparkEnv: Registering MapOutputTracker 15/05/07 09:18:32 INFO SparkEnv: Registering BlockManagerMaster 15/05/07 09:18:32 INFO DiskBlockManager: Created local directory at C:\Users\myuser\AppData\Local\Temp\spark-2d3db9d6-ea78-438e-956f-be9c1dcf3a9d\blockmgr-e9ade223-a4b8-4d9f-b038-efd66adf9772 15/05/07 09:18:32 INFO MemoryStore
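The fix described at the top of this thread can be applied at submit time. The configuration key and its 5000 ms default are quoted from the thread; the application jar name is a placeholder.

```shell
# Lower the ApplicationMaster heartbeat from the 5000 ms default to
# 1000 ms, trading faster executor allocation at startup for more
# frequent requests to the ResourceManager.
spark-submit \
  --master yarn-client \
  --conf spark.yarn.scheduler.heartbeat.interval-ms=1000 \
  myapp.jar
```

The same key can instead be set once in spark-defaults.conf if the shorter interval should apply to every application on the cluster.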
Re: JAVA for SPARK certification
I might join this conversation with a request: would someone point me to a decent exercise that approximates the level of this exam (from above)? Thanks! On Tue, May 5, 2015 at 3:37 PM Kartik Mehta kartik.meht...@gmail.com wrote: Production - not a whole lot of companies have implemented Spark in production, so though it is good to have, it's not a must. If you are on LinkedIn, a group of folks including myself are preparing for Spark certification; learning in a group makes learning easy and fun. Kartik On May 5, 2015 7:31 AM, ayan guha guha.a...@gmail.com wrote: And how important is it to have a production environment? On 5 May 2015 20:51, Stephen Boesch java...@gmail.com wrote: There are questions in all three languages. 2015-05-05 3:49 GMT-07:00 Kartik Mehta kartik.meht...@gmail.com: I too have a similar question. My understanding is that since Spark is written in Scala, having done it in Scala will be OK for certification. Can someone who has done the certification confirm? Thanks, Kartik On May 5, 2015 5:57 AM, Gourav Sengupta gourav.sengu...@gmail.com wrote: Hi, how important is Java for Spark certification? Will learning only Python and Scala not work? Regards, Gourav
Re: spark-defaults.conf
You should distribute your configuration file to the workers and set the appropriate environment variables, like HADOOP_HOME, SPARK_HOME, HADOOP_CONF_DIR, SPARK_CONF_DIR. On Mon, Apr 27, 2015 at 12:56 PM James King jakwebin...@gmail.com wrote: I renamed spark-defaults.conf.template to spark-defaults.conf and invoked spark-1.3.0-bin-hadoop2.4/sbin/start-slave.sh. But I still get: failed to launch org.apache.spark.deploy.worker.Worker: --properties-file FILE Path to a custom Spark properties file. Default is conf/spark-defaults.conf. But I'm thinking it should pick up the default spark-defaults.conf from the conf dir. Am I expecting or doing something wrong? Regards jk
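A minimal sketch of the advice above. The paths and master URL are placeholders; SPARK_CONF_DIR is a real Spark environment variable, and the --properties-file flag is the one quoted in the worker's usage message in this thread.

```shell
# Either point the daemons at the directory holding spark-defaults.conf ...
export SPARK_CONF_DIR=/opt/spark/conf   # placeholder path

# ... or pass the properties file explicitly when starting a worker
# (spark://master:7077 is a placeholder master URL):
./sbin/start-slave.sh spark://master:7077 \
  --properties-file /opt/spark/conf/spark-defaults.conf
```

Whichever route is taken, the file and the variables must be present on every worker machine, not only on the one where spark-submit runs.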
Re: How to debug Spark on Yarn?
You can check the container logs from the RM web UI or, when log aggregation is enabled, with the yarn command. There are other, less convenient options. On Mon, Apr 27, 2015 at 8:53 AM ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Spark 1.3 1. View stderr/stdout from the executor in the Web UI: when the job is running I found the executor I was supposed to see, and those two links show 4 special characters in the browser. 2. Tail the YARN logs: /apache/hadoop/bin/yarn logs -applicationId application_1429087638744_151059 | less Threw me: Application has not completed. Logs are only available after an application completes. Any other ideas that I can try? On Sat, Apr 25, 2015 at 12:07 AM, Sven Krasser kras...@gmail.com wrote: On Fri, Apr 24, 2015 at 11:31 AM, Marcelo Vanzin van...@cloudera.com wrote: Spark 1.3 should have links to the executor logs in the UI while the application is running. Not yet in the history server, though. You're absolutely correct -- I didn't notice it until now. This is a great addition! -- www.skrasser.com -- Deepak
Re: Complexity of transformations in Spark
You can work out the complexity of these operators by looking at RDD.scala. There you will find, for example, what happens when you call map on an RDD: it is a simple Scala map over a plain Iterator of type T. Distinct is implemented with mapping and grouping on the iterator, as I recall. Zoltán On Sun, Apr 26, 2015 at 7:43 PM Vijayasarathy Kannan kvi...@vt.edu wrote: What is the complexity of transformations and actions in Spark, such as groupBy(), flatMap(), collect(), etc.? What attributes do we need to factor in (such as the number of partitions) while analyzing code that uses these operations?
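The map-then-aggregate-by-key shape recalled above can be sketched on a plain Python iterable (distinct here is an invented helper, not Spark's implementation): map each element to a keyed pair, keep one entry per key, then map back to the keys.

```python
def distinct(elems):
    """Linear-time distinct via map + aggregate-by-key + map."""
    keyed = ((x, None) for x in elems)  # map each element to an (x, None) pair
    reduced = {}
    for k, v in keyed:                  # aggregate by key: one entry per key
        reduced[k] = v
    return list(reduced)                # map back to just the keys
```

On a single iterator this is O(n); in Spark the aggregate-by-key step additionally costs a shuffle, which is where the real expense of distinct lies.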