Substring in Spark SQL

2014-08-04 Thread Tom
Hi,

I am trying to run the Big Data Benchmark
(https://amplab.cs.berkeley.edu/benchmark/), and I am stuck at Query 2 for
Spark SQL using Spark 1.0.1:

SELECT SUBSTR(sourceIP, 1, X), SUM(adRevenue) FROM uservisits GROUP BY
SUBSTR(sourceIP, 1, X)

When I look into the source code, it seems that SUBSTR is supported by
HiveQL but not by Spark SQL. Is that correct?
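
For reference, a minimal sketch of how one might route the same query through
the Hive support instead (assuming a Hive-enabled 1.0.x build, a SparkContext
named sc as in the shell, and that uservisits is registered in the Hive
metastore; 8 just stands in for X):

  import org.apache.spark.sql.hive.HiveContext

  val hiveContext = new HiveContext(sc)

  // SUBSTR is resolved as a Hive UDF here, so the query parses and runs.
  val result = hiveContext.hql(
    "SELECT SUBSTR(sourceIP, 1, 8), SUM(adRevenue) " +
    "FROM uservisits GROUP BY SUBSTR(sourceIP, 1, 8)")

  result.take(10).foreach(println)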

Thanks!

Tom







Re: Retrieve dataset of Big Data Benchmark

2014-07-17 Thread Tom
Hi Burak,

I tried running it through the Spark shell, but I still ended up with the same
error message as in Hadoop:

java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key
must be specified as the username or password (respectively) of a s3n URL,
or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey
properties (respectively).

I guess the files are publicly available, but only to registered AWS users,
so I caved in and registered for the service. Using the credentials I got,
I was able to download the files from the local Spark shell.
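
For anyone hitting the same error, a rough sketch of passing the credentials
from the Spark shell (the key values are placeholders, and the exact S3 path
follows the pattern from the benchmark page):

  // Set the two properties named in the error message on the shell's Hadoop configuration.
  sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "YOUR_ACCESS_KEY_ID")
  sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_ACCESS_KEY")

  // The benchmark files can then be read (and saved locally) straight from s3n://.
  val rankings = sc.textFile("s3n://big-data-benchmark/pavlo/text/tiny/rankings")
  println(rankings.count())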

Thanks!

Tom





Re: Retrieve dataset of Big Data Benchmark

2014-07-16 Thread Tom
Hi Burak,

Thank you for your pointer; it is really helping. I do have some
follow-up questions though.

After looking at the Big Data Benchmark page
(https://amplab.cs.berkeley.edu/benchmark/, section "Run this benchmark
yourself"), I was expecting the following combinations of files:
Sets: Uservisits, Rankings, Crawl
Sizes: tiny, 1node, 5nodes
Both in text and sequence-file format.

When looking at http://s3.amazonaws.com/big-data-benchmark/, I only see  
sequence-snappy/5nodes/_distcp_logs_44js2v part 0 to 103
sequence-snappy/5nodes/_distcp_logs_nclxhd part 0 to 102
sequence-snappy/5nodes/_distcp_logs_vnuhym part 0 to 24
sequence-snappy/5nodes/crawl part 0 to 743

As Crawl is the name of a set I am looking for, I started to download it.
Since it was the end of the day and I was going to download it overnight, I
just wrote a for loop from 0 to 999 with wget, expecting the downloads to
succeed up to part 743 and to get 404 errors for the rest. When I looked at it
this morning, I noticed that every part had downloaded successfully. The total
Crawl set for 5 nodes should be ~30 GB; I am currently at part 1020 with a
total set of 40 GB.

This leads to my (sub)questions. Does anybody know what exactly is still hosted:
- Are the tiny and 1node sets still available?
- Are the Uservisits and Rankings sets still available?
- Why is the Crawl set bigger than expected, and how big is it?






Retrieve dataset of Big Data Benchmark

2014-07-15 Thread Tom
Hi,

I would like to use the dataset from the Big Data Benchmark
(https://amplab.cs.berkeley.edu/benchmark/) on my own cluster, to run some
tests between Hadoop and Spark. The dataset should be available at
s3n://big-data-benchmark/pavlo/[text|text-deflate|sequence|sequence-snappy]/[suffix]
on Amazon S3. Is there a way I can download this without being an AWS user?
I tried
bin/hadoop distcp s3n://123:456@big-data-benchmark/pavlo/text/tiny/* ./
but it asks for an AWS Access Key ID and Secret Access Key, which I do not
have.

Thanks in advance,

Tom





Re: Is There Any Benchmarks Comparing C++ MPI with Spark

2014-06-16 Thread Tom Vacek
Spark gives you four of the classical collectives: broadcast, reduce,
scatter, and gather.  There are also a few additional primitives, mostly
based on a join.  Spark is certainly less optimized than MPI for these, but
maybe that isn't such a big deal.  Spark has one theoretical disadvantage
compared to MPI: every collective operation requires the task closures to
be distributed, and---to my knowledge---this is an O(p) operation.
 (Perhaps there has been some progress on this??)  That O(p) term spoils
any parallel isoefficiency analysis.  In MPI, binaries are distributed
once, and wireup is O(log p).  In practice, that O(p) cost prevents
reasonable-looking strong scaling curves; with MPI, the overall runtime
will stop declining and level off with increasing p, but with Spark it can
go up sharply.  So, Spark is great for a small cluster.  For a huge
cluster, or a job with a lot of collectives, it isn't so great.
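
For concreteness, one rough mapping of those four collectives onto the Spark API
(illustrative only; sc is a SparkContext as in the shell):

  // scatter: partition driver-local data across the cluster
  val rdd = sc.parallelize(1 to 1000, 8)

  // broadcast: ship a read-only value to every executor
  val factor = sc.broadcast(3)

  // reduce: combine all elements with an associative, commutative function
  val total = rdd.map(_ * factor.value).reduce(_ + _)

  // gather: bring the distributed elements back to the driver
  val gathered = rdd.collect()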


On Mon, Jun 16, 2014 at 1:36 PM, Bertrand Dechoux decho...@gmail.com
wrote:

 I guess you have to understand the difference in architecture. I don't
 know much about C++ MPI, but it is basically MPI, whereas Spark is inspired
 by Hadoop MapReduce and optimised for reading/writing large amounts of
 data with a smart caching and locality strategy. Intuitively, if you have a
 high CPU-to-message ratio, then MPI might be better. But that ratio is
 hard to pin down, and in the end it will depend on your specific
 application. Finally, in real life, this architectural difference in
 performance may not be the only or the most important factor in the choice,
 as Michael already explained.

 Bertrand

 On Mon, Jun 16, 2014 at 1:23 PM, Michael Cutler mich...@tumra.com wrote:

 Hello Wei,

 I speak from experience, having written many HPC distributed applications using
 Open MPI (C/C++) on x86, PowerPC and Cell B.E. processors, and Parallel
 Virtual Machine (PVM) way before that, back in the 90's.  I can say with
 absolute certainty:

 *Any gains you believe there are because C++ is faster than Java/Scala
 will be completely blown by the inordinate amount of time you spend
 debugging your code and/or reinventing the wheel to do even basic tasks
 like linear regression.*


 There are undoubtedly some very specialised use-cases where MPI and its
 brethren still dominate for High Performance Computing tasks -- like for
 example the nuclear decay simulations run by the US Department of Energy on
 supercomputers where they've invested billions solving that use case.

 Spark is part of the wider Big Data ecosystem, and its biggest
 advantages are traction amongst internet scale companies, hundreds of
 developers contributing to it and a community of thousands using it.

 Need a distributed fault-tolerant file system? Use HDFS.  Need a
 distributed/fault-tolerant message-queue? Use Kafka.  Need to co-ordinate
 between your worker processes? Use Zookeeper.  Need to run it on a flexible
 grid of computing resources and handle failures? Run it on Mesos!

 The barrier to entry to get going with Spark is very low, download the
 latest distribution and start the Spark shell.  Language bindings for Scala
 / Java / Python are excellent meaning you spend less time writing
 boilerplate code, and more time solving problems.

 Even if you believe you *need* to use native code to do something
 specific, like fetching HD video frames from satellite video capture cards
 -- wrap it in a small native library and use the Java Native Access
 interface to call it from your Java/Scala code.
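
 A minimal JNA sketch of that pattern (assuming the jna jar is on the classpath;
 getpid is just an example libc function standing in for your own native library):

   import com.sun.jna.{Library, Native}

   // Declare the native functions you need as a plain interface/trait.
   trait CLib extends Library {
     def getpid(): Int
   }

   // Bind the interface to the shared library by name ("c" here; yours would
   // be the name of your small wrapper library).
   val libc = Native.loadLibrary("c", classOf[CLib]).asInstanceOf[CLib]
   println("pid from native code: " + libc.getpid())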

 Have fun, and if you get stuck we're here to help!

 MC


 On 16 June 2014 08:17, Wei Da xwd0...@gmail.com wrote:

 Hi guys,
 We are making choices between C++ MPI and Spark. Is there any official
 comparation between them? Thanks a lot!

 Wei






Re: Failed RC-10 yarn-cluster job for FS closed error when cleaning up staging directory

2014-05-21 Thread Tom Graves
It sounds like something is closing the HDFS filesystem before everyone is
really done with it. The filesystem gets cached and is shared, so if someone
closes it while other threads are still using it, you run into this error. Is
your application closing the filesystem? Are you using the event logging
feature? Could you share the options you are running with?
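
The sharing works roughly like this (a sketch using the standard Hadoop
FileSystem API):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.FileSystem

  val conf = new Configuration()

  // FileSystem.get returns a cached instance shared by every caller with an
  // equivalent configuration, so closing it breaks the other users.
  val shared = FileSystem.get(conf)
  shared.close()   // anyone still holding the cached instance now sees "Filesystem closed"

  // FileSystem.newInstance returns a private, uncached instance that is safe
  // to close without affecting anyone else.
  val isolated = FileSystem.newInstance(conf)
  isolated.close()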

YARN will retry the application depending on how the Application Master attempt
fails (how many times it retries is a configurable setting). That is probably
the second driver you are referring to, but the attempts shouldn't have
overlapped in the sense of both being up at the same time. Is that what you are
seeing? Generally you want to look at why the first application attempt fails.

Tom




On Wednesday, May 21, 2014 6:10 PM, Kevin Markey kevin.mar...@oracle.com 
wrote:
 


I tested an application on RC-10 and Hadoop 2.3.0 in yarn-cluster mode that had
run successfully with Spark 0.9.1 and Hadoop 2.3 or 2.2.  The application
ran to conclusion, but it ultimately reported failure.

There were 2 anomalies...

1. ASM reported only that the application was ACCEPTED.  It never
indicated that the application was RUNNING.

14/05/21 16:06:12 INFO yarn.Client: Application report from ASM:
 application identifier: application_1400696988985_0007
 appId: 7
 clientToAMToken: null
 appDiagnostics:
 appMasterHost: N/A
 appQueue: default
 appMasterRpcPort: -1
 appStartTime: 1400709970857
 yarnAppState: ACCEPTED
 distributedFinalState: UNDEFINED
 appTrackingUrl: 
http://Sleepycat:8088/proxy/application_1400696988985_0007/
 appUser: hduser

Furthermore, it started a second container, running two partly overlapping 
drivers, when it appeared that the application never started.  Each container 
ran to conclusion as explained above, taking twice as long as usual for both to 
complete.  Both instances had the same concluding failure.

2. Each instance failed as indicated by the stderr log, finding that
the filesystem was closed when trying to clean up the staging directories.  

14/05/21 16:08:24 INFO Executor: Serialized size of result for 1453 is 863
14/05/21 16:08:24 INFO Executor: Sending result for 1453 directly to driver
14/05/21 16:08:24 INFO Executor: Finished task ID 1453
14/05/21 16:08:24 INFO TaskSetManager: Finished TID 1453 in 202 ms on localhost 
(progress: 2/2)
14/05/21 16:08:24 INFO DAGScheduler: Completed ResultTask(1507, 1)
14/05/21 16:08:24 INFO TaskSchedulerImpl: Removed TaskSet 1507.0, whose tasks 
have all completed, from pool
14/05/21 16:08:24 INFO DAGScheduler: Stage 1507 (count at KEval.scala:32) 
finished in 0.417 s
14/05/21 16:08:24 INFO SparkContext: Job finished: count at KEval.scala:32, 
took 1.532789283 s
14/05/21 16:08:24 INFO SparkUI: Stopped Spark web UI at 
http://dhcp-brm-bl1-215-1e-east-10-135-123-92.usdhcp.oraclecorp.com:42250
14/05/21 16:08:24 INFO DAGScheduler: Stopping DAGScheduler
14/05/21 16:08:25 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor 
stopped!
14/05/21 16:08:25 INFO ConnectionManager: Selector thread was interrupted!
14/05/21 16:08:25 INFO ConnectionManager: ConnectionManager stopped
14/05/21 16:08:25 INFO MemoryStore: MemoryStore cleared
14/05/21 16:08:25 INFO BlockManager: BlockManager stopped
14/05/21 16:08:25 INFO BlockManagerMasterActor: Stopping BlockManagerMaster
14/05/21 16:08:25 INFO BlockManagerMaster: BlockManagerMaster stopped
14/05/21 16:08:25 INFO SparkContext: Successfully stopped SparkContext
14/05/21 16:08:25 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down 
remote daemon.
14/05/21 16:08:25 INFO ApplicationMaster: finishApplicationMaster with SUCCEEDED
14/05/21 16:08:25 INFO ApplicationMaster: AppMaster received a signal.
14/05/21 16:08:25 INFO ApplicationMaster: Deleting staging directory 
.sparkStaging/application_1400696988985_0007
14/05/21 16:08:25 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon 
shut down; proceeding with flushing remote transports.
14/05/21 16:08:25 ERROR ApplicationMaster: Failed to cleanup staging dir 
.sparkStaging/application_1400696988985_0007
java.io.IOException: Filesystem closed
    at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:689)
    at org.apache.hadoop.hdfs.DFSClient.delete(DFSClient.java:1685)
    at
org.apache.hadoop.hdfs.DistributedFileSystem$11.doCall(DistributedFileSystem.java:591)
    at
org.apache.hadoop.hdfs.DistributedFileSystem$11.doCall(DistributedFileSystem.java:587)
    at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at
org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:587)
    at
org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$cleanupStagingDir(ApplicationMaster.scala:371)
    at
org.apache.spark.deploy.yarn.ApplicationMaster$AppMasterShutdownHook.run(ApplicationMaster.scala:386

Re: Spark LIBLINEAR

2014-05-16 Thread Tom Vacek
I've done some comparisons with my own implementation of TRON on Spark.
From a distributed computing perspective, it does 2x more local work per
iteration than L-BFGS, so the parallel isoefficiency is improved slightly.
I think the truncated Newton solver holds some potential because there
has been some recent work on preconditioners:
http://dx.doi.org/10.1016/j.amc.2014.03.006


On Wed, May 14, 2014 at 9:32 AM, Debasish Das debasish.da...@gmail.comwrote:

 Hi Professor Lin,

 On our internal datasets, I am getting accuracy on par with glmnet-R for
 sparse feature selection from liblinear. The default MLlib-based gradient
 descent was way off. I did not tune the learning rate, but I ran with varying
 lambda. The feature selection was weak.

 I used the liblinear code. Next I will explore the distributed liblinear.

 Adding the code to GitHub will definitely help collaboration.

 I am experimenting to see whether a BFGS / OWL-QN based sparse logistic
 regression in Spark MLlib gives us accuracy on par with liblinear.

 If the liblinear solver outperforms them (in either accuracy or performance),
 we have to bring TRON to MLlib and let other algorithms benefit from it as well.

 We are using the BFGS and OWL-QN solvers from Breeze's optimization package.

 Thanks.
 Deb
  On May 12, 2014 9:07 PM, DB Tsai dbt...@stanford.edu wrote:

 It seems that the code isn't managed on GitHub. It can be downloaded from
 http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/distributed-liblinear/spark/spark-liblinear-1.94.zip

 It would be easier to track the changes on GitHub.



 Sincerely,

 DB Tsai
 ---
 My Blog: https://www.dbtsai.com
 LinkedIn: https://www.linkedin.com/in/dbtsai


 On Mon, May 12, 2014 at 7:53 AM, Xiangrui Meng men...@gmail.com wrote:

 Hi Chieh-Yen,

 Great to see the Spark implementation of LIBLINEAR! We will definitely
 consider adding a wrapper in MLlib to support it. Is the source code
 on github?

 Deb, Spark LIBLINEAR uses BSD license, which is compatible with Apache.

 Best,
 Xiangrui

 On Sun, May 11, 2014 at 10:29 AM, Debasish Das debasish.da...@gmail.com
 wrote:
  Hello Prof. Lin,
 
  Awesome news ! I am curious if you have any benchmarks comparing C++
 MPI
  with Scala Spark liblinear implementations...
 
  Is Spark Liblinear apache licensed or there are any specific
 restrictions on
  using it ?
 
  Except using native blas libraries (which each user has to manage by
 pulling
  in their best proprietary BLAS package), all Spark code is Apache
 licensed.
 
  Thanks.
  Deb
 
 
  On Sun, May 11, 2014 at 3:01 AM, DB Tsai dbt...@stanford.edu wrote:
 
  Dear Prof. Lin,
 
  Interesting! We had an implementation of L-BFGS in Spark and already
  merged in the upstream now.
 
  We read your paper comparing TRON and OWL-QN for logistic regression
 with
  L1 (http://www.csie.ntu.edu.tw/~cjlin/papers/l1.pdf), but it seems
 that it's
  not in the distributed setup.
 
  Will be very interesting to know the L2 logistic regression benchmark
  result in Spark with your TRON optimizer and the L-BFGS optimizer
 against
  different datasets (sparse, dense, and wide, etc).
 
  I'll try your TRON out soon.
 
 
  Sincerely,
 
  DB Tsai
  ---
  My Blog: https://www.dbtsai.com
  LinkedIn: https://www.linkedin.com/in/dbtsai
 
 
  On Sun, May 11, 2014 at 1:49 AM, Chieh-Yen r01944...@csie.ntu.edu.tw
 
  wrote:
 
  Dear all,
 
  Recently we released a distributed extension of LIBLINEAR at
 
  http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/distributed-liblinear/
 
  Currently, TRON for logistic regression and L2-loss SVM is supported.
  We provided both MPI and Spark implementations.
  This is very preliminary so your comments are very welcome.
 
  Thanks,
  Chieh-Yen
 
 
 





Re: Spark on Yarn - A small issue !

2014-05-14 Thread Tom Graves
You need to look at the log files for YARN.  Generally this can be done with
yarn logs -applicationId your_app_id.  That only works if you have log
aggregation enabled though.  You should be able to see at least the application
master logs through the YARN ResourceManager web UI.  I would try that first.

If that doesn't work, you can turn on debugging in the NodeManager:

To review per-container launch environment, increase 
yarn.nodemanager.delete.debug-delay-sec to a large value (e.g. 36000), and then 
access the application cache through yarn.nodemanager.local-dirs on the nodes 
on which containers are launched. This directory contains the launch script, 
jars, and all environment variables used for launching each container. This 
process is useful for debugging classpath problems in particular. (Note that 
enabling this requires admin privileges on cluster settings and a restart of 
all node managers. Thus, this is not applicable to hosted clusters).



Tom


On Monday, May 12, 2014 9:38 AM, Sai Prasanna ansaiprasa...@gmail.com wrote:
 
Hi All, 

I wanted to launch Spark on YARN interactively, in yarn-client mode.

With the default settings of yarn-site.xml and spark-env.sh, I followed this
link:
http://spark.apache.org/docs/0.8.1/running-on-yarn.html

I get the correct pi value when I run without launching the shell.

When I launch the shell with the following command,
SPARK_JAR=./assembly/target/scala-2.9.3/spark-assembly-0.8.1-incubating-hadoop2.3.0.jar
 \
SPARK_YARN_APP_JAR=examples/target/scala-2.9.3/spark-examples-assembly-0.8.1-incubating.jar
 \
MASTER=yarn-client ./spark-shell
and then try to create RDDs and run an action on them, I get nothing. After
some time the tasks fail.

LogFile of spark: 
519095 14/05/12 13:30:40 INFO YarnClientClusterScheduler: 
YarnClientClusterScheduler.postStartHook done
519096 14/05/12 13:30:40 INFO BlockManagerMasterActor$BlockManagerInfo: 
Registering block manager s1:38355 with 324.4 MB RAM
519097 14/05/12 13:31:38 INFO MemoryStore: ensureFreeSpace(202584) called with 
curMem=0, maxMem=340147568
519098 14/05/12 13:31:38 INFO MemoryStore: Block broadcast_0 stored as values 
to memory (estimated size 197.8 KB, free 324.2 MB)
519099 14/05/12 13:31:49 INFO FileInputFormat: Total input paths to process : 1
519100 14/05/12 13:31:49 INFO NetworkTopology: Adding a new node: 
/default-rack/192.168.1.100:50010
519101 14/05/12 13:31:49 INFO SparkContext: Starting job: top at console:15
519102 14/05/12 13:31:49 INFO DAGScheduler: Got job 0 (top at console:15) 
with 4 output partitions (allowLocal=false)
519103 14/05/12 13:31:49 INFO DAGScheduler: Final stage: Stage 0 (top at 
console:15)
519104 14/05/12 13:31:49 INFO DAGScheduler: Parents of final stage: List()
519105 14/05/12 13:31:49 INFO DAGScheduler: Missing parents: List()
519106 14/05/12 13:31:49 INFO DAGScheduler: Submitting Stage 0 
(MapPartitionsRDD[2] at top at console:15), which has no missing parents
519107 14/05/12 13:31:49 INFO DAGScheduler: Submitting 4 missing tasks from 
Stage 0 (MapPartitionsRDD[2] at top at console:15)
519108 14/05/12 13:31:49 INFO YarnClientClusterScheduler: Adding task set 0.0 
with 4 tasks
519109 14/05/12 13:31:49 INFO RackResolver: Resolved s1 to /default-rack
519110 14/05/12 13:31:49 INFO ClusterTaskSetManager: Starting task 0.0:3 as TID 
0 on executor 1: s1 (PROCESS_LOCAL)
519111 14/05/12 13:31:49 INFO ClusterTaskSetManager: Serialized task 0.0:3 as 
1811 bytes in 4 ms
519112 14/05/12 13:31:49 INFO ClusterTaskSetManager: Starting task 0.0:0 as TID 
1 on executor 1: s1 (NODE_LOCAL)
519113 14/05/12 13:31:49 INFO ClusterTaskSetManager: Serialized task 0.0:0 as 
1811 bytes in 1 ms
519114 14/05/12 13:32:18 INFO YarnClientSchedulerBackend: Executor 1 
disconnected, so removing it
519115 14/05/12 13:32:18 ERROR YarnClientClusterScheduler: Lost executor 1 on 
s1: remote Akka client shutdown
519116 14/05/12 13:32:18 INFO ClusterTaskSetManager: Re-queueing tasks for 1 
from TaskSet 0.0
519117 14/05/12 13:32:18 WARN ClusterTaskSetManager: Lost TID 1 (task 0.0:0)
519118 14/05/12 13:32:18 WARN ClusterTaskSetManager: Lost TID 0 (task 0.0:3)
519119 14/05/12 13:32:18 INFO DAGScheduler: Executor lost: 1 (epoch 0)
519120 14/05/12 13:32:18 INFO BlockManagerMasterActor: Trying to remove 
executor 1 from BlockManagerMaster.
519121 14/05/12 13:32:18 INFO BlockManagerMaster: Removed 1 successfully in 
removeExecutor


 Do I need to set any other environment variable specifically for Spark on YARN?
What could be the issue?


Can anyone please help me in this regard.

Thanks in Advance !!

Re: configure spark history server for running on Yarn

2014-05-05 Thread Tom Graves
Since 1.0 is still in development you can pick up the latest docs in git: 
https://github.com/apache/spark/tree/branch-1.0/docs

I didn't see anywhere that you said you started the Spark history server.

There are multiple things that need to happen for the Spark history server to
work:

1) configure your application to save the history logs - see the eventLog 
settings here 
https://github.com/apache/spark/blob/branch-1.0/docs/configuration.md

2) On yarn -  know the host/port where you are going to start the spark history 
server and configure: spark.yarn.historyServer.address to point to it.  Note 
that this purely makes the link from the ResourceManager UI properly point to 
the Spark History Server Daemon.

3) Start the Spark history server pointing to the same directory as specified
in your application (spark.eventLog.dir).

4) Run your application. Once it finishes, you can either follow the link from
the RM UI to the Spark history UI or go directly to the Spark history server UI.
(A sketch of the application-side settings is below.)
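
Putting 1) and 2) together on the application side, a minimal sketch (the
directory and host are placeholders; the same keys can also go in
spark-defaults.conf):

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("my-app")
    .set("spark.eventLog.enabled", "true")                               // 1) write event logs
    .set("spark.eventLog.dir", "hdfs:///user/spark/applicationHistory")  // same dir the history server reads
    .set("spark.yarn.historyServer.address", "historyhost:18080")        // 2) link from the RM UI
  val sc = new SparkContext(conf)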

Tom
On Thursday, May 1, 2014 7:09 PM, Jenny Zhao linlin200...@gmail.com wrote:
 
Hi,

I have installed Spark 1.0 from branch-1.0; the build went fine, and I have
tried running the example in YARN client mode. Here is my command:

/home/hadoop/spark-branch-1.0/bin/spark-submit 
/home/hadoop/spark-branch-1.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.2.0.jar
 --master yarn --deploy-mode client --executor-memory 6g --executor-cores 3 
--driver-memory 3g --name SparkPi --num-executors 2 --class 
org.apache.spark.examples.SparkPi yarn-client 5

After the run, I was not able to retrieve the log from YARN's web UI, although
I tried to specify the history server in spark-env.sh:

export SPARK_DAEMON_JAVA_OPTS=-Dspark.yarn.historyServer.address=master:18080


I also tried to specify it in spark-defaults.conf, which doesn't work either. I
would appreciate it if someone could tell me the right way of specifying it, in
either spark-env.sh or spark-defaults.conf, so that this option is applied to
any Spark application.


Another thing I found is that the usage output for spark-submit is not complete
/ not in sync with the online documentation; I hope this is addressed in the
formal release.

and is this the latest documentation for spark 1.0? 
http://people.csail.mit.edu/matei/spark-unified-docs/running-on-yarn.html

Thank you! 

Re: is it okay to reuse objects across RDD's?

2014-04-28 Thread Tom Vacek
As to your last line: I've used RDD zipping to avoid GC since MyBaseData is
large and doesn't change.  I think this is a very good solution to what is
being asked for.


On Mon, Apr 28, 2014 at 10:44 AM, Ian O'Connell i...@ianoconnell.com wrote:

 A mutable map in an object should do what you're looking for then, I believe.
 You just reference the object as an object in your closure, so it won't be
 swept up when your closure is serialized, and you can then reference variables
 of the object on the remote host. e.g.:

 object MyObject {
   val mmap = scala.collection.mutable.Map[Long, Long]()
 }

 rdd.map { ele =>
   MyObject.mmap.getOrElseUpdate(ele, 1L)
   ...
 }.map { ele =>
   require(MyObject.mmap(ele) == 1L)
 }.count

 Along with the data loss just be careful with thread safety and multiple
 threads/partitions on one host so the map should be viewed as shared
 amongst a larger space.



 Also with your exact description it sounds like your data should be
 encoded into the RDD if its per-record/per-row:  RDD[(MyBaseData,
 LastIterationSideValues)]



 On Mon, Apr 28, 2014 at 1:51 AM, Sung Hwan Chung coded...@cs.stanford.edu
  wrote:

 In our case, we'd like to keep memory content from one iteration to the
 next, and not just during a single mapPartition call because then we can do
 more efficient computations using the values from the previous iteration.

 So essentially, we need to declare objects outside the scope of the
 map/reduce calls (but residing in individual workers), then those can be
 accessed from the map/reduce calls.

 We'd be making some assumptions as you said, such as - RDD partition is
 statically located and can't move from worker to another worker unless the
 worker crashes.



 On Mon, Apr 28, 2014 at 1:35 AM, Sean Owen so...@cloudera.com wrote:

 On Mon, Apr 28, 2014 at 9:30 AM, Sung Hwan Chung 
 coded...@cs.stanford.edu wrote:

 Actually, I do not know how to do something like this or whether this
 is possible - thus my suggestive statement.

 Can you already declare persistent memory objects per worker? I tried
 something like constructing a singleton object within map functions, but
 that didn't work as it seemed to actually serialize singletons and pass it
 back and forth in a weird manner.


 Does it need to be persistent across operations, or just persist for the
 lifetime of processing of one partition in one mapPartition? The latter is
 quite easy and might give most of the speedup.

 Maybe that's 'enough', even if it means you re-cache values several
 times in a repeated iterative computation. It would certainly avoid
 managing a lot of complexity in trying to keep that state alive remotely
 across operations. I'd also be interested if there is any reliable way to
 do that, though it seems hard since it means you embed assumptions about
 where particular data is going to be processed.






Re: is it okay to reuse objects across RDD's?

2014-04-28 Thread Tom Vacek
I'm not sure what I said came through.  RDD zip is not hacky at all, as it
only depends on a user not changing the partitioning.  Basically, you would
keep your losses as an RDD[Double] and zip those with the RDD of examples,
and update the losses.  You're doing a copy (and GC) on the RDD of losses
each time, but this is negligible.
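
A compact sketch of that pattern (sc is a SparkContext; the loss update itself
is just a stand-in):

  import org.apache.spark.rdd.RDD

  // Large base data, cached once and never changed.
  val examples: RDD[Array[Double]] =
    sc.parallelize(Seq(Array(1.0, 2.0), Array(3.0, 4.0))).cache()

  // Per-example state lives in a separate RDD; deriving it from `examples`
  // gives it the same partitioning, so zip lines the two up cheaply.
  var losses: RDD[Double] = examples.map(_ => 0.0)

  for (iter <- 1 to 10) {
    losses = examples.zip(losses).map { case (x, loss) =>
      loss + 0.01 * x.sum   // stand-in for the real loss update
    }
  }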


On Mon, Apr 28, 2014 at 11:33 AM, Sung Hwan Chung
coded...@cs.stanford.eduwrote:

 Yes, this is what we've done as of now (if you read earlier threads). And
 we were saying that we'd prefer if Spark supported persistent worker memory
 management in a little bit less hacky way ;)


 On Mon, Apr 28, 2014 at 8:44 AM, Ian O'Connell i...@ianoconnell.comwrote:

 A mutable map in an object should do what you're looking for then, I believe.
 You just reference the object as an object in your closure, so it won't be
 swept up when your closure is serialized, and you can then reference variables
 of the object on the remote host. e.g.:

 object MyObject {
   val mmap = scala.collection.mutable.Map[Long, Long]()
 }

 rdd.map { ele =>
   MyObject.mmap.getOrElseUpdate(ele, 1L)
   ...
 }.map { ele =>
   require(MyObject.mmap(ele) == 1L)
 }.count

 Along with the data loss just be careful with thread safety and multiple
 threads/partitions on one host so the map should be viewed as shared
 amongst a larger space.



 Also with your exact description it sounds like your data should be
 encoded into the RDD if its per-record/per-row:  RDD[(MyBaseData,
 LastIterationSideValues)]



 On Mon, Apr 28, 2014 at 1:51 AM, Sung Hwan Chung 
 coded...@cs.stanford.edu wrote:

 In our case, we'd like to keep memory content from one iteration to the
 next, and not just during a single mapPartition call because then we can do
 more efficient computations using the values from the previous iteration.

 So essentially, we need to declare objects outside the scope of the
 map/reduce calls (but residing in individual workers), then those can be
 accessed from the map/reduce calls.

 We'd be making some assumptions as you said, such as - RDD partition is
 statically located and can't move from worker to another worker unless the
 worker crashes.



 On Mon, Apr 28, 2014 at 1:35 AM, Sean Owen so...@cloudera.com wrote:

 On Mon, Apr 28, 2014 at 9:30 AM, Sung Hwan Chung 
 coded...@cs.stanford.edu wrote:

 Actually, I do not know how to do something like this or whether this
 is possible - thus my suggestive statement.

 Can you already declare persistent memory objects per worker? I tried
 something like constructing a singleton object within map functions, but
 that didn't work as it seemed to actually serialize singletons and pass it
 back and forth in a weird manner.


 Does it need to be persistent across operations, or just persist for
 the lifetime of processing of one partition in one mapPartition? The latter
 is quite easy and might give most of the speedup.

 Maybe that's 'enough', even if it means you re-cache values several
 times in a repeated iterative computation. It would certainly avoid
 managing a lot of complexity in trying to keep that state alive remotely
 across operations. I'd also be interested if there is any reliable way to
 do that, though it seems hard since it means you embed assumptions about
 where particular data is going to be processed.







Re: is it okay to reuse objects across RDD's?

2014-04-28 Thread Tom Vacek
Right---They are zipped at each iteration.


On Mon, Apr 28, 2014 at 11:56 AM, Chester Chen chesterxgc...@yahoo.comwrote:

 Tom,
 Are you suggesting two RDDs, one with the loss and another with the rest of the
 info, using zip to tie them together, but doing the update on the loss RDD (a copy)?

 Chester

 Sent from my iPhone

 On Apr 28, 2014, at 9:45 AM, Tom Vacek minnesota...@gmail.com wrote:

 I'm not sure what I said came through.  RDD zip is not hacky at all, as it
 only depends on a user not changing the partitioning.  Basically, you would
 keep your losses as an RDD[Double] and zip those with the RDD of examples,
 and update the losses.  You're doing a copy (and GC) on the RDD of losses
 each time, but this is negligible.


 On Mon, Apr 28, 2014 at 11:33 AM, Sung Hwan Chung 
 coded...@cs.stanford.edu wrote:

 Yes, this is what we've done as of now (if you read earlier threads). And
 we were saying that we'd prefer if Spark supported persistent worker memory
 management in a little bit less hacky way ;)


 On Mon, Apr 28, 2014 at 8:44 AM, Ian O'Connell i...@ianoconnell.comwrote:

 A mutable map in an object should do what you're looking for then, I believe.
 You just reference the object as an object in your closure, so it won't be
 swept up when your closure is serialized, and you can then reference variables
 of the object on the remote host. e.g.:

 object MyObject {
   val mmap = scala.collection.mutable.Map[Long, Long]()
 }

 rdd.map { ele =>
   MyObject.mmap.getOrElseUpdate(ele, 1L)
   ...
 }.map { ele =>
   require(MyObject.mmap(ele) == 1L)
 }.count

 Along with the data loss just be careful with thread safety and multiple
 threads/partitions on one host so the map should be viewed as shared
 amongst a larger space.



 Also with your exact description it sounds like your data should be
 encoded into the RDD if its per-record/per-row:  RDD[(MyBaseData,
 LastIterationSideValues)]



 On Mon, Apr 28, 2014 at 1:51 AM, Sung Hwan Chung 
 coded...@cs.stanford.edu wrote:

 In our case, we'd like to keep memory content from one iteration to the
 next, and not just during a single mapPartition call because then we can do
 more efficient computations using the values from the previous iteration.

 So essentially, we need to declare objects outside the scope of the
 map/reduce calls (but residing in individual workers), then those can be
 accessed from the map/reduce calls.

 We'd be making some assumptions as you said, such as - RDD partition is
 statically located and can't move from worker to another worker unless the
 worker crashes.



 On Mon, Apr 28, 2014 at 1:35 AM, Sean Owen so...@cloudera.com wrote:

 On Mon, Apr 28, 2014 at 9:30 AM, Sung Hwan Chung 
 coded...@cs.stanford.edu wrote:

 Actually, I do not know how to do something like this or whether this
 is possible - thus my suggestive statement.

 Can you already declare persistent memory objects per worker? I tried
 something like constructing a singleton object within map functions, but
 that didn't work as it seemed to actually serialize singletons and pass 
 it
 back and forth in a weird manner.


 Does it need to be persistent across operations, or just persist for
 the lifetime of processing of one partition in one mapPartition? The 
 latter
 is quite easy and might give most of the speedup.

 Maybe that's 'enough', even if it means you re-cache values several
 times in a repeated iterative computation. It would certainly avoid
 managing a lot of complexity in trying to keep that state alive remotely
 across operations. I'd also be interested if there is any reliable way to
 do that, though it seems hard since it means you embed assumptions about
 where particular data is going to be processed.








Re: is it okay to reuse objects across RDD's?

2014-04-28 Thread Tom Vacek
Ian, I tried playing with your suggestion, but I get a task not
serializable error (and some obvious things didn't fix it).  Can you get
that working?


On Mon, Apr 28, 2014 at 10:58 AM, Tom Vacek minnesota...@gmail.com wrote:

 As to your last line: I've used RDD zipping to avoid GC since MyBaseData
 is large and doesn't change.  I think this is a very good solution to what
 is being asked for.


 On Mon, Apr 28, 2014 at 10:44 AM, Ian O'Connell i...@ianoconnell.comwrote:

 A mutable map in an object should do what you're looking for then, I believe.
 You just reference the object as an object in your closure, so it won't be
 swept up when your closure is serialized, and you can then reference variables
 of the object on the remote host. e.g.:

 object MyObject {
   val mmap = scala.collection.mutable.Map[Long, Long]()
 }

 rdd.map { ele =>
   MyObject.mmap.getOrElseUpdate(ele, 1L)
   ...
 }.map { ele =>
   require(MyObject.mmap(ele) == 1L)
 }.count

 Along with the data loss just be careful with thread safety and multiple
 threads/partitions on one host so the map should be viewed as shared
 amongst a larger space.



 Also with your exact description it sounds like your data should be
 encoded into the RDD if its per-record/per-row:  RDD[(MyBaseData,
 LastIterationSideValues)]



 On Mon, Apr 28, 2014 at 1:51 AM, Sung Hwan Chung 
 coded...@cs.stanford.edu wrote:

 In our case, we'd like to keep memory content from one iteration to the
 next, and not just during a single mapPartition call because then we can do
 more efficient computations using the values from the previous iteration.

 So essentially, we need to declare objects outside the scope of the
 map/reduce calls (but residing in individual workers), then those can be
 accessed from the map/reduce calls.

 We'd be making some assumptions as you said, such as - RDD partition is
 statically located and can't move from worker to another worker unless the
 worker crashes.



 On Mon, Apr 28, 2014 at 1:35 AM, Sean Owen so...@cloudera.com wrote:

 On Mon, Apr 28, 2014 at 9:30 AM, Sung Hwan Chung 
 coded...@cs.stanford.edu wrote:

 Actually, I do not know how to do something like this or whether this
 is possible - thus my suggestive statement.

 Can you already declare persistent memory objects per worker? I tried
 something like constructing a singleton object within map functions, but
 that didn't work as it seemed to actually serialize singletons and pass it
 back and forth in a weird manner.


 Does it need to be persistent across operations, or just persist for
 the lifetime of processing of one partition in one mapPartition? The latter
 is quite easy and might give most of the speedup.

 Maybe that's 'enough', even if it means you re-cache values several
 times in a repeated iterative computation. It would certainly avoid
 managing a lot of complexity in trying to keep that state alive remotely
 across operations. I'd also be interested if there is any reliable way to
 do that, though it seems hard since it means you embed assumptions about
 where particular data is going to be processed.







Re: is it okay to reuse objects across RDD's?

2014-04-28 Thread Tom Vacek
If you create your auxiliary RDD as a map from the examples, the
partitioning will be inherited.


On Mon, Apr 28, 2014 at 12:38 PM, Sung Hwan Chung
coded...@cs.stanford.eduwrote:

 That might be a good alternative to what we are looking for. But I wonder
 if this would be as efficient as we want it to be. For instance, will RDDs of
 the same size usually get partitioned to the same machines, thus not
 triggering any cross-machine alignment, etc.? We'll explore it, but I would
 still very much like to see more direct worker memory management besides
 RDDs.


 On Mon, Apr 28, 2014 at 10:26 AM, Tom Vacek minnesota...@gmail.comwrote:

 Right---They are zipped at each iteration.


 On Mon, Apr 28, 2014 at 11:56 AM, Chester Chen 
 chesterxgc...@yahoo.comwrote:

 Tom,
 Are you suggesting two RDDs, one with the loss and another with the rest of the
 info, using zip to tie them together, but doing the update on the loss RDD (a copy)?

 Chester

 Sent from my iPhone

 On Apr 28, 2014, at 9:45 AM, Tom Vacek minnesota...@gmail.com wrote:

 I'm not sure what I said came through.  RDD zip is not hacky at all, as
 it only depends on a user not changing the partitioning.  Basically, you
 would keep your losses as an RDD[Double] and zip those with the RDD of
 examples, and update the losses.  You're doing a copy (and GC) on the RDD
 of losses each time, but this is negligible.


 On Mon, Apr 28, 2014 at 11:33 AM, Sung Hwan Chung 
 coded...@cs.stanford.edu wrote:

 Yes, this is what we've done as of now (if you read earlier threads).
 And we were saying that we'd prefer if Spark supported persistent worker
 memory management in a little bit less hacky way ;)


 On Mon, Apr 28, 2014 at 8:44 AM, Ian O'Connell i...@ianoconnell.comwrote:

 A mutable map in an object should do what you're looking for then, I believe.
 You just reference the object as an object in your closure, so it won't be
 swept up when your closure is serialized, and you can then reference variables
 of the object on the remote host. e.g.:

 object MyObject {
   val mmap = scala.collection.mutable.Map[Long, Long]()
 }

 rdd.map { ele =>
   MyObject.mmap.getOrElseUpdate(ele, 1L)
   ...
 }.map { ele =>
   require(MyObject.mmap(ele) == 1L)
 }.count

 Along with the data loss just be careful with thread safety and
 multiple threads/partitions on one host so the map should be viewed as
 shared amongst a larger space.



 Also with your exact description it sounds like your data should be
 encoded into the RDD if its per-record/per-row:  RDD[(MyBaseData,
 LastIterationSideValues)]



 On Mon, Apr 28, 2014 at 1:51 AM, Sung Hwan Chung 
 coded...@cs.stanford.edu wrote:

 In our case, we'd like to keep memory content from one iteration to
 the next, and not just during a single mapPartition call because then we
 can do more efficient computations using the values from the previous
 iteration.

 So essentially, we need to declare objects outside the scope of the
 map/reduce calls (but residing in individual workers), then those can be
 accessed from the map/reduce calls.

 We'd be making some assumptions as you said, such as - RDD partition
 is statically located and can't move from worker to another worker unless
 the worker crashes.



 On Mon, Apr 28, 2014 at 1:35 AM, Sean Owen so...@cloudera.comwrote:

 On Mon, Apr 28, 2014 at 9:30 AM, Sung Hwan Chung 
 coded...@cs.stanford.edu wrote:

 Actually, I do not know how to do something like this or whether
 this is possible - thus my suggestive statement.

 Can you already declare persistent memory objects per worker? I
 tried something like constructing a singleton object within map 
 functions,
 but that didn't work as it seemed to actually serialize singletons and 
 pass
 it back and forth in a weird manner.


 Does it need to be persistent across operations, or just persist for
 the lifetime of processing of one partition in one mapPartition? The 
 latter
 is quite easy and might give most of the speedup.

 Maybe that's 'enough', even if it means you re-cache values several
 times in a repeated iterative computation. It would certainly avoid
 managing a lot of complexity in trying to keep that state alive remotely
 across operations. I'd also be interested if there is any reliable way 
 to
 do that, though it seems hard since it means you embed assumptions about
 where particular data is going to be processed.










Re: GraphX: Help understanding the limitations of Pregel

2014-04-23 Thread Tom Vacek
Here are some out-of-the-box ideas:  If the elements lie in a fairly small
range and/or you're willing to work with limited precision, you could use
counting sort.  Moreover, you could iteratively find the median using
bisection, where each step only needs an associative and commutative count.  It's easy to think
of improvements that would make this approach give a reasonable answer in a
few iterations.  I have no idea about mixing algorithmic iterations with
median-finding iterations.
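
For example, a sketch of the bisection idea on a plain RDD (each pass only needs
a count, which is the associative and commutative piece; precision is controlled
by tol):

  import org.apache.spark.rdd.RDD

  def medianByBisection(values: RDD[Double], tol: Double = 1e-6): Double = {
    val n  = values.count()
    var lo = values.reduce(math.min)
    var hi = values.reduce(math.max)
    while (hi - lo > tol) {
      val mid = (lo + hi) / 2.0
      // Counting how many values fall at or below mid is a sum of 0/1s,
      // hence associative and commutative.
      val atOrBelow = values.filter(_ <= mid).count()
      if (2 * atOrBelow >= n) hi = mid else lo = mid
    }
    (lo + hi) / 2.0
  }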


On Wed, Apr 23, 2014 at 8:20 PM, Ryan Compton compton.r...@gmail.comwrote:

 I'm trying to shoehorn a label propagation-ish algorithm into GraphX. I
 need to update each vertex with the median value of their neighbors.
 Unlike PageRank, which updates each vertex with the mean of their
 neighbors, I don't have a simple commutative and associative function
 to use for mergeMsg.

 What are my options? It looks like I can choose between:

 1. a hacky mergeMsg (i.e. combine a, b -> Array(a, b) and then take the
 median in vprog)
 2. collectNeighbors and then median
 3. ignore GraphX and just do the whole thing with joins (which I
 actually got working, but its slow)

 Is there another possibility that I'm missing?



internship opportunity

2014-04-22 Thread Tom Vacek
Thomson Reuters is looking for a graduate (or possibly advanced
undergraduate) summer intern in Eagan, MN. This is a chance to work on an
innovative project exploring how big data sets can be used by professionals
such as lawyers, scientists and journalists.  If you're subscribed to this
mailing list, you're probably a good fit.  http://goo.gl/ti2a3G


Re: Huge matrix

2014-04-12 Thread Tom V
The last writer is suggesting using the triangle inequality to cut down the
search space.  If c is the centroid of cluster C, then the closest any
point in C is to x is ||x-c|| - r(C), where r(C) is the (precomputed)
radius of the cluster---the distance of the farthest point in C to c.
 Whether you want to use the bound for an exact algorithm or a search
heuristic is up to you.
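
In code form, the bound gives a simple cluster-pruning filter (a sketch with
plain arrays and Euclidean distance):

  def dist(a: Array[Double], b: Array[Double]): Double =
    math.sqrt(a.zip(b).map { case (u, v) => (u - v) * (u - v) }.sum)

  // ||x - c|| - r(C) lower-bounds the distance from x to any point of cluster C,
  // so clusters whose bound exceeds the best distance found so far can be skipped.
  def candidateClusters(x: Array[Double],
                        centroids: Array[Array[Double]],
                        radii: Array[Double],
                        bestSoFar: Double): Seq[Int] =
    centroids.indices.filter(i => dist(x, centroids(i)) - radii(i) <= bestSoFar)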

Another approach can be very successful if the user-attributes matrix is
very sparse:  The idea is to exploit the additive nature of common
similarity measures.  I have used this for cosine and Bayesian posterior.
 Here is the basic idea: A User index contains all the attributes for a
certain user (sorted by strength), and an Attribute index contains all
the users with a certain attribute.  (Again, sorted by strength).  To find
the best k matches for user i, start with user i's strongest attribute and
search the attribute index to find some users with high scores for that
attribute.  Move to user i's next best attribute.  If you proceed with a
diagonal frontier (as you fetch candidates based on i's weaker
attributes, also go back and fetch more weak candidates for i's strong
attributes), you can get a candidate pool and a bound on the highest score
of any user not in the candidate pool.  You just expand the candidate pool
until the bound drops below the scores of the k best candidates.
 Practically, you'd want to limit the candidate pool size, but it's almost
always exact.

For a million users, you should be able to distribute the things needed to
make a recommendation (either the centroids or the attributes matrix), and
just break up the work based on the users you want to generate
recommendations for.  I hope this helps.

Tom


On Sat, Apr 12, 2014 at 11:35 AM, Xiaoli Li lixiaolima...@gmail.com wrote:

 Hi Guillaume,

 This sounds like a good idea to me. I am a newbie here. Could you further
 explain how you will determine which clusters to keep? According to the
 distance between each element and each cluster center?
 Will you keep several clusters for each element when searching for nearest
 neighbours? Thanks.

   Xiaoli




 On Sat, Apr 12, 2014 at 9:46 AM, Guillaume Pitel 
 guillaume.pi...@exensa.com wrote:

  Hi,

 I'm doing this here for multiple tens of millions of elements (and the
 goal is to reach multiple billions), on a relatively small cluster (7 nodes
 4 cores 32GB RAM). We use multiprobe KLSH. All you have to do is run a
 Kmeans on your data, then compute the distance between each element with
 each cluster center, keep a few clusters and only look into these clusters
 for nearest neighbours.

 This method is known to perform very well and vastly speedup your
 computation

 The hardest part is to decide how many clusters to compute, and how many
 to keep. As a rule of thumb, I generally want 300-1 elements per
 cluster, and use 5-20 clusters.
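
 A rough sketch of that recipe with MLlib's KMeans (k, the number of clusters
 kept per point, and the iteration count are all illustrative):

   import org.apache.spark.mllib.clustering.KMeans
   import org.apache.spark.mllib.linalg.Vector
   import org.apache.spark.rdd.RDD

   def sqDist(a: Vector, b: Vector): Double =
     a.toArray.zip(b.toArray).map { case (x, y) => (x - y) * (x - y) }.sum

   // Assign each point to its `keep` closest clusters; the nearest-neighbour
   // search is then done only among points sharing a cluster id (e.g. via
   // groupByKey or a self-join on the id).
   def clusterCandidates(points: RDD[Vector], k: Int, keep: Int): RDD[(Int, Vector)] = {
     val model   = KMeans.train(points, k, 20)
     val centers = model.clusterCenters
     points.flatMap { p =>
       centers.indices.sortBy(i => sqDist(p, centers(i))).take(keep).map(i => (i, p))
     }
   }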

 Guillaume


 I am implementing an algorithm using Spark. I have one million users. I
 need to compute the similarity between each pair of users using some user
 attributes.  For each user, I need to get the top k most similar users. What is
 the best way to implement this?


  Thanks.



 --
  *Guillaume PITEL, Président*
 +33(0)6 25 48 86 80 / +33(0)9 70 44 67 53

  eXenSa S.A.S. http://www.exensa.com/
  41, rue Périer - 92120 Montrouge - FRANCE
 Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05




Re: Spark 1.0.0 release plan

2014-04-04 Thread Tom Graves
Do we have a list of things we really want to get in for 1.X? Perhaps move
any JIRAs out to a 1.1 release if we aren't targeting them for 1.0.

 It might be nice to send out reminders when these dates are approaching. 

Tom
On Thursday, April 3, 2014 11:19 PM, Bhaskar Dutta bhas...@gmail.com wrote:
 
Thanks a lot guys!





On Fri, Apr 4, 2014 at 5:34 AM, Patrick Wendell pwend...@gmail.com wrote:

Btw - after that initial thread I proposed a slightly more detailed set of 
dates:

https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage 


- Patrick



On Thu, Apr 3, 2014 at 11:28 AM, Matei Zaharia matei.zaha...@gmail.com wrote:

Hey Bhaskar, this is still the plan, though QAing might take longer than 15 
days. Right now since we’ve passed April 1st, the only features considered for 
a merge are those that had pull requests in review before. (Some big ones are 
things like annotating the public APIs and simplifying configuration). Bug 
fixes and things like adding Python / Java APIs for new components will also 
still be considered.


Matei


On Apr 3, 2014, at 10:30 AM, Bhaskar Dutta bhas...@gmail.com wrote:

Hi,


Is there any change in the planned Spark 1.0.0-rc1 release date
from what is listed in the "Proposal for Spark Release Strategy" thread?
== Tentative Release Window for 1.0.0 ==
Feb 1st - April 1st: General development
April 1st: Code freeze for new features
April 15th: RC1
Thanks,
Bhaskar



Re: Pig on Spark

2014-03-06 Thread Tom Graves
I had asked a similar question on the dev mailing list a while back (Jan 22nd). 

See the archives: 
http://mail-archives.apache.org/mod_mbox/spark-dev/201401.mbox/browser - look 
for spork.

Basically Matei said:

Yup, that was it, though I believe people at Twitter picked it up again 
recently. I’d suggest
asking Dmitriy if you know him. I’ve seen interest in this from several other 
groups, and
if there’s enough of it, maybe we can start another open source repo to track 
it. The work
in that repo you pointed to was done over one week, and already had most of 
Pig’s operators
working. (I helped out with this prototype over Twitter’s hack week.) That work 
also calls
the Scala API directly, because it was done before we had a Java API; it should 
be easier
with the Java one.

Tom



On Thursday, March 6, 2014 3:11 PM, Sameer Tilak ssti...@live.com wrote:
 
 
Hi everyone,
We are using Pig to build our data pipeline. I came across Spork -- Pig on
Spark -- at https://github.com/dvryaboy/pig and am not sure if it is still active.

Can someone please let me know the status of Spork or of any other effort that
would let us run Pig on Spark? We can benefit significantly from using Spark, but
we would like to keep using our existing Pig scripts.
