Substring in Spark SQL
Hi,

I am trying to run the Big Data Benchmark (https://amplab.cs.berkeley.edu/benchmark/), and I am stuck at Query 2 for Spark SQL using Spark 1.0.1:

SELECT SUBSTR(sourceIP, 1, X), SUM(adRevenue) FROM uservisits GROUP BY SUBSTR(sourceIP, 1, X)

When I look into the source code, it seems that SUBSTR is supported by HiveQL, but not by Spark SQL. Is that correct?

Thanks!
Tom

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Substring-in-Spark-SQL-tp11373.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
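If SUBSTR is indeed only available through the HiveQL parser, the aggregation itself is easy to express on the RDD instead. A minimal sketch of the same logic, using plain Scala collections so it runs without a cluster (the `Visit` case class is a hypothetical stand-in for a uservisits row, and X is fixed at 5 for illustration):

```scala
// Hypothetical stand-in for a row of the uservisits table.
case class Visit(sourceIP: String, adRevenue: Double)

val visits = Seq(
  Visit("10.0.0.1", 1.5),
  Visit("10.0.0.2", 2.0),
  Visit("10.1.2.3", 0.5))

val x = 5 // prefix length, i.e. SUBSTR(sourceIP, 1, 5)

// GROUP BY SUBSTR(sourceIP, 1, x) with SUM(adRevenue). On an RDD this
// would be: visits.map(v => (v.sourceIP.take(x), v.adRevenue)).reduceByKey(_ + _)
val revenueByPrefix: Map[String, Double] =
  visits.groupBy(_.sourceIP.take(x))
        .map { case (prefix, vs) => prefix -> vs.map(_.adRevenue).sum }
```

Note that SQL's SUBSTR(s, 1, x) is one-indexed and takes x characters, which corresponds to Scala's `take(x)`.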
Re: Retrieve dataset of Big Data Benchmark
Hi Burak,

I tried running it through the Spark shell, but I still ended up with the same error message as in Hadoop:

java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).

I guess the files are publicly available, but only to registered AWS users, so I caved in and registered for the service. Using the credentials I got, I was able to download the files with the local Spark shell.

Thanks!
Tom
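For reference, the two properties named in the error message can be set in the Hadoop configuration (e.g. core-site.xml) so that distcp and the Spark shell can read s3n:// paths; the key values below are placeholders for your own credentials:

```xml
<!-- core-site.xml: AWS credentials for the s3n:// connector -->
<property>
  <name>fs.s3n.awsAccessKeyId</name>
  <value>YOUR_ACCESS_KEY_ID</value>
</property>
<property>
  <name>fs.s3n.awsSecretAccessKey</name>
  <value>YOUR_SECRET_ACCESS_KEY</value>
</property>
```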
Re: Retrieve dataset of Big Data Benchmark
Hi Burak,

Thank you for your pointer, it is really helping out. I do have some follow-up questions though. After looking at the Big Data Benchmark page (https://amplab.cs.berkeley.edu/benchmark/, section "Run this benchmark yourself"), I was expecting the following combinations of files:

Sets: Uservisits, Rankings, Crawl
Size: tiny, 1node, 5node
Both as text and as sequence files.

When looking at http://s3.amazonaws.com/big-data-benchmark/, I only see:

sequence-snappy/5nodes/_distcp_logs_44js2v part 0 to 103
sequence-snappy/5nodes/_distcp_logs_nclxhd part 0 to 102
sequence-snappy/5nodes/_distcp_logs_vnuhym part 0 to 24
sequence-snappy/5nodes/crawl part 0 to 743

As Crawl is the name of a set I am looking for, I started to download it. Since it was the end of the day and I was going to download it overnight, I just wrote a for loop from 0 to 999 with wget, expecting it to download up to part 7-something and get 404 errors for the rest. When I looked at it this morning, I noticed it had completed all of the downloads. The total Crawl set for 5 nodes should be ~30 GB; I am currently at part 1020 with a total set of 40 GB.

This leads to my (sub)questions. Does anybody know what exactly is still hosted:
- Are the tiny and 1node sets still available?
- Are the Uservisits and Rankings sets still available?
- Why is the crawl set bigger than expected, and how big is it?
Retrieve dataset of Big Data Benchmark
Hi,

I would like to use the dataset from the Big Data Benchmark (https://amplab.cs.berkeley.edu/benchmark/) on my own cluster, to run some tests between Hadoop and Spark. The dataset should be available at s3n://big-data-benchmark/pavlo/[text|text-deflate|sequence|sequence-snappy]/[suffix] on Amazon. Is there a way I can download this without being a user of the Amazon cluster? I tried

bin/hadoop distcp s3n://123:456@big-data-benchmark/pavlo/text/tiny/* ./

but it asks for an AWS Access Key ID and Secret Access Key, which I do not have.

Thanks in advance,
Tom
Re: Is There Any Benchmarks Comparing C++ MPI with Spark
Spark gives you four of the classical collectives: broadcast, reduce, scatter, and gather. There are also a few additional primitives, mostly based on a join. Spark is certainly less optimized than MPI for these, but maybe that isn't such a big deal.

Spark has one theoretical disadvantage compared to MPI: every collective operation requires the task closures to be distributed, and, to my knowledge, this is an O(p) operation. (Perhaps there has been some progress on this?) That O(p) term spoils any parallel isoefficiency analysis. In MPI, binaries are distributed once, and wireup is O(log p). In practice, it prevents reasonable-looking strong scaling curves: with MPI, the overall runtime will stop declining and level off with increasing p, but with Spark it can go up sharply.

So, Spark is great for a small cluster. For a huge cluster, or a job with a lot of collectives, it isn't so great.

On Mon, Jun 16, 2014 at 1:36 PM, Bertrand Dechoux decho...@gmail.com wrote:

I guess you have to understand the difference in architecture. I don't know much about C++ MPI, but it is basically MPI, whereas Spark is inspired by Hadoop MapReduce and optimised for reading/writing large amounts of data with a smart caching and locality strategy. Intuitively, if you have a high CPU/message ratio then MPI might be better. But what that ratio is is hard to say, and in the end it will depend on your specific application. Finally, in real life, this difference in performance due to architecture may not be the only or the most important factor in the choice, as Michael already explained.

Bertrand

On Mon, Jun 16, 2014 at 1:23 PM, Michael Cutler mich...@tumra.com wrote:

Hello Wei,

I talk from experience of writing many HPC distributed applications using Open MPI (C/C++) on x86, PowerPC and Cell B.E. processors, and Parallel Virtual Machine (PVM) way before that, back in the 90's.
I can say with absolute certainty: *any gains you believe there are because C++ is faster than Java/Scala will be completely blown by the inordinate amount of time you spend debugging your code and/or reinventing the wheel to do even basic tasks like linear regression.*

There are undoubtedly some very specialised use cases where MPI and its brethren still dominate for High Performance Computing tasks -- for example the nuclear decay simulations run by the US Department of Energy on supercomputers, where they've invested billions solving that use case.

Spark is part of the wider Big Data ecosystem, and its biggest advantages are traction amongst internet-scale companies, hundreds of developers contributing to it, and a community of thousands using it. Need a distributed, fault-tolerant file system? Use HDFS. Need a distributed, fault-tolerant message queue? Use Kafka. Need to coordinate between your worker processes? Use ZooKeeper. Need to run it on a flexible grid of computing resources and handle failures? Run it on Mesos!

The barrier to entry with Spark is very low: download the latest distribution and start the Spark shell. The language bindings for Scala / Java / Python are excellent, meaning you spend less time writing boilerplate code and more time solving problems. Even if you believe you *need* to use native code to do something specific, like fetching HD video frames from satellite video capture cards, wrap it in a small native library and use the Java Native Access interface to call it from your Java/Scala code.

Have fun, and if you get stuck we're here to help!

MC

On 16 June 2014 08:17, Wei Da xwd0...@gmail.com wrote:

Hi guys,

We are making a choice between C++ MPI and Spark. Is there any official comparison between them? Thanks a lot!

Wei
Re: Failed RC-10 yarn-cluster job for FS closed error when cleaning up staging directory
It sounds like something is closing the HDFS filesystem before everyone is really done with it. The filesystem gets cached and shared, so if someone closes it while other threads are still using it, you run into this error. Is your application closing the filesystem? Are you using the event logging feature? Could you share the options you are running with?

YARN will retry the application depending on how the Application Master attempt fails (how many times it retries is a configurable setting). That is probably the second driver you are referring to, but they shouldn't have overlapped as far as both being up at the same time. Is that what you are seeing? Generally you want to look at why the first application attempt fails.

Tom

On Wednesday, May 21, 2014 6:10 PM, Kevin Markey kevin.mar...@oracle.com wrote:

I tested an application on RC-10 and Hadoop 2.3.0 in yarn-cluster mode that had run successfully with Spark 0.9.1 and Hadoop 2.3 or 2.2. The application successfully ran to conclusion, but it ultimately failed. There were two anomalies:

1. ASM reported only that the application was ACCEPTED. It never indicated that the application was RUNNING.

14/05/21 16:06:12 INFO yarn.Client: Application report from ASM:
 application identifier: application_1400696988985_0007
 appId: 7
 clientToAMToken: null
 appDiagnostics:
 appMasterHost: N/A
 appQueue: default
 appMasterRpcPort: -1
 appStartTime: 1400709970857
 yarnAppState: ACCEPTED
 distributedFinalState: UNDEFINED
 appTrackingUrl: http://Sleepycat:8088/proxy/application_1400696988985_0007/
 appUser: hduser

Furthermore, it started a second container, running two partly overlapping drivers, when it appeared that the application never started. Each container ran to conclusion as explained above, taking twice as long as usual for both to complete. Both instances had the same concluding failure.

2.
Each instance failed, as indicated by the stderr log, finding that the filesystem was closed when trying to clean up the staging directories.

14/05/21 16:08:24 INFO Executor: Serialized size of result for 1453 is 863
14/05/21 16:08:24 INFO Executor: Sending result for 1453 directly to driver
14/05/21 16:08:24 INFO Executor: Finished task ID 1453
14/05/21 16:08:24 INFO TaskSetManager: Finished TID 1453 in 202 ms on localhost (progress: 2/2)
14/05/21 16:08:24 INFO DAGScheduler: Completed ResultTask(1507, 1)
14/05/21 16:08:24 INFO TaskSchedulerImpl: Removed TaskSet 1507.0, whose tasks have all completed, from pool
14/05/21 16:08:24 INFO DAGScheduler: Stage 1507 (count at KEval.scala:32) finished in 0.417 s
14/05/21 16:08:24 INFO SparkContext: Job finished: count at KEval.scala:32, took 1.532789283 s
14/05/21 16:08:24 INFO SparkUI: Stopped Spark web UI at http://dhcp-brm-bl1-215-1e-east-10-135-123-92.usdhcp.oraclecorp.com:42250
14/05/21 16:08:24 INFO DAGScheduler: Stopping DAGScheduler
14/05/21 16:08:25 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
14/05/21 16:08:25 INFO ConnectionManager: Selector thread was interrupted!
14/05/21 16:08:25 INFO ConnectionManager: ConnectionManager stopped
14/05/21 16:08:25 INFO MemoryStore: MemoryStore cleared
14/05/21 16:08:25 INFO BlockManager: BlockManager stopped
14/05/21 16:08:25 INFO BlockManagerMasterActor: Stopping BlockManagerMaster
14/05/21 16:08:25 INFO BlockManagerMaster: BlockManagerMaster stopped
14/05/21 16:08:25 INFO SparkContext: Successfully stopped SparkContext
14/05/21 16:08:25 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
14/05/21 16:08:25 INFO ApplicationMaster: finishApplicationMaster with SUCCEEDED
14/05/21 16:08:25 INFO ApplicationMaster: AppMaster received a signal.
14/05/21 16:08:25 INFO ApplicationMaster: Deleting staging directory .sparkStaging/application_1400696988985_0007
14/05/21 16:08:25 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
14/05/21 16:08:25 ERROR ApplicationMaster: Failed to cleanup staging dir .sparkStaging/application_1400696988985_0007
java.io.IOException: Filesystem closed
	at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:689)
	at org.apache.hadoop.hdfs.DFSClient.delete(DFSClient.java:1685)
	at org.apache.hadoop.hdfs.DistributedFileSystem$11.doCall(DistributedFileSystem.java:591)
	at org.apache.hadoop.hdfs.DistributedFileSystem$11.doCall(DistributedFileSystem.java:587)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:587)
	at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$cleanupStagingDir(ApplicationMaster.scala:371)
	at org.apache.spark.deploy.yarn.ApplicationMaster$AppMasterShutdownHook.run(ApplicationMaster.scala:386
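If the root cause is indeed the shared, cached FileSystem instance being closed by another component (as Tom suggests above), one workaround sometimes used is to disable the filesystem cache for the hdfs scheme, so each FileSystem.get() caller receives its own instance, at the cost of extra connections. This is a sketch, assuming the conflict really is in the cache:

```xml
<!-- core-site.xml: give each FileSystem.get() caller its own instance,
     so one component closing its handle cannot break the others -->
<property>
  <name>fs.hdfs.impl.disable.cache</name>
  <value>true</value>
</property>
```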
Re: Spark LIBLINEAR
I've done some comparisons with my own implementation of TRON on Spark. From a distributed computing perspective, it does 2x more local work per iteration than L-BFGS, so the parallel isoefficiency is improved slightly. I think the truncated Newton solver holds some potential because there has been some recent work on preconditioners: http://dx.doi.org/10.1016/j.amc.2014.03.006

On Wed, May 14, 2014 at 9:32 AM, Debasish Das debasish.da...@gmail.com wrote:

Hi Professor Lin,

On our internal datasets, I am getting accuracy on par with glmnet-R for sparse feature selection from liblinear. The default MLlib-based gradient descent was way off. I did not tune the learning rate, but I ran with varying lambda; the feature selection was weak. I used the liblinear code. Next I will explore the distributed liblinear. Adding the code on GitHub will definitely help collaboration.

I am experimenting with whether a BFGS / OWL-QN based sparse logistic regression in Spark MLlib gives us accuracy on par with liblinear. If the liblinear solver outperforms them (in either accuracy or performance), we should bring TRON to MLlib and let other algorithms benefit from it as well. We are using the BFGS and OWL-QN solvers from breeze opt.

Thanks.
Deb

On May 12, 2014 9:07 PM, DB Tsai dbt...@stanford.edu wrote:

It seems that the code isn't managed in GitHub. It can be downloaded from http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/distributed-liblinear/spark/spark-liblinear-1.94.zip It would be easier to track the changes in GitHub.

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai

On Mon, May 12, 2014 at 7:53 AM, Xiangrui Meng men...@gmail.com wrote:

Hi Chieh-Yen,

Great to see the Spark implementation of LIBLINEAR! We will definitely consider adding a wrapper in MLlib to support it. Is the source code on GitHub?

Deb, Spark LIBLINEAR uses the BSD license, which is compatible with Apache.

Best,
Xiangrui

On Sun, May 11, 2014 at 10:29 AM, Debasish Das debasish.da...@gmail.com wrote:

Hello Prof.
Lin,

Awesome news! I am curious whether you have any benchmarks comparing the C++ MPI and Scala Spark liblinear implementations.

Is Spark LIBLINEAR Apache-licensed, or are there any specific restrictions on using it?

Except for the native BLAS libraries (which each user has to manage by pulling in their best proprietary BLAS package), all Spark code is Apache licensed.

Thanks.
Deb

On Sun, May 11, 2014 at 3:01 AM, DB Tsai dbt...@stanford.edu wrote:

Dear Prof. Lin,

Interesting! We had an implementation of L-BFGS in Spark, and it is already merged upstream. We read your paper comparing TRON and OWL-QN for logistic regression with L1 (http://www.csie.ntu.edu.tw/~cjlin/papers/l1.pdf), but it seems that it's not in the distributed setup. It will be very interesting to see the L2 logistic regression benchmark results in Spark with your TRON optimizer and the L-BFGS optimizer against different datasets (sparse, dense, wide, etc.). I'll try your TRON out soon.

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai

On Sun, May 11, 2014 at 1:49 AM, Chieh-Yen r01944...@csie.ntu.edu.tw wrote:

Dear all,

Recently we released a distributed extension of LIBLINEAR at http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/distributed-liblinear/

Currently, TRON for logistic regression and L2-loss SVM is supported. We provide both MPI and Spark implementations. This is very preliminary, so your comments are very welcome.

Thanks,
Chieh-Yen
Re: Spark on Yarn - A small issue !
You need to look at the log files for YARN. Generally this can be done with yarn logs -applicationId your_app_id. That only works if you have log aggregation enabled, though. You should be able to see at least the application master logs through the YARN ResourceManager web UI; I would try that first.

If that doesn't work, you can turn on debugging in the NodeManager: to review the per-container launch environment, increase yarn.nodemanager.delete.debug-delay-sec to a large value (e.g. 36000), and then access the application cache through yarn.nodemanager.local-dirs on the nodes on which containers are launched. This directory contains the launch script, jars, and all environment variables used for launching each container, and is useful for debugging classpath problems in particular. (Note that enabling this requires admin privileges on the cluster settings and a restart of all node managers; thus, it is not applicable to hosted clusters.)

Tom

On Monday, May 12, 2014 9:38 AM, Sai Prasanna ansaiprasa...@gmail.com wrote:

Hi All,

I wanted to launch Spark on YARN in interactive yarn-client mode. With the default settings of yarn-site.xml and spark-env.sh, I followed this link: http://spark.apache.org/docs/0.8.1/running-on-yarn.html

I get the pi value correct when I run without launching the shell. When I launch the shell with the following command:

SPARK_JAR=./assembly/target/scala-2.9.3/spark-assembly-0.8.1-incubating-hadoop2.3.0.jar \
SPARK_YARN_APP_JAR=examples/target/scala-2.9.3/spark-examples-assembly-0.8.1-incubating.jar \
MASTER=yarn-client ./spark-shell

and try to create RDDs and run some actions on them, I get nothing. After some time, the tasks fail.
Spark log file:

14/05/12 13:30:40 INFO YarnClientClusterScheduler: YarnClientClusterScheduler.postStartHook done
14/05/12 13:30:40 INFO BlockManagerMasterActor$BlockManagerInfo: Registering block manager s1:38355 with 324.4 MB RAM
14/05/12 13:31:38 INFO MemoryStore: ensureFreeSpace(202584) called with curMem=0, maxMem=340147568
14/05/12 13:31:38 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 197.8 KB, free 324.2 MB)
14/05/12 13:31:49 INFO FileInputFormat: Total input paths to process : 1
14/05/12 13:31:49 INFO NetworkTopology: Adding a new node: /default-rack/192.168.1.100:50010
14/05/12 13:31:49 INFO SparkContext: Starting job: top at console:15
14/05/12 13:31:49 INFO DAGScheduler: Got job 0 (top at console:15) with 4 output partitions (allowLocal=false)
14/05/12 13:31:49 INFO DAGScheduler: Final stage: Stage 0 (top at console:15)
14/05/12 13:31:49 INFO DAGScheduler: Parents of final stage: List()
14/05/12 13:31:49 INFO DAGScheduler: Missing parents: List()
14/05/12 13:31:49 INFO DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[2] at top at console:15), which has no missing parents
14/05/12 13:31:49 INFO DAGScheduler: Submitting 4 missing tasks from Stage 0 (MapPartitionsRDD[2] at top at console:15)
14/05/12 13:31:49 INFO YarnClientClusterScheduler: Adding task set 0.0 with 4 tasks
14/05/12 13:31:49 INFO RackResolver: Resolved s1 to /default-rack
14/05/12 13:31:49 INFO ClusterTaskSetManager: Starting task 0.0:3 as TID 0 on executor 1: s1 (PROCESS_LOCAL)
14/05/12 13:31:49 INFO ClusterTaskSetManager: Serialized task 0.0:3 as 1811 bytes in 4 ms
14/05/12 13:31:49 INFO ClusterTaskSetManager: Starting task 0.0:0 as TID 1 on executor 1: s1 (NODE_LOCAL)
14/05/12 13:31:49 INFO ClusterTaskSetManager: Serialized task 0.0:0 as 1811 bytes in 1 ms
14/05/12 13:32:18 INFO YarnClientSchedulerBackend:
Executor 1 disconnected, so removing it
14/05/12 13:32:18 ERROR YarnClientClusterScheduler: Lost executor 1 on s1: remote Akka client shutdown
14/05/12 13:32:18 INFO ClusterTaskSetManager: Re-queueing tasks for 1 from TaskSet 0.0
14/05/12 13:32:18 WARN ClusterTaskSetManager: Lost TID 1 (task 0.0:0)
14/05/12 13:32:18 WARN ClusterTaskSetManager: Lost TID 0 (task 0.0:3)
14/05/12 13:32:18 INFO DAGScheduler: Executor lost: 1 (epoch 0)
14/05/12 13:32:18 INFO BlockManagerMasterActor: Trying to remove executor 1 from BlockManagerMaster.
14/05/12 13:32:18 INFO BlockManagerMaster: Removed 1 successfully in removeExecutor

Do I need to set any other environment variable specifically for Spark on YARN? What could be the issue? Can anyone please help me in this regard?

Thanks in advance!
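For reference, the NodeManager debug setting mentioned earlier in this thread goes in yarn-site.xml on each node; the value is in seconds, so 36000 keeps the per-container launch directories around for ten hours:

```xml
<!-- yarn-site.xml: delay cleanup of container launch directories so the
     launch script and environment can be inspected after a failure -->
<property>
  <name>yarn.nodemanager.delete.debug-delay-sec</name>
  <value>36000</value>
</property>
```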
Re: configure spark history server for running on Yarn
Since 1.0 is still in development, you can pick up the latest docs in git: https://github.com/apache/spark/tree/branch-1.0/docs

I didn't see anywhere that you said you started the Spark history server. There are multiple things that need to happen for the Spark history server to work:

1) Configure your application to save the history logs - see the eventLog settings here: https://github.com/apache/spark/blob/branch-1.0/docs/configuration.md
2) On YARN, know the host/port where you are going to start the Spark history server, and configure spark.yarn.historyServer.address to point to it. Note that this purely makes the link from the ResourceManager UI properly point to the Spark history server daemon.
3) Start the Spark history server, pointing it to the same directory as specified in your application (spark.eventLog.dir).
4) Run your application. Once it finishes, you can either go to the RM UI and follow the link to the Spark history UI, or go directly to the Spark history server UI.

Tom

On Thursday, May 1, 2014 7:09 PM, Jenny Zhao linlin200...@gmail.com wrote:

Hi,

I have installed Spark 1.0 from branch-1.0; the build went fine, and I have tried running the example on YARN in client mode. Here is my command:

/home/hadoop/spark-branch-1.0/bin/spark-submit /home/hadoop/spark-branch-1.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.2.0.jar --master yarn --deploy-mode client --executor-memory 6g --executor-cores 3 --driver-memory 3g --name SparkPi --num-executors 2 --class org.apache.spark.examples.SparkPi yarn-client 5

After the run, I was not able to retrieve the log from YARN's web UI, although I tried to specify the history server in spark-env.sh:

export SPARK_DAEMON_JAVA_OPTS=-Dspark.yarn.historyServer.address=master:18080

I also tried to specify it in spark-defaults.conf, which doesn't work either. I would appreciate it if someone could tell me the way of specifying it, either in spark-env.sh or spark-defaults.conf, so that this option can be applied to any Spark
application.

Another thing I found is that the usage output for spark-submit is not complete / not in sync with the online documentation; I hope this is addressed in the formal release. Also, is this the latest documentation for Spark 1.0? http://people.csail.mit.edu/matei/spark-unified-docs/running-on-yarn.html

Thank you!
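Putting the steps from this thread together, a minimal sketch of the application-side settings in spark-defaults.conf might look like this (the HDFS path and host:port are placeholders; the history server daemon must then be started against the same event log directory):

```properties
# spark-defaults.conf (application side)
spark.eventLog.enabled            true
spark.eventLog.dir                hdfs:///user/spark/eventlogs
spark.yarn.historyServer.address  master:18080
```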
Re: is it okay to reuse objects across RDD's?
As to your last line: I've used RDD zipping to avoid GC, since MyBaseData is large and doesn't change. I think this is a very good solution to what is being asked for.

On Mon, Apr 28, 2014 at 10:44 AM, Ian O'Connell i...@ianoconnell.com wrote:

A mutable map in an object should do what you're looking for then, I believe. You just reference the object as an object in your closure, so it won't be swept up when your closure is serialized, and you can then reference variables of the object on the remote host. E.g.:

object MyObject {
  val mmap = scala.collection.mutable.Map[Long, Long]()
}

rdd.map { ele => MyObject.mmap.getOrElseUpdate(ele, 1L); ... }
   .map { ele => require(MyObject.mmap(ele) == 1L) }
   .count

Along with the data loss, just be careful with thread safety and multiple threads/partitions on one host, so the map should be viewed as shared amongst a larger space.

Also, from your exact description it sounds like your data should be encoded into the RDD if it's per-record/per-row: RDD[(MyBaseData, LastIterationSideValues)]

On Mon, Apr 28, 2014 at 1:51 AM, Sung Hwan Chung coded...@cs.stanford.edu wrote:

In our case, we'd like to keep memory content from one iteration to the next, and not just during a single mapPartition call, because then we can do more efficient computations using the values from the previous iteration. So essentially, we need to declare objects outside the scope of the map/reduce calls (but residing in individual workers); those can then be accessed from the map/reduce calls. We'd be making some assumptions, as you said, such as: an RDD partition is statically located and can't move from one worker to another unless the worker crashes.

On Mon, Apr 28, 2014 at 1:35 AM, Sean Owen so...@cloudera.com wrote:

On Mon, Apr 28, 2014 at 9:30 AM, Sung Hwan Chung coded...@cs.stanford.edu wrote:

Actually, I do not know how to do something like this, or whether it is possible - thus my suggestive statement. Can you already declare persistent memory objects per worker?
I tried something like constructing a singleton object within map functions, but that didn't work, as it seemed to actually serialize singletons and pass them back and forth in a weird manner.

Does it need to be persistent across operations, or just persist for the lifetime of processing of one partition in one mapPartition? The latter is quite easy and might give most of the speedup. Maybe that's 'enough', even if it means you re-cache values several times in a repeated iterative computation. It would certainly avoid managing a lot of complexity in trying to keep that state alive remotely across operations. I'd also be interested if there is any reliable way to do that, though it seems hard, since it means you embed assumptions about where particular data is going to be processed.
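For what it's worth, the singleton pattern Ian describes does keep state across passes within one JVM. A runnable sketch, with a plain Seq standing in for the RDD so it works without a cluster (on a real cluster each executor JVM gets its own copy of the object, so this is per-worker state, and a recomputed partition may land on a different worker):

```scala
// A singleton object is initialized once per JVM and is not serialized
// with the closure; only a reference to it is captured.
object SideState {
  val mmap = scala.collection.mutable.Map[Long, Long]()
}

val data = Seq(1L, 2L, 2L, 3L)

// First pass: populate the map (getOrElseUpdate inserts only if absent).
data.foreach(ele => SideState.mmap.getOrElseUpdate(ele, 1L))

// A later pass still sees the state left behind by the first one.
val allSeen = data.forall(ele => SideState.mmap(ele) == 1L)
```

As noted in the thread, on Spark the map must be treated as a best-effort cache rather than ground truth: multiple threads may touch it concurrently, and executor failure loses it.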
Re: is it okay to reuse objects across RDD's?
I'm not sure what I said came through. RDD zip is not hacky at all, as it only depends on the user not changing the partitioning. Basically, you would keep your losses as an RDD[Double] and zip those with the RDD of examples, and update the losses. You're doing a copy (and GC) on the RDD of losses each time, but this is negligible.

On Mon, Apr 28, 2014 at 11:33 AM, Sung Hwan Chung coded...@cs.stanford.edu wrote:

Yes, this is what we've done as of now (if you read earlier threads). And we were saying that we'd prefer it if Spark supported persistent worker memory management in a little less hacky way ;)

On Mon, Apr 28, 2014 at 8:44 AM, Ian O'Connell i...@ianoconnell.com wrote: ...
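A minimal sketch of the zip pattern Tom describes, with plain Vectors standing in for RDD[Double] and the RDD of examples (the update rule here is made up, purely to show the data flow; with RDDs, the large examples collection is cached once and only the small losses RDD is rebuilt each iteration, which is why the per-iteration copy is cheap):

```scala
case class Example(feature: Double, label: Double)

// The big, immutable dataset: cached once, never copied.
val examples = Vector(Example(1.0, 1.0), Example(2.0, 0.0), Example(3.0, 1.0))

// One iteration: zip losses with examples and rebuild the losses.
// (On RDDs: losses.zip(examples).map { ... }, relying on both RDDs
// keeping the same partitioning.)
def step(losses: Vector[Double]): Vector[Double] =
  losses.zip(examples).map { case (loss, ex) =>
    0.5 * loss + math.abs(ex.feature - ex.label) // hypothetical update rule
  }

val l0 = Vector.fill(examples.size)(0.0)
val l1 = step(l0)
val l2 = step(l1)
```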
Re: is it okay to reuse objects across RDD's?
Right - they are zipped at each iteration.

On Mon, Apr 28, 2014 at 11:56 AM, Chester Chen chesterxgc...@yahoo.com wrote:

Tom,
Are you suggesting two RDDs, one with the losses and another for the rest of the info, using zip to tie them together, but doing the update on a copy of the loss RDD?

Chester

Sent from my iPhone

On Apr 28, 2014, at 9:45 AM, Tom Vacek minnesota...@gmail.com wrote:

I'm not sure what I said came through. RDD zip is not hacky at all, as it only depends on the user not changing the partitioning. ...
Re: is it okay to reuse objects across RDD's?
Ian, I tried playing with your suggestion, but I get a task-not-serializable error (and some obvious fixes didn't help). Can you get that working? On Mon, Apr 28, 2014 at 10:58 AM, Tom Vacek minnesota...@gmail.com wrote: As to your last line: I've used RDD zipping to avoid GC, since MyBaseData is large and doesn't change. I think this is a very good solution to what is being asked for.
Re: is it okay to reuse objects across RDD's?
If you create your auxiliary RDD as a map from the examples, the partitioning will be inherited. On Mon, Apr 28, 2014 at 12:38 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: That might be a good alternative to what we are looking for, but I wonder whether it would be as efficient as we want. For instance, will RDDs of the same size usually get partitioned to the same machines, thus not triggering any cross-machine aligning, etc.? We'll explore it, but I would still very much like to see more direct worker memory management besides RDDs.
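The zip-based bookkeeping discussed in this thread can be sketched as follows. `lossFor` is a hypothetical per-example update rule (not from the thread), and the sketch assumes `examples` is cached and never repartitioned, since `zip` aligns the two RDDs positionally and requires identical partitioning:

```scala
import org.apache.spark.rdd.RDD

// Iteratively refresh a side RDD of per-example losses by zipping it
// with the (cached, never-repartitioned) RDD of examples. The old
// losses RDD becomes garbage each iteration, as noted in the thread.
def iterateLosses(examples: RDD[Array[Double]],
                  initial: RDD[Double],
                  lossFor: (Array[Double], Double) => Double, // hypothetical update
                  steps: Int): RDD[Double] = {
  var losses = initial
  for (_ <- 1 to steps) {
    // zip is positional: both RDDs must have the same number of
    // partitions and the same number of elements per partition.
    losses = examples.zip(losses).map { case (x, l) => lossFor(x, l) }
    losses.cache()
  }
  losses
}
```

Deriving `initial` with a map over `examples` (e.g. `examples.map(_ => 0.0)`) inherits the partitioning, which is what makes the positional zip safe.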
Re: GraphX: Help understanding the limitations of Pregel
Here are some out-of-the-box ideas: If the elements lie in a fairly small range and/or you're willing to work with limited precision, you could use counting sort. Moreover, you could iteratively find the median using bisection, which would be associative and commutative. It's easy to think of improvements that would make this approach give a reasonable answer in a few iterations. I have no idea about mixing algorithmic iterations with median-finding iterations, though. On Wed, Apr 23, 2014 at 8:20 PM, Ryan Compton compton.r...@gmail.com wrote: I'm trying to shoehorn a label-propagation-ish algorithm into GraphX. I need to update each vertex with the median value of its neighbors. Unlike PageRank, which updates each vertex with the mean of its neighbors, I don't have a simple commutative and associative function to use for mergeMsg. What are my options? It looks like I can choose between: 1. a hacky mergeMsg (i.e., combine (a, b) into Array(a, b) and then take the median in vprog) 2. collectNeighbors and then take the median 3. ignoring GraphX and doing the whole thing with joins (which I actually got working, but it's slow) Is there another possibility that I'm missing?
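The bisection idea can be sketched without any GraphX machinery: each step only needs the count of values at or below a pivot, and counts are associative and commutative (and hence mergeMsg-friendly). A pure-Scala sketch of the driver-side loop, assuming the values lie in a known [lo, hi] range:

```scala
// Approximate the median by bisecting the value range. Each iteration
// needs only a count of elements at or below the pivot -- an
// associative + commutative reduction, so it fits GraphX's mergeMsg.
def medianByBisection(xs: Seq[Double], lo0: Double, hi0: Double,
                      iters: Int = 40): Double = {
  var lo = lo0
  var hi = hi0
  for (_ <- 1 to iters) {
    val mid = (lo + hi) / 2
    // In the GraphX setting, this count is the aggregated message.
    val below = xs.count(_ <= mid)
    if (below * 2 >= xs.size) hi = mid else lo = mid
  }
  (lo + hi) / 2
}
```

Each bisection step costs one pass over the neighbors, so the precision/iteration trade-off is explicit; interleaving these steps with the algorithm's own Pregel supersteps is the open question raised above.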
internship opportunity
Thomson Reuters is looking for a graduate (or possibly advanced undergraduate) summer intern in Eagan, MN. This is a chance to work on an innovative project exploring how big data sets can be used by professionals such as lawyers, scientists and journalists. If you're subscribed to this mailing list, you're probably a good fit. http://goo.gl/ti2a3G
Re: Huge matrix
The last writer is suggesting using the triangle inequality to cut down the search space. If c is the centroid of cluster C, then the closest any point in C can be to x is ||x-c|| - r(C), where r(C) is the (precomputed) radius of the cluster---the distance of the farthest point in C from c. Whether you want to use the bound for an exact algorithm or as a search heuristic is up to you. Another approach can be very successful if the user-attributes matrix is very sparse. The idea is to exploit the additive nature of common similarity measures; I have used this for cosine similarity and a Bayesian posterior. Here is the basic idea: a User index contains all the attributes for a certain user (sorted by strength), and an Attribute index contains all the users with a certain attribute (again, sorted by strength). To find the best k matches for user i, start with user i's strongest attribute and search the attribute index to find some users with high scores for that attribute. Move on to user i's next-best attribute. If you proceed with a diagonal frontier (as you fetch candidates based on i's weaker attributes, also go back and fetch more weak candidates for i's strong attributes), you get a candidate pool and a bound on the highest score of any user not in the candidate pool. You just expand the candidate pool until the bound drops below the scores of the k best candidates. Practically, you'd want to limit the candidate pool size, but it's almost always exact. For a million users, you should be able to distribute the things needed to make a recommendation (either the centroids or the attributes matrix), and just break up the work based on the users you want to generate recommendations for. I hope this helps. Tom On Sat, Apr 12, 2014 at 11:35 AM, Xiaoli Li lixiaolima...@gmail.com wrote: Hi Guillaume, This sounds like a good idea to me. I am a newbie here. Could you further explain how you will determine which clusters to keep?
According to the distance between each element and each cluster center? Will you keep several clusters for each element when searching for nearest neighbours? Thanks. Xiaoli On Sat, Apr 12, 2014 at 9:46 AM, Guillaume Pitel guillaume.pi...@exensa.com wrote: Hi, I'm doing this here for multiple tens of millions of elements (and the goal is to reach multiple billions), on a relatively small cluster (7 nodes, 4 cores, 32GB RAM). We use multiprobe KLSH. All you have to do is run a K-means on your data, then compute the distance between each element and each cluster center, keep a few clusters, and only look into these clusters for nearest neighbours. This method is known to perform very well and vastly speeds up the computation. The hardest part is deciding how many clusters to compute, and how many to keep. As a rule of thumb, I generally want 300-1 elements per cluster, and use 5-20 clusters. Guillaume I am implementing an algorithm using Spark. I have one million users. I need to compute the similarity between each pair of users using some of the users' attributes. For each user, I need to get the top k most similar users. What is the best way to implement this? Thanks. -- Guillaume PITEL, Président +33(0)6 25 48 86 80 / +33(0)9 70 44 67 53 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
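The triangle-inequality bound from Tom's first paragraph can be sketched as follows: a cluster C with centroid c and radius r(C) can be skipped entirely whenever ||x - c|| - r(C) already exceeds the best distance found so far. This is a minimal single-machine sketch with Euclidean distance; the names (`Cluster`, `nearest`) are illustrative, not from the thread.

```scala
// Exact nearest-neighbour search with cluster pruning: the closest any
// point of cluster C can be to x is dist(x, centroid(C)) - radius(C),
// where radius(C) is the distance from the centroid to its farthest point.
case class Cluster(centroid: Array[Double], radius: Double,
                   points: Seq[Array[Double]])

def dist(a: Array[Double], b: Array[Double]): Double =
  math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)

def nearest(x: Array[Double], clusters: Seq[Cluster]): Array[Double] = {
  var best: Array[Double] = null
  var bestD = Double.PositiveInfinity
  // Visit clusters in order of their lower bound so promising ones
  // are searched first and more of the rest get pruned.
  for (c <- clusters.sortBy(c => dist(x, c.centroid) - c.radius)) {
    if (dist(x, c.centroid) - c.radius < bestD) { // cluster may hold a closer point
      for (p <- c.points) {
        val d = dist(x, p)
        if (d < bestD) { bestD = d; best = p }
      }
    }
  }
  best
}
```

Because every skipped cluster is skipped by a valid lower bound, the result is exact, unlike the KLSH heuristic, which trades exactness for speed by probing only a few clusters.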
Re: Spark 1.0.0 release plan
Do we have a list of things we really want to get in for 1.X? Perhaps move any JIRAs out to a 1.1 release if we aren't targeting them for 1.0. It might be nice to send out reminders when these dates are approaching. Tom On Thursday, April 3, 2014 11:19 PM, Bhaskar Dutta bhas...@gmail.com wrote: Thanks a lot, guys! On Fri, Apr 4, 2014 at 5:34 AM, Patrick Wendell pwend...@gmail.com wrote: Btw - after that initial thread I proposed a slightly more detailed set of dates: https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage - Patrick On Thu, Apr 3, 2014 at 11:28 AM, Matei Zaharia matei.zaha...@gmail.com wrote: Hey Bhaskar, this is still the plan, though QAing might take longer than 15 days. Right now, since we’ve passed April 1st, the only features considered for a merge are those that had pull requests in review before it. (Some big ones are things like annotating the public APIs and simplifying configuration.) Bug fixes and things like adding Python / Java APIs for new components will also still be considered. Matei On Apr 3, 2014, at 10:30 AM, Bhaskar Dutta bhas...@gmail.com wrote: Hi, Is there any change in the release plan for the Spark 1.0.0-rc1 release date from what is listed in the Proposal for Spark Release Strategy thread? == Tentative Release Window for 1.0.0 == Feb 1st - April 1st: General development April 1st: Code freeze for new features April 15th: RC1 Thanks, Bhaskar
Re: Pig on Spark
I had asked a similar question on the dev mailing list a while back (Jan 22nd). See the archives: http://mail-archives.apache.org/mod_mbox/spark-dev/201401.mbox/browser - look for spork. Basically, Matei said: Yup, that was it, though I believe people at Twitter picked it up again recently. I’d suggest asking Dmitriy if you know him. I’ve seen interest in this from several other groups, and if there’s enough of it, maybe we can start another open source repo to track it. The work in that repo you pointed to was done over one week, and it already had most of Pig’s operators working. (I helped out with this prototype over Twitter’s hack week.) That work also calls the Scala API directly, because it was done before we had a Java API; it should be easier with the Java one. Tom On Thursday, March 6, 2014 3:11 PM, Sameer Tilak ssti...@live.com wrote: Hi everyone, We are using Pig to build our data pipeline. I came across Spork -- Pig on Spark -- at https://github.com/dvryaboy/pig and am not sure whether it is still active. Can someone please let me know the status of Spork, or of any other effort that will let us run Pig on Spark? We could benefit significantly from using Spark, but we would like to keep using our existing Pig scripts.