Re: PathFilter for newAPIHadoopFile?
Hi Eric, Something along the lines of the following should work: val fs = getFileSystem(...) // standard Hadoop API call val filteredConcatenatedPaths = fs.listStatus(topLevelDirPath, pathFilter).map(_.getPath.toString).mkString(",") // pathFilter is an instance of org.apache.hadoop.fs.PathFilter val parquetRdd = sc.hadoopFile(filteredConcatenatedPaths, classOf[ParquetInputFormat[Something]], classOf[Void], classOf[SomeAvroType], getConfiguration(...)) You have to do some initialization on ParquetInputFormat such as AvroReadSupport/AvroWriteSupport etc., but you should be doing that already, I am guessing. Cheers, Nat On Sun, Sep 14, 2014 at 7:37 PM, Eric Friedman eric.d.fried...@gmail.com wrote: Hi, I have a directory structure with parquet+avro data in it. There are a couple of administrative files (.foo and/or _foo) that I need to ignore when processing this data, or Spark tries to read them as containing parquet content, which they do not. How can I set a PathFilter on the FileInputFormat used to construct an RDD? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
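A minimal sketch of the pathFilter referenced above, assuming the goal from Eric's question is to skip the ".foo" / "_foo" administrative files; the variable name is a placeholder:

import org.apache.hadoop.fs.{Path, PathFilter}

// accept everything except hidden/administrative files
val pathFilter = new PathFilter {
  override def accept(p: Path): Boolean =
    !p.getName.startsWith(".") && !p.getName.startsWith("_")
}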
Re: Accuracy hit in classification with Spark
Hi, I have been able to get the same accuracy with MLlib as Mahout's. The pre-processing phase of Mahout was the reason behind the accuracy mismatch. After studying and applying the same logic in my code, it worked like a charm. Thanks, Jatin - Novice Big Data Programmer -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Accuracy-hit-in-classification-with-Spark-tp13773p14221.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
SparkSQL 1.1 hang when DROP or LOAD
I started the SparkSQL thrift server: sbin/start-thriftserver.sh Then I use beeline to connect to it: bin/beeline !connect jdbc:hive2://localhost:1 op1 op1 I have created a database for user op1: create database dw_op1; And granted all privileges to user op1: grant all on database dw_op1 to user op1; Then I create a table: create table src(key int, value string) Now, I want to load data into this table: load data inpath 'kv1.txt' into table src; (kv1.txt is located in the /user/op1 directory in hdfs) However, the client will hang... The log in the thrift server: 14/09/15 14:21:25 INFO Driver: PERFLOG method=acquireReadWriteLocks Then I ctrl-C to stop the beeline client, and restart the beeline client. Now I want to drop the table src in dw_op1: use dw_op1 drop table src Then, the beeline client is hanging again... The log in the thrift server: 14/09/15 14:23:27 INFO Driver: PERFLOG method=acquireReadWriteLocks Can anyone help with this? Many thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-1-1-hang-when-DROP-or-LOAD-tp14222.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: combineByKey throws ClassCastException
How about this. scala val rdd2 = rdd.combineByKey( | (v: Int) => v.toLong, | (c: Long, v: Int) => c + v, | (c1: Long, c2: Long) => c1 + c2) rdd2: org.apache.spark.rdd.RDD[(String, Long)] = MapPartitionsRDD[9] at combineByKey at console:14 xj @ Tokyo On Mon, Sep 15, 2014 at 3:06 PM, Tao Xiao xiaotao.cs@gmail.com wrote: I followed an example presented in the tutorial Learning Spark http://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html to compute the per-key average as follows: val Array(appName) = args val sparkConf = new SparkConf() .setAppName(appName) val sc = new SparkContext(sparkConf) /* * compute the per-key average of values * results should be: * A : 5.8 * B : 14 * C : 60.6 */ val rdd = sc.parallelize(List( ("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5), ("B", 4), ("B", 10), ("B", 11), ("B", 20), ("B", 25), ("C", 32), ("C", 91), ("C", 122), ("C", 3), ("C", 55)), 2) val avg = rdd.combineByKey( (x:Int) => (x, 1), // java.lang.ClassCastException: scala.Tuple2$mcII$sp cannot be cast to java.lang.Integer (acc:(Int, Int), x) => (acc._1 + x, acc._2 + 1), (acc1:(Int, Int), acc2:(Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)) .map{case (s, t) => (s, t._1/t._2.toFloat)} avg.collect.foreach(t => println(t._1 + " - " + t._2)) When I submitted the application, an exception of *java.lang.ClassCastException: scala.Tuple2$mcII$sp cannot be cast to java.lang.Integer* was thrown. The tutorial said that the first function of *combineByKey*, *(x:Int) => (x, 1)*, should take a single element in the source RDD and return an element of the desired type in the resulting RDD. In my application, we take a single element of type *Int* from the source RDD and return a tuple of type (*Int*, *Int*), which meets the requirements quite well. But why would such an exception be thrown? I'm using CDH 5.0 and Spark 0.9 Thanks.
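For reference, a self-contained sketch of the per-key average computation quoted above, with the arrows and string literals restored; it is written against the Spark 1.x Scala API and has not been run on the CDH 5.0 / Spark 0.9 combination from the question:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object PerKeyAverage {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("per-key-average"))
    val rdd = sc.parallelize(List(
      ("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5),
      ("B", 4), ("B", 10), ("B", 11), ("B", 20), ("B", 25),
      ("C", 32), ("C", 91), ("C", 122), ("C", 3), ("C", 55)), 2)
    val avg = rdd.combineByKey(
      (x: Int) => (x, 1),                                           // createCombiner: start a (sum, count) pair
      (acc: (Int, Int), x: Int) => (acc._1 + x, acc._2 + 1),        // mergeValue: fold a value into the pair
      (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)) // mergeCombiners: combine pairs across partitions
      .mapValues { case (sum, count) => sum / count.toFloat }
    avg.collect().foreach { case (k, v) => println(k + " - " + v) } // expected: A - 5.8, B - 14.0, C - 60.6
    sc.stop()
  }
}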
Re: Developing a spark streaming application
Just for the record, this is being discussed at StackOverflow: http://stackoverflow.com/questions/25663026/developing-a-spark-streaming-application/25766618 2014-08-27 10:28 GMT+02:00 Filip Andrei andreis.fi...@gmail.com: Hey guys, so the problem i'm trying to tackle is the following: - I need a data source that emits messages at a certain frequency - There are N neural nets that need to process each message individually - The outputs from all neural nets are aggregated and only when all N outputs for each message are collected, should a message be declared fully processed - At the end i should measure the time it took for a message to be fully processed (time between when it was emitted and when all N neural net outputs from that message have been collected) What i'm mostly interested in is if i approached the problem correctly in the first place and if so some best practice pointers on my approach. And my current implementation is the following: For a data source i created the class public class JavaRandomReceiver extends Receiver<Map<String, Object>> as i decided a key-value store would be best suited to holding emitted data. The onStart() method initializes a custom random sequence generator and starts a thread that continuously generates new neural net inputs and stores them as follows: SensorData sdata = generator.createSensorData(); Map<String, Object> result = new HashMap<String, Object>(); result.put("msgNo", sdata.getMsgNo()); result.put("sensorTime", sdata.getSampleTime()); result.put("list", sdata.getPayload()); result.put("timeOfProc", sdata.getCreationTime()); store(result); // sleeps for a given amount of time set at generator creation generator.waitForNextTuple(); The msgNo here is incremented for each newly created message and is used to keep track of each message. The neural net functionality is added by creating a custom mapper public class NeuralNetMapper implements Function<Map<String, Object>, Map<String, Object>> whose call function basically just takes the input map, plugs its list object as the input to the neural net object, replaces the map's initial list with the neural net output and returns the modified map. The aggregator is implemented as a single class that has the following form public class JavaSyncBarrier implements Function<JavaRDD<Map<String, Object>>, Void> This class maintains a google guava cache of neural net outputs that it has received in the form of <Long, List<Map<String, Object>>>, where the Long value is the msgNo and the list contains all maps containing said message number. When a new map is received, it is added to the cache, its list's length is compared to the total number of neural nets and, if these numbers match, that message number is said to be fully processed and the difference between timeOfProc (all maps with the same msgNo have the same timeOfProc) and the current system time is displayed as the total time necessary for processing. 
Now the way all these components are linked together is the following: public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setAppName("SimpleSparkStreamingTest"); JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000)); jssc.checkpoint("/tmp/spark-tempdir"); // Generator config goes here // Set to emit new message every 1 second // --- // Neural net config goes here // --- JavaReceiverInputDStream<Map<String, Object>> rndLists = jssc.receiverStream(new JavaRandomReceiver(generatorConfig)); List<JavaDStream<Map<String, Object>>> neuralNetOutputStreams = new ArrayList<JavaDStream<Map<String, Object>>>(); for(int i = 0; i < numberOfNets; i++){ neuralNetOutputStreams.add( rndLists.map(new NeuralNetMapper(neuralNetConfig)) ); } JavaDStream<Map<String, Object>> joined = joinStreams(neuralNetOutputStreams); joined.foreach(new JavaSyncBarrier(numberOfNets)); jssc.start(); jssc.awaitTermination(); } where joinStreams unifies a list of streams: public static <T> JavaDStream<T> joinStreams(List<JavaDStream<T>> streams) { JavaDStream<T> result = streams.get(0); for (int i = 1; i < streams.size(); i++) { result = result.union(streams.get(i)); } return result; } -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Developing-a-spark-streaming-application-tp12893.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail:
Re: When does Spark switch from PROCESS_LOCAL to NODE_LOCAL or RACK_LOCAL?
Hi Brad and Nick, Thanks for the comments! I opened a ticket to get a more thorough explanation of data locality into the docs here: https://issues.apache.org/jira/browse/SPARK-3526 If you could put any other unanswered questions you have about data locality on that ticket I'll try to incorporate answers to them in the final addition I send in. Andrew On Sun, Sep 14, 2014 at 6:47 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi Andrew, I agree with Nicholas. That was a nice, concise summary of the meaning of the locality customization options, indicators and default Spark behaviors. I haven't combed through the documentation end-to-end in a while, but I'm also not sure that information is presently represented somewhere and it would be great to persist it somewhere besides the mailing list. best, -Brad On Fri, Sep 12, 2014 at 12:12 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Andrew, This email was pretty helpful. I feel like this stuff should be summarized in the docs somewhere, or perhaps in a blog post. Do you know if it is? Nick On Thu, Jun 5, 2014 at 6:36 PM, Andrew Ash and...@andrewash.com wrote: The locality is how close the data is to the code that's processing it. PROCESS_LOCAL means data is in the same JVM as the code that's running, so it's really fast. NODE_LOCAL might mean that the data is in HDFS on the same node, or in another executor on the same node, so is a little slower because the data has to travel across an IPC connection. RACK_LOCAL is even slower -- data is on a different server so needs to be sent over the network. Spark switches to lower locality levels when there's no unprocessed data on a node that has idle CPUs. In that situation you have two options: wait until the busy CPUs free up so you can start another task that uses data on that server, or start a new task on a farther away server that needs to bring data from that remote place. What Spark typically does is wait a bit in the hopes that a busy CPU frees up. Once that timeout expires, it starts moving the data from far away to the free CPU. The main tunable option is how far long the scheduler waits before starting to move data rather than code. Those are the spark.locality.* settings here: http://spark.apache.org/docs/latest/configuration.html If you want to prevent this from happening entirely, you can set the values to ridiculously high numbers. The documentation also mentions that 0 has special meaning, so you can try that as well. Good luck! Andrew On Thu, Jun 5, 2014 at 3:13 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: I noticed that sometimes tasks would switch from PROCESS_LOCAL (I'd assume that this means fully cached) to NODE_LOCAL or even RACK_LOCAL. When these happen things get extremely slow. Does this mean that the executor got terminated and restarted? Is there a way to prevent this from happening (barring the machine actually going down, I'd rather stick with the same process)? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
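To make the tuning described above concrete, a minimal sketch of raising the locality wait settings via SparkConf; the values are placeholders in milliseconds, not recommendations:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.locality.wait", "10000")         // base wait before falling back to a lower locality level
  .set("spark.locality.wait.process", "10000") // wait before giving up on PROCESS_LOCAL
  .set("spark.locality.wait.node", "10000")    // wait before giving up on NODE_LOCAL
  .set("spark.locality.wait.rack", "10000")    // wait before giving up on RACK_LOCAL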
Re: Viewing web UI after fact
Hi Andrew, sorry for late response. Thank you very much for solving my problem. There was no APPLICATION_COMPLETE file in log directory due to not calling sc.stop() at the end of program. With stopping spark context everything works correctly, so thank you again. Best regards, Grzegorz On Fri, Sep 5, 2014 at 8:06 PM, Andrew Or and...@databricks.com wrote: Hi Grzegorz, Can you verify that there are APPLICATION_COMPLETE files in the event log directories? E.g. Does file:/tmp/spark-events/app-name-1234567890/APPLICATION_COMPLETE exist? If not, it could be that your application didn't call sc.stop(), so the ApplicationEnd event is not actually logged. The HistoryServer looks for this special file to identify applications to display. You could also try manually adding the APPLICATION_COMPLETE file to this directory; the HistoryServer should pick this up and display the application, though the information displayed will be incomplete because the log did not capture all the events (sc.stop() does a final close() on the file written). Andrew 2014-09-05 1:50 GMT-07:00 Grzegorz Białek grzegorz.bia...@codilime.com: Hi Andrew, thank you very much for your answer. Unfortunately it still doesn't work. I'm using Spark 1.0.0, and I start history server running sbin/start-history-server.sh dir, although I also set SPARK_HISTORY_OPTS=-Dspark.history.fs.logDirectory in conf/spark-env.sh. I tried also other dir than /tmp/spark-events which have all possible permissions enabled. Also adding file: (and file://) didn't help - history server still shows: History Server Event Log Location: file:/tmp/spark-events/ No Completed Applications Found. Best regards, Grzegorz On Thu, Sep 4, 2014 at 8:20 PM, Andrew Or and...@databricks.com wrote: Hi Grzegorz, Sorry for the late response. Unfortunately, if the Master UI doesn't know about your applications (they are completed with respect to a different Master), then it can't regenerate the UIs even if the logs exist. You will have to use the history server for that. How did you start the history server? If you are using Spark =1.0, you can pass the directory as an argument to the sbin/start-history-server.sh script. Otherwise, you may need to set the following in your conf/spark-env.sh to specify the log directory: export SPARK_HISTORY_OPTS=-Dspark.history.fs.logDirectory=/tmp/spark-events It could also be a permissions thing. Make sure your logs in /tmp/spark-events are accessible by the JVM that runs the history server. Also, there's a chance that /tmp/spark-events is interpreted as an HDFS path depending on which Spark version you're running. To resolve any ambiguity, you may set the log path to file:/tmp/spark-events instead. But first verify whether they actually exist. Let me know if you get it working, -Andrew 2014-08-19 8:23 GMT-07:00 Grzegorz Białek grzegorz.bia...@codilime.com : Hi, Is there any way view history of applications statistics in master ui after restarting master server? I have all logs ing /tmp/spark-events/ but when I start history server in this directory it says No Completed Applications Found. Maybe I could copy this logs to dir used by master server but I couldn't find any. Or maybe I'm doing something wrong launching history server. Do you have any idea how to solve it? Thanks, Grzegorz On Thu, Aug 14, 2014 at 10:53 AM, Grzegorz Białek grzegorz.bia...@codilime.com wrote: Hi, Thank you both for your answers. Browsing using Master UI works fine. 
Unfortunately History Server shows No Completed Applications Found even if logs exists under given directory, but using Master UI is enough for me. Best regards, Grzegorz On Wed, Aug 13, 2014 at 8:09 PM, Andrew Or and...@databricks.com wrote: The Spark UI isn't available through the same address; otherwise new applications won't be able to bind to it. Once the old application finishes, the standalone Master renders the after-the-fact application UI and exposes it under a different URL. To see this, go to the Master UI (master-url:8080) and click on your application in the Completed Applications table. 2014-08-13 10:56 GMT-07:00 Matei Zaharia matei.zaha...@gmail.com: Take a look at http://spark.apache.org/docs/latest/monitoring.html -- you need to launch a history server to serve the logs. Matei On August 13, 2014 at 2:03:08 AM, grzegorz-bialek ( grzegorz.bia...@codilime.com) wrote: Hi, I wanted to access Spark web UI after application stops. I set spark.eventLog.enabled to true and logs are availaible in JSON format in /tmp/spark-event but web UI isn't available under address http://driver-node:4040 I'm running Spark in standalone mode. What should I do to access web UI after application ends? Thanks, Grzegorz -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Viewing-web-UI-after-fact-tp12023.html Sent from the Apache Spark User List mailing list
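A minimal sketch of the application-side configuration discussed in this thread (the history server side still needs spark.history.fs.logDirectory or the start-history-server.sh argument); the log directory is an example path, and the key point is that sc.stop() must run so the APPLICATION_COMPLETE marker is written:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("my-app")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "file:/tmp/spark-events") // must be readable by the HistoryServer JVM
val sc = new SparkContext(conf)
try {
  // ... job logic ...
} finally {
  sc.stop() // writes APPLICATION_COMPLETE so the HistoryServer lists the run
}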
Re: Dependency Problem with Spark / ScalaTest / SBT
Hello, When I remove the line and try to execute sbt run, I end up with the following lines: 14/09/15 10:11:35 INFO ui.SparkUI: Stopped Spark web UI at http://base:4040 [...] 14/09/15 10:11:05 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/09/15 10:11:15 INFO client.AppClient$ClientActor: Connecting to master spark://base:7077... It seems that the configuration within sbt doesn't use my original Spark, because my original Spark web UI is running under http://base:8080. Seems like sbt is starting another spark instance?? Best regards Thorsten Am 14.09.2014 um 18:56 schrieb Dean Wampler: Sorry, I meant any *other* SBT files. However, what happens if you remove the line: exclude(org.eclipse.jetty.orbit, javax.servlet) dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Sun, Sep 14, 2014 at 11:53 AM, Dean Wampler deanwamp...@gmail.com mailto:deanwamp...@gmail.com wrote: Can you post your whole SBT build file(s)? Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Wed, Sep 10, 2014 at 6:48 AM, Thorsten Bergler sp...@tbonline.de mailto:sp...@tbonline.de wrote: Hi, I just called: test or run Thorsten Am 10.09.2014 um 13:38 schrieb arthur.hk.c...@gmail.com mailto:arthur.hk.c...@gmail.com: Hi, What is your SBT command and the parameters? Arthur On 10 Sep, 2014, at 6:46 pm, Thorsten Bergler sp...@tbonline.de mailto:sp...@tbonline.de wrote: Hello, I am writing a Spark App which is already working so far. Now I started to build also some UnitTests, but I am running into some dependecy problems and I cannot find a solution right now. Perhaps someone could help me. I build my Spark Project with SBT and it seems to be configured well, because compiling, assembling and running the built jar with spark-submit are working well. Now I started with the UnitTests, which I located under /src/test/scala. When I call test in sbt, I get the following: 14/09/10 12:22:06 INFO storage.BlockManagerMaster: Registered BlockManager 14/09/10 12:22:06 INFO spark.HttpServer: Starting HTTP Server [trace] Stack trace suppressed: run last test:test for the full output. [error] Could not run test test.scala.SetSuite: java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse [info] Run completed in 626 milliseconds. [info] Total number of tests run: 0 [info] Suites: completed 0, aborted 0 [info] Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0 [info] All tests passed. 
[error] Error during tests: [error] test.scala.SetSuite [error] (test:test) sbt.TestsFailedException: Tests unsuccessful [error] Total time: 3 s, completed 10.09.2014 12:22:06 last test:test gives me the following: last test:test [debug] Running TaskDef(test.scala.SetSuite, org.scalatest.tools.Framework$$anon$1@6e5626c8, false, [SuiteSelector]) java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse at org.apache.spark.HttpServer.start(HttpServer.scala:54) at org.apache.spark.broadcast.HttpBroadcast$.createServer(HttpBroadcast.scala:156) at org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:127) at org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcastFactory.scala:31) at org.apache.spark.broadcast.BroadcastManager.initialize(BroadcastManager.scala:48) at org.apache.spark.broadcast.BroadcastManager.init(BroadcastManager.scala:35) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218) at
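For what it's worth, a commonly suggested build.sbt workaround for the javax/servlet/http/HttpServletResponse NoClassDefFoundError under sbt test is to keep the jetty-orbit exclusion and add a servlet-api artifact on the test classpath; the fragment below is a guess with placeholder versions, not a verified fix for this particular build:

// build.sbt fragment (sketch, unverified)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.0.2"
    exclude("org.eclipse.jetty.orbit", "javax.servlet"),
  "javax.servlet" % "javax.servlet-api" % "3.0.1" % "test"
)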
Re: Broadcast error
Hi Akhil, So with your config (specifically with set(spark.akka.frameSize , 1000)) , I see the error: org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 0:0 was 401970046 bytes which exceeds spark.akka.frameSize (10485760 bytes). Consider using broadcast variables for large values. at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049) at org.apache.spark So, I changed set(spark.akka.frameSize , 1000) to set(spark.akka.frameSize , 1000*00*) but now I get the same error? y4j.protocol.Py4JJavaError: An error occurred while calling o28.trainKMeansModel. : org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up. at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGSched along with following: 14/09/15 01:44:11 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/09/15 01:44:21 INFO AppClient$ClientActor: Connecting to master spark://host:7077... 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@host:7077] 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@host:7077] 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@host:7077] 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@host:7077] 14/09/15 01:44:26 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/09/15 01:44:41 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/09/15 01:44:41 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up. 
:-( On Mon, Sep 15, 2014 at 1:20 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Can you give this a try: conf = SparkConf().set(spark.executor.memory, 32G)*.set(spark.akka.frameSize , 1000).set(spark.broadcast.factory,org.apache.spark.broadcast.TorrentBroadcastFactory)* sc = SparkContext(conf = conf) rdd = sc.parallelize(matrix,5) from pyspark.mllib.clustering import KMeans from math import sqrt clusters = KMeans.train(rdd, 5, maxIterations=2,runs=2, initializationMode=random) def error(point): center = clusters.centers[clusters.predict(point)] return sqrt(sum([x**2 for x in (point - center)])) WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y) print Within Set Sum of Squared Error = + str(WSSSE) Thanks Best Regards On Mon, Sep 15, 2014 at 9:16 AM, Chengi Liu chengi.liu...@gmail.com wrote: And the thing is code runs just fine if I reduce the number of rows in my data? On Sun, Sep 14, 2014 at 8:45 PM, Chengi Liu chengi.liu...@gmail.com wrote: I am using spark1.0.2. This is my work cluster.. so I can't setup a new version readily... But right now, I am not using broadcast .. conf = SparkConf().set(spark.executor.memory, 32G).set(spark.akka.frameSize, 1000) sc = SparkContext(conf = conf) rdd = sc.parallelize(matrix,5) from pyspark.mllib.clustering import KMeans from math import sqrt clusters = KMeans.train(rdd, 5, maxIterations=2,runs=2, initializationMode=random) def error(point): center = clusters.centers[clusters.predict(point)] return sqrt(sum([x**2 for x in (point - center)])) WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y) print Within Set Sum of Squared Error = + str(WSSSE) executed by spark-submit --master $SPARKURL clustering_example.py --executor-memory 32G --driver-memory 60G and the error I see py4j.protocol.Py4JJavaError: An error occurred while calling o26.trainKMeansModel. :
Re: Broadcast error
Try: rdd = sc.broadcast(matrix) Or rdd = sc.parallelize(matrix,100) // Just increase the number of slices, give it a try. Thanks Best Regards On Mon, Sep 15, 2014 at 2:18 PM, Chengi Liu chengi.liu...@gmail.com wrote: Hi Akhil, So with your config (specifically with set(spark.akka.frameSize , 1000)) , I see the error: org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 0:0 was 401970046 bytes which exceeds spark.akka.frameSize (10485760 bytes). Consider using broadcast variables for large values. at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049) at org.apache.spark So, I changed set(spark.akka.frameSize , 1000) to set(spark.akka.frameSize , 1000*00*) but now I get the same error? y4j.protocol.Py4JJavaError: An error occurred while calling o28.trainKMeansModel. : org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up. at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGSched along with following: 14/09/15 01:44:11 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/09/15 01:44:21 INFO AppClient$ClientActor: Connecting to master spark://host:7077... 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@host:7077] 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@host:7077] 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@host:7077] 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@host:7077] 14/09/15 01:44:26 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/09/15 01:44:41 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/09/15 01:44:41 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up. 
:-( On Mon, Sep 15, 2014 at 1:20 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Can you give this a try: conf = SparkConf().set(spark.executor.memory, 32G)*.set(spark.akka.frameSize , 1000).set(spark.broadcast.factory,org.apache.spark.broadcast.TorrentBroadcastFactory)* sc = SparkContext(conf = conf) rdd = sc.parallelize(matrix,5) from pyspark.mllib.clustering import KMeans from math import sqrt clusters = KMeans.train(rdd, 5, maxIterations=2,runs=2, initializationMode=random) def error(point): center = clusters.centers[clusters.predict(point)] return sqrt(sum([x**2 for x in (point - center)])) WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y) print Within Set Sum of Squared Error = + str(WSSSE) Thanks Best Regards On Mon, Sep 15, 2014 at 9:16 AM, Chengi Liu chengi.liu...@gmail.com wrote: And the thing is code runs just fine if I reduce the number of rows in my data? On Sun, Sep 14, 2014 at 8:45 PM, Chengi Liu chengi.liu...@gmail.com wrote: I am using spark1.0.2. This is my work cluster.. so I can't setup a new version readily... But right now, I am not using broadcast .. conf = SparkConf().set(spark.executor.memory, 32G).set(spark.akka.frameSize, 1000) sc = SparkContext(conf = conf) rdd = sc.parallelize(matrix,5) from pyspark.mllib.clustering import KMeans from math import sqrt clusters = KMeans.train(rdd, 5, maxIterations=2,runs=2, initializationMode=random) def error(point): center = clusters.centers[clusters.predict(point)] return sqrt(sum([x**2 for x in (point - center)])) WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y) print Within Set
Re: About SparkSQL 1.1.0 join between more than two table
Spark SQL supports both SQL and HiveQL, which use SQLContext and HiveContext respectively. As far as I know, the SQLContext of Spark SQL 1.1.0 cannot support a three-table join directly. However, you can rewrite your query with a subquery, such as SELECT * FROM (SELECT * FROM youhao_data left join youhao_age on (youhao_data.rowkey=youhao_age.rowkey)) tmp left join youhao_totalKiloMeter on (tmp.rowkey=youhao_totalKiloMeter.rowkey) The HiveContext of Spark 1.1.0 can support a three-table join: val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) sqlContext.sql("SELECT * FROM youhao_data left join youhao_age on (youhao_data.rowkey=youhao_age.rowkey) left join youhao_totalKiloMeter on (youhao_age.rowkey=youhao_totalKiloMeter.rowkey)") 2014-09-15 10:41 GMT+08:00 boyingk...@163.com boyingk...@163.com: Hi: When I use Spark SQL (1.0.1), I found it does not support a join between three tables, e.g.: sql("SELECT * FROM youhao_data left join youhao_age on (youhao_data.rowkey=youhao_age.rowkey) left join youhao_totalKiloMeter on (youhao_age.rowkey=youhao_totalKiloMeter.rowkey)") I get the exception: Exception in thread main java.lang.RuntimeException: [1.90] failure: ``UNION'' expected but `left' found Does Spark SQL 1.1.0 support a join between three tables? -- boyingk...@163.com
Re: Broadcast error
So.. same result with parallelize (matrix,1000) with broadcast.. seems like I got jvm core dump :-/ 4/09/15 02:31:22 INFO BlockManagerInfo: Registering block manager host:47978 with 19.2 GB RAM 14/09/15 02:31:22 INFO BlockManagerInfo: Registering block manager host:43360 with 19.2 GB RAM Unhandled exception Unhandled exception Type=Segmentation error vmState=0x J9Generic_Signal_Number=0004 Signal_Number=000b Error_Value= Signal_Code=0001 Handler1=2BF53760 Handler2=2C3069D0 InaccessibleAddress= RDI=2AB9505F2698 RSI=2AABAE2C54D8 RAX=2AB7CE6009A0 RBX=2AB7CE6009C0 RCX=FFC7FFE0 RDX=2AB8509726A8 R8=7FE41FF0 R9=2000 R10=2DA318A0 R11=2AB850959520 R12=2AB5EF97DD88 R13=2AB5EF97BD88 R14=2C0CE940 R15=2AB5EF97BD88 RIP= GS= FS= RSP=007367A0 EFlags=00210282 CS=0033 RBP=00BCDB00 ERR=0014 TRAPNO=000E OLDMASK= CR2= xmm0 4141414141414141 (f: 1094795648.00, d: 2.261635e+06) xmm1 4141414141414141 (f: 1094795648.00, d: 2.261635e+06) xmm2 4141414141414141 (f: 1094795648.00, d: 2.261635e+06) xmm3 4141414141414141 (f: 1094795648.00, d: 2.261635e+06) xmm4 4141414141414141 (f: 1094795648.00, d: 2.261635e+06) xmm5 4141414141414141 (f: 1094795648.00, d: 2.261635e+06) xmm6 4141414141414141 (f: 1094795648.00, d: 2.261635e+06) xmm7 f180c714f8e2a139 (f: 4175601920.00, d: -5.462583e+238) xmm8 428e8000 (f: 1116635136.00, d: 5.516911e-315) xmm9 (f: 0.00, d: 0.00e+00) xmm10 (f: 0.00, d: 0.00e+00) xmm11 (f: 0.00, d: 0.00e+00) xmm12 (f: 0.00, d: 0.00e+00) xmm13 (f: 0.00, d: 0.00e+00) xmm14 (f: 0.00, d: 0.00e+00) xmm15 (f: 0.00, d: 0.00e+00) Target=2_60_20140106_181350 (Linux 3.0.93-0.8.2_1.0502.8048-cray_ari_c) CPU=amd64 (48 logical CPUs) (0xfc0c5b000 RAM) --- Stack Backtrace --- (0x2C2FA122 [libj9prt26.so+0x13122]) (0x2C30779F [libj9prt26.so+0x2079f]) (0x2C2F9E6B [libj9prt26.so+0x12e6b]) (0x2C2F9F67 [libj9prt26.so+0x12f67]) (0x2C30779F [libj9prt26.so+0x2079f]) (0x2C2F9A8B [libj9prt26.so+0x12a8b]) (0x2BF52C9D [libj9vm26.so+0x1ac9d]) (0x2C30779F [libj9prt26.so+0x2079f]) (0x2BF52F56 [libj9vm26.so+0x1af56]) (0x2BF96CA0 [libj9vm26.so+0x5eca0]) --- JVMDUMP039I JVMDUMP032I Note, this still is with the framesize I modified in the last email thread? On Mon, Sep 15, 2014 at 2:12 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Try: rdd = sc.broadcast(matrix) Or rdd = sc.parallelize(matrix,100) // Just increase the number of slices, give it a try. Thanks Best Regards On Mon, Sep 15, 2014 at 2:18 PM, Chengi Liu chengi.liu...@gmail.com wrote: Hi Akhil, So with your config (specifically with set(spark.akka.frameSize , 1000)) , I see the error: org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 0:0 was 401970046 bytes which exceeds spark.akka.frameSize (10485760 bytes). Consider using broadcast variables for large values. at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049) at org.apache.spark So, I changed set(spark.akka.frameSize , 1000) to set(spark.akka.frameSize , 1000*00*) but now I get the same error? y4j.protocol.Py4JJavaError: An error occurred while calling o28.trainKMeansModel. : org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up. 
at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGSched along with following: 14/09/15 01:44:11 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/09/15 01:44:21 INFO AppClient$ClientActor: Connecting to master spark://host:7077... 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException: Association failed with
Re: Low Level Kafka Consumer for Spark
Hi Dibyendu, Thanks for your great work! I'm new to Spark Streaming, so I just want to make sure I understand Driver failure issue correctly. In my use case, I want to make sure that messages coming in from Kafka are always broken into the same set of RDDs, meaning that if a set of messages are assigned to one RDD, and the Driver dies before this RDD is processed, then once the Driver recovers, the same set of messages are assigned to a single RDD, instead of arbitrarily repartitioning the messages across different RDDs. Does your Receiver guarantee this behavior, until the problem is fixed in Spark 1.2? Regards, Alon -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Low Level Kafka Consumer for Spark
Hi Alon, No, this does not guarantee that the same set of messages will come in the same RDD. This fix just replays the messages from the last processed offset in the same order. Again, this is just an interim fix we needed to solve our use case. If you do not need this message replay feature, just do not perform the ack (acknowledgement) call in the driver code. Then the processed messages will not be written to ZK and hence replay will not happen. Regards, Dibyendu On Mon, Sep 15, 2014 at 4:48 PM, Alon Pe'er alo...@supersonicads.com wrote: Hi Dibyendu, Thanks for your great work! I'm new to Spark Streaming, so I just want to make sure I understand Driver failure issue correctly. In my use case, I want to make sure that messages coming in from Kafka are always broken into the same set of RDDs, meaning that if a set of messages are assigned to one RDD, and the Driver dies before this RDD is processed, then once the Driver recovers, the same set of messages are assigned to a single RDD, instead of arbitrarily repartitioning the messages across different RDDs. Does your Receiver guarantee this behavior, until the problem is fixed in Spark 1.2? Regards, Alon -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Serving data
Thank you guys, I’ll try Parquet and if that’s not quick enough I’ll go the usual route with either read-only or normal database. On 13.09.2014, at 12:45, andy petrella andy.petre...@gmail.com wrote: however, the cache is not guaranteed to remain, if other jobs are launched in the cluster and require more memory than what's left in the overall caching memory, previous RDDs will be discarded. Using an off heap cache like tachyon as a dump repo can help. In general, I'd say that using a persistent sink (like Cassandra for instance) is best. my .2¢ aℕdy ℙetrella about.me/noootsab On Sat, Sep 13, 2014 at 9:20 AM, Mayur Rustagi mayur.rust...@gmail.com wrote: You can cache data in memory query it using Spark Job Server. Most folks dump data down to a queue/db for retrieval You can batch up data store into parquet partitions as well. query it using another SparkSQL shell, JDBC driver in SparkSQL is part 1.1 i believe. -- Regards, Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi On Fri, Sep 12, 2014 at 2:54 PM, Marius Soutier mps@gmail.com wrote: Hi there, I’m pretty new to Spark, and so far I’ve written my jobs the same way I wrote Scalding jobs - one-off, read data from HDFS, count words, write counts back to HDFS. Now I want to display these counts in a dashboard. Since Spark allows to cache RDDs in-memory and you have to explicitly terminate your app (and there’s even a new JDBC server in 1.1), I’m assuming it’s possible to keep an app running indefinitely and query an in-memory RDD from the outside (via SparkSQL for example). Is this how others are using Spark? Or are you just dumping job results into message queues or databases? Thanks - Marius - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Serving data
I'm using Parquet in ADAM, and I can say that it works pretty fine! Enjoy ;-) aℕdy ℙetrella about.me/noootsab [image: aℕdy ℙetrella on about.me] http://about.me/noootsab On Mon, Sep 15, 2014 at 1:41 PM, Marius Soutier mps@gmail.com wrote: Thank you guys, I’ll try Parquet and if that’s not quick enough I’ll go the usual route with either read-only or normal database. On 13.09.2014, at 12:45, andy petrella andy.petre...@gmail.com wrote: however, the cache is not guaranteed to remain, if other jobs are launched in the cluster and require more memory than what's left in the overall caching memory, previous RDDs will be discarded. Using an off heap cache like tachyon as a dump repo can help. In general, I'd say that using a persistent sink (like Cassandra for instance) is best. my .2¢ aℕdy ℙetrella about.me/noootsab [image: aℕdy ℙetrella on about.me] http://about.me/noootsab On Sat, Sep 13, 2014 at 9:20 AM, Mayur Rustagi mayur.rust...@gmail.com wrote: You can cache data in memory query it using Spark Job Server. Most folks dump data down to a queue/db for retrieval You can batch up data store into parquet partitions as well. query it using another SparkSQL shell, JDBC driver in SparkSQL is part 1.1 i believe. -- Regards, Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi On Fri, Sep 12, 2014 at 2:54 PM, Marius Soutier mps@gmail.com wrote: Hi there, I’m pretty new to Spark, and so far I’ve written my jobs the same way I wrote Scalding jobs - one-off, read data from HDFS, count words, write counts back to HDFS. Now I want to display these counts in a dashboard. Since Spark allows to cache RDDs in-memory and you have to explicitly terminate your app (and there’s even a new JDBC server in 1.1), I’m assuming it’s possible to keep an app running indefinitely and query an in-memory RDD from the outside (via SparkSQL for example). Is this how others are using Spark? Or are you just dumping job results into message queues or databases? Thanks - Marius - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Serving data
So you are living the dream of using HDFS as a database? ;) On 15.09.2014, at 13:50, andy petrella andy.petre...@gmail.com wrote: I'm using Parquet in ADAM, and I can say that it works pretty fine! Enjoy ;-) aℕdy ℙetrella about.me/noootsab On Mon, Sep 15, 2014 at 1:41 PM, Marius Soutier mps@gmail.com wrote: Thank you guys, I’ll try Parquet and if that’s not quick enough I’ll go the usual route with either read-only or normal database. On 13.09.2014, at 12:45, andy petrella andy.petre...@gmail.com wrote: however, the cache is not guaranteed to remain, if other jobs are launched in the cluster and require more memory than what's left in the overall caching memory, previous RDDs will be discarded. Using an off heap cache like tachyon as a dump repo can help. In general, I'd say that using a persistent sink (like Cassandra for instance) is best. my .2¢ aℕdy ℙetrella about.me/noootsab On Sat, Sep 13, 2014 at 9:20 AM, Mayur Rustagi mayur.rust...@gmail.com wrote: You can cache data in memory query it using Spark Job Server. Most folks dump data down to a queue/db for retrieval You can batch up data store into parquet partitions as well. query it using another SparkSQL shell, JDBC driver in SparkSQL is part 1.1 i believe. -- Regards, Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi On Fri, Sep 12, 2014 at 2:54 PM, Marius Soutier mps@gmail.com wrote: Hi there, I’m pretty new to Spark, and so far I’ve written my jobs the same way I wrote Scalding jobs - one-off, read data from HDFS, count words, write counts back to HDFS. Now I want to display these counts in a dashboard. Since Spark allows to cache RDDs in-memory and you have to explicitly terminate your app (and there’s even a new JDBC server in 1.1), I’m assuming it’s possible to keep an app running indefinitely and query an in-memory RDD from the outside (via SparkSQL for example). Is this how others are using Spark? Or are you just dumping job results into message queues or databases? Thanks - Marius - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
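For anyone following the Parquet route mentioned in this thread, a minimal Spark SQL 1.1 sketch of writing results as Parquet from one job and querying them from another; it assumes an existing SparkContext sc, and the paths and the WordCount case class are made-up examples:

import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext

case class WordCount(word: String, count: Long)

val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD

// job A: persist the counts as Parquet
val counts = sc.textFile("hdfs:///data/input")
  .flatMap(_.split("\\s+"))
  .map((_, 1L))
  .reduceByKey(_ + _)
  .map { case (w, c) => WordCount(w, c) }
counts.saveAsParquetFile("hdfs:///data/word_counts.parquet")

// job B (e.g. the dashboard backend): read it back and query with SQL
val table = sqlContext.parquetFile("hdfs:///data/word_counts.parquet")
table.registerTempTable("word_counts")
sqlContext.sql("SELECT word, count FROM word_counts ORDER BY count DESC LIMIT 10")
  .collect()
  .foreach(println)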
Upgrading a standalone cluster on ec2 from 1.0.2 to 1.1.0
Hi, I would like to upgrade a standalone cluster to 1.1.0. What's the best way to do it? Should I just replace the existing /root/spark folder with the uncompressed folder from http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-cdh4.tgz ? What about hdfs and other installations? I have spark 1.0.2 with cdh4 hadoop 2.0 installed currently. Thanks, Tomer - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Write 1 RDD to multiple output paths in one go
Any tips from anybody on how to do this in PySpark? (Or regular Spark, for that matter.) On Sat, Sep 13, 2014 at 1:25 PM, Nick Chammas nicholas.cham...@gmail.com wrote: Howdy doody Spark Users, I’d like to somehow write out a single RDD to multiple paths in one go. Here’s an example. I have an RDD of (key, value) pairs like this: a = sc.parallelize(['Nick', 'Nancy', 'Bob', 'Ben', 'Frankie']).keyBy(lambda x: x[0]) a.collect() [('N', 'Nick'), ('N', 'Nancy'), ('B', 'Bob'), ('B', 'Ben'), ('F', 'Frankie')] Now I want to write the RDD out to different paths depending on the keys, so that I have one output directory per distinct key. Each output directory could potentially have multiple part- files or whatever. So my output would be something like: /path/prefix/n [/part-1, /part-2, etc] /path/prefix/b [/part-1, /part-2, etc] /path/prefix/f [/part-1, /part-2, etc] How would you do that? I suspect I need to use saveAsNewAPIHadoopFile http://spark.apache.org/docs/latest/api/python/pyspark.rdd.RDD-class.html#saveAsNewAPIHadoopFile or saveAsHadoopFile http://spark.apache.org/docs/latest/api/python/pyspark.rdd.RDD-class.html#saveAsHadoopFile along with the MultipleTextOutputFormat output format class, but I’m not sure how. By the way, there is a very similar question to this here on Stack Overflow http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job . Nick -- View this message in context: Write 1 RDD to multiple output paths in one go http://apache-spark-user-list.1001560.n3.nabble.com/Write-1-RDD-to-multiple-output-paths-in-one-go-tp14174.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
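One commonly cited approach (for example in the Stack Overflow question linked in the original message) uses saveAsHadoopFile with a custom MultipleTextOutputFormat. A hedged Scala sketch of that idea follows; the class name and output path are made up, and it assumes an existing SparkContext sc:

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.SparkContext._

// route each record to a sub-directory named after its key
class KeyPrefixOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get() // drop the key from the file contents
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String] + "/" + name // e.g. N/part-00000, B/part-00001, ...
}

val a = sc.parallelize(Seq("Nick", "Nancy", "Bob", "Ben", "Frankie"))
  .keyBy(_.substring(0, 1))
a.saveAsHadoopFile("/path/prefix", classOf[String], classOf[String],
  classOf[KeyPrefixOutputFormat])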
vertex active/inactive feature in Pregel API ?
Hi, I am wondering whether the vertex active/inactive feature (corresponding to the change of its value between two supersteps) is available in the Pregel API of GraphX? If it is not a default setting, how do I enable it for the sendMessage below? def sendMessage(edge: EdgeTriplet[(Int,HashMap[VertexId, Double]), Int]) = Iterator((edge.dstId, hmCal(edge.srcAttr))) Or should I do that with a customised measure function, e.g. by keeping its change in a vertex attribute after each iteration? I noticed that there is an optional parameter "skipStale" in the mrTriplets operator. Best, Yifan LI
Found both spark.driver.extraClassPath and SPARK_CLASSPATH
in spark 1.1.0 i get this error: 2014-09-14 23:17:01 ERROR actor.OneForOneStrategy: Found both spark.driver.extraClassPath and SPARK_CLASSPATH. Use only the former. i checked my application. i do not set spark.driver.extraClassPath or SPARK_CLASSPATH. SPARK_CLASSPATH is set in spark-env.sh since the machine is a worker and this is how lzo is added to classpath. as a user i cannot modify this. looking at the logs i see the value of SPARK_CLASSPATH ends up in spark.driver.extraClassPath, but thats not done by me. i presume some logic in spark-submit does this.
Re: About SparkSQL 1.1.0 join between more than two table
1.0.1 does not have the support on outer joins (added in 1.1). Your query should be fine in 1.1. On Mon, Sep 15, 2014 at 5:35 AM, Yanbo Liang yanboha...@gmail.com wrote: Spark SQL can support SQL and HiveSQL which used SQLContext and HiveContext separate. As far as I know, SQLContext of Spark SQL 1.1.0 can not support three table join directly. However you can modify your query with subquery such as SELECT * FROM (SELECT * FROM youhao_data left join youhao_age on (youhao_data.rowkey=youhao_age.rowkey)) tmp left join youhao_totalKiloMeter on (tmp.rowkey=youhao_totalKiloMeter.rowkey) HiveContext of Spark 1.1.0 can support three table join. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) sqlContext.sql(SELECT * FROM youhao_data left join youhao_age on (youhao_data.rowkey=youhao_age.rowkey) left join youhao_totalKiloMeter on (youhao_age.rowkey=youhao_totalKiloMeter.rowkey)) 2014-09-15 10:41 GMT+08:00 boyingk...@163.com boyingk...@163.com: Hi: When I use spark SQL (1.0.1), I found it not support join between three tables,eg: sql(SELECT * FROM youhao_data left join youhao_age on (youhao_data.rowkey=youhao_age.rowkey) left join youhao_totalKiloMeter on (youhao_age.rowkey=youhao_totalKiloMeter.rowkey)) I take the Exception: Exception in thread main java.lang.RuntimeException: [1.90] failure: ``UNION'' expected but `left' found If the Spark SQL 1.1.0 has support join between three tables? -- boyingk...@163.com
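Assuming the three tables from the original question have been registered (shown here as temporary tables; the registration step and the SchemaRDD variable names are illustrative), the un-nested query should then run on a plain SQLContext in 1.1:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
// youhaoData, youhaoAge and youhaoTotalKiloMeter are assumed SchemaRDDs built elsewhere
youhaoData.registerTempTable("youhao_data")
youhaoAge.registerTempTable("youhao_age")
youhaoTotalKiloMeter.registerTempTable("youhao_totalKiloMeter")

sqlContext.sql(
  "SELECT * FROM youhao_data " +
  "left join youhao_age on (youhao_data.rowkey=youhao_age.rowkey) " +
  "left join youhao_totalKiloMeter on (youhao_age.rowkey=youhao_totalKiloMeter.rowkey)")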
Compiler issues for multiple map on RDD
Hello Folks, I am trying to chain a couple of map operations and it seems the second map fails with a mismatch in arguments (even though the compiler prints them to be the same.) I checked the function and variable types using :t and they look ok to me. Have you seen this earlier? I am posting the code, data and output below. Any pointers will be greatly appreciated. Thanks, Boromir. /// SCRIPT val data = sc.textFile("data/testpv.csv") case class KVV(key: String, valvec: Array[Double]) def mapToKV(line: String) : KVV = { val splits = line.split(",") val key = splits(0).trim val valvec = splits.drop(1).map(x => x.trim.toDouble) val kvv = KVV(key, valvec) return kvv } val kvs = data.map(line => mapToKV(line)) def mapKVtoKVL(kvv: KVV) : KVV = { return kvv } val tvar = kvs.map(x => mapKVtoKVL(x)) /// SAMPLE DATA in testpv.csv x,1.1,1.2,1.3 y,2.1,2.2,2.3 /// REPL OUTPUT scala val data = sc.textFile("data/testpv.csv") 14/09/15 10:53:23 INFO MemoryStore: ensureFreeSpace(146579) called with curMem=0, maxMem=308713881 14/09/15 10:53:23 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 143.1 KB, free 294.3 MB) data: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at console:12 scala case class KVV(key: String, valvec: Array[Double]) defined class KVV scala scala def mapToKV(line: String) : KVV = { | val splits = line.split(",") | val key = splits(0).trim | val valvec = splits.drop(1).map(x => x.trim.toDouble) | | val kvv = KVV(key, valvec) | return kvv | } mapToKV: (line: String)KVV scala val kvs = data.map(line => mapToKV(line)) kvs: org.apache.spark.rdd.RDD[KVV] = MappedRDD[2] at map at console:18 scala scala def mapKVtoKVL(kvv: KVV) : KVV = { | return kvv | } mapKVtoKVL: (kvv: KVV)KVV scala val tvar = kvs.map(x => mapKVtoKVL(x)) console:22: error: type mismatch; found : KVV required: KVV val tvar = kvs.map(x => mapKVtoKVL(x)) ^
File I/O in spark
Hi I am trying to perform some read/write file operations in spark. Somehow I am neither able to write to a file nor read. import java.io._ val writer = new PrintWriter(new File("test.txt")) writer.write("Hello Scala") Can someone please tell me how to perform file I/O in spark.
Re: Compiler issues for multiple map on RDD
Looks like another instance of https://issues.apache.org/jira/browse/SPARK-1199 which was intended to be fixed in 1.1.0. I'm not clear whether https://issues.apache.org/jira/browse/SPARK-2620 is the same issue and therefore whether it too is resolved in 1.1? On Mon, Sep 15, 2014 at 4:37 PM, Boromir Widas vcsub...@gmail.com wrote: Hello Folks, I am trying to chain a couple of map operations and it seems the second map fails with a mismatch in arguments(event though the compiler prints them to be the same.) I checked the function and variable types using :t and they look ok to me. Have you seen this earlier? I am posting the code, data and output below. Any pointers will be greatly appreciated. Thanks, Boromir. /// SCRIPT val data = sc.textFile(data/testpv.csv) case class KVV(key: String, valvec: Array[Double]) def mapToKV(line: String) : KVV = { val splits = line.split(,) val key = splits(0).trim val valvec = splits.drop(1).map(x = x.trim.toDouble) val kvv = KVV(key, valvec) return kvv } val kvs = data.map(line = mapToKV(line)) def mapKVtoKVL(kvv: KVV) : KVV = { return kvv } val tvar = kvs.map(x = mapKVtoKVL(x)) /// SAMPLE DATA in testpv.csv x,1.1,1.2,1.3 y,2.1,2.2,2.3 /// REPL OUTPUT scala val data = sc.textFile(data/testpv.csv) 14/09/15 10:53:23 INFO MemoryStore: ensureFreeSpace(146579) called with curMem=0, maxMem=308713881 14/09/15 10:53:23 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 143.1 KB, free 294.3 MB) data: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at console:12 scala case class KVV(key: String, valvec: Array[Double]) defined class KVV scala scala def mapToKV(line: String) : KVV = { | val splits = line.split(,) | val key = splits(0).trim | val valvec = splits.drop(1).map(x = x.trim.toDouble) | | val kvv = KVV(key, valvec) | return kvv | } mapToKV: (line: String)KVV scala val kvs = data.map(line = mapToKV(line)) kvs: org.apache.spark.rdd.RDD[KVV] = MappedRDD[2] at map at console:18 scala scala def mapKVtoKVL(kvv: KVV) : KVV = { | return kvv | } mapKVtoKVL: (kvv: KVV)KVV scala val tvar = kvs.map(x = mapKVtoKVL(x)) console:22: error: type mismatch; found : KVV required: KVV val tvar = kvs.map(x = mapKVtoKVL(x)) ^ - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
scala 2.11?
Folks, I understand Spark SQL uses quasiquotes. Does that mean Spark has now moved to Scala 2.11? Mohit.
Re: How to initiate a shutdown of Spark Streaming context?
Thank you. Would the following approaches to address this problem be overkill? a. create a ServerSocket in a different thread from the main thread that created the Spark StreamingContext, and listen for a shutdown command there b. create a web service that wraps around the main thread that created the Spark StreamingContext, and responds to shutdown requests Does Spark Streaming already provide similar capabilities? Stanley -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-initiate-a-shutdown-of-Spark-Streaming-context-tp14092p14252.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
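A minimal sketch of option (a) above, assuming an existing StreamingContext named ssc; the port is arbitrary and there is no authentication or command parsing, so treat it purely as an illustration:

import java.net.ServerSocket

// background thread that blocks until something connects, then stops streaming
val shutdownListener = new Thread(new Runnable {
  def run() {
    val server = new ServerSocket(9999) // placeholder port
    server.accept()                     // any connection is treated as a shutdown command
    server.close()
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
})
shutdownListener.setDaemon(true)
shutdownListener.start()

ssc.start()
ssc.awaitTermination() // returns once the listener thread has stopped the context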
Re: File I/O in spark
Is this code running in an executor? You need to make sure the file is accessible on ALL executors. One way to do that is to use a distributed filesystem like HDFS or GlusterFS. On Mon, Sep 15, 2014 at 8:51 AM, rapelly kartheek kartheek.m...@gmail.com wrote: Hi I am trying to perform some read/write file operations in spark. Somehow I am neither able to write to a file nor read. import java.io._ val writer = new PrintWriter(new File(test.txt )) writer.write(Hello Scala) Can someone please tell me how to perform file I/O in spark.
Re: Compiler issues for multiple map on RDD
(Adding back the user list) Boromir says: Thanks much Sean, verified 1.1.0 does not have this issue. On Mon, Sep 15, 2014 at 4:47 PM, Sean Owen so...@cloudera.com wrote: Looks like another instance of https://issues.apache.org/jira/browse/SPARK-1199 which was intended to be fixed in 1.1.0. I'm not clear whether https://issues.apache.org/jira/browse/SPARK-2620 is the same issue and therefore whether it too is resolved in 1.1? On Mon, Sep 15, 2014 at 4:37 PM, Boromir Widas vcsub...@gmail.com wrote: Hello Folks, I am trying to chain a couple of map operations and it seems the second map fails with a mismatch in arguments(event though the compiler prints them to be the same.) I checked the function and variable types using :t and they look ok to me. Have you seen this earlier? I am posting the code, data and output below. Any pointers will be greatly appreciated. Thanks, Boromir. /// SCRIPT val data = sc.textFile(data/testpv.csv) case class KVV(key: String, valvec: Array[Double]) def mapToKV(line: String) : KVV = { val splits = line.split(,) val key = splits(0).trim val valvec = splits.drop(1).map(x = x.trim.toDouble) val kvv = KVV(key, valvec) return kvv } val kvs = data.map(line = mapToKV(line)) def mapKVtoKVL(kvv: KVV) : KVV = { return kvv } val tvar = kvs.map(x = mapKVtoKVL(x)) /// SAMPLE DATA in testpv.csv x,1.1,1.2,1.3 y,2.1,2.2,2.3 /// REPL OUTPUT scala val data = sc.textFile(data/testpv.csv) 14/09/15 10:53:23 INFO MemoryStore: ensureFreeSpace(146579) called with curMem=0, maxMem=308713881 14/09/15 10:53:23 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 143.1 KB, free 294.3 MB) data: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at console:12 scala case class KVV(key: String, valvec: Array[Double]) defined class KVV scala scala def mapToKV(line: String) : KVV = { | val splits = line.split(,) | val key = splits(0).trim | val valvec = splits.drop(1).map(x = x.trim.toDouble) | | val kvv = KVV(key, valvec) | return kvv | } mapToKV: (line: String)KVV scala val kvs = data.map(line = mapToKV(line)) kvs: org.apache.spark.rdd.RDD[KVV] = MappedRDD[2] at map at console:18 scala scala def mapKVtoKVL(kvv: KVV) : KVV = { | return kvv | } mapKVtoKVL: (kvv: KVV)KVV scala val tvar = kvs.map(x = mapKVtoKVL(x)) console:22: error: type mismatch; found : KVV required: KVV val tvar = kvs.map(x = mapKVtoKVL(x)) ^ - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: File I/O in spark
Yes. I have HDFS. My cluster has 5 nodes. When I run the above commands, I see that the file gets created in the master node. But, there wont be any data written to it. On Mon, Sep 15, 2014 at 10:06 PM, Mohit Jaggi mohitja...@gmail.com wrote: Is this code running in an executor? You need to make sure the file is accessible on ALL executors. One way to do that is to use a distributed filesystem like HDFS or GlusterFS. On Mon, Sep 15, 2014 at 8:51 AM, rapelly kartheek kartheek.m...@gmail.com wrote: Hi I am trying to perform some read/write file operations in spark. Somehow I am neither able to write to a file nor read. import java.io._ val writer = new PrintWriter(new File(test.txt )) writer.write(Hello Scala) Can someone please tell me how to perform file I/O in spark.
Re: File I/O in spark
The file gets created on the fly. So I dont know how to make sure that its accessible to all nodes. On Mon, Sep 15, 2014 at 10:10 PM, rapelly kartheek kartheek.m...@gmail.com wrote: Yes. I have HDFS. My cluster has 5 nodes. When I run the above commands, I see that the file gets created in the master node. But, there wont be any data written to it. On Mon, Sep 15, 2014 at 10:06 PM, Mohit Jaggi mohitja...@gmail.com wrote: Is this code running in an executor? You need to make sure the file is accessible on ALL executors. One way to do that is to use a distributed filesystem like HDFS or GlusterFS. On Mon, Sep 15, 2014 at 8:51 AM, rapelly kartheek kartheek.m...@gmail.com wrote: Hi I am trying to perform some read/write file operations in spark. Somehow I am neither able to write to a file nor read. import java.io._ val writer = new PrintWriter(new File(test.txt )) writer.write(Hello Scala) Can someone please tell me how to perform file I/O in spark.
Re: File I/O in spark
But the above APIs are not for HDFS. On Mon, Sep 15, 2014 at 9:40 AM, rapelly kartheek kartheek.m...@gmail.com wrote: Yes. I have HDFS. My cluster has 5 nodes. When I run the above commands, I see that the file gets created in the master node. But, there wont be any data written to it. On Mon, Sep 15, 2014 at 10:06 PM, Mohit Jaggi mohitja...@gmail.com wrote: Is this code running in an executor? You need to make sure the file is accessible on ALL executors. One way to do that is to use a distributed filesystem like HDFS or GlusterFS. On Mon, Sep 15, 2014 at 8:51 AM, rapelly kartheek kartheek.m...@gmail.com wrote: Hi I am trying to perform some read/write file operations in spark. Somehow I am neither able to write to a file nor read. import java.io._ val writer = new PrintWriter(new File(test.txt )) writer.write(Hello Scala) Can someone please tell me how to perform file I/O in spark.
Re: scala 2.11?
No, not yet. Spark SQL is using org.scalamacros:quasiquotes_2.10. On Mon, Sep 15, 2014 at 9:28 AM, Mohit Jaggi mohitja...@gmail.com wrote: Folks, I understand Spark SQL uses quasiquotes. Does that mean Spark has now moved to Scala 2.11? Mohit.
Re: scala 2.11?
ah...thanks! On Mon, Sep 15, 2014 at 9:47 AM, Mark Hamstra m...@clearstorydata.com wrote: No, not yet. Spark SQL is using org.scalamacros:quasiquotes_2.10. On Mon, Sep 15, 2014 at 9:28 AM, Mohit Jaggi mohitja...@gmail.com wrote: Folks, I understand Spark SQL uses quasiquotes. Does that mean Spark has now moved to Scala 2.11? Mohit.
Re: File I/O in spark
I came across these APIs in one the scala tutorials over the net. On Mon, Sep 15, 2014 at 10:14 PM, Mohit Jaggi mohitja...@gmail.com wrote: But the above APIs are not for HDFS. On Mon, Sep 15, 2014 at 9:40 AM, rapelly kartheek kartheek.m...@gmail.com wrote: Yes. I have HDFS. My cluster has 5 nodes. When I run the above commands, I see that the file gets created in the master node. But, there wont be any data written to it. On Mon, Sep 15, 2014 at 10:06 PM, Mohit Jaggi mohitja...@gmail.com wrote: Is this code running in an executor? You need to make sure the file is accessible on ALL executors. One way to do that is to use a distributed filesystem like HDFS or GlusterFS. On Mon, Sep 15, 2014 at 8:51 AM, rapelly kartheek kartheek.m...@gmail.com wrote: Hi I am trying to perform some read/write file operations in spark. Somehow I am neither able to write to a file nor read. import java.io._ val writer = new PrintWriter(new File(test.txt )) writer.write(Hello Scala) Can someone please tell me how to perform file I/O in spark.
Re: File I/O in spark
Can you please direct me to the right way of doing this. On Mon, Sep 15, 2014 at 10:18 PM, rapelly kartheek kartheek.m...@gmail.com wrote: I came across these APIs in one the scala tutorials over the net. On Mon, Sep 15, 2014 at 10:14 PM, Mohit Jaggi mohitja...@gmail.com wrote: But the above APIs are not for HDFS. On Mon, Sep 15, 2014 at 9:40 AM, rapelly kartheek kartheek.m...@gmail.com wrote: Yes. I have HDFS. My cluster has 5 nodes. When I run the above commands, I see that the file gets created in the master node. But, there wont be any data written to it. On Mon, Sep 15, 2014 at 10:06 PM, Mohit Jaggi mohitja...@gmail.com wrote: Is this code running in an executor? You need to make sure the file is accessible on ALL executors. One way to do that is to use a distributed filesystem like HDFS or GlusterFS. On Mon, Sep 15, 2014 at 8:51 AM, rapelly kartheek kartheek.m...@gmail.com wrote: Hi I am trying to perform some read/write file operations in spark. Somehow I am neither able to write to a file nor read. import java.io._ val writer = new PrintWriter(new File(test.txt )) writer.write(Hello Scala) Can someone please tell me how to perform file I/O in spark.
Re: File I/O in spark
If you underlying filesystem is HDFS, you need to use HDFS APIs. A google search brought up this link which appears reasonable. http://wiki.apache.org/hadoop/HadoopDfsReadWriteExample If you want to use java.io APIs, you have to make sure your filesystem is accessible from all nodes in your cluster. You did not mention what errors you get with your code. They may mean something. On Mon, Sep 15, 2014 at 9:51 AM, rapelly kartheek kartheek.m...@gmail.com wrote: Can you please direct me to the right way of doing this. On Mon, Sep 15, 2014 at 10:18 PM, rapelly kartheek kartheek.m...@gmail.com wrote: I came across these APIs in one the scala tutorials over the net. On Mon, Sep 15, 2014 at 10:14 PM, Mohit Jaggi mohitja...@gmail.com wrote: But the above APIs are not for HDFS. On Mon, Sep 15, 2014 at 9:40 AM, rapelly kartheek kartheek.m...@gmail.com wrote: Yes. I have HDFS. My cluster has 5 nodes. When I run the above commands, I see that the file gets created in the master node. But, there wont be any data written to it. On Mon, Sep 15, 2014 at 10:06 PM, Mohit Jaggi mohitja...@gmail.com wrote: Is this code running in an executor? You need to make sure the file is accessible on ALL executors. One way to do that is to use a distributed filesystem like HDFS or GlusterFS. On Mon, Sep 15, 2014 at 8:51 AM, rapelly kartheek kartheek.m...@gmail.com wrote: Hi I am trying to perform some read/write file operations in spark. Somehow I am neither able to write to a file nor read. import java.io._ val writer = new PrintWriter(new File(test.txt )) writer.write(Hello Scala) Can someone please tell me how to perform file I/O in spark.
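For completeness, here is a minimal sketch of the write side along the lines of the HadoopDfsReadWriteExample linked above, assuming the Hadoop configuration files (core-site.xml / hdfs-site.xml) are on the classpath; the path is illustrative.

import java.io.PrintWriter
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val hadoopConf = new Configuration()                  // picks up the cluster settings
val fs = FileSystem.get(hadoopConf)                   // the default (HDFS) filesystem

val out = fs.create(new Path("hdfs:///user/kartheek/test.txt"))   // illustrative path
val writer = new PrintWriter(out)
writer.write("Hello Scala")
writer.close()                                        // flushes and closes the HDFS stream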
Re: File I/O in spark
Kartheek, What exactly are you trying to do? Those APIs are only for local file access. If you want to access data in HDFS, you’ll want to use one of the reader methods in org.apache.spark.SparkContext which will give you an RDD (e.g., newAPIHadoopFile, sequenceFile, or textFile). If you want to write data to HDFS, you’ll need to have an RDD and use one of the functions in org.apache.spark.RDD (saveAsObjectFile or saveAsTextFile) or one of the PairRDDFunctions (e.g., saveAsNewAPIHadoopFile or saveAsNewAPIHadoopDataset). The Scala/Java IO system can be used inside of Spark, but only for local file access. This is used to implement the rdd.pipe method (IIRC), and we use it in some downstream apps to do IO with processes that we spawn from mapPartitions calls (see here and here). Regards, Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466 On Sep 15, 2014, at 9:44 AM, rapelly kartheek kartheek.m...@gmail.com wrote: The file gets created on the fly. So I dont know how to make sure that its accessible to all nodes. On Mon, Sep 15, 2014 at 10:10 PM, rapelly kartheek kartheek.m...@gmail.com wrote: Yes. I have HDFS. My cluster has 5 nodes. When I run the above commands, I see that the file gets created in the master node. But, there wont be any data written to it. On Mon, Sep 15, 2014 at 10:06 PM, Mohit Jaggi mohitja...@gmail.com wrote: Is this code running in an executor? You need to make sure the file is accessible on ALL executors. One way to do that is to use a distributed filesystem like HDFS or GlusterFS. On Mon, Sep 15, 2014 at 8:51 AM, rapelly kartheek kartheek.m...@gmail.com wrote: Hi I am trying to perform some read/write file operations in spark. Somehow I am neither able to write to a file nor read. import java.io._ val writer = new PrintWriter(new File(test.txt )) writer.write(Hello Scala) Can someone please tell me how to perform file I/O in spark.
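A minimal sketch of the RDD route Frank describes, assuming sc is an existing SparkContext; the paths and the transformation are illustrative.

val lines = sc.textFile("hdfs:///user/kartheek/input.txt")     // read from HDFS into an RDD
val upper = lines.map(_.toUpperCase)
upper.saveAsTextFile("hdfs:///user/kartheek/output")           // writes a directory of part- files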
Re: Broadcast error
I think the 1.1 will be really helpful for you, it's all compatitble with 1.0, so it's not hard to upgrade to 1.1. On Mon, Sep 15, 2014 at 2:35 AM, Chengi Liu chengi.liu...@gmail.com wrote: So.. same result with parallelize (matrix,1000) with broadcast.. seems like I got jvm core dump :-/ 4/09/15 02:31:22 INFO BlockManagerInfo: Registering block manager host:47978 with 19.2 GB RAM 14/09/15 02:31:22 INFO BlockManagerInfo: Registering block manager host:43360 with 19.2 GB RAM Unhandled exception Unhandled exception Type=Segmentation error vmState=0x J9Generic_Signal_Number=0004 Signal_Number=000b Error_Value= Signal_Code=0001 Handler1=2BF53760 Handler2=2C3069D0 InaccessibleAddress= RDI=2AB9505F2698 RSI=2AABAE2C54D8 RAX=2AB7CE6009A0 RBX=2AB7CE6009C0 RCX=FFC7FFE0 RDX=2AB8509726A8 R8=7FE41FF0 R9=2000 R10=2DA318A0 R11=2AB850959520 R12=2AB5EF97DD88 R13=2AB5EF97BD88 R14=2C0CE940 R15=2AB5EF97BD88 RIP= GS= FS= RSP=007367A0 EFlags=00210282 CS=0033 RBP=00BCDB00 ERR=0014 TRAPNO=000E OLDMASK= CR2= xmm0 4141414141414141 (f: 1094795648.00, d: 2.261635e+06) xmm1 4141414141414141 (f: 1094795648.00, d: 2.261635e+06) xmm2 4141414141414141 (f: 1094795648.00, d: 2.261635e+06) xmm3 4141414141414141 (f: 1094795648.00, d: 2.261635e+06) xmm4 4141414141414141 (f: 1094795648.00, d: 2.261635e+06) xmm5 4141414141414141 (f: 1094795648.00, d: 2.261635e+06) xmm6 4141414141414141 (f: 1094795648.00, d: 2.261635e+06) xmm7 f180c714f8e2a139 (f: 4175601920.00, d: -5.462583e+238) xmm8 428e8000 (f: 1116635136.00, d: 5.516911e-315) xmm9 (f: 0.00, d: 0.00e+00) xmm10 (f: 0.00, d: 0.00e+00) xmm11 (f: 0.00, d: 0.00e+00) xmm12 (f: 0.00, d: 0.00e+00) xmm13 (f: 0.00, d: 0.00e+00) xmm14 (f: 0.00, d: 0.00e+00) xmm15 (f: 0.00, d: 0.00e+00) Target=2_60_20140106_181350 (Linux 3.0.93-0.8.2_1.0502.8048-cray_ari_c) CPU=amd64 (48 logical CPUs) (0xfc0c5b000 RAM) --- Stack Backtrace --- (0x2C2FA122 [libj9prt26.so+0x13122]) (0x2C30779F [libj9prt26.so+0x2079f]) (0x2C2F9E6B [libj9prt26.so+0x12e6b]) (0x2C2F9F67 [libj9prt26.so+0x12f67]) (0x2C30779F [libj9prt26.so+0x2079f]) (0x2C2F9A8B [libj9prt26.so+0x12a8b]) (0x2BF52C9D [libj9vm26.so+0x1ac9d]) (0x2C30779F [libj9prt26.so+0x2079f]) (0x2BF52F56 [libj9vm26.so+0x1af56]) (0x2BF96CA0 [libj9vm26.so+0x5eca0]) --- JVMDUMP039I JVMDUMP032I Note, this still is with the framesize I modified in the last email thread? On Mon, Sep 15, 2014 at 2:12 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Try: rdd = sc.broadcast(matrix) Or rdd = sc.parallelize(matrix,100) // Just increase the number of slices, give it a try. Thanks Best Regards On Mon, Sep 15, 2014 at 2:18 PM, Chengi Liu chengi.liu...@gmail.com wrote: Hi Akhil, So with your config (specifically with set(spark.akka.frameSize , 1000)) , I see the error: org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 0:0 was 401970046 bytes which exceeds spark.akka.frameSize (10485760 bytes). Consider using broadcast variables for large values. at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049) at org.apache.spark So, I changed set(spark.akka.frameSize , 1000) to set(spark.akka.frameSize , 10) but now I get the same error? y4j.protocol.Py4JJavaError: An error occurred while calling o28.trainKMeansModel. : org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up. 
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGSched along with following: 14/09/15 01:44:11 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/09/15 01:44:21 INFO
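The frameSize error quoted above already points at the usual fix ("Consider using broadcast variables for large values"): broadcast the matrix once and have tasks read the broadcast handle instead of capturing the matrix in the task closure. A minimal sketch of that pattern, written in Scala for illustration (the original thread is PySpark), with an illustrative matrix and computation:

// Broadcast the large value once; task closures only carry the lightweight handle.
val matrix: Array[Array[Double]] = Array.fill(1000, 1000)(1.0)   // illustrative large value
val matrixBc = sc.broadcast(matrix)

val rowSums = sc.parallelize(0 until 1000, 100).map { i =>
  matrixBc.value(i).sum          // read the broadcast copy inside the task
}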
Re: Low Level Kafka Consumer for Spark
Hi Dibyendu, I am a little confused about the need for rate limiting input from kafka. If the stream coming in from kafka has higher message/second rate than what a Spark job can process then it should simply build a backlog in Spark if the RDDs are cached on disk using persist(). Right? Thanks, Tim On Mon, Sep 15, 2014 at 4:33 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Hi Alon, No this will not be guarantee that same set of messages will come in same RDD. This fix just re-play the messages from last processed offset in same order. Again this is just a interim fix we needed to solve our use case . If you do not need this message re-play feature, just do not perform the ack ( Acknowledgement) call in the Driver code. Then the processed messages will not be written to ZK and hence replay will not happen. Regards, Dibyendu On Mon, Sep 15, 2014 at 4:48 PM, Alon Pe'er alo...@supersonicads.com wrote: Hi Dibyendu, Thanks for your great work! I'm new to Spark Streaming, so I just want to make sure I understand Driver failure issue correctly. In my use case, I want to make sure that messages coming in from Kafka are always broken into the same set of RDDs, meaning that if a set of messages are assigned to one RDD, and the Driver dies before this RDD is processed, then once the Driver recovers, the same set of messages are assigned to a single RDD, instead of arbitrarily repartitioning the messages across different RDDs. Does your Receiver guarantee this behavior, until the problem is fixed in Spark 1.2? Regards, Alon -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: PathFilter for newAPIHadoopFile?
In PySpark, I think you could enumerate all the valid files, and create RDD by newAPIHadoopFile(), then union them together. On Mon, Sep 15, 2014 at 5:49 AM, Eric Friedman eric.d.fried...@gmail.com wrote: I neglected to specify that I'm using pyspark. Doesn't look like these APIs have been bridged. Eric Friedman On Sep 14, 2014, at 11:02 PM, Nat Padmanabhan reachn...@gmail.com wrote: Hi Eric, Something along the lines of the following should work val fs = getFileSystem(...) // standard hadoop API call val filteredConcatenatedPaths = fs.listStatus(topLevelDirPath, pathFilter).map(_.getPath.toString).mkString(,) // pathFilter is an instance of org.apache.hadoop.fs.PathFilter val parquetRdd = sc.hadoopFile(filteredConcatenatedPaths, classOf[ParquetInputFormat[Something]], classOf[Void], classOf[SomeAvroType], getConfiguration(...)) You have to do some initializations on ParquetInputFormat such as AvroReadSetup/AvroWriteSupport etc but that you should be doing already I am guessing. Cheers, Nat On Sun, Sep 14, 2014 at 7:37 PM, Eric Friedman eric.d.fried...@gmail.com wrote: Hi, I have a directory structure with parquet+avro data in it. There are a couple of administrative files (.foo and/or _foo) that I need to ignore when processing this data or Spark tries to read them as containing parquet content, which they do not. How can I set a PathFilter on the FileInputFormat used to construct an RDD? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Write 1 RDD to multiple output paths in one go
Maybe we should provide an API like saveTextFilesByKey(path), could you create an JIRA for it ? There is one in DPark [1] actually. [1] https://github.com/douban/dpark/blob/master/dpark/rdd.py#L309 On Mon, Sep 15, 2014 at 7:08 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Any tips from anybody on how to do this in PySpark? (Or regular Spark, for that matter.) On Sat, Sep 13, 2014 at 1:25 PM, Nick Chammas nicholas.cham...@gmail.com wrote: Howdy doody Spark Users, I’d like to somehow write out a single RDD to multiple paths in one go. Here’s an example. I have an RDD of (key, value) pairs like this: a = sc.parallelize(['Nick', 'Nancy', 'Bob', 'Ben', 'Frankie']).keyBy(lambda x: x[0]) a.collect() [('N', 'Nick'), ('N', 'Nancy'), ('B', 'Bob'), ('B', 'Ben'), ('F', 'Frankie')] Now I want to write the RDD out to different paths depending on the keys, so that I have one output directory per distinct key. Each output directory could potentially have multiple part- files or whatever. So my output would be something like: /path/prefix/n [/part-1, /part-2, etc] /path/prefix/b [/part-1, /part-2, etc] /path/prefix/f [/part-1, /part-2, etc] How would you do that? I suspect I need to use saveAsNewAPIHadoopFile or saveAsHadoopFile along with the MultipleTextOutputFormat output format class, but I’m not sure how. By the way, there is a very similar question to this here on Stack Overflow. Nick View this message in context: Write 1 RDD to multiple output paths in one go Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Low Level Kafka Consumer for Spark
Hi Tim, I have not tried persist the RDD. Here are some discussion on Rate Limiting Spark Streaming is there in this thread. http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-rate-limiting-from-kafka-td8590.html There is a Pull Request https://github.com/apache/spark/pull/945/files to fix this Rate Limiting issue at BlockGenerator level. But while testing with heavy load, this fix did not solve my problem. So I had to have Rate Limiting built into Kafka Consumer. I will make it configurable soon. If this is not done, I can see Block are getting dropped which leads to Job failure. I have raised this in another thread .. https://mail.google.com/mail/u/1/?tab=wm#search/Serious/148650fd829cd239. But have not got any answer yet if this is a bug ( Block getting dropped and Job failed). Dib On Mon, Sep 15, 2014 at 10:33 PM, Tim Smith secs...@gmail.com wrote: Hi Dibyendu, I am a little confused about the need for rate limiting input from kafka. If the stream coming in from kafka has higher message/second rate than what a Spark job can process then it should simply build a backlog in Spark if the RDDs are cached on disk using persist(). Right? Thanks, Tim On Mon, Sep 15, 2014 at 4:33 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Hi Alon, No this will not be guarantee that same set of messages will come in same RDD. This fix just re-play the messages from last processed offset in same order. Again this is just a interim fix we needed to solve our use case . If you do not need this message re-play feature, just do not perform the ack ( Acknowledgement) call in the Driver code. Then the processed messages will not be written to ZK and hence replay will not happen. Regards, Dibyendu On Mon, Sep 15, 2014 at 4:48 PM, Alon Pe'er alo...@supersonicads.com wrote: Hi Dibyendu, Thanks for your great work! I'm new to Spark Streaming, so I just want to make sure I understand Driver failure issue correctly. In my use case, I want to make sure that messages coming in from Kafka are always broken into the same set of RDDs, meaning that if a set of messages are assigned to one RDD, and the Driver dies before this RDD is processed, then once the Driver recovers, the same set of messages are assigned to a single RDD, instead of arbitrarily repartitioning the messages across different RDDs. Does your Receiver guarantee this behavior, until the problem is fixed in Spark 1.2? Regards, Alon -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Need help with ThriftServer/Spark1.1.0
Hi ladies and gents, trying to get Thrift server up and running in an effort to replace Shark. My first attempt to run sbin/start-thriftserver resulted in: 14/09/15 17:09:05 ERROR TThreadPoolServer: Error occurred during processing of message. java.lang.RuntimeException: org.apache.thrift.transport.TTransportException: java.net.SocketException: Connection reset at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:219) at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:189) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused by: org.apache.thrift.transport.TTransportException: java.net.SocketException: Connection reset at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:129) at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84) at org.apache.thrift.transport.TSaslTransport.receiveSaslMessage(TSaslTransport.java:178) at org.apache.thrift.transport.TSaslServerTransport.handleSaslStartMessage(TSaslServerTransport.java:125) at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253) at org.apache.thrift.transport.TSaslServerTransport.open(TSaslServerTransport.java:41) at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:216) ... 4 more Caused by: java.net.SocketException: Connection reset at java.net.SocketInputStream.read(SocketInputStream.java:196) at java.net.SocketInputStream.read(SocketInputStream.java:122) at java.io.BufferedInputStream.fill(BufferedInputStream.java:235) at java.io.BufferedInputStream.read1(BufferedInputStream.java:275) at java.io.BufferedInputStream.read(BufferedInputStream.java:334) at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127) After turing logging levels up it seemed like this error is related to SASL and the SO advice was to turn it off via: propertynamehive.server2.authentication/namevalueNOSASL/value/property But I still have no luck: (this is the full command that gets run) java -cp /spark-1.1.0-bin-cdh4/conf:/spark-1.1.0-bin-cdh4/lib/spark-assembly-1.1.0-hadoop2.0.0-mr1-cdh4.2.0.jar:/spark-1.1.0-bin-cdh4/lib/datanucleus-core-3.2.2.jar:/spark-1.1.0-bin-cdh4/lib/datanucleus-rdbms-3.2.1.jar:/a/shark/spark-1.1.0-bin-cdh4/lib/datanucleus-api-jdo-3.2.1.jar:/hadoop/share/hadoop/mapreduce1//conf -XX:MaxPermSize=128m-Xms4012m -Xmx4012m org.apache.spark.deploy.SparkSubmit --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 --master spark://master-ip:7077 spark-internal --hiveconf hive.server.thrift.bind.host ip-to-bind 14/09/15 17:05:05 ERROR TThreadPoolServer: Error occurred during processing of message. java.lang.ClassCastException: org.apache.thrift.transport.TSocket cannot be cast to org.apache.thrift.transport.TSaslServerTransport at org.apache.hive.service.auth.TUGIContainingProcessor.process(TUGIContainingProcessor.java:53) at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:206) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Any idea what might be going on? I compiled w/ -Phive against the 1.1.0. hive-site.conf is the conf file we used previously. 
SparkSQL does work for me but does not have a lot of functionality I need. Any help appreciated -- I do acknowledge this is likely more of a Hive question than Spark... If there is a precompiled build for CDH4 that includes the thrift server, I'd be happy to try that too... thanks again.
MLLib sparse vector
Hi All, I have transformed the data into the following format: the first column is the user id, and all the other columns are class ids. For a user, only the class ids that appear in that row have value 1 and the others are 0. I need to create a sparse vector from this. Is there an API for creating a sparse vector that directly supports this format? User id Product class ids 2622572 145447 162013421 28565 285556 293 455367261 130 3646167118806 183576 328651715 57671 57476
Example of Geoprocessing with Spark
Here an example of a working code that takes a csv with lat lon points and intersects with polygons of municipalities of Mexico, generating a new version of the file with new attributes. Do you think that could be improved? Thanks. The Code: import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import org.geoscript.feature._ import org.geoscript.geometry._ import org.geoscript.geometry.builder._ import com.vividsolutions.jts._ import org.geoscript.layer.Shapefile import org.geotools.feature.FeatureCollection import java.text._ import java.util._ object SimpleApp { def main(args: Array[String]){ val conf = new SparkConf().setAppName(Csv Clipper) val sc = new SparkContext(conf) val csvPath = hdfs://x01/user/acoronado/mov/movilidad.csv //70 Millions of rows val csv = sc.textFile(csvPath) val clipPoints = csv.map({line: String = val Array(usuario, lat, lon, date) = line.split(,).map(_.trim) val punto = Point(lon.toDouble,lat.toDouble) val existe = geoData.get.filter(f = f.geometry intersects punto) // Geospatial operation var cve_est = 0 var cve_mun = 0 var time = 0 if(!existe.isEmpty){ val f = existe.take(1) val ff = f.toList(0) cve_est = ff.getAttribute(1).toString //State Code cve_mun = ff.getAttribute(2).toString // Municipality Code time = (new SimpleDateFormat(-MM-dd'T'HH:mm:ss.SSSZ)).parse(date.replaceAll(Z$, +)).getTime().toString() } line+,+time+,+cve_est+,+cve_mun }) clipPoints.coalesce(1,true).saveAsTextFile(hdfs://m01/user/acoronado/mov/mov_all.csv) println(Spark Clip Exito!!!) } object geoData { private val estatal = Shapefile(/geoData/MunicipiosLatLon.shp) //This directory exist in all the nodes. private val estatalColl = estatal.getFeatures def get:FeatureCollection[org.geoscript.feature.Schema,org.geoscript.feature.Feature] = estatalColl } }
Dealing with Time Series Data
I have a use case for our data in HDFS that involves sorting chunks of data into time series format by a specific characteristic and doing computations from that. At large scale, what is the most efficient way to do this? Obviously, having the data sharded by that characteristic would make the performance significantly better, but are there good tools in Spark that can help us?
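To make the question concrete, here is a minimal sketch of the straightforward approach in Scala: key by the characteristic, group, and sort each group by time. The Event fields and the events RDD are illustrative assumptions. Note that groupByKey pulls each series onto a single executor, which is exactly why pre-sharding the data by that characteristic helps at scale.

import org.apache.spark.SparkContext._   // pair RDD functions in Spark 1.1

case class Event(seriesKey: String, timestamp: Long, value: Double)

// events: RDD[Event] is assumed; the result is one time-ordered sequence per series key.
val bySeries = events
  .map(e => (e.seriesKey, e))
  .groupByKey()
  .mapValues(_.toSeq.sortBy(_.timestamp))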
Re: How to initiate a shutdown of Spark Streaming context?
What we did for gracefully shutting down the Spark Streaming context is extend a Spark Web UI Tab and call SparkContext.SparkUI.attachTab(custom web ui). However, the custom Scala Web UI extensions need to be under the package org.apache.spark.ui to get around the package access restrictions. Would it be possible to expose the SparkUI under SparkContext, and the Spark Web UI packages, as public so that developers can add customizations with their own tools? Thanks! On Tue, Sep 16, 2014 at 12:34 AM, stanley [via Apache Spark User List] ml-node+s1001560n14252...@n3.nabble.com wrote: Thank you. Would the following approaches to addressing this problem be overkill? a. create a ServerSocket in a different thread from the main thread that created the Spark StreamingContext, and listen for a shutdown command there b. create a web service that wraps around the main thread that created the Spark StreamingContext, and responds to shutdown requests Does Spark Streaming already provide similar capabilities? Stanley -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-initiate-a-shutdown-of-Spark-Streaming-context-tp14092p14277.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: scala 2.11?
Scala 2.11 work is under way in open pull requests though, so hopefully it will be in soon. Matei On September 15, 2014 at 9:48:42 AM, Mohit Jaggi (mohitja...@gmail.com) wrote: ah...thanks! On Mon, Sep 15, 2014 at 9:47 AM, Mark Hamstra m...@clearstorydata.com wrote: No, not yet. Spark SQL is using org.scalamacros:quasiquotes_2.10. On Mon, Sep 15, 2014 at 9:28 AM, Mohit Jaggi mohitja...@gmail.com wrote: Folks, I understand Spark SQL uses quasiquotes. Does that mean Spark has now moved to Scala 2.11? Mohit.
Re: scala 2.11?
Are we going to put 2.11 support into 1.1 or 1.0? Else "will be in soon" applies to the master development branch, but inclusion in the Spark 1.2.0 release won't occur until the second half of November at the earliest. On Mon, Sep 15, 2014 at 12:11 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Scala 2.11 work is under way in open pull requests though, so hopefully it will be in soon. Matei On September 15, 2014 at 9:48:42 AM, Mohit Jaggi (mohitja...@gmail.com) wrote: ah...thanks! On Mon, Sep 15, 2014 at 9:47 AM, Mark Hamstra m...@clearstorydata.com wrote: No, not yet. Spark SQL is using org.scalamacros:quasiquotes_2.10. On Mon, Sep 15, 2014 at 9:28 AM, Mohit Jaggi mohitja...@gmail.com wrote: Folks, I understand Spark SQL uses quasiquotes. Does that mean Spark has now moved to Scala 2.11? Mohit.
Re: Write 1 RDD to multiple output paths in one go
Davies, That’s pretty neat. I heard there was a pure Python clone of Spark out there—so you were one of the people behind it! I’ve created a JIRA issue about this. SPARK-3533: Add saveAsTextFileByKey() method to RDDs https://issues.apache.org/jira/browse/SPARK-3533 Sean, I think you might be able to get this working with a subclass of MultipleTextOutputFormat, which overrides generateFileNameForKeyValue, generateActualKey, etc. A bit of work for sure, but probably works. I’m looking at how to make this work in PySpark as of 1.1.0. The closest examples I can see of how to use the saveAsHadoop...() methods in this way are these two examples: HBase Output Format https://github.com/apache/spark/blob/cc14644460872efb344e8d895859d70213a40840/examples/src/main/python/hbase_outputformat.py#L60 and Avro Input Format https://github.com/apache/spark/blob/cc14644460872efb344e8d895859d70213a40840/examples/src/main/python/avro_inputformat.py#L73 Basically, I’m thinking I need to subclass MultipleTextOutputFormat and override some methods in a Scala file, and then reference that from Python? Like how the AvroWrapperToJavaConverter class is done? Seems pretty involved, but I’ll give it a shot if that’s the right direction to go in. Nick On Mon, Sep 15, 2014 at 1:08 PM, Davies Liu dav...@databricks.com wrote: Maybe we should provide an API like saveTextFilesByKey(path), could you create an JIRA for it ? There is one in DPark [1] actually. [1] https://github.com/douban/dpark/blob/master/dpark/rdd.py#L309 On Mon, Sep 15, 2014 at 7:08 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Any tips from anybody on how to do this in PySpark? (Or regular Spark, for that matter.) On Sat, Sep 13, 2014 at 1:25 PM, Nick Chammas nicholas.cham...@gmail.com wrote: Howdy doody Spark Users, I’d like to somehow write out a single RDD to multiple paths in one go. Here’s an example. I have an RDD of (key, value) pairs like this: a = sc.parallelize(['Nick', 'Nancy', 'Bob', 'Ben', 'Frankie']).keyBy(lambda x: x[0]) a.collect() [('N', 'Nick'), ('N', 'Nancy'), ('B', 'Bob'), ('B', 'Ben'), ('F', 'Frankie')] Now I want to write the RDD out to different paths depending on the keys, so that I have one output directory per distinct key. Each output directory could potentially have multiple part- files or whatever. So my output would be something like: /path/prefix/n [/part-1, /part-2, etc] /path/prefix/b [/part-1, /part-2, etc] /path/prefix/f [/part-1, /part-2, etc] How would you do that? I suspect I need to use saveAsNewAPIHadoopFile or saveAsHadoopFile along with the MultipleTextOutputFormat output format class, but I’m not sure how. By the way, there is a very similar question to this here on Stack Overflow. Nick View this message in context: Write 1 RDD to multiple output paths in one go Sent from the Apache Spark User List mailing list archive at Nabble.com.
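For the Scala piece of that plan, here is a minimal sketch of the MultipleTextOutputFormat subclass Sean suggested (old mapred API); the class name, key-based routing, and output path are illustrative, and this is only a sketch of the direction, not a tested solution.

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.SparkContext._   // pair RDD functions in Spark 1.1

class KeyBasedOutput extends MultipleTextOutputFormat[Any, Any] {
  // Route each record into a subdirectory named after its key.
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.toString + "/" + name
  // Drop the key from the written line so only the value lands in the files.
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()
}

val pairs = sc.parallelize(Seq(("n", "Nick"), ("n", "Nancy"), ("b", "Bob")))
pairs.saveAsHadoopFile("/path/prefix", classOf[String], classOf[String], classOf[KeyBasedOutput])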
Re: MLLib sparse vector
Hi Sameer, MLLib uses Breeze’s vector format under the hood. You can use that. http://www.scalanlp.org/api/breeze/index.html#breeze.linalg.SparseVector For example: import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV} val numClasses = classes.distinct.count.toInt val userWithClassesAsSparseVector = rows.map(x => (x.userID, new BSV[Double](x.classIDs.sortWith(_ < _), Seq.fill(x.classIDs.length)(1.0).toArray, numClasses).asInstanceOf[BV[Double]])) Chris On Sep 15, 2014, at 11:28 AM, Sameer Tilak ssti...@live.com wrote: Hi All, I have transformed the data into following format: First column is user id, and then all the other columns are class ids. For a user only class ids that appear in this row have value 1 and others are 0. I need to crease a sparse vector from this. Does the API for creating a sparse vector that can directly support this format? User idProduct class ids 2622572 145447 162013421 28565 285556 293 455367261 130 3646167118806 183576 328651715 57671 57476
Efficient way to sum multiple columns
Hi all, I have an RDD that contains around 50 columns. I need to sum each column, which I am doing by running it through a for loop, creating an array and running the sum function as follows: for (i <- 0 until 10) yield { data.map(x => x(i)).sum } Is there a better way to do this? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-way-to-sum-multiple-columns-tp14281.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: PathFilter for newAPIHadoopFile?
That's a good idea and one I had considered too. Unfortunately I'm not aware of an API in PySpark for enumerating paths on HDFS. Have I overlooked one? On Mon, Sep 15, 2014 at 10:01 AM, Davies Liu dav...@databricks.com wrote: In PySpark, I think you could enumerate all the valid files, and create RDD by newAPIHadoopFile(), then union them together. On Mon, Sep 15, 2014 at 5:49 AM, Eric Friedman eric.d.fried...@gmail.com wrote: I neglected to specify that I'm using pyspark. Doesn't look like these APIs have been bridged. Eric Friedman On Sep 14, 2014, at 11:02 PM, Nat Padmanabhan reachn...@gmail.com wrote: Hi Eric, Something along the lines of the following should work val fs = getFileSystem(...) // standard hadoop API call val filteredConcatenatedPaths = fs.listStatus(topLevelDirPath, pathFilter).map(_.getPath.toString).mkString(,) // pathFilter is an instance of org.apache.hadoop.fs.PathFilter val parquetRdd = sc.hadoopFile(filteredConcatenatedPaths, classOf[ParquetInputFormat[Something]], classOf[Void], classOf[SomeAvroType], getConfiguration(...)) You have to do some initializations on ParquetInputFormat such as AvroReadSetup/AvroWriteSupport etc but that you should be doing already I am guessing. Cheers, Nat On Sun, Sep 14, 2014 at 7:37 PM, Eric Friedman eric.d.fried...@gmail.com wrote: Hi, I have a directory structure with parquet+avro data in it. There are a couple of administrative files (.foo and/or _foo) that I need to ignore when processing this data or Spark tries to read them as containing parquet content, which they do not. How can I set a PathFilter on the FileInputFormat used to construct an RDD? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Efficient way to sum multiple columns
Please check the colStats method defined under mllib.stat.Statistics. -Xiangrui On Mon, Sep 15, 2014 at 1:00 PM, jamborta jambo...@gmail.com wrote: Hi all, I have an RDD that contains around 50 columns. I need to sum each column, which I am doing by running it through a for loop, creating an array and running the sum function as follows: for (i - 0 until 10) yield { data.map(x = x(i)).sum } is their a better way to do this? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-way-to-sum-multiple-columns-tp14281.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
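A minimal sketch of the colStats route, assuming each row of data is an Array[Double] (as the x(i) indexing in the original loop suggests); the per-column sum can be recovered as mean * count.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics

// One pass over the data instead of one pass per column.
val vectors = data.map(row => Vectors.dense(row))
val summary = Statistics.colStats(vectors)
val columnSums = summary.mean.toArray.map(_ * summary.count)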
Re: Spark Streaming union expected behaviour?
I am seeing the same exact behavior. Shrikar, did you get any response to your post? Varad -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-union-expected-behaviour-tp7206p14284.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
minPartitions for non-text files?
sc.textFile takes a minimum # of partitions to use. Is there a way to get sc.newAPIHadoopFile to do the same? I know I can repartition() and get a shuffle. I'm wondering if there's a way to tell the underlying InputFormat (AvroParquet, in my case) how many partitions to use at the outset. What I'd really prefer is to get the partitions automatically defined based on the number of blocks.
Re: minPartitions for non-text files?
I think the reason is simply that there is no longer an explicit min-partitions argument for Hadoop InputSplits in the new Hadoop APIs. At least, I didn't see it when I glanced just now. However, you should be able to get the same effect by setting a Configuration property, and you can do so through the newAPIHadoopFile method. You set it as a suggested maximum split size rather than suggest minimum number of splits. Although I think the old config property mapred.max.split.size is still respected, you may try mapreduce.input.fileinputformat.split.maxsize instead, which appears to be the intended replacement in the new APIs. On Mon, Sep 15, 2014 at 9:35 PM, Eric Friedman eric.d.fried...@gmail.com wrote: sc.textFile takes a minimum # of partitions to use. is there a way to get sc.newAPIHadoopFile to do the same? I know I can repartition() and get a shuffle. I'm wondering if there's a way to tell the underlying InputFormat (AvroParquet, in my case) how many partitions to use at the outset. What I'd really prefer is to get the partitions automatically defined based on the number of blocks. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
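A minimal Scala sketch of passing that property through newAPIHadoopFile; the original thread uses an AvroParquet input format from PySpark, so TextInputFormat and the path here are only illustrative stand-ins.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val hadoopConf = new Configuration()
// Suggest ~32 MB splits; a smaller max split size generally means more partitions.
hadoopConf.set("mapreduce.input.fileinputformat.split.maxsize", (32 * 1024 * 1024).toString)

val rdd = sc.newAPIHadoopFile("hdfs:///data/input",
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text], hadoopConf)
println(rdd.partitions.length)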
Weird aggregation results when reusing objects inside reduceByKey
I have a pretty simple scala spark aggregation job that is summing up number of occurrences of two types of events. I have run into situations where it seems to generate bad values that are clearly incorrect after reviewing the raw data. First I have a Record object which I use to do my aggregation: class Record (val PrimaryId: Int, val SubId: Int, var Event1Count: Int, var Event2Count: Int) extends Serializable { } Then once I have an RDD I do a reduce by key: val allAgr = all.map(x = (s${x.PrimaryId}-${x.SubId}, x)).reduceByKey { (l, r) = l.Event1Count= l.Event1Count+ r.Event1Count l.Event2Count= l.Event2Count+ r.Event2Count l }.map(x = x._2) The problem is that for some scenarios I get about 16 billion back for Event1Count, but the value of Event2Count looks fine. If I refactor my reduce by key function to actually produce a new object, it seems to work: val allAgr = all.map(x = (s${x.PrimaryId}-${x.SubId}, x)).reduceByKey { (l, r) = val n = new Record(l.PrimaryId, l.SubId, 0, 0 ) n.Event1Count= l.Event1Count+ r.Event1Count n.Event2Count= l.Event2Count+ r.Event2Count n }.map(x = x._2) This second option is clearly the safer way to go since there is no chance for changing values via reference. However, it doesn't make sense to me that this should fix it as in map reduce a once a object is reduced, it should never be reduced again (otherwise double-counting would happen). I dug into the source a little: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Aggregator.scala I didn't really see any obvious redflags and admittedly it is beyond my comprehension. Any ideas? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Weird-aggregation-results-when-reusing-objects-inside-reduceByKey-tp14287.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: vertex active/inactive feature in Pregel API ?
At 2014-09-15 16:25:04 +0200, Yifan LI iamyifa...@gmail.com wrote: I am wondering if the vertex active/inactive(corresponding the change of its value between two supersteps) feature is introduced in Pregel API of GraphX? Vertex activeness in Pregel is controlled by messages: if a vertex did not receive a message in the previous iteration, its vertex program will not run in the current iteration. Also, inactive vertices will not be able to send messages because by default the sendMsg function will only be run on edges where at least one of the adjacent vertices received a message. You can change this behavior -- see the documentation for the activeDirection parameter to Pregel.apply [1]. There is also an open pull request to make active vertex tracking more explicit by allowing vertices to vote to halt directly [2]. Ankur [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Pregel$ [2] https://github.com/apache/spark/pull/1217 - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
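For concreteness, here is a minimal single-source shortest-paths sketch that sets activeDirection, assuming graph has Double edge attributes and using an illustrative source vertex id of 1L; with EdgeDirection.Out, sendMsg only runs on out-edges of vertices that received a message in the previous superstep.

import org.apache.spark.graphx._

val sourceId: VertexId = 1L
val init = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)

val sssp = Pregel(init, Double.PositiveInfinity, activeDirection = EdgeDirection.Out)(
  (id, dist, newDist) => math.min(dist, newDist),                    // vertex program
  triplet =>                                                         // send messages along out-edges
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    else
      Iterator.empty,
  (a, b) => math.min(a, b))                                          // merge messages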
Re: Weird aggregation results when reusing objects inside reduceByKey
It isn't a question of an item being reduced twice, but of when objects may be reused to represent other items. I don't think you have a guarantee that you can safely reuse the objects in this argument, but I'd also be interested if there was a case where this is guaranteed. For example I'm guessing this does work if you foldByKey() and supply your own starting value? On Mon, Sep 15, 2014 at 9:58 PM, kriskalish k...@kalish.net wrote: I have a pretty simple scala spark aggregation job that is summing up number of occurrences of two types of events. I have run into situations where it seems to generate bad values that are clearly incorrect after reviewing the raw data. First I have a Record object which I use to do my aggregation: class Record (val PrimaryId: Int, val SubId: Int, var Event1Count: Int, var Event2Count: Int) extends Serializable { } Then once I have an RDD I do a reduce by key: val allAgr = all.map(x = (s${x.PrimaryId}-${x.SubId}, x)).reduceByKey { (l, r) = l.Event1Count= l.Event1Count+ r.Event1Count l.Event2Count= l.Event2Count+ r.Event2Count l }.map(x = x._2) The problem is that for some scenarios I get about 16 billion back for Event1Count, but the value of Event2Count looks fine. If I refactor my reduce by key function to actually produce a new object, it seems to work: val allAgr = all.map(x = (s${x.PrimaryId}-${x.SubId}, x)).reduceByKey { (l, r) = val n = new Record(l.PrimaryId, l.SubId, 0, 0 ) n.Event1Count= l.Event1Count+ r.Event1Count n.Event2Count= l.Event2Count+ r.Event2Count n }.map(x = x._2) This second option is clearly the safer way to go since there is no chance for changing values via reference. However, it doesn't make sense to me that this should fix it as in map reduce a once a object is reduced, it should never be reduced again (otherwise double-counting would happen). I dug into the source a little: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Aggregator.scala I didn't really see any obvious redflags and admittedly it is beyond my comprehension. Any ideas? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Weird-aggregation-results-when-reusing-objects-inside-reduceByKey-tp14287.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
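A minimal sketch of a variant that sidesteps the object-reuse question entirely: key the records as in the original post, but accumulate into immutable Long tuples, with foldByKey supplying the zero value as Sean mentions (this is a sketch of that idea, not a statement about reduceByKey's reuse guarantees; Long also gives headroom that Int counters lack).

import org.apache.spark.SparkContext._   // pair RDD functions in Spark 1.1

// Field names (PrimaryId, SubId, Event1Count, Event2Count) come from the original post.
val totals = all
  .map(x => (s"${x.PrimaryId}-${x.SubId}", (x.Event1Count.toLong, x.Event2Count.toLong)))
  .foldByKey((0L, 0L)) { (a, b) => (a._1 + b._1, a._2 + b._2) }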
Re: PathFilter for newAPIHadoopFile?
There is one way by do it in bash: hadoop fs -ls , maybe you could end up with a bash scripts to do the things. On Mon, Sep 15, 2014 at 1:01 PM, Eric Friedman eric.d.fried...@gmail.com wrote: That's a good idea and one I had considered too. Unfortunately I'm not aware of an API in PySpark for enumerating paths on HDFS. Have I overlooked one? On Mon, Sep 15, 2014 at 10:01 AM, Davies Liu dav...@databricks.com wrote: In PySpark, I think you could enumerate all the valid files, and create RDD by newAPIHadoopFile(), then union them together. On Mon, Sep 15, 2014 at 5:49 AM, Eric Friedman eric.d.fried...@gmail.com wrote: I neglected to specify that I'm using pyspark. Doesn't look like these APIs have been bridged. Eric Friedman On Sep 14, 2014, at 11:02 PM, Nat Padmanabhan reachn...@gmail.com wrote: Hi Eric, Something along the lines of the following should work val fs = getFileSystem(...) // standard hadoop API call val filteredConcatenatedPaths = fs.listStatus(topLevelDirPath, pathFilter).map(_.getPath.toString).mkString(,) // pathFilter is an instance of org.apache.hadoop.fs.PathFilter val parquetRdd = sc.hadoopFile(filteredConcatenatedPaths, classOf[ParquetInputFormat[Something]], classOf[Void], classOf[SomeAvroType], getConfiguration(...)) You have to do some initializations on ParquetInputFormat such as AvroReadSetup/AvroWriteSupport etc but that you should be doing already I am guessing. Cheers, Nat On Sun, Sep 14, 2014 at 7:37 PM, Eric Friedman eric.d.fried...@gmail.com wrote: Hi, I have a directory structure with parquet+avro data in it. There are a couple of administrative files (.foo and/or _foo) that I need to ignore when processing this data or Spark tries to read them as containing parquet content, which they do not. How can I set a PathFilter on the FileInputFormat used to construct an RDD? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: PathFilter for newAPIHadoopFile?
Or maybe you could give this one a try: https://labs.spotify.com/2013/05/07/snakebite/ On Mon, Sep 15, 2014 at 2:51 PM, Davies Liu dav...@databricks.com wrote: There is one way by do it in bash: hadoop fs -ls , maybe you could end up with a bash scripts to do the things. On Mon, Sep 15, 2014 at 1:01 PM, Eric Friedman eric.d.fried...@gmail.com wrote: That's a good idea and one I had considered too. Unfortunately I'm not aware of an API in PySpark for enumerating paths on HDFS. Have I overlooked one? On Mon, Sep 15, 2014 at 10:01 AM, Davies Liu dav...@databricks.com wrote: In PySpark, I think you could enumerate all the valid files, and create RDD by newAPIHadoopFile(), then union them together. On Mon, Sep 15, 2014 at 5:49 AM, Eric Friedman eric.d.fried...@gmail.com wrote: I neglected to specify that I'm using pyspark. Doesn't look like these APIs have been bridged. Eric Friedman On Sep 14, 2014, at 11:02 PM, Nat Padmanabhan reachn...@gmail.com wrote: Hi Eric, Something along the lines of the following should work val fs = getFileSystem(...) // standard hadoop API call val filteredConcatenatedPaths = fs.listStatus(topLevelDirPath, pathFilter).map(_.getPath.toString).mkString(,) // pathFilter is an instance of org.apache.hadoop.fs.PathFilter val parquetRdd = sc.hadoopFile(filteredConcatenatedPaths, classOf[ParquetInputFormat[Something]], classOf[Void], classOf[SomeAvroType], getConfiguration(...)) You have to do some initializations on ParquetInputFormat such as AvroReadSetup/AvroWriteSupport etc but that you should be doing already I am guessing. Cheers, Nat On Sun, Sep 14, 2014 at 7:37 PM, Eric Friedman eric.d.fried...@gmail.com wrote: Hi, I have a directory structure with parquet+avro data in it. There are a couple of administrative files (.foo and/or _foo) that I need to ignore when processing this data or Spark tries to read them as containing parquet content, which they do not. How can I set a PathFilter on the FileInputFormat used to construct an RDD? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Define the name of the outputs with Java-Spark.
Spark doesn't support MultipleOutput at this time. You can cache the parent RDD. Then create RDDs from it and save them separately. -Xiangrui On Fri, Sep 12, 2014 at 7:45 AM, Guillermo Ortiz konstt2...@gmail.com wrote: I would like to define the names of my output in Spark, I have a process which write many fails and I would like to name them, is it possible? I guess that it's not possible with saveAsText method. It would be something similar to the MultipleOutput of Hadoop. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
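A minimal sketch of that workaround; the split predicate and the paths are illustrative.

// Cache the parent so each filtered save reuses the same computed data.
val parent = sc.textFile("hdfs:///data/input").cache()

val errors = parent.filter(_.contains("ERROR"))
val others = parent.filter(line => !line.contains("ERROR"))

errors.saveAsTextFile("hdfs:///data/out/errors")
others.saveAsTextFile("hdfs:///data/out/others")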
Re: Accuracy hit in classification with Spark
Thanks for the update! -Xiangrui On Sun, Sep 14, 2014 at 11:33 PM, jatinpreet jatinpr...@gmail.com wrote: Hi, I have been able to get the same accuracy with MLlib as Mahout's. The pre-processing phase of Mahout was the reason behind the accuracy mismatch. After studying and applying the same logic in my code, it worked like a charm. Thanks, Jatin - Novice Big Data Programmer -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Accuracy-hit-in-classification-with-Spark-tp13773p14221.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: minPartitions for non-text files?
That would be awesome, but doesn't seem to have any effect. In PySpark, I created a dict with that key and a numeric value, then passed it into newAPIHadoopFile as a value for the conf keyword. The returned RDD still has a single partition. On Mon, Sep 15, 2014 at 1:56 PM, Sean Owen so...@cloudera.com wrote: I think the reason is simply that there is no longer an explicit min-partitions argument for Hadoop InputSplits in the new Hadoop APIs. At least, I didn't see it when I glanced just now. However, you should be able to get the same effect by setting a Configuration property, and you can do so through the newAPIHadoopFile method. You set it as a suggested maximum split size rather than suggest minimum number of splits. Although I think the old config property mapred.max.split.size is still respected, you may try mapreduce.input.fileinputformat.split.maxsize instead, which appears to be the intended replacement in the new APIs. On Mon, Sep 15, 2014 at 9:35 PM, Eric Friedman eric.d.fried...@gmail.com wrote: sc.textFile takes a minimum # of partitions to use. is there a way to get sc.newAPIHadoopFile to do the same? I know I can repartition() and get a shuffle. I'm wondering if there's a way to tell the underlying InputFormat (AvroParquet, in my case) how many partitions to use at the outset. What I'd really prefer is to get the partitions automatically defined based on the number of blocks.
Re: MLLib sparse vector
Or you can use the factory method `Vectors.sparse`: val sv = Vectors.sparse(numProducts, productIds.map(x = (x, 1.0))) where numProducts should be the largest product id plus one. Best, Xiangrui On Mon, Sep 15, 2014 at 12:46 PM, Chris Gore cdg...@cdgore.com wrote: Hi Sameer, MLLib uses Breeze’s vector format under the hood. You can use that. http://www.scalanlp.org/api/breeze/index.html#breeze.linalg.SparseVector For example: import breeze.linalg.{DenseVector = BDV, SparseVector = BSV, Vector = BV} val numClasses = classes.distinct.count.toInt val userWithClassesAsSparseVector = rows.map(x = (x.userID, new BSV[Double](x.classIDs.sortWith(_ _), Seq.fill(x.classIDs.length)(1.0).toArray, numClasses).asInstanceOf[BV[Double]])) Chris On Sep 15, 2014, at 11:28 AM, Sameer Tilak ssti...@live.com wrote: Hi All, I have transformed the data into following format: First column is user id, and then all the other columns are class ids. For a user only class ids that appear in this row have value 1 and others are 0. I need to crease a sparse vector from this. Does the API for creating a sparse vector that can directly support this format? User idProduct class ids 2622572 145447 1620 13421 28565 285556 293 4553 67261 130 3646 1671 18806 183576 3286 51715 57671 57476 - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: minPartitions for non-text files?
Heh, it's still just a suggestion to Hadoop I guess, not guaranteed. Is it a splittable format? for example, some compressed formats are not splittable and Hadoop has to process whole files at a time. I'm also not sure if this is something to do with pyspark, since the underlying Scala API takes a Configuration object rather than dictionary. On Mon, Sep 15, 2014 at 11:23 PM, Eric Friedman eric.d.fried...@gmail.com wrote: That would be awesome, but doesn't seem to have any effect. In PySpark, I created a dict with that key and a numeric value, then passed it into newAPIHadoopFile as a value for the conf keyword. The returned RDD still has a single partition. On Mon, Sep 15, 2014 at 1:56 PM, Sean Owen so...@cloudera.com wrote: I think the reason is simply that there is no longer an explicit min-partitions argument for Hadoop InputSplits in the new Hadoop APIs. At least, I didn't see it when I glanced just now. However, you should be able to get the same effect by setting a Configuration property, and you can do so through the newAPIHadoopFile method. You set it as a suggested maximum split size rather than suggest minimum number of splits. Although I think the old config property mapred.max.split.size is still respected, you may try mapreduce.input.fileinputformat.split.maxsize instead, which appears to be the intended replacement in the new APIs. On Mon, Sep 15, 2014 at 9:35 PM, Eric Friedman eric.d.fried...@gmail.com wrote: sc.textFile takes a minimum # of partitions to use. is there a way to get sc.newAPIHadoopFile to do the same? I know I can repartition() and get a shuffle. I'm wondering if there's a way to tell the underlying InputFormat (AvroParquet, in my case) how many partitions to use at the outset. What I'd really prefer is to get the partitions automatically defined based on the number of blocks. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Does Spark always wait for stragglers to finish running?
Hi, I'm running Spark tasks with speculation enabled. I'm noticing that Spark seems to wait in a given stage for all stragglers to finish, even though the speculated alternative might have finished sooner. Is that correct? Is there a way to indicate to Spark not to wait for stragglers to finish? Thanks, Pramod -- http://twitter.com/pramodbiligiri
Invalid signature file digest for Manifest main attributes with spark job built using maven
Hi All, I am trying to submit a spark job that I have built in maven using the following command: /usr/bin/spark-submit --deploy-mode client --class com.spark.TheMain --master local[1] /home/cloudera/myjar.jar 100 But I seem to be getting the following error: Exception in thread main java.lang.SecurityException: Invalid signature file digest for Manifest main attributes at sun.security.util.SignatureFileVerifier.processImpl(SignatureFileVerifier.java:286) at sun.security.util.SignatureFileVerifier.process(SignatureFileVerifier.java:239) at java.util.jar.JarVerifier.processEntry(JarVerifier.java:307) at java.util.jar.JarVerifier.update(JarVerifier.java:218) at java.util.jar.JarFile.initializeVerifier(JarFile.java:345) at java.util.jar.JarFile.getInputStream(JarFile.java:412) at sun.misc.URLClassPath$JarLoader$2.getInputStream(URLClassPath.java:775) at sun.misc.Resource.cachedInputStream(Resource.java:77) at sun.misc.Resource.getByteBuffer(Resource.java:160) at java.net.URLClassLoader.defineClass(URLClassLoader.java:436) at java.net.URLClassLoader.access$100(URLClassLoader.java:71) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:289) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Here is the pom file I am using to build the jar: project xmlns=http://maven.apache.org/POM/4.0.0; xmlns:xsi=http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation=http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd; modelVersion4.0.0/modelVersion groupIdcom.spark/groupId artifactIdmyjar/artifactId version0.0.1-SNAPSHOT/version name${project.artifactId}/name descriptionMy wonderfull scala app/description inceptionYear2010/inceptionYear licenses license nameMy License/name urlhttp:///url distributionrepo/distribution /license /licenses properties cdh.versioncdh5.1.0/cdh.version maven.compiler.source1.6/maven.compiler.source maven.compiler.target1.6/maven.compiler.target encodingUTF-8/encoding scala.tools.version2.10/scala.tools.version scala.version2.10.4/scala.version /properties repositories repository idscala-tools.org/id nameScala-tools Maven2 Repository/name urlhttps://oss.sonatype.org/content/repositories/snapshots//url /repository repository idmaven-hadoop/id nameHadoop Releases/name urlhttps://repository.cloudera.com/content/repositories/releases//url /repository repository idcloudera-repos/id nameCloudera Repos/name urlhttps://repository.cloudera.com/artifactory/cloudera-repos//url /repository /repositories pluginRepositories pluginRepository idscala-tools.org/id nameScala-tools Maven2 Repository/name urlhttps://oss.sonatype.org/content/repositories/snapshots//url /pluginRepository /pluginRepositories dependencies dependency groupIdorg.scala-lang/groupId artifactIdscala-library/artifactId version${scala.version}/version /dependency dependency groupIdorg.apache.spark/groupId artifactIdspark-core_2.10/artifactId version1.0.0-${cdh.version}/version /dependency dependency groupIdorg.apache.spark/groupId artifactIdspark-tools_2.10/artifactId 
version1.0.0-${cdh.version}/version /dependency dependency groupIdorg.apache.spark/groupId artifactIdspark-streaming-flume_2.10/artifactId version1.0.0-${cdh.version}/version /dependency dependency groupIdorg.apache.spark/groupId artifactIdspark-streaming_2.10/artifactId version1.0.0-${cdh.version}/version /dependency dependency groupIdorg.apache.flume/groupId artifactIdflume-ng-sdk/artifactId version1.5.0-${cdh.version}/version exclusions exclusion groupIdio.netty/groupId artifactIdnetty/artifactId /exclusion /exclusions /dependency dependency groupIdorg.apache.flume/groupId artifactIdflume-ng-core/artifactId version1.5.0-${cdh.version}/version exclusions exclusion groupIdio.netty/groupId artifactIdnetty/artifactId /exclusion /exclusions /dependency dependency
Re: Invalid signature file digest for Manifest main attributes with spark job built using maven
This is more of a Java / Maven issue than Spark per se. I would use the shade plugin to remove signature files in your final META-INF/ dir. As Spark does, in its configuration: filters filter artifact*:*/artifact excludes excludeorg/datanucleus/**/exclude excludeMETA-INF/*.SF/exclude excludeMETA-INF/*.DSA/exclude excludeMETA-INF/*.RSA/exclude /excludes /filter /filters On Mon, Sep 15, 2014 at 11:33 PM, kpeng1 kpe...@gmail.com wrote: Hi All, I am trying to submit a spark job that I have built in maven using the following command: /usr/bin/spark-submit --deploy-mode client --class com.spark.TheMain --master local[1] /home/cloudera/myjar.jar 100 But I seem to be getting the following error: Exception in thread main java.lang.SecurityException: Invalid signature file digest for Manifest main attributes at sun.security.util.SignatureFileVerifier.processImpl(SignatureFileVerifier.java:286) at sun.security.util.SignatureFileVerifier.process(SignatureFileVerifier.java:239) at java.util.jar.JarVerifier.processEntry(JarVerifier.java:307) at java.util.jar.JarVerifier.update(JarVerifier.java:218) at java.util.jar.JarFile.initializeVerifier(JarFile.java:345) at java.util.jar.JarFile.getInputStream(JarFile.java:412) at sun.misc.URLClassPath$JarLoader$2.getInputStream(URLClassPath.java:775) at sun.misc.Resource.cachedInputStream(Resource.java:77) at sun.misc.Resource.getByteBuffer(Resource.java:160) at java.net.URLClassLoader.defineClass(URLClassLoader.java:436) at java.net.URLClassLoader.access$100(URLClassLoader.java:71) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:289) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Here is the pom file I am using to build the jar: project xmlns=http://maven.apache.org/POM/4.0.0; xmlns:xsi=http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation=http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd; modelVersion4.0.0/modelVersion groupIdcom.spark/groupId artifactIdmyjar/artifactId version0.0.1-SNAPSHOT/version name${project.artifactId}/name descriptionMy wonderfull scala app/description inceptionYear2010/inceptionYear licenses license nameMy License/name urlhttp:///url distributionrepo/distribution /license /licenses properties cdh.versioncdh5.1.0/cdh.version maven.compiler.source1.6/maven.compiler.source maven.compiler.target1.6/maven.compiler.target encodingUTF-8/encoding scala.tools.version2.10/scala.tools.version scala.version2.10.4/scala.version /properties repositories repository idscala-tools.org/id nameScala-tools Maven2 Repository/name urlhttps://oss.sonatype.org/content/repositories/snapshots//url /repository repository idmaven-hadoop/id nameHadoop Releases/name urlhttps://repository.cloudera.com/content/repositories/releases//url /repository repository idcloudera-repos/id nameCloudera Repos/name urlhttps://repository.cloudera.com/artifactory/cloudera-repos//url /repository /repositories pluginRepositories pluginRepository idscala-tools.org/id nameScala-tools Maven2 Repository/name 
urlhttps://oss.sonatype.org/content/repositories/snapshots//url /pluginRepository /pluginRepositories dependencies dependency groupIdorg.scala-lang/groupId artifactIdscala-library/artifactId version${scala.version}/version /dependency dependency groupIdorg.apache.spark/groupId artifactIdspark-core_2.10/artifactId version1.0.0-${cdh.version}/version /dependency dependency groupIdorg.apache.spark/groupId artifactIdspark-tools_2.10/artifactId version1.0.0-${cdh.version}/version /dependency dependency groupIdorg.apache.spark/groupId artifactIdspark-streaming-flume_2.10/artifactId version1.0.0-${cdh.version}/version /dependency dependency groupIdorg.apache.spark/groupId artifactIdspark-streaming_2.10/artifactId version1.0.0-${cdh.version}/version /dependency dependency
Re: MLLib sparse vector
Probably worth noting that the factory methods in mllib create an object of type org.apache.spark.mllib.linalg.Vector which stores data in a similar format as Breeze vectors Chris On Sep 15, 2014, at 3:24 PM, Xiangrui Meng men...@gmail.com wrote: Or you can use the factory method `Vectors.sparse`: val sv = Vectors.sparse(numProducts, productIds.map(x = (x, 1.0))) where numProducts should be the largest product id plus one. Best, Xiangrui On Mon, Sep 15, 2014 at 12:46 PM, Chris Gore cdg...@cdgore.com wrote: Hi Sameer, MLLib uses Breeze’s vector format under the hood. You can use that. http://www.scalanlp.org/api/breeze/index.html#breeze.linalg.SparseVector For example: import breeze.linalg.{DenseVector = BDV, SparseVector = BSV, Vector = BV} val numClasses = classes.distinct.count.toInt val userWithClassesAsSparseVector = rows.map(x = (x.userID, new BSV[Double](x.classIDs.sortWith(_ _), Seq.fill(x.classIDs.length)(1.0).toArray, numClasses).asInstanceOf[BV[Double]])) Chris On Sep 15, 2014, at 11:28 AM, Sameer Tilak ssti...@live.com wrote: Hi All, I have transformed the data into following format: First column is user id, and then all the other columns are class ids. For a user only class ids that appear in this row have value 1 and others are 0. I need to crease a sparse vector from this. Does the API for creating a sparse vector that can directly support this format? User idProduct class ids 2622572 145447 1620 13421 28565 285556 293 4553 67261 130 3646 1671 18806 183576 3286 51715 57671 57476 - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Convert GraphX Graph to Sparse Matrix
Hi everyone, I'm looking to implement Markov algorithms in GraphX and I'm wondering if it's already possible to auto-convert the Graph into a Sparse Double Matrix? I've seen this implemented in other graphs before, namely JUNG, but still familiarizing myself with GraphX. Example: https://code.google.com/p/jung/source/browse/branches/guava/jung/jung-algorithms/src/main/java/edu/uci/ics/jung/algorithms/matrix/GraphMatrixOperations.java#181 This is specifically for doing operations such as Mean First Passage Time calculations. If it doesn't yet exist, are the Matrices implemented in MLLib going to be enough for this? Or will I need to go the way of Breeze or Colt? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Convert-GraphX-Graph-to-Sparse-Matrix-tp14303.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
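As far as I know there is no built-in Graph-to-matrix conversion in GraphX, but if the graph is small enough to collect to the driver you can assemble a Breeze sparse matrix from the edges yourself. A rough sketch under that assumption, with edge attributes taken to be Double weights (for an unweighted graph you could run graph.mapEdges(_ => 1.0) first); everything is collected locally, so this is not suitable for large graphs:

import breeze.linalg.CSCMatrix
import org.apache.spark.graphx.Graph

def toSparseMatrix(graph: Graph[_, Double]): CSCMatrix[Double] = {
  // map the (possibly sparse, Long) vertex ids onto consecutive 0-based matrix indices
  val index = graph.vertices.map(_._1).collect().sorted.zipWithIndex.toMap
  val n = index.size
  val builder = new CSCMatrix.Builder[Double](rows = n, cols = n)
  graph.edges.collect().foreach { e =>
    builder.add(index(e.srcId), index(e.dstId), e.attr)
  }
  builder.result()
}

From there Breeze gives you the usual linear-algebra operations locally; for graphs that do not fit on the driver, Breeze or Colt on a per-partition basis, or MLlib's distributed matrices, would be the alternatives to evaluate.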
Re: Does Spark always wait for stragglers to finish running?
There is a parameter spark.speculation that is turned off by default. Look at the configuration doc: http://spark.apache.org/docs/latest/configuration.html From: Pramod Biligiri pramodbilig...@gmail.commailto:pramodbilig...@gmail.com Date: Monday, September 15, 2014 at 3:30 PM To: user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: Does Spark always wait for stragglers to finish running? Hi, I'm running Spark tasks with speculation enabled. I'm noticing that Spark seems to wait in a given stage for all stragglers to finish, even though the speculated alternative might have finished sooner. Is that correct? Is there a way to indicate to Spark not to wait for stragglers to finish? Thanks, Pramod -- http://twitter.com/pramodbiligiri
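For reference, speculation is controlled by a small set of properties; a sketch of turning it on via SparkConf, where the values shown are simply the documented defaults rather than a tuning recommendation:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("speculation-example")
  .set("spark.speculation", "true")
  .set("spark.speculation.interval", "100")    // ms between checks for speculatable tasks
  .set("spark.speculation.quantile", "0.75")   // fraction of tasks that must finish before speculating
  .set("spark.speculation.multiplier", "1.5")  // how much slower than the median counts as a straggler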
Re: minPartitions for non-text files?
Yes, it's AvroParquetInputFormat, which is splittable. If I force a repartitioning, it works. If I don't, spark chokes on my not-terribly-large 250Mb files. PySpark's documentation says that the dictionary is turned into a Configuration object. @param conf: Hadoop configuration, passed in as a dict (None by default) On Mon, Sep 15, 2014 at 3:26 PM, Sean Owen so...@cloudera.com wrote: Heh, it's still just a suggestion to Hadoop I guess, not guaranteed. Is it a splittable format? for example, some compressed formats are not splittable and Hadoop has to process whole files at a time. I'm also not sure if this is something to do with pyspark, since the underlying Scala API takes a Configuration object rather than dictionary. On Mon, Sep 15, 2014 at 11:23 PM, Eric Friedman eric.d.fried...@gmail.com wrote: That would be awesome, but doesn't seem to have any effect. In PySpark, I created a dict with that key and a numeric value, then passed it into newAPIHadoopFile as a value for the conf keyword. The returned RDD still has a single partition. On Mon, Sep 15, 2014 at 1:56 PM, Sean Owen so...@cloudera.com wrote: I think the reason is simply that there is no longer an explicit min-partitions argument for Hadoop InputSplits in the new Hadoop APIs. At least, I didn't see it when I glanced just now. However, you should be able to get the same effect by setting a Configuration property, and you can do so through the newAPIHadoopFile method. You set it as a suggested maximum split size rather than suggest minimum number of splits. Although I think the old config property mapred.max.split.size is still respected, you may try mapreduce.input.fileinputformat.split.maxsize instead, which appears to be the intended replacement in the new APIs. On Mon, Sep 15, 2014 at 9:35 PM, Eric Friedman eric.d.fried...@gmail.com wrote: sc.textFile takes a minimum # of partitions to use. is there a way to get sc.newAPIHadoopFile to do the same? I know I can repartition() and get a shuffle. I'm wondering if there's a way to tell the underlying InputFormat (AvroParquet, in my case) how many partitions to use at the outset. What I'd really prefer is to get the partitions automatically defined based on the number of blocks.
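In Scala, a sketch of Sean's suggestion: pass the maximum split size through the Configuration handed to newAPIHadoopFile. A plain text input format is used here only to keep the example self-contained; for the case above you would substitute AvroParquetInputFormat and its key/value classes:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val conf = new Configuration()
// ask for splits of at most 32 MB; still only a hint to the InputFormat
// (the old name, mapred.max.split.size, may also still be honored)
conf.set("mapreduce.input.fileinputformat.split.maxsize", (32 * 1024 * 1024).toString)

val rdd = sc.newAPIHadoopFile(
  "hdfs:///path/to/data",          // placeholder path
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  conf)
println(rdd.partitions.length)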
apply at Option.scala:120 callback in Spark 1.1, but no user code involved?
In Spark 1.1, I'm seeing tasks with callbacks that don't involve my code at all! I'd seen something like this before in 1.0.0, but the behavior seems to be back apply at Option.scala:120 http://localhost:4040/stages/stage?id=52attempt=0 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) scala.Option.getOrElse(Option.scala:120) org.apache.spark.rdd.RDD.partitions(RDD.scala:202) org.apache.spark.rdd.FilteredRDD.getPartitions(FilteredRDD.scala:29) org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) scala.Option.getOrElse(Option.scala:120) org.apache.spark.rdd.RDD.partitions(RDD.scala:202) org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) scala.Option.getOrElse(Option.scala:120) org.apache.spark.rdd.RDD.partitions(RDD.scala:202) org.apache.spark.rdd.FilteredRDD.getPartitions(FilteredRDD.scala:29) org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) scala.Option.getOrElse(Option.scala:120) org.apache.spark.rdd.RDD.partitions(RDD.scala:202) org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) Ideas on what might be going on?
Re: Does Spark always wait for stragglers to finish running?
I'm already running with speculation set to true and the speculated tasks are launching, but the issue I'm observing is that Spark does not kill the long running task even if the shorter alternative has finished successfully. Therefore the overall turnaround time is still the same as without speculation. Pramod On Mon, Sep 15, 2014 at 4:22 PM, Du Li l...@yahoo-inc.com wrote: There is a parameter spark.speculation that is turned off by default. Look at the configuration doc: http://spark.apache.org/docs/latest/configuration.html From: Pramod Biligiri pramodbilig...@gmail.com Date: Monday, September 15, 2014 at 3:30 PM To: user@spark.apache.org user@spark.apache.org Subject: Does Spark always wait for stragglers to finish running? Hi, I'm running Spark tasks with speculation enabled. I'm noticing that Spark seems to wait in a given stage for all stragglers to finish, even though the speculated alternative might have finished sooner. Is that correct? Is there a way to indicate to Spark not to wait for stragglers to finish? Thanks, Pramod -- http://twitter.com/pramodbiligiri
Re: scala 2.11?
I think the current plan is to put it in 1.2.0, so that's what I meant by soon. It might be possible to backport it too, but I'd be hesitant to do that as a maintenance release on 1.1.x and 1.0.x since it would require nontrivial changes to the build that could break things on Scala 2.10. Matei On September 15, 2014 at 12:19:04 PM, Mark Hamstra (m...@clearstorydata.com) wrote: Are we going to put 2.11 support into 1.1 or 1.0? Else will be in soon applies to the master development branch, but actually in the Spark 1.2.0 release won't occur until the second half of November at the earliest. On Mon, Sep 15, 2014 at 12:11 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Scala 2.11 work is under way in open pull requests though, so hopefully it will be in soon. Matei On September 15, 2014 at 9:48:42 AM, Mohit Jaggi (mohitja...@gmail.com) wrote: ah...thanks! On Mon, Sep 15, 2014 at 9:47 AM, Mark Hamstra m...@clearstorydata.com wrote: No, not yet. Spark SQL is using org.scalamacros:quasiquotes_2.10. On Mon, Sep 15, 2014 at 9:28 AM, Mohit Jaggi mohitja...@gmail.com wrote: Folks, I understand Spark SQL uses quasiquotes. Does that mean Spark has now moved to Scala 2.11? Mohit.
SPARK_MASTER_IP
Hi Koert, I work on Bigtop and CDH packaging and you are right, based on my quick glance, it doesn't seem to be used. Mark From: Koert Kuipers ko...@tresata.com Date: Sat, Sep 13, 2014 at 7:03 AM Subject: SPARK_MASTER_IP To: user@spark.apache.org a grep for SPARK_MASTER_IP shows that sbin/start-master.sh and sbin/start-slaves.sh are the only ones that use it. yet for example in CDH5 the spark-master is started from /etc/init.d/spark-master by running bin/spark-class. does that means SPARK_MASTER_IP is simply ignored? it looks like that to me. it is supposed to?
Re: Does Spark always wait for stragglers to finish running?
It's true that it does not send a kill command right now -- we should probably add that. This code was written before tasks were killable AFAIK. However, the *job* should still finish while a speculative task is running as far as I know, and it will just leave that task behind. Matei On September 15, 2014 at 4:51:59 PM, Pramod Biligiri (pramodbilig...@gmail.com) wrote: I'm already running with speculation set to true and the speculated tasks are launching, but the issue I'm observing is that Spark does not kill the long running task even if the shorter alternative has finished successfully. Therefore the overall turnaround time is still the same as without speculation. Pramod On Mon, Sep 15, 2014 at 4:22 PM, Du Li l...@yahoo-inc.com wrote: There is a parameter spark.speculation that is turned off by default. Look at the configuration doc: http://spark.apache.org/docs/latest/configuration.html From: Pramod Biligiri pramodbilig...@gmail.com Date: Monday, September 15, 2014 at 3:30 PM To: user@spark.apache.org user@spark.apache.org Subject: Does Spark always wait for stragglers to finish running? Hi, I'm running Spark tasks with speculation enabled. I'm noticing that Spark seems to wait in a given stage for all stragglers to finish, even though the speculated alternative might have finished sooner. Is that correct? Is there a way to indicate to Spark not to wait for stragglers to finish? Thanks, Pramod -- http://twitter.com/pramodbiligiri
Spark 1.1 / cdh4 stuck using old hadoop client?
Dear List, I'm having trouble getting Spark 1.1 to use the Hadoop 2 API for reading SequenceFiles. In particular, I'm seeing: Exception in thread main org.apache.hadoop.ipc.RemoteException: Server IPC version 7 cannot communicate with client version 4 at org.apache.hadoop.ipc.Client.call(Client.java:1070) at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225) at com.sun.proxy.$Proxy7.getProtocolVersion(Unknown Source) ... When invoking JavaSparkContext#newAPIHadoopFile(). (With args validSequenceFileURI, SequenceFileInputFormat.class, Text.class, BytesWritable.class, new Job().getConfiguration() -- Pretty close to the unit test here: https://github.com/apache/spark/blob/f0f1ba09b195f23f0c89af6fa040c9e01dfa8951/core/src/test/java/org/apache/spark/JavaAPISuite.java#L916 ) This error indicates to me that Spark is using an old hadoop client to do reads. Oddly I'm able to do /writes/ ok, i.e. I'm able to write via JavaPairRdd#saveAsNewAPIHadoopFile() to my hdfs cluster. Do I need to explicitly build spark for modern hadoop?? I previously had an hdfs cluster running hadoop 2.3.0 and I was getting a similar error (server is using version 9, client is using version 4). I'm using Spark 1.1 cdh4 as well as hadoop cdh4 from the links posted on spark's site: * http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-cdh4.tgz * http://d3kbcqa49mib13.cloudfront.net/hadoop-2.0.0-cdh4.2.0.tar.gz What distro of hadoop is used at Data Bricks? Are there distros of Spark 1.1 and hadoop that should work together out-of-the-box? (Previously I had Spark 1.0.0 and Hadoop 2.3 working fine..) Thanks for any help anybody can give me here! -Paul - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
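For what it's worth, a sketch of roughly what that Java call corresponds to in Scala, with the path and configuration as placeholders:

import org.apache.hadoop.io.{BytesWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat

val rdd = sc.newAPIHadoopFile(
  "hdfs://namenode:8020/path/to/data.seq",   // placeholder URI
  classOf[SequenceFileInputFormat[Text, BytesWritable]],
  classOf[Text],
  classOf[BytesWritable],
  new Job().getConfiguration)

The "Server IPC version 7 cannot communicate with client version 4" message generally indicates that the Hadoop client classes on the classpath are from the Hadoop 1.x line while the cluster is Hadoop 2, which is why rebuilding Spark against the matching Hadoop version, as suggested in the follow-up, tends to resolve it.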
Re: Spark 1.1 / cdh4 stuck using old hadoop client?
Hi Paul. I would recommend building your own 1.1.0 distribution. ./make-distribution.sh --name hadoop-personal-build-2.4 --tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests I downloaded the Pre-build for Hadoop 2.4 binary, and it had this strange behavior where spark-submit --master yarn-cluster ... will work, but spark-submit --master yarn-client ... will fail. But on the personal build obtained from the command above, both will then work. -Christian On Sep 15, 2014, at 6:28 PM, Paul Wais pw...@yelp.com wrote: Dear List, I'm having trouble getting Spark 1.1 to use the Hadoop 2 API for reading SequenceFiles. In particular, I'm seeing: Exception in thread main org.apache.hadoop.ipc.RemoteException: Server IPC version 7 cannot communicate with client version 4 at org.apache.hadoop.ipc.Client.call(Client.java:1070) at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225) at com.sun.proxy.$Proxy7.getProtocolVersion(Unknown Source) ... When invoking JavaSparkContext#newAPIHadoopFile(). (With args validSequenceFileURI, SequenceFileInputFormat.class, Text.class, BytesWritable.class, new Job().getConfiguration() -- Pretty close to the unit test here: https://github.com/apache/spark/blob/f0f1ba09b195f23f0c89af6fa040c9e01dfa8951/core/src/test/java/org/apache/spark/JavaAPISuite.java#L916 ) This error indicates to me that Spark is using an old hadoop client to do reads. Oddly I'm able to do /writes/ ok, i.e. I'm able to write via JavaPairRdd#saveAsNewAPIHadoopFile() to my hdfs cluster. Do I need to explicitly build spark for modern hadoop?? I previously had an hdfs cluster running hadoop 2.3.0 and I was getting a similar error (server is using version 9, client is using version 4). I'm using Spark 1.1 cdh4 as well as hadoop cdh4 from the links posted on spark's site: * http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-cdh4.tgz * http://d3kbcqa49mib13.cloudfront.net/hadoop-2.0.0-cdh4.2.0.tar.gz What distro of hadoop is used at Data Bricks? Are there distros of Spark 1.1 and hadoop that should work together out-of-the-box? (Previously I had Spark 1.0.0 and Hadoop 2.3 working fine..) Thanks for any help anybody can give me here! -Paul - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: scala 2.11?
Okay, that's consistent with what I was expecting. Thanks, Matei. On Mon, Sep 15, 2014 at 5:20 PM, Matei Zaharia matei.zaha...@gmail.com wrote: I think the current plan is to put it in 1.2.0, so that's what I meant by soon. It might be possible to backport it too, but I'd be hesitant to do that as a maintenance release on 1.1.x and 1.0.x since it would require nontrivial changes to the build that could break things on Scala 2.10. Matei On September 15, 2014 at 12:19:04 PM, Mark Hamstra ( m...@clearstorydata.com) wrote: Are we going to put 2.11 support into 1.1 or 1.0? Else will be in soon applies to the master development branch, but actually in the Spark 1.2.0 release won't occur until the second half of November at the earliest. On Mon, Sep 15, 2014 at 12:11 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Scala 2.11 work is under way in open pull requests though, so hopefully it will be in soon. Matei On September 15, 2014 at 9:48:42 AM, Mohit Jaggi (mohitja...@gmail.com) wrote: ah...thanks! On Mon, Sep 15, 2014 at 9:47 AM, Mark Hamstra m...@clearstorydata.com wrote: No, not yet. Spark SQL is using org.scalamacros:quasiquotes_2.10. On Mon, Sep 15, 2014 at 9:28 AM, Mohit Jaggi mohitja...@gmail.com wrote: Folks, I understand Spark SQL uses quasiquotes. Does that mean Spark has now moved to Scala 2.11? Mohit.
RE: SparkSQL 1.1 hang when DROP or LOAD
What's your Spark / Hadoop version? And also the hive-site.xml? Most of case like that caused by incompatible Hadoop client jar and the Hadoop cluster. -Original Message- From: linkpatrickliu [mailto:linkpatrick...@live.com] Sent: Monday, September 15, 2014 2:35 PM To: u...@spark.incubator.apache.org Subject: SparkSQL 1.1 hang when DROP or LOAD I started sparkSQL thrift server: sbin/start-thriftserver.sh Then I use beeline to connect to it: bin/beeline !connect jdbc:hive2://localhost:1 op1 op1 I have created a database for user op1. create database dw_op1; And grant all privileges to user op1; grant all on database dw_op1 to user op1; Then I create a table: create tabel src(key int, value string) Now, I want to load data into this table: load data inpath kv1.txt into table src; (kv1.txt is located in the /user/op1 directory in hdfs) However, the client will hang... The log in the thrift server: 14/09/15 14:21:25 INFO Driver: PERFLOG method=acquireReadWriteLocks Then I ctrl-C to stop the beeline client, and restart the beelien client. Now I want to drop the table src in dw_op1; use dw_op1 drop table src Then, the beeline client is hanging again.. The log in the thrift server: 14/09/15 14:23:27 INFO Driver: PERFLOG method=acquireReadWriteLocks Anyone can help on this? Many thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-1-1-hang-when-DROP-or-LOAD-tp14222.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
About SparkSQL or MLlib
Hi: I have a dataset with the structure [id, driverAge, TotalKiloMeter, Emissions, date, KiloMeter, fuel], and the data looks like this: [1-980,34,221926,9,2005-2-8,123,14] [1-981,49,271321,15,2005-2-8,181,82] [1-982,36,189149,18,2005-2-8,162,51] [1-983,51,232753,5,2005-2-8,106,92] [1-984,56,45338,8,2005-2-8,156,98] [1-985,45,132060,4,2005-2-8,179,98] [1-986,40,15751,5,2005-2-8,149,77] [1-987,36,167930,17,2005-2-8,121,87] [1-988,53,44949,4,2005-2-8,195,72] [1-989,34,252867,5,2005-2-8,181,86] [1-990,53,152858,11,2005-2-8,130,43] [1-991,40,126831,11,2005-2-8,126,47] …… Now, my requirement is to group by driverAge in steps of five, so 20~25 is one group, 26~30 is the next, and so on. How should I do this? Can anyone share some code? boyingk...@163.com
Re: SPARK_MASTER_IP
hey mark, you think that this is on purpose, or is it an omission? thanks, koert On Mon, Sep 15, 2014 at 8:32 PM, Mark Grover m...@apache.org wrote: Hi Koert, I work on Bigtop and CDH packaging and you are right, based on my quick glance, it doesn't seem to be used. Mark From: Koert Kuipers ko...@tresata.com Date: Sat, Sep 13, 2014 at 7:03 AM Subject: SPARK_MASTER_IP To: user@spark.apache.org a grep for SPARK_MASTER_IP shows that sbin/start-master.sh and sbin/start-slaves.sh are the only ones that use it. yet for example in CDH5 the spark-master is started from /etc/init.d/spark-master by running bin/spark-class. does that means SPARK_MASTER_IP is simply ignored? it looks like that to me. it is supposed to?
Re: About SparkSQL or MLlib
case class Car(id: String, age: Int, tkm: Int, emissions: Int, date: Date, km: Int, fuel: Int) 1. Create a PairedRDD of (age, Car) tuples (pairedRDD) 2. Create a new function fc // returns the interval's lower and upper bound def fc(x: Int, interval: Int): (Int, Int) = { val floor = x - (x % interval); val ceil = floor + interval; (floor, ceil) } 3. Do a groupBy on the RDD from step 1 by passing the function fc: val myrdd = pairedRDD.groupBy { case (age, car) => fc(age, 5) } On Mon, Sep 15, 2014 at 11:38 PM, boyingk...@163.com boyingk...@163.com wrote: Hi: I have a dataset with the structure [id, driverAge, TotalKiloMeter, Emissions, date, KiloMeter, fuel], and the data looks like this: [1-980,34,221926,9,2005-2-8,123,14] [1-981,49,271321,15,2005-2-8,181,82] [1-982,36,189149,18,2005-2-8,162,51] [1-983,51,232753,5,2005-2-8,106,92] [1-984,56,45338,8,2005-2-8,156,98] [1-985,45,132060,4,2005-2-8,179,98] [1-986,40,15751,5,2005-2-8,149,77] [1-987,36,167930,17,2005-2-8,121,87] [1-988,53,44949,4,2005-2-8,195,72] [1-989,34,252867,5,2005-2-8,181,86] [1-990,53,152858,11,2005-2-8,130,43] [1-991,40,126831,11,2005-2-8,126,47] …… Now, my requirement is to group by driverAge in steps of five, so 20~25 is one group, 26~30 is the next, and so on. How should I do this? Can anyone share some code? -- boyingk...@163.com
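Putting the suggestion above together into a self-contained sketch. The column names and the parsing of the bracketed records are assumptions based on the sample data (and the date is kept as a plain String here to avoid date parsing); adjust to the real schema:

case class Car(id: String, age: Int, tkm: Int, emissions: Int, date: String, km: Int, fuel: Int)

// lower/upper bound of the 5-year bucket an age falls into, e.g. 34 -> (30, 35)
def bucket(age: Int, step: Int = 5): (Int, Int) = {
  val floor = age - (age % step)
  (floor, floor + step)
}

// parse records of the form [1-980,34,221926,9,2005-2-8,123,14]
val cars = sc.textFile("hdfs:///path/to/cars.txt").map { line =>
  val f = line.stripPrefix("[").stripSuffix("]").split(",")
  Car(f(0), f(1).toInt, f(2).toInt, f(3).toInt, f(4), f(5).toInt, f(6).toInt)
}

val byAgeGroup = cars.groupBy(c => bucket(c.age))        // RDD[((Int, Int), Iterable[Car])]
byAgeGroup.mapValues(_.size).collect().foreach(println)  // e.g. ((30,35), 2)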
Re: NullWritable not serializable
Hi Matei, Thanks for your reply. The Writable classes have never been serializable and this is why it is weird. I did try as you suggested to map the Writables to integers and strings. It didn’t pass, either. Similar exceptions were thrown except that the messages became IntWritable, Text are not serializable. The reason is in the implicits defined in the SparkContext object that convert those values into their corresponding Writable classes before saving the data in sequence file. My original code was actual some test cases to try out SequenceFile related APIs. The tests all passed when the spark version was specified as 1.0.2. But this one failed after I changed the spark version to 1.1.0 the new release, nothing else changed. In addition, it failed when I called rdd2.collect(), take(1), and first(). But it worked fine when calling rdd2.count(). As you can see, count() does not need to serialize and ship data while the other three methods do. Do you recall any difference between spark 1.0 and 1.1 that might cause this problem? Thanks, Du From: Matei Zaharia matei.zaha...@gmail.commailto:matei.zaha...@gmail.com Date: Friday, September 12, 2014 at 9:10 PM To: Du Li l...@yahoo-inc.com.invalidmailto:l...@yahoo-inc.com.invalid, user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org, d...@spark.apache.orgmailto:d...@spark.apache.org d...@spark.apache.orgmailto:d...@spark.apache.org Subject: Re: NullWritable not serializable Hi Du, I don't think NullWritable has ever been serializable, so you must be doing something differently from your previous program. In this case though, just use a map() to turn your Writables to serializable types (e.g. null and String). Matie On September 12, 2014 at 8:48:36 PM, Du Li (l...@yahoo-inc.com.invalidmailto:l...@yahoo-inc.com.invalid) wrote: Hi, I was trying the following on spark-shell (built with apache master and hadoop 2.4.0). Both calling rdd2.collect and calling rdd3.collect threw java.io.NotSerializableException: org.apache.hadoop.io.NullWritable. I got the same problem in similar code of my app which uses the newly released Spark 1.1.0 under hadoop 2.4.0. Previously it worked fine with spark 1.0.2 under either hadoop 2.40 and 0.23.10. Anybody knows what caused the problem? Thanks, Du import org.apache.hadoop.io.{NullWritable, Text} val rdd = sc.textFile(README.md) val res = rdd.map(x = (NullWritable.get(), new Text(x))) res.saveAsSequenceFile(./test_data) val rdd2 = sc.sequenceFile(./test_data, classOf[NullWritable], classOf[Text]) rdd2.collect val rdd3 = sc.sequenceFile[NullWritable,Text](./test_data) rdd3.collect
RE: SparkSQL 1.1 hang when DROP or LOAD
Hi, Hao Cheng, Here is the Spark\Hadoop version: Spark version = 1.1.0 Hadoop version = 2.0.0-cdh4.6.0 And hive-site.xml: configuration property namefs.default.name/name valuehdfs://ns/value /property property namedfs.nameservices/name valuens/value /property property namedfs.ha.namenodes.ns/name valuemachine01,machine02/value /property property namedfs.namenode.rpc-address.ns.machine01/name valuemachine01:54310/value /property property namedfs.namenode.rpc-address.ns.machine02/name valuemachine02:54310/value /property property namedfs.client.failover.proxy.provider.ns/name valueorg.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider/value /property property namejavax.jdo.option.ConnectionURL/name valuejdbc:mysql://localhost:3306/metastore/value descriptionJDBC connect string for a JDBC metastore/description /property property namejavax.jdo.option.ConnectionDriverName/name valuecom.mysql.jdbc.Driver/value descriptionDriver class name for a JDBC metastore/description /property property namejavax.jdo.option.ConnectionUserName/name valuehive_user/value /property property namejavax.jdo.option.ConnectionPassword/name valuehive_123/value /property property namedatanucleus.autoCreateSchema/name valuefalse/value /property property namedatanucleus.autoCreateTables/name valuetrue/value /property property namedatanucleus.fixedDatastore/name valuefalse/value /property property namehive.support.concurrency/name descriptionEnable Hive's Table Lock Manager Service/description valuetrue/value /property property namehive.zookeeper.quorum/name valuemachine01,machine02,machine03/value descriptionZookeeper quorum used by Hive's Table Lock Manager/description /property property namehive.metastore.warehouse.dir/name value/user/hive/warehouse/value descriptionHive warehouse directory/description /property property namemapred.job.tracker/name valuemachine01:8032/value /property property nameio.compression.codecs/name valueorg.apache.hadoop.io.compress.SnappyCodec/value /property property namemapreduce.output.fileoutputformat.compress.codec/name valueorg.apache.hadoop.io.compress.SnappyCodec/value /property property namemapreduce.output.fileoutputformat.compress.type/name valueBLOCK/value /property property namehive.exec.show.job.failure.debug.info/name valuetrue/value description If a job fails, whether to provide a link in the CLI to the task with the most failures, along with debugging hints if applicable. /description /property property namehive.hwi.listen.host/name value0.0.0.0/value descriptionThis is the host address the Hive Web Interface will listen on/description /property property namehive.hwi.listen.port/name value/value descriptionThis is the port the Hive Web Interface will listen on/description /property property namehive.hwi.war.file/name value/lib/hive-hwi-0.10.0-cdh4.2.0.war/value descriptionThis is the WAR file with the jsp content for Hive Web Interface/description /property property namehive.aux.jars.path/name valuefile:///usr/lib/hive/lib/hive-hbase-handler-0.10.0-cdh4.6.0.jar,file:///usr/lib/hbase/hbase-0.94.15-cdh4.6.0-security.jar,file:///usr/lib/zookeeper/zookeeper.jar/value /property property namehbase.zookeeper.quorum/name valuemachine01,machine02,machine03/value /property property namehive.cli.print.header/name valuetrue/value /property property namehive.metastore.execute.setugi/name valuetrue/value descriptionIn unsecure mode, setting this property to true will cause the metastore to execute DFS operations using the client's reported user and group permissions. 
Note that this property must be set on both the client and server sides. Further note that its best effort. If client sets its to true and server sets it to false, client setting will be ignored./description /property property namehive.security.authorization.enabled/name valuetrue/value descriptionenable or disable the hive client authorization/description /property property namehive.metastore.authorization.storage.checks/name valuetrue/value /property property namehive.security.authorization.createtable.owner.grants/name valueALL/value descriptionthe privileges automatically granted to the owner whenever a table gets created. An example like select,drop will grant select and drop privilege to the owner of the table/description /property /configuration -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-1-1-hang-when-DROP-or-LOAD-tp14222p14320.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: SparkSQL 1.1 hang when DROP or LOAD
The Hadoop client jar should be assembled into the uber-jar, but (I suspect) it's probably not compatible with your Hadoop Cluster. Can you also paste the Spark uber-jar name? Usually will be under the path lib/spark-assembly-1.1.0-xxx-hadoopxxx.jar. -Original Message- From: linkpatrickliu [mailto:linkpatrick...@live.com] Sent: Tuesday, September 16, 2014 12:14 PM To: u...@spark.incubator.apache.org Subject: RE: SparkSQL 1.1 hang when DROP or LOAD Hi, Hao Cheng, Here is the Spark\Hadoop version: Spark version = 1.1.0 Hadoop version = 2.0.0-cdh4.6.0 And hive-site.xml: configuration property namefs.default.name/name valuehdfs://ns/value /property property namedfs.nameservices/name valuens/value /property property namedfs.ha.namenodes.ns/name valuemachine01,machine02/value /property property namedfs.namenode.rpc-address.ns.machine01/name valuemachine01:54310/value /property property namedfs.namenode.rpc-address.ns.machine02/name valuemachine02:54310/value /property property namedfs.client.failover.proxy.provider.ns/name valueorg.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider/value /property property namejavax.jdo.option.ConnectionURL/name valuejdbc:mysql://localhost:3306/metastore/value descriptionJDBC connect string for a JDBC metastore/description /property property namejavax.jdo.option.ConnectionDriverName/name valuecom.mysql.jdbc.Driver/value descriptionDriver class name for a JDBC metastore/description /property property namejavax.jdo.option.ConnectionUserName/name valuehive_user/value /property property namejavax.jdo.option.ConnectionPassword/name valuehive_123/value /property property namedatanucleus.autoCreateSchema/name valuefalse/value /property property namedatanucleus.autoCreateTables/name valuetrue/value /property property namedatanucleus.fixedDatastore/name valuefalse/value /property property namehive.support.concurrency/name descriptionEnable Hive's Table Lock Manager Service/description valuetrue/value /property property namehive.zookeeper.quorum/name valuemachine01,machine02,machine03/value descriptionZookeeper quorum used by Hive's Table Lock Manager/description /property property namehive.metastore.warehouse.dir/name value/user/hive/warehouse/value descriptionHive warehouse directory/description /property property namemapred.job.tracker/name valuemachine01:8032/value /property property nameio.compression.codecs/name valueorg.apache.hadoop.io.compress.SnappyCodec/value /property property namemapreduce.output.fileoutputformat.compress.codec/name valueorg.apache.hadoop.io.compress.SnappyCodec/value /property property namemapreduce.output.fileoutputformat.compress.type/name valueBLOCK/value /property property namehive.exec.show.job.failure.debug.info/name valuetrue/value description If a job fails, whether to provide a link in the CLI to the task with the most failures, along with debugging hints if applicable. 
/description /property property namehive.hwi.listen.host/name value0.0.0.0/value descriptionThis is the host address the Hive Web Interface will listen on/description /property property namehive.hwi.listen.port/name value/value descriptionThis is the port the Hive Web Interface will listen on/description /property property namehive.hwi.war.file/name value/lib/hive-hwi-0.10.0-cdh4.2.0.war/value descriptionThis is the WAR file with the jsp content for Hive Web Interface/description /property property namehive.aux.jars.path/name valuefile:///usr/lib/hive/lib/hive-hbase-handler-0.10.0-cdh4.6.0.jar,file:///usr/lib/hbase/hbase-0.94.15-cdh4.6.0-security.jar,file:///usr/lib/zookeeper/zookeeper.jar/value /property property namehbase.zookeeper.quorum/name valuemachine01,machine02,machine03/value /property property namehive.cli.print.header/name valuetrue/value /property property namehive.metastore.execute.setugi/name valuetrue/value descriptionIn unsecure mode, setting this property to true will cause the metastore to execute DFS operations using the client's reported user and group permissions. Note that this property must be set on both the client and server sides. Further note that its best effort. If client sets its to true and server sets it to false, client setting will be ignored./description /property property namehive.security.authorization.enabled/name valuetrue/value descriptionenable or disable the hive client authorization/description /property property namehive.metastore.authorization.storage.checks/name valuetrue/value /property property
RE: SparkSQL 1.1 hang when DROP or LOAD
Hi, Hao Cheng, This is my spark assembly jar name: spark-assembly-1.1.0-hadoop2.0.0-cdh4.6.0.jar I compiled spark 1.1.0 with following cmd: export MAVEN_OPTS=-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m mvn -Dhadoop.version=2.0.0-cdh4.6.0 -Phive -Pspark-ganglia-lgpl -DskipTests package -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-1-1-hang-when-DROP-or-LOAD-tp14222p14325.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How to set executor num on spark on yarn
Hi, I want to set the executor number to 16, but it is very strange that the executor cores setting seems to affect the executor number on Spark on YARN. I don't know why, or how to set the executor number reliably.

./bin/spark-submit --class com.hequn.spark.SparkJoins \
  --master yarn-cluster \
  --num-executors 16 \
  --driver-memory 2g \
  --executor-memory 10g \
  --executor-cores 4 \
  /home/sparkjoins-1.0-SNAPSHOT.jar

The UI shows there are 7 executors.

./bin/spark-submit --class com.hequn.spark.SparkJoins \
  --master yarn-cluster \
  --num-executors 16 \
  --driver-memory 2g \
  --executor-memory 10g \
  --executor-cores 2 \
  /home/sparkjoins-1.0-SNAPSHOT.jar

The UI shows there are 9 executors.

./bin/spark-submit --class com.hequn.spark.SparkJoins \
  --master yarn-cluster \
  --num-executors 16 \
  --driver-memory 2g \
  --executor-memory 10g \
  --executor-cores 1 \
  /home/sparkjoins-1.0-SNAPSHOT.jar

The UI shows there are 9 executors.

The cluster contains 16 nodes. Each node has 64G RAM.
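Not an answer to why YARN granted fewer executors, but one way to double-check how many executors actually registered, independent of the UI, is a small sketch like this (the block-manager registry returned by getExecutorMemoryStatus includes the driver itself, hence the minus one):

// from the driver program, once the SparkContext is up
val registered = sc.getExecutorMemoryStatus.size - 1
println(s"executors registered: $registered")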
Complexity/Efficiency of SortByKey
I wonder what algorithm is used to implement sortByKey? I assume it is some O(n*log(n)) parallelized on x number of nodes, right? Then, what size of data would make it worthwhile to use sortByKey on multiple processors rather than use standard Scala sort functions on a single processor (considering the overhead of putting stuff into RDDs and collecting them back)? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Complexity-Efficiency-of-SortByKey-tp14328.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: NullWritable not serializable
Can you post the exact code for the test that worked in 1.0? I can't think of much that could've changed. The one possibility is if we had some operations that were computed locally on the driver (this happens with things like first() and take(), which will try to do the first partition locally). But generally speaking these operations should *not* work over a network, so you'll have to make sure that you only send serializable types through shuffles or collects, or use a serialization framework like Kryo that might be okay with Writables. Matei On September 15, 2014 at 9:13:13 PM, Du Li (l...@yahoo-inc.com) wrote: Hi Matei, Thanks for your reply. The Writable classes have never been serializable and this is why it is weird. I did try as you suggested to map the Writables to integers and strings. It didn’t pass, either. Similar exceptions were thrown except that the messages became IntWritable, Text are not serializable. The reason is in the implicits defined in the SparkContext object that convert those values into their corresponding Writable classes before saving the data in sequence file. My original code was actual some test cases to try out SequenceFile related APIs. The tests all passed when the spark version was specified as 1.0.2. But this one failed after I changed the spark version to 1.1.0 the new release, nothing else changed. In addition, it failed when I called rdd2.collect(), take(1), and first(). But it worked fine when calling rdd2.count(). As you can see, count() does not need to serialize and ship data while the other three methods do. Do you recall any difference between spark 1.0 and 1.1 that might cause this problem? Thanks, Du From: Matei Zaharia matei.zaha...@gmail.com Date: Friday, September 12, 2014 at 9:10 PM To: Du Li l...@yahoo-inc.com.invalid, user@spark.apache.org user@spark.apache.org, d...@spark.apache.org d...@spark.apache.org Subject: Re: NullWritable not serializable Hi Du, I don't think NullWritable has ever been serializable, so you must be doing something differently from your previous program. In this case though, just use a map() to turn your Writables to serializable types (e.g. null and String). Matie On September 12, 2014 at 8:48:36 PM, Du Li (l...@yahoo-inc.com.invalid) wrote: Hi, I was trying the following on spark-shell (built with apache master and hadoop 2.4.0). Both calling rdd2.collect and calling rdd3.collect threw java.io.NotSerializableException: org.apache.hadoop.io.NullWritable. I got the same problem in similar code of my app which uses the newly released Spark 1.1.0 under hadoop 2.4.0. Previously it worked fine with spark 1.0.2 under either hadoop 2.40 and 0.23.10. Anybody knows what caused the problem? Thanks, Du import org.apache.hadoop.io.{NullWritable, Text} val rdd = sc.textFile(README.md) val res = rdd.map(x = (NullWritable.get(), new Text(x))) res.saveAsSequenceFile(./test_data) val rdd2 = sc.sequenceFile(./test_data, classOf[NullWritable], classOf[Text]) rdd2.collect val rdd3 = sc.sequenceFile[NullWritable,Text](./test_data) rdd3.collect
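A sketch of the map-to-serializable-types workaround Matei mentions, applied to the sequence file from the original snippet. The point is that only plain Scala types (here String) are shipped when collect/take/first run:

import org.apache.hadoop.io.{NullWritable, Text}

val rdd2 = sc.sequenceFile("./test_data", classOf[NullWritable], classOf[Text])
  // copy the value out to a String immediately; Text objects are reused by Hadoop
  .map { case (_, text) => text.toString }
rdd2.collect().foreach(println)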
RE: SparkSQL 1.1 hang when DROP or LOAD
Sorry, I am not able to reproduce that. Can you try add the following entry into the hive-site.xml? I know they have the default value, but let's make it explicitly. hive.server2.thrift.port hive.server2.thrift.bind.host hive.server2.authentication (NONE、KERBEROS、LDAP、PAM or CUSTOM) -Original Message- From: linkpatrickliu [mailto:linkpatrick...@live.com] Sent: Tuesday, September 16, 2014 1:10 PM To: u...@spark.incubator.apache.org Subject: RE: SparkSQL 1.1 hang when DROP or LOAD Besides, When I use bin/spark-sql, I can Load data and drop table freely. Only when I use sbin/start-thriftserver.sh and connect with beeline, the client will hang! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-1-1-hang-when-DROP-or-LOAD-tp14222p14326.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Complexity/Efficiency of SortByKey
sortByKey is indeed O(n log n), it's a first pass to figure out even-sized partitions (by sampling the RDD), then a second pass to do a distributed merge-sort (first partition the data on each machine, then run a reduce phase that merges the data for each partition). The point where it becomes useful to scale out versus a single machine is probably pretty high, because communication over a network is *much* slower than memory bandwidth within a machine. Generally it would make the most sense for data that doesn't fit in memory on a single machine, or data that already starts out distributed. Please also note that if you run Spark on just one multicore machine, it still goes through many of the same code paths as on a cluster (e.g. serializing data between tasks) -- it's not optimized to be as fast as, say, a multithreaded sort framework. So it wouldn't make a ton of sense to use it for that. Matei On September 15, 2014 at 10:32:14 PM, cjwang (c...@cjwang.us) wrote: I wonder what algorithm is used to implement sortByKey? I assume it is some O(n*log(n)) parallelized on x number of nodes, right? Then, what size of data would make it worthwhile to use sortByKey on multiple processors rather than use standard Scala sort functions on a single processor (considering the overhead of putting stuff into RDDs and collecting them back)? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Complexity-Efficiency-of-SortByKey-tp14328.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
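For reference, a tiny usage sketch; the numPartitions argument is what controls how many ranges the sampling pass carves out before the per-partition sort:

val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3), ("a", 4)))
// sample-based range partitioning into 4 partitions, then a sort within each partition
val sorted = pairs.sortByKey(ascending = true, numPartitions = 4)
println(sorted.collect().mkString(", "))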