spark 1.3.1 : unable to access s3n:// urls (no file system for scheme s3n:)

2015-04-22 Thread Sujee Maniyam
Hi all
I am unable to access s3n:// URLs using sc.textFile(); I get a 'no file
system for scheme s3n' error.

Is this a bug, or are some conf settings missing?

See below for details:

env variables :
AWS_SECRET_ACCESS_KEY=set
AWS_ACCESS_KEY_ID=set

spark/RELEASE:
Spark 1.3.1 (git revision 908a0bf) built for Hadoop 2.6.0
Build flags: -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver
-Pyarn -DzincPort=3034


./bin/spark-shell
 val f = sc.textFile("s3n://bucket/file")
 f.count

error==
java.io.IOException: No FileSystem for scheme: s3n
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at
org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
at
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
at
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1512)
at org.apache.spark.rdd.RDD.count(RDD.scala:1006)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
at $iwC$$iwC$$iwC.<init>(<console>:37)
at $iwC$$iwC.<init>(<console>:39)
at $iwC.<init>(<console>:41)
at <init>(<console>:43)
at .<init>(<console>:47)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856)
at
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664)
at org.apache.spark.repl.SparkILoop.org
$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
at
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org
$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)



Sujee Maniyam 

RE: spark 1.3.1 : unable to access s3n:// urls (no file system for scheme s3n:)

2015-04-22 Thread Shuai Zheng
Below is my code to access s3n without a problem (only for 1.3.1; there is a bug
in 1.3.0).

 

  Configuration hadoopConf = ctx.hadoopConfiguration();
  hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
  hadoopConf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId);
  hadoopConf.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey);
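
For the spark-shell case in the original post, a rough Scala equivalent is the following sketch; it assumes the NativeS3FileSystem class is on the classpath and simply reads the keys from the environment variables mentioned above:

  // set the s3n filesystem and credentials on the SparkContext's Hadoop configuration
  val hadoopConf = sc.hadoopConfiguration
  hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
  hadoopConf.set("fs.s3n.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))
  hadoopConf.set("fs.s3n.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))
  val f = sc.textFile("s3n://bucket/file")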

 

Regards,

 

Shuai

 

From: Sujee Maniyam [mailto:su...@sujee.net] 
Sent: Wednesday, April 22, 2015 12:45 PM
To: Spark User List
Subject: spark 1.3.1 : unable to access s3n:// urls (no file system for scheme 
s3n:)

 


Re: spark 1.3.1 : unable to access s3n:// urls (no file system for scheme s3n:)

2015-04-22 Thread Ted Yu
This thread from the Hadoop mailing list should give you some clue:
http://search-hadoop.com/m/LgpTk2df7822
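
If the Hadoop 2.6 build is in play, note that in Hadoop 2.6 the s3n classes were moved out of hadoop-common into the separate hadoop-aws module, so that jar (and the AWS SDK it depends on) may need to be added to the classpath explicitly; roughly along these lines, with the jar paths and versions only illustrative:

  ./bin/spark-shell --jars /path/to/hadoop-aws-2.6.0.jar,/path/to/aws-java-sdk-1.7.4.jar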

On Wed, Apr 22, 2015 at 9:45 AM, Sujee Maniyam su...@sujee.net wrote:

 Hi all
 I am unable to access s3n://  urls using   sc.textFile().. getting 'no
 file system for scheme s3n://'  error.

 a bug or some conf settings missing?


Can I index a column in parquet file to make it join faster

2015-04-22 Thread Wang, Ningjun (LNG-NPV)
I have two RDDs, each saved in a Parquet file. I need to join these two RDDs by
the id column. Can I create an index on the id column so they can join faster?
Here is the code:


case class Example(val id: String,  val category: String)
case class DocVector(val id: String,  val vector: Vector)

val examples : RDD[Example] = ...   // First RDD, which contains a few thousand items
val docVectors : RDD[DocVector] = ... // Second RDD, which contains 10 million items

// These two RDDs are saved in Parquet files as follows
examples.toDF().saveAsParquetFile("file:///c:/temp/examples.parquet")
docVectors.toDF().saveAsParquetFile("file:///c:/temp/docVectors.parquet")


// Now I need to join these two RDDs stored in the Parquet files
val dfExamples = sqlContext.parquetFile("file:///c:/temp/examples.parquet")
val dfDocVectors = sqlContext.parquetFile(docVectorsParquet) // DataFrame of (id, vector)

dfExamples.join(dfDocVectors, dfExamples("id") === dfDocVectors("id"))
  .select(dfDocVectors("id"), dfDocVectors("vector"), dfExamples("category"))

I need to perform this kind of join many times. To speed up the join, can I
create an index on the id column in the Parquet file, like what I can do with a
database table?
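
As far as I know, neither Parquet nor Spark SQL 1.3 supports a secondary index on a column, but for a small-vs-large join like this one option is to let Spark SQL broadcast the small side; a sketch, with the threshold value purely illustrative:

  // raise the auto-broadcast threshold (in bytes) so the small table is shipped to every executor
  sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (50 * 1024 * 1024).toString)
  val dfExamples = sqlContext.parquetFile("file:///c:/temp/examples.parquet").cache()
  val dfDocVectors = sqlContext.parquetFile("file:///c:/temp/docVectors.parquet")
  dfExamples.join(dfDocVectors, dfExamples("id") === dfDocVectors("id"))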


Ningjun



Re: Convert DStream to DataFrame

2015-04-22 Thread Sergio Jiménez Barrio
I tried the solution from the guide, but I exceeded the size limit of a case class Row:


2015-04-22 15:22 GMT+02:00 Tathagata Das tathagata.das1...@gmail.com:

 Did you checkout the latest streaming programming guide?


 http://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations

 You also need to be aware that to convert JSON RDDs to a DataFrame,
 sqlContext has to make a pass over the data to learn the schema. This will
 fail if a batch has no data. You have to safeguard against that.

 On Wed, Apr 22, 2015 at 6:19 AM, ayan guha guha.a...@gmail.com wrote:

 What about sqlcontext.createDataframe(rdd)?
 On 22 Apr 2015 23:04, Sergio Jiménez Barrio drarse.a...@gmail.com
 wrote:

 Hi,

 I am using Kafka with Apache Stream to send JSON to Apache Spark:

 val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, 
 StringDecoder](ssc, kafkaParams, topicsSet)

  Now I want to parse the created DStream to a DataFrame, but I don't know if
  Spark 1.3 has an easy way to do this. Any suggestion? I can get the
  message with:

 val lines = messages.map(_._2)

 Thank u for all. Sergio J.






Re: Clustering algorithms in Spark

2015-04-22 Thread Jeetendra Gangele
does anybody have any thought on this?

On 21 April 2015 at 20:57, Jeetendra Gangele gangele...@gmail.com wrote:

 The problem with k-means is that we have to define the number of clusters, which I
 don't want in this case.
 So I am thinking of something like hierarchical clustering. Any ideas and
 suggestions?



 On 21 April 2015 at 20:51, Jeetendra Gangele gangele...@gmail.com wrote:

  I have a requirement in which I want to match company names, and I
  am thinking of solving this using a clustering technique.

  Can anybody suggest which algorithm I should use in Spark, and how to evaluate
  the running time and accuracy for this particular problem?

  I checked k-means; it looks good.
  Any ideas or suggestions?









Re: Convert DStream to DataFrame

2015-04-22 Thread Tathagata Das
Did you checkout the latest streaming programming guide?

http://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations

You also need to be aware that to convert JSON RDDs to a DataFrame,
sqlContext has to make a pass over the data to learn the schema. This will
fail if a batch has no data. You have to safeguard against that.
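
A minimal sketch of that pattern, assuming Spark 1.3's sqlContext.jsonRDD and a DStream of JSON strings (the table name is just an example):

  val lines = messages.map(_._2)              // JSON payloads from Kafka
  lines.foreachRDD { rdd =>
    if (!rdd.isEmpty()) {                     // safeguard: schema inference needs data
      val df = sqlContext.jsonRDD(rdd)        // one pass over the data to learn the schema
      df.registerTempTable("events")
    }
  }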

On Wed, Apr 22, 2015 at 6:19 AM, ayan guha guha.a...@gmail.com wrote:

 What about sqlcontext.createDataframe(rdd)?
 On 22 Apr 2015 23:04, Sergio Jiménez Barrio drarse.a...@gmail.com
 wrote:

 Hi,

 I am using Kafka with Apache Stream to send JSON to Apache Spark:

 val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, 
 StringDecoder](ssc, kafkaParams, topicsSet)

  Now I want to parse the created DStream to a DataFrame, but I don't know if
  Spark 1.3 has an easy way to do this. Any suggestion? I can get the
  message with:

 val lines = messages.map(_._2)

 Thank u for all. Sergio J.





Re: Building Spark : Building just one module.

2015-04-22 Thread Iulian Dragoș
One way is to use export SPARK_PREPEND_CLASSES=true. This will instruct the
launcher to prepend the target directories for each project to the Spark
assembly. I've had mixed experiences with it lately, but in principle
that's the only way I know.
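
A rough sequence of what that looks like in practice (the module list and Maven flags are illustrative; adjust to the modules you touch):

  export SPARK_PREPEND_CLASSES=true
  # rebuild only the changed modules instead of the whole assembly
  build/mvn -pl sql/catalyst,sql/core -DskipTests package
  ./bin/spark-shell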

On Wed, Apr 22, 2015 at 3:42 PM, zia_kayani zia.kay...@platalytics.com
wrote:

 Hi,

  I have to add custom things to the Spark SQL and Catalyst modules. But
  every time I change a line of code I have to compile the whole of Spark; if I
  only compile the sql/core and sql/catalyst modules, those changes aren't visible
  when I run the job on that Spark build. What am I missing? Is there any other way
  to overcome this, as this is time consuming ...



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Building-Spark-Building-just-one-module-tp22607.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





-- 

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com


Re: Auto Starting a Spark Job on Cluster Starup

2015-04-22 Thread Ted Yu
This thread seems related:
http://search-hadoop.com/m/JW1q51W02V

Cheers

On Wed, Apr 22, 2015 at 6:09 AM, James King jakwebin...@gmail.com wrote:

  What's the best way to start up a Spark job as part of starting up the
  Spark cluster?

  I have a single uber jar for my job and want to make the start-up as easy
  as possible.

 Thanks

 jk





Re: Multiple HA spark clusters managed by 1 ZK cluster?

2015-04-22 Thread Jeff Nadler
You can run multiple Spark clusters against one ZK cluster.   Just use this
config to set independent ZK roots for each cluster:

 spark.deploy.zookeeper.dir
 The directory in ZooKeeper to store recovery state (default: /spark).

-Jeff
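
For a standalone Master, these properties are usually passed through SPARK_DAEMON_JAVA_OPTS in spark-env.sh; a sketch for one of the clusters, with the ZK URL and path only illustrative:

  # conf/spark-env.sh on the masters of "cluster A"
  export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER \
    -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181 \
    -Dspark.deploy.zookeeper.dir=/spark-clusterA"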


From: Sean Owen so...@cloudera.com
To: Akhil Das ak...@sigmoidanalytics.com
Cc: Michal Klos michal.klo...@gmail.com, User user@spark.apache.org
Date: Wed, 22 Apr 2015 11:05:46 +0100
Subject: Re: Multiple HA spark clusters managed by 1 ZK cluster?
Not that I've tried it, but why couldn't you use one ZK server? I
don't see a reason.

On Wed, Apr 22, 2015 at 7:40 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:
 It isn't mentioned anywhere in the doc, but you will probably need
separate
 ZK for each of your HA clusters.

 Thanks
 Best Regards


Re: Spark Performance on Yarn

2015-04-22 Thread Ted Yu
In the master branch, the overhead is now 10%.
That would be 500 MB.

FYI
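
For reference, this is roughly where those numbers get set on a YARN submit (the values and application names are illustrative):

  ./bin/spark-submit --master yarn-cluster \
    --executor-memory 5g \
    --conf spark.yarn.executor.memoryOverhead=512 \
    --class com.example.MyApp /path/to/app.jar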



 On Apr 22, 2015, at 8:26 AM, nsalian neeleshssal...@gmail.com wrote:
 
 +1 to executor-memory to 5g.
 Do check the overhead space for both the driver and the executor as per
 Wilfred's suggestion.
 
 Typically, 384 MB should suffice.
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p22610.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 



Master <-chatter-> Worker

2015-04-22 Thread James King
Is there a good resource that covers what kind of chatter (communication)
that goes on between driver, master and worker processes?

Thanks


the indices of SparseVector must be ordered while computing SVD

2015-04-22 Thread yaochunnan
Hi all, 
I am using Spark 1.3.1 to write a Spectral Clustering algorithm. This really
confused me today. At first I thought my implementation was wrong. It turned
out to be an issue in MLlib. Fortunately, I've figured it out.

I suggest adding a hint to the MLlib user documentation (as far as I know, there
is no such hint yet) that the indices of a local SparseVector must be
ordered in ascending manner. Because I was unaware of this point, I spent a
lot of time looking for reasons why computeSVD of RowMatrix did not run
correctly on sparse data. I don't know the influence of a SparseVector
without ordered indices on other functions, but I believe it is necessary to
let users know, or to fix it. Actually, it's very easy to fix: just sort by
index in the internal construction of SparseVector.

Here is an example to reproduce the affect of unordered Sparse Vector on
computeSVD.

//in spark-shell, Spark 1.3.1
 import org.apache.spark.mllib.linalg.distributed.RowMatrix
 import org.apache.spark.mllib.linalg.{SparseVector, DenseVector, Vector,
Vectors}

  val sparseData_ordered = Seq(
Vectors.sparse(3, Array(1, 2), Array(1.0, 2.0)),
Vectors.sparse(3, Array(0,1,2), Array(3.0, 4.0, 5.0)),
Vectors.sparse(3, Array(0,1,2), Array(6.0, 7.0, 8.0)),
Vectors.sparse(3, Array(0,2), Array(9.0, 1.0))
  )
  val sparseMat_ordered = new RowMatrix(sc.parallelize(sparseData_ordered,
2))

  val sparseData_not_ordered = Seq(
Vectors.sparse(3, Array(1, 2), Array(1.0, 2.0)),
Vectors.sparse(3, Array(2,1,0), Array(5.0,4.0,3.0)),
Vectors.sparse(3, Array(0,1,2), Array(6.0, 7.0, 8.0)),
Vectors.sparse(3, Array(2,0), Array(1.0,9.0))
  )
 val sparseMat_not_ordered = new
RowMatrix(sc.parallelize(sparseData_not_ordered, 2))

//apparently, sparseMat_ordered and sparseMat_not_ordered are essentially
the same matirx
//however, the computeSVD result of these two matrixes are different. Users
should be notified about this situation. 
  println(sparseMat_ordered.computeSVD(2,
true).U.rows.collect.mkString("\n"))
  println("===")
  println(sparseMat_not_ordered.computeSVD(2,
true).U.rows.collect.mkString("\n"))
==
The results are:
ordered: 
[-0.10972870132786407,-0.18850811494220537]
[-0.44712472003608356,-0.24828866611663725]
[-0.784520738744303,-0.3080692172910691]
[-0.4154110101064339,0.8988385762953358]

not ordered:
[-0.10830447119599484,-0.1559341848984378]
[-0.4522713511277327,-0.23449829541447448]
[-0.7962382310594706,-0.3130624059305111]
[-0.43131320303494614,0.8453864703362308]

Looking into this issue, I can see the reason lies in
RowMatrix.scala (line 629). The implementation of sparse dspr there requires
ordered indices, because it scans the indices consecutively to skip
empty columns.
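
Until that is addressed, a minimal workaround sketch is to sort the (index, value) pairs before constructing each vector:

  // hypothetical helper: build a SparseVector from possibly unordered (index, value) pairs
  def sortedSparse(size: Int, pairs: Seq[(Int, Double)]) = {
    val sorted = pairs.sortBy(_._1)
    Vectors.sparse(size, sorted.map(_._1).toArray, sorted.map(_._2).toArray)
  }

  val v = sortedSparse(3, Seq((2, 5.0), (1, 4.0), (0, 3.0)))  // indices end up ascending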








--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/the-indices-of-SparseVector-must-be-ordered-while-computing-SVD-tp22611.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Parquet Hive table become very slow on 1.3?

2015-04-22 Thread Yin Huai
Xudong and Rex,

Can you try 1.3.1? With PR 5339 http://github.com/apache/spark/pull/5339 ,
after we get a Hive Parquet table from the metastore and convert it to our native
Parquet code path, we will cache the converted relation. For now, the first
access to that Hive Parquet table reads all of the footers (when you first
refer to that table in a query or call
sqlContext.table("hiveParquetTable")). All of your later accesses will hit
the metadata cache.

Thanks,

Yin

On Tue, Apr 21, 2015 at 1:13 AM, Rex Xiong bycha...@gmail.com wrote:

 We have the similar issue with massive parquet files, Cheng Lian, could
 you have a look?

 2015-04-08 15:47 GMT+08:00 Zheng, Xudong dong...@gmail.com:

 Hi Cheng,

  I tried both of these patches, and they still do not seem to resolve my issue. I
  found that most of the time is spent on this line in newParquet.scala:

 ParquetFileReader.readAllFootersInParallel(
   sparkContext.hadoopConfiguration, seqAsJavaList(leaves),
 taskSideMetaData)

  This needs to read all the files under the Parquet folder, while our Parquet
  folder has a lot of Parquet files (nearly 2000); reading one file takes about 2
  seconds, so it becomes very slow ... And PR 5231 did not skip this step,
  so it does not resolve my issue.

  As our Parquet files are generated by a Spark job, the number of
  .parquet files is the same as the number of tasks, which is why we have so
  many files. But these files actually have the same schema. Is there any way
  to merge these files into one, or to avoid scanning each of them?

 On Sat, Apr 4, 2015 at 9:47 PM, Cheng Lian lian.cs@gmail.com wrote:

  Hey Xudong,

 We had been digging this issue for a while, and believe PR 5339
 http://github.com/apache/spark/pull/5339 and PR 5334
 http://github.com/apache/spark/pull/5339 should fix this issue.

  There are two problems:

  1. Normally we cache Parquet table metadata for better performance, but
  when converting Hive metastore Parquet tables, the cache is not used. Thus
  heavy operations like schema discovery are done every time a metastore
  Parquet table is converted.
  2. With Parquet task side metadata reading (which is turned on by
  default), we can actually skip the row group information in the footer.
  However, we accidentally called a Parquet function which doesn't skip row
  group information.

  For your question about schema merging, Parquet allows different
  part-files to have different but compatible schemas. For example,
  part-1.parquet has columns a and b, while part-2.parquet may have
  columns a and c. In some cases, the summary files (_metadata and
  _common_metadata) contain the merged schema (a, b, and c), but it's not
  guaranteed. For example, when the user-defined metadata stored in different
  part-files contains different values for the same key, Parquet simply gives
  up writing summary files. That's why all part-files must be touched to get
  a precise merged schema.

 However, in scenarios where a centralized arbitrative schema is
 available (e.g. Hive metastore schema, or the schema provided by user via
 data source DDL), we don't need to do schema merging on driver side, but
 defer it to executor side and each task only needs to reconcile those
 part-files it needs to touch. This is also what the Parquet developers did
 recently for parquet-hadoop
 https://github.com/apache/incubator-parquet-mr/pull/45.

 Cheng


 On 3/31/15 11:49 PM, Zheng, Xudong wrote:

 Thanks Cheng!

   Setting 'spark.sql.parquet.useDataSourceApi' to false resolves my issues,
  but the PR 5231 seems not to. Not sure what else I did wrong ...

   BTW, we are actually very interested in the schema merging feature in
  Spark 1.3, so both of these two solutions will disable this feature, right? It
  seems that Parquet metadata is stored in a file named _metadata in the
  Parquet folder (each folder is a partition, as we use a partitioned table),
  so why do we need to scan all Parquet part-files? Is there any other solution
  that could keep the schema merging feature at the same time? We really like this
  feature :)

 On Tue, Mar 31, 2015 at 3:19 PM, Cheng Lian lian.cs@gmail.com
 wrote:

  Hi Xudong,

  This is probably because Parquet schema merging is turned on by
 default. This is generally useful for Parquet files with different but
 compatible schemas. But it needs to read metadata from all Parquet
 part-files. This can be problematic when reading Parquet files with lots of
 part-files, especially when the user doesn't need schema merging.

 This issue is tracked by SPARK-6575, and here is a PR for it:
 https://github.com/apache/spark/pull/5231. This PR adds a
 configuration to disable schema merging by default when doing Hive
 metastore Parquet table conversion.

  Another workaround is to fall back to the old Parquet code by setting
 spark.sql.parquet.useDataSourceApi to false.

 Cheng
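
In code, that fallback is a one-liner on the SQL context (the property name is the one mentioned above):

  // fall back to the old Parquet code path for Hive metastore Parquet tables
  sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")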


 On 3/31/15 2:47 PM, Zheng, Xudong wrote:

 Hi all,

  We are using a Parquet Hive table, and we are upgrading to Spark 1.3.
  But we find that just a simple COUNT(*) query is much slower (100x) than
  Spark 

Re: Spark Performance on Yarn

2015-04-22 Thread nsalian
+1 to executor-memory to 5g.
Do check the overhead space for both the driver and the executor as per
Wilfred's suggestion.

Typically, 384 MB should suffice.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p22610.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Multiple HA spark clusters managed by 1 ZK cluster?

2015-04-22 Thread Akhil Das
nice, thanks for the information.

Thanks
Best Regards

On Wed, Apr 22, 2015 at 8:53 PM, Jeff Nadler jnad...@srcginc.com wrote:


 You can run multiple Spark clusters against one ZK cluster.   Just use
 this config to set independent ZK roots for each cluster:

  spark.deploy.zookeeper.dir
  The directory in ZooKeeper to store recovery state (default: /spark).

 -Jeff


 From: Sean Owen so...@cloudera.com
 To: Akhil Das ak...@sigmoidanalytics.com
 Cc: Michal Klos michal.klo...@gmail.com, User user@spark.apache.org
 Date: Wed, 22 Apr 2015 11:05:46 +0100
 Subject: Re: Multiple HA spark clusters managed by 1 ZK cluster?
 Not that i've tried it, but, why couldn't you use one ZK server? I
 don't see a reason.

 On Wed, Apr 22, 2015 at 7:40 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:
  It isn't mentioned anywhere in the doc, but you will probably need
 separate
  ZK for each of your HA cluster.
 
  Thanks
  Best Regards



Re: Spark Performance on Yarn

2015-04-22 Thread Neelesh Salian
Does it still hit the memory limit for the container?

An expensive transformation?

On Wed, Apr 22, 2015 at 8:45 AM, Ted Yu yuzhih...@gmail.com wrote:

 In master branch, overhead is now 10%.
 That would be 500 MB

 FYI



  On Apr 22, 2015, at 8:26 AM, nsalian neeleshssal...@gmail.com wrote:
 
  +1 to executor-memory to 5g.
  Do check the overhead space for both the driver and the executor as per
  Wilfred's suggestion.
 
  Typically, 384 MB should suffice.
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p22610.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 



RDD.filter vs. RDD.join--advice please

2015-04-22 Thread hokiegeek2
Hi Everyone,

I have two options of filtering the RDD resulting from the Graph.vertices
method as illustrated with the following pseudo code:

1. Filter

val vertexSet = Set(vertexOne,vertexTwo...);

val filteredVertices = Graph.vertices.filter(x =>
vertexSet.contains(x._2.vertexName))

2. Join

val filterVertexIdsRDD = ...

val filteredVertices = Graph.vertices.join(filterVertexIdsRDD)

Either one of these should work, correct?
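
For the filter option, if the set is not tiny it is usually worth broadcasting it so each executor gets one read-only copy instead of shipping it with every task; a sketch following the pseudo code above:

  val vertexSetB = sc.broadcast(vertexSet)
  val filteredVertices = Graph.vertices.filter(x =>
    vertexSetB.value.contains(x._2.vertexName))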

Thanks

--John





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-filter-vs-RDD-join-advice-please-tp22612.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Trouble working with Spark-CSV package (error: object databricks is not a member of package com)

2015-04-22 Thread Mohammed Omer
Afternoon all,

I'm working with Scala 2.11.6, and Spark 1.3.1 built from source via:

`mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package`

The error is encountered when running spark shell via:

`spark-shell --packages com.databricks:spark-csv_2.11:1.0.3`

The full trace of the commands can be found at
https://gist.github.com/momer/9d1ca583f9978ec9739d

Not sure if I've done something wrong, or if the documentation is outdated,
or...?

Would appreciate any input or push in the right direction!

Thank you,

Mo


Re: Map Question

2015-04-22 Thread Tathagata Das
Is the mylist present on every executor? If not, then you have to pass it
on. And broadcasts are the best way to pass them on. But note that once
broadcasted it will be immutable at the executors, and if you update the list
at the driver, you will have to broadcast it again.

TD
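
A sketch of that broadcast pattern in the Scala API (the Python API is analogous; the names myrdd, myfunc and mylist come from the question):

  val mylistB = sc.broadcast(mylist)                      // done once on the driver
  val result = myrdd.map(x => myfunc(x, mylistB.value))   // executors read the broadcast copy
  // if mylist changes on the driver, create a new broadcast and use that instead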

On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy 
vadim.bichuts...@gmail.com wrote:

 I am using Spark Streaming with Python. For each RDD, I call a map, i.e.,
 myrdd.map(myfunc), myfunc is in a separate Python module. In yet another
 separate Python module I have a global list, i.e. mylist, that's populated
 with metadata. I can't get myfunc to see mylist...it's always empty.
 Alternatively, I guess I could pass mylist to map.

 Any suggestions?

 Thanks,
 Vadim



Re: regarding ZipWithIndex

2015-04-22 Thread Jeetendra Gangele
Sure, thanks. If you can guide me on how to do this, it will be a great help.
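
In the meantime, a minimal sketch of the Int-index idea (it assumes the RDD has fewer than Int.MaxValue elements, so the Long index fits into an Int):

  // zipWithIndex assigns ordered indices; narrow the Long to Int and key by it
  val indexed = rdd.zipWithIndex().map { case (obj, idx) => (idx.toInt, obj) }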

On 17 April 2015 at 22:05, Ted Yu yuzhih...@gmail.com wrote:

 I have some assignments on hand at the moment.

 Will try to come up with sample code after I clear the assignments.

 FYI

 On Thu, Apr 16, 2015 at 2:00 PM, Jeetendra Gangele gangele...@gmail.com
 wrote:

 Can you please guide me how can I extend RDD and convert into this way
 you are suggesting.

 On 16 April 2015 at 23:46, Jeetendra Gangele gangele...@gmail.com
 wrote:

  In type T I already have Object ... I have an RDD<Object>, and then I am
  calling zipWithIndex on this RDD and getting an RDD<Object, Long>; on this I am
  running mapToPair and converting it into an RDD<Long, Object> so that I can use it
  later for other operations like lookup and join.


 On 16 April 2015 at 23:42, Ted Yu yuzhih...@gmail.com wrote:

  The Long in RDD[(T, Long)] is a type parameter. You can create an RDD with
  Integer as the first type parameter.

 Cheers

 On Thu, Apr 16, 2015 at 11:07 AM, Jeetendra Gangele 
 gangele...@gmail.com wrote:

  Hi Ted.
  This works for me. But Long takes 8 bytes here. Can I reduce it
  to 4 bytes? It's just an index, and I feel 4 bytes is more than
  enough. Is there any method which takes Integer or similar for the index?


 On 13 April 2015 at 01:59, Ted Yu yuzhih...@gmail.com wrote:

  bq. will return something like JavaPairRDD<Object, Long>

  The Long component of the pair fits your description of an index. What
  other requirement does zipWithIndex not provide?

 Cheers

 On Sun, Apr 12, 2015 at 1:16 PM, Jeetendra Gangele 
 gangele...@gmail.com wrote:

  Hi all, I have a JavaRDD<Object> and I want to convert it to a
  JavaPairRDD<Index, Object>. The index should be unique and it should
  maintain
  the order: for the first object it should have 1, and then for the second 2, and
  so on.

  I tried using zipWithIndex, but it returns something like
  JavaPairRDD<Object, Long>.
  I wanted to use this RDD for lookup and join operations later in my
  workflow, so ordering is important.


 Regards
 jeet

















Spark streaming action running the same work in parallel

2015-04-22 Thread ColinMc
Hi,

I'm running a unit test that keeps failing with the code I wrote in
Spark.

Here are the output logs from the test I ran. It gets the customers from
incoming events in the JSON called QueueEvent, and I am trying to convert the
incoming events for each customer into an alert.



From the logs you can see that there is one RDD in the stream with 6 events
(3 for each customer).

Here is the code snippet that gets the customers and gets the alerts for all
the customers.



Spark is doing the same work but on different threads, as you can see in the
logs ("Executor task launch worker-x"). This is throwing off the results when
comparing against the expected results.

Here is some of the code in the test for troubleshooting




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-action-running-the-same-work-in-parallel-tp22613.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Not able run multiple tasks in parallel, spark streaming

2015-04-22 Thread Tathagata Das
Furthermore, just to explain: doing arr.par.foreach does not help because
it is not really running anything, it is only doing setup of the computation.
Doing the setup in parallel does not mean that the jobs will be done
concurrently.

Also, from your code it seems like your pairs of DStreams don't interact
with each other (that is, pair1 doesn't interact with pair2). Then you could
run them in separate applications, which would allow them to run in
parallel.


On Tue, Apr 21, 2015 at 11:53 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 You can enable this flag to run multiple jobs concurrently. It might not
 be production ready, but you can give it a try:

 sc.set("spark.streaming.concurrentJobs", "2")

 Refer to TD's answer here
 http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming#answers-header
 for more information.
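
A sketch of where that flag would normally go, on the SparkConf used to build the StreamingContext (the app name and batch interval are illustrative):

  val conf = new SparkConf().setAppName("parallel-joins")
    .set("spark.streaming.concurrentJobs", "2")   // number of jobs scheduled concurrently
  val ssc = new StreamingContext(conf, Seconds(10))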


 Thanks
 Best Regards

 On Wed, Apr 22, 2015 at 8:57 AM, Abhay Bansal abhaybansal.1...@gmail.com
 wrote:

 Hi,

  I have a use case wherein I have to join multiple Kafka topics in parallel.
  So if there are 2n topics, there is a one-to-one mapping of topics which
  need to be joined.


  val arr = ...

  for (condition) {

    val dStream1 = KafkaUtils.createDirectStream[String, String,
      StringDecoder, StringDecoder](ssc, kafkaParams, topics1).map(a => (getKey1(a._2), a._2))

    val dStream2 = KafkaUtils.createDirectStream[String, String,
      StringDecoder, StringDecoder](ssc, kafkaParams, topics2).map(a => (getKey2(a._2), a._2))

    arr(counter) = (dStream1, dStream2)
    counter += 1
  }

  arr.par.foreach {
    case (dStream1, dStream2) => try {
      val joined = dStream1.join(dStream2, 4)
      joined.saveAsTextFiles("joinedData")
    }
    catch {
      case t: Exception => t.printStackTrace()
    }
  }



 ssc.start()

 ssc.awaitTermination()


  Doing so, the streams are getting joined sequentially. Is there a way
  out of this? I am new to Spark and would appreciate any suggestions around
  this.


 Thanks,

 -Abhay





Re: Map Question

2015-04-22 Thread Vadim Bichutskiy
Can I use broadcast vars in local mode?

On Wed, Apr 22, 2015 at 2:06 PM, Tathagata Das t...@databricks.com wrote:

 Yep. Not efficient. Pretty bad actually. That's why broadcast variable
 were introduced right at the very beginning of Spark.



 On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 Thanks TD. I was looking into broadcast variables.

 Right now I am running it locally...and I plan to move it to production
 on EC2.

 The way I fixed it is by doing myrdd.map(lambda x: (x,
 mylist)).map(myfunc) but I don't think it's efficient?

 mylist is filled only once at the start and never changes.

 Vadim

 On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das t...@databricks.com
 wrote:

 Is the mylist present on every executor? If not, then you have to pass
 it on. And broadcasts are the best way to pass them on. But note that once
 broadcasted it will immutable at the executors, and if you update the list
 at the driver, you will have to broadcast it again.

 TD

 On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 I am using Spark Streaming with Python. For each RDD, I call a map,
 i.e., myrdd.map(myfunc), myfunc is in a separate Python module. In yet
 another separate Python module I have a global list, i.e. mylist,
 that's populated with metadata. I can't get myfunc to see mylist...it's
 always empty. Alternatively, I guess I could pass mylist to map.

 Any suggestions?

 Thanks,
 Vadim







Re: Map Question

2015-04-22 Thread Tathagata Das
Absolutely. The same code would work for local as well as distributed mode!

On Wed, Apr 22, 2015 at 11:08 AM, Vadim Bichutskiy 
vadim.bichuts...@gmail.com wrote:

 Can I use broadcast vars in local mode?

 On Wed, Apr 22, 2015 at 2:06 PM, Tathagata Das t...@databricks.com
 wrote:

 Yep. Not efficient. Pretty bad actually. That's why broadcast variable
 were introduced right at the very beginning of Spark.



 On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 Thanks TD. I was looking into broadcast variables.

 Right now I am running it locally...and I plan to move it to
 production on EC2.

 The way I fixed it is by doing myrdd.map(lambda x: (x,
 mylist)).map(myfunc) but I don't think it's efficient?

 mylist is filled only once at the start and never changes.

 Vadim

 On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das t...@databricks.com
 wrote:

 Is the mylist present on every executor? If not, then you have to pass
 it on. And broadcasts are the best way to pass them on. But note that once
 broadcasted it will immutable at the executors, and if you update the list
 at the driver, you will have to broadcast it again.

 TD

 On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 I am using Spark Streaming with Python. For each RDD, I call a map,
 i.e., myrdd.map(myfunc), myfunc is in a separate Python module. In yet
 another separate Python module I have a global list, i.e. mylist,
 that's populated with metadata. I can't get myfunc to see mylist...it's
 always empty. Alternatively, I guess I could pass mylist to map.

 Any suggestions?

 Thanks,
 Vadim








Re: Map Question

2015-04-22 Thread Tathagata Das
Yep. Not efficient. Pretty bad actually. That's why broadcast variables were
introduced right at the very beginning of Spark.



On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy 
vadim.bichuts...@gmail.com wrote:

 Thanks TD. I was looking into broadcast variables.

 Right now I am running it locally...and I plan to move it to production
 on EC2.

 The way I fixed it is by doing myrdd.map(lambda x: (x,
 mylist)).map(myfunc) but I don't think it's efficient?

 mylist is filled only once at the start and never changes.

 Vadim

 On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das t...@databricks.com
 wrote:

 Is the mylist present on every executor? If not, then you have to pass it
 on. And broadcasts are the best way to pass them on. But note that once
 broadcasted it will immutable at the executors, and if you update the list
 at the driver, you will have to broadcast it again.

 TD

 On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 I am using Spark Streaming with Python. For each RDD, I call a map,
 i.e., myrdd.map(myfunc), myfunc is in a separate Python module. In yet
 another separate Python module I have a global list, i.e. mylist,
 that's populated with metadata. I can't get myfunc to see mylist...it's
 always empty. Alternatively, I guess I could pass mylist to map.

 Any suggestions?

 Thanks,
 Vadim






Re: ElasticSearch for Spark times out

2015-04-22 Thread Jeetendra Gangele
will you be able to paste the code?

On 23 April 2015 at 00:19, Adrian Mocanu amoc...@verticalscope.com wrote:

  Hi



 I use the ElasticSearch package for Spark and very often it times out
 reading data from ES into an RDD.

 How can I keep the connection alive (why doesn't it? Bug?)



 Here's the exception I get:

 org.elasticsearch.hadoop.serialization.EsHadoopSerializationException:
 java.net.SocketTimeoutException: Read timed out

 at
 org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:86)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

 at
 org.elasticsearch.hadoop.serialization.ParsingUtils.doSeekToken(ParsingUtils.java:70)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

 at
 org.elasticsearch.hadoop.serialization.ParsingUtils.seek(ParsingUtils.java:58)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

 at
 org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:149)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

 at
 org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:102)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

 at
 org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:81)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

 at
 org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:314)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

 at
 org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

 at
 org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

 at
 scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 ~[scala-library.jar:na]

 at
 scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 ~[scala-library.jar:na]

 at
 scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
 ~[scala-library.jar:na]

 at
 scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 ~[scala-library.jar:na]

 at
 scala.collection.Iterator$class.foreach(Iterator.scala:727)
 ~[scala-library.jar:na]

 at
 scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 ~[scala-library.jar:na]

 at
 org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)
 ~[spark-core_2.10-1.1.0.jar:1.1.0]

 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 ~[spark-core_2.10-1.1.0.jar:1.1.0]

 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 ~[spark-core_2.10-1.1.0.jar:1.1.0]

 at org.apache.spark.scheduler.Task.run(Task.scala:54)
 ~[spark-core_2.10-1.1.0.jar:1.1.0]

 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
 ~[spark-core_2.10-1.1.0.jar:1.1.0]

 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 [na:1.7.0_75]

 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 [na:1.7.0_75]

 at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75]

 Caused by: java.net.SocketTimeoutException: Read timed out

 at java.net.SocketInputStream.socketRead0(Native Method)
 ~[na:1.7.0_75]

 at
 java.net.SocketInputStream.read(SocketInputStream.java:152) ~[na:1.7.0_75]

 at
 java.net.SocketInputStream.read(SocketInputStream.java:122) ~[na:1.7.0_75]

 at
 java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
 ~[na:1.7.0_75]

 at
 java.io.BufferedInputStream.read(BufferedInputStream.java:334)
 ~[na:1.7.0_75]

 at
 org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
 ~[commons-httpclient-3.1.jar:na]

 at
 org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
 ~[commons-httpclient-3.1.jar:na]

 at
 java.io.FilterInputStream.read(FilterInputStream.java:133) ~[na:1.7.0_75]

 at
 org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
 ~[commons-httpclient-3.1.jar:na]

 at
 org.elasticsearch.hadoop.rest.DelegatingInputStream.read(DelegatingInputStream.java:57)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

 at
 org.codehaus.jackson.impl.Utf8StreamParser.loadMore(Utf8StreamParser.java:172)
 ~[jackson-core-asl-1.9.11.jar:1.9.11]

 at
 org.codehaus.jackson.impl.Utf8StreamParser.parseEscapedFieldName(Utf8StreamParser.java:1502)
 

RE: ElasticSearch for Spark times out

2015-04-22 Thread Adrian Mocanu
Hi
The gist of it is this:
I have data indexed into ES. Each index stores monthly data, and the query will
get data for some date range (across several ES indexes, or within one if the date
range is within one month). Then I merge these RDDs into an uberRdd, perform
some operations, and then print the result with a foreach.

The simplified code is below.

{
  // esRdds: List[RDD] contains mentions count per post
  val esRdds = (startDate.getYear to endDate.getYear).flatMap { year =>
    val sMonth = if (year == startDate.getYear) startDate.getMonthOfYear else 1
    val eMonth = if (year == endDate.getYear) endDate.getMonthOfYear else 12
    (sMonth to eMonth).map { i =>
      sc.esRDD(s"$year-${i.formatted("%02d")}_nlpindex/nlp",
        ESQueries.generateQueryString(Some(startDate), Some(endDate), mentionsToFind,
          siteNames))
        .map { case (str, map) => unwrapAndCountMentionsPerPost(map) }
    }
  }

  var uberRdd = esRdds(0)
  for (rdd <- esRdds) {
    uberRdd = uberRdd ++ rdd
  }

  uberRdd.map ... join ... foreach(x => println(x))
}

From: Jeetendra Gangele [mailto:gangele...@gmail.com]
Sent: April 22, 2015 2:52 PM
To: Adrian Mocanu
Cc: u...@spark.incubator.apache.org
Subject: Re: ElasticSearch for Spark times out

will you be able to paste the code?

On 23 April 2015 at 00:19, Adrian Mocanu 
amoc...@verticalscope.commailto:amoc...@verticalscope.com wrote:
Hi

I use the ElasticSearch package for Spark and very often it times out reading 
data from ES into an RDD.
How can I keep the connection alive (why doesn't it? Bug?)

Re: ElasticSearch for Spark times out

2015-04-22 Thread Jeetendra Gangele
Basically, a read timeout means that no data arrived within the specified
receive timeout period.
A few things I would suggest:
1. Is your ES cluster up and running?
2. If 1 is yes, then reduce the size of the index, make it a few KB, and then
test.
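
If the cluster is healthy and the scans are just slow, the elasticsearch-hadoop client timeout and scroll page size can also be tuned; a sketch on the SparkConf, with the values purely illustrative:

  val conf = new SparkConf()
    .set("es.http.timeout", "5m")    // allow slow scroll responses instead of timing out
    .set("es.scroll.size", "1000")   // documents fetched per scroll request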

On 23 April 2015 at 00:19, Adrian Mocanu amoc...@verticalscope.com wrote:

  Hi



 I use the ElasticSearch package for Spark and very often it times out
 reading data from ES into an RDD.

 How can I keep the connection alive (why doesn't it? Bug?)




Re: Spark Streaming updateStateByKey throws OutOfMemory Error

2015-04-22 Thread Tathagata Das
It could very well be that your executor memory is not enough to store the
state RDDs AND operate on the data. 1G per executor is quite low.
Definitely give more memory. And have you tried increasing the number of
partitions (specify number of partitions in updateStateByKey) ?
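A minimal sketch of what passing an explicit partition count to updateStateByKey can
look like (the socket source, key type, checkpoint path and the value 64 are
placeholders, not from this thread):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.StreamingContext._

  // Hedged sketch: a local socket text stream stands in for the real input.
  val ssc = new StreamingContext(
    new SparkConf().setAppName("state-sketch").setMaster("local[2]"), Seconds(10))
  ssc.checkpoint("/tmp/state-checkpoint")   // required by updateStateByKey

  val counts = ssc.socketTextStream("localhost", 9999).map(word => (word, 1L))

  val updateFunc = (values: Seq[Long], state: Option[Long]) =>
    Some(state.getOrElse(0L) + values.sum)

  // The overload with an explicit partition count spreads the state RDD over
  // more (smaller) partitions, as suggested above; 64 is an arbitrary example.
  val state = counts.updateStateByKey(updateFunc, 64)
  state.print()
  ssc.start()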

On Wed, Apr 22, 2015 at 2:34 AM, Sourav Chandra 
sourav.chan...@livestream.com wrote:

 Anyone?

 On Wed, Apr 22, 2015 at 12:29 PM, Sourav Chandra 
 sourav.chan...@livestream.com wrote:

  Hi Olivier,
 
  the update function is as below:

  val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {
    val previousCount = state.getOrElse((0L, 0L))._2
    var startValue: IConcurrentUsers = ConcurrentViewers(0)
    var currentCount = 0L
    val lastIndexOfConcurrentUsers =
      values.lastIndexWhere(_.isInstanceOf[ConcurrentViewers])
    val subList = values.slice(0, lastIndexOfConcurrentUsers)
    val currentCountFromSubList = subList.foldLeft(startValue)(_ op _).count + previousCount
    val lastConcurrentViewersCount = values(lastIndexOfConcurrentUsers).count

    if (math.abs(lastConcurrentViewersCount - currentCountFromSubList) >= 1) {
      logger.error(
        s"Count using state updation $currentCountFromSubList, " +
        s"ConcurrentUsers count $lastConcurrentViewersCount" +
        s" resetting to $lastConcurrentViewersCount"
      )
      currentCount = lastConcurrentViewersCount
    }
    val remainingValuesList = values.diff(subList)
    startValue = ConcurrentViewers(currentCount)
    currentCount = remainingValuesList.foldLeft(startValue)(_ op _).count

    if (currentCount < 0) {
      logger.error(
        s"ERROR: Got new count $currentCount < 0, value:$values, state:$state, resetting to 0"
      )
      currentCount = 0
    }
    // to stop pushing subsequent 0 after receiving first 0
    if (currentCount == 0 && previousCount == 0) None
    else Some(previousCount, currentCount)
  }

  trait IConcurrentUsers {
    val count: Long
    def op(a: IConcurrentUsers): IConcurrentUsers = IConcurrentUsers.op(this, a)
  }

  object IConcurrentUsers {
    def op(a: IConcurrentUsers, b: IConcurrentUsers): IConcurrentUsers = (a, b) match {
      case (_, _: ConcurrentViewers) =>
        ConcurrentViewers(b.count)
      case (_: ConcurrentViewers, _: IncrementConcurrentViewers) =>
        ConcurrentViewers(a.count + b.count)
      case (_: ConcurrentViewers, _: DecrementConcurrentViewers) =>
        ConcurrentViewers(a.count - b.count)
    }
  }

  case class IncrementConcurrentViewers(count: Long) extends IConcurrentUsers
  case class DecrementConcurrentViewers(count: Long) extends IConcurrentUsers
  case class ConcurrentViewers(count: Long) extends IConcurrentUsers
 
 
  also the error stack trace copied from executor logs is:

  java.lang.OutOfMemoryError: Java heap space
  at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)
  at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2564)
  at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
  at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
  at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)
  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
  at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:601)
  at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
  at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
  at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:236)
  at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169)
  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
  at

ElasticSearch for Spark times out

2015-04-22 Thread Adrian Mocanu
Hi

I use the ElasticSearch package for Spark and very often it times out reading 
data from ES into an RDD.
How can I keep the connection alive (why doesn't it? Bug?)

Here's the exception I get:
org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: 
java.net.SocketTimeoutException: Read timed out
at 
org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:86)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ParsingUtils.doSeekToken(ParsingUtils.java:70)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ParsingUtils.seek(ParsingUtils.java:58) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:149)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:102) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:81) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:314) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
~[scala-library.jar:na]
at 
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
~[scala-library.jar:na]
at 
scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) 
~[scala-library.jar:na]
at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
~[scala-library.jar:na]
at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
~[scala-library.jar:na]
at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
~[scala-library.jar:na]
at 
org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)
 ~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at org.apache.spark.scheduler.Task.run(Task.scala:54) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
[na:1.7.0_75]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
[na:1.7.0_75]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75]
Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method) 
~[na:1.7.0_75]
at java.net.SocketInputStream.read(SocketInputStream.java:152) 
~[na:1.7.0_75]
at java.net.SocketInputStream.read(SocketInputStream.java:122) 
~[na:1.7.0_75]
at 
java.io.BufferedInputStream.read1(BufferedInputStream.java:273) ~[na:1.7.0_75]
at 
java.io.BufferedInputStream.read(BufferedInputStream.java:334) ~[na:1.7.0_75]
at 
org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
 ~[commons-httpclient-3.1.jar:na]
at 
org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
 ~[commons-httpclient-3.1.jar:na]
at java.io.FilterInputStream.read(FilterInputStream.java:133) 
~[na:1.7.0_75]
at 
org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
 ~[commons-httpclient-3.1.jar:na]
at 
org.elasticsearch.hadoop.rest.DelegatingInputStream.read(DelegatingInputStream.java:57)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.codehaus.jackson.impl.Utf8StreamParser.loadMore(Utf8StreamParser.java:172) 
~[jackson-core-asl-1.9.11.jar:1.9.11]
at 
org.codehaus.jackson.impl.Utf8StreamParser.parseEscapedFieldName(Utf8StreamParser.java:1502)
 ~[jackson-core-asl-1.9.11.jar:1.9.11]
at 
org.codehaus.jackson.impl.Utf8StreamParser.slowParseFieldName(Utf8StreamParser.java:1404)
 ~[jackson-core-asl-1.9.11.jar:1.9.11]
at 

Re: Efficient saveAsTextFile by key, directory for each key?

2015-04-22 Thread Arun Luthra
I ended up post-processing the result in hive with a dynamic partition
insert query to get a table partitioned by the key.

Looking further, it seems that 'dynamic partition' insert is in Spark SQL
and working well in Spark SQL versions > 1.2.0:
https://issues.apache.org/jira/browse/SPARK-3007
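
A hedged sketch of that post-processing step through HiveContext (the table and
column names here are invented; only the dynamic-partition pattern is the point):

  import org.apache.spark.sql.hive.HiveContext

  val hc = new HiveContext(sc)
  // Dynamic partitioning must be enabled before the insert.
  hc.sql("SET hive.exec.dynamic.partition=true")
  hc.sql("SET hive.exec.dynamic.partition.mode=nonstrict")

  // Assumed source table: staging(val0 INT, val1 STRING, directory_name STRING).
  hc.sql("CREATE TABLE IF NOT EXISTS results (val0 INT, val1 STRING) " +
         "PARTITIONED BY (directory_name STRING)")
  // Hive routes each row to its partition directory based on the last column.
  hc.sql("INSERT OVERWRITE TABLE results PARTITION (directory_name) " +
         "SELECT val0, val1, directory_name FROM staging")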

On Tue, Apr 21, 2015 at 5:45 PM, Arun Luthra arun.lut...@gmail.com wrote:

 Is there an efficient way to save an RDD with saveAsTextFile in such a way
 that the data gets shuffled into separated directories according to a key?
 (My end goal is to wrap the result in a multi-partitioned Hive table)

 Suppose you have:

 case class MyData(val0: Int, val1: String, directory_name: String)

 and an RDD called myrdd with type RDD[MyData]. Suppose that you already
 have an array of the distinct directory_name's, called distinct_directories.

 A very inefficient way to do this is:

 distinct_directories.foreach( dir_name =>
   myrdd.filter( mydata => mydata.directory_name == dir_name )
     .map( mydata => Array(mydata.val0.toString, mydata.val1).mkString(",") )
     .coalesce(5)
     .saveAsTextFile("base_dir_name/" + f"$dir_name")
 )

 I tried this solution, and it does not do the multiple myrdd.filter's in
 parallel.

 I'm guessing partitionBy might be in the efficient solution if an easy
 efficient solution exists...
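
 One commonly suggested sketch along those lines, untested here, pairs partitionBy
 with a custom MultipleTextOutputFormat so that each key is written under its own
 sub-directory of the output path (it reuses myrdd and distinct_directories from above):

   import org.apache.hadoop.io.NullWritable
   import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
   import org.apache.spark.HashPartitioner

   // Writes each record under a sub-directory named after its key.
   class DirectoryByKeyFormat extends MultipleTextOutputFormat[Any, Any] {
     override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
       key.toString + "/" + name
     // Keep only the value in the output lines; drop the key.
     override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
   }

   myrdd
     .map(d => (d.directory_name, Array(d.val0.toString, d.val1).mkString(",")))
     .partitionBy(new HashPartitioner(distinct_directories.length))
     .saveAsHadoopFile("base_dir_name", classOf[String], classOf[String],
       classOf[DirectoryByKeyFormat])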

 Thanks,
 Arun



Re: spark 1.3.1 : unable to access s3n:// urls (no file system for scheme s3n:)

2015-04-22 Thread Sujee Maniyam
Thanks all...

btw, s3n load works without any issues with spark-1.3.1-built-for-hadoop
2.4

I tried this on 1.3.1-hadoop26
  sc.hadoopConfiguration.set("fs.s3n.impl",
    "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
 val f = sc.textFile("s3n://bucket/file")
 f.count

Now it can't find the implementation class.  Looks like some jar is missing?

java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2578)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)


Sujee Maniyam (http://sujee.net | http://www.linkedin.com/in/sujeemaniyam )

On Wed, Apr 22, 2015 at 9:49 AM, Shuai Zheng szheng.c...@gmail.com wrote:

 Below is my code to access s3n without problem (only for 1.3.1. there is a
 bug in 1.3.0).



   Configuration hadoopConf = ctx.hadoopConfiguration();

   hadoopConf.set("fs.s3n.impl",
 "org.apache.hadoop.fs.s3native.NativeS3FileSystem");

   hadoopConf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId);

   hadoopConf.set("fs.s3n.awsSecretAccessKey",
 awsSecretAccessKey);



 Regards,



 Shuai



 *From:* Sujee Maniyam [mailto:su...@sujee.net]
 *Sent:* Wednesday, April 22, 2015 12:45 PM
 *To:* Spark User List
 *Subject:* spark 1.3.1 : unable to access s3n:// urls (no file system for
 scheme s3n:)



 Hi all

 I am unable to access s3n://  urls using   sc.textFile().. getting 'no
 file system for scheme s3n://'  error.



 a bug or some conf settings missing?



 See below for details:



 env variables :

 AWS_SECRET_ACCESS_KEY=set

 AWS_ACCESS_KEY_ID=set



 spark/RELEASE :

 Spark 1.3.1 (git revision 908a0bf) built for Hadoop 2.6.0

 Build flags: -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive
 -Phive-thriftserver -Pyarn -DzincPort=3034





 ./bin/spark-shell

  val f = sc.textFile("s3n://bucket/file")

  f.count



 error==

 java.io.IOException: No FileSystem for scheme: s3n

 at
 org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)

 at
 org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)

 at
 org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)

 at
 org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)

 at
 org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)

 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)

 at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)

 at
 org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)

 at
 org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)

 at
 org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)

 at
 org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)

 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)

 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)

 at scala.Option.getOrElse(Option.scala:120)

 at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)

 at
 org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)

 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)

 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)

 at scala.Option.getOrElse(Option.scala:120)

 at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)

 at
 org.apache.spark.SparkContext.runJob(SparkContext.scala:1512)

 at org.apache.spark.rdd.RDD.count(RDD.scala:1006)

 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:24)

 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:29)

 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:31)

 at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:33)

 at $iwC$$iwC$$iwC$$iwC.init(console:35)

 at $iwC$$iwC$$iwC.init(console:37)

 at $iwC$$iwC.init(console:39)

 at $iwC.init(console:41)

 at init(console:43)

 at .init(console:47)

 at .clinit(console)

 at .init(console:7)

 at .clinit(console)

 at $print(console)

 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

 at java.lang.reflect.Method.invoke(Method.java:606)

 at
 

Spark SQL: SchemaRDD, DataFrame. Multi-value, Nested attributes

2015-04-22 Thread Eugene Morozov
Hi!

I’m trying to query a dataset that reads data from csv and provides a SQL on 
top of it. The problem I have is I have a hierarchy of objects that I need to 
represent as a table so that users might use SQL to query it and do some 
aggregations. I do have multi value attributes (that in csv file looks like 
column_1, column_2, …, column_n) and I do have particular entities that split 
into several columns, like an Address (city, street, etc). And each row (let’s 
say it represents a Person) might have several Addresses. 

It’s pretty clear that it’s not simple to flatten everything into one long list 
of columns as I would be able to find some weird stuff by doing that. So my 
question is the following: 
1. Does SchemaRDD support something like multi-value attributes? It might look 
like an array of values that lives in just one column, although it's not clear 
how I'd aggregate over it. Maybe there is some custom type API I can utilise?
2. Does the newly supported DataFrame provide something in this regard? My 
understanding is that columns in a DataFrame need to be actual columns (as in 
a relation), but they may be of different types (like arrays or objects). Maybe the 
implementation of DataFrame itself provides some sort of custom types or something 
pluggable that I might consider.
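
As one hedged illustration of what an array-typed column can look like (every name
below is invented, and the flattening query assumes a HiveContext):

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types._

  // A Person row carrying a multi-valued 'addresses' attribute as an array of structs.
  val addressType = StructType(Seq(
    StructField("city", StringType), StructField("street", StringType)))
  val personType = StructType(Seq(
    StructField("name", StringType),
    StructField("addresses", ArrayType(addressType))))

  val rows = sc.parallelize(Seq(
    Row("Alice", Seq(Row("Dublin", "Main St"), Row("Cork", "Quay St")))))
  val people = hiveContext.createDataFrame(rows, personType)
  people.registerTempTable("people")

  // Flatten the array so standard SQL aggregations apply to each address.
  hiveContext.sql(
    "SELECT name, addr.city FROM people LATERAL VIEW explode(addresses) a AS addr").show()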

Any clue would be really appreciated.
Thanks

--
Eugene Morozov
fathers...@list.ru






RE: Scheduling across applications - Need suggestion

2015-04-22 Thread yana
Yes. The Fair scheduler only helps concurrency within an application.  With 
multiple shells you'd either need something like YARN/Mesos or careful math on 
resources, as you said.


Sent on the new Sprint Network from my Samsung Galaxy S®4.

-------- Original message --------
From: Arun Patel arunp.bigd...@gmail.com
Date: 04/22/2015 6:28 AM (GMT-05:00)
To: user user@spark.apache.org
Subject: Scheduling across applications - Need suggestion

I believe we can use properties like --executor-memory and
--total-executor-cores to configure the resources allocated for each 
application.  But, in a multi-user environment, shells and applications are 
being submitted by multiple users at the same time.  All users are requesting 
resources with different properties.  At times, some users are not getting 
resources of the cluster.



How to control resource usage in this case?  Please share any best practices 
followed. 



As per my understanding, the Fair scheduler can be used for scheduling tasks within an 
application but not across multiple applications.  Is this correct?



Regards,

Arun

RE: ElasticSearch for Spark times out

2015-04-22 Thread Adrian Mocanu
Hi

Thanks for the help. My ES is up.
Out of curiosity, do you know what the timeout value is? There are probably 
other things happening to cause the timeout; I don't think my ES is that slow 
but it's possible that ES is taking too long to find the data. What I see 
happening is that it uses scroll to get the data from ES; about 150 items at a 
time. Usual delay when I perform the same query from a browser plugin ranges 
from 1-5sec.

Thanks

From: Jeetendra Gangele [mailto:gangele...@gmail.com]
Sent: April 22, 2015 3:09 PM
To: Adrian Mocanu
Cc: u...@spark.incubator.apache.org
Subject: Re: ElasticSearch for Spark times out

Basically, a read timeout means that no data arrived within the specified receive 
timeout period.
A few things I would suggest:
1. Is your ES cluster up and running?
2. If it is, reduce the size of the index to a few KB and then test again.

On 23 April 2015 at 00:19, Adrian Mocanu 
amoc...@verticalscope.commailto:amoc...@verticalscope.com wrote:
Hi

I use the ElasticSearch package for Spark and very often it times out reading 
data from ES into an RDD.
How can I keep the connection alive (why doesn't it? Bug?)

Here's the exception I get:
org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: 
java.net.SocketTimeoutException: Read timed out
at 
org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:86)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ParsingUtils.doSeekToken(ParsingUtils.java:70)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ParsingUtils.seek(ParsingUtils.java:58) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:149)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:102) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:81) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:314) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
~[scala-library.jar:na]
at 
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
~[scala-library.jar:na]
at 
scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) 
~[scala-library.jar:na]
at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
~[scala-library.jar:na]
at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
~[scala-library.jar:na]
at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
~[scala-library.jar:na]
at 
org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)
 ~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at org.apache.spark.scheduler.Task.run(Task.scala:54) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
[na:1.7.0_75]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
[na:1.7.0_75]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75]
Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method) 
~[na:1.7.0_75]
at java.net.SocketInputStream.read(SocketInputStream.java:152) 
~[na:1.7.0_75]
at java.net.SocketInputStream.read(SocketInputStream.java:122) 
~[na:1.7.0_75]
at 
java.io.BufferedInputStream.read1(BufferedInputStream.java:273) ~[na:1.7.0_75]
at 
java.io.BufferedInputStream.read(BufferedInputStream.java:334) ~[na:1.7.0_75]
at 
org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
 ~[commons-httpclient-3.1.jar:na]
at 

Re: Task not Serializable: Graph is unexpectedly null when DStream is being serialized

2015-04-22 Thread Jean-Pascal Billaud
I have now a fair understanding of the situation after looking at javap
output. So as a reminder:

dstream.map(tuple => {
  val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
  (tuple._1, (tuple._2, w)) })

And StreamState being a very simple standalone object:

object StreamState {
  def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key:
K) : Option[V] = None
}

Basically the serialization failed because the ClassTag[K] came from the
enclosing class, in which the dstream.map() code is running e.g. :

class A[K : ClassTag](val dstream: DStream[K]) {
  [...]
  def fun =
    dstream.map(tuple => {
      val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
      (tuple._1, (tuple._2, w)) })
}

therefore the instance of class A is being serialized, and it fails when the
dstream field's writeObject() runs and checks for the graph field...

The fact that graph is not set might be expected given that I have not
started the context yet...

Cheers,


On Tue, Apr 21, 2015 at 6:17 PM, Tathagata Das t...@databricks.com wrote:

 It is kind of unexpected; I can imagine a real scenario under which it
 should trigger. But obviously I am missing something :)

 TD

 On Tue, Apr 21, 2015 at 4:23 PM, Jean-Pascal Billaud j...@tellapart.com
 wrote:

 Sure. But in general, I am assuming this Graph is unexpectedly null
 when DStream is being serialized must mean something. Under which
 circumstances, such an exception would trigger?

 On Tue, Apr 21, 2015 at 4:12 PM, Tathagata Das t...@databricks.com
 wrote:

 Yeah, I am not sure what is going on. The only way to figure to take a
 look at the disassembled bytecodes using javap.

 TD

 On Tue, Apr 21, 2015 at 1:53 PM, Jean-Pascal Billaud j...@tellapart.com
 wrote:

 At this point I am assuming that nobody has an idea... I am still going
 to give it a last shot just in case it was missed by some people :)

 Thanks,

 On Mon, Apr 20, 2015 at 2:20 PM, Jean-Pascal Billaud j...@tellapart.com
 wrote:

 Hey, so I start the context at the very end when all the piping is
 done. BTW a foreachRDD will be called on the resulting dstream.map() right
 after that.

 The puzzling thing is why removing the context bounds solve the
 problem... What does this exception mean in general?

 On Mon, Apr 20, 2015 at 1:33 PM, Tathagata Das t...@databricks.com
 wrote:

 When are you getting this exception? After starting the context?

 TD

 On Mon, Apr 20, 2015 at 10:44 AM, Jean-Pascal Billaud 
 j...@tellapart.com wrote:

 Hi,

 I am getting this serialization exception and I am not too sure what
 Graph is unexpectedly null when DStream is being serialized means?

 15/04/20 06:12:38 INFO yarn.ApplicationMaster: Final app status:
 FAILED, exitCode: 15, (reason: User class threw exception: Task not
 serializable)
 Exception in thread Driver org.apache.spark.SparkException: Task
 not serializable
 at org.apache.spark.util.ClosureCleaner$.ensureSerializable(
 ClosureCleaner.scala:166)
 at org.apache.spark.util.ClosureCleaner$.clean(
 ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.
 clean(SparkContext.scala:1435)
 at org.apache.spark.streaming.dstream.DStream.map(DStream.
 scala:438)
 [...]
 Caused by: java.io.NotSerializableException: Graph is unexpectedly
 null when DStream is being serialized.
 at org.apache.spark.streaming.dstream.DStream$anonfun$
 writeObject$1.apply$mcV$sp(DStream.scala:420)
 at org.apache.spark.util.Utils$.
 tryOrIOException(Utils.scala:985)
 at org.apache.spark.streaming.dstream.DStream.writeObject(
 DStream.scala:403)

 The operation comes down to something like this:

 dstream.map(tuple = {
 val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
 (tuple._1, (tuple._2, w)) })

 And StreamState being a very simple standalone object:

 object StreamState {
   def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey:
 String, key: K) : Option[V] = None
 }

 However if I remove the context bounds from K in fetch e.g. removing
 ClassTag and Ordering then everything is fine.

 If anyone has some pointers, I'd really appreciate it.

 Thanks,










Re: Map Question

2015-04-22 Thread Tathagata Das
Can you give full code? especially the myfunc?

On Wed, Apr 22, 2015 at 2:20 PM, Vadim Bichutskiy 
vadim.bichuts...@gmail.com wrote:

 Here's what I did:

 print 'BROADCASTING...'
 broadcastVar = sc.broadcast(mylist)
 print broadcastVar
 print broadcastVar.value
 print 'FINISHED BROADCASTING...'

 The above works fine,

 but when I call myrdd.map(myfunc) I get NameError: global name
 'broadcastVar' is not defined

 The myfunc function is in a different module. How do I make it aware of
 broadcastVar?

 On Wed, Apr 22, 2015 at 2:13 PM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 Great. Will try to modify the code. Always room to optimize!

 On Wed, Apr 22, 2015 at 2:11 PM, Tathagata Das t...@databricks.com
 wrote:

 Absolutely. The same code would work for local as well as distributed
 mode!

 On Wed, Apr 22, 2015 at 11:08 AM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 Can I use broadcast vars in local mode?

 On Wed, Apr 22, 2015 at 2:06 PM, Tathagata Das t...@databricks.com
 wrote:

 Yep. Not efficient. Pretty bad actually. That's why broadcast variable
 were introduced right at the very beginning of Spark.



 On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 Thanks TD. I was looking into broadcast variables.

 Right now I am running it locally...and I plan to move it to
 production on EC2.

 The way I fixed it is by doing myrdd.map(lambda x: (x,
 mylist)).map(myfunc) but I don't think it's efficient?

 mylist is filled only once at the start and never changes.

 Vadim

 On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das t...@databricks.com
 wrote:

 Is the mylist present on every executor? If not, then you have to
 pass it on. And broadcasts are the best way to pass them on. But note that
 once broadcasted it will be immutable at the executors, and if you update the
 list at the driver, you will have to broadcast it again.

 TD

 On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 I am using Spark Streaming with Python. For each RDD, I call a map,
 i.e., myrdd.map(myfunc), myfunc is in a separate Python module. In yet
 another separate Python module I have a global list, i.e. mylist,
 that's populated with metadata. I can't get myfunc to see mylist...it's
 always empty. Alternatively, I guess I could pass mylist to map.

 Any suggestions?

 Thanks,
 Vadim











Re: RE: ElasticSearch for Spark times out

2015-04-22 Thread Otis Gospodnetic
Hi,

If you get ES response back in 1-5 seconds that's pretty slow.  Are these
ES aggregation queries?  Costin may be right about GC possibly causing
timeouts.  SPM http://sematext.com/spm/ can give you all Spark and all
key Elasticsearch metrics, including various JVM metrics.  If the problem
is GC, you'll see it.  If you monitor both Spark side and ES side, you
should be able to find some correlation with SPM.

Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr  Elasticsearch Support * http://sematext.com/


On Wed, Apr 22, 2015 at 5:43 PM, Costin Leau costin.l...@gmail.com wrote:

 Hi,

 First off, for Elasticsearch questions is worth pinging the Elastic
 mailing list as that is closer monitored than this one.

 Back to your question, Jeetendra is right that the exception indicates
 nodata is flowing back to the es-connector and
 Spark.
 The default is 1m [1] which should be more than enough for a typical
 scenario. As a side note the scroll size is 50 per
 tasks
 (so 150 suggests 3 tasks).

 Once the query is made, scrolling the document is fast - likely there's
 something else at hand that causes the
 connection to timeout.
 In such cases, you can enable logging on the REST package and see what
 type of data transfer occurs between ES and Spark.

 Do note that if a GC occurs, that can freeze Elastic (or Spark) which
 might trigger the timeout. Consider monitoring
 Elasticsearch during
 the query and see whether anything jumps - in particular the memory
 pressure.

 Hope this helps,

 [1]
 http://www.elastic.co/guide/en/elasticsearch/hadoop/master/configuration.html#_network

 On 4/22/15 10:44 PM, Adrian Mocanu wrote:

 Hi

 Thanks for the help. My ES is up.

 Out of curiosity, do you know what the timeout value is? There are
 probably other things happening to cause the timeout;
 I don’t think my ES is that slow but it’s possible that ES is taking too
 long to find the data. What I see happening is
 that it uses scroll to get the data from ES; about 150 items at a
 time.Usual delay when I perform the same query from a
 browser plugin ranges from 1-5sec.

 Thanks

 *From:*Jeetendra Gangele [mailto:gangele...@gmail.com]
 *Sent:* April 22, 2015 3:09 PM
 *To:* Adrian Mocanu
 *Cc:* u...@spark.incubator.apache.org
 *Subject:* Re: ElasticSearch for Spark times out

 Basically, a read timeout means that no data arrived within the specified
 receive timeout period.

 A few things I would suggest:

 1. Is your ES cluster up and running?

 2. If it is, reduce the size of the index to a few KB and then test again.

 On 23 April 2015 at 00:19, Adrian Mocanu amoc...@verticalscope.com
 mailto:amoc...@verticalscope.com wrote:

 Hi

 I use the ElasticSearch package for Spark and very often it times out
 reading data from ES into an RDD.

 How can I keep the connection alive (why doesn’t it? Bug?)

 Here’s the exception I get:

 org.elasticsearch.hadoop.serialization.EsHadoopSerializationException:
 java.net.SocketTimeoutException: Read timed out

  at
 org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:86)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

  at
 org.elasticsearch.hadoop.serialization.ParsingUtils.doSeekToken(ParsingUtils.java:70)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

  at
 org.elasticsearch.hadoop.serialization.ParsingUtils.seek(ParsingUtils.java:58)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

  at
 org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:149)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

  at
 org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:102)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

  at
 org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:81)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

  at
 org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:314)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

  at
 org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

  at
 org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

  at
 scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 ~[scala-library.jar:na]

  at
 scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 ~[scala-library.jar:na]

  at
 scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
 ~[scala-library.jar:na]

  at
 scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 ~[scala-library.jar:na]

  at
 scala.collection.Iterator$class.foreach(Iterator.scala:727)
 

Re: StackOverflow Error when run ALS with 100 iterations

2015-04-22 Thread amghost
Hi, could you please explain how to checkpoint the training set RDD, since everything
is done inside the ALS.train method?
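
For what it is worth, a hedged sketch (paths and parameters are placeholders): once a
checkpoint directory is set on the SparkContext, recent ALS versions can checkpoint
their own intermediate RDDs, which keeps the lineage short across many iterations:

  import org.apache.spark.mllib.recommendation.{ALS, Rating}

  // Placeholder checkpoint directory and input path.
  sc.setCheckpointDir("hdfs:///tmp/als-checkpoints")

  val ratings = sc.textFile("hdfs:///data/ratings.csv").map { line =>
    val Array(user, product, rating) = line.split(',')
    Rating(user.toInt, product.toInt, rating.toDouble)
  }

  // With a checkpoint dir set, ALS can periodically checkpoint its intermediate
  // RDDs, reducing the risk of deep lineage (and StackOverflowError) at 100 iterations.
  val model = ALS.train(ratings, 10, 100, 0.01)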



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/StackOverflow-Error-when-run-ALS-with-100-iterations-tp4296p22619.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to access HBase on Spark SQL

2015-04-22 Thread doovsaid
I notice that Databricks provides an external data source API for Spark SQL. But I 
can't find any reference documents explaining how to access HBase through it 
directly. Does anyone know how? Or please give me some related links. Thanks.


ZhangYi (张逸)
BigEye 
website: http://www.bigeyedata.com
blog: http://zhangyi.farbox.com
tel: 15023157626





Re: Parquet Hive table become very slow on 1.3?

2015-04-22 Thread Rex Xiong
Yin,

Thanks for your reply.
We already applied this PR to our 1.3.0.
As Xudong mentioned, we have thousands of parquet files, so the first read is very
slow, and another app will add more files and refresh the table regularly.
Cheng Lian's PR 5334 seems like it can resolve this issue: it will skip reading all
footers if we set auto merge to false.
But it's not done yet.

Thanks

2015-04-22 23:10 GMT+08:00 Yin Huai yh...@databricks.com:

 Xudong and Rex,

 Can you try 1.3.1? With PR 5339 http://github.com/apache/spark/pull/5339 ,
 after we get a hive parquet from metastore and convert it to our native
 parquet code path, we will cache the converted relation. For now, the first
 access to that hive parquet table reads all of the footers (when you first
 refer to that table in a query or call
 sqlContext.table(hiveParquetTable)). All of your later accesses will hit
 the metadata cache.

 Thanks,

 Yin

 On Tue, Apr 21, 2015 at 1:13 AM, Rex Xiong bycha...@gmail.com wrote:

 We have the similar issue with massive parquet files, Cheng Lian, could
 you have a look?

 2015-04-08 15:47 GMT+08:00 Zheng, Xudong dong...@gmail.com:

 Hi Cheng,

 I tried both these patches, and it seems they still do not resolve my issue. And I
 found that most of the time is spent on this line in newParquet.scala:

 ParquetFileReader.readAllFootersInParallel(
   sparkContext.hadoopConfiguration, seqAsJavaList(leaves),
 taskSideMetaData)

 This needs to read all the files under the Parquet folder, and our
 Parquet folder has a lot of Parquet files (nearly 2000); reading one file takes
 about 2 seconds, so it becomes very slow ... And PR 5231 did not skip
 this step, so it does not resolve my issue.

 As our Parquet files are generated by a Spark job, the number of
 .parquet files is the same as the number of tasks, which is why we have so
 many files. But these files actually have the same schema. Is there any way
 to merge these files into one, or to avoid scanning each of them?

 On Sat, Apr 4, 2015 at 9:47 PM, Cheng Lian lian.cs@gmail.com
 wrote:

  Hey Xudong,

 We had been digging this issue for a while, and believe PR 5339
 http://github.com/apache/spark/pull/5339 and PR 5334
 http://github.com/apache/spark/pull/5339 should fix this issue.

 There two problems:

 1. Normally we cache Parquet table metadata for better performance, but
 when converting Hive metastore Hive tables, the cache is not used. Thus
 heavy operations like schema discovery is done every time a metastore
 Parquet table is converted.
 2. With Parquet task side metadata reading (which is turned on by
 default), we can actually skip the row group information in the footer.
 However, we accidentally called a Parquet function which doesn't skip row
 group information.

 For your question about schema merging, Parquet allows different
 part-files have different but compatible schemas. For example,
 part-1.parquet has columns a and b, while part-2.parquet may has
 columns a and c. In some cases, the summary files (_metadata and
 _common_metadata) contains the merged schema (a, b, and c), but it's not
 guaranteed. For example, when the user defined metadata stored different
 part-files contain different values for the same key, Parquet simply gives
 up writing summary files. That's why all part-files must be touched to get
 a precise merged schema.

 However, in scenarios where a centralized arbitrative schema is
 available (e.g. Hive metastore schema, or the schema provided by user via
 data source DDL), we don't need to do schema merging on driver side, but
 defer it to executor side and each task only needs to reconcile those
 part-files it needs to touch. This is also what the Parquet developers did
 recently for parquet-hadoop
 https://github.com/apache/incubator-parquet-mr/pull/45.

 Cheng


 On 3/31/15 11:49 PM, Zheng, Xudong wrote:

 Thanks Cheng!

  Setting 'spark.sql.parquet.useDataSourceApi' to false resolves my issues,
 but PR 5231 does not seem to. Not sure what else I did wrong ...
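
 (For reference, that setting can also be applied programmatically, assuming a
 SQLContext named sqlContext:)

   // Fall back to the Hive-based Parquet path instead of the data source API.
   sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")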

  BTW, actually, we are very interested in the schema merging feature
 in Spark 1.3, so both of these solutions will disable this feature, right?
 It seems that Parquet metadata is stored in a file named _metadata in the
 Parquet file folder (each folder is a partition, as we use a partitioned table),
 so why do we need to scan all Parquet part files? Is there any other solution that
 could keep the schema merging feature at the same time? We really like this
 feature :)

 On Tue, Mar 31, 2015 at 3:19 PM, Cheng Lian lian.cs@gmail.com
 wrote:

  Hi Xudong,

 This is probably because of Parquet schema merging is turned on by
 default. This is generally useful for Parquet files with different but
 compatible schemas. But it needs to read metadata from all Parquet
 part-files. This can be problematic when reading Parquet files with lots 
 of
 part-files, especially when the user doesn't need schema merging.

 This issue is tracked by SPARK-6575, and here is a PR for it:
 https://github.com/apache/spark/pull/5231. This PR adds a
 

Re: Start ThriftServer Error

2015-04-22 Thread Denny Lee
You may need to specify the hive port itself.  For example, my own Thrift
start command is in the form:

./sbin/start-thriftserver.sh --master spark://$myserver:7077
--driver-class-path $CLASSPATH --hiveconf hive.server2.thrift.bind.host
$myserver --hiveconf hive.server2.thrift.port 1

HTH!


On Wed, Apr 22, 2015 at 5:27 AM Yiannis Gkoufas johngou...@gmail.com
wrote:

 Hi Himanshu,

 I am using:

 ./start-thriftserver.sh --master spark://localhost:7077

 Do I need to specify something additional to the command?

 Thanks!

 On 22 April 2015 at 13:14, Himanshu Parashar himanshu.paras...@gmail.com
 wrote:

 what command are you using to start the Thrift server?

 On Wed, Apr 22, 2015 at 3:52 PM, Yiannis Gkoufas johngou...@gmail.com
 wrote:

 Hi all,

 I am trying to start the thriftserver and I get some errors.
 I have hive running and placed hive-site.xml under the conf directory.
 From the logs I can see that the error is:

 Call From localhost to localhost:54310 failed

 I am assuming that it tries to connect to the wrong port for the
 namenode, which in my case is running on 9000 instead of 54310.

 Any help would be really appreciated.

 Thanks a lot!




 --
 [HiM]





Re: RE: ElasticSearch for Spark times out

2015-04-22 Thread Costin Leau

Hi,

First off, for Elasticsearch questions it is worth pinging the Elastic mailing 
list, as that is more closely monitored than this one.

Back to your question, Jeetendra is right that the exception indicates no data 
is flowing back to the es-connector and Spark.
The default is 1m [1], which should be more than enough for a typical scenario. 
As a side note, the scroll size is 50 per task
(so 150 suggests 3 tasks).

Once the query is made, scrolling the document is fast - likely there's 
something else at hand that causes the
connection to timeout.
In such cases, you can enable logging on the REST package and see what type of 
data transfer occurs between ES and Spark.

Do note that if a GC occurs, that can freeze Elastic (or Spark) which might 
trigger the timeout. Consider monitoring
Elasticsearch during
the query and see whether anything jumps - in particular the memory pressure.

Hope this helps,

[1] 
http://www.elastic.co/guide/en/elasticsearch/hadoop/master/configuration.html#_network
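
As a concrete but hedged illustration of the knobs mentioned above (the host, index
and values are placeholders):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.elasticsearch.spark._

  val conf = new SparkConf()
    .setAppName("es-read-sketch")
    .set("es.nodes", "es-host:9200")      // placeholder ES endpoint
    .set("es.http.timeout", "5m")         // raise the 1m default read timeout
    .set("es.scroll.size", "50")          // documents fetched per scroll, per task

  val sc = new SparkContext(conf)
  // esRDD comes from the org.elasticsearch.spark._ implicits in es-hadoop.
  val hits = sc.esRDD("myindex/mytype", "?q=*")
  println(hits.count())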

On 4/22/15 10:44 PM, Adrian Mocanu wrote:

Hi

Thanks for the help. My ES is up.

Out of curiosity, do you know what the timeout value is? There are probably 
other things happening to cause the timeout;
I don’t think my ES is that slow but it’s possible that ES is taking too long 
to find the data. What I see happening is
that it uses scroll to get the data from ES; about 150 items at a time.Usual 
delay when I perform the same query from a
browser plugin ranges from 1-5sec.

Thanks

*From:*Jeetendra Gangele [mailto:gangele...@gmail.com]
*Sent:* April 22, 2015 3:09 PM
*To:* Adrian Mocanu
*Cc:* u...@spark.incubator.apache.org
*Subject:* Re: ElasticSearch for Spark times out

Basically, a read timeout means that no data arrived within the specified receive 
timeout period.

A few things I would suggest:

1. Is your ES cluster up and running?

2. If it is, reduce the size of the index to a few KB and then test again.

On 23 April 2015 at 00:19, Adrian Mocanu amoc...@verticalscope.com 
mailto:amoc...@verticalscope.com wrote:

Hi

I use the ElasticSearch package for Spark and very often it times out reading 
data from ES into an RDD.

How can I keep the connection alive (why doesn’t it? Bug?)

Here’s the exception I get:

org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: 
java.net.SocketTimeoutException: Read timed out

 at 
org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:86)
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

 at 
org.elasticsearch.hadoop.serialization.ParsingUtils.doSeekToken(ParsingUtils.java:70)
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

 at 
org.elasticsearch.hadoop.serialization.ParsingUtils.seek(ParsingUtils.java:58)
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

 at 
org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:149)
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

 at 
org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:102)
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

 at 
org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:81)
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

 at 
org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:314)
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

 at 
org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76)
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

 at 
org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46)
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

 at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
~[scala-library.jar:na]

 at 
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
~[scala-library.jar:na]

 at 
scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) 
~[scala-library.jar:na]

 at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
~[scala-library.jar:na]

 at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
~[scala-library.jar:na]

 at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
~[scala-library.jar:na]

 at 
org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)
~[spark-core_2.10-1.1.0.jar:1.1.0]

 at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
~[spark-core_2.10-1.1.0.jar:1.1.0]

 at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
~[spark-core_2.10-1.1.0.jar:1.1.0]

 at org.apache.spark.scheduler.Task.run(Task.scala:54) 
~[spark-core_2.10-1.1.0.jar:1.1.0]

 at 

Re: Hive table creation - possible bug in Spark 1.3?

2015-04-22 Thread Michael Armbrust
Sorry for the confusion.  We should be more clear about the semantics in
the documentation. (PRs welcome :) )

.saveAsTable does not create a Hive table, but instead creates a Spark Data
Source table.  Here the metadata is persisted into Hive, but Hive cannot
read the tables (as this API supports MLlib vectors, schema discovery, and
other things that Hive does not).  If you want to create a Hive table, use
HiveQL and run a CREATE TABLE AS SELECT ...
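
A hedged sketch of that route, reusing the names from the report quoted below (the
temp table name is invented):

  // Make the DataFrame visible to HiveQL, then let Hive create the table itself.
  saDF.registerTempTable("sa_src")
  hc.sql("CREATE TABLE test_hive_ms STORED AS PARQUET AS SELECT * FROM sa_src")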

On Wed, Apr 22, 2015 at 12:50 AM, Ophir Cohen oph...@gmail.com wrote:

 I wrote few mails here regarding this issue.
 After further investigation I think there is a bug in Spark 1.3 in saving
 Hive tables.

 (hc is HiveContext)

 1. Verify the needed configuration exists:
 scala hc.sql(set hive.exec.compress.output).collect
 res4: Array[org.apache.spark.sql.Row] =
 Array([hive.exec.compress.output=true])
 scala hc.sql(set
 mapreduce.output.fileoutputformat.compress.codec).collect
 res5: Array[org.apache.spark.sql.Row] =
 Array([mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec])
 scala hc.sql(set
 mapreduce.output.fileoutputformat.compress.type).collect
 res6: Array[org.apache.spark.sql.Row] =
 Array([mapreduce.output.fileoutputformat.compress.type=BLOCK])
 2. Loading DataFrame and save as table (path point to exists file):
 val saDF = hc.parquetFile(path)
 saDF.count

 (count yield 229764 - i.e. the rdd exists)
 saDF.saveAsTable(test_hive_ms)

 Now for few interesting outputs:
 1. Trying to query Hive CLI, the table exists but with wrong output format:
 Failed with exception java.io.IOException:java.io.IOException: hdfs://
 10.166.157.97:9000/user/hive/warehouse/test_hive_ms/part-r-1.parquet
 not a SequenceFile
 2. Looking at the output files found that files are '.parquet' and not
 '.snappy'
 3. Looking at the saveAsTable output shows that it actually store the
 table in both, wrong output format and without compression:
 15/04/22 07:16:54 INFO metastore.HiveMetaStore: 0: create_table:
 Table(tableName:test_hive_ms, dbName:default, owner:hadoop,
 createTime:1429687014, lastAccessTime:0, retention:0,
 sd:StorageDescriptor(cols:[FieldSchema(name:col, type:arraystring,
 comment:from deserializer)], location:null,
 inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat,
 outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat,
 compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null,
 serializationLib:org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe,
 parameters:{serialization.format=1, path=hdfs://
 10.166.157.97:9000/user/hive/warehouse/test_hive_ms}
 http://10.166.157.97:9000/user/hive/warehouse/test_hive_ms%7D),
 bucketCols:[], sortCols:[], parameters:{},
 skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[],
 skewedColValueLocationMaps:{})), partitionKeys:[],
 parameters:{spark.sql.sources.schema.part.0={type:struct,fields:[{name:ADJDATE,type:long,nullable:true,metadata:{}},{name:sid,type:integer,nullable:true,metadata:{}},{name:ADJTYPE,type:integer,nullable:true,metadata:{}},{name:ENDADJDATE,type:long,nullable:true,metadata:{}},{name:ADJFACTOR,type:double,nullable:true,metadata:{}},{name:CUMADJFACTOR,type:double,nullable:true,metadata:{}}]},
 EXTERNAL=FALSE, spark.sql.sources.schema.numParts=1,
 spark.sql.sources.provider=org.apache.spark.sql.parquet},
 viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE)

 So, the question is: do I miss some configuration here or should I open a
 bug?

 Thanks,
 Ophir




spark-ec2 s3a filesystem support and hadoop versions

2015-04-22 Thread Daniel Mahler
I would like to easily launch a cluster that supports s3a file systems.

if I launch a cluster with `spark-ec2 --hadoop-major-version=2`,
what determines the minor version of hadoop?

Does it depend on the spark version being launched?

Are there other allowed values for --hadoop-major-version besides 1 and 2?

How can I get a cluster that supports s3a filesystems?

thanks
Daniel


why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?

2015-04-22 Thread Hao Ren
Hi,

Just a quick question,

Regarding the source code of groupByKey:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L453

In the end, it casts CompactBuffer to Iterable. But why? Is there any advantage?

Thank you.

Hao.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/why-does-groupByKey-return-RDD-K-Iterable-V-not-RDD-K-CompactBuffer-V-tp22616.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



setting cost in linear SVM [Python]

2015-04-22 Thread Pagliari, Roberto
Is there a way to set the cost value C when using linear SVM?


beeline that comes with spark 1.3.0 doesn't work with --hiveconf or ''--hivevar which substitutes variables at hive scripts.

2015-04-22 Thread ogoh

Hello, 
I am using Spark 1.3 for Spark SQL (Hive), the ThriftServer and Beeline.
Beeline doesn't work with --hiveconf or --hivevar, which substitute
variables in hive scripts.
I found the following JIRAs saying that Hive 0.13 resolved that issue.
I wonder if this is a well-known issue?

https://issues.apache.org/jira/browse/HIVE-4568 Beeline needs to support
resolving variables
https://issues.apache.org/jira/browse/HIVE-6173 Beeline doesn't accept
--hiveconf option as Hive CLI does

Thanks,
Okehee



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/beeline-that-comes-with-spark-1-3-0-doesn-t-work-with-hiveconf-or-hivevar-which-substitutes-variable-tp22615.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Task not Serializable: Graph is unexpectedly null when DStream is being serialized

2015-04-22 Thread Tathagata Das
Vaguely makes sense. :) Wow that's an interesting corner case.

On Wed, Apr 22, 2015 at 1:57 PM, Jean-Pascal Billaud j...@tellapart.com
wrote:

 I have now a fair understanding of the situation after looking at javap
 output. So as a reminder:

 dstream.map(tuple => {
   val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
   (tuple._1, (tuple._2, w)) })

 And StreamState being a very simple standalone object:

 object StreamState {
   def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key:
 K) : Option[V] = None
 }

 Basically the serialization failed because the ClassTag[K] came from the
 enclosing class, in which the dstream.map() code is running e.g. :

 class A[K : ClassTag](val dstream: DStream[K]) {
   [...]
   def fun =
     dstream.map(tuple => {
       val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
       (tuple._1, (tuple._2, w)) })
 }

 therefore the instance of class A is being serialized and it fails when
 the dstream field call writeObject() when it checks for the graph field...

 The fact that graph is not set might be expected given that I have not
 started the context yet...

 Cheers,


 On Tue, Apr 21, 2015 at 6:17 PM, Tathagata Das t...@databricks.com
 wrote:

 It is kind of unexpected, i can imagine a real scenario under which it
 should trigger. But obviously I am missing something :)

 TD

 On Tue, Apr 21, 2015 at 4:23 PM, Jean-Pascal Billaud j...@tellapart.com
 wrote:

 Sure. But in general, I am assuming this Graph is unexpectedly null
 when DStream is being serialized must mean something. Under which
 circumstances, such an exception would trigger?

 On Tue, Apr 21, 2015 at 4:12 PM, Tathagata Das t...@databricks.com
 wrote:

 Yeah, I am not sure what is going on. The only way to figure to take a
 look at the disassembled bytecodes using javap.

 TD

 On Tue, Apr 21, 2015 at 1:53 PM, Jean-Pascal Billaud j...@tellapart.com
 wrote:

 At this point I am assuming that nobody has an idea... I am still
 going to give it a last shot just in case it was missed by some people :)

 Thanks,

 On Mon, Apr 20, 2015 at 2:20 PM, Jean-Pascal Billaud j...@tellapart.com
  wrote:

 Hey, so I start the context at the very end when all the piping is
 done. BTW a foreachRDD will be called on the resulting dstream.map() 
 right
 after that.

 The puzzling thing is why removing the context bounds solve the
 problem... What does this exception mean in general?

 On Mon, Apr 20, 2015 at 1:33 PM, Tathagata Das t...@databricks.com
 wrote:

 When are you getting this exception? After starting the context?

 TD

 On Mon, Apr 20, 2015 at 10:44 AM, Jean-Pascal Billaud 
 j...@tellapart.com wrote:

 Hi,

 I am getting this serialization exception and I am not too sure
 what Graph is unexpectedly null when DStream is being serialized 
 means?

 15/04/20 06:12:38 INFO yarn.ApplicationMaster: Final app status:
 FAILED, exitCode: 15, (reason: User class threw exception: Task not
 serializable)
 Exception in thread Driver org.apache.spark.SparkException: Task
 not serializable
 at org.apache.spark.util.ClosureCleaner$.
 ensureSerializable(ClosureCleaner.scala:166)
 at org.apache.spark.util.ClosureCleaner$.clean(
 ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.
 clean(SparkContext.scala:1435)
 at org.apache.spark.streaming.dstream.DStream.map(DStream.
 scala:438)
 [...]
 Caused by: java.io.NotSerializableException: Graph is unexpectedly
 null when DStream is being serialized.
 at org.apache.spark.streaming.dstream.DStream$anonfun$
 writeObject$1.apply$mcV$sp(DStream.scala:420)
 at org.apache.spark.util.Utils$.
 tryOrIOException(Utils.scala:985)
 at org.apache.spark.streaming.dstream.DStream.writeObject(
 DStream.scala:403)

 The operation comes down to something like this:

 dstream.map(tuple => {
 val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
 (tuple._1, (tuple._2, w)) })

 And StreamState being a very simple standalone object:

 object StreamState {
   def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey:
 String, key: K) : Option[V] = None
 }

 However if I remove the context bounds from K in fetch e.g.
 removing ClassTag and Ordering then everything is fine.

 If anyone has some pointers, I'd really appreciate it.

 Thanks,











Re: RE: ElasticSearch for Spark times out

2015-04-22 Thread Nick Pentreath
Is your ES cluster reachable from your Spark cluster via network / firewall? 
Can you run the same query from the spark master and slave nodes via curl / one 
of the other clients?




Seems odd that GC issues would be a problem during the scan but not when running the
query from a browser plugin... Sounds like it could be a network issue.



—
Sent from Mailbox

On Thu, Apr 23, 2015 at 5:11 AM, Otis Gospodnetic
otis.gospodne...@gmail.com wrote:

 Hi,
 If you get ES response back in 1-5 seconds that's pretty slow.  Are these
 ES aggregation queries?  Costin may be right about GC possibly causing
 timeouts.  SPM http://sematext.com/spm/ can give you all Spark and all
 key Elasticsearch metrics, including various JVM metrics.  If the problem
 is GC, you'll see it.  If you monitor both Spark side and ES side, you
 should be able to find some correlation with SPM.
 Otis
 --
 Monitoring * Alerting * Anomaly Detection * Centralized Log Management
 Solr  Elasticsearch Support * http://sematext.com/
 On Wed, Apr 22, 2015 at 5:43 PM, Costin Leau costin.l...@gmail.com wrote:
 Hi,

 First off, for Elasticsearch questions it is worth pinging the Elastic
 mailing list as that is more closely monitored than this one.

 Back to your question, Jeetendra is right that the exception indicates
 no data is flowing back to the es-connector and
 Spark.
 The default is 1m [1] which should be more than enough for a typical
 scenario. As a side note, the scroll size is 50 per
 task
 (so 150 suggests 3 tasks).

 Once the query is made, scrolling the documents is fast - likely there's
 something else at hand that causes the
 connection to time out.
 In such cases, you can enable logging on the REST package and see what
 type of data transfer occurs between ES and Spark.

 Do note that if a GC occurs, that can freeze Elastic (or Spark) which
 might trigger the timeout. Consider monitoring
 Elasticsearch during
 the query and see whether anything jumps - in particular the memory
 pressure.

 Hope this helps,

 [1]
 http://www.elastic.co/guide/en/elasticsearch/hadoop/master/configuration.html#_network
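
 A hedged sketch of the knobs mentioned above (host, index and values are
 placeholders, not taken from this thread): raise the connector's HTTP timeout
 and tune the per-task scroll size while debugging.

  import org.apache.spark.{SparkConf, SparkContext}
  import org.elasticsearch.spark._ // brings esRDD into scope

  val conf = new SparkConf()
    .setAppName("es-scroll-debug")
    .set("es.nodes", "es-host:9200")  // assumed ES endpoint
    .set("es.http.timeout", "5m")     // default is 1m
    .set("es.scroll.size", "50")      // documents per scroll request, per task
  val sc = new SparkContext(conf)
  println(sc.esRDD("myindex/mytype", "?q=*").count()) // replace index/query with yours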

 On 4/22/15 10:44 PM, Adrian Mocanu wrote:

 Hi

 Thanks for the help. My ES is up.

 Out of curiosity, do you know what the timeout value is? There are
 probably other things happening to cause the timeout;
 I don’t think my ES is that slow but it’s possible that ES is taking too
 long to find the data. What I see happening is
 that it uses scroll to get the data from ES; about 150 items at a
 time.Usual delay when I perform the same query from a
 browser plugin ranges from 1-5sec.

 Thanks

 *From:*Jeetendra Gangele [mailto:gangele...@gmail.com]
 *Sent:* April 22, 2015 3:09 PM
 *To:* Adrian Mocanu
 *Cc:* u...@spark.incubator.apache.org
 *Subject:* Re: ElasticSearch for Spark times out

 Basically a read timeout means that no data arrived within the specified
 receive timeout period.

 Few thing I would suggest

 1.are your ES cluster Up and running?

 2. if 1 is yes, then reduce the size of the index, make it a few KB, and
 then test?

 On 23 April 2015 at 00:19, Adrian Mocanu amoc...@verticalscope.com
 mailto:amoc...@verticalscope.com wrote:

 Hi

 I use the ElasticSearch package for Spark and very often it times out
 reading data from ES into an RDD.

 How can I keep the connection alive (why doesn’t it? Bug?)

 Here’s the exception I get:

 org.elasticsearch.hadoop.serialization.EsHadoopSerializationException:
 java.net.SocketTimeoutException: Read timed out

  at
 org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:86)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

  at
 org.elasticsearch.hadoop.serialization.ParsingUtils.doSeekToken(ParsingUtils.java:70)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

  at
 org.elasticsearch.hadoop.serialization.ParsingUtils.seek(ParsingUtils.java:58)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

  at
 org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:149)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

  at
 org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:102)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

  at
 org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:81)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

  at
 org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:314)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

  at
 org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

  at
 org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

  at
 scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 

Loading lots of .parquet files in Spark 1.3.1 (Hadoop 2.4)

2015-04-22 Thread cosmincatalin
I am trying to read a few hundred .parquet files from S3 into an EMR cluster.
The .parquet files are structured by date and have _common_metadata in
each of the folders (as well as _metadata). The sqlContext.parquetFile
operation takes a very long time, opening each of the .parquet files for
reading. I would have expected that the *_metadata files would be used for the
structure so that Spark does not have to go through all the files in a
folder. I have also tried this experiment on a single folder: all the
.parquet files were opened and the *_metadata was apparently
ignored. What can I do to speed up the loading process? Can I load the
.parquet files in parallel? What is the purpose of the *_metadata files?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Loading-lots-of-parquet-files-in-Spark-1-3-1-Hadoop-2-4-tp22624.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

LDA code little error @Xiangrui Meng

2015-04-22 Thread buring
Hi:
    there is a little error in the source code LDA.scala at line 180, as
follows:
   def setBeta(beta: Double): this.type = setBeta(beta)

   which causes java.lang.StackOverflowError. It's easy to see where the
error is.
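
Assuming setBeta is meant to be an alias for the topicConcentration parameter
(as the surrounding getters and setters suggest), a likely one-line fix is:

   // presumed intent: delegate to the underlying parameter instead of recursing
   def setBeta(beta: Double): this.type = setTopicConcentration(beta)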



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/LDA-code-little-error-Xiangrui-Meng-tp22621.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: MLlib - Collaborative Filtering - trainImplicit task size

2015-04-22 Thread Xiangrui Meng
This is the size of the serialized task closure. Is stage 246 part of
ALS iterations, or something before or after it? -Xiangrui

On Tue, Apr 21, 2015 at 10:36 AM, Christian S. Perone
christian.per...@gmail.com wrote:
 Hi Sean, thanks for the answer. I tried to call repartition() on the input
 with many different sizes and it still continues to show that warning
 message.

 On Tue, Apr 21, 2015 at 7:05 AM, Sean Owen so...@cloudera.com wrote:

 I think maybe you need more partitions in your input, which might make
 for smaller tasks?
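
 A minimal sketch of that suggestion (the partition count and ALS parameters
 are placeholders to tune, not prescriptions):

  import org.apache.spark.mllib.recommendation.{ALS, Rating}
  import org.apache.spark.rdd.RDD

  // Repartition the ratings so each task carries a smaller slice of the input.
  def train(raw: RDD[Rating]) = {
    val ratings = raw.repartition(200) // tune for your cluster and input size
    ALS.trainImplicit(ratings, 10 /* rank */, 10 /* iterations */, 0.01 /* lambda */, 1.0 /* alpha */)
  }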

 On Tue, Apr 21, 2015 at 2:56 AM, Christian S. Perone
 christian.per...@gmail.com wrote:
  I keep seeing these warnings when using trainImplicit:
 
  WARN TaskSetManager: Stage 246 contains a task of very large size (208
  KB).
  The maximum recommended task size is 100 KB.
 
  And then the task size starts to increase. Is this a known issue ?
 
  Thanks !
 
  --
  Blog | Github | Twitter
  Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great
  big
  joke on me.




 --
 Blog | Github | Twitter
 Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big
 joke on me.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to debug Spark on Yarn?

2015-04-22 Thread ๏̯͡๏
I submit a spark app to YARN and i get these messages

 15/04/22 22:45:04 INFO yarn.Client: Application report for
application_1429087638744_101363 (state: RUNNING)

15/04/22 22:45:04 INFO yarn.Client: Application report for
application_1429087638744_101363 (state: RUNNING).

...


1) I can go to the Spark UI and see the status of the app, but I cannot see the
logs as the job progresses. How can I see the logs of executors as they
progress?

2) In case the app fails/completes, the Spark UI vanishes and I get a YARN
job page that says the job failed; there is no link to the Spark UI again. Now I
take the application ID, run yarn logs -applicationId appid, and my console fills up
(with huge scrolling) with the logs of all executors. Then I copy and paste them
into a text editor and search for keywords like Exception or Job aborted due
to. Is this the right way to view logs?
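
One hedged option for point 2, not taken from this thread (paths and addresses
are placeholders): enable event logging so the Spark history server can still
serve the UI after the application finishes.

  import org.apache.spark.{SparkConf, SparkContext}

  // Persist the event log so the finished app's UI can be replayed by the history server.
  val conf = new SparkConf()
    .setAppName("debuggable-app")
    .set("spark.eventLog.enabled", "true")
    .set("spark.eventLog.dir", "hdfs:///user/spark/eventlogs")      // assumed path
    .set("spark.yarn.historyServer.address", "historyhost:18080")   // assumed host
  val sc = new SparkContext(conf)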

-- 
Deepak


Re: Spark 1.3.1 Dataframe breaking ALS.train?

2015-04-22 Thread Xiangrui Meng
The patched was merged and it will be included in 1.3.2 and 1.4.0.
Thanks for reporting the bug! -Xiangrui

On Tue, Apr 21, 2015 at 2:51 PM, ayan guha guha.a...@gmail.com wrote:
 Thank you all.

 On 22 Apr 2015 04:29, Xiangrui Meng men...@gmail.com wrote:

 SchemaRDD subclasses RDD in 1.2, but DataFrame is no longer an RDD in
 1.3. We should allow DataFrames in ALS.train. I will submit a patch.
 You can use `ALS.train(training.rdd, ...)` for now as a workaround.
 -Xiangrui

 On Tue, Apr 21, 2015 at 10:51 AM, Joseph Bradley jos...@databricks.com
 wrote:
  Hi Ayan,
 
  If you want to use DataFrame, then you should use the Pipelines API
  (org.apache.spark.ml.*) which will take DataFrames:
 
  http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.recommendation.ALS
 
  In the examples/ directory for ml/, you can find a MovieLensALS example.
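
  For reference, a minimal Scala sketch of that DataFrame-based API (column
  names follow the query above; the parameter values are placeholders):

   import org.apache.spark.ml.recommendation.ALS
   import org.apache.spark.sql.DataFrame

   // The ml Pipelines ALS accepts a DataFrame directly.
   def fitAls(training: DataFrame) =
     new ALS()
       .setRank(8)
       .setMaxIter(10)
       .setRegParam(0.1)
       .setUserCol("userId")
       .setItemCol("movieId")
       .setRatingCol("rating")
       .fit(training)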
 
  Good luck!
  Joseph
 
  On Tue, Apr 21, 2015 at 4:58 AM, ayan guha guha.a...@gmail.com wrote:
 
  Hi
 
  I am getting an error
 
  Also, I am getting an error in mlib.ALS.train function when passing
  dataframe (do I need to convert the DF to RDD?)
 
  Code:
  training = ssc.sql(select userId,movieId,rating from ratings where
  partitionKey  6).cache()
  print type(training)
  model = ALS.train(training,rank,numIter,lmbda)
 
  Error:
  class 'pyspark.sql.dataframe.DataFrame'
 
  Traceback (most recent call last):
File D:\Project\Spark\code\movie_sql.py, line 109, in module
  bestConf =
  getBestModel(sc,ssc,training,validation,validationNoRating)
File D:\Project\Spark\code\movie_sql.py, line 54, in getBestModel
  model = ALS.train(trainingRDD,rank,numIter,lmbda)
File
 
  D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
  line 139, in train
  model = callMLlibFunc(trainALSModel, cls._prepare(ratings), rank,
  iterations,
File
 
  D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
  line 127, in _prepare
  assert isinstance(ratings, RDD), ratings should be RDD
  AssertionError: ratings should be RDD
 
  It was working fine in 1.2.0 (till last night :))
 
   Any solution? I am thinking of mapping the training dataframe back to an
   RDD,
   but that will lose the schema information.
 
  Best
  Ayan
 
  On Mon, Apr 20, 2015 at 10:23 PM, ayan guha guha.a...@gmail.com
  wrote:
 
  Hi
  Just upgraded to Spark 1.3.1.
 
  I am getting an warning
 
  Warning (from warnings module):
File
 
  D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\sql\context.py,
  line 191
  warnings.warn(inferSchema is deprecated, please use
  createDataFrame
  instead)
  UserWarning: inferSchema is deprecated, please use createDataFrame
  instead
 
  However, documentation still says to use inferSchema.
  Here: http://spark.apache.org/docs/latest/sql-programming-guide.htm in
  section
 
  Also, I am getting an error in mlib.ALS.train function when passing
  dataframe (do I need to convert the DF to RDD?)
 
  Code:
  training = ssc.sql(select userId,movieId,rating from ratings where
  partitionKey  6).cache()
  print type(training)
  model = ALS.train(training,rank,numIter,lmbda)
 
  Error:
  class 'pyspark.sql.dataframe.DataFrame'
  Rank:8 Lmbda:1.0 iteration:10
 
  Traceback (most recent call last):
File D:\Project\Spark\code\movie_sql.py, line 109, in module
  bestConf =
  getBestModel(sc,ssc,training,validation,validationNoRating)
File D:\Project\Spark\code\movie_sql.py, line 54, in getBestModel
  model = ALS.train(trainingRDD,rank,numIter,lmbda)
File
 
  D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
  line 139, in train
  model = callMLlibFunc(trainALSModel, cls._prepare(ratings),
  rank,
  iterations,
File
 
  D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
  line 127, in _prepare
  assert isinstance(ratings, RDD), ratings should be RDD
  AssertionError: ratings should be RDD
 
  --
  Best Regards,
  Ayan Guha
 
 
 
 
  --
  Best Regards,
  Ayan Guha
 
 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Problem with using Spark ML

2015-04-22 Thread Xiangrui Meng
Please try reducing the step size. The native BLAS library is not
required. -Xiangrui
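
A sketch of that suggestion applied to the code below (the step size and
iteration count are illustrative values to tune, not prescriptions):

  import org.apache.spark.mllib.regression.{LabeledPoint, RidgeRegressionWithSGD}
  import org.apache.spark.rdd.RDD

  // A smaller SGD step size keeps the weights from diverging to huge magnitudes.
  def trainRidge(trainingLP: RDD[LabeledPoint]) = {
    val ridge = new RidgeRegressionWithSGD()
    ridge.optimizer
      .setStepSize(0.01)
      .setNumIterations(200)
    ridge.run(trainingLP)
  }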

On Tue, Apr 21, 2015 at 5:15 AM, Staffan staffan.arvids...@gmail.com wrote:
 Hi,
 I've written an application that performs some machine learning on some
 data. I've validated that the data _should_ give a good output with a decent
 RMSE by using Lib-SVM:
 Mean squared error = 0.00922063 (regression)
 Squared correlation coefficient = 0.9987 (regression)

 When I try to use Spark ML to do the exact same thing I get:
 Mean Squared Error = 8.466193152067944E224

 Which is somewhat worse... I've tried to look at the data before it's
 input to the model, and printed that data to file (which is actually the data
 used when I got the result from Lib-SVM above). Somewhere there must be a
 huge mistake, but I cannot place it anywhere in my code (see below).
 trainingLP and testLP are the training and test data, as RDD[LabeledPoint].

 // Generate model
 val model_gen = new RidgeRegressionWithSGD();
 val model = model_gen.run(trainingLP);

 // Predict on the test-data
 val valuesAndPreds = testLP.map { point =>
 val prediction = model.predict(point.features);
 println("label: " + point.label + ", pred: " + prediction);
 (point.label, prediction);
 }
 val MSE = valuesAndPreds.map{ case (v, p) => math.pow((v - p), 2) }.mean();
 println("Mean Squared Error = " + MSE)


 I've printed label and prediction-values for each data-point in the testset,
 and the result is something like this;
 label: 5.04, pred: -4.607899000641277E112
 label: 3.59, pred: -3.96787105480399E112
 label: 5.06, pred: -2.8263294374576145E112
 label: 2.85, pred: -1.1536508029072844E112
 label: 2.1, pred: -4.269312783707508E111
 label: 2.75, pred: -3.0072665148591558E112
 label: -0.29, pred: -2.035681731641989E112
 label: 1.98, pred: -3.163404340354783E112

 So there is obviously something wrong with the prediction step. I'm using
 the SparseVector representation of the Vector in LabeledPoint, looking
 something like this for reference (shortened for convenience);
 (-1.59,(2080,[29,59,62,74,127,128,131,144,149,175,198,200,239,247,267,293,307,364,374,393,410,424,425,431,448,469,477,485,501,525,532,533,538,560,..],[1.0,1.0,2.0,8.0,1.0,1.0,6.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,5.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,3.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,8.0,2.0,1.0,1.0,..]))
 (-1.75,(2080,[103,131,149,208,296,335,520,534,603,620,661,694,709,748,859,1053,1116,1156,1186,1207,1208,1223,1256,1278,1356,1375,1399,1480,1569,..],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,4.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,2.0,2.0,2.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,4.0,1.0,7.0,1.0,3.0,2.0,1.0]))

 I do get one type of warning, but that's about it! (And as to my
 understanding, this native code is not required to get the correct results,
 only to improve performance).
 6010 [main] WARN  com.github.fommil.netlib.BLAS  - Failed to load
 implementation from: com.github.fommil.netlib.NativeSystemBLAS
 6011 [main] WARN  com.github.fommil.netlib.BLAS  - Failed to load
 implementation from: com.github.fommil.netlib.NativeRefBLAS

 So where do I go from here?



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-using-Spark-ML-tp22591.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [MLlib] fail to run word2vec

2015-04-22 Thread Xiangrui Meng
We store the vectors on the driver node. So it is hard to handle a
really large vocabulary. You can use setMinCount to filter out
infrequent word to reduce the model size. -Xiangrui
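
A sketch of that suggestion (the threshold and vector size are example values only):

  import org.apache.spark.mllib.feature.Word2Vec
  import org.apache.spark.rdd.RDD

  // Dropping infrequent words shrinks the vocabulary kept on the driver.
  def fitSmallerModel(sentences: RDD[Seq[String]]) =
    new Word2Vec()
      .setVectorSize(100)
      .setMinCount(20) // ignore words seen fewer than 20 times
      .fit(sentences)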

On Wed, Apr 22, 2015 at 12:32 AM, gm yu husty...@gmail.com wrote:
 When use Mllib.Word2Vec, I meet the following error:

  allocating large
 array--thread_id[0x7ff2741ca000]--thread_name[Driver]--array_size[1146093680
 bytes]--array_length[1146093656 elememts]
 prio=10 tid=0x7ff2741ca000 nid=0x1405f runnable
   at java.util.Arrays.copyOf(Arrays.java:2786)
   at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:94)
   - locked 0x7ff33f7fafd0 (a java.io.ByteArrayOutputStream)
   at
 java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1812)
   at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1504)
   at 
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
   at
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
   at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1346)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1154)
   at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
   at 
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
   at
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
   at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
   at 
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
   at
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
   at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
   at 
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
   at
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
   at
 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
   at
 org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
   at
 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
   at org.apache.spark.SparkContext.clean(SparkContext.scala:1627)
   at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:635)
   at org.apache.spark.mllib.feature.Word2Vec.fit(Word2Vec.scala:270)
   at com.taobao.changrui.SynonymFind$.main(SynonymFind.scala:79)
   at com.taobao.changrui.SynonymFind.main(SynonymFind.scala)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
   at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
   at java.lang.reflect.Method.invoke(Method.java:597)
   at
 org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:516)


 The data size is: 100M+ sentences, 100M+ words

 Jos Setting is: 50 executors with 20GB and 4cores, the driver memory is 30GB


 Any ideas? Thank you.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: the indices of SparseVector must be ordered while computing SVD

2015-04-22 Thread Xiangrui Meng
Having ordered indices is a contract of SparseVector:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.linalg.SparseVector.
We do not verify it for performance. -Xiangrui
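
A minimal workaround sketch on the user side: sort the (index, value) pairs
before building the SparseVector, since the constructor does not verify the
ordering.

  import org.apache.spark.mllib.linalg.{Vector, Vectors}

  def sortedSparse(size: Int, pairs: Seq[(Int, Double)]): Vector = {
    val (indices, values) = pairs.sortBy(_._1).unzip
    Vectors.sparse(size, indices.toArray, values.toArray)
  }

  val v = sortedSparse(3, Seq((2, 5.0), (0, 3.0), (1, 4.0))) // indices end up 0, 1, 2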

On Wed, Apr 22, 2015 at 8:26 AM, yaochunnan yaochun...@gmail.com wrote:
 Hi all,
 I am using Spark 1.3.1 to write a Spectral Clustering algorithm. This really
 confused me today. At first I thought my implementation was wrong. It turns
 out it's an issue in MLlib. Fortunately, I've figured it out.

 I suggest adding a hint to the MLlib user documentation (as far as I know, there
 is no such hint yet) that the indices of a local SparseVector must be
 ordered in ascending manner. Because I was unaware of this point, I spent a
 lot of time looking for reasons why computeSVD of RowMatrix did not run
 correctly on sparse data. I don't know the influence of a SparseVector
 without ordered indices on other functions, but I believe it is necessary to
 let the users know or to fix it. Actually, it's very easy to fix: just add a
 sortBy call in the internal construction of SparseVector.

 Here is an example to reproduce the effect of an unordered SparseVector on
 computeSVD.
 
 //in spark-shell, Spark 1.3.1
  import org.apache.spark.mllib.linalg.distributed.RowMatrix
  import org.apache.spark.mllib.linalg.{SparseVector, DenseVector, Vector,
 Vectors}

   val sparseData_ordered = Seq(
 Vectors.sparse(3, Array(1, 2), Array(1.0, 2.0)),
 Vectors.sparse(3, Array(0,1,2), Array(3.0, 4.0, 5.0)),
 Vectors.sparse(3, Array(0,1,2), Array(6.0, 7.0, 8.0)),
 Vectors.sparse(3, Array(0,2), Array(9.0, 1.0))
   )
   val sparseMat_ordered = new RowMatrix(sc.parallelize(sparseData_ordered,
 2))

   val sparseData_not_ordered = Seq(
 Vectors.sparse(3, Array(1, 2), Array(1.0, 2.0)),
 Vectors.sparse(3, Array(2,1,0), Array(5.0,4.0,3.0)),
 Vectors.sparse(3, Array(0,1,2), Array(6.0, 7.0, 8.0)),
 Vectors.sparse(3, Array(2,0), Array(1.0,9.0))
   )
  val sparseMat_not_ordered = new
 RowMatrix(sc.parallelize(sparseData_not_ordered, 2))

 //apparently, sparseMat_ordered and sparseMat_not_ordered are essentially
 the same matirx
 //however, the computeSVD result of these two matrixes are different. Users
 should be notified about this situation.
   println(sparseMat_ordered.computeSVD(2,
 true).U.rows.collect.mkString("\n"))
   println("===")
   println(sparseMat_not_ordered.computeSVD(2,
 true).U.rows.collect.mkString("\n"))
 ==
 The results are:
 ordered:
 [-0.10972870132786407,-0.18850811494220537]
 [-0.44712472003608356,-0.24828866611663725]
 [-0.784520738744303,-0.3080692172910691]
 [-0.4154110101064339,0.8988385762953358]

 not ordered:
 [-0.10830447119599484,-0.1559341848984378]
 [-0.4522713511277327,-0.23449829541447448]
 [-0.7962382310594706,-0.3130624059305111]
 [-0.43131320303494614,0.8453864703362308]

 Looking into this issue, I can see its cause is located in
 RowMatrix.scala (line 629). The implementation of sparse dspr here requires
 ordered indices, because it scans the indices consecutively to skip
 empty columns.








 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/the-indices-of-SparseVector-must-be-ordered-while-computing-SVD-tp22611.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: LDA code little error @Xiangrui Meng

2015-04-22 Thread Xiangrui Meng
Thanks! That's a bug .. -Xiangrui

On Wed, Apr 22, 2015 at 9:34 PM, buring qyqb...@gmail.com wrote:
 Hi:
 there is a little error in source code LDA.scala at line 180, as
 follows:
def setBeta(beta: Double): this.type = setBeta(beta)

which cause java.lang.StackOverflowError. It's easy to see there is
 error



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/LDA-code-little-error-Xiangrui-Meng-tp22621.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: StandardScaler failing with OOM errors in PySpark

2015-04-22 Thread Xiangrui Meng
What is the feature dimension? Did you set the driver memory? -Xiangrui

On Tue, Apr 21, 2015 at 6:59 AM, rok rokros...@gmail.com wrote:
 I'm trying to use the StandardScaler in pyspark on a relatively small (a few
 hundred Mb) dataset of sparse vectors with 800k features. The fit method of
 StandardScaler crashes with Java heap space or Direct buffer memory errors.
 There should be plenty of memory around -- 10 executors with 2 cores each
 and 8 Gb per core. I'm giving the executors 9g of memory and have also tried
 lots of overhead (3g), thinking it might be the array creation in the
 aggregators that's causing issues.

 The bizarre thing is that this isn't always reproducible -- sometimes it
 actually works without problems. Should I be setting up executors
 differently?

 Thanks,

 Rok




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/StandardScaler-failing-with-OOM-errors-in-PySpark-tp22593.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RDD.filter vs. RDD.join--advice please

2015-04-22 Thread dsgriffin
Test it out, but I would be willing to bet the join is going to be a good
deal faster.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-filter-vs-RDD-join-advice-please-tp22612p22614.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Scheduling across applications - Need suggestion

2015-04-22 Thread Lan Jiang
The YARN capacity scheduler supports hierarchical queues, to which you can assign
cluster resources as percentages. Your Spark application/shell can be
submitted to different queues. Mesos supports a fine-grained mode, which
allows the machines/cores used by each executor to ramp up and down.
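
A hedged sketch of submitting to a dedicated capacity-scheduler queue (queue
name and sizes are placeholders, and the queue must already be defined in YARN):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .setAppName("team-a-job")
    .set("spark.yarn.queue", "teamA")          // assumed queue name
    .set("spark.executor.instances", "4")
    .set("spark.executor.memory", "4g")
    .set("spark.executor.cores", "2")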

Lan

On Wed, Apr 22, 2015 at 2:32 PM, yana yana.kadiy...@gmail.com wrote:

 Yes. Fair scheduler only helps concurrency within an application.  With
 multiple shells you'd either need something like Yarn/Mesos or careful math
 on resources, as you said.


 Sent on the new Sprint Network from my Samsung Galaxy S®4.


  Original message 
 From: Arun Patel
 Date:04/22/2015 6:28 AM (GMT-05:00)
 To: user
 Subject: Scheduling across applications - Need suggestion

 I believe we can use the properties like --executor-memory
  --total-executor-cores to configure the resources allocated for each
 application.  But, in a multi user environment, shells and applications are
 being submitted by multiple users at the same time.  All users are
 requesting resources with different properties.  At times, some users are
 not getting resources of the cluster.


 How to control resource usage in this case?  Please share any best
 practices followed.


 As per my understanding, Fair scheduler can used for scheduling tasks
 within an application but not across multiple applications.  Is this
 correct?


 Regards,

 Arun



Re: Map Question

2015-04-22 Thread Vadim Bichutskiy
Here's what I did:

print 'BROADCASTING...'
broadcastVar = sc.broadcast(mylist)
print broadcastVar
print broadcastVar.value
print 'FINISHED BROADCASTING...'

The above works fine,

but when I call myrdd.map(myfunc) I get *NameError: global name
'broadcastVar' is not defined*

The myfunc function is in a different module. How do I make it aware of
broadcastVar?
ᐧ

On Wed, Apr 22, 2015 at 2:13 PM, Vadim Bichutskiy 
vadim.bichuts...@gmail.com wrote:

 Great. Will try to modify the code. Always room to optimize!
 ᐧ

 On Wed, Apr 22, 2015 at 2:11 PM, Tathagata Das t...@databricks.com
 wrote:

 Absolutely. The same code would work for local as well as distributed
 mode!

 On Wed, Apr 22, 2015 at 11:08 AM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 Can I use broadcast vars in local mode?
 ᐧ

 On Wed, Apr 22, 2015 at 2:06 PM, Tathagata Das t...@databricks.com
 wrote:

 Yep. Not efficient. Pretty bad actually. That's why broadcast variable
 were introduced right at the very beginning of Spark.



 On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 Thanks TD. I was looking into broadcast variables.

 Right now I am running it locally...and I plan to move it to
 production on EC2.

 The way I fixed it is by doing myrdd.map(lambda x: (x,
 mylist)).map(myfunc) but I don't think it's efficient?

 mylist is filled only once at the start and never changes.

 Vadim
 ᐧ

 On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das t...@databricks.com
 wrote:

 Is the mylist present on every executor? If not, then you have to
 pass it on. And broadcasts are the best way to pass them on. But note 
 that
 once broadcasted it will immutable at the executors, and if you update 
 the
 list at the driver, you will have to broadcast it again.

 TD

 On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 I am using Spark Streaming with Python. For each RDD, I call a map,
 i.e., myrdd.map(myfunc), myfunc is in a separate Python module. In yet
 another separate Python module I have a global list, i.e. mylist,
 that's populated with metadata. I can't get myfunc to see mylist...it's
 always empty. Alternatively, I guess I could pass mylist to map.

 Any suggestions?

 Thanks,
 Vadim










Re: SparkSQL performance

2015-04-22 Thread Michael Armbrust
https://github.com/databricks/spark-avro
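
A hedged sketch of using that package from the spark-shell (sc is the shell's
context; the file path, table and column names are placeholders):

  import org.apache.spark.sql.SQLContext

  val sqlContext = new SQLContext(sc)
  val df = sqlContext.load("events.avro", "com.databricks.spark.avro")
  df.registerTempTable("tableX")
  sqlContext.sql("SELECT * FROM tableX WHERE attribute1 BETWEEN 0 AND 5").count()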

On Tue, Apr 21, 2015 at 3:09 PM, Renato Marroquín Mogrovejo 
renatoj.marroq...@gmail.com wrote:

 Thanks Michael!
 I have tried applying my schema programatically but I didn't get any
 improvement on performance :(
 Could you point me to some code examples using Avro please?
 Many thanks again!


 Renato M.

 2015-04-21 20:45 GMT+02:00 Michael Armbrust mich...@databricks.com:

 Here is an example using rows directly:

 https://spark.apache.org/docs/1.3.0/sql-programming-guide.html#programmatically-specifying-the-schema

 Avro or parquet input would likely give you the best performance.

 On Tue, Apr 21, 2015 at 4:28 AM, Renato Marroquín Mogrovejo 
 renatoj.marroq...@gmail.com wrote:

 Thanks for the hints guys! much appreciated!
 Even if I just do a something like:

 Select * from tableX where attribute1  5

 I see similar behaviour.

 @Michael
 Could you point me to any sample code that uses Spark's Rows? We are at
 a phase where we can actually change our JavaBeans for something that
 provides a better performance than what we are seeing now. Would you
 recommend using Avro presentation then?
 Thanks again!


 Renato M.

 2015-04-21 1:18 GMT+02:00 Michael Armbrust mich...@databricks.com:

 There is a cost to converting from JavaBeans to Rows and this code path
 has not been optimized.  That is likely what you are seeing.

 On Mon, Apr 20, 2015 at 3:55 PM, ayan guha guha.a...@gmail.com wrote:

 SparkSQL optimizes better by column pruning and predicate pushdown,
 primarily. Here you are not taking advantage of either.

 I am curious to know what goes in your filter function, as you are not
 using a filter in SQL side.

 Best
 Ayan
 On 21 Apr 2015 08:05, Renato Marroquín Mogrovejo 
 renatoj.marroq...@gmail.com wrote:

 Does anybody have an idea? a clue? a hint?
 Thanks!


 Renato M.

 2015-04-20 9:31 GMT+02:00 Renato Marroquín Mogrovejo 
 renatoj.marroq...@gmail.com:

 Hi all,

 I have a simple query Select * from tableX where attribute1 between
 0 and 5 that I run over a Kryo file with four partitions that ends up
 being around 3.5 million rows in our case.
 If I run this query by doing a simple map().filter() it takes around
 ~9.6 seconds but when I apply schema, register the table into a 
 SqlContext,
 and then run the query, it takes around ~16 seconds. This is using Spark
 1.2.1 with Scala 2.10.0
 I am wondering why there is such a big gap on performance if it is
 just a filter. Internally, the relation files are mapped to a JavaBean.
 This different data presentation (JavaBeans vs SparkSQL internal
 representation) could lead to such difference? Is there anything I 
 could do
 to make the performance get closer to the hard-coded option?
 Thanks in advance for any suggestions or ideas.


 Renato M.









RE: Trouble working with Spark-CSV package (error: object databricks is not a member of package com)

2015-04-22 Thread yana
You can try pulling the jar with wget and using it with --jars with the Spark shell.
I used 1.0.3 with Spark 1.3.0 but with a different version of Scala. From the
stack trace it looks like the Spark shell is just not seeing the csv jar...
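
Once the jar really is on the classpath, a minimal check like this in the shell
(path and options are placeholders; sqlContext is the shell's instance) confirms
the data source is visible:

  val df = sqlContext.load(
    "com.databricks.spark.csv",
    Map("path" -> "cars.csv", "header" -> "true"))
  df.printSchema()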


Sent on the new Sprint Network from my Samsung Galaxy S®4.

-------- Original message --------
From: Mohammed Omer beancinemat...@gmail.com
Date: 04/22/2015 2:01 PM (GMT-05:00)
To: user@spark.apache.org
Subject: Trouble working with Spark-CSV package (error: object databricks is not a member of package com)

Afternoon all,

I'm working with Scala 2.11.6, and Spark 1.3.1 built from source via:

`mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package`

The error is encountered when running spark shell via:

`spark-shell --packages com.databricks:spark-csv_2.11:1.0.3`

The full trace of the commands can be found at 
https://gist.github.com/momer/9d1ca583f9978ec9739d

Not sure if I've done something wrong, or if the documentation is outdated, 
or...? 

Would appreciate any input or push in the right direction!

Thank you,

Mo

Re: Join on DataFrames from the same source (Pyspark)

2015-04-22 Thread Karlson

DataFrames do not have the attributes 'alias' or 'as' in the Python API.

On 2015-04-21 20:41, Michael Armbrust wrote:

This is https://issues.apache.org/jira/browse/SPARK-6231

Unfortunately this is pretty hard to fix as it's hard for us to
differentiate these without aliases.  However you can add an alias as
follows:

from pyspark.sql.functions import *
df.alias("a").join(df.alias("b"), col("a.col1") == col("b.col1"))
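
For reference, a hedged Scala sketch of the same aliasing trick (df is assumed
to be a DataFrame with a col1 column):

  import org.apache.spark.sql.functions.col

  val a = df.as("a")
  val b = df.as("b")
  val joined = a.join(b, col("a.col1") === col("b.col1"), "inner")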

On Tue, Apr 21, 2015 at 8:10 AM, Karlson ksonsp...@siberie.de wrote:


Sorry, my code actually was

df_one = df.select('col1', 'col2')
df_two = df.select('col1', 'col3')

But in Spark 1.4.0 this does not seem to make any difference anyway 
and

the problem is the same with both versions.



On 2015-04-21 17:04, ayan guha wrote:


your code should be

 df_one = df.select('col1', 'col2')
 df_two = df.select('col1', 'col3')

Your current code is generating a tuple, and of course df_1 and df_2 are
different, so the join is yielding a cartesian product.

Best
Ayan

On Wed, Apr 22, 2015 at 12:42 AM, Karlson ksonsp...@siberie.de 
wrote:


 Hi,


can anyone confirm (and if so elaborate on) the following problem?

When I join two DataFrames that originate from the same source 
DataFrame,
the resulting DF will explode to a huge number of rows. A quick 
example:


I load a DataFrame with n rows from disk:

df = sql_context.parquetFile('data.parquet')

Then I create two DataFrames from that source.

df_one = df.select(['col1', 'col2'])
df_two = df.select(['col1', 'col3'])

Finally I want to (inner) join them back together:

df_joined = df_one.join(df_two, df_one['col1'] == 
df_two['col2'],

'inner')

The key in col1 is unique. The resulting DataFrame should have n 
rows,

however it does have n*n rows.

That does not happen, when I load df_one and df_two from disk 
directly. I
am on Spark 1.3.0, but this also happens on the current 1.4.0 
snapshot.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Building Spark : Adding new DataType in Catalyst

2015-04-22 Thread zia_kayani
Hi, 
I am working on adding Geometry, i.e. a new DataType, into Spark Catalyst, so
that Row can hold that object as well. I've made progress, but it is time consuming
as I have to compile the whole Spark project, otherwise the changes aren't
visible. I've tried to build just the Spark SQL and Catalyst modules, but that
doesn't have any impact unless I compile the whole of Spark. What am I missing?
Is there any better way?

 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Building-Spark-Adding-new-DataType-in-Catalyst-tp22604.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[MLlib] fail to run word2vec

2015-04-22 Thread gm yu
When use Mllib.Word2Vec, I meet the following error:

 allocating large
array--thread_id[0x7ff2741ca000]--thread_name[Driver]--array_size[1146093680
bytes]--array_length[1146093656 elememts]
prio=10 tid=0x7ff2741ca000 nid=0x1405f runnable
at java.util.Arrays.copyOf(Arrays.java:2786)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:94)
- locked 0x7ff33f7fafd0 (a java.io.ByteArrayOutputStream)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1812)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1504)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1346)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1154)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
at 
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1627)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:635)
at org.apache.spark.mllib.feature.Word2Vec.fit(Word2Vec.scala:270)
at com.taobao.changrui.SynonymFind$.main(SynonymFind.scala:79)
at com.taobao.changrui.SynonymFind.main(SynonymFind.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:516)


The data size is: 100M+ sentences, 100M+ words

Jos Setting is: 50 executors with 20GB and 4cores, the driver memory is 30GB


Any ideas? Thank you.


Re: sparksql - HiveConf not found during task deserialization

2015-04-22 Thread Akhil Das
I see. Now try a slightly tricky approach: add the hive jar to the
SPARK_CLASSPATH (in the conf/spark-env.sh file on all machines) and make sure
that jar is available on all the machines in the cluster at the same path.

Thanks
Best Regards

On Wed, Apr 22, 2015 at 11:24 AM, Manku Timma manku.tim...@gmail.com
wrote:

 Akhil, thanks for the suggestions.
 I tried out sc.addJar, --jars, --conf spark.executor.extraClassPath and
 none of them helped. I added stuff into compute-classpath.sh. That did not
 change anything. I checked the classpath of the running executor and made
 sure that the hive jars are in that dir. For me the most confusing thing is
 that the executor can actually create HiveConf objects, but it cannot
 find the class when the task deserializer is at work.

 On 20 April 2015 at 14:18, Akhil Das ak...@sigmoidanalytics.com wrote:

 Can you try sc.addJar(/path/to/your/hive/jar), i think it will resolve
 it.

 Thanks
 Best Regards

 On Mon, Apr 20, 2015 at 12:26 PM, Manku Timma manku.tim...@gmail.com
 wrote:

 Akhil,
 But the first case of creating HiveConf on the executor works fine (map
 case). Only the second case fails. I was suspecting some foul play with
 classloaders.

 On 20 April 2015 at 12:20, Akhil Das ak...@sigmoidanalytics.com wrote:

 Looks like a missing jar, try to print the classpath and make sure the
 hive jar is present.

 Thanks
 Best Regards

 On Mon, Apr 20, 2015 at 11:52 AM, Manku Timma manku.tim...@gmail.com
 wrote:

 I am using spark-1.3 with hadoop-provided and hive-provided and
 hive-0.13.1 profiles. I am running a simple spark job on a yarn cluster by
 adding all hadoop2 and hive13 jars to the spark classpaths.

 If I remove the hive-provided while building spark, I dont face any
 issue. But with hive-provided I am getting a
 java.lang.NoClassDefFoundError: org/apache/hadoop/hive/conf/HiveConf in
 the yarn executor.

 Code is below:
 import org.apache.spark._
 import org.apache.spark.sql._
 import org.apache.hadoop.hive.conf.HiveConf

 object Simple {
   def main(args: Array[String]) = {
     val sc = new SparkContext(new SparkConf())
     val sqlC = new org.apache.spark.sql.hive.HiveContext(sc)

     val x = sc.parallelize(1 to 2).map(x =>
       { val h = new HiveConf; h.getBoolean("hive.test", false) })
     x.collect.foreach(x => println(s"-  $x"))

     val result = sqlC.sql(
       "select * from products_avro order by month, name, price"
     )
     result.collect.foreach(println)
   }
 }

 The first job (involving map) runs fine. HiveConf is instantiated and
 the conf variable is looked up etc. But the second job (involving the
 select * query) throws the class not found exception.

 The task deserializer is the one throwing the exception. It is unable
 to find the class in its classpath. Not sure what is different from the
 first job which also involved HiveConf.

 157573 [task-result-getter-3] 2015/04/20 11:01:48:287 WARN
 TaskSetManager: Lost task 0.2 in stage 2.0 (TID 4, localhost):
 java.lang.NoClassDefFoundError: org/apache/hadoop/hive/conf/HiveConf
 at java.lang.Class.getDeclaredFields0(Native Method)
 at java.lang.Class.privateGetDeclaredFields(Class.java:2436)
 at java.lang.Class.getDeclaredField(Class.java:1946)
 at
 java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659)
 at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72)
 at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480)
 at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.io.ObjectStreamClass.init(ObjectStreamClass.java:468)
 at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
 at
 java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
 at
 java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
 at
 java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
 

Re: How does GraphX stores the routing table?

2015-04-22 Thread MUHAMMAD AAMIR
Hi Ankur,
Thanks for the answer. However i still have following queries.

On Wed, Apr 22, 2015 at 8:39 AM, Ankur Dave ankurd...@gmail.com wrote:

 On Tue, Apr 21, 2015 at 10:39 AM, mas mas.ha...@gmail.com wrote:

 How does GraphX stores the routing table? Is it stored on the master node
 or
 chunks of the routing table is send to each partition that maintains the
 record of vertices and edges at that node?


 The latter: the routing table is stored alongside the vertices, and for
 each vertex it stores the set of edge partitions that reference that
 vertex.


Then how does the master node track where (in which partition) a
particular vertex and edge is?


Further, does it mean that to fetch a particular edge we first have to
find its source or destination vertex?



 If only customized edge partitioning is performed will the corresponding
 vertices be sent to same partition or not ?


 If I understand correctly, you're asking whether it's possible to colocate
 the vertices with the edges so they don't have to move during replication.
 It's possible to do this in some cases by partitioning each edge based on a
 hash partitioner of its source or destination vertex. GraphX will still do
 replication using a shuffle, but most of the shuffle files should be local
 in this case.
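
 A small sketch of that idea (sc is assumed to be the shell's context; the toy
 edges are placeholders): EdgePartition1D hashes each edge by its source vertex
 id, so most edges of a vertex land in the same partition.

  import org.apache.spark.graphx.{Edge, Graph, PartitionStrategy}

  val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(1L, 3L, 1)))
  val graph = Graph.fromEdges(edges, defaultValue = 0)
  val bySource = graph.partitionBy(PartitionStrategy.EdgePartition1D)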

 I tried this a while ago but didn't find a very big improvement for
 PageRank. Ultimately a more general solution would be to unify the vertex
 and edge RDDs by designating one replica for each vertex as the master.
 This would also reduce the storage cost by a factor of (average degree -
 1)/(average degree).


What exactly do you mean here by designating one replica for each vertex
as the master? How can we perform this?


 Ankur http://www.ankurdave.com/




-- 
Regards,
Muhammad Aamir


*CONFIDENTIALITY:This email is intended solely for the person(s) named and
may be confidential and/or privileged.If you are not the intended
recipient,please delete it,notify me and do not copy,use,or disclose its
content.*


Re: Not able run multiple tasks in parallel, spark streaming

2015-04-22 Thread Akhil Das
You can enable this flag to run multiple jobs concurrently. It might not be
production ready, but you can give it a try:

sparkConf.set("spark.streaming.concurrentJobs", "2")

​Refer to TD's answer here
http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming#answers-header
for more information.​


Thanks
Best Regards

On Wed, Apr 22, 2015 at 8:57 AM, Abhay Bansal abhaybansal.1...@gmail.com
wrote:

 Hi,

 I have a use case wherein I have to join multiple Kafka topics in parallel.
 So if there are 2n topics, there is a one-to-one mapping of topics which
 need to be joined.


 val arr= ...

 for(condition) {

 val dStream1 = KafkaUtils.createDirectStream[String, String,
 StringDecoder, StringDecoder](ssc, kafkaParams, topics1
 ).map(a => (getKey1(a._2), a._2))

 val dStream2 = KafkaUtils.createDirectStream[String, String,
 StringDecoder, StringDecoder](ssc, kafkaParams, topics2
 ).map(a => (getKey2(a._2), a._2))

arr(counter) = (dStream1, dStream2);

counter+=1;

 }



 arr.par.foreach {

 case (dStream1, dStream2) => try {

 val joined = dStream1.join(dStream2,4);

 joined.saveAsTextFiles("joinedData")

 }

 catch {

 case t: Exception => t.printStackTrace();

 }

 }



 ssc.start()

 ssc.awaitTermination()


 Doing so, the streams are getting joined sequentially. Is there a way
 out of this? I am new to Spark and would appreciate any suggestions around
 this.


 Thanks,

 -Abhay



Hive table creation - possible bug in Spark 1.3?

2015-04-22 Thread Ophir Cohen
I wrote few mails here regarding this issue.
After further investigation I think there is a bug in Spark 1.3 in saving
Hive tables.

(hc is HiveContext)

1. Verify the needed configuration exists:
scala> hc.sql("set hive.exec.compress.output").collect
res4: Array[org.apache.spark.sql.Row] =
Array([hive.exec.compress.output=true])
scala> hc.sql("set mapreduce.output.fileoutputformat.compress.codec").collect
res5: Array[org.apache.spark.sql.Row] =
Array([mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec])
scala> hc.sql("set mapreduce.output.fileoutputformat.compress.type").collect
res6: Array[org.apache.spark.sql.Row] =
Array([mapreduce.output.fileoutputformat.compress.type=BLOCK])
2. Load a DataFrame and save it as a table (path points to an existing file):
val saDF = hc.parquetFile(path)
saDF.count

(count yields 229764 - i.e. the rdd exists)
saDF.saveAsTable("test_hive_ms")

Now for a few interesting outputs:
1. Trying to query via the Hive CLI, the table exists but with the wrong output format:
Failed with exception java.io.IOException:java.io.IOException: hdfs://
10.166.157.97:9000/user/hive/warehouse/test_hive_ms/part-r-1.parquet
not a SequenceFile
2. Looking at the output files, I found that the files are '.parquet' and not
'.snappy'.
3. Looking at the saveAsTable output shows that it actually stores the table
with both the wrong output format and no compression:
15/04/22 07:16:54 INFO metastore.HiveMetaStore: 0: create_table:
Table(tableName:test_hive_ms, dbName:default, owner:hadoop,
createTime:1429687014, lastAccessTime:0, retention:0,
sd:StorageDescriptor(cols:[FieldSchema(name:col, type:arraystring,
comment:from deserializer)], location:null,
inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat,
outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat,
compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null,
serializationLib:org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe,
parameters:{serialization.format=1, path=hdfs://
10.166.157.97:9000/user/hive/warehouse/test_hive_ms}), bucketCols:[],
sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[],
skewedColValues:[], skewedColValueLocationMaps:{})), partitionKeys:[],
parameters:{spark.sql.sources.schema.part.0={type:struct,fields:[{name:ADJDATE,type:long,nullable:true,metadata:{}},{name:sid,type:integer,nullable:true,metadata:{}},{name:ADJTYPE,type:integer,nullable:true,metadata:{}},{name:ENDADJDATE,type:long,nullable:true,metadata:{}},{name:ADJFACTOR,type:double,nullable:true,metadata:{}},{name:CUMADJFACTOR,type:double,nullable:true,metadata:{}}]},
EXTERNAL=FALSE, spark.sql.sources.schema.numParts=1,
spark.sql.sources.provider=org.apache.spark.sql.parquet},
viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE)

So, the question is: am I missing some configuration here, or should I open a
bug?

Thanks,
Ophir


Re: Addition of new Metrics for killed executors.

2015-04-22 Thread twinkle sachdeva
Hi,

Looks interesting.

It would be quite interesting to know what the reason could have been for
not showing these stats in the UI.

The description by Patrick W in
https://spark-project.atlassian.net/browse/SPARK-999 does not mention
any exception w.r.t. failed tasks/executors.

Can somebody please comment on whether it is a bug or some intended behaviour w.r.t.
performance or some other bottleneck?

--Twinkle




On Mon, Apr 20, 2015 at 2:47 PM, Archit Thakur archit279tha...@gmail.com
wrote:

 Hi Twinkle,

 We have a use case where we want to debug how and why an
 executor got killed.
 It could be because of a stack overflow, GC or any other unexpected scenario.
 If I look at the driver UI there is no information present around killed
 executors, so I was just curious how people usually debug those things
 apart from scanning logs and interpreting them. The metrics we are planning
 to add are similar to what we have for non-killed executors - [data per
 stage specifically] - numFailedTasks, executorRunTime, inputBytes,
 memoryBytesSpilled, etc.

 Apart from that we also intend to add all information present in an
 executor tabs for running executors.

 Thanks,
 Archit Thakur.

 On Mon, Apr 20, 2015 at 1:31 PM, twinkle sachdeva 
 twinkle.sachd...@gmail.com wrote:

 Hi Archit,

 What is your use case and what kind of metrics are you planning to add?

 Thanks,
 Twinkle

 On Fri, Apr 17, 2015 at 4:07 PM, Archit Thakur archit279tha...@gmail.com
  wrote:

 Hi,

 We are planning to add new Metrics in Spark for the executors that got
 killed during the execution. Was just curious, why this info is not already
 present. Is there some reason for not adding it.?
 Any ideas around are welcome.

 Thanks and Regards,
 Archit Thakur.






Re: Multiple HA spark clusters managed by 1 ZK cluster?

2015-04-22 Thread Steve Loughran
The key thing would be to use different ZK paths for each cluster. You
shouldn't need more than 2 ZK quorums even for a large (few thousand node)
Hadoop cluster: one for the HA bits of the infrastructure (HDFS, YARN) and one
for the applications to abuse. It's easy for apps using ZK to stick too much
stuff in it, with too high a rate of change for ZK to be happy; overloading ZK can
then impact those core infrastructure services. I don't believe that Spark is
in the category of antisocial ZK apps.

 On 22 Apr 2015, at 11:05, Sean Owen so...@cloudera.com wrote:
 
 Not that i've tried it, but, why couldn't you use one ZK server? I
 don't see a reason.
 
 On Wed, Apr 22, 2015 at 7:40 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
 It isn't mentioned anywhere in the doc, but you will probably need separate
 ZK for each of your HA cluster.
 
 Thanks
 Best Regards
 
 On Wed, Apr 22, 2015 at 12:02 AM, Michal Klos michal.klo...@gmail.com
 wrote:
 
 Hi,
 
 I'm trying to set up multiple spark clusters with high availability and I
 was wondering if I can re-use a single ZK cluster to manage them? It's not
 very clear in the docs and it seems like the answer may be that I need a
 separate ZK cluster for each spark cluster?
 
 thanks,
 M
 
 
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Scheduling across applications - Need suggestion

2015-04-22 Thread Arun Patel
I believe we can use properties like --executor-memory and
--total-executor-cores to configure the resources allocated to each
application.  But in a multi-user environment, shells and applications are
being submitted by multiple users at the same time, each requesting resources
with different properties.  At times, some users do not get any resources
from the cluster.
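For illustration, a hedged sketch of the per-application caps I mean, on a
standalone cluster (the numbers are made up): --total-executor-cores bounds
how many cores one application can hold, and spark.deploy.defaultCores on the
master caps applications whose users do not set any limit (by default an
uncapped application can take every available core, which matches the symptom
above).

# submit-time caps (illustrative values)
./bin/spark-submit \
  --master spark://master:7077 \
  --executor-memory 4g \
  --total-executor-cores 8 \
  my-app.jar

# in spark-env.sh on the standalone master: default cap for apps that set none
export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=8"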


How to control resource usage in this case?  Please share any best
practices followed.


As per my understanding, the Fair Scheduler can be used for scheduling tasks
within an application but not across multiple applications.  Is this
correct?
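For the within-application part, what I have in mind is a pool file like the
following (a minimal sketch; the pool names are made up), enabled with
spark.scheduler.mode=FAIR and pointed to via spark.scheduler.allocation.file:

<?xml version="1.0"?>
<allocations>
  <pool name="adhoc">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="batch">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>4</minShare>
  </pool>
</allocations>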


Regards,

Arun


Re: Spark Streaming updatyeStateByKey throws OutOfMemory Error

2015-04-22 Thread Sourav Chandra
Anyone?

On Wed, Apr 22, 2015 at 12:29 PM, Sourav Chandra 
sourav.chan...@livestream.com wrote:

 Hi Olivier,

 the update function is as below:

 val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {
   val previousCount = state.getOrElse((0L, 0L))._2
   var startValue: IConcurrentUsers = ConcurrentViewers(0)
   var currentCount = 0L
   val lastIndexOfConcurrentUsers =
     values.lastIndexWhere(_.isInstanceOf[ConcurrentViewers])
   val subList = values.slice(0, lastIndexOfConcurrentUsers)
   val currentCountFromSubList = subList.foldLeft(startValue)(_ op _).count + previousCount
   val lastConcurrentViewersCount = values(lastIndexOfConcurrentUsers).count

   if (math.abs(lastConcurrentViewersCount - currentCountFromSubList) >= 1) {
     logger.error(
       s"Count using state updation $currentCountFromSubList, " +
       s"ConcurrentUsers count $lastConcurrentViewersCount" +
       s" resetting to $lastConcurrentViewersCount"
     )
     currentCount = lastConcurrentViewersCount
   }
   val remainingValuesList = values.diff(subList)
   startValue = ConcurrentViewers(currentCount)
   currentCount = remainingValuesList.foldLeft(startValue)(_ op _).count

   if (currentCount < 0) {
     logger.error(
       s"ERROR: Got new count $currentCount < 0, value:$values, state:$state, resetting to 0"
     )
     currentCount = 0
   }
   // to stop pushing subsequent 0 after receiving first 0
   if (currentCount == 0 && previousCount == 0) None
   else Some(previousCount, currentCount)
 }

 trait IConcurrentUsers {
   val count: Long
   def op(a: IConcurrentUsers): IConcurrentUsers = IConcurrentUsers.op(this, a)
 }

 object IConcurrentUsers {
   def op(a: IConcurrentUsers, b: IConcurrentUsers): IConcurrentUsers = (a, b) match {
     case (_, _: ConcurrentViewers) =>
       ConcurrentViewers(b.count)
     case (_: ConcurrentViewers, _: IncrementConcurrentViewers) =>
       ConcurrentViewers(a.count + b.count)
     case (_: ConcurrentViewers, _: DecrementConcurrentViewers) =>
       ConcurrentViewers(a.count - b.count)
   }
 }

 case class IncrementConcurrentViewers(count: Long) extends IConcurrentUsers
 case class DecrementConcurrentViewers(count: Long) extends IConcurrentUsers
 case class ConcurrentViewers(count: Long) extends IConcurrentUsers


 also the error stack trace copied from executor logs is:

 java.lang.OutOfMemoryError: Java heap space
 at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)
 at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2564)
 at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
 at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
 at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
 at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:601)
 at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
 at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
 at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:236)
 at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169)
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
 at org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155)
 at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:601)
 at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)
 at

Re: Multiple HA spark clusters managed by 1 ZK cluster?

2015-04-22 Thread Sean Owen
Not that I've tried it, but why couldn't you use one ZK server? I
don't see a reason.

On Wed, Apr 22, 2015 at 7:40 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
 It isn't mentioned anywhere in the docs, but you will probably need a separate
 ZK for each of your HA clusters.

 Thanks
 Best Regards

 On Wed, Apr 22, 2015 at 12:02 AM, Michal Klos michal.klo...@gmail.com
 wrote:

 Hi,

 I'm trying to set up multiple spark clusters with high availability and I
 was wondering if I can re-use a single ZK cluster to manage them? It's not
 very clear in the docs and it seems like the answer may be that I need a
 separate ZK cluster for each spark cluster?

 thanks,
 M



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to write Hive's map(key, value, ...) in Spark SQL DSL

2015-04-22 Thread Jianshi Huang
Hi,

I want to write this in Spark SQL DSL:

select map('c1', c1, 'c2', c2) as m
from table

Is there a way?
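One possibility that may work (an untested sketch on my side, assuming a
HiveContext so Hive's map() resolves, with `df` standing for the DataFrame of
"table") is to drop down to an expression string:

// selectExpr takes SQL expression strings, so the map(...) call can be
// embedded directly; equivalent to the SQL above
val df = sqlContext.table("table")
val m  = df.selectExpr("map('c1', c1, 'c2', c2) as m")

// fallback: plain SQL through the same context
// sqlContext.sql("select map('c1', c1, 'c2', c2) as m from table")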

-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Error in creating spark RDD

2015-04-22 Thread madhvi

Hi,

I am creating a Spark RDD from Accumulo like this:

JavaPairRDD<Key, Value> accumuloRDD =
sc.newAPIHadoopRDD(accumuloJob.getConfiguration(), AccumuloInputFormat.class,
Key.class, Value.class);


But I am getting the following error and it does not compile:

Bound mismatch: The generic method newAPIHadoopRDD(Configuration, Class<F>,
Class<K>, Class<V>) of type JavaSparkContext is not applicable for the
arguments (Configuration, Class<AccumuloInputFormat>, Class<Key>,
Class<Value>). The inferred type AccumuloInputFormat is not a valid
substitute for the bounded parameter F extends InputFormat<K,V>


I am using the following import statements:

import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;

I am not able to figure out what the problem is here.
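One thing I am not sure about (a sketch, not yet verified against my Accumulo
version): newAPIHadoopRDD expects a new-API
org.apache.hadoop.mapreduce.InputFormat, while the import above pulls the
old-API class from the ...client.mapred package. Accumulo also ships a
new-API variant under ...client.mapreduce, e.g.:

// note: mapreduce, not mapred
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;

JavaPairRDD<Key, Value> accumuloRDD =
    sc.newAPIHadoopRDD(accumuloJob.getConfiguration(),
        AccumuloInputFormat.class, Key.class, Value.class);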

Thanks
Madhvi


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Shuffle question

2015-04-22 Thread Marius Danciu
Anyone?


On Tue, Apr 21, 2015 at 3:38 PM Marius Danciu marius.dan...@gmail.com
wrote:

 Hello anyone,

 I have a question regarding the sort shuffle. Roughly I'm doing something
 like:

 rdd.mapPartitionsToPair(f1).groupByKey().mapPartitionsToPair(f2)

 The problem is that in f2 I don't see the keys being sorted. The keys are
 Java Comparable, not scala.math.Ordered or scala.math.Ordering (it would be
 weird for each key to implement Ordering, as mentioned in the JIRA item
 https://issues.apache.org/jira/browse/SPARK-2045).

 Questions:
 1. Do I need to explicitly sortByKey? (If I do, I can see the keys correctly
 sorted in f2.) But I'm worried about the extra cost, since Spark 1.3.0 is
 supposed to use the SORT shuffle manager by default, right?
 2. Do I need each key to be a scala.math.Ordered? Is Java Comparable used
 at all?

 ... btw I'm using Spark from Java ... don't ask me why :)
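 One thing I'm considering (a hedged sketch, not sure it's the right
 approach) is repartitionAndSortWithinPartitions with a Comparator, which the
 scaladoc says pushes the sorting into the shuffle itself rather than adding a
 separate sortByKey pass. Here String stands in for my Comparable key type:

 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Comparator;
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import scala.Tuple2;

 public class SortedPartitionsSketch {
   // the Comparator is shipped to executors, so it must be Serializable
   static class KeyComparator implements Comparator<String>, Serializable {
     @Override
     public int compare(String a, String b) {
       return a.compareTo(b);   // reuse the keys' existing Comparable ordering
     }
   }

   public static void main(String[] args) {
     JavaSparkContext sc = new JavaSparkContext("local[2]", "sort-sketch");
     JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
         new Tuple2<String, Integer>("b", 1),
         new Tuple2<String, Integer>("a", 2),
         new Tuple2<String, Integer>("c", 3)));

     // keys arrive sorted within each partition in a subsequent
     // mapPartitionsToPair pass (values are not grouped per key, though,
     // the way groupByKey groups them)
     JavaPairRDD<String, Integer> sorted =
         pairs.repartitionAndSortWithinPartitions(new HashPartitioner(2),
                                                  new KeyComparator());
     System.out.println(sorted.collect());
     sc.stop();
   }
 }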



 Best,
 Marius