Re: PathFilter for newAPIHadoopFile?

2014-09-15 Thread Nat Padmanabhan
Hi Eric,

Something along the lines of the following should work

val fs = getFileSystem(...) // standard Hadoop API call
val filteredConcatenatedPaths = fs.listStatus(topLevelDirPath,
  pathFilter).map(_.getPath.toString).mkString(",")  // pathFilter is an
  // instance of org.apache.hadoop.fs.PathFilter
val parquetRdd = sc.hadoopFile(filteredConcatenatedPaths,
  classOf[ParquetInputFormat[Something]], classOf[Void],
  classOf[SomeAvroType], getConfiguration(...))

You have to do some initialization on ParquetInputFormat, such as setting
AvroReadSupport/AvroWriteSupport etc., but I am guessing you are doing that
already.
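
A minimal sketch of such a PathFilter, skipping the administrative ".foo" /
"_foo" files (the class name is just an example):

import org.apache.hadoop.fs.{Path, PathFilter}

class DataFileFilter extends PathFilter {
  // accept everything except hidden/administrative files
  override def accept(path: Path): Boolean = {
    val name = path.getName
    !name.startsWith(".") && !name.startsWith("_")
  }
}

val pathFilter = new DataFileFilter()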

Cheers,
Nat


On Sun, Sep 14, 2014 at 7:37 PM, Eric Friedman
eric.d.fried...@gmail.com wrote:
 Hi,

 I have a directory structure with parquet+avro data in it. There are a
 couple of administrative files (.foo and/or _foo) that I need to ignore when
 processing this data or Spark tries to read them as containing parquet
 content, which they do not.

 How can I set a PathFilter on the FileInputFormat used to construct an RDD?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Accuracy hit in classification with Spark

2014-09-15 Thread jatinpreet
Hi,

I have been able to get the same accuracy with MLlib as with Mahout. The
pre-processing phase of Mahout was the reason behind the accuracy mismatch.
After studying and applying the same logic in my code, it worked like a
charm.

Thanks,
Jatin



-
Novice Big Data Programmer
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Accuracy-hit-in-classification-with-Spark-tp13773p14221.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



SparkSQL 1.1 hang when DROP or LOAD

2014-09-15 Thread linkpatrickliu
I started sparkSQL thrift server:
sbin/start-thriftserver.sh

Then I use beeline to connect to it:
bin/beeline
!connect jdbc:hive2://localhost:1 op1 op1

I have created a database for user op1.
create database dw_op1;

And grant all privileges to user op1;
grant all on database dw_op1 to user op1;

Then I create a table:
create table src(key int, value string)

Now, I want to load data into this table:
load data inpath 'kv1.txt' into table src; (kv1.txt is located in the
/user/op1 directory in HDFS)

However, the client will hang...

The log in the thrift server:
14/09/15 14:21:25 INFO Driver: PERFLOG method=acquireReadWriteLocks


Then I ctrl-C to stop the beeline client, and restart the beeline client.
Now I want to drop the table src in dw_op1;
use dw_op1
drop table src

Then, the beeline client is hanging again..
The log in the thrift server:
14/09/15 14:23:27 INFO Driver: PERFLOG method=acquireReadWriteLocks


Anyone can help on this? Many thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-1-1-hang-when-DROP-or-LOAD-tp14222.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: combineByKey throws ClassCastException

2014-09-15 Thread x
How about this.

scala> val rdd2 = rdd.combineByKey(
     |   (v: Int) => v.toLong,
     |   (c: Long, v: Int) => c + v,
     |   (c1: Long, c2: Long) => c1 + c2)
rdd2: org.apache.spark.rdd.RDD[(String, Long)] = MapPartitionsRDD[9] at
combineByKey at <console>:14

xj @ Tokyo
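
For reference, a minimal sketch of the per-key average along the same lines,
with explicit types on every parameter passed to combineByKey (same sample
data as in the quoted question below):

val rdd = sc.parallelize(List(
  ("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5),
  ("B", 4), ("B", 10), ("B", 11), ("B", 20), ("B", 25),
  ("C", 32), ("C", 91), ("C", 122), ("C", 3), ("C", 55)), 2)

val avg = rdd.combineByKey(
    (x: Int) => (x, 1),
    (acc: (Int, Int), x: Int) => (acc._1 + x, acc._2 + 1),
    (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
  .map { case (k, (sum, count)) => (k, sum / count.toFloat) }  // per-key average

avg.collect.foreach { case (k, v) => println(k + " - " + v) }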

On Mon, Sep 15, 2014 at 3:06 PM, Tao Xiao xiaotao.cs@gmail.com wrote:

 I followed an example presented in the tutorial Learning Spark
 http://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html
 to compute the per-key average as follows:


 val Array(appName) = args
 val sparkConf = new SparkConf()
   .setAppName(appName)
 val sc = new SparkContext(sparkConf)
 /*
  * compute the per-key average of values
  * results should be:
  *   A : 5.8
  *   B : 14
  *   C : 60.6
  */
 val rdd = sc.parallelize(List(
   ("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5),
   ("B", 4), ("B", 10), ("B", 11), ("B", 20), ("B", 25),
   ("C", 32), ("C", 91), ("C", 122), ("C", 3), ("C", 55)), 2)
 val avg = rdd.combineByKey(
   (x: Int) => (x, 1),  // java.lang.ClassCastException: scala.Tuple2$mcII$sp
                        // cannot be cast to java.lang.Integer
   (acc: (Int, Int), x) => (acc._1 + x, acc._2 + 1),
   (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
   .map { case (s, t) => (s, t._1 / t._2.toFloat) }
 avg.collect.foreach(t => println(t._1 + " - " + t._2))



 When I submitted the application, an exception of
 *java.lang.ClassCastException:
 scala.Tuple2$mcII$sp cannot be cast to java.lang.Integer* was thrown.
 The tutorial says that the first function of *combineByKey*, *(x: Int)
 => (x, 1)*, should take a single element in the source RDD and return an
 element of the desired type in the resulting RDD. In my application, we
 take a single element of type *Int* from the source RDD and return a
 tuple of type (*Int*, *Int*), which meets the requirement quite well.
 But why would such an exception be thrown?

 I'm using CDH 5.0 and Spark 0.9

 Thanks.





Re: Developing a spark streaming application

2014-09-15 Thread Santiago Mola
Just for the record, this is being discussed at StackOverflow:

http://stackoverflow.com/questions/25663026/developing-a-spark-streaming-application/25766618

2014-08-27 10:28 GMT+02:00 Filip Andrei andreis.fi...@gmail.com:

 Hey guys, so the problem I'm trying to tackle is the following:

 - I need a data source that emits messages at a certain frequency
 - There are N neural nets that need to process each message individually
 - The outputs from all neural nets are aggregated and only when all N
 outputs for each message are collected, should a message be declared fully
 processed
 - At the end i should measure the time it took for a message to be fully
 processed (time between when it was emitted and when all N neural net
 outputs from that message have been collected)


 What I'm mostly interested in is whether I approached the problem correctly in
 the first place and, if so, some best-practice pointers on my approach.






 And my current implementation is the following:


 For a data source I created the class
 public class JavaRandomReceiver extends Receiver<Map<String, Object>>

 as I decided a key-value store would be best suited to holding the emitted
 data.


 The onStart() method initializes a custom random sequence generator and
 starts a thread that
 continuously generates new neural net inputs and stores them as follows:

 SensorData sdata = generator.createSensorData();

 Map<String, Object> result = new HashMap<String, Object>();

 result.put("msgNo", sdata.getMsgNo());
 result.put("sensorTime", sdata.getSampleTime());
 result.put("list", sdata.getPayload());
 result.put("timeOfProc", sdata.getCreationTime());

 store(result);

 // sleeps for a given amount of time set at generator creation
 generator.waitForNextTuple();

 The msgNo here is incremented for each newly created message and is used to
 keep


 The neural net functionality is added by creating a custom mapper
 public class NeuralNetMapper implements Function<Map<String, Object>,
 Map<String, Object>>

 whose call function basically just takes the input map, plugs its list
 object as the input to the neural net object, replaces the map's initial
 list with the neural net output and returns the modified map.




 The aggregator is implemented as a single class that has the following form

 public class JavaSyncBarrier implements
 Function<JavaRDD<Map<String, Object>>, Void>



 This class maintains a Google Guava cache of neural net outputs that it has
 received in the form of
 <Long, List<Map<String, Object>>>, where the Long value is the msgNo
 and the list contains all maps containing said message number.

 When a new map is received, it is added to the cache, its list's length is
 compared to the total number of neural nets and, if these numbers match,
 that message number is said to be fully processed and the difference between
 timeOfProc (all maps with the same msgNo have the same timeOfProc) and the
 current system time is displayed as the total time necessary for
 processing.





 Now the way all these components are linked together is the following:

 public static void main(String[] args) {


 SparkConf conf = new SparkConf();
 conf.setAppName("SimpleSparkStreamingTest");


 JavaStreamingContext jssc = new JavaStreamingContext(conf, new
 Duration(1000));

 jssc.checkpoint("/tmp/spark-tempdir");

 // Generator config goes here
 // Set to emit new message every 1 second
 // ---

 // Neural net config goes here
 // ---

 JavaReceiverInputDStream<Map<String, Object>> rndLists = jssc
 .receiverStream(new JavaRandomReceiver(generatorConfig));

 List<JavaDStream<Map<String, Object>>>
 neuralNetOutputStreams = new
 ArrayList<JavaDStream<Map<String, Object>>>();

 for (int i = 0; i < numberOfNets; i++) {

 neuralNetOutputStreams .add(
 rndLists.map(new NeuralNetMapper(neuralNetConfig))
 );
 }

 JavaDStream<Map<String, Object>> joined =
 joinStreams(neuralNetOutputStreams);

 joined.foreach(new JavaSyncBarrier(numberOfNets));

 jssc.start();
 jssc.awaitTermination();
 }

 where joinStreams unifies a list of streams:
 public static <T> JavaDStream<T>
 joinStreams(List<JavaDStream<T>>
 streams) {

 JavaDStream<T> result = streams.get(0);
 for (int i = 1; i < streams.size(); i++) {
 result = result.union(streams.get(i));
 }

 return result;
 }



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Developing-a-spark-streaming-application-tp12893.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: 

Re: When does Spark switch from PROCESS_LOCAL to NODE_LOCAL or RACK_LOCAL?

2014-09-15 Thread Andrew Ash
Hi Brad and Nick,

Thanks for the comments!  I opened a ticket to get a more thorough
explanation of data locality into the docs here:
https://issues.apache.org/jira/browse/SPARK-3526

If you could put any other unanswered questions you have about data
locality on that ticket I'll try to incorporate answers to them in the
final addition I send in.

Andrew
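
For anyone tuning this, a minimal sketch of setting the spark.locality.* waits
discussed below (values are in milliseconds and are only examples, not
recommendations):

val conf = new SparkConf()
  .setAppName("locality-wait-example")
  .set("spark.locality.wait", "10000")          // base wait before dropping a locality level
  .set("spark.locality.wait.process", "10000")  // PROCESS_LOCAL -> NODE_LOCAL
  .set("spark.locality.wait.node", "10000")     // NODE_LOCAL -> RACK_LOCAL
  .set("spark.locality.wait.rack", "10000")     // RACK_LOCAL -> ANY
val sc = new SparkContext(conf)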

On Sun, Sep 14, 2014 at 6:47 PM, Brad Miller bmill...@eecs.berkeley.edu
wrote:

 Hi Andrew,

 I agree with Nicholas.  That was a nice, concise summary of the
 meaning of the locality customization options, indicators and default
 Spark behaviors.  I haven't combed through the documentation
 end-to-end in a while, but I'm also not sure that information is
 presently represented somewhere and it would be great to persist it
 somewhere besides the mailing list.

 best,
 -Brad

 On Fri, Sep 12, 2014 at 12:12 PM, Nicholas Chammas
 nicholas.cham...@gmail.com wrote:
  Andrew,
 
  This email was pretty helpful. I feel like this stuff should be
 summarized
  in the docs somewhere, or perhaps in a blog post.
 
  Do you know if it is?
 
  Nick
 
 
  On Thu, Jun 5, 2014 at 6:36 PM, Andrew Ash and...@andrewash.com wrote:
 
  The locality is how close the data is to the code that's processing it.
  PROCESS_LOCAL means data is in the same JVM as the code that's running,
 so
  it's really fast.  NODE_LOCAL might mean that the data is in HDFS on the
  same node, or in another executor on the same node, so is a little
 slower
  because the data has to travel across an IPC connection.  RACK_LOCAL is
 even
  slower -- data is on a different server so needs to be sent over the
  network.
 
  Spark switches to lower locality levels when there's no unprocessed data
  on a node that has idle CPUs.  In that situation you have two options:
 wait
  until the busy CPUs free up so you can start another task that uses
 data on
  that server, or start a new task on a farther away server that needs to
  bring data from that remote place.  What Spark typically does is wait a
 bit
  in the hopes that a busy CPU frees up.  Once that timeout expires, it
 starts
  moving the data from far away to the free CPU.
 
  The main tunable option is how long the scheduler waits before
  starting to move data rather than code.  Those are the spark.locality.*
  settings here: http://spark.apache.org/docs/latest/configuration.html
 
  If you want to prevent this from happening entirely, you can set the
  values to ridiculously high numbers.  The documentation also mentions
 that
  0 has special meaning, so you can try that as well.
 
  Good luck!
  Andrew
 
 
  On Thu, Jun 5, 2014 at 3:13 PM, Sung Hwan Chung 
 coded...@cs.stanford.edu
  wrote:
 
  I noticed that sometimes tasks would switch from PROCESS_LOCAL (I'd
  assume that this means fully cached) to NODE_LOCAL or even RACK_LOCAL.
 
  When these happen things get extremely slow.
 
  Does this mean that the executor got terminated and restarted?
 
  Is there a way to prevent this from happening (barring the machine
  actually going down, I'd rather stick with the same process)?
 
 
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Viewing web UI after fact

2014-09-15 Thread Grzegorz Białek
Hi Andrew,

sorry for the late response. Thank you very much for solving my problem. There
was no APPLICATION_COMPLETE file in the log directory because sc.stop() was not
called at the end of the program. With the Spark context stopped properly,
everything works correctly, so thank you again.

Best regards,
Grzegorz
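
For anyone hitting the same issue, a minimal sketch of an app that enables
event logging and stops the context cleanly so the APPLICATION_COMPLETE marker
gets written (the log directory is just an example):

val conf = new SparkConf()
  .setAppName("history-server-example")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "file:/tmp/spark-events")
val sc = new SparkContext(conf)
try {
  sc.parallelize(1 to 100).count()  // the actual job
} finally {
  sc.stop()  // logs ApplicationEnd and closes the event log
}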


On Fri, Sep 5, 2014 at 8:06 PM, Andrew Or and...@databricks.com wrote:

 Hi Grzegorz,

 Can you verify that there are APPLICATION_COMPLETE files in the event
 log directories? E.g. Does
 file:/tmp/spark-events/app-name-1234567890/APPLICATION_COMPLETE exist? If
 not, it could be that your application didn't call sc.stop(), so the
 ApplicationEnd event is not actually logged. The HistoryServer looks for
 this special file to identify applications to display. You could also try
 manually adding the APPLICATION_COMPLETE file to this directory; the
 HistoryServer should pick this up and display the application, though the
 information displayed will be incomplete because the log did not capture
 all the events (sc.stop() does a final close() on the file written).

 Andrew


 2014-09-05 1:50 GMT-07:00 Grzegorz Białek grzegorz.bia...@codilime.com:

 Hi Andrew,

 thank you very much for your answer. Unfortunately it still doesn't work.
 I'm using Spark 1.0.0, and I start the history server by running
 sbin/start-history-server.sh dir, although I also set
  SPARK_HISTORY_OPTS=-Dspark.history.fs.logDirectory in
 conf/spark-env.sh. I also tried other dirs than /tmp/spark-events, which
 have all possible permissions enabled. Also adding file: (and file://)
 didn't help - the history server still shows:
 History Server
 Event Log Location: file:/tmp/spark-events/
 No Completed Applications Found.

 Best regards,
 Grzegorz


 On Thu, Sep 4, 2014 at 8:20 PM, Andrew Or and...@databricks.com wrote:

 Hi Grzegorz,

 Sorry for the late response. Unfortunately, if the Master UI doesn't
 know about your applications (they are completed with respect to a
 different Master), then it can't regenerate the UIs even if the logs exist.
 You will have to use the history server for that.

 How did you start the history server? If you are using Spark >= 1.0, you
 can pass the directory as an argument to the sbin/start-history-server.sh
 script. Otherwise, you may need to set the following in your
 conf/spark-env.sh to specify the log directory:

 export
 SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=/tmp/spark-events"

 It could also be a permissions thing. Make sure your logs in
 /tmp/spark-events are accessible by the JVM that runs the history server.
 Also, there's a chance that /tmp/spark-events is interpreted as an HDFS
 path depending on which Spark version you're running. To resolve any
 ambiguity, you may set the log path to file:/tmp/spark-events instead.
 But first verify whether they actually exist.

 Let me know if you get it working,
 -Andrew



 2014-08-19 8:23 GMT-07:00 Grzegorz Białek grzegorz.bia...@codilime.com
 :

 Hi,
 Is there any way to view the history of application statistics in the master UI
 after restarting the master server? I have all logs in /tmp/spark-events/ but
 when I start the history server on this directory it says No Completed
 Applications Found. Maybe I could copy these logs to a dir used by the master
 server but I couldn't find any. Or maybe I'm doing something wrong when
 launching the history server.
 Do you have any idea how to solve it?

 Thanks,
 Grzegorz


 On Thu, Aug 14, 2014 at 10:53 AM, Grzegorz Białek 
 grzegorz.bia...@codilime.com wrote:

 Hi,

 Thank you both for your answers. Browsing using the Master UI works fine.
 Unfortunately the History Server shows No Completed Applications Found even
 if logs exist under the given directory, but using the Master UI is enough for 
 me.

 Best regards,
 Grzegorz



 On Wed, Aug 13, 2014 at 8:09 PM, Andrew Or and...@databricks.com
 wrote:

 The Spark UI isn't available through the same address; otherwise new
 applications won't be able to bind to it. Once the old application
 finishes, the standalone Master renders the after-the-fact application UI
 and exposes it under a different URL. To see this, go to the Master UI
 (master-url:8080) and click on your application in the Completed
 Applications table.


 2014-08-13 10:56 GMT-07:00 Matei Zaharia matei.zaha...@gmail.com:

 Take a look at http://spark.apache.org/docs/latest/monitoring.html
 -- you need to launch a history server to serve the logs.

 Matei

 On August 13, 2014 at 2:03:08 AM, grzegorz-bialek (
 grzegorz.bia...@codilime.com) wrote:

 Hi,
 I wanted to access the Spark web UI after the application stops. I set
 spark.eventLog.enabled to true and logs are available
 in JSON format in /tmp/spark-event but the web UI isn't available under the
 address
 http://driver-node:4040
 I'm running Spark in standalone mode.

 What should I do to access web UI after application ends?

 Thanks,
 Grzegorz



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Viewing-web-UI-after-fact-tp12023.html
 Sent from the Apache Spark User List mailing list 

Re: Dependency Problem with Spark / ScalaTest / SBT

2014-09-15 Thread Thorsten Bergler

Hello,

When I remove the line and try to execute sbt run, I end up with the 
following lines:



14/09/15 10:11:35 INFO ui.SparkUI: Stopped Spark web UI at http://base:4040
[...]
14/09/15 10:11:05 WARN scheduler.TaskSchedulerImpl: Initial job has not 
accepted any resources; check your cluster UI to ensure that workers are 
registered and have sufficient memory
14/09/15 10:11:15 INFO client.AppClient$ClientActor: Connecting to 
master spark://base:7077...


It seems that the configuration within sbt doesn't use my original 
Spark installation, because my original Spark web UI is running under 
http://base:8080. It seems like sbt is starting another Spark instance?


Best regards
Thorsten
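
In case it helps: one thing that has worked for similar
java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse failures
is putting a servlet API artifact back on the test classpath. A hypothetical
build.sbt fragment (versions are examples, not a confirmed fix for this exact
setup):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"        % "1.0.2" % "provided",
  "org.scalatest"    %% "scalatest"         % "2.2.1" % "test",
  // restores the javax.servlet classes removed by the jetty-orbit exclusion
  "javax.servlet"     % "javax.servlet-api" % "3.0.1" % "test"
)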


On 14.09.2014 at 18:56, Dean Wampler wrote:

Sorry, I meant any *other* SBT files.

However, what happens if you remove the line:

  exclude("org.eclipse.jetty.orbit", "javax.servlet")


dean


Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition 
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)

Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Sun, Sep 14, 2014 at 11:53 AM, Dean Wampler deanwamp...@gmail.com 
mailto:deanwamp...@gmail.com wrote:


Can you post your whole SBT build file(s)?

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Wed, Sep 10, 2014 at 6:48 AM, Thorsten Bergler
sp...@tbonline.de mailto:sp...@tbonline.de wrote:

Hi,

I just called:

 test

or

 run

Thorsten


On 10.09.2014 at 13:38, arthur.hk.c...@gmail.com
mailto:arthur.hk.c...@gmail.com wrote:

Hi,

What is your SBT command and the parameters?

Arthur


On 10 Sep, 2014, at 6:46 pm, Thorsten Bergler
sp...@tbonline.de mailto:sp...@tbonline.de wrote:

Hello,

I am writing a Spark App which is already working so far.
Now I have also started to build some unit tests, but I am
running into some dependency problems and I cannot find
a solution right now. Perhaps someone could help me.

I build my Spark Project with SBT and it seems to be
configured well, because compiling, assembling and
running the built jar with spark-submit are working well.

Now I started with the UnitTests, which I located
under /src/test/scala.

When I call test in sbt, I get the following:

14/09/10 12:22:06 INFO storage.BlockManagerMaster:
Registered BlockManager
14/09/10 12:22:06 INFO spark.HttpServer: Starting HTTP
Server
[trace] Stack trace suppressed: run last test:test for
the full output.
[error] Could not run test test.scala.SetSuite:
java.lang.NoClassDefFoundError:
javax/servlet/http/HttpServletResponse
[info] Run completed in 626 milliseconds.
[info] Total number of tests run: 0
[info] Suites: completed 0, aborted 0
[info] Tests: succeeded 0, failed 0, canceled 0,
ignored 0, pending 0
[info] All tests passed.
[error] Error during tests:
[error] test.scala.SetSuite
[error] (test:test) sbt.TestsFailedException: Tests
unsuccessful
[error] Total time: 3 s, completed 10.09.2014 12:22:06

last test:test gives me the following:

last test:test

[debug] Running TaskDef(test.scala.SetSuite,
org.scalatest.tools.Framework$$anon$1@6e5626c8, false,
[SuiteSelector])
java.lang.NoClassDefFoundError:
javax/servlet/http/HttpServletResponse
at
org.apache.spark.HttpServer.start(HttpServer.scala:54)
at

org.apache.spark.broadcast.HttpBroadcast$.createServer(HttpBroadcast.scala:156)
at

org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:127)
at

org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcastFactory.scala:31)
at

org.apache.spark.broadcast.BroadcastManager.initialize(BroadcastManager.scala:48)
at

org.apache.spark.broadcast.BroadcastManager.init(BroadcastManager.scala:35)
at
org.apache.spark.SparkEnv$.create(SparkEnv.scala:218)
at

Re: Broadcast error

2014-09-15 Thread Chengi Liu
Hi Akhil,
  So with your config (specifically with set("spark.akka.frameSize",
"1000")), I see the error:
org.apache.spark.SparkException: Job aborted due to stage failure:
Serialized task 0:0 was 401970046 bytes which exceeds spark.akka.frameSize
(10485760 bytes). Consider using broadcast variables for large values.
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
at org.apache.spark

So, I changed
set("spark.akka.frameSize", "1000") to set("spark.akka.frameSize",
"100000")
but now I get the same error?

y4j.protocol.Py4JJavaError: An error occurred while calling
o28.trainKMeansModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: All
masters are unresponsive! Giving up.
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGSched


along with following:
14/09/15 01:44:11 WARN TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient memory
14/09/15 01:44:21 INFO AppClient$ClientActor: Connecting to master
spark://host:7077...
14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to
akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException:
Association failed with [akka.tcp://sparkMaster@host:7077]
14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to
akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException:
Association failed with [akka.tcp://sparkMaster@host:7077]
14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to
akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException:
Association failed with [akka.tcp://sparkMaster@host:7077]
14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to
akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException:
Association failed with [akka.tcp://sparkMaster@host:7077]
14/09/15 01:44:26 WARN TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient memory
14/09/15 01:44:41 WARN TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient memory
14/09/15 01:44:41 ERROR SparkDeploySchedulerBackend: Application has been
killed. Reason: All masters are unresponsive! Giving up.


:-(

On Mon, Sep 15, 2014 at 1:20 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Can you give this a try:

 conf = SparkConf().set("spark.executor.memory", "32G") \
     .set("spark.akka.frameSize", "1000") \
     .set("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")
 sc = SparkContext(conf = conf)
 rdd = sc.parallelize(matrix, 5)
 from pyspark.mllib.clustering import KMeans
 from math import sqrt
 clusters = KMeans.train(rdd, 5, maxIterations=2, runs=2,
     initializationMode="random")
 def error(point):
     center = clusters.centers[clusters.predict(point)]
     return sqrt(sum([x**2 for x in (point - center)]))
 WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y)
 print "Within Set Sum of Squared Error = " + str(WSSSE)


 Thanks
 Best Regards

 On Mon, Sep 15, 2014 at 9:16 AM, Chengi Liu chengi.liu...@gmail.com
 wrote:

 And the thing is code runs just fine if I reduce the number of rows in my
 data?

 On Sun, Sep 14, 2014 at 8:45 PM, Chengi Liu chengi.liu...@gmail.com
 wrote:

 I am using spark1.0.2.
 This is my work cluster.. so I can't setup a new version readily...
 But right now, I am not using broadcast ..


 conf = SparkConf().set("spark.executor.memory",
     "32G").set("spark.akka.frameSize", "1000")
 sc = SparkContext(conf = conf)
 rdd = sc.parallelize(matrix, 5)

 from pyspark.mllib.clustering import KMeans
 from math import sqrt
 clusters = KMeans.train(rdd, 5, maxIterations=2, runs=2,
     initializationMode="random")
 def error(point):
     center = clusters.centers[clusters.predict(point)]
     return sqrt(sum([x**2 for x in (point - center)]))

 WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y)
 print "Within Set Sum of Squared Error = " + str(WSSSE)


 executed by
 spark-submit --master $SPARKURL clustering_example.py  --executor-memory
 32G  --driver-memory 60G

 and the error I see
 py4j.protocol.Py4JJavaError: An error occurred while calling
 o26.trainKMeansModel.
 : 

Re: Broadcast error

2014-09-15 Thread Akhil Das
Try:

rdd = sc.broadcast(matrix)

Or

rdd = sc.parallelize(matrix,100) // Just increase the number of slices,
give it a try.



Thanks
Best Regards

On Mon, Sep 15, 2014 at 2:18 PM, Chengi Liu chengi.liu...@gmail.com wrote:

 Hi Akhil,
    So with your config (specifically with set("spark.akka.frameSize",
  "1000")), I see the error:
 org.apache.spark.SparkException: Job aborted due to stage failure:
 Serialized task 0:0 was 401970046 bytes which exceeds spark.akka.frameSize
 (10485760 bytes). Consider using broadcast variables for large values.
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
 at org.apache.spark

  So, I changed
  set("spark.akka.frameSize", "1000") to set("spark.akka.frameSize",
  "100000")
 but now I get the same error?

 y4j.protocol.Py4JJavaError: An error occurred while calling
 o28.trainKMeansModel.
 : org.apache.spark.SparkException: Job aborted due to stage failure: All
 masters are unresponsive! Giving up.
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGSched


 along with following:
 14/09/15 01:44:11 WARN TaskSchedulerImpl: Initial job has not accepted any
 resources; check your cluster UI to ensure that workers are registered and
 have sufficient memory
 14/09/15 01:44:21 INFO AppClient$ClientActor: Connecting to master
 spark://host:7077...
 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to
 akka.tcp://sparkMaster@host:7077:
 akka.remote.EndpointAssociationException: Association failed with
 [akka.tcp://sparkMaster@host:7077]
 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to
 akka.tcp://sparkMaster@host:7077:
 akka.remote.EndpointAssociationException: Association failed with
 [akka.tcp://sparkMaster@host:7077]
 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to
 akka.tcp://sparkMaster@host:7077:
 akka.remote.EndpointAssociationException: Association failed with
 [akka.tcp://sparkMaster@host:7077]
 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to
 akka.tcp://sparkMaster@host:7077:
 akka.remote.EndpointAssociationException: Association failed with
 [akka.tcp://sparkMaster@host:7077]
 14/09/15 01:44:26 WARN TaskSchedulerImpl: Initial job has not accepted any
 resources; check your cluster UI to ensure that workers are registered and
 have sufficient memory
 14/09/15 01:44:41 WARN TaskSchedulerImpl: Initial job has not accepted any
 resources; check your cluster UI to ensure that workers are registered and
 have sufficient memory
 14/09/15 01:44:41 ERROR SparkDeploySchedulerBackend: Application has been
 killed. Reason: All masters are unresponsive! Giving up.


 :-(

 On Mon, Sep 15, 2014 at 1:20 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Can you give this a try:

  conf = SparkConf().set("spark.executor.memory", "32G") \
      .set("spark.akka.frameSize", "1000") \
      .set("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")
  sc = SparkContext(conf = conf)
  rdd = sc.parallelize(matrix, 5)
  from pyspark.mllib.clustering import KMeans
  from math import sqrt
  clusters = KMeans.train(rdd, 5, maxIterations=2, runs=2,
      initializationMode="random")
  def error(point):
      center = clusters.centers[clusters.predict(point)]
      return sqrt(sum([x**2 for x in (point - center)]))
  WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y)
  print "Within Set Sum of Squared Error = " + str(WSSSE)


 Thanks
 Best Regards

 On Mon, Sep 15, 2014 at 9:16 AM, Chengi Liu chengi.liu...@gmail.com
 wrote:

 And the thing is code runs just fine if I reduce the number of rows in
 my data?

 On Sun, Sep 14, 2014 at 8:45 PM, Chengi Liu chengi.liu...@gmail.com
 wrote:

 I am using spark1.0.2.
 This is my work cluster.. so I can't setup a new version readily...
 But right now, I am not using broadcast ..


  conf = SparkConf().set("spark.executor.memory",
      "32G").set("spark.akka.frameSize", "1000")
  sc = SparkContext(conf = conf)
  rdd = sc.parallelize(matrix, 5)

  from pyspark.mllib.clustering import KMeans
  from math import sqrt
  clusters = KMeans.train(rdd, 5, maxIterations=2, runs=2,
      initializationMode="random")
  def error(point):
      center = clusters.centers[clusters.predict(point)]
      return sqrt(sum([x**2 for x in (point - center)]))

  WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y)
  print "Within Set 

Re: About SparkSQL 1.1.0 join between more than two table

2014-09-15 Thread Yanbo Liang
Spark SQL supports both plain SQL and HiveQL, through SQLContext and HiveContext
respectively.
As far as I know, the SQLContext of Spark SQL 1.1.0 cannot handle a three-table
join directly.
However you can rewrite your query with a subquery, such as

SELECT * FROM (SELECT * FROM youhao_data left join youhao_age on
(youhao_data.rowkey=youhao_age.rowkey)) tmp left join youhao_totalKiloMeter
on (tmp.rowkey=youhao_totalKiloMeter.rowkey)

HiveContext of Spark 1.1.0 can support three table join.

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("SELECT * FROM youhao_data left join youhao_age on
(youhao_data.rowkey=youhao_age.rowkey) left join youhao_totalKiloMeter on
(youhao_age.rowkey=youhao_totalKiloMeter.rowkey)")

2014-09-15 10:41 GMT+08:00 boyingk...@163.com boyingk...@163.com:


 Hi:
 When I use Spark SQL (1.0.1), I found it does not support a join between three
 tables, e.g.:
  sql("SELECT * FROM youhao_data left join youhao_age on
 (youhao_data.rowkey=youhao_age.rowkey) left join youhao_totalKiloMeter on
 (youhao_age.rowkey=youhao_totalKiloMeter.rowkey)")
  I get the Exception:
  Exception in thread "main" java.lang.RuntimeException: [1.90] failure:
 ``UNION'' expected but `left' found

 Does Spark SQL 1.1.0 support joins between three tables?

 --
  boyingk...@163.com



Re: Broadcast error

2014-09-15 Thread Chengi Liu
So.. same result with parallelize (matrix,1000)
with broadcast.. seems like I got jvm core dump :-/
4/09/15 02:31:22 INFO BlockManagerInfo: Registering block manager
host:47978 with 19.2 GB RAM
14/09/15 02:31:22 INFO BlockManagerInfo: Registering block manager
host:43360 with 19.2 GB RAM
Unhandled exception
Unhandled exception
Type=Segmentation error vmState=0x
J9Generic_Signal_Number=0004 Signal_Number=000b
Error_Value= Signal_Code=0001
Handler1=2BF53760 Handler2=2C3069D0
InaccessibleAddress=
RDI=2AB9505F2698 RSI=2AABAE2C54D8 RAX=2AB7CE6009A0
RBX=2AB7CE6009C0
RCX=FFC7FFE0 RDX=2AB8509726A8 R8=7FE41FF0
R9=2000
R10=2DA318A0 R11=2AB850959520 R12=2AB5EF97DD88
R13=2AB5EF97BD88
R14=2C0CE940 R15=2AB5EF97BD88
RIP= GS= FS= RSP=007367A0
EFlags=00210282 CS=0033 RBP=00BCDB00 ERR=0014
TRAPNO=000E OLDMASK= CR2=
xmm0 4141414141414141 (f: 1094795648.00, d: 2.261635e+06)
xmm1 4141414141414141 (f: 1094795648.00, d: 2.261635e+06)
xmm2 4141414141414141 (f: 1094795648.00, d: 2.261635e+06)
xmm3 4141414141414141 (f: 1094795648.00, d: 2.261635e+06)
xmm4 4141414141414141 (f: 1094795648.00, d: 2.261635e+06)
xmm5 4141414141414141 (f: 1094795648.00, d: 2.261635e+06)
xmm6 4141414141414141 (f: 1094795648.00, d: 2.261635e+06)
xmm7 f180c714f8e2a139 (f: 4175601920.00, d: -5.462583e+238)
xmm8 428e8000 (f: 1116635136.00, d: 5.516911e-315)
xmm9  (f: 0.00, d: 0.00e+00)
xmm10  (f: 0.00, d: 0.00e+00)
xmm11  (f: 0.00, d: 0.00e+00)
xmm12  (f: 0.00, d: 0.00e+00)
xmm13  (f: 0.00, d: 0.00e+00)
xmm14  (f: 0.00, d: 0.00e+00)
xmm15  (f: 0.00, d: 0.00e+00)
Target=2_60_20140106_181350 (Linux 3.0.93-0.8.2_1.0502.8048-cray_ari_c)
CPU=amd64 (48 logical CPUs) (0xfc0c5b000 RAM)
--- Stack Backtrace ---
(0x2C2FA122 [libj9prt26.so+0x13122])
(0x2C30779F [libj9prt26.so+0x2079f])
(0x2C2F9E6B [libj9prt26.so+0x12e6b])
(0x2C2F9F67 [libj9prt26.so+0x12f67])
(0x2C30779F [libj9prt26.so+0x2079f])
(0x2C2F9A8B [libj9prt26.so+0x12a8b])
(0x2BF52C9D [libj9vm26.so+0x1ac9d])
(0x2C30779F [libj9prt26.so+0x2079f])
(0x2BF52F56 [libj9vm26.so+0x1af56])
(0x2BF96CA0 [libj9vm26.so+0x5eca0])
---
JVMDUMP039I
JVMDUMP032I


Note, this is still with the framesize I modified in the last email thread.

On Mon, Sep 15, 2014 at 2:12 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Try:

 rdd = sc.broadcast(matrix)

 Or

 rdd = sc.parallelize(matrix,100) // Just increase the number of slices,
 give it a try.



 Thanks
 Best Regards

 On Mon, Sep 15, 2014 at 2:18 PM, Chengi Liu chengi.liu...@gmail.com
 wrote:

 Hi Akhil,
  So with your config (specifically with set("spark.akka.frameSize",
  "1000")), I see the error:
 org.apache.spark.SparkException: Job aborted due to stage failure:
 Serialized task 0:0 was 401970046 bytes which exceeds spark.akka.frameSize
 (10485760 bytes). Consider using broadcast variables for large values.
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
 at org.apache.spark

  So, I changed
  set("spark.akka.frameSize", "1000") to set("spark.akka.frameSize",
  "100000")
 but now I get the same error?

 y4j.protocol.Py4JJavaError: An error occurred while calling
 o28.trainKMeansModel.
 : org.apache.spark.SparkException: Job aborted due to stage failure: All
 masters are unresponsive! Giving up.
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGSched


 along with following:
 14/09/15 01:44:11 WARN TaskSchedulerImpl: Initial job has not accepted
 any resources; check your cluster UI to ensure that workers are registered
 and have sufficient memory
 14/09/15 01:44:21 INFO AppClient$ClientActor: Connecting to master
 spark://host:7077...
 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to
 akka.tcp://sparkMaster@host:7077:
 akka.remote.EndpointAssociationException: Association failed with
 

Re: Low Level Kafka Consumer for Spark

2014-09-15 Thread Alon Pe'er
Hi Dibyendu,

Thanks for your great work!

I'm new to Spark Streaming, so I just want to make sure I understand the Driver
failure issue correctly.

In my use case, I want to make sure that messages coming in from Kafka are
always broken into the same set of RDDs, meaning that if a set of messages
are assigned to one RDD, and the Driver dies before this RDD is processed,
then once the Driver recovers, the same set of messages are assigned to a
single RDD, instead of arbitrarily repartitioning the messages across
different RDDs.

Does your Receiver guarantee this behavior, until the problem is fixed in
Spark 1.2?

Regards,
Alon



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Low Level Kafka Consumer for Spark

2014-09-15 Thread Dibyendu Bhattacharya
Hi Alon,

No, this does not guarantee that the same set of messages will come in the same
RDD. This fix just re-plays the messages from the last processed offset in the
same order. Again, this is just an interim fix we needed to solve our use case.
If you do not need this message re-play feature, just do not perform the
ack (acknowledgement) call in the Driver code. Then the processed messages
will not be written to ZK and hence replay will not happen.

Regards,
Dibyendu

On Mon, Sep 15, 2014 at 4:48 PM, Alon Pe'er alo...@supersonicads.com
wrote:

 Hi Dibyendu,

 Thanks for your great work!

 I'm new to Spark Streaming, so I just want to make sure I understand the Driver
 failure issue correctly.

 In my use case, I want to make sure that messages coming in from Kafka are
 always broken into the same set of RDDs, meaning that if a set of messages
 are assigned to one RDD, and the Driver dies before this RDD is processed,
 then once the Driver recovers, the same set of messages are assigned to a
 single RDD, instead of arbitrarily repartitioning the messages across
 different RDDs.

 Does your Receiver guarantee this behavior, until the problem is fixed in
 Spark 1.2?

 Regards,
 Alon



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Serving data

2014-09-15 Thread Marius Soutier
Thank you guys, I’ll try Parquet and if that’s not quick enough I’ll go the 
usual route with either a read-only or a normal database.
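
For reference, a minimal sketch of the Parquet route on Spark 1.1 (the paths,
case class and field names are examples):

import org.apache.spark.sql.SQLContext

case class WordCount(word: String, count: Long)

val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD

// write the job results as Parquet
val counts = sc.textFile("hdfs:///data/input")
  .flatMap(_.split("\\s+")).map((_, 1L)).reduceByKey(_ + _)
  .map { case (w, c) => WordCount(w, c) }
counts.saveAsParquetFile("hdfs:///data/word_counts.parquet")

// later, from a separate (possibly long-running) app, query it
val table = sqlContext.parquetFile("hdfs:///data/word_counts.parquet")
table.registerTempTable("word_counts")
sqlContext.sql("SELECT word, count FROM word_counts ORDER BY count DESC LIMIT 10")
  .collect().foreach(println)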

On 13.09.2014, at 12:45, andy petrella andy.petre...@gmail.com wrote:

 however, the cache is not guaranteed to remain: if other jobs are launched in 
 the cluster and require more memory than what's left in the overall caching 
 memory, previous RDDs will be discarded.
 
 Using an off heap cache like tachyon as a dump repo can help.
 
 In general, I'd say that using a persistent sink (like Cassandra for 
 instance) is best.
 
 my .2¢
 
 
 aℕdy ℙetrella
 about.me/noootsab
 
 
 
 On Sat, Sep 13, 2014 at 9:20 AM, Mayur Rustagi mayur.rust...@gmail.com 
 wrote:
 You can cache data in memory & query it using the Spark Job Server. 
 Most folks dump data down to a queue/db for retrieval. 
 You can batch up data & store it into Parquet partitions as well, & query it 
 using another SparkSQL shell; the JDBC driver in SparkSQL is part of 1.1 I believe. 
 -- 
 Regards,
 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi
 
 
 On Fri, Sep 12, 2014 at 2:54 PM, Marius Soutier mps@gmail.com wrote:
 
 Hi there, 
 
 I’m pretty new to Spark, and so far I’ve written my jobs the same way I wrote 
 Scalding jobs - one-off, read data from HDFS, count words, write counts back 
 to HDFS. 
 
 Now I want to display these counts in a dashboard. Since Spark allows to 
 cache RDDs in-memory and you have to explicitly terminate your app (and 
 there’s even a new JDBC server in 1.1), I’m assuming it’s possible to keep an 
 app running indefinitely and query an in-memory RDD from the outside (via 
 SparkSQL for example). 
 
 Is this how others are using Spark? Or are you just dumping job results into 
 message queues or databases? 
 
 
 Thanks 
 - Marius 
 
 
 - 
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
 For additional commands, e-mail: user-h...@spark.apache.org 
 
 
 
 



Re: Serving data

2014-09-15 Thread andy petrella
I'm using Parquet in ADAM, and I can say that it works pretty fine!
Enjoy ;-)

aℕdy ℙetrella
about.me/noootsab
[image: aℕdy ℙetrella on about.me]

http://about.me/noootsab

On Mon, Sep 15, 2014 at 1:41 PM, Marius Soutier mps@gmail.com wrote:

 Thank you guys, I’ll try Parquet and if that’s not quick enough I’ll go
 the usual route with either read-only or normal database.

 On 13.09.2014, at 12:45, andy petrella andy.petre...@gmail.com wrote:

 however, the cache is not guaranteed to remain, if other jobs are launched
 in the cluster and require more memory than what's left in the overall
 caching memory, previous RDDs will be discarded.

 Using an off heap cache like tachyon as a dump repo can help.

 In general, I'd say that using a persistent sink (like Cassandra for
 instance) is best.

 my .2¢


 aℕdy ℙetrella
 about.me/noootsab
 [image: aℕdy ℙetrella on about.me]

 http://about.me/noootsab

 On Sat, Sep 13, 2014 at 9:20 AM, Mayur Rustagi mayur.rust...@gmail.com
 wrote:

 You can cache data in memory & query it using the Spark Job Server.
 Most folks dump data down to a queue/db for retrieval.
 You can batch up data & store it into Parquet partitions as well, & query it
 using another SparkSQL shell; the JDBC driver in SparkSQL is part of 1.1 I
 believe.
 --
 Regards,
 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi


 On Fri, Sep 12, 2014 at 2:54 PM, Marius Soutier mps@gmail.com
 wrote:

 Hi there,

 I’m pretty new to Spark, and so far I’ve written my jobs the same way I
 wrote Scalding jobs - one-off, read data from HDFS, count words, write
 counts back to HDFS.

 Now I want to display these counts in a dashboard. Since Spark allows to
 cache RDDs in-memory and you have to explicitly terminate your app (and
 there’s even a new JDBC server in 1.1), I’m assuming it’s possible to keep
 an app running indefinitely and query an in-memory RDD from the outside
 (via SparkSQL for example).

 Is this how others are using Spark? Or are you just dumping job results
 into message queues or databases?


 Thanks
 - Marius


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: Serving data

2014-09-15 Thread Marius Soutier
So you are living the dream of using HDFS as a database? ;)

On 15.09.2014, at 13:50, andy petrella andy.petre...@gmail.com wrote:

 I'm using Parquet in ADAM, and I can say that it works pretty fine!
 Enjoy ;-)
 
 aℕdy ℙetrella
 about.me/noootsab
 
 
 
 On Mon, Sep 15, 2014 at 1:41 PM, Marius Soutier mps@gmail.com wrote:
 Thank you guys, I’ll try Parquet and if that’s not quick enough I’ll go the 
 usual route with either read-only or normal database.
 
 On 13.09.2014, at 12:45, andy petrella andy.petre...@gmail.com wrote:
 
 however, the cache is not guaranteed to remain, if other jobs are launched 
 in the cluster and require more memory than what's left in the overall 
 caching memory, previous RDDs will be discarded.
 
 Using an off heap cache like tachyon as a dump repo can help.
 
 In general, I'd say that using a persistent sink (like Cassandra for 
 instance) is best.
 
 my .2¢
 
 
 aℕdy ℙetrella
 about.me/noootsab
 
 
 
 On Sat, Sep 13, 2014 at 9:20 AM, Mayur Rustagi mayur.rust...@gmail.com 
 wrote:
 You can cache data in memory & query it using the Spark Job Server. 
 Most folks dump data down to a queue/db for retrieval. 
 You can batch up data & store it into Parquet partitions as well, & query it 
 using another SparkSQL shell; the JDBC driver in SparkSQL is part of 1.1 I 
 believe. 
 -- 
 Regards,
 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi
 
 
 On Fri, Sep 12, 2014 at 2:54 PM, Marius Soutier mps@gmail.com wrote:
 
 Hi there, 
 
 I’m pretty new to Spark, and so far I’ve written my jobs the same way I 
 wrote Scalding jobs - one-off, read data from HDFS, count words, write 
 counts back to HDFS. 
 
 Now I want to display these counts in a dashboard. Since Spark allows to 
 cache RDDs in-memory and you have to explicitly terminate your app (and 
 there’s even a new JDBC server in 1.1), I’m assuming it’s possible to keep 
 an app running indefinitely and query an in-memory RDD from the outside (via 
 SparkSQL for example). 
 
 Is this how others are using Spark? Or are you just dumping job results into 
 message queues or databases? 
 
 
 Thanks 
 - Marius 
 
 
 - 
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
 For additional commands, e-mail: user-h...@spark.apache.org 
 
 
 
 
 
 



Upgrading a standalone cluster on ec2 from 1.0.2 to 1.1.0

2014-09-15 Thread Tomer Benyamini
Hi,

I would like to upgrade a standalone cluster to 1.1.0. What's the best
way to do it? Should I just replace the existing /root/spark folder
with the uncompressed folder from
http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-cdh4.tgz ? What
about hdfs and other installations?

I have spark 1.0.2 with cdh4 hadoop 2.0 installed currently.

Thanks,
Tomer

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Write 1 RDD to multiple output paths in one go

2014-09-15 Thread Nicholas Chammas
Any tips from anybody on how to do this in PySpark? (Or regular Spark, for
that matter.)
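
On the Scala side, a minimal sketch of the MultipleTextOutputFormat approach
with saveAsHadoopFile (class and path names are examples; I have not tried to
translate this to PySpark):

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class KeyBasedOutput extends MultipleTextOutputFormat[Any, Any] {
  // drop the key from the written value
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
  // one sub-directory per distinct key
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String] + "/" + name
}

val a = sc.parallelize(Seq("Nick", "Nancy", "Bob", "Ben", "Frankie"))
  .map(x => (x.substring(0, 1).toLowerCase, x))
a.saveAsHadoopFile("/path/prefix", classOf[String], classOf[String],
  classOf[KeyBasedOutput])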

On Sat, Sep 13, 2014 at 1:25 PM, Nick Chammas nicholas.cham...@gmail.com
wrote:

 Howdy doody Spark Users,

 I’d like to somehow write out a single RDD to multiple paths in one go.
 Here’s an example.

 I have an RDD of (key, value) pairs like this:

  >>> a = sc.parallelize(['Nick', 'Nancy', 'Bob', 'Ben',
  ...                     'Frankie']).keyBy(lambda x: x[0])
  >>> a.collect()
 [('N', 'Nick'), ('N', 'Nancy'), ('B', 'Bob'), ('B', 'Ben'), ('F', 'Frankie')]

 Now I want to write the RDD out to different paths depending on the keys,
 so that I have one output directory per distinct key. Each output directory
 could potentially have multiple part- files or whatever.

 So my output would be something like:

 /path/prefix/n [/part-1, /part-2, etc]
 /path/prefix/b [/part-1, /part-2, etc]
 /path/prefix/f [/part-1, /part-2, etc]

 How would you do that?

 I suspect I need to use saveAsNewAPIHadoopFile
 http://spark.apache.org/docs/latest/api/python/pyspark.rdd.RDD-class.html#saveAsNewAPIHadoopFile
 or saveAsHadoopFile
 http://spark.apache.org/docs/latest/api/python/pyspark.rdd.RDD-class.html#saveAsHadoopFile
 along with the MultipleTextOutputFormat output format class, but I’m not
 sure how.

 By the way, there is a very similar question to this here on Stack
 Overflow
 http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job
 .

 Nick
 ​

 --
 View this message in context: Write 1 RDD to multiple output paths in one
 go
 http://apache-spark-user-list.1001560.n3.nabble.com/Write-1-RDD-to-multiple-output-paths-in-one-go-tp14174.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.



vertex active/inactive feature in Pregel API ?

2014-09-15 Thread Yifan LI
Hi,

I am wondering if the vertex active/inactive feature (corresponding to the change of its 
value between two supersteps) is available in the Pregel API of GraphX?

If it is not a default setting, how would I enable it in the code below?
  def sendMessage(edge: EdgeTriplet[(Int,HashMap[VertexId, Double]), Int]) =
Iterator((edge.dstId, hmCal(edge.srcAttr)))

Or should I do that with a customised measure function, e.g. by keeping track of the 
change in the vertex attribute after each iteration?


I noticed that there is an optional parameter "skipStale" in the mrTriplets 
operator.


Best,
Yifan LI
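
(For what it's worth: as far as I understand GraphX's Pregel, a vertex only
takes part in the next superstep if it received at least one message, so
"deactivation" is usually expressed by not sending a message when nothing
changed. A hedged sketch along those lines, where hmCal and the attribute types
are taken from the question and the emptiness check is an assumption:)

def sendMessage(edge: EdgeTriplet[(Int, HashMap[VertexId, Double]), Int]) = {
  val msg = hmCal(edge.srcAttr)
  if (msg.isEmpty) Iterator.empty            // no message => dstId can stay inactive
  else Iterator((edge.dstId, msg))
}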

Found both spark.driver.extraClassPath and SPARK_CLASSPATH

2014-09-15 Thread Koert Kuipers
in spark 1.1.0 i get this error:

2014-09-14 23:17:01 ERROR actor.OneForOneStrategy: Found both
spark.driver.extraClassPath and SPARK_CLASSPATH. Use only the former.

I checked my application; I do not set spark.driver.extraClassPath or
SPARK_CLASSPATH.

SPARK_CLASSPATH is set in spark-env.sh since the machine is a worker and
this is how lzo is added to the classpath. As a user I cannot modify this.

Looking at the logs I see the value of SPARK_CLASSPATH ends up in
spark.driver.extraClassPath, but that's not done by me. I presume some logic
in spark-submit does this.


Re: About SparkSQL 1.1.0 join between more than two table

2014-09-15 Thread Yin Huai
1.0.1 does not have support for outer joins (that was added in 1.1). Your query
should be fine in 1.1.

On Mon, Sep 15, 2014 at 5:35 AM, Yanbo Liang yanboha...@gmail.com wrote:

 Spark SQL supports both plain SQL and HiveQL, through SQLContext and
 HiveContext respectively.
 As far as I know, the SQLContext of Spark SQL 1.1.0 cannot handle a three
 table join directly.
 However you can rewrite your query with a subquery, such as

 SELECT * FROM (SELECT * FROM youhao_data left join youhao_age on
 (youhao_data.rowkey=youhao_age.rowkey)) tmp left join
 youhao_totalKiloMeter on (tmp.rowkey=youhao_totalKiloMeter.rowkey)

 HiveContext of Spark 1.1.0 can support three table join.

 val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
 sqlContext.sql("SELECT * FROM youhao_data left join youhao_age on
 (youhao_data.rowkey=youhao_age.rowkey) left join youhao_totalKiloMeter on
 (youhao_age.rowkey=youhao_totalKiloMeter.rowkey)")

 2014-09-15 10:41 GMT+08:00 boyingk...@163.com boyingk...@163.com:


 Hi:
 When I use spark SQL (1.0.1), I found it not support join between three
 tables,eg:
  sql(SELECT * FROM youhao_data left join youhao_age on
 (youhao_data.rowkey=youhao_age.rowkey) left join youhao_totalKiloMeter on
 (youhao_age.rowkey=youhao_totalKiloMeter.rowkey))
  I take the Exception:
  Exception in thread main java.lang.RuntimeException: [1.90] failure:
 ``UNION'' expected but `left' found

 If the Spark SQL 1.1.0 has support join between three tables?

 --
  boyingk...@163.com





Compiler issues for multiple map on RDD

2014-09-15 Thread Boromir Widas
Hello Folks,

I am trying to chain a couple of map operations and it seems the second map
fails with a type mismatch (even though the compiler prints the found and
required types as the same). I checked the function and variable types using :t
and they look OK to me.

Have you seen this earlier? I am posting the code, data and output below.

Any pointers will be greatly appreciated.

Thanks,
Boromir.

/// SCRIPT
val data = sc.textFile("data/testpv.csv")

case class KVV(key: String, valvec: Array[Double])

def mapToKV(line: String) : KVV = {
  val splits = line.split(",")
  val key = splits(0).trim
  val valvec = splits.drop(1).map(x => x.trim.toDouble)

  val kvv = KVV(key, valvec)
  return kvv
}

val kvs = data.map(line => mapToKV(line))

def mapKVtoKVL(kvv: KVV) : KVV = {
  return kvv
}
val tvar = kvs.map(x => mapKVtoKVL(x))

/// SAMPLE DATA in testpv.csv
x,1.1,1.2,1.3
y,2.1,2.2,2.3

/// REPL OUTPUT
scala> val data = sc.textFile("data/testpv.csv")
14/09/15 10:53:23 INFO MemoryStore: ensureFreeSpace(146579) called with
curMem=0, maxMem=308713881
14/09/15 10:53:23 INFO MemoryStore: Block broadcast_0 stored as values to
memory (estimated size 143.1 KB, free 294.3 MB)
data: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at
<console>:12

scala> case class KVV(key: String, valvec: Array[Double])
defined class KVV

scala>

scala> def mapToKV(line: String) : KVV = {
     |   val splits = line.split(",")
     |   val key = splits(0).trim
     |   val valvec = splits.drop(1).map(x => x.trim.toDouble)
     |
     |   val kvv = KVV(key, valvec)
     |   return kvv
     | }
mapToKV: (line: String)KVV

scala> val kvs = data.map(line => mapToKV(line))
kvs: org.apache.spark.rdd.RDD[KVV] = MappedRDD[2] at map at <console>:18

scala>

scala> def mapKVtoKVL(kvv: KVV) : KVV = {
     |   return kvv
     | }
mapKVtoKVL: (kvv: KVV)KVV

scala> val tvar = kvs.map(x => mapKVtoKVL(x))
<console>:22: error: type mismatch;
 found   : KVV
 required: KVV
       val tvar = kvs.map(x => mapKVtoKVL(x))
                               ^


File I/O in spark

2014-09-15 Thread rapelly kartheek
Hi

I am trying to perform some read/write file operations in spark. Somehow I
am neither able to write to a file nor read.

import java.io._

  val writer = new PrintWriter(new File("test.txt"))

  writer.write("Hello Scala")


Can someone please tell me how to perform file I/O in spark.


Re: Compiler issues for multiple map on RDD

2014-09-15 Thread Sean Owen
Looks like another instance of
https://issues.apache.org/jira/browse/SPARK-1199 which was intended to
be fixed in 1.1.0.

I'm not clear whether https://issues.apache.org/jira/browse/SPARK-2620
is the same issue and therefore whether it too is resolved in 1.1?

On Mon, Sep 15, 2014 at 4:37 PM, Boromir Widas vcsub...@gmail.com wrote:
 Hello Folks,

 I am trying to chain a couple of map operations and it seems the second map
 fails with a type mismatch (even though the compiler prints the found and
 required types as the same). I checked the function and variable types using :t
 and they look OK to me.

 Have you seen this earlier? I am posting the code, data and output below.

 Any pointers will be greatly appreciated.

 Thanks,
 Boromir.

 /// SCRIPT
 val data = sc.textFile("data/testpv.csv")

 case class KVV(key: String, valvec: Array[Double])

 def mapToKV(line: String) : KVV = {
   val splits = line.split(",")
   val key = splits(0).trim
   val valvec = splits.drop(1).map(x => x.trim.toDouble)

   val kvv = KVV(key, valvec)
   return kvv
 }

 val kvs = data.map(line => mapToKV(line))

 def mapKVtoKVL(kvv: KVV) : KVV = {
   return kvv
 }
 val tvar = kvs.map(x => mapKVtoKVL(x))

 /// SAMPLE DATA in testpv.csv
 x,1.1,1.2,1.3
 y,2.1,2.2,2.3

 /// REPL OUTPUT
 scala> val data = sc.textFile("data/testpv.csv")
 14/09/15 10:53:23 INFO MemoryStore: ensureFreeSpace(146579) called with
 curMem=0, maxMem=308713881
 14/09/15 10:53:23 INFO MemoryStore: Block broadcast_0 stored as values to
 memory (estimated size 143.1 KB, free 294.3 MB)
 data: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at
 <console>:12

 scala> case class KVV(key: String, valvec: Array[Double])
 defined class KVV

 scala>

 scala> def mapToKV(line: String) : KVV = {
      |   val splits = line.split(",")
      |   val key = splits(0).trim
      |   val valvec = splits.drop(1).map(x => x.trim.toDouble)
      |
      |   val kvv = KVV(key, valvec)
      |   return kvv
      | }
 mapToKV: (line: String)KVV

 scala> val kvs = data.map(line => mapToKV(line))
 kvs: org.apache.spark.rdd.RDD[KVV] = MappedRDD[2] at map at <console>:18

 scala>

 scala> def mapKVtoKVL(kvv: KVV) : KVV = {
      |   return kvv
      | }
 mapKVtoKVL: (kvv: KVV)KVV

 scala> val tvar = kvs.map(x => mapKVtoKVL(x))
 <console>:22: error: type mismatch;
  found   : KVV
  required: KVV
    val tvar = kvs.map(x => mapKVtoKVL(x))
                            ^

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



scala 2.11?

2014-09-15 Thread Mohit Jaggi
Folks,
I understand Spark SQL uses quasiquotes. Does that mean Spark has now moved
to Scala 2.11?

Mohit.


Re: How to initiate a shutdown of Spark Streaming context?

2014-09-15 Thread stanley
Thank you. 

Would the following approaches to address this problem an overkills?

a. create a ServerSocket in a different thread from the main thread that
created the Spark StreamingContext, and listens to shutdown command there
b. create a web service that wraps around the main thread that created the
Spark StreamingContext, and responds to shutdown requests

Does Spark Streaming already provide similar capabilities? 

Stanley



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-initiate-a-shutdown-of-Spark-Streaming-context-tp14092p14252.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: File I/O in spark

2014-09-15 Thread Mohit Jaggi
Is this code running in an executor? You need to make sure the file is
accessible on ALL executors. One way to do that is to use a distributed
filesystem like HDFS or GlusterFS.

On Mon, Sep 15, 2014 at 8:51 AM, rapelly kartheek kartheek.m...@gmail.com
wrote:

 Hi

 I am trying to perform some read/write file operations in spark. Somehow I
 am neither able to write to a file nor read.

 import java.io._

   val writer = new PrintWriter(new File(test.txt ))

   writer.write(Hello Scala)


 Can someone please tell me how to perform file I/O in spark.




Re: Compiler issues for multiple map on RDD

2014-09-15 Thread Sean Owen
(Adding back the user list)

Boromir says:

Thanks much Sean, verified 1.1.0 does not have this issue.

On Mon, Sep 15, 2014 at 4:47 PM, Sean Owen so...@cloudera.com wrote:
 Looks like another instance of
 https://issues.apache.org/jira/browse/SPARK-1199 which was intended to
 be fixed in 1.1.0.

 I'm not clear whether https://issues.apache.org/jira/browse/SPARK-2620
 is the same issue and therefore whether it too is resolved in 1.1?

 On Mon, Sep 15, 2014 at 4:37 PM, Boromir Widas vcsub...@gmail.com wrote:
 Hello Folks,

 I am trying to chain a couple of map operations and it seems the second map
 fails with a mismatch in arguments(event though the compiler prints them to
 be the same.) I checked the function and variable types using :t and they
 look ok to me.

 Have you seen this earlier? I am posting the code, data and output below.

 Any pointers will be greatly appreciated.

 Thanks,
 Boromir.

 /// SCRIPT
 val data = sc.textFile(data/testpv.csv)

 case class KVV(key: String, valvec: Array[Double])

 def mapToKV(line: String) : KVV = {
 val splits = line.split(,)
 val key = splits(0).trim
 val valvec = splits.drop(1).map(x = x.trim.toDouble)

 val kvv = KVV(key, valvec)
 return kvv
 }

 val kvs = data.map(line = mapToKV(line))

 def mapKVtoKVL(kvv: KVV) : KVV = {
 return kvv
 }
 val tvar = kvs.map(x = mapKVtoKVL(x))

 /// SAMPLE DATA in testpv.csv
 x,1.1,1.2,1.3
 y,2.1,2.2,2.3

 /// REPL OUTPUT
 scala val data = sc.textFile(data/testpv.csv)
 14/09/15 10:53:23 INFO MemoryStore: ensureFreeSpace(146579) called with
 curMem=0, maxMem=308713881
 14/09/15 10:53:23 INFO MemoryStore: Block broadcast_0 stored as values to
 memory (estimated size 143.1 KB, free 294.3 MB)
 data: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at
 console:12

 scala case class KVV(key: String, valvec: Array[Double])
 defined class KVV

 scala

 scala def mapToKV(line: String) : KVV = {
  | val splits = line.split(,)
  | val key = splits(0).trim
  | val valvec = splits.drop(1).map(x = x.trim.toDouble)
  |
  | val kvv = KVV(key, valvec)
  | return kvv
  | }
 mapToKV: (line: String)KVV

 scala val kvs = data.map(line = mapToKV(line))
 kvs: org.apache.spark.rdd.RDD[KVV] = MappedRDD[2] at map at console:18

 scala

 scala def mapKVtoKVL(kvv: KVV) : KVV = {
  | return kvv
  | }
 mapKVtoKVL: (kvv: KVV)KVV

 scala val tvar = kvs.map(x = mapKVtoKVL(x))
 console:22: error: type mismatch;
  found   : KVV
  required: KVV
val tvar = kvs.map(x = mapKVtoKVL(x))
   ^

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: File I/O in spark

2014-09-15 Thread rapelly kartheek
Yes. I have HDFS. My cluster has 5 nodes. When I run the above commands, I
see that the file gets created on the master node. But there won't be any
data written to it.


On Mon, Sep 15, 2014 at 10:06 PM, Mohit Jaggi mohitja...@gmail.com wrote:

 Is this code running in an executor? You need to make sure the file is
 accessible on ALL executors. One way to do that is to use a distributed
 filesystem like HDFS or GlusterFS.

 On Mon, Sep 15, 2014 at 8:51 AM, rapelly kartheek kartheek.m...@gmail.com
  wrote:

 Hi

 I am trying to perform some read/write file operations in spark. Somehow
 I am neither able to write to a file nor read.

 import java.io._

   val writer = new PrintWriter(new File(test.txt ))

   writer.write(Hello Scala)


 Can someone please tell me how to perform file I/O in spark.





Re: File I/O in spark

2014-09-15 Thread rapelly kartheek
The file gets created on the fly, so I don't know how to make sure that it's
accessible to all nodes.

On Mon, Sep 15, 2014 at 10:10 PM, rapelly kartheek kartheek.m...@gmail.com
wrote:

 Yes. I have HDFS. My cluster has 5 nodes. When I run the above commands, I
 see that the file gets created in the master node. But, there wont be any
 data written to it.


 On Mon, Sep 15, 2014 at 10:06 PM, Mohit Jaggi mohitja...@gmail.com
 wrote:

 Is this code running in an executor? You need to make sure the file is
 accessible on ALL executors. One way to do that is to use a distributed
 filesystem like HDFS or GlusterFS.

 On Mon, Sep 15, 2014 at 8:51 AM, rapelly kartheek 
 kartheek.m...@gmail.com wrote:

 Hi

 I am trying to perform some read/write file operations in spark. Somehow
 I am neither able to write to a file nor read.

 import java.io._

   val writer = new PrintWriter(new File(test.txt ))

   writer.write(Hello Scala)


 Can someone please tell me how to perform file I/O in spark.






Re: File I/O in spark

2014-09-15 Thread Mohit Jaggi
But the above APIs are not for HDFS.

On Mon, Sep 15, 2014 at 9:40 AM, rapelly kartheek kartheek.m...@gmail.com
wrote:

 Yes. I have HDFS. My cluster has 5 nodes. When I run the above commands, I
 see that the file gets created in the master node. But, there wont be any
 data written to it.


 On Mon, Sep 15, 2014 at 10:06 PM, Mohit Jaggi mohitja...@gmail.com
 wrote:

 Is this code running in an executor? You need to make sure the file is
 accessible on ALL executors. One way to do that is to use a distributed
 filesystem like HDFS or GlusterFS.

 On Mon, Sep 15, 2014 at 8:51 AM, rapelly kartheek 
 kartheek.m...@gmail.com wrote:

 Hi

 I am trying to perform some read/write file operations in spark. Somehow
 I am neither able to write to a file nor read.

 import java.io._

   val writer = new PrintWriter(new File(test.txt ))

   writer.write(Hello Scala)


 Can someone please tell me how to perform file I/O in spark.






Re: scala 2.11?

2014-09-15 Thread Mark Hamstra
No, not yet.  Spark SQL is using org.scalamacros:quasiquotes_2.10.

On Mon, Sep 15, 2014 at 9:28 AM, Mohit Jaggi mohitja...@gmail.com wrote:

 Folks,
 I understand Spark SQL uses quasiquotes. Does that mean Spark has now
 moved to Scala 2.11?

 Mohit.



Re: scala 2.11?

2014-09-15 Thread Mohit Jaggi
ah...thanks!

On Mon, Sep 15, 2014 at 9:47 AM, Mark Hamstra m...@clearstorydata.com
wrote:

 No, not yet.  Spark SQL is using org.scalamacros:quasiquotes_2.10.

 On Mon, Sep 15, 2014 at 9:28 AM, Mohit Jaggi mohitja...@gmail.com wrote:

 Folks,
 I understand Spark SQL uses quasiquotes. Does that mean Spark has now
 moved to Scala 2.11?

 Mohit.





Re: File I/O in spark

2014-09-15 Thread rapelly kartheek
I came across these APIs in one of the Scala tutorials on the net.

On Mon, Sep 15, 2014 at 10:14 PM, Mohit Jaggi mohitja...@gmail.com wrote:

 But the above APIs are not for HDFS.

 On Mon, Sep 15, 2014 at 9:40 AM, rapelly kartheek kartheek.m...@gmail.com
  wrote:

 Yes. I have HDFS. My cluster has 5 nodes. When I run the above commands,
 I see that the file gets created in the master node. But, there wont be any
 data written to it.


 On Mon, Sep 15, 2014 at 10:06 PM, Mohit Jaggi mohitja...@gmail.com
 wrote:

 Is this code running in an executor? You need to make sure the file is
 accessible on ALL executors. One way to do that is to use a distributed
 filesystem like HDFS or GlusterFS.

 On Mon, Sep 15, 2014 at 8:51 AM, rapelly kartheek 
 kartheek.m...@gmail.com wrote:

 Hi

 I am trying to perform some read/write file operations in spark.
 Somehow I am neither able to write to a file nor read.

 import java.io._

   val writer = new PrintWriter(new File(test.txt ))

   writer.write(Hello Scala)


 Can someone please tell me how to perform file I/O in spark.







Re: File I/O in spark

2014-09-15 Thread rapelly kartheek
Can you please direct me to the right way of doing this.

On Mon, Sep 15, 2014 at 10:18 PM, rapelly kartheek kartheek.m...@gmail.com
wrote:

 I came across these APIs in one the scala tutorials over the net.

 On Mon, Sep 15, 2014 at 10:14 PM, Mohit Jaggi mohitja...@gmail.com
 wrote:

 But the above APIs are not for HDFS.

 On Mon, Sep 15, 2014 at 9:40 AM, rapelly kartheek 
 kartheek.m...@gmail.com wrote:

 Yes. I have HDFS. My cluster has 5 nodes. When I run the above commands,
 I see that the file gets created in the master node. But, there wont be any
 data written to it.


 On Mon, Sep 15, 2014 at 10:06 PM, Mohit Jaggi mohitja...@gmail.com
 wrote:

 Is this code running in an executor? You need to make sure the file is
 accessible on ALL executors. One way to do that is to use a distributed
 filesystem like HDFS or GlusterFS.

 On Mon, Sep 15, 2014 at 8:51 AM, rapelly kartheek 
 kartheek.m...@gmail.com wrote:

 Hi

 I am trying to perform some read/write file operations in spark.
 Somehow I am neither able to write to a file nor read.

 import java.io._

   val writer = new PrintWriter(new File(test.txt ))

   writer.write(Hello Scala)


 Can someone please tell me how to perform file I/O in spark.








Re: File I/O in spark

2014-09-15 Thread Mohit Jaggi
If you underlying filesystem is HDFS, you need to use HDFS APIs. A google
search brought up this link which appears reasonable.

http://wiki.apache.org/hadoop/HadoopDfsReadWriteExample

If you want to use java.io APIs, you have to make sure your filesystem is
accessible from all nodes in your cluster. You did not mention what errors
you get with your code. They may mean something.
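
For reference, here is a minimal sketch of writing a file from the driver through
the Hadoop FileSystem API instead of java.io (the path and contents are made up
for illustration):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val hadoopConf = new Configuration() // picks up core-site.xml / hdfs-site.xml from the classpath
val fs = FileSystem.get(hadoopConf)
val out = fs.create(new Path("/user/kartheek/test.txt")) // hypothetical HDFS path
out.write("Hello Scala".getBytes("UTF-8"))
out.close()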


On Mon, Sep 15, 2014 at 9:51 AM, rapelly kartheek kartheek.m...@gmail.com
wrote:

 Can you please direct me to the right way of doing this.

 On Mon, Sep 15, 2014 at 10:18 PM, rapelly kartheek 
 kartheek.m...@gmail.com wrote:

 I came across these APIs in one the scala tutorials over the net.

 On Mon, Sep 15, 2014 at 10:14 PM, Mohit Jaggi mohitja...@gmail.com
 wrote:

 But the above APIs are not for HDFS.

 On Mon, Sep 15, 2014 at 9:40 AM, rapelly kartheek 
 kartheek.m...@gmail.com wrote:

 Yes. I have HDFS. My cluster has 5 nodes. When I run the above
 commands, I see that the file gets created in the master node. But, there
 wont be any data written to it.


 On Mon, Sep 15, 2014 at 10:06 PM, Mohit Jaggi mohitja...@gmail.com
 wrote:

 Is this code running in an executor? You need to make sure the file is
 accessible on ALL executors. One way to do that is to use a distributed
 filesystem like HDFS or GlusterFS.

 On Mon, Sep 15, 2014 at 8:51 AM, rapelly kartheek 
 kartheek.m...@gmail.com wrote:

 Hi

 I am trying to perform some read/write file operations in spark.
 Somehow I am neither able to write to a file nor read.

 import java.io._

   val writer = new PrintWriter(new File(test.txt ))

   writer.write(Hello Scala)


 Can someone please tell me how to perform file I/O in spark.









Re: File I/O in spark

2014-09-15 Thread Frank Austin Nothaft
Kartheek,

What exactly are you trying to do? Those APIs are only for local file access.

If you want to access data in HDFS, you’ll want to use one of the reader 
methods in org.apache.spark.SparkContext which will give you an RDD (e.g., 
newAPIHadoopFile, sequenceFile, or textFile). If you want to write data to 
HDFS, you’ll need to have an RDD and use one of the functions in 
org.apache.spark.RDD (saveAsObjectFile or saveAsTextFile) or one of the 
PairRDDFunctions (e.g., saveAsNewAPIHadoopFile or saveAsNewAPIHadoopDataset).

The Scala/Java IO system can be used inside of Spark, but only for local file 
access. This is used to implement the rdd.pipe method (IIRC), and we use it in 
some downstream apps to do IO with processes that we spawn from mapPartitions 
calls (see here and here).
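
For illustration, a minimal sketch of that RDD-based read/write path (the input
and output locations are placeholders):

val lines = sc.textFile("hdfs:///user/kartheek/input.txt")   // read from HDFS as an RDD
val upper = lines.map(_.toUpperCase)                         // any transformation
upper.saveAsTextFile("hdfs:///user/kartheek/output")         // write back to HDFS as part-* files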

Regards,

Frank Austin Nothaft
fnoth...@berkeley.edu
fnoth...@eecs.berkeley.edu
202-340-0466

On Sep 15, 2014, at 9:44 AM, rapelly kartheek kartheek.m...@gmail.com wrote:

 The file gets created on the fly. So I dont know how to make sure that its 
 accessible to all nodes.
 
 On Mon, Sep 15, 2014 at 10:10 PM, rapelly kartheek kartheek.m...@gmail.com 
 wrote:
 Yes. I have HDFS. My cluster has 5 nodes. When I run the above commands, I 
 see that the file gets created in the master node. But, there wont be any 
 data written to it.
 
 
 On Mon, Sep 15, 2014 at 10:06 PM, Mohit Jaggi mohitja...@gmail.com wrote:
 Is this code running in an executor? You need to make sure the file is 
 accessible on ALL executors. One way to do that is to use a distributed 
 filesystem like HDFS or GlusterFS.
 
 On Mon, Sep 15, 2014 at 8:51 AM, rapelly kartheek kartheek.m...@gmail.com 
 wrote:
 Hi 
 
 I am trying to perform some read/write file operations in spark. Somehow I am 
 neither able to write to a file nor read.
 
 import java.io._
   val writer = new PrintWriter(new File(test.txt ))
   writer.write(Hello Scala)
 
 Can someone please tell me how to perform file I/O in spark.
 
 
 
 



Re: Broadcast error

2014-09-15 Thread Davies Liu
I think 1.1 will be really helpful for you; it's fully compatible
with 1.0, so it's
not hard to upgrade to 1.1.

On Mon, Sep 15, 2014 at 2:35 AM, Chengi Liu chengi.liu...@gmail.com wrote:
 So.. same result with parallelize (matrix,1000)
 with broadcast.. seems like I got jvm core dump :-/
 4/09/15 02:31:22 INFO BlockManagerInfo: Registering block manager host:47978
 with 19.2 GB RAM
 14/09/15 02:31:22 INFO BlockManagerInfo: Registering block manager
 host:43360 with 19.2 GB RAM
 Unhandled exception
 Unhandled exception
 Type=Segmentation error vmState=0x
 J9Generic_Signal_Number=0004 Signal_Number=000b Error_Value=
 Signal_Code=0001
 Handler1=2BF53760 Handler2=2C3069D0
 InaccessibleAddress=
 RDI=2AB9505F2698 RSI=2AABAE2C54D8 RAX=2AB7CE6009A0
 RBX=2AB7CE6009C0
 RCX=FFC7FFE0 RDX=2AB8509726A8 R8=7FE41FF0
 R9=2000
 R10=2DA318A0 R11=2AB850959520 R12=2AB5EF97DD88
 R13=2AB5EF97BD88
 R14=2C0CE940 R15=2AB5EF97BD88
 RIP= GS= FS= RSP=007367A0
 EFlags=00210282 CS=0033 RBP=00BCDB00 ERR=0014
 TRAPNO=000E OLDMASK= CR2=
 xmm0 4141414141414141 (f: 1094795648.00, d: 2.261635e+06)
 xmm1 4141414141414141 (f: 1094795648.00, d: 2.261635e+06)
 xmm2 4141414141414141 (f: 1094795648.00, d: 2.261635e+06)
 xmm3 4141414141414141 (f: 1094795648.00, d: 2.261635e+06)
 xmm4 4141414141414141 (f: 1094795648.00, d: 2.261635e+06)
 xmm5 4141414141414141 (f: 1094795648.00, d: 2.261635e+06)
 xmm6 4141414141414141 (f: 1094795648.00, d: 2.261635e+06)
 xmm7 f180c714f8e2a139 (f: 4175601920.00, d: -5.462583e+238)
 xmm8 428e8000 (f: 1116635136.00, d: 5.516911e-315)
 xmm9  (f: 0.00, d: 0.00e+00)
 xmm10  (f: 0.00, d: 0.00e+00)
 xmm11  (f: 0.00, d: 0.00e+00)
 xmm12  (f: 0.00, d: 0.00e+00)
 xmm13  (f: 0.00, d: 0.00e+00)
 xmm14  (f: 0.00, d: 0.00e+00)
 xmm15  (f: 0.00, d: 0.00e+00)
 Target=2_60_20140106_181350 (Linux 3.0.93-0.8.2_1.0502.8048-cray_ari_c)
 CPU=amd64 (48 logical CPUs) (0xfc0c5b000 RAM)
 --- Stack Backtrace ---
 (0x2C2FA122 [libj9prt26.so+0x13122])
 (0x2C30779F [libj9prt26.so+0x2079f])
 (0x2C2F9E6B [libj9prt26.so+0x12e6b])
 (0x2C2F9F67 [libj9prt26.so+0x12f67])
 (0x2C30779F [libj9prt26.so+0x2079f])
 (0x2C2F9A8B [libj9prt26.so+0x12a8b])
 (0x2BF52C9D [libj9vm26.so+0x1ac9d])
 (0x2C30779F [libj9prt26.so+0x2079f])
 (0x2BF52F56 [libj9vm26.so+0x1af56])
 (0x2BF96CA0 [libj9vm26.so+0x5eca0])
 ---
 JVMDUMP039I
 JVMDUMP032I


 Note, this still is with the framesize I modified in the last email thread?

 On Mon, Sep 15, 2014 at 2:12 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Try:

 rdd = sc.broadcast(matrix)

 Or

 rdd = sc.parallelize(matrix,100) // Just increase the number of slices,
 give it a try.



 Thanks
 Best Regards

 On Mon, Sep 15, 2014 at 2:18 PM, Chengi Liu chengi.liu...@gmail.com
 wrote:

 Hi Akhil,
   So with your config (specifically with set(spark.akka.frameSize ,
 1000)) , I see the error:
 org.apache.spark.SparkException: Job aborted due to stage failure:
 Serialized task 0:0 was 401970046 bytes which exceeds spark.akka.frameSize
 (10485760 bytes). Consider using broadcast variables for large values.
 at
 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
 at org.apache.spark

 So, I changed
 set(spark.akka.frameSize , 1000) to set(spark.akka.frameSize ,
 10)
 but now I get the same error?

 y4j.protocol.Py4JJavaError: An error occurred while calling
 o28.trainKMeansModel.
 : org.apache.spark.SparkException: Job aborted due to stage failure: All
 masters are unresponsive! Giving up.
 at
 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGSched


 along with following:
 14/09/15 01:44:11 WARN TaskSchedulerImpl: Initial job has not accepted
 any resources; check your cluster UI to ensure that workers are registered
 and have sufficient memory
 14/09/15 01:44:21 INFO 

Re: Low Level Kafka Consumer for Spark

2014-09-15 Thread Tim Smith
Hi Dibyendu,

I am a little confused about the need for rate limiting input from
kafka. If the stream coming in from kafka has higher message/second
rate than what a Spark job can process then it should simply build a
backlog in Spark if the RDDs are cached on disk using persist().
Right?

Thanks,

Tim


On Mon, Sep 15, 2014 at 4:33 AM, Dibyendu Bhattacharya
dibyendu.bhattach...@gmail.com wrote:
 Hi Alon,

 No this will not be guarantee that same set of messages will come in same
 RDD. This fix just re-play the messages from last processed offset in same
 order. Again this is just a interim fix we needed to solve our use case . If
 you do not need this message re-play feature, just do not perform the ack (
 Acknowledgement) call in the Driver code. Then the processed messages will
 not be written to ZK and hence replay will not happen.

 Regards,
 Dibyendu

 On Mon, Sep 15, 2014 at 4:48 PM, Alon Pe'er alo...@supersonicads.com
 wrote:

 Hi Dibyendu,

 Thanks for your great work!

 I'm new to Spark Streaming, so I just want to make sure I understand
 Driver
 failure issue correctly.

 In my use case, I want to make sure that messages coming in from Kafka are
 always broken into the same set of RDDs, meaning that if a set of messages
 are assigned to one RDD, and the Driver dies before this RDD is processed,
 then once the Driver recovers, the same set of messages are assigned to a
 single RDD, instead of arbitrarily repartitioning the messages across
 different RDDs.

 Does your Receiver guarantee this behavior, until the problem is fixed in
 Spark 1.2?

 Regards,
 Alon



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: PathFilter for newAPIHadoopFile?

2014-09-15 Thread Davies Liu
In PySpark, I think you could enumerate all the valid files, and create RDD by
newAPIHadoopFile(), then union them together.

On Mon, Sep 15, 2014 at 5:49 AM, Eric Friedman
eric.d.fried...@gmail.com wrote:
 I neglected to specify that I'm using pyspark. Doesn't look like these APIs 
 have been bridged.

 
 Eric Friedman

 On Sep 14, 2014, at 11:02 PM, Nat Padmanabhan reachn...@gmail.com wrote:

 Hi Eric,

 Something along the lines of the following should work

 val fs = getFileSystem(...) // standard hadoop API call
 val filteredConcatenatedPaths = fs.listStatus(topLevelDirPath,
 pathFilter).map(_.getPath.toString).mkString(,)  // pathFilter is an
 instance of org.apache.hadoop.fs.PathFilter
 val parquetRdd = sc.hadoopFile(filteredConcatenatedPaths,
 classOf[ParquetInputFormat[Something]], classOf[Void],
 classOf[SomeAvroType], getConfiguration(...))

 You have to do some initializations on ParquetInputFormat such as
 AvroReadSetup/AvroWriteSupport etc but that you should be doing
 already I am guessing.

 Cheers,
 Nat


 On Sun, Sep 14, 2014 at 7:37 PM, Eric Friedman
 eric.d.fried...@gmail.com wrote:
 Hi,

 I have a directory structure with parquet+avro data in it. There are a
 couple of administrative files (.foo and/or _foo) that I need to ignore when
 processing this data or Spark tries to read them as containing parquet
 content, which they do not.

 How can I set a PathFilter on the FileInputFormat used to construct an RDD?

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Write 1 RDD to multiple output paths in one go

2014-09-15 Thread Davies Liu
Maybe we should provide an API like saveTextFilesByKey(path),
could you create an JIRA for it ?

There is one in DPark [1] actually.

[1] https://github.com/douban/dpark/blob/master/dpark/rdd.py#L309

On Mon, Sep 15, 2014 at 7:08 AM, Nicholas Chammas
nicholas.cham...@gmail.com wrote:
 Any tips from anybody on how to do this in PySpark? (Or regular Spark, for
 that matter.)

 On Sat, Sep 13, 2014 at 1:25 PM, Nick Chammas nicholas.cham...@gmail.com
 wrote:

 Howdy doody Spark Users,

 I’d like to somehow write out a single RDD to multiple paths in one go.
 Here’s an example.

 I have an RDD of (key, value) pairs like this:

 >>> a = sc.parallelize(['Nick', 'Nancy', 'Bob', 'Ben',
 'Frankie']).keyBy(lambda x: x[0])
 >>> a.collect()
 [('N', 'Nick'), ('N', 'Nancy'), ('B', 'Bob'), ('B', 'Ben'), ('F',
 'Frankie')]

 Now I want to write the RDD out to different paths depending on the keys,
 so that I have one output directory per distinct key. Each output directory
 could potentially have multiple part- files or whatever.

 So my output would be something like:

 /path/prefix/n [/part-1, /part-2, etc]
 /path/prefix/b [/part-1, /part-2, etc]
 /path/prefix/f [/part-1, /part-2, etc]

 How would you do that?

 I suspect I need to use saveAsNewAPIHadoopFile or saveAsHadoopFile along
 with the MultipleTextOutputFormat output format class, but I’m not sure how.

 By the way, there is a very similar question to this here on Stack
 Overflow.

 Nick


 
 View this message in context: Write 1 RDD to multiple output paths in one
 go
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Low Level Kafka Consumer for Spark

2014-09-15 Thread Dibyendu Bhattacharya
Hi Tim,

I have not tried persist the RDD.

There is some discussion of rate limiting in Spark Streaming in this
thread:

http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-rate-limiting-from-kafka-td8590.html

There is a Pull Request https://github.com/apache/spark/pull/945/files to
fix this Rate Limiting issue at BlockGenerator level.

But while testing with heavy load, this fix did not solve my problem. So I
had to have Rate Limiting built into Kafka Consumer. I will make it
configurable soon.

If this is not done, I can see blocks getting dropped, which leads to job
failure.

I have raised this in another thread ..

https://mail.google.com/mail/u/1/?tab=wm#search/Serious/148650fd829cd239.
But I have not got any answer yet on whether this is a bug (blocks getting
dropped and the job failing).



Dib


On Mon, Sep 15, 2014 at 10:33 PM, Tim Smith secs...@gmail.com wrote:

 Hi Dibyendu,

 I am a little confused about the need for rate limiting input from
 kafka. If the stream coming in from kafka has higher message/second
 rate than what a Spark job can process then it should simply build a
 backlog in Spark if the RDDs are cached on disk using persist().
 Right?

 Thanks,

 Tim


 On Mon, Sep 15, 2014 at 4:33 AM, Dibyendu Bhattacharya
 dibyendu.bhattach...@gmail.com wrote:
  Hi Alon,
 
  No this will not be guarantee that same set of messages will come in same
  RDD. This fix just re-play the messages from last processed offset in
 same
  order. Again this is just a interim fix we needed to solve our use case
 . If
  you do not need this message re-play feature, just do not perform the
 ack (
  Acknowledgement) call in the Driver code. Then the processed messages
 will
  not be written to ZK and hence replay will not happen.
 
  Regards,
  Dibyendu
 
  On Mon, Sep 15, 2014 at 4:48 PM, Alon Pe'er alo...@supersonicads.com
  wrote:
 
  Hi Dibyendu,
 
  Thanks for your great work!
 
  I'm new to Spark Streaming, so I just want to make sure I understand
  Driver
  failure issue correctly.
 
  In my use case, I want to make sure that messages coming in from Kafka
 are
  always broken into the same set of RDDs, meaning that if a set of
 messages
  are assigned to one RDD, and the Driver dies before this RDD is
 processed,
  then once the Driver recovers, the same set of messages are assigned to
 a
  single RDD, instead of arbitrarily repartitioning the messages across
  different RDDs.
 
  Does your Receiver guarantee this behavior, until the problem is fixed
 in
  Spark 1.2?
 
  Regards,
  Alon
 
 
 
  --
  View this message in context:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Need help with ThriftServer/Spark1.1.0

2014-09-15 Thread Yana Kadiyska
Hi ladies and gents,

trying to get Thrift server up and running in an effort to replace Shark.

My first attempt to run sbin/start-thriftserver resulted in:

14/09/15 17:09:05 ERROR TThreadPoolServer: Error occurred during processing
of message.
java.lang.RuntimeException:
org.apache.thrift.transport.TTransportException: java.net.SocketException:
Connection reset
at
org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:219)
at
org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:189)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.thrift.transport.TTransportException:
java.net.SocketException: Connection reset
at
org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:129)
at
org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
at
org.apache.thrift.transport.TSaslTransport.receiveSaslMessage(TSaslTransport.java:178)
at
org.apache.thrift.transport.TSaslServerTransport.handleSaslStartMessage(TSaslServerTransport.java:125)
at
org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253)
at
org.apache.thrift.transport.TSaslServerTransport.open(TSaslServerTransport.java:41)
at
org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:216)
... 4 more
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:196)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:275)
at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
at
org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)

After turning logging levels up it seemed like this error is related to SASL,
and the SO advice was to turn it off
via:
<property><name>hive.server2.authentication</name><value>NOSASL</value></property>

But I still have no luck:
(this is the full command that gets run)

java -cp
/spark-1.1.0-bin-cdh4/conf:/spark-1.1.0-bin-cdh4/lib/spark-assembly-1.1.0-hadoop2.0.0-mr1-cdh4.2.0.jar:/spark-1.1.0-bin-cdh4/lib/datanucleus-core-3.2.2.jar:/spark-1.1.0-bin-cdh4/lib/datanucleus-rdbms-3.2.1.jar:/a/shark/spark-1.1.0-bin-cdh4/lib/datanucleus-api-jdo-3.2.1.jar:/hadoop/share/hadoop/mapreduce1//conf
-XX:MaxPermSize=128m -Xms4012m -Xmx4012m org.apache.spark.deploy.SparkSubmit
--class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 --master
spark://master-ip:7077 spark-internal --hiveconf
hive.server.thrift.bind.host ip-to-bind

14/09/15 17:05:05 ERROR TThreadPoolServer:
Error occurred during processing of message.
java.lang.ClassCastException: org.apache.thrift.transport.TSocket cannot be
cast to org.apache.thrift.transport.TSaslServerTransport
at
org.apache.hive.service.auth.TUGIContainingProcessor.process(TUGIContainingProcessor.java:53)
at
org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:206)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

Any idea what might be going on? I compiled with -Phive against 1.1.0.
hive-site.conf is the conf file we used previously. SparkSQL does work for
me, but does not have a lot of functionality I need.

Any help appreciated -- I do acknowledge this is likely more of a Hive
question than Spark... If there is a precompiled build for CDH4 that
includes the Thrift server, I'd be happy to try that too...

thanks again.


MLLib sparse vector

2014-09-15 Thread Sameer Tilak
Hi All,
I have transformed the data into the following format: the first column is the user
id, and all the other columns are class ids. For a user, only the class ids that
appear in this row have value 1 and the others are 0. I need to create a sparse
vector from this. Does the API for creating a sparse vector directly support this
format?

User id     Product class ids
2622572 145447  162013421   28565   285556  293 455367261   130 
3646167118806   183576  328651715   57671   57476   
  

Example of Geoprocessing with Spark

2014-09-15 Thread Abel Coronado Iruegas
Here an example of a working code that takes a csv with lat lon points and
intersects with polygons of municipalities of Mexico, generating a new
version of the file with new attributes.

Do you think that could be improved?

Thanks.

The Code:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.geoscript.feature._
import org.geoscript.geometry._
import org.geoscript.geometry.builder._
import com.vividsolutions.jts._
import org.geoscript.layer.Shapefile
import org.geotools.feature.FeatureCollection
import java.text._
import java.util._

object SimpleApp {
def main(args: Array[String]){
val conf = new SparkConf().setAppName("Csv Clipper")
val sc = new SparkContext(conf)
val csvPath = "hdfs://x01/user/acoronado/mov/movilidad.csv"
// 70 million rows
val csv = sc.textFile(csvPath)
val clipPoints = csv.map({ line: String =>
  val Array(usuario, lat, lon, date) = line.split(",").map(_.trim)
  val punto = Point(lon.toDouble, lat.toDouble)
  val existe = geoData.get.filter(f => f.geometry intersects punto) // geospatial operation
  var cve_est = "0"
  var cve_mun = "0"
  var time = "0"
  if (!existe.isEmpty) {
    val f = existe.take(1)
    val ff = f.toList(0)
    cve_est = ff.getAttribute(1).toString // State Code
    cve_mun = ff.getAttribute(2).toString // Municipality Code
    time = (new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")).parse(date.replaceAll("Z$", "+0000")).getTime().toString()
  }
  line + "," + time + "," + cve_est + "," + cve_mun
})
clipPoints.coalesce(1, true).saveAsTextFile("hdfs://m01/user/acoronado/mov/mov_all.csv")
println("Spark Clip Exito!!!")
}
object geoData {
  private val estatal = Shapefile("/geoData/MunicipiosLatLon.shp") // This directory exists on all the nodes.
  private val estatalColl = estatal.getFeatures
  def get: FeatureCollection[org.geoscript.feature.Schema, org.geoscript.feature.Feature] = estatalColl
}
}


Dealing with Time Series Data

2014-09-15 Thread Gary Malouf
I have a use case for our data in HDFS that involves sorting chunks of data
into time series format by a specific characteristic and doing computations
from that.  At large scale, what is the most efficient way to do this?
 Obviously, having the data sharded by that characteristic would make the
performance significantly better, but are there good tools in Spark that can
help us?


Re: How to initiate a shutdown of Spark Streaming context?

2014-09-15 Thread Jeoffrey Lim
What we did for gracefully shutting down the Spark Streaming context is
extend a Spark Web UI tab and perform a
SparkContext.SparkUI.attachTab(custom web ui). However, the custom Scala
Web UI extensions need to be under the package org.apache.spark.ui to get
around the package access restrictions.

Would it be possible for the SparkUI under SparkContext, and the Spark Web UI
packages, to be exposed as public so that developers can add
customizations with their own tools?

Thanks!

On Tue, Sep 16, 2014 at 12:34 AM, stanley [via Apache Spark User List] 
ml-node+s1001560n14252...@n3.nabble.com wrote:

 Thank you.

 Would the following approaches to address this problem an overkills?

 a. create a ServerSocket in a different thread from the main thread that
 created the Spark StreamingContext, and listens to shutdown command there
 b. create a web service that wraps around the main thread that created the
 Spark StreamingContext, and responds to shutdown requests

 Does Spark Streaming already provide similar capabilities?

 Stanley






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-initiate-a-shutdown-of-Spark-Streaming-context-tp14092p14277.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: scala 2.11?

2014-09-15 Thread Matei Zaharia
Scala 2.11 work is under way in open pull requests though, so hopefully it will 
be in soon.

Matei

On September 15, 2014 at 9:48:42 AM, Mohit Jaggi (mohitja...@gmail.com) wrote:

ah...thanks!

On Mon, Sep 15, 2014 at 9:47 AM, Mark Hamstra m...@clearstorydata.com wrote:
No, not yet.  Spark SQL is using org.scalamacros:quasiquotes_2.10.

On Mon, Sep 15, 2014 at 9:28 AM, Mohit Jaggi mohitja...@gmail.com wrote:
Folks,
I understand Spark SQL uses quasiquotes. Does that mean Spark has now moved to 
Scala 2.11?

Mohit.




Re: scala 2.11?

2014-09-15 Thread Mark Hamstra
Are we going to put 2.11 support into 1.1 or 1.0?  Else will be in soon
applies to the master development branch, but actually in the Spark 1.2.0
release won't occur until the second half of November at the earliest.

On Mon, Sep 15, 2014 at 12:11 PM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 Scala 2.11 work is under way in open pull requests though, so hopefully it
 will be in soon.

 Matei

 On September 15, 2014 at 9:48:42 AM, Mohit Jaggi (mohitja...@gmail.com)
 wrote:

 ah...thanks!

 On Mon, Sep 15, 2014 at 9:47 AM, Mark Hamstra m...@clearstorydata.com
 wrote:

 No, not yet.  Spark SQL is using org.scalamacros:quasiquotes_2.10.

 On Mon, Sep 15, 2014 at 9:28 AM, Mohit Jaggi mohitja...@gmail.com
 wrote:

 Folks,
 I understand Spark SQL uses quasiquotes. Does that mean Spark has now
 moved to Scala 2.11?

 Mohit.






Re: Write 1 RDD to multiple output paths in one go

2014-09-15 Thread Nicholas Chammas
Davies,

That’s pretty neat. I heard there was a pure Python clone of Spark out
there—so you were one of the people behind it!

I’ve created a JIRA issue about this. SPARK-3533: Add saveAsTextFileByKey()
method to RDDs https://issues.apache.org/jira/browse/SPARK-3533

Sean,

I think you might be able to get this working with a subclass of
MultipleTextOutputFormat, which overrides generateFileNameForKeyValue,
generateActualKey, etc. A bit of work for sure, but probably works.

I’m looking at how to make this work in PySpark as of 1.1.0. The closest
examples I can see of how to use the saveAsHadoop...() methods in this way
are these two examples: HBase Output Format
https://github.com/apache/spark/blob/cc14644460872efb344e8d895859d70213a40840/examples/src/main/python/hbase_outputformat.py#L60
and Avro Input Format
https://github.com/apache/spark/blob/cc14644460872efb344e8d895859d70213a40840/examples/src/main/python/avro_inputformat.py#L73

Basically, I’m thinking I need to subclass MultipleTextOutputFormat and
override some methods in a Scala file, and then reference that from Python?
Like how the AvroWrapperToJavaConverter class is done? Seems pretty
involved, but I’ll give it a shot if that’s the right direction to go in.
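
For what it's worth, a hedged Scala sketch of that MultipleTextOutputFormat
approach (class and path names are made up; treat it as a starting point, not a
tested implementation):

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class KeyBasedOutput extends MultipleTextOutputFormat[Any, Any] {
  // place each record under <outputPath>/<key>/part-NNNNN
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.toString + "/" + name
  // drop the key so only the value is written to the file
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()
}

val pairs = sc.parallelize(Seq("Nick", "Nancy", "Bob", "Ben", "Frankie"))
  .keyBy(_.substring(0, 1))
pairs.saveAsHadoopFile("/path/prefix", classOf[String], classOf[String],
  classOf[KeyBasedOutput])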

Nick
​

On Mon, Sep 15, 2014 at 1:08 PM, Davies Liu dav...@databricks.com wrote:

 Maybe we should provide an API like saveTextFilesByKey(path),
 could you create an JIRA for it ?

 There is one in DPark [1] actually.

 [1] https://github.com/douban/dpark/blob/master/dpark/rdd.py#L309

 On Mon, Sep 15, 2014 at 7:08 AM, Nicholas Chammas
 nicholas.cham...@gmail.com wrote:
  Any tips from anybody on how to do this in PySpark? (Or regular Spark,
 for
  that matter.)
 
  On Sat, Sep 13, 2014 at 1:25 PM, Nick Chammas 
 nicholas.cham...@gmail.com
  wrote:
 
  Howdy doody Spark Users,
 
  I’d like to somehow write out a single RDD to multiple paths in one go.
  Here’s an example.
 
  I have an RDD of (key, value) pairs like this:
 
   a = sc.parallelize(['Nick', 'Nancy', 'Bob', 'Ben',
   'Frankie']).keyBy(lambda x: x[0])
   a.collect()
  [('N', 'Nick'), ('N', 'Nancy'), ('B', 'Bob'), ('B', 'Ben'), ('F',
  'Frankie')]
 
  Now I want to write the RDD out to different paths depending on the
 keys,
  so that I have one output directory per distinct key. Each output
 directory
  could potentially have multiple part- files or whatever.
 
  So my output would be something like:
 
  /path/prefix/n [/part-1, /part-2, etc]
  /path/prefix/b [/part-1, /part-2, etc]
  /path/prefix/f [/part-1, /part-2, etc]
 
  How would you do that?
 
  I suspect I need to use saveAsNewAPIHadoopFile or saveAsHadoopFile along
  with the MultipleTextOutputFormat output format class, but I’m not sure
 how.
 
  By the way, there is a very similar question to this here on Stack
  Overflow.
 
  Nick
 
 
  
  View this message in context: Write 1 RDD to multiple output paths in
 one
  go
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 



Re: MLLib sparse vector

2014-09-15 Thread Chris Gore
Hi Sameer,

MLLib uses Breeze’s vector format under the hood.  You can use that.  
http://www.scalanlp.org/api/breeze/index.html#breeze.linalg.SparseVector

For example:

import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}

val numClasses = classes.distinct.count.toInt

val userWithClassesAsSparseVector = rows.map(x => (x.userID, new
BSV[Double](x.classIDs.sortWith(_ < _),
Seq.fill(x.classIDs.length)(1.0).toArray, numClasses).asInstanceOf[BV[Double]]))
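
An alternative sketch using MLlib's own sparse vector type, in case a
mllib.linalg.Vector is what the downstream algorithm expects (this assumes the
class ids are 0-based indices and numClasses is known; the UserRow shape is made
up for illustration):

import org.apache.spark.mllib.linalg.{Vector, Vectors}

case class UserRow(userId: Long, classIds: Array[Int]) // hypothetical input shape

def toSparse(row: UserRow, numClasses: Int): (Long, Vector) = {
  val indices = row.classIds.distinct.sorted   // indices must be strictly increasing
  val values = Array.fill(indices.length)(1.0) // 1.0 for every class the user has
  (row.userId, Vectors.sparse(numClasses, indices, values))
}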

Chris

On Sep 15, 2014, at 11:28 AM, Sameer Tilak ssti...@live.com wrote:

 Hi All,
 I have transformed the data into following format: First column is user id, 
 and then all the other columns are class ids. For a user only class ids that 
 appear in this row have value 1 and others are 0.  I need to crease a sparse 
 vector from this. Does the API for creating a sparse vector that can directly 
 support this format?  
 
 User idProduct class ids
 
 2622572   145447  162013421   28565   285556  293 455367261   
 130 3646167118806   183576  328651715   57671   57476



Efficient way to sum multiple columns

2014-09-15 Thread jamborta
Hi all,

I have an RDD that contains around 50 columns. I need to sum each column,
which I am doing by running it through a for loop, creating an array and
running the sum function as follows:

for (i <- 0 until 10) yield {
   data.map(x => x(i)).sum
}

is their a better way to do this?

thanks,




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-way-to-sum-multiple-columns-tp14281.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: PathFilter for newAPIHadoopFile?

2014-09-15 Thread Eric Friedman
That's a good idea and one I had considered too.  Unfortunately I'm not
aware of an API in PySpark for enumerating paths on HDFS.  Have I
overlooked one?

On Mon, Sep 15, 2014 at 10:01 AM, Davies Liu dav...@databricks.com wrote:

 In PySpark, I think you could enumerate all the valid files, and create
 RDD by
 newAPIHadoopFile(), then union them together.

 On Mon, Sep 15, 2014 at 5:49 AM, Eric Friedman
 eric.d.fried...@gmail.com wrote:
  I neglected to specify that I'm using pyspark. Doesn't look like these
 APIs have been bridged.
 
  
  Eric Friedman
 
  On Sep 14, 2014, at 11:02 PM, Nat Padmanabhan reachn...@gmail.com
 wrote:
 
  Hi Eric,
 
  Something along the lines of the following should work
 
  val fs = getFileSystem(...) // standard hadoop API call
  val filteredConcatenatedPaths = fs.listStatus(topLevelDirPath,
  pathFilter).map(_.getPath.toString).mkString(,)  // pathFilter is an
  instance of org.apache.hadoop.fs.PathFilter
  val parquetRdd = sc.hadoopFile(filteredConcatenatedPaths,
  classOf[ParquetInputFormat[Something]], classOf[Void],
  classOf[SomeAvroType], getConfiguration(...))
 
  You have to do some initializations on ParquetInputFormat such as
  AvroReadSetup/AvroWriteSupport etc but that you should be doing
  already I am guessing.
 
  Cheers,
  Nat
 
 
  On Sun, Sep 14, 2014 at 7:37 PM, Eric Friedman
  eric.d.fried...@gmail.com wrote:
  Hi,
 
  I have a directory structure with parquet+avro data in it. There are a
  couple of administrative files (.foo and/or _foo) that I need to
 ignore when
  processing this data or Spark tries to read them as containing parquet
  content, which they do not.
 
  How can I set a PathFilter on the FileInputFormat used to construct an
 RDD?
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 



Re: Efficient way to sum multiple columns

2014-09-15 Thread Xiangrui Meng
Please check the colStats method defined under mllib.stat.Statistics. -Xiangrui
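
A minimal sketch of that colStats route (assuming each row of the RDD can be
turned into an Array[Double] of the column values):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics

val vectors = data.map(row => Vectors.dense(row))       // data: RDD[Array[Double]]
val summary = Statistics.colStats(vectors)              // single pass over the data
val sums = summary.mean.toArray.map(_ * summary.count)  // per-column sum = mean * row count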

On Mon, Sep 15, 2014 at 1:00 PM, jamborta jambo...@gmail.com wrote:
 Hi all,

 I have an RDD that contains around 50 columns. I need to sum each column,
 which I am doing by running it through a for loop, creating an array and
 running the sum function as follows:

 for (i - 0 until 10) yield {
data.map(x = x(i)).sum
 }

 is their a better way to do this?

 thanks,




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-way-to-sum-multiple-columns-tp14281.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming union expected behaviour?

2014-09-15 Thread Varad Joshi
I am seeing the same exact behavior. Shrikar, did you get any response to
your post?

Varad



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-union-expected-behaviour-tp7206p14284.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



minPartitions for non-text files?

2014-09-15 Thread Eric Friedman
sc.textFile takes a minimum # of partitions to use.

is there a way to get sc.newAPIHadoopFile to do the same?

I know I can repartition() and get a shuffle.  I'm wondering if there's a
way to tell the underlying InputFormat (AvroParquet, in my case) how many
partitions to use at the outset.

What I'd really prefer is to get the partitions automatically defined based
on the number of blocks.


Re: minPartitions for non-text files?

2014-09-15 Thread Sean Owen
I think the reason is simply that there is no longer an explicit
min-partitions argument for Hadoop InputSplits in the new Hadoop APIs.
At least, I didn't see it when I glanced just now.

However, you should be able to get the same effect by setting a
Configuration property, and you can do so through the newAPIHadoopFile
method. You set it as a suggested maximum split size rather than
suggest minimum number of splits.

Although I think the old config property mapred.max.split.size is
still respected, you may try
mapreduce.input.fileinputformat.split.maxsize instead, which appears
to be the intended replacement in the new APIs.
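
A sketch of passing that hint through the Configuration handed to
newAPIHadoopFile (shown with TextInputFormat for concreteness; substitute the
AvroParquet input format and key/value classes in the real job):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val conf = new Configuration(sc.hadoopConfiguration)
// cap split size at ~64 MB so larger files break into more partitions
conf.set("mapreduce.input.fileinputformat.split.maxsize", (64 * 1024 * 1024).toString)

val rdd = sc.newAPIHadoopFile("hdfs:///path/to/input",
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)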

On Mon, Sep 15, 2014 at 9:35 PM, Eric Friedman
eric.d.fried...@gmail.com wrote:
 sc.textFile takes a minimum # of partitions to use.

 is there a way to get sc.newAPIHadoopFile to do the same?

 I know I can repartition() and get a shuffle.  I'm wondering if there's a
 way to tell the underlying InputFormat (AvroParquet, in my case) how many
 partitions to use at the outset.

 What I'd really prefer is to get the partitions automatically defined based
 on the number of blocks.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Weird aggregation results when reusing objects inside reduceByKey

2014-09-15 Thread kriskalish
I have a pretty simple scala spark aggregation job that is summing up number
of occurrences of two types of events. I have run into situations where it
seems to generate bad values that are clearly incorrect after reviewing the
raw data. 

First I have a Record object which I use to do my aggregation: 

class Record (val PrimaryId: Int,
  val SubId: Int,
  var Event1Count: Int,
  var Event2Count: Int) extends Serializable  {
}

Then once I have an RDD I do a reduce by key:

val allAgr = all.map(x => (s"${x.PrimaryId}-${x.SubId}", x)).reduceByKey
{ (l, r) =>
  l.Event1Count = l.Event1Count + r.Event1Count
  l.Event2Count = l.Event2Count + r.Event2Count
  l
}.map(x => x._2)

The problem is that for some scenarios I get about 16 billion back for
Event1Count, but the value of Event2Count looks fine. If I refactor my
reduce by key function to actually produce a new object, it seems to work:

val allAgr = all.map(x => (s"${x.PrimaryId}-${x.SubId}", x)).reduceByKey
{ (l, r) =>
  val n = new Record(l.PrimaryId, l.SubId, 0, 0)
  n.Event1Count = l.Event1Count + r.Event1Count
  n.Event2Count = l.Event2Count + r.Event2Count
  n
}.map(x => x._2)


This second option is clearly the safer way to go since there is no chance
for changing values via reference. However, it doesn't make sense to me that
this should fix it as in map reduce a once a object is reduced, it should
never be reduced again (otherwise double-counting would happen).

I dug into the source a little:

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Aggregator.scala


I didn't really see any obvious redflags and admittedly it is beyond my
comprehension.

Any ideas?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Weird-aggregation-results-when-reusing-objects-inside-reduceByKey-tp14287.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: vertex active/inactive feature in Pregel API ?

2014-09-15 Thread Ankur Dave
At 2014-09-15 16:25:04 +0200, Yifan LI iamyifa...@gmail.com wrote:
 I am wondering if the vertex active/inactive feature (corresponding to the change of its
 value between two supersteps) is introduced in the Pregel API of GraphX?

Vertex activeness in Pregel is controlled by messages: if a vertex did not 
receive a message in the previous iteration, its vertex program will not run in 
the current iteration. Also, inactive vertices will not be able to send 
messages because by default the sendMsg function will only be run on edges 
where at least one of the adjacent vertices received a message. You can change 
this behavior -- see the documentation for the activeDirection parameter to 
Pregel.apply [1].
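
As a concrete sketch of how activeDirection interacts with messaging, here is the
classic single-source shortest-paths pattern (graph construction is omitted and
assumed to start with distances of Double.PositiveInfinity everywhere except the
source vertex):

import org.apache.spark.graphx._

def sssp(graph: Graph[Double, Double]): Graph[Double, Double] =
  Pregel(graph, Double.PositiveInfinity, activeDirection = EdgeDirection.Out)(
    // vprog: only runs on vertices that received a message in the previous round
    (id, dist, newDist) => math.min(dist, newDist),
    // sendMsg: with EdgeDirection.Out, only edges whose source vertex was active are scanned
    triplet =>
      if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
        Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
      else Iterator.empty,
    // mergeMsg: combine messages arriving at the same vertex
    (a, b) => math.min(a, b))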

There is also an open pull request to make active vertex tracking more explicit 
by allowing vertices to vote to halt directly [2].

Ankur

[1] 
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Pregel$
[2] https://github.com/apache/spark/pull/1217

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Weird aggregation results when reusing objects inside reduceByKey

2014-09-15 Thread Sean Owen
It isn't a question of an item being reduced twice, but of when
objects may be reused to represent other items.

I don't think you have a guarantee that you can safely reuse the
objects in this argument, but I'd also be interested if there was a
case where this is guaranteed.

For example I'm guessing this does work if you foldByKey() and supply
your own starting value?
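
One way to sidestep the mutation question entirely (a sketch, not the original
code; the field names differ from the question's Record class) is to reduce over
immutable tuples and rebuild the Record objects afterwards:

case class Record(primaryId: Int, subId: Int, event1Count: Int, event2Count: Int)

// all: RDD[Record] -- the input from the original question
val allAgg = all
  .map(r => ((r.primaryId, r.subId), (r.event1Count, r.event2Count)))
  .reduceByKey { case ((a1, a2), (b1, b2)) => (a1 + b1, a2 + b2) }
  .map { case ((pid, sid), (e1, e2)) => Record(pid, sid, e1, e2) }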

On Mon, Sep 15, 2014 at 9:58 PM, kriskalish k...@kalish.net wrote:
 I have a pretty simple scala spark aggregation job that is summing up number
 of occurrences of two types of events. I have run into situations where it
 seems to generate bad values that are clearly incorrect after reviewing the
 raw data.

 First I have a Record object which I use to do my aggregation:

 class Record (val PrimaryId: Int,
   val SubId: Int,
   var Event1Count: Int,
   var Event2Count: Int) extends Serializable  {
 }

 Then once I have an RDD I do a reduce by key:

 val allAgr = all.map(x = (s${x.PrimaryId}-${x.SubId}, x)).reduceByKey
 { (l, r) =
   l.Event1Count= l.Event1Count+ r.Event1Count
   l.Event2Count= l.Event2Count+ r.Event2Count
   l
 }.map(x = x._2)

 The problem is that for some scenarios I get about 16 billion back for
 Event1Count, but the value of Event2Count looks fine. If I refactor my
 reduce by key function to actually produce a new object, it seems to work:

 val allAgr = all.map(x = (s${x.PrimaryId}-${x.SubId}, x)).reduceByKey
 { (l, r) =
   val n = new Record(l.PrimaryId, l.SubId, 0, 0 )
   n.Event1Count= l.Event1Count+ r.Event1Count
   n.Event2Count= l.Event2Count+ r.Event2Count
   n
 }.map(x = x._2)


 This second option is clearly the safer way to go since there is no chance
 for changing values via reference. However, it doesn't make sense to me that
 this should fix it as in map reduce a once a object is reduced, it should
 never be reduced again (otherwise double-counting would happen).

 I dug into the source a little:

 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Aggregator.scala


 I didn't really see any obvious redflags and admittedly it is beyond my
 comprehension.

 Any ideas?




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Weird-aggregation-results-when-reusing-objects-inside-reduceByKey-tp14287.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: PathFilter for newAPIHadoopFile?

2014-09-15 Thread Davies Liu
There is one way to do it in bash: hadoop fs -ls; maybe you could
end up with a bash script to do this.

On Mon, Sep 15, 2014 at 1:01 PM, Eric Friedman
eric.d.fried...@gmail.com wrote:
 That's a good idea and one I had considered too.  Unfortunately I'm not
 aware of an API in PySpark for enumerating paths on HDFS.  Have I overlooked
 one?

 On Mon, Sep 15, 2014 at 10:01 AM, Davies Liu dav...@databricks.com wrote:

 In PySpark, I think you could enumerate all the valid files, and create
 RDD by
 newAPIHadoopFile(), then union them together.

 On Mon, Sep 15, 2014 at 5:49 AM, Eric Friedman
 eric.d.fried...@gmail.com wrote:
  I neglected to specify that I'm using pyspark. Doesn't look like these
  APIs have been bridged.
 
  
  Eric Friedman
 
  On Sep 14, 2014, at 11:02 PM, Nat Padmanabhan reachn...@gmail.com
  wrote:
 
  Hi Eric,
 
  Something along the lines of the following should work
 
  val fs = getFileSystem(...) // standard hadoop API call
  val filteredConcatenatedPaths = fs.listStatus(topLevelDirPath,
  pathFilter).map(_.getPath.toString).mkString(,)  // pathFilter is an
  instance of org.apache.hadoop.fs.PathFilter
  val parquetRdd = sc.hadoopFile(filteredConcatenatedPaths,
  classOf[ParquetInputFormat[Something]], classOf[Void],
  classOf[SomeAvroType], getConfiguration(...))
 
  You have to do some initializations on ParquetInputFormat such as
  AvroReadSetup/AvroWriteSupport etc but that you should be doing
  already I am guessing.
 
  Cheers,
  Nat
 
 
  On Sun, Sep 14, 2014 at 7:37 PM, Eric Friedman
  eric.d.fried...@gmail.com wrote:
  Hi,
 
  I have a directory structure with parquet+avro data in it. There are a
  couple of administrative files (.foo and/or _foo) that I need to
  ignore when
  processing this data or Spark tries to read them as containing parquet
  content, which they do not.
 
  How can I set a PathFilter on the FileInputFormat used to construct an
  RDD?
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: PathFilter for newAPIHadoopFile?

2014-09-15 Thread Davies Liu
Or maybe you could give this one a try:
https://labs.spotify.com/2013/05/07/snakebite/

On Mon, Sep 15, 2014 at 2:51 PM, Davies Liu dav...@databricks.com wrote:
 There is one way by do it in bash: hadoop fs -ls , maybe you could
 end up with a bash scripts to do the things.

 On Mon, Sep 15, 2014 at 1:01 PM, Eric Friedman
 eric.d.fried...@gmail.com wrote:
 That's a good idea and one I had considered too.  Unfortunately I'm not
 aware of an API in PySpark for enumerating paths on HDFS.  Have I overlooked
 one?

 On Mon, Sep 15, 2014 at 10:01 AM, Davies Liu dav...@databricks.com wrote:

 In PySpark, I think you could enumerate all the valid files, and create
 RDD by
 newAPIHadoopFile(), then union them together.

 On Mon, Sep 15, 2014 at 5:49 AM, Eric Friedman
 eric.d.fried...@gmail.com wrote:
  I neglected to specify that I'm using pyspark. Doesn't look like these
  APIs have been bridged.
 
  
  Eric Friedman
 
  On Sep 14, 2014, at 11:02 PM, Nat Padmanabhan reachn...@gmail.com
  wrote:
 
  Hi Eric,
 
  Something along the lines of the following should work
 
  val fs = getFileSystem(...) // standard hadoop API call
  val filteredConcatenatedPaths = fs.listStatus(topLevelDirPath,
  pathFilter).map(_.getPath.toString).mkString(,)  // pathFilter is an
  instance of org.apache.hadoop.fs.PathFilter
  val parquetRdd = sc.hadoopFile(filteredConcatenatedPaths,
  classOf[ParquetInputFormat[Something]], classOf[Void],
  classOf[SomeAvroType], getConfiguration(...))
 
  You have to do some initializations on ParquetInputFormat such as
  AvroReadSetup/AvroWriteSupport etc but that you should be doing
  already I am guessing.
 
  Cheers,
  Nat
 
 
  On Sun, Sep 14, 2014 at 7:37 PM, Eric Friedman
  eric.d.fried...@gmail.com wrote:
  Hi,
 
  I have a directory structure with parquet+avro data in it. There are a
  couple of administrative files (.foo and/or _foo) that I need to
  ignore when
  processing this data or Spark tries to read them as containing parquet
  content, which they do not.
 
  How can I set a PathFilter on the FileInputFormat used to construct an
  RDD?
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Define the name of the outputs with Java-Spark.

2014-09-15 Thread Xiangrui Meng
Spark doesn't support MultipleOutput at this time. You can cache the
parent RDD. Then create RDDs from it and save them separately.
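
For example, a rough sketch of that approach (the paths and the filtering
predicate below are placeholders, not from your job):

val parent = sc.textFile("hdfs:///input").cache()

Seq("typeA", "typeB").foreach { name =>
  // one filtered subset per desired output name, each under its own path
  parent.filter(_.startsWith(name))
        .saveAsTextFile(s"hdfs:///output/$name")
}
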
-Xiangrui

On Fri, Sep 12, 2014 at 7:45 AM, Guillermo Ortiz konstt2...@gmail.com wrote:

 I would like to define the names of my outputs in Spark. I have a process
 which writes many files and I would like to name them; is it possible? I
 guess that it's not possible with the saveAsText method.

 It would be something similar to the MultipleOutput of Hadoop.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Accuracy hit in classification with Spark

2014-09-15 Thread Xiangrui Meng
Thanks for the update! -Xiangrui

On Sun, Sep 14, 2014 at 11:33 PM, jatinpreet jatinpr...@gmail.com wrote:
 Hi,

 I have been able to get the same accuracy with MLlib as Mahout's. The
 pre-processing phase of Mahout was the reason  behind the accuracy mismatch.
 After studying and applying the same logic in my code, it worked like a
 charm.

 Thanks,
 Jatin



 -
 Novice Big Data Programmer
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Accuracy-hit-in-classification-with-Spark-tp13773p14221.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: minPartitions for non-text files?

2014-09-15 Thread Eric Friedman
That would be awesome, but doesn't seem to have any effect.

In PySpark, I created a dict with that key and a numeric value, then passed
it into newAPIHadoopFile as a value for the conf keyword.  The returned
RDD still has a single partition.

On Mon, Sep 15, 2014 at 1:56 PM, Sean Owen so...@cloudera.com wrote:

 I think the reason is simply that there is no longer an explicit
 min-partitions argument for Hadoop InputSplits in the new Hadoop APIs.
 At least, I didn't see it when I glanced just now.

 However, you should be able to get the same effect by setting a
 Configuration property, and you can do so through the newAPIHadoopFile
 method. You set it as a suggested maximum split size rather than
 suggest minimum number of splits.

 Although I think the old config property mapred.max.split.size is
 still respected, you may try
 mapreduce.input.fileinputformat.split.maxsize instead, which appears
 to be the intended replacement in the new APIs.

 On Mon, Sep 15, 2014 at 9:35 PM, Eric Friedman
 eric.d.fried...@gmail.com wrote:
  sc.textFile takes a minimum # of partitions to use.
 
  is there a way to get sc.newAPIHadoopFile to do the same?
 
  I know I can repartition() and get a shuffle.  I'm wondering if there's a
  way to tell the underlying InputFormat (AvroParquet, in my case) how many
  partitions to use at the outset.
 
  What I'd really prefer is to get the partitions automatically defined
 based
  on the number of blocks.



Re: MLLib sparse vector

2014-09-15 Thread Xiangrui Meng
Or you can use the factory method `Vectors.sparse`:

val sv = Vectors.sparse(numProducts, productIds.map(x => (x, 1.0)))

where numProducts should be the largest product id plus one.
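
A slightly fuller, self-contained sketch (the ids and the size here are made up
purely for illustration):

import org.apache.spark.mllib.linalg.{Vector, Vectors}

// class/product ids that are present (value 1.0) for one user
val productIds: Seq[Int] = Seq(130, 293, 1620, 4553)
val numProducts = 300000  // largest product id plus one

val sv: Vector = Vectors.sparse(numProducts, productIds.map(i => (i, 1.0)))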

Best,
Xiangrui

On Mon, Sep 15, 2014 at 12:46 PM, Chris Gore cdg...@cdgore.com wrote:
 Hi Sameer,

 MLLib uses Breeze’s vector format under the hood.  You can use that.
 http://www.scalanlp.org/api/breeze/index.html#breeze.linalg.SparseVector

 For example:

 import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}

 val numClasses = classes.distinct.count.toInt

 val userWithClassesAsSparseVector = rows.map(x => (x.userID, new
 BSV[Double](x.classIDs.sortWith(_ < _),
 Seq.fill(x.classIDs.length)(1.0).toArray,
 numClasses).asInstanceOf[BV[Double]]))

 Chris

 On Sep 15, 2014, at 11:28 AM, Sameer Tilak ssti...@live.com wrote:

 Hi All,
 I have transformed the data into following format: First column is user id,
 and then all the other columns are class ids. For a user only class ids that
 appear in this row have value 1 and others are 0.  I need to crease a sparse
 vector from this. Does the API for creating a sparse vector that can
 directly support this format?

 User idProduct class ids

 2622572 145447 1620 13421 28565 285556 293 4553 67261 130 3646 1671 18806
 183576 3286 51715 57671 57476



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: minPartitions for non-text files?

2014-09-15 Thread Sean Owen
Heh, it's still just a suggestion to Hadoop I guess, not guaranteed.

Is it a splittable format? for example, some compressed formats are
not splittable and Hadoop has to process whole files at a time.

I'm also not sure if this is something to do with pyspark, since the
underlying Scala API takes a Configuration object rather than
dictionary.

On Mon, Sep 15, 2014 at 11:23 PM, Eric Friedman
eric.d.fried...@gmail.com wrote:
 That would be awesome, but doesn't seem to have any effect.

 In PySpark, I created a dict with that key and a numeric value, then passed
 it into newAPIHadoopFile as a value for the conf keyword.  The returned
 RDD still has a single partition.

 On Mon, Sep 15, 2014 at 1:56 PM, Sean Owen so...@cloudera.com wrote:

 I think the reason is simply that there is no longer an explicit
 min-partitions argument for Hadoop InputSplits in the new Hadoop APIs.
 At least, I didn't see it when I glanced just now.

 However, you should be able to get the same effect by setting a
 Configuration property, and you can do so through the newAPIHadoopFile
 method. You set it as a suggested maximum split size rather than
 suggest minimum number of splits.

 Although I think the old config property mapred.max.split.size is
 still respected, you may try
 mapreduce.input.fileinputformat.split.maxsize instead, which appears
 to be the intended replacement in the new APIs.

 On Mon, Sep 15, 2014 at 9:35 PM, Eric Friedman
 eric.d.fried...@gmail.com wrote:
  sc.textFile takes a minimum # of partitions to use.
 
  is there a way to get sc.newAPIHadoopFile to do the same?
 
  I know I can repartition() and get a shuffle.  I'm wondering if there's
  a
  way to tell the underlying InputFormat (AvroParquet, in my case) how
  many
  partitions to use at the outset.
 
  What I'd really prefer is to get the partitions automatically defined
  based
  on the number of blocks.



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Does Spark always wait for stragglers to finish running?

2014-09-15 Thread Pramod Biligiri
Hi,
I'm running Spark tasks with speculation enabled. I'm noticing that Spark
seems to wait in a given stage for all stragglers to finish, even though
the speculated alternative might have finished sooner. Is that correct?

Is there a way to indicate to Spark not to wait for stragglers to finish?

Thanks,
Pramod
-- 
http://twitter.com/pramodbiligiri


Invalid signature file digest for Manifest main attributes with spark job built using maven

2014-09-15 Thread kpeng1
Hi All,

I am trying to submit a spark job that I have built in maven using the
following command:
/usr/bin/spark-submit --deploy-mode client --class com.spark.TheMain
--master local[1] /home/cloudera/myjar.jar 100

But I seem to be getting the following error:
Exception in thread main java.lang.SecurityException: Invalid signature
file digest for Manifest main attributes
at
sun.security.util.SignatureFileVerifier.processImpl(SignatureFileVerifier.java:286)
at
sun.security.util.SignatureFileVerifier.process(SignatureFileVerifier.java:239)
at java.util.jar.JarVerifier.processEntry(JarVerifier.java:307)
at java.util.jar.JarVerifier.update(JarVerifier.java:218)
at java.util.jar.JarFile.initializeVerifier(JarFile.java:345)
at java.util.jar.JarFile.getInputStream(JarFile.java:412)
at 
sun.misc.URLClassPath$JarLoader$2.getInputStream(URLClassPath.java:775)
at sun.misc.Resource.cachedInputStream(Resource.java:77)
at sun.misc.Resource.getByteBuffer(Resource.java:160)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:436)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:289)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


Here is the pom file I am using to build the jar:
project xmlns=http://maven.apache.org/POM/4.0.0;
xmlns:xsi=http://www.w3.org/2001/XMLSchema-instance;
xsi:schemaLocation=http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd;
  modelVersion4.0.0/modelVersion
  groupIdcom.spark/groupId
  artifactIdmyjar/artifactId
  version0.0.1-SNAPSHOT/version
  name${project.artifactId}/name
  descriptionMy wonderfull scala app/description
  inceptionYear2010/inceptionYear
  licenses
license
  nameMy License/name
  urlhttp:///url
  distributionrepo/distribution
/license
  /licenses

  properties
cdh.versioncdh5.1.0/cdh.version
maven.compiler.source1.6/maven.compiler.source
maven.compiler.target1.6/maven.compiler.target
encodingUTF-8/encoding
scala.tools.version2.10/scala.tools.version
scala.version2.10.4/scala.version
  /properties

  repositories
repository
  idscala-tools.org/id
  nameScala-tools Maven2 Repository/name
  urlhttps://oss.sonatype.org/content/repositories/snapshots//url
/repository
repository
  idmaven-hadoop/id
  nameHadoop Releases/name
 
urlhttps://repository.cloudera.com/content/repositories/releases//url
/repository
repository
  idcloudera-repos/id
  nameCloudera Repos/name
  urlhttps://repository.cloudera.com/artifactory/cloudera-repos//url
/repository
  /repositories
  pluginRepositories
pluginRepository
  idscala-tools.org/id
  nameScala-tools Maven2 Repository/name
  urlhttps://oss.sonatype.org/content/repositories/snapshots//url
/pluginRepository
  /pluginRepositories

  dependencies
dependency
  groupIdorg.scala-lang/groupId
  artifactIdscala-library/artifactId
  version${scala.version}/version
/dependency
dependency
  groupIdorg.apache.spark/groupId
  artifactIdspark-core_2.10/artifactId
  version1.0.0-${cdh.version}/version
/dependency
dependency
  groupIdorg.apache.spark/groupId
  artifactIdspark-tools_2.10/artifactId
  version1.0.0-${cdh.version}/version
/dependency
dependency
  groupIdorg.apache.spark/groupId
  artifactIdspark-streaming-flume_2.10/artifactId
  version1.0.0-${cdh.version}/version
/dependency
dependency
  groupIdorg.apache.spark/groupId
  artifactIdspark-streaming_2.10/artifactId
  version1.0.0-${cdh.version}/version
/dependency
dependency
  groupIdorg.apache.flume/groupId
  artifactIdflume-ng-sdk/artifactId
  version1.5.0-${cdh.version}/version
  
  exclusions
exclusion
  groupIdio.netty/groupId
  artifactIdnetty/artifactId
/exclusion
  /exclusions
/dependency
dependency
  groupIdorg.apache.flume/groupId
  artifactIdflume-ng-core/artifactId
  version1.5.0-${cdh.version}/version
  
  exclusions
exclusion
  groupIdio.netty/groupId
  artifactIdnetty/artifactId
/exclusion
  /exclusions
/dependency
dependency
  

Re: Invalid signature file digest for Manifest main attributes with spark job built using maven

2014-09-15 Thread Sean Owen
This is more of a Java / Maven issue than Spark per se. I would use
the shade plugin to remove signature files in your final META-INF/
dir. As Spark does, in its configuration:

<filters>
  <filter>
    <artifact>*:*</artifact>
    <excludes>
      <exclude>org/datanucleus/**</exclude>
      <exclude>META-INF/*.SF</exclude>
      <exclude>META-INF/*.DSA</exclude>
      <exclude>META-INF/*.RSA</exclude>
    </excludes>
  </filter>
</filters>

On Mon, Sep 15, 2014 at 11:33 PM, kpeng1 kpe...@gmail.com wrote:
 Hi All,

 I am trying to submit a spark job that I have built in maven using the
 following command:
 /usr/bin/spark-submit --deploy-mode client --class com.spark.TheMain
 --master local[1] /home/cloudera/myjar.jar 100

 But I seem to be getting the following error:
 Exception in thread main java.lang.SecurityException: Invalid signature
 file digest for Manifest main attributes
 at
 sun.security.util.SignatureFileVerifier.processImpl(SignatureFileVerifier.java:286)
 at
 sun.security.util.SignatureFileVerifier.process(SignatureFileVerifier.java:239)
 at java.util.jar.JarVerifier.processEntry(JarVerifier.java:307)
 at java.util.jar.JarVerifier.update(JarVerifier.java:218)
 at java.util.jar.JarFile.initializeVerifier(JarFile.java:345)
 at java.util.jar.JarFile.getInputStream(JarFile.java:412)
 at 
 sun.misc.URLClassPath$JarLoader$2.getInputStream(URLClassPath.java:775)
 at sun.misc.Resource.cachedInputStream(Resource.java:77)
 at sun.misc.Resource.getByteBuffer(Resource.java:160)
 at java.net.URLClassLoader.defineClass(URLClassLoader.java:436)
 at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:270)
 at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:289)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


 Here is the pom file I am using to build the jar:
 project xmlns=http://maven.apache.org/POM/4.0.0;
 xmlns:xsi=http://www.w3.org/2001/XMLSchema-instance;
 xsi:schemaLocation=http://maven.apache.org/POM/4.0.0
 http://maven.apache.org/maven-v4_0_0.xsd;
   modelVersion4.0.0/modelVersion
   groupIdcom.spark/groupId
   artifactIdmyjar/artifactId
   version0.0.1-SNAPSHOT/version
   name${project.artifactId}/name
   descriptionMy wonderfull scala app/description
   inceptionYear2010/inceptionYear
   licenses
 license
   nameMy License/name
   urlhttp:///url
   distributionrepo/distribution
 /license
   /licenses

   properties
 cdh.versioncdh5.1.0/cdh.version
 maven.compiler.source1.6/maven.compiler.source
 maven.compiler.target1.6/maven.compiler.target
 encodingUTF-8/encoding
 scala.tools.version2.10/scala.tools.version
 scala.version2.10.4/scala.version
   /properties

   repositories
 repository
   idscala-tools.org/id
   nameScala-tools Maven2 Repository/name
   urlhttps://oss.sonatype.org/content/repositories/snapshots//url
 /repository
 repository
   idmaven-hadoop/id
   nameHadoop Releases/name

 urlhttps://repository.cloudera.com/content/repositories/releases//url
 /repository
 repository
   idcloudera-repos/id
   nameCloudera Repos/name
   urlhttps://repository.cloudera.com/artifactory/cloudera-repos//url
 /repository
   /repositories
   pluginRepositories
 pluginRepository
   idscala-tools.org/id
   nameScala-tools Maven2 Repository/name
   urlhttps://oss.sonatype.org/content/repositories/snapshots//url
 /pluginRepository
   /pluginRepositories

   dependencies
 dependency
   groupIdorg.scala-lang/groupId
   artifactIdscala-library/artifactId
   version${scala.version}/version
 /dependency
 dependency
   groupIdorg.apache.spark/groupId
   artifactIdspark-core_2.10/artifactId
   version1.0.0-${cdh.version}/version
 /dependency
 dependency
   groupIdorg.apache.spark/groupId
   artifactIdspark-tools_2.10/artifactId
   version1.0.0-${cdh.version}/version
 /dependency
 dependency
   groupIdorg.apache.spark/groupId
   artifactIdspark-streaming-flume_2.10/artifactId
   version1.0.0-${cdh.version}/version
 /dependency
 dependency
   groupIdorg.apache.spark/groupId
   artifactIdspark-streaming_2.10/artifactId
   version1.0.0-${cdh.version}/version
 /dependency
 dependency
   

Re: MLLib sparse vector

2014-09-15 Thread Chris Gore
Probably worth noting that the factory methods in mllib create an object of 
type org.apache.spark.mllib.linalg.Vector which stores data in a similar format 
as Breeze vectors

Chris

On Sep 15, 2014, at 3:24 PM, Xiangrui Meng men...@gmail.com wrote:

 Or you can use the factory method `Vectors.sparse`:
 
 val sv = Vectors.sparse(numProducts, productIds.map(x = (x, 1.0)))
 
 where numProducts should be the largest product id plus one.
 
 Best,
 Xiangrui
 
 On Mon, Sep 15, 2014 at 12:46 PM, Chris Gore cdg...@cdgore.com wrote:
 Hi Sameer,
 
 MLLib uses Breeze’s vector format under the hood.  You can use that.
 http://www.scalanlp.org/api/breeze/index.html#breeze.linalg.SparseVector
 
 For example:
 
 import breeze.linalg.{DenseVector = BDV, SparseVector = BSV, Vector = BV}
 
 val numClasses = classes.distinct.count.toInt
 
 val userWithClassesAsSparseVector = rows.map(x = (x.userID, new
 BSV[Double](x.classIDs.sortWith(_  _),
 Seq.fill(x.classIDs.length)(1.0).toArray,
 numClasses).asInstanceOf[BV[Double]]))
 
 Chris
 
 On Sep 15, 2014, at 11:28 AM, Sameer Tilak ssti...@live.com wrote:
 
 Hi All,
 I have transformed the data into following format: First column is user id,
 and then all the other columns are class ids. For a user only class ids that
 appear in this row have value 1 and others are 0.  I need to crease a sparse
 vector from this. Does the API for creating a sparse vector that can
 directly support this format?
 
 User idProduct class ids
 
 2622572 145447 1620 13421 28565 285556 293 4553 67261 130 3646 1671 18806
 183576 3286 51715 57671 57476
 
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Convert GraphX Graph to Sparse Matrix

2014-09-15 Thread crockpotveggies
Hi everyone,

I'm looking to implement Markov algorithms in GraphX and I'm wondering if
it's already possible to auto-convert the Graph into a Sparse Double Matrix?
I've seen this implemented in other graphs before, namely JUNG, but still
familiarizing myself with GraphX. Example:
https://code.google.com/p/jung/source/browse/branches/guava/jung/jung-algorithms/src/main/java/edu/uci/ics/jung/algorithms/matrix/GraphMatrixOperations.java#181

This is specifically for doing operations such as Mean First Passage Time
calculations. If it doesn't yet exist, are the Matrices implemented in MLLib
going to be enough for this? Or will I need to go the way of Breeze or Colt?

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Convert-GraphX-Graph-to-Sparse-Matrix-tp14303.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Does Spark always wait for stragglers to finish running?

2014-09-15 Thread Du Li
There is a parameter spark.speculation that is turned off by default. Look at 
the configuration doc: http://spark.apache.org/docs/latest/configuration.html
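
For example, the related properties can be set on the SparkConf before the
context is created; the property names are from that page, and the values shown
mirror the documented defaults (double-check them for your version):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.speculation", "true")            // off by default
  .set("spark.speculation.interval", "100")    // ms between checks for slow tasks
  .set("spark.speculation.quantile", "0.75")   // fraction of tasks that must finish first
  .set("spark.speculation.multiplier", "1.5")  // how much slower than the median counts as slow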



From: Pramod Biligiri 
pramodbilig...@gmail.commailto:pramodbilig...@gmail.com
Date: Monday, September 15, 2014 at 3:30 PM
To: user@spark.apache.orgmailto:user@spark.apache.org 
user@spark.apache.orgmailto:user@spark.apache.org
Subject: Does Spark always wait for stragglers to finish running?

Hi,
I'm running Spark tasks with speculation enabled. I'm noticing that Spark seems 
to wait in a given stage for all stragglers to finish, even though the 
speculated alternative might have finished sooner. Is that correct?

Is there a way to indicate to Spark not to wait for stragglers to finish?

Thanks,
Pramod
--
http://twitter.com/pramodbiligiri


Re: minPartitions for non-text files?

2014-09-15 Thread Eric Friedman
Yes, it's AvroParquetInputFormat, which is splittable.  If I force a
repartitioning, it works. If I don't, Spark chokes on my not-terribly-large
250 MB files.

PySpark's documentation says that the dictionary is turned into a
Configuration object.

@param conf: Hadoop configuration, passed in as a dict (None by default)
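
For reference, on the Scala side it is just a Hadoop Configuration handed to
newAPIHadoopFile. The sketch below uses plain text input classes only to show
the mechanism (you would substitute your Parquet input format and Avro value
class), and the 64 MB value is arbitrary:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val conf = new Configuration()
// suggested maximum split size; smaller values mean more partitions
conf.set("mapreduce.input.fileinputformat.split.maxsize",
  (64 * 1024 * 1024).toString)

val rdd = sc.newAPIHadoopFile(
  "hdfs:///some/path",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  conf)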

On Mon, Sep 15, 2014 at 3:26 PM, Sean Owen so...@cloudera.com wrote:

 Heh, it's still just a suggestion to Hadoop I guess, not guaranteed.

 Is it a splittable format? for example, some compressed formats are
 not splittable and Hadoop has to process whole files at a time.

 I'm also not sure if this is something to do with pyspark, since the
 underlying Scala API takes a Configuration object rather than
 dictionary.

 On Mon, Sep 15, 2014 at 11:23 PM, Eric Friedman
 eric.d.fried...@gmail.com wrote:
  That would be awesome, but doesn't seem to have any effect.
 
  In PySpark, I created a dict with that key and a numeric value, then
 passed
  it into newAPIHadoopFile as a value for the conf keyword.  The returned
  RDD still has a single partition.
 
  On Mon, Sep 15, 2014 at 1:56 PM, Sean Owen so...@cloudera.com wrote:
 
  I think the reason is simply that there is no longer an explicit
  min-partitions argument for Hadoop InputSplits in the new Hadoop APIs.
  At least, I didn't see it when I glanced just now.
 
  However, you should be able to get the same effect by setting a
  Configuration property, and you can do so through the newAPIHadoopFile
  method. You set it as a suggested maximum split size rather than
  suggest minimum number of splits.
 
  Although I think the old config property mapred.max.split.size is
  still respected, you may try
  mapreduce.input.fileinputformat.split.maxsize instead, which appears
  to be the intended replacement in the new APIs.
 
  On Mon, Sep 15, 2014 at 9:35 PM, Eric Friedman
  eric.d.fried...@gmail.com wrote:
   sc.textFile takes a minimum # of partitions to use.
  
   is there a way to get sc.newAPIHadoopFile to do the same?
  
   I know I can repartition() and get a shuffle.  I'm wondering if
 there's
   a
   way to tell the underlying InputFormat (AvroParquet, in my case) how
   many
   partitions to use at the outset.
  
   What I'd really prefer is to get the partitions automatically defined
   based
   on the number of blocks.
 
 



apply at Option.scala:120 callback in Spark 1.1, but no user code involved?

2014-09-15 Thread John Salvatier
In Spark 1.1, I'm seeing tasks with callbacks that don't involve my code at
all!
I'd seen something like this before in 1.0.0, but the behavior seems to be
back

apply at Option.scala:120
http://localhost:4040/stages/stage?id=52&attempt=0

org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
org.apache.spark.rdd.FilteredRDD.getPartitions(FilteredRDD.scala:29)
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
org.apache.spark.rdd.FilteredRDD.getPartitions(FilteredRDD.scala:29)
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)

Ideas on what might be going on?


Re: Does Spark always wait for stragglers to finish running?

2014-09-15 Thread Pramod Biligiri
I'm already running with speculation set to true and the speculated tasks
are launching, but the issue I'm observing is that Spark does not kill the
long running task even if the shorter alternative has finished
successfully. Therefore the overall turnaround time is still the same as
without speculation.

Pramod

On Mon, Sep 15, 2014 at 4:22 PM, Du Li l...@yahoo-inc.com wrote:

   There is a parameter spark.speculation that is turned off by default.
 Look at the configuration doc:
 http://spark.apache.org/docs/latest/configuration.html



   From: Pramod Biligiri pramodbilig...@gmail.com
 Date: Monday, September 15, 2014 at 3:30 PM
 To: user@spark.apache.org user@spark.apache.org
 Subject: Does Spark always wait for stragglers to finish running?

   Hi,
 I'm running Spark tasks with speculation enabled. I'm noticing that Spark
 seems to wait in a given stage for all stragglers to finish, even though
 the speculated alternative might have finished sooner. Is that correct?

  Is there a way to indicate to Spark not to wait for stragglers to finish?

  Thanks,
 Pramod
 --
 http://twitter.com/pramodbiligiri



Re: scala 2.11?

2014-09-15 Thread Matei Zaharia
I think the current plan is to put it in 1.2.0, so that's what I meant by 
soon. It might be possible to backport it too, but I'd be hesitant to do that 
as a maintenance release on 1.1.x and 1.0.x since it would require nontrivial 
changes to the build that could break things on Scala 2.10.

Matei

On September 15, 2014 at 12:19:04 PM, Mark Hamstra (m...@clearstorydata.com) 
wrote:

Are we going to put 2.11 support into 1.1 or 1.0?  Else "will be in soon"
applies to the master development branch, but actually being in the Spark 1.2.0
release won't occur until the second half of November at the earliest.

On Mon, Sep 15, 2014 at 12:11 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
Scala 2.11 work is under way in open pull requests though, so hopefully it will 
be in soon.

Matei

On September 15, 2014 at 9:48:42 AM, Mohit Jaggi (mohitja...@gmail.com) wrote:

ah...thanks!

On Mon, Sep 15, 2014 at 9:47 AM, Mark Hamstra m...@clearstorydata.com wrote:
No, not yet.  Spark SQL is using org.scalamacros:quasiquotes_2.10.

On Mon, Sep 15, 2014 at 9:28 AM, Mohit Jaggi mohitja...@gmail.com wrote:
Folks,
I understand Spark SQL uses quasiquotes. Does that mean Spark has now moved to 
Scala 2.11?

Mohit.





SPARK_MASTER_IP

2014-09-15 Thread Mark Grover
Hi Koert,
I work on Bigtop and CDH packaging and you are right, based on my quick
glance, it doesn't seem to be used.

Mark

From: Koert Kuipers ko...@tresata.com
Date: Sat, Sep 13, 2014 at 7:03 AM
Subject: SPARK_MASTER_IP
To: user@spark.apache.org


a grep for SPARK_MASTER_IP shows that sbin/start-master.sh and
sbin/start-slaves.sh are the only ones that use it.

yet for example in CDH5 the spark-master is started from
/etc/init.d/spark-master by running bin/spark-class. does that means
SPARK_MASTER_IP is simply ignored? it looks like that to me. it is
supposed to?


Re: Does Spark always wait for stragglers to finish running?

2014-09-15 Thread Matei Zaharia
It's true that it does not send a kill command right now -- we should probably 
add that. This code was written before tasks were killable AFAIK. However, the 
*job* should still finish while a speculative task is running as far as I know, 
and it will just leave that task behind.

Matei

On September 15, 2014 at 4:51:59 PM, Pramod Biligiri (pramodbilig...@gmail.com) 
wrote:

I'm already running with speculation set to true and the speculated tasks are 
launching, but the issue I'm observing is that Spark does not kill the long 
running task even if the shorter alternative has finished successfully. 
Therefore the overall turnaround time is still the same as without speculation.

Pramod

On Mon, Sep 15, 2014 at 4:22 PM, Du Li l...@yahoo-inc.com wrote:
There is a parameter spark.speculation that is turned off by default. Look at 
the configuration doc: http://spark.apache.org/docs/latest/configuration.html



From: Pramod Biligiri pramodbilig...@gmail.com
Date: Monday, September 15, 2014 at 3:30 PM
To: user@spark.apache.org user@spark.apache.org
Subject: Does Spark always wait for stragglers to finish running?

Hi,
I'm running Spark tasks with speculation enabled. I'm noticing that Spark seems 
to wait in a given stage for all stragglers to finish, even though the 
speculated alternative might have finished sooner. Is that correct?

Is there a way to indicate to Spark not to wait for stragglers to finish?

Thanks,
Pramod
-- 
http://twitter.com/pramodbiligiri



Spark 1.1 / cdh4 stuck using old hadoop client?

2014-09-15 Thread Paul Wais
Dear List,

I'm having trouble getting Spark 1.1 to use the Hadoop 2 API for
reading SequenceFiles.  In particular, I'm seeing:

Exception in thread main org.apache.hadoop.ipc.RemoteException:
Server IPC version 7 cannot communicate with client version 4
at org.apache.hadoop.ipc.Client.call(Client.java:1070)
at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
at com.sun.proxy.$Proxy7.getProtocolVersion(Unknown Source)
...

When invoking JavaSparkContext#newAPIHadoopFile().  (With args
validSequenceFileURI, SequenceFileInputFormat.class, Text.class,
BytesWritable.class, new Job().getConfiguration() -- Pretty close to
the unit test here:
https://github.com/apache/spark/blob/f0f1ba09b195f23f0c89af6fa040c9e01dfa8951/core/src/test/java/org/apache/spark/JavaAPISuite.java#L916
)


This error indicates to me that Spark is using an old hadoop client to
do reads.  Oddly I'm able to do /writes/ ok, i.e. I'm able to write
via JavaPairRdd#saveAsNewAPIHadoopFile() to my hdfs cluster.


Do I need to explicitly build spark for modern hadoop??  I previously
had an hdfs cluster running hadoop 2.3.0 and I was getting a similar
error (server is using version 9, client is using version 4).


I'm using Spark 1.1 cdh4 as well as hadoop cdh4 from the links posted
on spark's site:
 * http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-cdh4.tgz
 * http://d3kbcqa49mib13.cloudfront.net/hadoop-2.0.0-cdh4.2.0.tar.gz


What distro of hadoop is used at Data Bricks?  Are there distros of
Spark 1.1 and hadoop that should work together out-of-the-box?
(Previously I had Spark 1.0.0 and Hadoop 2.3 working fine..)

Thanks for any help anybody can give me here!
-Paul

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.1 / cdh4 stuck using old hadoop client?

2014-09-15 Thread Christian Chua
Hi Paul.

I would recommend building your own 1.1.0 distribution.

./make-distribution.sh --name hadoop-personal-build-2.4 --tgz -Pyarn 
-Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests



I downloaded the Pre-built for Hadoop 2.4 binary, and it had this strange 
behavior where

spark-submit --master yarn-cluster ...

will work, but

spark-submit --master yarn-client ...

will fail.


But on the personal build obtained from the command above, both will then work.


-Christian




On Sep 15, 2014, at 6:28 PM, Paul Wais pw...@yelp.com wrote:

 Dear List,
 
 I'm having trouble getting Spark 1.1 to use the Hadoop 2 API for
 reading SequenceFiles.  In particular, I'm seeing:
 
 Exception in thread main org.apache.hadoop.ipc.RemoteException:
 Server IPC version 7 cannot communicate with client version 4
at org.apache.hadoop.ipc.Client.call(Client.java:1070)
at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
at com.sun.proxy.$Proxy7.getProtocolVersion(Unknown Source)
...
 
 When invoking JavaSparkContext#newAPIHadoopFile().  (With args
 validSequenceFileURI, SequenceFileInputFormat.class, Text.class,
 BytesWritable.class, new Job().getConfiguration() -- Pretty close to
 the unit test here:
 https://github.com/apache/spark/blob/f0f1ba09b195f23f0c89af6fa040c9e01dfa8951/core/src/test/java/org/apache/spark/JavaAPISuite.java#L916
 )
 
 
 This error indicates to me that Spark is using an old hadoop client to
 do reads.  Oddly I'm able to do /writes/ ok, i.e. I'm able to write
 via JavaPairRdd#saveAsNewAPIHadoopFile() to my hdfs cluster.
 
 
 Do I need to explicitly build spark for modern hadoop??  I previously
 had an hdfs cluster running hadoop 2.3.0 and I was getting a similar
 error (server is using version 9, client is using version 4).
 
 
 I'm using Spark 1.1 cdh4 as well as hadoop cdh4 from the links posted
 on spark's site:
 * http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-cdh4.tgz
 * http://d3kbcqa49mib13.cloudfront.net/hadoop-2.0.0-cdh4.2.0.tar.gz
 
 
 What distro of hadoop is used at Data Bricks?  Are there distros of
 Spark 1.1 and hadoop that should work together out-of-the-box?
 (Previously I had Spark 1.0.0 and Hadoop 2.3 working fine..)
 
 Thanks for any help anybody can give me here!
 -Paul
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 



Re: scala 2.11?

2014-09-15 Thread Mark Hamstra
Okay, that's consistent with what I was expecting.  Thanks, Matei.

On Mon, Sep 15, 2014 at 5:20 PM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 I think the current plan is to put it in 1.2.0, so that's what I meant by
 soon. It might be possible to backport it too, but I'd be hesitant to do
 that as a maintenance release on 1.1.x and 1.0.x since it would require
 nontrivial changes to the build that could break things on Scala 2.10.

 Matei

 On September 15, 2014 at 12:19:04 PM, Mark Hamstra (
 m...@clearstorydata.com) wrote:

 Are we going to put 2.11 support into 1.1 or 1.0?  Else will be in soon
 applies to the master development branch, but actually in the Spark 1.2.0
 release won't occur until the second half of November at the earliest.

 On Mon, Sep 15, 2014 at 12:11 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

  Scala 2.11 work is under way in open pull requests though, so hopefully
 it will be in soon.

  Matei

 On September 15, 2014 at 9:48:42 AM, Mohit Jaggi (mohitja...@gmail.com)
 wrote:

  ah...thanks!

 On Mon, Sep 15, 2014 at 9:47 AM, Mark Hamstra m...@clearstorydata.com
 wrote:

 No, not yet.  Spark SQL is using org.scalamacros:quasiquotes_2.10.

 On Mon, Sep 15, 2014 at 9:28 AM, Mohit Jaggi mohitja...@gmail.com
 wrote:

 Folks,
 I understand Spark SQL uses quasiquotes. Does that mean Spark has now
 moved to Scala 2.11?

 Mohit.







RE: SparkSQL 1.1 hang when DROP or LOAD

2014-09-15 Thread Cheng, Hao
What's your Spark / Hadoop version? And also the hive-site.xml? Most cases
like that are caused by an incompatible Hadoop client jar vs. the Hadoop cluster.

-Original Message-
From: linkpatrickliu [mailto:linkpatrick...@live.com] 
Sent: Monday, September 15, 2014 2:35 PM
To: u...@spark.incubator.apache.org
Subject: SparkSQL 1.1 hang when DROP or LOAD

I started sparkSQL thrift server:
sbin/start-thriftserver.sh

Then I use beeline to connect to it:
bin/beeline
!connect jdbc:hive2://localhost:1 op1 op1

I have created a database for user op1.
create database dw_op1;

And grant all privileges to user op1;
grant all on database dw_op1 to user op1;

Then I create a table:
create tabel src(key int, value string)

Now, I want to load data into this table:
load data inpath kv1.txt into table src; (kv1.txt is located in the
/user/op1 directory in hdfs)

However, the client will hang...

The log in the thrift server:
14/09/15 14:21:25 INFO Driver: PERFLOG method=acquireReadWriteLocks


Then I ctrl-C to stop the beeline client, and restart the beelien client.
Now I want to drop the table src in dw_op1; use dw_op1
drop table src

Then, the beeline client is hanging again..
The log in the thrift server:
14/09/15 14:23:27 INFO Driver: PERFLOG method=acquireReadWriteLocks


Anyone can help on this? Many thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-1-1-hang-when-DROP-or-LOAD-tp14222.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



About SpakSQL OR MLlib

2014-09-15 Thread boyingk...@163.com
Hi:
I have a dataset with the structure [id, driverAge, TotalKiloMeter, Emissions,
date, KiloMeter, fuel], and the data looks like this:
[1-980,34,221926,9,2005-2-8,123,14]
[1-981,49,271321,15,2005-2-8,181,82]
[1-982,36,189149,18,2005-2-8,162,51]
[1-983,51,232753,5,2005-2-8,106,92]
[1-984,56,45338,8,2005-2-8,156,98]
[1-985,45,132060,4,2005-2-8,179,98]
[1-986,40,15751,5,2005-2-8,149,77]
[1-987,36,167930,17,2005-2-8,121,87]
[1-988,53,44949,4,2005-2-8,195,72]
[1-989,34,252867,5,2005-2-8,181,86]
[1-990,53,152858,11,2005-2-8,130,43]
[1-991,40,126831,11,2005-2-8,126,47]
……

Now, my requirement is to group by driverAge in steps of five, e.g. 20~25 is
one group and 26~30 is another.
How should I do this? Can anyone share some code?





boyingk...@163.com

Re: SPARK_MASTER_IP

2014-09-15 Thread Koert Kuipers
hey mark,
you think that this is on purpose, or is it an omission? thanks, koert

On Mon, Sep 15, 2014 at 8:32 PM, Mark Grover m...@apache.org wrote:

 Hi Koert,
 I work on Bigtop and CDH packaging and you are right, based on my quick
 glance, it doesn't seem to be used.

 Mark

 From: Koert Kuipers ko...@tresata.com
 Date: Sat, Sep 13, 2014 at 7:03 AM
 Subject: SPARK_MASTER_IP
 To: user@spark.apache.org


 a grep for SPARK_MASTER_IP shows that sbin/start-master.sh and
 sbin/start-slaves.sh are the only ones that use it.

 yet for example in CDH5 the spark-master is started from
 /etc/init.d/spark-master by running bin/spark-class. does that means
 SPARK_MASTER_IP is simply ignored? it looks like that to me. it is
 supposed to?



Re: About SpakSQL OR MLlib

2014-09-15 Thread Soumya Simanta
case class Car(id:String,age:Int,tkm:Int,emissions:Int,date:Date, km:Int,
fuel:Int)

1. Create a PairedRDD of (age, Car) tuples (pairedRDD)
2. Create a new function fc

//returns the interval lower and upper bound

def fc(x:Int, interval:Int) : (Int,Int) = {

 val floor = x - (x%interval)

 val ceil = floor + interval

 (floor,ceil)

 }
3. do a groupBy on this RDD (step 1) by passing the function fc

val myrdd = pairedRDD.groupBy { case (age, _) => fc(age, 5) }
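
Putting it together on your sample rows, a self-contained sketch (keeping the
date as a plain String just to keep the parsing simple) would be:

case class Car(id: String, age: Int, tkm: Int, emissions: Int,
               date: String, km: Int, fuel: Int)

// (lower, upper) bound of the age bucket, e.g. fc(34, 5) == (30, 35)
def fc(x: Int, interval: Int): (Int, Int) = {
  val floor = x - (x % interval)
  (floor, floor + interval)
}

val lines = sc.parallelize(Seq(
  "1-980,34,221926,9,2005-2-8,123,14",
  "1-981,49,271321,15,2005-2-8,181,82",
  "1-982,36,189149,18,2005-2-8,162,51"))

val cars = lines.map { line =>
  val f = line.split(",")
  Car(f(0), f(1).toInt, f(2).toInt, f(3).toInt, f(4), f(5).toInt, f(6).toInt)
}

// group into 5-year age buckets and count drivers per bucket
cars.groupBy(c => fc(c.age, 5))
    .map { case (bucket, cs) => (bucket, cs.size) }
    .collect()
    .foreach(println)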


On Mon, Sep 15, 2014 at 11:38 PM, boyingk...@163.com boyingk...@163.com
wrote:

  Hi:
 I have a dataset ,the struct [id,driverAge,TotalKiloMeter ,Emissions
 ,date,KiloMeter ,fuel], and the data like this:
  [1-980,34,221926,9,2005-2-8,123,14]
 [1-981,49,271321,15,2005-2-8,181,82]
 [1-982,36,189149,18,2005-2-8,162,51]
 [1-983,51,232753,5,2005-2-8,106,92]
 [1-984,56,45338,8,2005-2-8,156,98]
 [1-985,45,132060,4,2005-2-8,179,98]
 [1-986,40,15751,5,2005-2-8,149,77]
 [1-987,36,167930,17,2005-2-8,121,87]
 [1-988,53,44949,4,2005-2-8,195,72]
 [1-989,34,252867,5,2005-2-8,181,86]
 [1-990,53,152858,11,2005-2-8,130,43]
 [1-991,40,126831,11,2005-2-8,126,47]
 ……

 now ,my requirments is group by driverAge, five is a step,like 20~25 is a
 group,26~30 is a group?
 how should i do ? who can give some code?


 --
  boyingk...@163.com



Re: NullWritable not serializable

2014-09-15 Thread Du Li
Hi Matei,

Thanks for your reply.

The Writable classes have never been serializable and this is why it is weird. 
I did try as you suggested to map the Writables to integers and strings. It
didn't pass, either. Similar exceptions were thrown, except that the messages
said IntWritable and Text are not serializable. The reason is in the implicits
defined in the SparkContext object that convert those values into their
corresponding Writable classes before saving the data in a sequence file.

My original code was actually some test cases to try out SequenceFile-related
APIs. The tests all passed when the spark version was specified as 1.0.2. But
this one failed after I changed the spark version to 1.1.0, the new release;
nothing else changed. In addition, it failed when I called rdd2.collect(), 
take(1), and first(). But it worked fine when calling rdd2.count(). As you can 
see, count() does not need to serialize and ship data while the other three 
methods do.
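
For completeness, the read-side shape of that workaround, i.e. mapping the
Writables away before anything is collected, looks roughly like this (same path
and classes as in the snippet quoted below):

import org.apache.hadoop.io.{NullWritable, Text}

val rdd2 = sc.sequenceFile("./test_data", classOf[NullWritable], classOf[Text])
// convert to plain Strings before collect/take/first ship anything to the driver
rdd2.map { case (_, v) => v.toString }.collect()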

Do you recall any difference between spark 1.0 and 1.1 that might cause this 
problem?

Thanks,
Du


From: Matei Zaharia matei.zaha...@gmail.commailto:matei.zaha...@gmail.com
Date: Friday, September 12, 2014 at 9:10 PM
To: Du Li l...@yahoo-inc.com.invalidmailto:l...@yahoo-inc.com.invalid, 
user@spark.apache.orgmailto:user@spark.apache.org 
user@spark.apache.orgmailto:user@spark.apache.org, 
d...@spark.apache.orgmailto:d...@spark.apache.org 
d...@spark.apache.orgmailto:d...@spark.apache.org
Subject: Re: NullWritable not serializable

Hi Du,

I don't think NullWritable has ever been serializable, so you must be doing 
something differently from your previous program. In this case though, just use 
a map() to turn your Writables to serializable types (e.g. null and String).

Matie


On September 12, 2014 at 8:48:36 PM, Du Li 
(l...@yahoo-inc.com.invalidmailto:l...@yahoo-inc.com.invalid) wrote:

Hi,

I was trying the following on spark-shell (built with apache master and hadoop 
2.4.0). Both calling rdd2.collect and calling rdd3.collect threw 
java.io.NotSerializableException: org.apache.hadoop.io.NullWritable.

I got the same problem in similar code of my app which uses the newly released 
Spark 1.1.0 under hadoop 2.4.0. Previously it worked fine with spark 1.0.2 
under either hadoop 2.40 and 0.23.10.

Anybody knows what caused the problem?

Thanks,
Du


import org.apache.hadoop.io.{NullWritable, Text}
val rdd = sc.textFile("README.md")
val res = rdd.map(x => (NullWritable.get(), new Text(x)))
res.saveAsSequenceFile("./test_data")
val rdd2 = sc.sequenceFile("./test_data", classOf[NullWritable], classOf[Text])
rdd2.collect
val rdd3 = sc.sequenceFile[NullWritable,Text]("./test_data")
rdd3.collect




RE: SparkSQL 1.1 hang when DROP or LOAD

2014-09-15 Thread linkpatrickliu
Hi, Hao Cheng,

Here is the Spark\Hadoop version:
Spark version = 1.1.0
Hadoop version = 2.0.0-cdh4.6.0

And hive-site.xml:
configuration

  property
namefs.default.name/name
valuehdfs://ns/value
  /property
  property
namedfs.nameservices/name
valuens/value
  /property
  
  property
namedfs.ha.namenodes.ns/name
valuemachine01,machine02/value
  /property
  
  property
namedfs.namenode.rpc-address.ns.machine01/name
valuemachine01:54310/value

  /property
  property
namedfs.namenode.rpc-address.ns.machine02/name
valuemachine02:54310/value

  /property
  
  property
namedfs.client.failover.proxy.provider.ns/name
   
valueorg.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider/value
  /property
  property
namejavax.jdo.option.ConnectionURL/name
valuejdbc:mysql://localhost:3306/metastore/value
descriptionJDBC connect string for a JDBC metastore/description
  /property
  property
namejavax.jdo.option.ConnectionDriverName/name
valuecom.mysql.jdbc.Driver/value
descriptionDriver class name for a JDBC metastore/description
  /property
  property
namejavax.jdo.option.ConnectionUserName/name
valuehive_user/value
  /property
  property
namejavax.jdo.option.ConnectionPassword/name
valuehive_123/value
  /property
  property
namedatanucleus.autoCreateSchema/name
valuefalse/value
  /property 
  property
namedatanucleus.autoCreateTables/name
valuetrue/value
  /property 
  property
namedatanucleus.fixedDatastore/name
valuefalse/value
  /property
  property
namehive.support.concurrency/name
descriptionEnable Hive's Table Lock Manager Service/description
valuetrue/value
  /property

  property
namehive.zookeeper.quorum/name
valuemachine01,machine02,machine03/value
descriptionZookeeper quorum used by Hive's Table Lock
Manager/description
  /property
  property
namehive.metastore.warehouse.dir/name
value/user/hive/warehouse/value
descriptionHive warehouse directory/description
  /property
  property
namemapred.job.tracker/name
valuemachine01:8032/value
  /property
  property
 nameio.compression.codecs/name
 valueorg.apache.hadoop.io.compress.SnappyCodec/value
  /property
  property
namemapreduce.output.fileoutputformat.compress.codec/name
valueorg.apache.hadoop.io.compress.SnappyCodec/value
  /property
  property
namemapreduce.output.fileoutputformat.compress.type/name
valueBLOCK/value
  /property
  property
namehive.exec.show.job.failure.debug.info/name
valuetrue/value
description
If a job fails, whether to provide a link in the CLI to the task with
the
most failures, along with debugging hints if applicable.
/description
  /property
  property
namehive.hwi.listen.host/name
value0.0.0.0/value
descriptionThis is the host address the Hive Web Interface will listen
on/description
  /property
  property
namehive.hwi.listen.port/name
value/value
descriptionThis is the port the Hive Web Interface will listen
on/description
  /property
  property
namehive.hwi.war.file/name
value/lib/hive-hwi-0.10.0-cdh4.2.0.war/value
descriptionThis is the WAR file with the jsp content for Hive Web
Interface/description
  /property
  property
namehive.aux.jars.path/name
   
valuefile:///usr/lib/hive/lib/hive-hbase-handler-0.10.0-cdh4.6.0.jar,file:///usr/lib/hbase/hbase-0.94.15-cdh4.6.0-security.jar,file:///usr/lib/zookeeper/zookeeper.jar/value
  /property
  property
 namehbase.zookeeper.quorum/name
 valuemachine01,machine02,machine03/value
  /property
  property
namehive.cli.print.header/name
valuetrue/value
  /property
  property
namehive.metastore.execute.setugi/name
valuetrue/value
descriptionIn unsecure mode, setting this property to true will cause
the metastore to execute DFS operations using the client's reported user and
group permissions. Note that this property must be set on both the client
and server sides. Further note that its best effort. If client sets its to
true and server sets it to false, client setting will be
ignored./description
  /property
  property
namehive.security.authorization.enabled/name
valuetrue/value
descriptionenable or disable the hive client
authorization/description
  /property
  property
namehive.metastore.authorization.storage.checks/name
valuetrue/value
  /property
  property
namehive.security.authorization.createtable.owner.grants/name
valueALL/value
descriptionthe privileges automatically granted to the owner whenever
a table gets created.
An example like select,drop will grant select and drop privilege to
the owner of the table/description
  /property 
/configuration



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-1-1-hang-when-DROP-or-LOAD-tp14222p14320.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

RE: SparkSQL 1.1 hang when DROP or LOAD

2014-09-15 Thread Cheng, Hao
The Hadoop client jar should be assembled into the uber-jar, but (I suspect)
it's probably not compatible with your Hadoop cluster.
Can you also paste the Spark uber-jar name? It will usually be under the path
lib/spark-assembly-1.1.0-xxx-hadoopxxx.jar.


-Original Message-
From: linkpatrickliu [mailto:linkpatrick...@live.com] 
Sent: Tuesday, September 16, 2014 12:14 PM
To: u...@spark.incubator.apache.org
Subject: RE: SparkSQL 1.1 hang when DROP or LOAD

Hi, Hao Cheng,

Here is the Spark\Hadoop version:
Spark version = 1.1.0
Hadoop version = 2.0.0-cdh4.6.0

And hive-site.xml:
<configuration>

  <property>
    <name>fs.default.name</name>
    <value>hdfs://ns</value>
  </property>
  <property>
    <name>dfs.nameservices</name>
    <value>ns</value>
  </property>
  <property>
    <name>dfs.ha.namenodes.ns</name>
    <value>machine01,machine02</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.ns.machine01</name>
    <value>machine01:54310</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.ns.machine02</name>
    <value>machine02:54310</value>
  </property>
  <property>
    <name>dfs.client.failover.proxy.provider.ns</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://localhost:3306/metastore</value>
    <description>JDBC connect string for a JDBC metastore</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
    <description>Driver class name for a JDBC metastore</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hive_user</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>hive_123</value>
  </property>
  <property>
    <name>datanucleus.autoCreateSchema</name>
    <value>false</value>
  </property>
  <property>
    <name>datanucleus.autoCreateTables</name>
    <value>true</value>
  </property>
  <property>
    <name>datanucleus.fixedDatastore</name>
    <value>false</value>
  </property>
  <property>
    <name>hive.support.concurrency</name>
    <description>Enable Hive's Table Lock Manager Service</description>
    <value>true</value>
  </property>
  <property>
    <name>hive.zookeeper.quorum</name>
    <value>machine01,machine02,machine03</value>
    <description>Zookeeper quorum used by Hive's Table Lock Manager</description>
  </property>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/user/hive/warehouse</value>
    <description>Hive warehouse directory</description>
  </property>
  <property>
    <name>mapred.job.tracker</name>
    <value>machine01:8032</value>
  </property>
  <property>
    <name>io.compression.codecs</name>
    <value>org.apache.hadoop.io.compress.SnappyCodec</value>
  </property>
  <property>
    <name>mapreduce.output.fileoutputformat.compress.codec</name>
    <value>org.apache.hadoop.io.compress.SnappyCodec</value>
  </property>
  <property>
    <name>mapreduce.output.fileoutputformat.compress.type</name>
    <value>BLOCK</value>
  </property>
  <property>
    <name>hive.exec.show.job.failure.debug.info</name>
    <value>true</value>
    <description>
      If a job fails, whether to provide a link in the CLI to the task with the
      most failures, along with debugging hints if applicable.
    </description>
  </property>
  <property>
    <name>hive.hwi.listen.host</name>
    <value>0.0.0.0</value>
    <description>This is the host address the Hive Web Interface will listen on</description>
  </property>
  <property>
    <name>hive.hwi.listen.port</name>
    <value></value>
    <description>This is the port the Hive Web Interface will listen on</description>
  </property>
  <property>
    <name>hive.hwi.war.file</name>
    <value>/lib/hive-hwi-0.10.0-cdh4.2.0.war</value>
    <description>This is the WAR file with the jsp content for Hive Web Interface</description>
  </property>
  <property>
    <name>hive.aux.jars.path</name>
    <value>file:///usr/lib/hive/lib/hive-hbase-handler-0.10.0-cdh4.6.0.jar,file:///usr/lib/hbase/hbase-0.94.15-cdh4.6.0-security.jar,file:///usr/lib/zookeeper/zookeeper.jar</value>
  </property>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>machine01,machine02,machine03</value>
  </property>
  <property>
    <name>hive.cli.print.header</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.metastore.execute.setugi</name>
    <value>true</value>
    <description>In unsecure mode, setting this property to true will cause the
      metastore to execute DFS operations using the client's reported user and group
      permissions. Note that this property must be set on both the client and server
      sides. Further note that its best effort. If client sets its to true and server
      sets it to false, client setting will be ignored.</description>
  </property>
  <property>
    <name>hive.security.authorization.enabled</name>
    <value>true</value>
    <description>enable or disable the hive client authorization</description>
  </property>
  <property>
    <name>hive.metastore.authorization.storage.checks</name>
    <value>true</value>
  </property>
  <property>

Re: Re: About SpakSQL OR MLlib

2014-09-15 Thread boyingk...@163.com
case class Car(id:String,age:Int,tkm:Int,emissions:Int,date:Date, km:Int,
fuel:Int)

1. Create a PairedRDD of (age, Car) tuples (pairedRDD)
2. Create a new function fc

// returns the interval lower and upper bound
def fc(x: Int, interval: Int): (Int, Int) = {
  val floor = x - (x % interval)
  val ceil = floor + interval
  (floor, ceil)
}
3. do a groupBy on this RDD (step 1) by passing the function fc

val myrdd = pairedRDD.groupBy( x => fc(x._1, 5) )  // x._1 is the age key from step 1
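
To make the recipe above concrete, here is a minimal end-to-end sketch. It assumes a
spark-shell sc, an illustrative input file cars.txt with rows in the format quoted
below, and keeps the date column as a String for simplicity:

case class Car(id: String, age: Int, tkm: Int, emissions: Int, date: String, km: Int, fuel: Int)

// bucket an age into a 5-year interval, e.g. 34 -> (30, 35): lower bound inclusive, upper exclusive
def fc(x: Int, interval: Int): (Int, Int) = {
  val floor = x - (x % interval)
  (floor, floor + interval)
}

// parse lines like "1-980,34,221926,9,2005-2-8,123,14" into Car records
val cars = sc.textFile("cars.txt").map { line =>
  val f = line.split(",")
  Car(f(0), f(1).toInt, f(2).toInt, f(3).toInt, f(4), f(5).toInt, f(6).toInt)
}

// group every record by its 5-year age bucket
val byAgeBucket = cars.groupBy(c => fc(c.age, 5))
byAgeBucket.collect().foreach { case ((lo, hi), group) =>
  println(s"drivers aged [$lo, $hi): ${group.size} records")
}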


On Mon, Sep 15, 2014 at 11:38 PM, boyingk...@163.com boyingk...@163.com
wrote:

  Hi:
 I have a dataset with the struct [id, driverAge, TotalKiloMeter, Emissions, date,
 KiloMeter, fuel], and the data looks like this:
  [1-980,34,221926,9,2005-2-8,123,14]
 [1-981,49,271321,15,2005-2-8,181,82]
 [1-982,36,189149,18,2005-2-8,162,51]
 [1-983,51,232753,5,2005-2-8,106,92]
 [1-984,56,45338,8,2005-2-8,156,98]
 [1-985,45,132060,4,2005-2-8,179,98]
 [1-986,40,15751,5,2005-2-8,149,77]
 [1-987,36,167930,17,2005-2-8,121,87]
 [1-988,53,44949,4,2005-2-8,195,72]
 [1-989,34,252867,5,2005-2-8,181,86]
 [1-990,53,152858,11,2005-2-8,130,43]
 [1-991,40,126831,11,2005-2-8,126,47]
 ……

 Now, my requirement is to group by driverAge in steps of five, e.g. 20~25 is a
 group and 26~30 is a group.
 How should I do this? Can anyone give some sample code?


 --
  boyingk...@163.com




RE: SparkSQL 1.1 hang when DROP or LOAD

2014-09-15 Thread linkpatrickliu
Hi, Hao Cheng,

This is my spark assembly jar name:
spark-assembly-1.1.0-hadoop2.0.0-cdh4.6.0.jar

I compiled spark 1.1.0 with the following cmd:
export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
mvn -Dhadoop.version=2.0.0-cdh4.6.0 -Phive -Pspark-ganglia-lgpl -DskipTests package




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-1-1-hang-when-DROP-or-LOAD-tp14222p14325.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to set executor num on spark on yarn

2014-09-15 Thread hequn cheng
hi~ I want to set the executor number to 16, but it is very strange that
executor cores seem to affect the executor number on Spark on YARN. I don't know
why, or how to set the executor number properly.
=
./bin/spark-submit --class com.hequn.spark.SparkJoins \
--master yarn-cluster \
--num-executors 16 \
--driver-memory 2g \
--executor-memory 10g \
  *  --executor-cores 4 \*
/home/sparkjoins-1.0-SNAPSHOT.jar

The UI shows there are *7 executors*
=
./bin/spark-submit --class com.hequn.spark.SparkJoins \
--master yarn-cluster \
--num-executors 16 \
--driver-memory 2g \
--executor-memory 10g \
*--executor-cores 2 \*
/home/sparkjoins-1.0-SNAPSHOT.jar

The UI shows there are *9 executors*
=
./bin/spark-submit --class com.hequn.spark.SparkJoins \
--master yarn-cluster \
--num-executors 16 \
--driver-memory 2g \
--executor-memory 10g \
*--executor-cores 1 \*
/home/sparkjoins-1.0-SNAPSHOT.jar

The UI shows there are *9 executors*
==
The cluster contains 16 nodes. Each node has 64G RAM.
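
For what it's worth, the same knobs can also be set programmatically. Below is a
minimal sketch; the property names are the assumed YARN-mode equivalents of the
flags above, and this alone does not explain why YARN grants fewer containers,
which usually comes down to the per-node memory/vcore limits configured in YARN.

import org.apache.spark.SparkConf

// assumed equivalents of the spark-submit flags used above (Spark 1.1 on YARN)
val conf = new SparkConf()
  .set("spark.executor.instances", "16")  // --num-executors
  .set("spark.executor.memory", "10g")    // --executor-memory
  .set("spark.executor.cores", "4")       // --executor-cores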


Complexity/Efficiency of SortByKey

2014-09-15 Thread cjwang
I wonder what algorithm is used to implement sortByKey?  I assume it is some
O(n*log(n))  parallelized on x number of nodes, right?

Then, what size of data would make it worthwhile to use sortByKey on
multiple processors rather than use standard Scala sort functions on a
single processor (considering the overhead of putting stuff into RDDs and
collecting them back)?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Complexity-Efficiency-of-SortByKey-tp14328.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: NullWritable not serializable

2014-09-15 Thread Matei Zaharia
Can you post the exact code for the test that worked in 1.0? I can't think of 
much that could've changed. One possibility is if we had some operations 
that were computed locally on the driver (this happens with things like first() 
and take(), which will try to do the first partition locally). But generally 
speaking these operations should *not* work over a network, so you'll have to 
make sure that you only send serializable types through shuffles or collects, 
or use a serialization framework like Kryo that might be okay with Writables.
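
For reference, a minimal sketch of the Kryo option mentioned above. It is only a
configuration-level switch, and whether Kryo actually handles a particular Writable
still has to be verified:

import org.apache.spark.{SparkConf, SparkContext}

// switch to Kryo before the SparkContext is created
val conf = new SparkConf()
  .setAppName("sequencefile-test")  // illustrative app name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)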

Matei

On September 15, 2014 at 9:13:13 PM, Du Li (l...@yahoo-inc.com) wrote:

Hi Matei,

Thanks for your reply. 

The Writable classes have never been serializable, and this is why it is weird. 
I did try, as you suggested, to map the Writables to integers and strings. It 
didn't pass, either. Similar exceptions were thrown, except that the messages 
became "IntWritable / Text are not serializable". The reason is the implicits 
defined in the SparkContext object that convert those values into their 
corresponding Writable classes before saving the data in a sequence file.

My original code was actually some test cases to try out SequenceFile-related 
APIs. The tests all passed when the spark version was specified as 1.0.2. But 
this one failed after I changed the spark version to 1.1.0, the new release; 
nothing else changed. In addition, it failed when I called rdd2.collect(), 
take(1), and first(), but it worked fine when calling rdd2.count(). As you can 
see, count() does not need to serialize and ship data, while the other three 
methods do.

Do you recall any difference between spark 1.0 and 1.1 that might cause this 
problem?

Thanks,
Du


From: Matei Zaharia matei.zaha...@gmail.com
Date: Friday, September 12, 2014 at 9:10 PM
To: Du Li l...@yahoo-inc.com.invalid, user@spark.apache.org 
user@spark.apache.org, d...@spark.apache.org d...@spark.apache.org
Subject: Re: NullWritable not serializable

Hi Du,

I don't think NullWritable has ever been serializable, so you must be doing 
something differently from your previous program. In this case though, just use 
a map() to turn your Writables to serializable types (e.g. null and String).

Matei

On September 12, 2014 at 8:48:36 PM, Du Li (l...@yahoo-inc.com.invalid) wrote:

Hi,

I was trying the following on spark-shell (built with apache master and hadoop 
2.4.0). Both calling rdd2.collect and calling rdd3.collect threw 
java.io.NotSerializableException: org.apache.hadoop.io.NullWritable. 

I got the same problem in similar code of my app which uses the newly released 
Spark 1.1.0 under hadoop 2.4.0. Previously it worked fine with spark 1.0.2 
under either hadoop 2.4.0 or 0.23.10.

Anybody knows what caused the problem?

Thanks,
Du


import org.apache.hadoop.io.{NullWritable, Text}
val rdd = sc.textFile("README.md")
val res = rdd.map(x => (NullWritable.get(), new Text(x)))
res.saveAsSequenceFile("./test_data")
val rdd2 = sc.sequenceFile("./test_data", classOf[NullWritable], classOf[Text])
rdd2.collect
val rdd3 = sc.sequenceFile[NullWritable,Text]("./test_data")
rdd3.collect
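
For reference, a minimal sketch of the map() workaround suggested earlier in the
thread: convert the (NullWritable, Text) pairs to plain Strings right after reading,
so collect() only ships serializable values.

val strings = sc.sequenceFile("./test_data", classOf[NullWritable], classOf[Text])
  .map { case (_, text) => text.toString }  // drop the key, keep each line as a String
strings.collect()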




RE: SparkSQL 1.1 hang when DROP or LOAD

2014-09-15 Thread Cheng, Hao
Sorry, I am not able to reproduce that. 

Can you try adding the following entries to the hive-site.xml? I know they have 
default values, but let's make them explicit.

hive.server2.thrift.port
hive.server2.thrift.bind.host
hive.server2.authentication (NONE, KERBEROS, LDAP, PAM or CUSTOM)
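
For example (the values below are illustrative; 10000 and NONE are the usual
HiveServer2 defaults, and the host/port/authentication should be adjusted to your
environment):

  <property>
    <name>hive.server2.thrift.port</name>
    <value>10000</value>
  </property>
  <property>
    <name>hive.server2.thrift.bind.host</name>
    <value>localhost</value>
  </property>
  <property>
    <name>hive.server2.authentication</name>
    <value>NONE</value>
  </property>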

-Original Message-
From: linkpatrickliu [mailto:linkpatrick...@live.com] 
Sent: Tuesday, September 16, 2014 1:10 PM
To: u...@spark.incubator.apache.org
Subject: RE: SparkSQL 1.1 hang when DROP or LOAD

Besides,

When I use bin/spark-sql, I can load data and drop tables freely.

Only when I use sbin/start-thriftserver.sh and connect with beeline does the 
client hang!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-1-1-hang-when-DROP-or-LOAD-tp14222p14326.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



Re: Complexity/Efficiency of SortByKey

2014-09-15 Thread Matei Zaharia
sortByKey is indeed O(n log n), it's a first pass to figure out even-sized 
partitions (by sampling the RDD), then a second pass to do a distributed 
merge-sort (first partition the data on each machine, then run a reduce phase 
that merges the data for each partition). The point where it becomes useful to 
scale out versus a single machine is probably pretty high, because 
communication over a network is *much* slower than memory bandwidth within a 
machine. Generally it would make the most sense for data that doesn't fit in 
memory on a single machine, or data that already starts out distributed.

Please also note that if you run Spark on just one multicore machine, it still 
goes through many of the same code paths as on a cluster (e.g. serializing data 
between tasks) -- it's not optimized to be as fast as, say, a multithreaded 
sort framework. So it wouldn't make a ton of sense to use it for that.
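
As a tiny illustration of the two paths (the sizes here are far too small for the
cluster version to pay off; this only shows the APIs side by side):

import org.apache.spark.SparkContext._  // pair-RDD implicits (already imported in spark-shell)

val pairs = Seq(("b", 2), ("c", 3), ("a", 1))

// distributed: sample keys, range-partition, then sort within each partition
val distributed = sc.parallelize(pairs, 4).sortByKey().collect()  // Array((a,1), (b,2), (c,3))

// single machine: a plain Scala collection sort, no serialization or shuffle
val local = pairs.sortBy(_._1)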

Matei

On September 15, 2014 at 10:32:14 PM, cjwang (c...@cjwang.us) wrote:

I wonder what algorithm is used to implement sortByKey? I assume it is some 
O(n*log(n)) parallelized on x number of nodes, right? 

Then, what size of data would make it worthwhile to use sortByKey on 
multiple processors rather than use standard Scala sort functions on a 
single processor (considering the overhead of putting stuff into RDDs and 
collecting them back)? 



-- 
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Complexity-Efficiency-of-SortByKey-tp14328.html
 
Sent from the Apache Spark User List mailing list archive at Nabble.com. 

- 
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
For additional commands, e-mail: user-h...@spark.apache.org