Redirect Spark Logs to Kafka

2016-02-01 Thread Ashish Soni
Hi All,

Please let me know how we can redirect Spark's log output, or tell Spark to
log to a Kafka queue instead of files.

Ashish


Failed job not throwing exception

2016-02-01 Thread Nick Buroojy
Hi All,

I'm seeing some odd behavior with a Spark-Elasticsearch loading job. The
load seems to fail, it only loads part of the data, and there is a
stacktrace on the Driver, but there is no exception propagated to the top
level. Has anyone seen "swallowed" failures before?

We're using Spark 1.5.2.

16/01/31 18:34:48 INFO scheduler.DAGScheduler: Job 1 failed: runJob at EsSparkSQL.scala:57, took 129.232414 s
org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 1.0 failed 4 times, most recent failure: Lost task 15.3 in stage 1.0 (TID 83, ip-10-4-5-32.ec2.internal): UnknownReason
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1283)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
    at org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1914)
    at org.elasticsearch.spark.sql.EsSparkSQL$.saveToEs(EsSparkSQL.scala:57)
    at org.elasticsearch.spark.sql.EsSparkSQL$.saveToEs(EsSparkSQL.scala:45)
    at com.civitaslearning.[redacted]
    at scala.util.Try$.apply(Try.scala:161)
    at com.civitaslearning.[redacted]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.scala:674)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

-- 

Nick Buroojy, Software Engineer

Civitas Learning, Inc.

100 Congress Avenue, Suite 300, Austin, TX 78701

e: n...@civitaslearning.com
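
Note on the trace in this message: it contains a scala.util.Try$.apply frame between the redacted application frames. One possible explanation (an assumption, since the application code is redacted) is that saveToEs is called inside a Try whose Failure result is never inspected, which would stop the exception from reaching the top level. The Python sketch below only mimics that pattern. The `attempt` helper is a hypothetical stand-in for scala.util.Try, and `save_to_es` is a made-up function that always fails.

```python
# Hypothetical stand-ins: `attempt` plays the role of scala.util.Try,
# and `save_to_es` simulates a load that fails part-way through.

class Success:
    def __init__(self, value):
        self.value = value


class Failure:
    def __init__(self, error):
        self.error = error


def attempt(fn):
    """Run fn() and capture any exception as a value instead of raising it."""
    try:
        return Success(fn())
    except Exception as exc:  # the exception stops propagating here
        return Failure(exc)


def save_to_es():
    raise RuntimeError("Job aborted due to stage failure")


def run_swallowing():
    # The Failure is created and then dropped, so the caller sees no error.
    attempt(save_to_es)
    return "finished"


def run_propagating():
    # Inspecting the result and re-raising restores normal failure behaviour.
    result = attempt(save_to_es)
    if isinstance(result, Failure):
        raise result.error
    return "finished"
```

If the real code follows this shape, checking the Try result (or calling .get on it) in the Scala job would be the equivalent of `run_propagating`.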


How to deal with same class mismatch?

2016-02-01 Thread Daniel Valdivia
Hi, I'm having a couple of issues.

I'm experiencing a known issue in the spark-shell where I get a type
mismatch even though the found and required types are the same class:

<console>:82: error: type mismatch;
found : 
org.apache.spark.rdd.org.apache.spark.rdd.org.apache.spark.rdd.org.apache.spark.rdd.org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint]
 required: 
org.apache.spark.rdd.org.apache.spark.rdd.org.apache.spark.rdd.org.apache.spark.rdd.org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint]

I was wondering if anyone has found a way around this?

I tried copying my RDD into a brand-new RDD, rebuilding each LabeledPoint
element as a new one in case an internal class was causing the problem.
However, I can't seem to access the vectors inside my LabeledPoint. While
generating my RDD I did use Java Maps and converted them back to Scala.

Any advice on how to remap this LabeledPoint?

tfidfs.take(1)
res143: Array[org.apache.spark.mllib.regression.LabeledPoint] = 
Array((143.0,(7175,[2738,4134,4756,6354,6424],[492.63076923076926,11.060794473229707,2.7010544074230283,57.69549549549549,76.2404761904762])))

tfidfs.take(1)(0).label
res144: Double = 143.0 

tfidfs.take(1)(0).features
res145: org.apache.spark.mllib.linalg.Vector = 
(7175,[2738,4134,4756,6354,6424],[492.63076923076926,11.060794473229707,2.7010544074230283,57.69549549549549,76.2404761904762])
 

tfidfs.take(1)(0).features(0)
res146: Double = 0.0 

tfidfs.take(1)(0).features(1)
res147: Double = 0.0 

tfidfs.take(1)(0).features(2)
res148: Double = 0.0
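
Note on the output above: the 0.0 results are what one would expect if the vector is sparse. The printed form (7175,[2738,...],[492.63...,...]) reads as (size, indices, values), so positions 0, 1 and 2 have no stored value. A minimal pure-Python sketch of that lookup follows. The `sparse_get` helper is hypothetical (it is not an MLlib function), and the numbers are copied from the tfidfs.take(1) output.

```python
# Data copied from the tfidfs.take(1) output: (size, indices, values).
size = 7175
indices = [2738, 4134, 4756, 6354, 6424]
values = [492.63076923076926, 11.060794473229707, 2.7010544074230283,
          57.69549549549549, 76.2404761904762]


def sparse_get(i):
    """Return the value at position i, or 0.0 if no value is stored there."""
    if not 0 <= i < size:
        raise IndexError(i)
    stored = dict(zip(indices, values))
    return stored.get(i, 0.0)


print(sparse_get(0))     # position 0 is not stored
print(sparse_get(2738))  # first stored position
```

By the same reasoning, indexing the features at one of the listed positions (for example 2738) should return the stored value rather than 0.0.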

How to build interactive dash boards with spark?

2016-02-01 Thread Andy Davidson


Over the weekend I started playing around with Shiny. I built a very simple
Shiny app using RStudio. Shiny makes it easy to build a web page that
interacts with R.

https://aedwip.shinyapps.io/developingDataProducts_CourseProject/

http://shiny.rstudio.com/
https://www.shinyapps.io/

Two years ago I built a streaming Spark app that caused a Matplotlib graph
in an IPython notebook to update. It was pretty easy, and I am sure the
notebook infrastructure has improved a lot since then. The challenge with
that earlier work was that there was no way for the user to interact with
the data, e.g. imagine they wanted to graph the data between two values.
Also, it was written like a notebook/research paper, not a commercial
application.


What are other people doing?

Andy




RE: Using Java spring injection with spark

2016-02-01 Thread Sambit Tripathy (RBEI/EDS1)


1.  It depends on what you want to do. Don’t worry about singletons and
wiring the beans, as that is pretty much taken care of by the Spark framework
itself. In fact, if you do wire them yourself, you will likely run into issues
such as serialization errors.

2.  You can write your code in Scala or Python using the Spark shell or a
notebook such as IPython or Zeppelin. Alternatively, if you have written an
application in Scala or Java against the Spark API, you can build a jar and
run it with spark-submit.

From: HARSH TAKKAR [mailto:takkarha...@gmail.com]
Sent: Monday, February 01, 2016 10:00 AM
To: user@spark.apache.org
Subject: Re: Using Java spring injection with spark


Hi

Can anyone please reply to this?

On Mon, 1 Feb 2016, 4:28 p.m. HARSH TAKKAR wrote:
Hi
I am new to Apache Spark and big data analytics. Before starting to code with
Spark data frames and RDDs, I just wanted to confirm the following:
1. Can we create an implementation of java.api.Function as a singleton bean
using the Spring framework, and can it be injected into other classes using
autowiring?
2. What is the best way to submit jobs to Spark: using the API or using the
shell script?
Looking forward to your help,

Kind Regards
Harsh


Getting the size of a broadcast variable

2016-02-01 Thread apu mishra . rr
How can I determine the size (in bytes) of a broadcast variable? Do I need
to use the .dump method and then look at the size of the result, or is
there an easier way?

Using PySpark with Spark 1.6.

Thanks!

Apu
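
Note on the question above: one rough approach, assuming the value is picklable, is to serialize it with the standard pickle module and measure the result. This is only an estimate of what PySpark would actually ship, since PySpark may use a different pickle protocol or compress the data. `estimate_size_bytes` is a hypothetical helper, not a PySpark API, and `lookup` is made-up example data.

```python
import pickle


def estimate_size_bytes(value):
    """Approximate the serialized size of a value by pickling it."""
    return len(pickle.dumps(value, protocol=pickle.HIGHEST_PROTOCOL))


lookup = {i: str(i) for i in range(1000)}  # example payload
print(estimate_size_bytes(lookup))
```

With a real broadcast variable b, the same helper could be applied to b.value on the driver.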


try to read multiple bz2 files in s3

2016-02-01 Thread Lin, Hao
When I try to read multiple bz2 files from S3, I get the following warning
messages. What is the problem here?

16/02/01 22:30:30 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 10.162.67.248): java.lang.ArrayIndexOutOfBoundsException: -1844424343
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode0(CBZip2InputStream.java:1014)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:829)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:504)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:399)
    at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:483)
    at java.io.InputStream.read(InputStream.java:101)
    at org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.fillBuffer(CompressedSplitLineReader.java:130)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
    at org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.readLine(CompressedSplitLineReader.java:159)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:209)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:248)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:216)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1553)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1125)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1125)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Confidentiality Notice::  This email, including attachments, may include 
non-public, proprietary, confidential or legally privileged information.  If 
you are not an intended recipient or an authorized agent of an intended 
recipient, you are hereby notified that any dissemination, distribution or 
copying of the information contained in or transmitted with this e-mail is 
unauthorized and strictly prohibited.  If you have received this email in 
error, please notify the sender by replying to this message and permanently 
delete this e-mail, its attachments, and any copies of it immediately.  You 
should not retain, copy or use this e-mail or any attachment for any purpose, 
nor disclose all or any part of the contents to any other person. Thank you.


Re: SPARK_WORKER_INSTANCES deprecated

2016-02-01 Thread Ted Yu
As the message (from SparkConf.scala) showed, you shouldn't
use SPARK_WORKER_INSTANCES any more.

FYI

On Mon, Feb 1, 2016 at 2:19 PM, Lin, Hao  wrote:

> Can I still use SPARK_WORKER_INSTANCES in conf/spark-env.sh?  the
> following is what I’ve got after trying to set this parameter and run
> spark-shell
>
>
>
> SPARK_WORKER_INSTANCES was detected (set to '32').
>
> This is deprecated in Spark 1.0+.
>
>
>
> Please instead use:
>
> - ./spark-submit with --num-executors to specify the number of executors
>
> - Or set SPARK_EXECUTOR_INSTANCES
>
> - spark.executor.instances to configure the number of instances in the
> spark config.
>


SPARK_WORKER_INSTANCES deprecated

2016-02-01 Thread Lin, Hao
Can I still use SPARK_WORKER_INSTANCES in conf/spark-env.sh? The following is
what I got after setting this parameter and running spark-shell:

SPARK_WORKER_INSTANCES was detected (set to '32').
This is deprecated in Spark 1.0+.

Please instead use:
- ./spark-submit with --num-executors to specify the number of executors
- Or set SPARK_EXECUTOR_INSTANCES
- spark.executor.instances to configure the number of instances in the spark 
config.



Master failover and active jobs

2016-02-01 Thread aant00
Hi - 

I'm running Spark 1.5.2 in standalone mode with multiple masters using
ZooKeeper for failover.  The master fails over correctly to the standby when
it goes down, and running applications continue to run, but in the new
active master web UI, they are marked as "WAITING", and the workers have
these entries in their logs: 

16/01/30 00:51:13 ERROR Worker: Connection to master failed! Waiting for
master to reconnect... 
16/01/30 00:51:13 WARN Worker: Failed to connect to master XXX:7077 
akka.actor.ActorNotFound: Actor not found for:
ActorSelection[Anchor(akka.tcp://sparkMaster@XXX:7077/), Path(/user/Master)] 

Should they still be "RUNNING"? One time it looked like the job stopped
functioning (this is a continuously running streaming job), but I haven't
been able to reproduce it.  FWIW, the driver that started it is still marked
as "RUNNING".

Thanks. 
- Anthony 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Master-failover-and-active-jobs-tp26128.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Need help in spark-Scala program

2016-02-01 Thread Vinti Maheshwari
Hi,

Sorry, please ignore my message; it was sent by mistake. I am still
drafting it.

Regards,
Vinti

On Mon, Feb 1, 2016 at 2:25 PM, Vinti Maheshwari 
wrote:

> Hi All,
>
> I recently started learning Spark. I need to use spark-streaming.
>
> 1) Input, need to read from MongoDB
>
> db.event_gcovs.find({executions:"56791a746e928d7b176d03c0", valid:1,
> infofile:{$exists:1}, geo:"sunnyvale"}, {infofile:1}).count()
>
> > Number of Info files: 24441
>
> /* 0 */
>
> {
>
> "_id" : ObjectId("568eaeda71404e5c563ccb86"),
>
> "infofile" :
> "/volume/testtech/datastore/code-coverage/p//infos/svl/6/56791a746e928d7b176d03c0/
> 69958.pcp_napt44_20368.pl.30090.exhibit.R0-re0.15.1I20151218_1934_jammyc.pfe.i386.TC011.fail.FAIL.gcov.info
> "
> }
>
> One info file can have 1000 of  these blocks( Each block starts from "SF"
> delimeter, and ends with the end_of_record.
>
>
>


RE: SPARK_WORKER_INSTANCES deprecated

2016-02-01 Thread Lin, Hao
If you look at the Spark docs, the SPARK_WORKER_INSTANCES variable can still be
specified, but SPARK_EXECUTOR_INSTANCES is not listed yet:

http://spark.apache.org/docs/1.5.2/spark-standalone.html


From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Monday, February 01, 2016 5:45 PM
To: Lin, Hao
Cc: user
Subject: Re: SPARK_WORKER_INSTANCES deprecated

As the message (from SparkConf.scala) showed, you shouldn't use 
SPARK_WORKER_INSTANCES any more.

FYI

On Mon, Feb 1, 2016 at 2:19 PM, Lin, Hao wrote:
Can I still use SPARK_WORKER_INSTANCES in conf/spark-env.sh?  the following is 
what I’ve got after trying to set this parameter and run spark-shell

SPARK_WORKER_INSTANCES was detected (set to '32').
This is deprecated in Spark 1.0+.

Please instead use:
- ./spark-submit with --num-executors to specify the number of executors
- Or set SPARK_EXECUTOR_INSTANCES
- spark.executor.instances to configure the number of instances in the spark 
config.


Re: local class incompatible: stream classdesc serialVersionUID

2016-02-01 Thread Holden Karau
I'm a little confused as to exactly how this might have happened, but one
quick guess is that you've built an assembly jar that includes Spark core.
Can you mark it as provided and/or post your build file?

On Fri, Jan 29, 2016 at 7:35 AM, Ted Yu  wrote:

> I logged SPARK-13084
>
> For the moment, please consider running with 1.5.2 on all the nodes.
>
> On Fri, Jan 29, 2016 at 5:29 AM, Jason Plurad  wrote:
>
>> I agree with you, Ted, if RDD had a serial version UID this might not be
>> an issue. So that could be a JIRA to submit to help avoid version
>> mismatches in future Spark versions, but that doesn't help my current
>> situation between 1.5.1 and 1.5.2.
>>
>> Any other ideas? Thanks.
>> On Thu, Jan 28, 2016 at 5:06 PM Ted Yu  wrote:
>>
>>> I am not Scala expert.
>>>
>>> RDD extends Serializable but doesn't have @SerialVersionUID()
>>> annotation.
>>> This may explain what you described.
>>>
>>> One approach is to add @SerialVersionUID so that RDD's have stable
>>> serial version UID.
>>>
>>> Cheers
>>>
>>> On Thu, Jan 28, 2016 at 1:38 PM, Jason Plurad  wrote:
>>>
>>>> I've searched through the mailing list archive. It seems that if you
>>>> try to run, for example, a Spark 1.5.2 program against a Spark 1.5.1
>>>> standalone server, you will run into an exception like this:
>>>>
>>>> WARN  org.apache.spark.scheduler.TaskSetManager  - Lost task 0.0 in
>>>> stage 0.0 (TID 0, 192.168.14.103): java.io.InvalidClassException:
>>>> org.apache.spark.rdd.RDD; local class incompatible: stream classdesc
>>>> serialVersionUID = -3343649307726848892, local class serialVersionUID =
>>>> -3996494161745401652
>>>>
>>>> If my application is using a library that builds against Spark 1.5.2,
>>>> does that mean that my application is now tied to that same Spark
>>>> standalone server version?
>>>>
>>>> Is there a recommended way for that library to have a Spark dependency
>>>> but keep it compatible against a wider set of versions, i.e. any version
>>>> 1.5.x?
>>>>
>>>> Thanks!
>>>>
>>>
>>>
>


-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Re: local class incompatible: stream classdesc serialVersionUID

2016-02-01 Thread Holden Karau
Ah, yeah, that would not work.

On Mon, Feb 1, 2016 at 2:31 PM, Shixiong(Ryan) Zhu 
wrote:

> I guess he used client model and the local Spark version is 1.5.2 but the
> standalone Spark version is 1.5.1. In other words, he used a 1.5.2 driver
> to talk with 1.5.1 executors.
>
> On Mon, Feb 1, 2016 at 2:08 PM, Holden Karau  wrote:
>
>> So I'm a little confused to exactly how this might have happened - but
>> one quick guess is that maybe you've built an assembly jar with Spark core,
>> can you mark it is a provided and or post your build file?
>>
>> On Fri, Jan 29, 2016 at 7:35 AM, Ted Yu  wrote:
>>
>>> I logged SPARK-13084
>>>
>>> For the moment, please consider running with 1.5.2 on all the nodes.
>>>
>>> On Fri, Jan 29, 2016 at 5:29 AM, Jason Plurad  wrote:
>>>
>>>> I agree with you, Ted, if RDD had a serial version UID this might not
>>>> be an issue. So that could be a JIRA to submit to help avoid version
>>>> mismatches in future Spark versions, but that doesn't help my current
>>>> situation between 1.5.1 and 1.5.2.
>>>>
>>>> Any other ideas? Thanks.
>>>> On Thu, Jan 28, 2016 at 5:06 PM Ted Yu  wrote:
>>>>
>>>>> I am not Scala expert.
>>>>>
>>>>> RDD extends Serializable but doesn't have @SerialVersionUID()
>>>>> annotation.
>>>>> This may explain what you described.
>>>>>
>>>>> One approach is to add @SerialVersionUID so that RDD's have stable
>>>>> serial version UID.
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Thu, Jan 28, 2016 at 1:38 PM, Jason Plurad
>>>>> wrote:
>>>>>
>>>>>> I've searched through the mailing list archive. It seems that if you
>>>>>> try to run, for example, a Spark 1.5.2 program against a Spark 1.5.1
>>>>>> standalone server, you will run into an exception like this:
>>>>>>
>>>>>> WARN  org.apache.spark.scheduler.TaskSetManager  - Lost task 0.0 in
>>>>>> stage 0.0 (TID 0, 192.168.14.103): java.io.InvalidClassException:
>>>>>> org.apache.spark.rdd.RDD; local class incompatible: stream classdesc
>>>>>> serialVersionUID = -3343649307726848892, local class serialVersionUID =
>>>>>> -3996494161745401652
>>>>>>
>>>>>> If my application is using a library that builds against Spark 1.5.2,
>>>>>> does that mean that my application is now tied to that same Spark
>>>>>> standalone server version?
>>>>>>
>>>>>> Is there a recommended way for that library to have a Spark
>>>>>> dependency but keep it compatible against a wider set of versions, i.e.
>>>>>> any version 1.5.x?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>
>>>>>
>>>
>>
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>>
>
>


-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Re: SPARK_WORKER_INSTANCES deprecated

2016-02-01 Thread Ted Yu
I see.

A pull request can be submitted for spark-standalone.md

On Mon, Feb 1, 2016 at 2:51 PM, Lin, Hao  wrote:

> If you look at the Spark Doc, variable SPARK_WORKER_INSTANCES  can still
> be specified but yet the SPARK_EXECUTOR_INSTANCES
>
>
>
> http://spark.apache.org/docs/1.5.2/spark-standalone.html
>
>
>
>
>
> *From:* Ted Yu [mailto:yuzhih...@gmail.com]
> *Sent:* Monday, February 01, 2016 5:45 PM
> *To:* Lin, Hao
> *Cc:* user
> *Subject:* Re: SPARK_WORKER_INSTANCES deprecated
>
>
>
> As the message (from SparkConf.scala) showed, you shouldn't
> use SPARK_WORKER_INSTANCES any more.
>
>
>
> FYI
>
>
>
> On Mon, Feb 1, 2016 at 2:19 PM, Lin, Hao  wrote:
>
> Can I still use SPARK_WORKER_INSTANCES in conf/spark-env.sh?  the
> following is what I’ve got after trying to set this parameter and run
> spark-shell
>
>
>
> SPARK_WORKER_INSTANCES was detected (set to '32').
>
> This is deprecated in Spark 1.0+.
>
>
>
> Please instead use:
>
> - ./spark-submit with --num-executors to specify the number of executors
>
> - Or set SPARK_EXECUTOR_INSTANCES
>
> - spark.executor.instances to configure the number of instances in the
> spark config.
>
>


Using accumulator to push custom logs to driver

2016-02-01 Thread Utkarsh Sengar
I am trying to debug code that runs in executors by logging. Even when I add
log4j's LOG.info(..) inside .map(), I don't see it in the Mesos task logs on
the corresponding slaves.
It's inefficient anyway to keep checking multiple slaves for logs.

One way to deal with this is to push logs to a central location.

Another way (for debugging purposes) is to use accumulators. Is it
advisable to use accumulators to push strings from executors to the driver?
It would simplify things when I am debugging datasets, bugs that are hard to
reproduce locally, etc.

Suggestions/comments?

-Utkarsh


Spark Streaming application designing question

2016-02-01 Thread Vinti Maheshwari
Hi,

I am new to Spark. I want to set up Spark Streaming to retrieve key-value
pairs from files in the format below:

file: info1

Note: Each info file has around 1000 of these records, and our system
generates info files continuously, so I want to aggregate the results
through Spark Streaming.

Can we give this kind of file as input to a Spark cluster? I am interested in
the "SF" and "DA" delimiters only: "SF" corresponds to the source file, and
"DA" corresponds to (line number, count).

Since this input data is not in a line-oriented format, is it a good idea to
use these files directly as Spark input, or do I need an intermediate stage
that cleans these files and generates new files with each record's
information on a single line instead of in blocks?
Or can we achieve this in Spark itself?

What would be the right approach?



*What do I want to achieve?*
I want line-level information, that is, each line (as the key) and the info
files (as the values).
My system generates info files continuously, so I want to aggregate the
results through Spark Streaming.

The final output I want looks like this:
line178 -> (info1, info2, info7.)
line 2908 -> (info3, info90)

Do let me know if my explanation is not clear.


Thanks & Regards,
Vinti
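
Note on the question above: a minimal pure-Python sketch of the reshaping it describes, assuming each record is lcov-style text with SF:<source file>, DA:<line number>,<count> and a closing end_of_record. The exact layout is an assumption, because the sample file did not survive in the message. `parse_info` and `lines_to_infos` are hypothetical helper names, the sample data is made up, and keeping only lines with a non-zero count is also an assumption about the intended output.

```python
from collections import defaultdict


def parse_info(text):
    """Yield (source_file, line_number, count) for every DA entry."""
    source = None
    for raw in text.splitlines():
        line = raw.strip()
        if line.startswith("SF:"):
            source = line[3:]
        elif line.startswith("DA:") and source is not None:
            fields = line[3:].split(",")
            yield source, int(fields[0]), int(fields[1])
        elif line == "end_of_record":
            source = None


def lines_to_infos(info_files):
    """Map (source_file, line_number) to the sorted info files that hit it."""
    result = defaultdict(set)
    for name, text in info_files.items():
        for source, line_no, count in parse_info(text):
            if count > 0:  # assumption: only executed lines are of interest
                result[(source, line_no)].add(name)
    return {key: sorted(names) for key, names in result.items()}


# Made-up sample data for illustration only.
sample = {
    "info1": "SF:/src/a.c\nDA:178,3\nDA:179,0\nend_of_record\n",
    "info2": "SF:/src/a.c\nDA:178,1\nend_of_record\n",
}
print(lines_to_infos(sample))
```

Because each block spans several lines, the parsing works on a whole file at a time. In Spark the same per-file function could probably be applied to whole-file inputs rather than to individual lines, which would avoid a separate clean-up stage.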


Need help in spark-Scala program

2016-02-01 Thread Vinti Maheshwari
Hi All,

I recently started learning Spark. I need to use spark-streaming.

1) Input, need to read from MongoDB

db.event_gcovs.find({executions:"56791a746e928d7b176d03c0", valid:1,
infofile:{$exists:1}, geo:"sunnyvale"}, {infofile:1}).count()

> Number of Info files: 24441

/* 0 */

{

"_id" : ObjectId("568eaeda71404e5c563ccb86"),

"infofile" :
"/volume/testtech/datastore/code-coverage/p//infos/svl/6/56791a746e928d7b176d03c0/
69958.pcp_napt44_20368.pl.30090.exhibit.R0-re0.15.1I20151218_1934_jammyc.pfe.i386.TC011.fail.FAIL.gcov.info
"
}

One info file can have 1000 of these blocks (each block starts with the "SF"
delimiter and ends with end_of_record).


Re: local class incompatible: stream classdesc serialVersionUID

2016-02-01 Thread Shixiong(Ryan) Zhu
I guess he used client mode, and the local Spark version is 1.5.2 but the
standalone Spark version is 1.5.1. In other words, he used a 1.5.2 driver
to talk to 1.5.1 executors.

On Mon, Feb 1, 2016 at 2:08 PM, Holden Karau  wrote:

> So I'm a little confused to exactly how this might have happened - but one
> quick guess is that maybe you've built an assembly jar with Spark core, can
> you mark it is a provided and or post your build file?
>
> On Fri, Jan 29, 2016 at 7:35 AM, Ted Yu  wrote:
>
>> I logged SPARK-13084
>>
>> For the moment, please consider running with 1.5.2 on all the nodes.
>>
>> On Fri, Jan 29, 2016 at 5:29 AM, Jason Plurad  wrote:
>>
>>> I agree with you, Ted, if RDD had a serial version UID this might not be
>>> an issue. So that could be a JIRA to submit to help avoid version
>>> mismatches in future Spark versions, but that doesn't help my current
>>> situation between 1.5.1 and 1.5.2.
>>>
>>> Any other ideas? Thanks.
>>> On Thu, Jan 28, 2016 at 5:06 PM Ted Yu  wrote:
>>>
>>>> I am not Scala expert.
>>>>
>>>> RDD extends Serializable but doesn't have @SerialVersionUID()
>>>> annotation.
>>>> This may explain what you described.
>>>>
>>>> One approach is to add @SerialVersionUID so that RDD's have stable
>>>> serial version UID.
>>>>
>>>> Cheers
>>>>
>>>> On Thu, Jan 28, 2016 at 1:38 PM, Jason Plurad
>>>> wrote:
>>>>
>>>>> I've searched through the mailing list archive. It seems that if you
>>>>> try to run, for example, a Spark 1.5.2 program against a Spark 1.5.1
>>>>> standalone server, you will run into an exception like this:
>>>>>
>>>>> WARN  org.apache.spark.scheduler.TaskSetManager  - Lost task 0.0 in
>>>>> stage 0.0 (TID 0, 192.168.14.103): java.io.InvalidClassException:
>>>>> org.apache.spark.rdd.RDD; local class incompatible: stream classdesc
>>>>> serialVersionUID = -3343649307726848892, local class serialVersionUID =
>>>>> -3996494161745401652
>>>>>
>>>>> If my application is using a library that builds against Spark 1.5.2,
>>>>> does that mean that my application is now tied to that same Spark
>>>>> standalone server version?
>>>>>
>>>>> Is there a recommended way for that library to have a Spark dependency
>>>>> but keep it compatible against a wider set of versions, i.e. any version
>>>>> 1.5.x?
>>>>>
>>>>> Thanks!
>>>>>
>>>>
>>>>
>>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>


Re: Redirect Spark Logs to Kafka

2016-02-01 Thread Burak Yavuz
You can use the KafkaLog4jAppender (
https://github.com/apache/kafka/blob/trunk/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
).

Best,
Burak

On Mon, Feb 1, 2016 at 12:20 PM, Ashish Soni  wrote:

> Hi All ,
>
> Please let me know how we can redirect spark logging files or tell spark
> to log to kafka queue instead of files ..
>
> Ashish
>


Re: Using accumulator to push custom logs to driver

2016-02-01 Thread Holden Karau
I wouldn't use accumulators for things which could get large; they can
become kind of a bottleneck. Do you have a lot of string messages you want
to bring back, or only a few?

On Mon, Feb 1, 2016 at 3:24 PM, Utkarsh Sengar 
wrote:

> I am trying to debug code executed in executors by logging. Even when I
> add log4j's LOG.info(..) inside .map(), I don't see it in the Mesos task logs on
> the corresponding slaves.
> It's inefficient anyway to keep checking multiple slaves for logs.
>
> One way to deal with this is to push logs to a central location.
>
> Another way (for debugging purposes) is to use accumulators. Is it
> advisable to use accumulators to push strings from executors to the driver?
> It would simplify things when I am debugging datasets, bugs which are hard
> to reproduce locally, etc.
>
> Suggestions/comments?
>
> -Utkarsh
>



-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Re: try to read multiple bz2 files in s3

2016-02-01 Thread Robert Collich
Hi Hao,

Could you please post the corresponding code? Are you using textFile or
sc.parallelize?

On Mon, Feb 1, 2016 at 2:36 PM Lin, Hao  wrote:

> When I tried to read multiple bz2 files from s3, I have the following
> warning messages. What is the problem here?
>
>
>
> 16/02/01 22:30:30 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
> 10.162.67.248): java.lang.ArrayIndexOutOfBoundsException: -1844424343
>
> at
> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode0(CBZip2InputStream.java:1014)
>
> at
> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:829)
>
> at
> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:504)
>
> at
> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>
> at
> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:399)
>
> at
> org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:483)
>
> at java.io.InputStream.read(InputStream.java:101)
>
> at
> org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.fillBuffer(CompressedSplitLineReader.java:130)
>
> at
> org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
>
> at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
>
> at
> org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.readLine(CompressedSplitLineReader.java:159)
>
> at
> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:209)
>
> at
> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
>
> at
> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:248)
>
> at
> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:216)
>
> at
> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>
> at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>
> at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1553)
>
> at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1125)
>
> at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1125)
>
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
>
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
>
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>


Spark Standalone cluster job to connect Hbase is Stuck

2016-02-01 Thread sudhir patil
A Spark job on a standalone cluster is stuck; it shows no logs after
"util.AkkaUtils: Connecting to HeartbeatReceiver" on the worker nodes and
"storage.BlockmanagerInfo: Added broadcast..." on the client driver side.

It would be great if you could clarify any of these (or, better, all of
them :)
1. Has anyone seen a similar issue, or any clues on what the reason could be?
2. How do I increase the debug or log level to see what's actually
happening?
3. Any clues or links on how to use Kerberized HBase in Spark standalone?


Re: Guidelines for writing SPARK packages

2016-02-01 Thread Burak Yavuz
Thanks for the reply David, just wanted to fix one part of your response:


> If you
> want to register a release for your package you will also need to push
> the artifacts for your package to Maven central.
>

It is NOT necessary to push to Maven Central in order to make a release.
There are many packages out there that don't publish to Maven Central, e.g.
scripts, and pure python packages.

Praveen, I would suggest taking a look at:
 - spark-package command line tool (
https://github.com/databricks/spark-package-cmd-tool), to get you set up
 - sbt-spark-package (https://github.com/databricks/sbt-spark-package) to
help with building/publishing if you plan to use Scala in your package. You
could of course use Maven as well, but we don't have a maven plugin for
Spark Packages.

Best,
Burak


Re: Spark Standalone cluster job to connect Hbase is Stuck

2016-02-01 Thread sudhir patil
Thanks, Ted, for the quick reply.

I am using Spark 1.2, exporting the HBase conf directory containing
hbase-site.xml in HADOOP_CLASSPATH & SPARK_CLASSPATH. Do I need to do
anything else?

Issues in connecting to Kerberized HBase through a Spark YARN cluster are fixed
in Spark 1.4+, so I am checking whether it works in Spark standalone mode on Spark
1.2, as I cannot upgrade the cluster.
https://issues.apache.org/jira/browse/SPARK-6918



On Tue, Feb 2, 2016 at 8:31 AM, Ted Yu  wrote:

> Is the hbase-site.xml on the classpath of the worker nodes ?
>
> Which Spark release are you using ?
>
> Cheers
>
> On Mon, Feb 1, 2016 at 4:25 PM, sudhir patil 
> wrote:
>
>> Spark job on Standalone cluster is Stuck, shows no logs after
>> "util.AkkaUtils: Connecting to HeartbeatReceiver" on worker nodes and
>> "storage.BlockmanagerInfo: Added broadcast..." on client driver side.
>>
>> Would be great, if you could clarify any of these ( or better all of
>> these :)
>> 1. Did anyone see similar issue? or any clues on what could be the reason?
>> 2. How do you i increase debug or log level? to see whats actually
>> happening?
>> 3. Any clues or links on how to use kerborised Hbase in spark standalone?
>>
>
>


Re: Using accumulator to push custom logs to driver

2016-02-01 Thread Holden Karau
Ah, if it's manual ad-hoc logging of 100 to 200 lines, then that's probably
OK.
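
For small, ad-hoc debugging output like this, one way to bring strings back to the driver is a list-valued accumulator. The sketch below is only an illustration under stated assumptions: `ListParam` and `merge_partials` are hypothetical names, and the class mirrors the `zero`/`addInPlace` interface of PySpark's `AccumulatorParam` without importing Spark, so the merge logic can be run on its own.

```python
class ListParam(object):
    """Hypothetical accumulator param that concatenates lists of log lines.

    In a real job this would subclass pyspark.accumulators.AccumulatorParam;
    it is kept Spark-free here so the merge logic can be run standalone.
    """

    def zero(self, initial_value):
        # Every partial result starts from an empty list.
        return []

    def addInPlace(self, left, right):
        # Merge two partial lists of log lines.
        left.extend(right)
        return left


def merge_partials(param, partials):
    """Fold per-task partial lists together, as the driver would."""
    merged = param.zero([])
    for part in partials:
        merged = param.addInPlace(merged, part)
    return merged


logs = merge_partials(
    ListParam(), [["task 0: saw 3 rows"], [], ["task 1: null key"]])
print(logs)
```

In PySpark this would presumably be wired up as `acc = sc.accumulator([], ListParam())` with `acc.add(["some message"])` inside the function passed to `.map()`. Accumulator updates made inside transformations can be applied more than once if a task is re-executed, so duplicate lines are possible.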

On Mon, Feb 1, 2016 at 3:48 PM, Utkarsh Sengar 
wrote:

> Not a lot of string messages. I need it mostly for debugging purposes, which
> I will use on an ad-hoc basis: manually adding debug statements which return
> info about the dataset, etc.
> I would assume the strings will be 100-200 lines at most; that would be
> about 50-100 KB if they are really long lines.
>
> -Utkarsh
>
> On Mon, Feb 1, 2016 at 3:40 PM, Holden Karau  wrote:
>
>> I wouldn't use accumulators for things which could get large, they can
>> become kind of a bottle neck. Do you have a lot of string messages you want
>> to bring back or only a few?
>>
>> On Mon, Feb 1, 2016 at 3:24 PM, Utkarsh Sengar 
>> wrote:
>>
>>> I am trying to debug code executed in executors by logging. Even when I
>>> add log4j's LOG.info(..) inside .map() I don't see it in mesos task logs in
>>> the corresponding slaves.
>>> Its anyway inefficient to keep checking multiple slaves for logs.
>>>
>>> One way to deal with this is to push logs to a central location.
>>>
>>> Another way (for debugging purposes) is to use accumulators . Is it
>>> advisable to use accumulators to push string from executors to driver?
>>> It will simplify things when I am debugging datasets, bugs which is hard
>>> to reproduce locally etc.
>>>
>>> Suggestions/comments?
>>>
>>> -Utkarsh
>>>
>>
>>
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>>
>
>
>
> --
> Thanks,
> -Utkarsh
>



-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


questions about progress bar status [stuck]?

2016-02-01 Thread charles li
code:

---
total = int(1e8)
local_collection = range(1, total)
rdd = sc.parallelize(local_collection)
res = rdd.collect()
---

web ui status
---

problems:
---

1. From the status bar, it seems that about half of the tasks should be
done, but it says that none of the 16 tasks has completed.

2. The job just gets stuck and I have to kill it manually, but I don't know why it
gets stuck. Any idea about this problem?

3. I tried setting total to `1e6`, and it works fine.


-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


RE: saveAsTextFile is not writing to local fs

2016-02-01 Thread Mohammed Guller
If the data is not too big, one option is to call the collect method and then 
save the result to a local file using standard Java/Scala API. However, keep in 
mind that this will transfer data from all the worker nodes to the driver 
program. Looks like that is what you want to do anyway, but you need to be 
aware of how big that data is and related implications.

Mohammed
Author: Big Data Analytics with 
Spark
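
The collect-then-write approach described above can be sketched as follows. This is a minimal illustration, not a tested Spark job: `write_lines_locally` is a hypothetical helper, and a plain list stands in for the result of `rdd.collect()` so the snippet runs without a cluster.

```python
import os
import tempfile


def write_lines_locally(lines, path):
    """Write an in-memory sequence of strings to a local file, one per line."""
    with open(path, "w") as handle:
        for line in lines:
            handle.write(line + "\n")
    return path


# Stand-in for `rdd.collect()`; on a cluster this list lives in driver memory.
collected = ["alpha", "beta", "gamma"]
out_path = write_lines_locally(
    collected, os.path.join(tempfile.mkdtemp(), "result.txt"))
with open(out_path) as handle:
    print(handle.read())
```

The file ends up on the machine running the driver, which in yarn-client mode is the machine the job was submitted from. If the data is too large to hold in driver memory at once, iterating over `rdd.toLocalIterator()` instead of calling `collect()` may help, since it fetches one partition at a time.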

From: Siva [mailto:sbhavan...@gmail.com]
Sent: Monday, February 1, 2016 6:00 PM
To: Mohammed Guller
Cc: spark users
Subject: Re: saveAsTextFile is not writing to local fs

Hi Mohamed,

Thanks for your response. Data is available in worker nodes. But looking for 
something to write directly to local fs. Seems like it is not an option.

Thanks,
Sivakumar Bhavanari.

On Mon, Feb 1, 2016 at 5:45 PM, Mohammed Guller 
> wrote:
You should not be saving an RDD to local FS if Spark is running on a real 
cluster. Essentially, each Spark worker will save the partitions that it 
processes locally.

Check the directories on the worker nodes and you should find pieces of your 
file on each node.

Mohammed
Author: Big Data Analytics with 
Spark

From: Siva [mailto:sbhavan...@gmail.com]
Sent: Friday, January 29, 2016 5:40 PM
To: Mohammed Guller
Cc: spark users
Subject: Re: saveAsTextFile is not writing to local fs

Hi Mohammed,

Thanks for your quick response. I'm submitting the Spark job to YARN in 
"yarn-client" mode on a 6-node cluster. I ran the job with DEBUG mode turned on. 
I see the exception below, but it occurred after the saveAsTextFile 
function had finished.

16/01/29 20:26:57 DEBUG HttpParser:
java.net.SocketException: Socket closed
at java.net.SocketInputStream.read(SocketInputStream.java:190)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at 
org.spark-project.jetty.io.ByteArrayBuffer.readFrom(ByteArrayBuffer.java:391)
at 
org.spark-project.jetty.io.bio.StreamEndPoint.fill(StreamEndPoint.java:141)
at 
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.fill(SocketConnector.java:227)
at org.spark-project.jetty.http.HttpParser.fill(HttpParser.java:1044)
at 
org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:280)
at 
org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
at 
org.spark-project.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72)
at 
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264)
at 
org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
at 
org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
at java.lang.Thread.run(Thread.java:745)
16/01/29 20:26:57 DEBUG HttpParser:
java.net.SocketException: Socket closed
at java.net.SocketInputStream.read(SocketInputStream.java:190)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at 
org.spark-project.jetty.io.ByteArrayBuffer.readFrom(ByteArrayBuffer.java:391)
at 
org.spark-project.jetty.io.bio.StreamEndPoint.fill(StreamEndPoint.java:141)
at 
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.fill(SocketConnector.java:227)
at org.spark-project.jetty.http.HttpParser.fill(HttpParser.java:1044)
at 
org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:280)
at 
org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
at 
org.spark-project.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72)
at 
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264)
at 
org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
at 
org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
at java.lang.Thread.run(Thread.java:745)
16/01/29 20:26:57 DEBUG HttpParser: HttpParser{s=-14,l=0,c=-3}
org.spark-project.jetty.io.EofException

Do you think this is what is causing the problem?

Thanks,
Sivakumar Bhavanari.

On Fri, Jan 29, 2016 at 3:55 PM, Mohammed Guller 
> wrote:
Is it a multi-node cluster, or are you running Spark on a single machine?

You can change Spark’s logging level to INFO or DEBUG to see what is going on.

Mohammed
Author: Big Data Analytics with 
Spark

From: Siva [mailto:sbhavan...@gmail.com]
Sent: Friday, January 29, 2016 3:38 PM
To: spark users
Subject: saveAsTextFile is not writing 

unsubscribe email

2016-02-01 Thread Eduardo Costa Alfaia
Hi Guys,
How can I unsubscribe the address e.costaalf...@studenti.unibs.it? It is an 
alias of my address e.costaalf...@unibs.it, and it is registered on the mailing 
list.

Thanks

Eduardo Costa Alfaia
PhD Student Telecommunication Engineering
Università degli Studi di Brescia-UNIBS


-- 
Informativa sulla Privacy: http://www.unibs.it/node/8155


RE: saveAsTextFile is not writing to local fs

2016-02-01 Thread Mohammed Guller
You should not be saving an RDD to local FS if Spark is running on a real 
cluster. Essentially, each Spark worker will save the partitions that it 
processes locally.

Check the directories on the worker nodes and you should find pieces of your 
file on each node.

Mohammed
Author: Big Data Analytics with 
Spark

From: Siva [mailto:sbhavan...@gmail.com]
Sent: Friday, January 29, 2016 5:40 PM
To: Mohammed Guller
Cc: spark users
Subject: Re: saveAsTextFile is not writing to local fs

Hi Mohammed,

Thanks for your quick response. I m submitting spark job to Yarn in 
"yarn-client" mode on a 6 node cluster. I ran the job by turning on DEBUG mode. 
I see the below exception, but this exception occurred after saveAsTextfile 
function is finished.

16/01/29 20:26:57 DEBUG HttpParser:
java.net.SocketException: Socket closed
at java.net.SocketInputStream.read(SocketInputStream.java:190)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at 
org.spark-project.jetty.io.ByteArrayBuffer.readFrom(ByteArrayBuffer.java:391)
at 
org.spark-project.jetty.io.bio.StreamEndPoint.fill(StreamEndPoint.java:141)
at 
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.fill(SocketConnector.java:227)
at org.spark-project.jetty.http.HttpParser.fill(HttpParser.java:1044)
at 
org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:280)
at 
org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
at 
org.spark-project.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72)
at 
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264)
at 
org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
at 
org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
at java.lang.Thread.run(Thread.java:745)
16/01/29 20:26:57 DEBUG HttpParser:
java.net.SocketException: Socket closed
at java.net.SocketInputStream.read(SocketInputStream.java:190)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at 
org.spark-project.jetty.io.ByteArrayBuffer.readFrom(ByteArrayBuffer.java:391)
at 
org.spark-project.jetty.io.bio.StreamEndPoint.fill(StreamEndPoint.java:141)
at 
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.fill(SocketConnector.java:227)
at org.spark-project.jetty.http.HttpParser.fill(HttpParser.java:1044)
at 
org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:280)
at 
org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
at 
org.spark-project.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72)
at 
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264)
at 
org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
at 
org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
at java.lang.Thread.run(Thread.java:745)
16/01/29 20:26:57 DEBUG HttpParser: HttpParser{s=-14,l=0,c=-3}
org.spark-project.jetty.io.EofException

Do you think this one this causing this?

Thanks,
Sivakumar Bhavanari.

On Fri, Jan 29, 2016 at 3:55 PM, Mohammed Guller 
> wrote:
Is it a multi-node cluster or you running Spark on a single machine?

You can change Spark’s logging level to INFO or DEBUG to see what is going on.

Mohammed
Author: Big Data Analytics with 
Spark

From: Siva [mailto:sbhavan...@gmail.com]
Sent: Friday, January 29, 2016 3:38 PM
To: spark users
Subject: saveAsTextFile is not writing to local fs

Hi Everyone,

We are using Spark 1.4.1 and we have a requirement to write data to the local fs 
instead of HDFS.

When trying to save an RDD to the local fs with saveAsTextFile, it just writes a 
_SUCCESS file in the folder, with no part- files and no error or warning 
messages on the console.

Is there any place to look to fix this problem?

Thanks,
Sivakumar Bhavanari.



how to covert millisecond time to SQL timeStamp

2016-02-01 Thread Andy Davidson
What little I know about working with timestamps is based on
https://databricks.com/blog/2015/09/16/spark-1-5-dataframe-api-highlights-datetimestring-handling-time-intervals-and-udafs.html

Using the example of dates formatted into human-friendly strings -> timestamps,
I was able to figure out how to convert epoch times to timestamps. The same
trick did not work for millisecond times.

Any suggestions would be greatly appreciated.


Andy

Working with epoch times
 
ref: http://www.epochconverter.com/
Epoch timestamp:  1456050620
Timestamp in milliseconds: 1456050620000
Human time (GMT): Sun, 21 Feb 2016 10:30:20 GMT
Human time (your time zone): 2/21/2016, 2:30:20 AM

# Epoch time stamp example
data = [
  ("1456050620", "1456050621", 1),
  ("1456050622", "14560506203", 2),
  ("14560506204", "14560506205", 3)]
df = sqlContext.createDataFrame(data, ["start_time", "end_time", "id"])

# convert epoch time strings to longs
df = df.select(
  df.start_time.cast("long").alias("start_time"),
  df.end_time.cast("long").alias("end_time"),
  df.id)
df.printSchema()
df.show(truncate=False)

# convert longs to timestamps
df = df.select(
  df.start_time.cast("timestamp").alias("start_time"),
  df.end_time.cast("timestamp").alias("end_time"),
  df.id)
df.printSchema()
df.show(truncate=False)

root
 |-- start_time: long (nullable = true)
 |-- end_time: long (nullable = true)
 |-- id: long (nullable = true)

+-----------+-----------+---+
|start_time |end_time   |id |
+-----------+-----------+---+
|1456050620 |1456050621 |1  |
|1456050622 |14560506203|2  |
|14560506204|14560506205|3  |
+-----------+-----------+---+

root
 |-- start_time: timestamp (nullable = true)
 |-- end_time: timestamp (nullable = true)
 |-- id: long (nullable = true)

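+---------------------+---------------------+---+
|start_time           |end_time             |id |
+---------------------+---------------------+---+
|2016-02-21 02:30:20.0|2016-02-21 02:30:21.0|1  |
|2016-02-21 02:30:22.0|2431-05-28 02:03:23.0|2  |
|2431-05-28 02:03:24.0|2431-05-28 02:03:25.0|3  |
+---------------------+---------------------+---+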

# working with millisecond times
data = [
  ("1456050620000", "1456050620000", 1)]

df = sqlContext.createDataFrame(data, ["start_time", "end_time", "id"])

# convert millisecond time strings to longs
df = df.select(
  df.start_time.cast("long").alias("start_time"),
  df.end_time.cast("long").alias("end_time"),
  df.id)
df.printSchema()
df.show(truncate=False)

# convert longs to timestamps
df = df.select(
  df.start_time.cast("timestamp").alias("start_time"),
  df.end_time.cast("timestamp").alias("end_time"),
  df.id)
df.printSchema()
df.show(truncate=False)

root
 |-- start_time: long (nullable = true)
 |-- end_time: long (nullable = true)
 |-- id: long (nullable = true)

+-------------+-------------+---+
|start_time   |end_time     |id |
+-------------+-------------+---+
|1456050620000|1456050620000|1  |
+-------------+-------------+---+

root
 |-- start_time: timestamp (nullable = true)
 |-- end_time: timestamp (nullable = true)
 |-- id: long (nullable = true)

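+----------------------+----------------------+---+
|start_time            |end_time              |id |
+----------------------+----------------------+---+
|48110-05-29 10:33:20.0|48110-05-29 10:33:20.0|1  |
+----------------------+----------------------+---+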
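
The year-48110 result is consistent with the long-to-timestamp cast reading the value as seconds since the epoch, so a millisecond value (the epoch seconds 1456050620 times 1000) lands about a thousand times too far in the future. A minimal sketch of the arithmetic in plain Python, assuming the input really is milliseconds since the Unix epoch; `millis_to_utc` is a hypothetical helper name:

```python
from datetime import datetime, timezone


def millis_to_utc(millis):
    """Convert milliseconds since the Unix epoch to an aware UTC datetime."""
    # Split into whole seconds and leftover milliseconds to avoid float error.
    seconds, remainder = divmod(int(millis), 1000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(
        microsecond=remainder * 1000)


print(millis_to_utc("1456050620000"))
```

In the DataFrame code, the equivalent would presumably be to divide before casting, e.g. `(df.start_time.cast("long") / 1000).cast("timestamp")`. That expression is an untested assumption here and is worth checking against the Spark version in use.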





Re: TTransportException when using Spark 1.6.0 on top of Tachyon 0.8.2

2016-02-01 Thread Jia Zou
Hi Calvin, I am running Spark KMeans on 24GB of data on a c3.2xlarge AWS 
instance with 30GB of physical memory.
Spark caches data off-heap in Tachyon; the input data is also stored in 
Tachyon.
Tachyon is configured to use 15GB of memory and to use tiered storage.
The Tachyon underFS is /tmp.

The only configuration I've changed is the Tachyon data block size.

The above experiment is part of a research project.

Best Regards,
Jia

On Thursday, January 28, 2016 at 9:11:19 PM UTC-6, Calvin Jia wrote:
>
> Hi,
>
> Thanks for the detailed information. How large is the dataset you are 
> running against? Also did you change any Tachyon configurations?
>
> Thanks,
> Calvin
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: saveAsTextFile is not writing to local fs

2016-02-01 Thread Siva
Hi Mohammed,

Thanks for your response. The data is available on the worker nodes, but I am looking
for a way to write directly to the local fs. It seems that is not an option.

Thanks,
Sivakumar Bhavanari.

On Mon, Feb 1, 2016 at 5:45 PM, Mohammed Guller 
wrote:

> You should not be saving an RDD to local FS if Spark is running on a real
> cluster. Essentially, each Spark worker will save the partitions that it
> processes locally.
>
>
>
> Check the directories on the worker nodes and you should find pieces of
> your file on each node.
>
>
>
> Mohammed
>
> Author: Big Data Analytics with Spark
> 
>
>
>
> *From:* Siva [mailto:sbhavan...@gmail.com]
> *Sent:* Friday, January 29, 2016 5:40 PM
> *To:* Mohammed Guller
> *Cc:* spark users
> *Subject:* Re: saveAsTextFile is not writing to local fs
>
>
>
> Hi Mohammed,
>
>
>
> Thanks for your quick response. I m submitting spark job to Yarn in
> "yarn-client" mode on a 6 node cluster. I ran the job by turning on DEBUG
> mode. I see the below exception, but this exception occurred after
> saveAsTextfile function is finished.
>
>
>
> 16/01/29 20:26:57 DEBUG HttpParser:
>
> java.net.SocketException: Socket closed
>
> at java.net.SocketInputStream.read(SocketInputStream.java:190)
>
> at java.net.SocketInputStream.read(SocketInputStream.java:122)
>
> at
> org.spark-project.jetty.io.ByteArrayBuffer.readFrom(ByteArrayBuffer.java:391)
>
> at
> org.spark-project.jetty.io.bio.StreamEndPoint.fill(StreamEndPoint.java:141)
>
> at
> org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.fill(SocketConnector.java:227)
>
> at
> org.spark-project.jetty.http.HttpParser.fill(HttpParser.java:1044)
>
> at
> org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:280)
>
> at
> org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
>
> at
> org.spark-project.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72)
>
> at
> org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264)
>
> at
> org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
>
> at
> org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
>
> at java.lang.Thread.run(Thread.java:745)
>
> 16/01/29 20:26:57 DEBUG HttpParser:
>
> java.net.SocketException: Socket closed
>
> at java.net.SocketInputStream.read(SocketInputStream.java:190)
>
> at java.net.SocketInputStream.read(SocketInputStream.java:122)
>
> at
> org.spark-project.jetty.io.ByteArrayBuffer.readFrom(ByteArrayBuffer.java:391)
>
> at
> org.spark-project.jetty.io.bio.StreamEndPoint.fill(StreamEndPoint.java:141)
>
> at
> org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.fill(SocketConnector.java:227)
>
> at
> org.spark-project.jetty.http.HttpParser.fill(HttpParser.java:1044)
>
> at
> org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:280)
>
> at
> org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
>
> at
> org.spark-project.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72)
>
> at
> org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264)
>
> at
> org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
>
> at
> org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
>
> at java.lang.Thread.run(Thread.java:745)
>
> 16/01/29 20:26:57 DEBUG HttpParser: HttpParser{s=-14,l=0,c=-3}
>
> org.spark-project.jetty.io.EofException
>
>
>
> Do you think this one this causing this?
>
>
> Thanks,
>
> Sivakumar Bhavanari.
>
>
>
> On Fri, Jan 29, 2016 at 3:55 PM, Mohammed Guller 
> wrote:
>
> Is it a multi-node cluster or you running Spark on a single machine?
>
>
>
> You can change Spark’s logging level to INFO or DEBUG to see what is going
> on.
>
>
>
> Mohammed
>
> Author: Big Data Analytics with Spark
> 
>
>
>
> *From:* Siva [mailto:sbhavan...@gmail.com]
> *Sent:* Friday, January 29, 2016 3:38 PM
> *To:* spark users
> *Subject:* saveAsTextFile is not writing to local fs
>
>
>
> Hi Everyone,
>
>
>
> We are using spark 1.4.1 and we have a requirement of writing data local
> fs instead of hdfs.
>
>
>
> When trying to save rdd to local fs with saveAsTextFile, it is just
> writing _SUCCESS file in the folder with no part- files and also no error
> or warning messages on console.
>
>
>
> Is there any place to look at to fix this problem?
>
>
> Thanks,
>
> Sivakumar Bhavanari.
>
>
>


Re: How to control the number of files for dynamic partition in Spark SQL?

2016-02-01 Thread Benyi Wang
Thanks Deenar, both methods work.

I actually tried the second method in spark-shell, but it didn't work at
that time. The reason might be that I registered the data frame eventwk as a
temporary table, repartitioned it, then registered the table again. Unfortunately,
I could not reproduce it.

Thanks again.

On Sat, Jan 30, 2016 at 1:25 AM, Deenar Toraskar 
wrote:

> The following should work as long as your tables are created using Spark
> SQL:
>
> event_wk.repartition(2).write.partitionBy("eventDate").format("parquet").insertInto("event")
>
> If you want to stick to using "insert overwrite" for Hive compatibility,
> then you can repartition twice, instead of setting the global
> spark.sql.shuffle.partitions parameter:
>
> // this should use the global shuffle partition parameter
> val eventwk = sqlContext.sql("some joins")
> val eventwkRepartitioned = eventwk.repartition(2)
> eventwkRepartitioned.registerTempTable("event_wk_repartitioned")
>
> and use this in your insert statement.
>
> Registering a temp table is cheap.
>
> HTH
>
>
> On 29 January 2016 at 20:26, Benyi Wang  wrote:
>
>> I want to insert into a partition table using dynamic partition, but I
>> don’t want to have 200 files for a partition because the files will be
>> small for my case.
>>
>> sqlContext.sql(  """
>> |insert overwrite table event
>> |partition(eventDate)
>> |select
>> | user,
>> | detail,
>> | eventDate
>> |from event_wk
>>   """.stripMargin)
>>
>> The table “event_wk” is created from a dataframe by registerTempTable,
>> which is built with some joins. If I set spark.sql.shuffle.partitions=2, the
>> join’s performance will be bad because that property seems to be global.
>>
>> I can do something like this:
>>
>> event_wk.repartition(2).write.partitionBy("eventDate").format("parquet").save(path)
>>
>> but I have to handle adding partitions myself.
>>
>> Is there a way you can control the number of files just for this last
>> insert step?
>> ​
>>
>
>


Re: Spark Standalone cluster job to connect Hbase is Stuck

2016-02-01 Thread Ted Yu
Is the hbase-site.xml on the classpath of the worker nodes ?

Which Spark release are you using ?

Cheers

On Mon, Feb 1, 2016 at 4:25 PM, sudhir patil 
wrote:

> Spark job on Standalone cluster is Stuck, shows no logs after
> "util.AkkaUtils: Connecting to HeartbeatReceiver" on worker nodes and
> "storage.BlockmanagerInfo: Added broadcast..." on client driver side.
>
> Would be great, if you could clarify any of these ( or better all of these
> :)
> 1. Did anyone see similar issue? or any clues on what could be the reason?
> 2. How do you i increase debug or log level? to see whats actually
> happening?
> 3. Any clues or links on how to use kerborised Hbase in spark standalone?
>


Re: Spark Standalone cluster job to connect Hbase is Stuck

2016-02-01 Thread Ted Yu
From your first email, it seems that you don't observe output from the HBase
client.
Spark 1.2 is quite old and is missing fixes for log4j such as SPARK-9826.

Can you change the following line in HBase's conf/log4j.properties
from:
log4j.logger.org.apache.hadoop.hbase=INFO

to:
log4j.logger.org.apache.hadoop.hbase=DEBUG

on the client and server sides (restart the servers for the change to take effect)
to see if you get more clues?

On Mon, Feb 1, 2016 at 4:44 PM, sudhir patil 
wrote:

> Thanks Ted for quick reply.
>
> I am using spark 1.2, exporting Hbase conf directory containing
> hbase-site.xml in HADOOP_CLASSPATH & SPARK_CLASSPATH. Do i need to do
> anything else?
>
> Issues in connecting to kerberos Hbase through spark yarn cluster is fixed
> spark 1.4+, so i am trying if it works in spark stand alone mode in spark
> 1.2, as i cannot upgrade cluster.
> https://issues.apache.org/jira/browse/SPARK-6918
>
>
>
> On Tue, Feb 2, 2016 at 8:31 AM, Ted Yu  wrote:
>
>> Is the hbase-site.xml on the classpath of the worker nodes ?
>>
>> Which Spark release are you using ?
>>
>> Cheers
>>
>> On Mon, Feb 1, 2016 at 4:25 PM, sudhir patil 
>> wrote:
>>
>>> Spark job on Standalone cluster is Stuck, shows no logs after
>>> "util.AkkaUtils: Connecting to HeartbeatReceiver" on worker nodes and
>>> "storage.BlockmanagerInfo: Added broadcast..." on client driver side.
>>>
>>> Would be great, if you could clarify any of these ( or better all of
>>> these :)
>>> 1. Did anyone see similar issue? or any clues on what could be the
>>> reason?
>>> 2. How do you i increase debug or log level? to see whats actually
>>> happening?
>>> 3. Any clues or links on how to use kerborised Hbase in spark standalone?
>>>
>>
>>
>


Error w/ Invertable ReduceByKeyAndWindow

2016-02-01 Thread Bryan Jeffrey
Hello.

I have a reduceByKeyAndWindow function with an invertible function and
filter function defined.  I am seeing an error as follows:

"Neither previous window has value for key, nor new values found. Are you
sure your key class hashes consistently?"

We're using case classes, and so I am sure we're doing consistent hashing.
The 'reduceAdd' function is adding to a map. The 'inverseReduceFunction' is
subtracting from the map. The filter function is removing items where the
number of entries in the map is zero.  Has anyone seen this error before?

Regards,

Bryan Jeffrey
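
For readers unfamiliar with the setup described above, the following is a plain-Python sketch of how an invertible windowed reduce updates its state incrementally: add the batch entering the window, subtract the batch leaving it, then filter out empty entries. It is an illustration of the general technique only, not Spark's implementation and not a diagnosis of the error. All function names are hypothetical, and a single `Counter` stands in for the map value kept for one key.

```python
from collections import Counter


def reduce_add(window, entering):
    """Add the counts from a batch entering the window."""
    result = Counter(window)
    result.update(entering)
    return result


def reduce_subtract(window, leaving):
    """Subtract the counts from a batch leaving the window."""
    result = Counter(window)
    result.subtract(leaving)
    return result


def keep(window):
    """Filter step: drop entries whose count has fallen to zero."""
    return Counter({k: v for k, v in window.items() if v != 0})


def slide(batches, width):
    """Yield the window state after each batch, updated incrementally."""
    window = Counter()
    for i, batch in enumerate(batches):
        window = reduce_add(window, batch)
        if i >= width:
            # The batch that arrived `width` steps ago has left the window.
            window = reduce_subtract(window, batches[i - width])
        window = keep(window)
        yield dict(window)


batches = [Counter(a=1), Counter(a=1, b=2), Counter(b=1), Counter()]
states = list(slide(batches, width=2))
print(states)
```

The incremental result only matches a full recomputation over the window if the subtract step is an exact inverse of the add step, which is the property an inverse reduce function is expected to have.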


Re: Error w/ Invertable ReduceByKeyAndWindow

2016-02-01 Thread Bryan Jeffrey
Excuse me - I should have mentioned: I am running Spark 1.4.1, Scala 2.11.
I am running in streaming mode receiving data from Kafka.

Regards,

Bryan Jeffrey



Re: Unpersist RDD in Graphx

2016-02-01 Thread Takeshi Yamamuro
Hi,

Please call "Graph#unpersist", which releases both RDDs, the vertex one and
the edge one.
"Graph#unpersist" just invokes "Graph#unpersistVertices" and
"Graph#edges#unpersist":
"Graph#unpersistVertices" releases the memory held for vertices, and
"Graph#edges#unpersist" releases the memory held for edges.
If blocking = true, unpersist() waits until the memory has been released from
the BlockManager.



On Mon, Feb 1, 2016 at 8:35 AM, Zhang, Jingyu 
wrote:

> Hi, what is the best way to unpersist the RDDs in GraphX to release memory?
> RDD.unpersist
> or
> RDD.unpersistVertices and RDD.edges.unpersist
>
> I studied the source code of Pregel.scala; both of the above are used between
> line 148 and line 150. Can anyone please tell me what the difference is? In
> addition, what is the difference between setting blocking = false and
> blocking = true?
>
> oldMessages.unpersist(blocking = false)
> prevG.unpersistVertices(blocking = false)
> prevG.edges.unpersist(blocking = false)
>
> Thanks,
>
> Jingyu
>




-- 
---
Takeshi Yamamuro


Using Java spring injection with spark

2016-02-01 Thread HARSH TAKKAR
>
> Hi
>
> I am new to Apache Spark and big data analytics. Before starting to code
> on Spark DataFrames and RDDs, I just wanted to confirm the following:
>
> 1. Can we create an implementation of java.api.Function as a singleton
> bean using the Spring framework, and can it be injected into other classes
> using autowiring?
>
> 2. What is the best way to submit jobs to Spark: using the API or using
> the shell script?
>
> Looking forward to your help,
>
>
> Kind Regards
> Harsh
>


Re: mapWithState: remove key

2016-02-01 Thread Udo Fholl
That makes sense. Thanks for your quick response.

On Fri, Jan 29, 2016 at 7:01 PM, Shixiong(Ryan) Zhu  wrote:

> 1. To remove a state, you need to call "state.remove()". If you return a
> None in the function, it just means don't output it as the DStream's
> output, but the state won't be removed if you don't call "state.remove()".
>
> 2. For NoSuchElementException, here is the doc for "State.get":
>
>   /**
>    * Get the state if it exists, otherwise it will throw
>    * `java.util.NoSuchElementException`.
>    * Check with `exists()` whether the state exists or not before calling
>    * `get()`.
>    *
>    * @throws java.util.NoSuchElementException If the state does not exist.
>    */
>
>
>
>
> On Fri, Jan 29, 2016 at 10:45 AM, Udo Fholl 
> wrote:
>
>> Hi,
>>
>> From the signature of the "mapWithState" method I infer that by returning
>> a "None.type" (in Scala) the key is removed from the state. Is that so?
>> Sorry if it is in the docs, but it wasn't entirely clear to me.
>>
>> I'm chaining operations and calling "mapWithState" twice (one to
>> consolidate, then I perform some operations that might, or might not
>> succeed, and invoke "mapWithState" again). I'm getting this error[1] which
>> I suppose is because I'm returning "None" in the "mapWithState" function.
>>
>> Thank you.
>>
>> Best regards,
>> Udo.
>>
>> [1]: java.util.NoSuchElementException: State is not set
>> at org.apache.spark.streaming.StateImpl.get(State.scala:150)
>>
>
>
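
The two points in the reply above (call "state.remove()" explicitly, and check
"exists()" before "get()") can be sketched without Spark as follows. `ToyState`
is a stand-in written for this example and models only the four calls
discussed; it is not Spark's State class. The `track` function and its
"event 0 means forget this key" convention are likewise hypothetical.

```scala
object StateSketch {
  // Stand-in for the state handle; only exists/get/update/remove are modelled.
  final class ToyState[S] {
    private var value: Option[S] = None
    def exists(): Boolean = value.isDefined
    def get(): S = value.getOrElse(throw new NoSuchElementException("State is not set"))
    def update(s: S): Unit = { value = Some(s) }
    def remove(): Unit = { value = None }
  }

  // Hypothetical mapping function: keeps a running total per key.
  // An event of 0 is treated as "forget this key" (an assumption of this sketch).
  def track(key: String, event: Int, state: ToyState[Int]): Option[(String, Int)] = {
    // Guard the read: get() throws when no state has been set yet.
    val previous = if (state.exists()) state.get() else 0
    if (event == 0) {
      state.remove() // returning None alone would not drop the state
      None
    } else {
      val total = previous + event
      state.update(total)
      Some(key -> total)
    }
  }
}
```

Returning None only suppresses the output for that record; the state is gone
only because remove() was called.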


Re: Repartition taking place for all previous windows even after checkpointing

2016-02-01 Thread Abhishek Anand
Any insights on this ?


On Fri, Jan 29, 2016 at 1:08 PM, Abhishek Anand 
wrote:

> Hi All,
>
> Can someone help me with the following doubts regarding checkpointing :
>
> My code flow is something like follows ->
>
> 1) create direct stream from kafka
> 2) repartition kafka stream
> 3)  mapToPair followed by reduceByKey
> 4)  filter
> 5)  reduceByKeyAndWindow without the inverse function
> 6)  write to cassandra
>
> Now when I restart my application from the checkpoint, I see repartition and
> the other steps being run again for the previous windows, which takes longer
> and delays my aggregations.
>
> My understanding was that once data checkpointing is done, it should not
> re-read from Kafka and should use the saved RDDs instead, but I guess I am
> wrong.
>
> Is there a way to avoid the repartition, or any workaround for this?
>
> Spark Version is 1.4.0
>
> Cheers !!
> Abhi
>


When char will be available in Spark

2016-02-01 Thread Dr Mich Talebzadeh

Hi,

I am using Spark on Hive. Some tables have CHAR type columns. It is my
understanding that Spark converts VARCHAR columns to String internally;
however, the Spark version 1.5.2 that I have throws an error when the
underlying Hive table has CHAR fields.

I wanted to know when Varchar will be available in Spark.

Also, Spark does not seem to understand temporary tables. For example, the
following throws an error:

spark-sql> CREATE TEMPORARY TABLE tmp AS
 > SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
 > FROM sales s, times t, channels c
 > WHERE s.time_id = t.time_id
 > AND   s.channel_id = c.channel_id
 > GROUP BY t.calendar_month_desc, c.channel_desc
 > ;
Error in query: Unhandled clauses: TEMPORARY 1, 2,2, 7
.
You are likely trying to use an unsupported Hive feature.";

Thanks,


Dr Mich Talebzadeh

LinkedIn 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

Sybase ASE 15 Gold Medal Award 2008
A Winning Strategy: Running the most Critical Financial Data on ASE 15
http://login.sybase.com/files/Product_Overviews/ASE-Winning-Strategy-091908.pdf
Author of the books "A Practitioner’s Guide to Upgrading to Sybase ASE
15", ISBN 978-0-9563693-0-7.
co-author "Sybase Transact SQL Guidelines Best Practices", ISBN
978-0-9759693-0-4
Publications due shortly:
Complex Event Processing in Heterogeneous Environments, ISBN:
978-0-9563693-3-8
Oracle and Sybase, Concepts and Contrasts, ISBN: 978-0-9563693-1-4, volume
one out shortly

http://talebzadehmich.wordpress.com




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Failed to 'collect_set' with dataset in spark 1.6

2016-02-01 Thread Alexandr Dzhagriev
Hi,

That's another thing: that the Record case class should be outside. I ran
it as spark-submit.

Thanks, Alex.


Re: Failed to 'collect_set' with dataset in spark 1.6

2016-02-01 Thread Ted Yu
Got around the previous error by adding:

scala> implicit val kryoEncoder = Encoders.kryo[RecordExample]
kryoEncoder: org.apache.spark.sql.Encoder[RecordExample] = class[value[0]:
binary]


Re: Using Java spring injection with spark

2016-02-01 Thread HARSH TAKKAR
Hi

Please can anyone reply on this.



Re: Guidelines for writing SPARK packages

2016-02-01 Thread David Russell
Hi Praveen,

The basic requirements for releasing a Spark package on
spark-packages.org are as follows:

1. The package content must be hosted by GitHub in a public repo under
the owner's account.
2. The repo name must match the package name.
3. The master branch of the repo must contain "README.md" and "LICENSE".

Per the doc on spark-packages.org site an example package that meets
those requirements can be found at
https://github.com/databricks/spark-avro. My own recently released
SAMBA package also meets these requirements:
https://github.com/onetapbeyond/lambda-spark-executor.

As you can see there is nothing in this list of requirements that
demands the implementation of specific interfaces. What you'll need to
implement will depend entirely on what you want to accomplish. If you
want to register a release for your package you will also need to push
the artifacts for your package to Maven central.

David


On Mon, Feb 1, 2016 at 7:03 AM, Praveen Devarao  wrote:
> Hi,
>
> Are there any guidelines or specs for writing a Spark package? I would
> like to implement a Spark package and would like to know how it needs to
> be structured (implementing some interfaces, etc.) so that it can plug into
> Spark for extended functionality.
>
> Could anyone point me to docs or links on the above?
>
> Thanking You
>
> Praveen Devarao



-- 
"All that is gold does not glitter, Not all those who wander are lost."




Can't view executor logs in web UI on Windows

2016-02-01 Thread Mark Pavey
I am running Spark on Windows. When I try to view the Executor logs in the UI
I get the following error:

HTTP ERROR 500

Problem accessing /logPage/. Reason:

Server Error
Caused by:

java.net.URISyntaxException: Illegal character in path at index 1:
.\work/app-20160129154716-0038/2/
at java.net.URI$Parser.fail(Unknown Source)
at java.net.URI$Parser.checkChars(Unknown Source)
at java.net.URI$Parser.parseHierarchical(Unknown Source)
at java.net.URI$Parser.parse(Unknown Source)
at java.net.URI.<init>(Unknown Source)
at org.apache.spark.deploy.worker.ui.LogPage.getLog(LogPage.scala:141)
at org.apache.spark.deploy.worker.ui.LogPage.render(LogPage.scala:78)
at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:79)
at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:79)
at org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:69)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:735)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:848)
at
org.spark-project.jetty.servlet.ServletHolder.handle(ServletHolder.java:684)
at
org.spark-project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501)
at
org.spark-project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086)
at
org.spark-project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:428)
at
org.spark-project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020)
at
org.spark-project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
at
org.spark-project.jetty.server.handler.GzipHandler.handle(GzipHandler.java:264)
at
org.spark-project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:255)
at
org.spark-project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
at org.spark-project.jetty.server.Server.handle(Server.java:370)
at
org.spark-project.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494)
at
org.spark-project.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:971)
at
org.spark-project.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:1033)
at 
org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:644)
at
org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
at
org.spark-project.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
at
org.spark-project.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:667)
at
org.spark-project.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52)
at
org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
at
org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
at java.lang.Thread.run(Unknown Source)



Looking at the source code for
org.apache.spark.deploy.worker.ui.LogPage.getLog reveals the following:
 - At line 141 the constructor of java.net.URI is called with the path to
the log directory as a String argument. This string
(".\work/app-20160129154716-0038/2/" in example above) contains a backslash,
which is an illegal character for the URI constructor.
 - The component of the path containing the backslash is created at line 71
by calling the getPath method on a java.io.File object. Because it is
running on Windows it uses the default Windows file separator, which is a
backslash.

I am using Spark 1.5.1 but the source code appears unchanged in 1.6.0.

I haven't been able to find an open issue for this, but if there is one I
could possibly submit a pull request for it.
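
The failure described above can be reproduced outside Spark with
java.net.URI alone. The sketch below is a standalone illustration: the object
name, `failureIndex` and `normalize` are hypothetical helpers, and replacing
backslashes with forward slashes is just one possible normalisation, not
necessarily how Spark should fix LogPage.

```scala
import java.net.{URI, URISyntaxException}

object LogPathSketch {
  // Returns the index of the offending character, or None if the string parses as a URI.
  def failureIndex(path: String): Option[Int] =
    try {
      new URI(path)
      None
    } catch {
      case e: URISyntaxException => Some(e.getIndex)
    }

  // One possible normalisation (an assumption of this sketch): use forward slashes only.
  def normalize(path: String): String = path.replace('\\', '/')

  def main(args: Array[String]): Unit = {
    // Same path as in the stack trace above, with the Windows separator after the dot.
    val windowsStylePath = ".\\work/app-20160129154716-0038/2/"
    println(failureIndex(windowsStylePath))
    println(failureIndex(normalize(windowsStylePath)))
  }
}
```

Going through java.io.File#toURI instead of building the URI from a raw path
string is another option that sidesteps the separator issue, at the cost of
producing an absolute file: URI.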




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Can-t-view-executor-logs-in-web-UI-on-Windows-tp26122.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: [MLlib] What is the best way to forecast the next month page visit?

2016-02-01 Thread Jorge Machado
Hi Guru,

First, transform your page names with OneHotEncoder
(https://spark.apache.org/docs/latest/ml-features.html#onehotencoder), then
do the same for the months.

You will end up with something like this (the first three entries are the
page name, the others the month):
(0,0,1,0,0,1)

Then you have your label, which is what you want to predict. In the end you
will have a LabeledPoint such as (1 -> (0,0,1,0,0,1)), which represents
(1 -> (PageA, UV_NOV)).
After that, try a regression tree with:

val model = DecisionTree.trainRegressor(trainingData, categoricalFeaturesInfo, 
impurity,maxDepth, maxBins)


Regards
Jorge
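
The encoding described above can be sketched in plain Scala, without Spark, to
make the shape of the feature vector concrete. Everything here is an
assumption for illustration: the category orderings, the helper names, and the
choice to keep every category (Spark's OneHotEncoder has its own conventions,
such as optionally dropping the last category). A (label, features) tuple
stands in for a LabeledPoint.

```scala
object OneHotSketch {
  // Hypothetical category orderings; a real pipeline would derive them from the data.
  val pages: Seq[String] = Seq("pageA", "pageB", "pageZ")
  val months: Seq[String] = Seq("NOV", "DEC", "JAN")

  // 1.0 at the position of the matching category, 0.0 elsewhere.
  def oneHot(value: String, categories: Seq[String]): Vector[Double] =
    categories.map(c => if (c == value) 1.0 else 0.0).toVector

  // First three entries encode the page, the last three the month.
  def features(page: String, month: String): Vector[Double] =
    oneHot(page, pages) ++ oneHot(month, months)

  // Stand-in for a LabeledPoint: the label is the value to predict (e.g. unique views).
  def labeled(label: Double, page: String, month: String): (Double, Vector[Double]) =
    (label, features(page, month))
}
```

With these orderings, `labeled(34.0, "pageZ", "JAN")` pairs the January count
for pageZ from the sample data with a six-element vector that has one entry
set for the page and one for the month.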

> On 01/02/2016, at 12:29, diplomatic Guru  wrote:
> 
> Any suggestions please?
> 
> 
> On 29 January 2016 at 22:31, diplomatic Guru  > wrote:
> Hello guys,
> 
> I'm trying to understand how I could predict the next month's page views
> based on the previous access pattern.
> 
> For example, I've collected statistics on page views:
> 
> e.g.
> Page,UniqueView
> -
> pageA, 1
> pageB, 999
> ...
> pageZ,200
> 
> I aggregate the statistics monthly.
> 
> I've prepared a file containing last 3 months as this:
> 
> e.g.
> Page,UV_NOV, UV_DEC, UV_JAN
> ---
> pageA, 1,9989,11000
> pageB, 999,500,700
> ...
> pageZ,200,50,34
> 
> 
> Based on above information, I want to predict the next month (FEB).
> 
> Which algorithm do you think will suit best? I think linear regression is the
> safe bet. However, I'm struggling to prepare this data for LR ML, especially
> how to prepare the X,Y relationship.
>
> The Y is easy (unique visitors), but I am not sure about the X (it should be
> Page, right?). However, how do I plot those three months of data?
> 
> Could you give me an example based on above example data?
> 
> 
> 
> Page,UV_NOV, UV_DEC, UV_JAN
> ---
> 1, 1,9989,11000
> 2, 999,500,700
> ...
> 26,200,50,34
> 
> 
> 
> 
> 



AFTSurvivalRegression Prediction and QuantilProbabilities

2016-02-01 Thread Christine Jula
Hello,

I would like to fit a survival model with AFTSurvivalRegression. My question
here is what kind of prediction I get with this. With survreg in R
I can set a type of prediction ("response", "link", "lp", "linear", "terms",
"quantile", "uquantile").

Besides, what can I manipulate with the parameter quantileProbabilities?

Regards,
Christine


Spark MLLlib Ideal way to convert categorical features into LabeledPoint RDD?

2016-02-01 Thread unk1102
Hi, I have a dataset that is completely categorical; it does not contain
even one numerical column. I want to apply classification using Naive
Bayes to predict whether a given alert is actionable or not (YES/NO).
Here is an example of my dataset:

DayOfWeek(int),AlertType(String),Application(String),Router(String),Symptom(String),Action(String)
0,Network1,App1,Router1,Not reachable,YES
0,Network1,App2,Router5,Not reachable,NO

I am using Spark 1.6 and I see there is a StringIndexer class, which is used
in the OneHotEncoding example given here
https://spark.apache.org/docs/latest/ml-features.html#onehotencoder but I
have almost 1 unique words/features to map into continuous values, so how do
I create such a huge map? My dataset is in a CSV file. Please guide me on how
to convert all the categorical features in the CSV file and use them in a
Naive Bayes model.
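
One way to avoid writing the map by hand is to derive it from the data. The
sketch below does that in plain Scala, without Spark: each column gets its own
index built from the distinct values, in order of first appearance. This is an
assumption-laden illustration; the object and helper names are hypothetical,
and the ordering is simpler than what StringIndexer does in Spark.

```scala
object CategoricalIndexSketch {
  // Maps each distinct value of one column to an integer, in order of first appearance.
  def buildIndex(values: Seq[String]): Map[String, Int] =
    values.distinct.zipWithIndex.toMap

  // Encodes every row by looking each cell up in the index of its own column.
  // Assumes all rows have the same number of columns.
  def encode(rows: Seq[Seq[String]]): Seq[Seq[Double]] = {
    val indexes = rows.transpose.map(buildIndex)
    rows.map { row =>
      row.zip(indexes).map { case (value, index) => index(value).toDouble }
    }
  }
}
```

Applied to the AlertType, Application, Router and Symptom columns of the two
sample rows, the first row becomes all zeros and the second differs only in
the Application and Router positions. The resulting numeric rows are the kind
of input that can then be wrapped as labelled feature vectors for training.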



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-MLLlib-Ideal-way-to-convert-categorical-features-into-LabeledPoint-RDD-tp26125.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Failed to 'collect_set' with dataset in spark 1.6

2016-02-01 Thread Ted Yu
Running your sample in spark-shell built from the master branch, I got:

scala> val dataset = sc.parallelize(Seq(RecordExample(1, "apple"),
RecordExample(2, "orange"))).toDS()
org.apache.spark.sql.AnalysisException: Unable to generate an encoder for
inner class `RecordExample` without access to the scope that this class was
defined in. Try moving this class out of its parent class.;
  at
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$3.applyOrElse(ExpressionEncoder.scala:316)
  at
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$3.applyOrElse(ExpressionEncoder.scala:312)
  at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:262)
  at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:262)
  at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:261)
  at
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:251)
  at
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolve(ExpressionEncoder.scala:312)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:80)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:91)
  at org.apache.spark.sql.SQLContext.createDataset(SQLContext.scala:488)
  at
org.apache.spark.sql.SQLImplicits.rddToDatasetHolder(SQLImplicits.scala:71)
  ... 53 elided

On Mon, Feb 1, 2016 at 9:09 AM, Alexandr Dzhagriev  wrote:

> Hello again,
>
> Also I've tried the following snippet with concat_ws:
>
> val dataset = sc.parallelize(Seq(
>   RecordExample(1, "apple"),
>   RecordExample(1, "banana"),
>   RecordExample(2, "orange"))
> ).toDS().groupBy($"a").agg(concat_ws(",", $"b").as[String])
>
> dataset.take(10).foreach(println)
>
>
> which also fails
>
> Exception in thread "main" org.apache.spark.sql.AnalysisException:
> expression 'b' is neither present in the group by, nor is it an aggregate
> function. Add to group by or wrap in first() (or first_value) if you don't
> care which value you get.;
> at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
> at
> org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
> at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$
> 1.org
> $apache$spark$sql$catalyst$analysis$CheckAnalysis$class$$anonfun$$checkValidAggregateExpression$1(CheckAnalysis.scala:130)
>
> Thanks, Alex.
>

Re: Failed to 'collect_set' with dataset in spark 1.6

2016-02-01 Thread Alexandr Dzhagriev
Hi Ted,

That doesn't help either, as one method delegates to the other as far as I
can see:

def collect_list(columnName: String): Column = collect_list(Column(columnName))


Thanks, Alex

On Mon, Feb 1, 2016 at 5:55 PM, Ted Yu  wrote:

> bq. agg(collect_list("b")
>
> Have you tried:
>
> agg(collect_list($"b")
>
> On Mon, Feb 1, 2016 at 8:50 AM, Alexandr Dzhagriev 
> wrote:
>
>> Hello,
>>
>> I'm trying to run the following example code:
>>
>> import org.apache.spark.sql.hive.HiveContext
>> import org.apache.spark.{SparkContext, SparkConf}
>> import org.apache.spark.sql.functions._
>>
>>
>> case class RecordExample(a: Int, b: String)
>>
>> object ArrayExample {
>>   def main(args: Array[String]) {
>> val conf = new SparkConf()
>>
>> val sc = new SparkContext(conf)
>> val sqlContext = new HiveContext(sc)
>>
>> import sqlContext.implicits._
>>
>> val dataset = sc.parallelize(Seq(RecordExample(1, "apple"), 
>> RecordExample(2, "orange"))).toDS()
>>
>> dataset.groupBy($"a").agg(collect_list("b").as[List[String]])
>>
>> dataset.collect()
>>
>>   }
>>
>> }
>>
>>
>> and it fails with the following (please see the whole stack trace below):
>>
>>  Exception in thread "main" java.lang.ClassCastException:
>> org.apache.spark.sql.types.ArrayType cannot be cast to
>> org.apache.spark.sql.types.StructType
>>
>>
>> Could someone please point me to the proper way to do that or confirm
>> it's a bug?
>>
>> Thank you and here is the whole stacktrace:
>>
>> Exception in thread "main" java.lang.ClassCastException:
>> org.apache.spark.sql.types.ArrayType cannot be cast to
>> org.apache.spark.sql.types.StructType
>> at org.apache.spark.sql.catalyst.expressions.GetStructField.org
>> $apache$spark$sql$catalyst$expressions$GetStructField$$field$lzycompute(complexTypeExtractors.scala:107)
>> at org.apache.spark.sql.catalyst.expressions.GetStructField.org
>> $apache$spark$sql$catalyst$expressions$GetStructField$$field(complexTypeExtractors.scala:107)
>> at
>> org.apache.spark.sql.catalyst.expressions.GetStructField.dataType(complexTypeExtractors.scala:109)
>> at
>> org.apache.spark.sql.catalyst.analysis.ResolveUpCast$$anonfun$apply$24.applyOrElse(Analyzer.scala:1214)
>> at
>> org.apache.spark.sql.catalyst.analysis.ResolveUpCast$$anonfun$apply$24.applyOrElse(Analyzer.scala:1211)
>> at
>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243)
>> at
>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243)
>> at
>> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:53)
>> at
>> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:242)
>> at
>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:248)
>> at
>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:248)
>> at
>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:265)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
>> at
>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
>> at
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
>> at
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
>> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308)
>> at scala.collection.AbstractIterator.to(Iterator.scala:1194)
>> at
>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:300)
>> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1194)
>> at
>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:287)
>> at scala.collection.AbstractIterator.toArray(Iterator.scala:1194)
>> at
>> org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:305)
>> at
>> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:248)
>> at
>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:248)
>> at
>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:248)
>> at
>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:265)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
>> at
>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
>> at
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
>> at
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
>> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308)
>> at scala.collection.AbstractIterator.to(Iterator.scala:1194)
>> at
>> 

Spark Executor retries infinitely

2016-02-01 Thread Prabhu Joseph
Hi All,

  When a Spark job (Spark-1.5.2) is submitted with a single executor and the
user passes invalid JVM arguments via spark.executor.extraJavaOptions,
the first executor fails. But the job keeps on retrying, creating a new
executor and failing every time, until CTRL-C is pressed. Is there a
configuration to limit the retry attempts?

*Example:*

./spark-submit --class SimpleApp --master "spark://10.10.72.145:7077"
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails
-XX:+PrintGCTimeStamps -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35
-XX:ConcGCThreads=16" /SPARK/SimpleApp.jar

Executor fails with

Error occurred during initialization of VM
Can't have more ConcGCThreads than ParallelGCThreads.

But the job does not exit, keeps on creating executors and retrying.
..
16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Granted executor ID
app-20160201065319-0014/2846 on hostPort 10.10.72.145:36558 with 12 cores,
2.0 GB RAM
16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
app-20160201065319-0014/2846 is now LOADING
16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
app-20160201065319-0014/2846 is now RUNNING
16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
app-20160201065319-0014/2846 is now EXITED (Command exited with code 1)
16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Executor
app-20160201065319-0014/2846 removed: Command exited with code 1
16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Asked to remove
non-existent executor 2846
16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor added:
app-20160201065319-0014/2847 on worker-20160131230345-10.10.72.145-36558 (
10.10.72.145:36558) with 12 cores
16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Granted executor ID
app-20160201065319-0014/2847 on hostPort 10.10.72.145:36558 with 12 cores,
2.0 GB RAM
16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
app-20160201065319-0014/2847 is now LOADING
16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
app-20160201065319-0014/2847 is now EXITED (Command exited with code 1)
16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Executor
app-20160201065319-0014/2847 removed: Command exited with code 1
16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Asked to remove
non-existent executor 2847
16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor added:
app-20160201065319-0014/2848 on worker-20160131230345-10.10.72.145-36558 (
10.10.72.145:36558) with 12 cores
16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Granted executor ID
app-20160201065319-0014/2848 on hostPort 10.10.72.145:36558 with 12 cores,
2.0 GB RAM
16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
app-20160201065319-0014/2848 is now LOADING
16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
app-20160201065319-0014/2848 is now RUNNING




Thanks,
Prabhu Joseph
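
A possible safeguard for the situation described above: because the executor dies during JVM initialization, the same options can be tried against a local JVM with `java <options> -version` before the job is submitted. The sketch below is only an illustration in plain Scala. `JvmOptionCheck` and `optionsAreValid` are hypothetical names, and it assumes the `java` on the submitting machine is comparable to the one on the workers. Defaults such as ParallelGCThreads can depend on the machine, so a local pass does not guarantee a pass on the cluster.

```scala
import scala.sys.process._

object JvmOptionCheck {
  /** Returns true if a local JVM starts cleanly with the given options.
    * Assumption: `javaBin` resolves to a JVM similar to the executors' JVM. */
  def optionsAreValid(extraJavaOptions: String, javaBin: String = "java"): Boolean = {
    val opts = extraJavaOptions.trim.split("\\s+").filter(_.nonEmpty).toSeq
    val command = Seq(javaBin) ++ opts ++ Seq("-version")
    // Discard the JVM's own output; only the exit code matters here.
    val silent = ProcessLogger(_ => (), _ => ())
    try Process(command).!(silent) == 0
    catch { case _: java.io.IOException => false } // the java binary could not be started
  }

  def main(args: Array[String]): Unit = {
    // Options taken from the spark-submit example in the message above.
    val opts = "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseG1GC " +
      "-XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=16"
    if (optionsAreValid(opts)) println("options accepted by the local JVM")
    else println(s"local JVM rejected: $opts")
  }
}
```

A wrapper script could run such a check and refuse to call spark-submit when it fails. This does not limit Spark's own retries; it only catches the mistake earlier.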


Guidelines for writing SPARK packages

2016-02-01 Thread Praveen Devarao
Hi,

Are there any guidelines or specs for writing a Spark package? I would
like to implement a Spark package and would like to know how it needs
to be structured (which interfaces to implement, etc.) so that it can plug into
Spark for extended functionality.

Could anyone point me to docs or links on the above?

Thanking You

Praveen Devarao



[ANNOUNCE] New SAMBA Package = Spark + AWS Lambda

2016-02-01 Thread David Russell
Hi all,

Just sharing news of the release of a newly available Spark package, SAMBA.


https://github.com/onetapbeyond/lambda-spark-executor

SAMBA is an Apache Spark package offering seamless integration with the AWS
Lambda compute service for Spark batch and streaming applications on the
JVM.

Within traditional Spark deployments RDD tasks are executed using fixed
compute resources on worker nodes within the Spark cluster. With SAMBA,
application developers can delegate selected RDD tasks to execute using
on-demand AWS Lambda compute infrastructure in the cloud.

Not unlike the recently released ROSE package that extends the capabilities
of traditional Spark applications with support for CRAN R analytics, SAMBA
provides another (hopefully) useful extension for Spark application
developers on the JVM.

SAMBA Spark Package: https://github.com/onetapbeyond/lambda-spark-executor

ROSE Spark Package: https://github.com/onetapbeyond/opencpu-spark-executor


Questions, suggestions, feedback welcome.

David

-- 
"All that is gold does not glitter, Not all those who wander are lost."


Re: [MLlib] What is the best way to forecast the next month page visit?

2016-02-01 Thread diplomatic Guru
Any suggestions please?


On 29 January 2016 at 22:31, diplomatic Guru 
wrote:

> Hello guys,
>
> I'm trying to understand how I could predict next month's page views based
> on the previous access pattern.
>
> For example, I've collected statistics on page views:
>
> e.g.
> Page,UniqueView
> -
> pageA, 1
> pageB, 999
> ...
> pageZ,200
>
> I aggregate the statistics monthly.
>
> I've prepared a file containing the last 3 months like this:
>
> e.g.
> Page,UV_NOV, UV_DEC, UV_JAN
> ---
> pageA, 1,9989,11000
> pageB, 999,500,700
> ...
> pageZ,200,50,34
>
>
> Based on above information, I want to predict the next month (FEB).
>
> Which algorithm do you think would suit best? I think linear regression is
> the safe bet. However, I'm struggling to prepare this data for LR ML,
> especially the X,Y relationship.
>
> The Y is easy (unique visitors), but I'm not sure about the X (it should be
> Page, right?). And how do I plot those three months of data?
>
> Could you give me an example based on above example data?
>
>
>
> Page,UV_NOV, UV_DEC, UV_JAN
> ---
> 1, 1,9989,11000
> 2, 999,500,700
> ...
> 26,200,50,34
>
>
>
>
>
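
One way to frame the X,Y question quoted above, offered only as a rough sketch and not as an MLlib recipe: treat the month index (NOV = 0, DEC = 1, JAN = 2) as X and the unique views as Y, and fit one straight line per page. The plain-Scala example below does an ordinary least-squares fit and evaluates it at the next index (FEB = 3). `PageViewTrend` and `predictNext` are hypothetical names, and three points per page is very little data, so the output is a trend extrapolation at best.

```scala
object PageViewTrend {
  /** Fits y = intercept + slope * x through (0, y0), (1, y1), ... by least squares
    * and returns the fitted value at the next index. */
  def predictNext(monthlyViews: Seq[Double]): Double = {
    require(monthlyViews.size >= 2, "need at least two months to fit a line")
    val n = monthlyViews.size
    val xs = (0 until n).map(_.toDouble)
    val xMean = xs.sum / n
    val yMean = monthlyViews.sum / n
    val covariance = xs.zip(monthlyViews).map { case (x, y) => (x - xMean) * (y - yMean) }.sum
    val variance = xs.map(x => (x - xMean) * (x - xMean)).sum
    val slope = covariance / variance
    val intercept = yMean - slope * xMean
    intercept + slope * n
  }

  def main(args: Array[String]): Unit = {
    // Rows taken from the example table in the question (NOV, DEC, JAN).
    val history = Seq(
      "pageB" -> Seq(999.0, 500.0, 700.0),
      "pageZ" -> Seq(200.0, 50.0, 34.0)
    )
    history.foreach { case (page, views) =>
      println(f"$page: predicted FEB unique views = ${predictNext(views)}%.1f")
    }
  }
}
```

A straight line can extrapolate below zero for steeply falling pages (pageZ in the sample data is such a case), so the result would need clamping or a different model before being used as a forecast.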


Failed to 'collect_set' with dataset in spark 1.6

2016-02-01 Thread Alexandr Dzhagriev
Hello,

I'm trying to run the following example code:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.functions._


case class RecordExample(a: Int, b: String)

object ArrayExample {
  def main(args: Array[String]) {
    val conf = new SparkConf()

    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)

    import sqlContext.implicits._

    val dataset = sc.parallelize(Seq(RecordExample(1, "apple"),
      RecordExample(2, "orange"))).toDS()

    dataset.groupBy($"a").agg(collect_list("b").as[List[String]])

    dataset.collect()

  }

}


and it fails with the following (please see the whole stack trace below):

 Exception in thread "main" java.lang.ClassCastException:
org.apache.spark.sql.types.ArrayType cannot be cast to
org.apache.spark.sql.types.StructType


Could someone please point me to the proper way to do that or confirm it's
a bug?

Thank you and here is the whole stacktrace:

Exception in thread "main" java.lang.ClassCastException:
org.apache.spark.sql.types.ArrayType cannot be cast to
org.apache.spark.sql.types.StructType
at org.apache.spark.sql.catalyst.expressions.GetStructField.org
$apache$spark$sql$catalyst$expressions$GetStructField$$field$lzycompute(complexTypeExtractors.scala:107)
at org.apache.spark.sql.catalyst.expressions.GetStructField.org
$apache$spark$sql$catalyst$expressions$GetStructField$$field(complexTypeExtractors.scala:107)
at
org.apache.spark.sql.catalyst.expressions.GetStructField.dataType(complexTypeExtractors.scala:109)
at
org.apache.spark.sql.catalyst.analysis.ResolveUpCast$$anonfun$apply$24.applyOrElse(Analyzer.scala:1214)
at
org.apache.spark.sql.catalyst.analysis.ResolveUpCast$$anonfun$apply$24.applyOrElse(Analyzer.scala:1211)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:53)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:242)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:248)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:248)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:265)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308)
at scala.collection.AbstractIterator.to(Iterator.scala:1194)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:300)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1194)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:287)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1194)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:305)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:248)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:248)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:248)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:265)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308)
at scala.collection.AbstractIterator.to(Iterator.scala:1194)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:300)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1194)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:287)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1194)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:305)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:248)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:248)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:248)
at

Re: Spark Executor retries infinitely

2016-02-01 Thread Prabhu Joseph
Thanks Ted. My concern is how to avoid this kind of user error on a
production cluster. It would be better if Spark handled this instead of
creating an executor every second that then fails, overloading the Spark
Master. Shall I file a Spark JIRA for this?


Thanks,
Prabhu Joseph


On Mon, Feb 1, 2016 at 9:09 PM, Ted Yu  wrote:

> I haven't found a config knob for controlling the retry count after a brief
> search.
>
> According to
> http://www.oracle.com/technetwork/articles/java/g1gc-1984535.html ,
> default value for -XX:ParallelGCThreads= seems to be 8.
> This seems to explain why you got the VM initialization error.
>
> FYI
>
> On Mon, Feb 1, 2016 at 4:16 AM, Prabhu Joseph 
> wrote:
>
>> Hi All,
>>
>>   When a Spark job (Spark-1.5.2) is submitted with a single executor and
>> the user passes invalid JVM arguments via
>> spark.executor.extraJavaOptions, the first executor fails. But the job
>> keeps on retrying, creating a new executor and failing every time, until
>> CTRL-C is pressed. Is there a configuration to limit the retry
>> attempts?
>>
>> *Example:*
>>
>> ./spark-submit --class SimpleApp --master "spark://10.10.72.145:7077"
>> --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails
>> -XX:+PrintGCTimeStamps -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35
>> -XX:ConcGCThreads=16" /SPARK/SimpleApp.jar
>>
>> Executor fails with
>>
>> Error occurred during initialization of VM
>> Can't have more ConcGCThreads than ParallelGCThreads.
>>
>> But the job does not exit, keeps on creating executors and retrying.
>> ..
>> 16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Granted executor ID
>> app-20160201065319-0014/2846 on hostPort 10.10.72.145:36558 with 12
>> cores, 2.0 GB RAM
>> 16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
>> app-20160201065319-0014/2846 is now LOADING
>> 16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
>> app-20160201065319-0014/2846 is now RUNNING
>> 16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
>> app-20160201065319-0014/2846 is now EXITED (Command exited with code 1)
>> 16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Executor
>> app-20160201065319-0014/2846 removed: Command exited with code 1
>> 16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Asked to remove
>> non-existent executor 2846
>> 16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor added:
>> app-20160201065319-0014/2847 on
>> worker-20160131230345-10.10.72.145-36558 (10.10.72.145:36558) with 12
>> cores
>> 16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Granted executor ID
>> app-20160201065319-0014/2847 on hostPort 10.10.72.145:36558 with 12
>> cores, 2.0 GB RAM
>> 16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
>> app-20160201065319-0014/2847 is now LOADING
>> 16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
>> app-20160201065319-0014/2847 is now EXITED (Command exited with code 1)
>> 16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Executor
>> app-20160201065319-0014/2847 removed: Command exited with code 1
>> 16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Asked to remove
>> non-existent executor 2847
>> 16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor added:
>> app-20160201065319-0014/2848 on
>> worker-20160131230345-10.10.72.145-36558 (10.10.72.145:36558) with 12
>> cores
>> 16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Granted executor ID
>> app-20160201065319-0014/2848 on hostPort 10.10.72.145:36558 with 12
>> cores, 2.0 GB RAM
>> 16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
>> app-20160201065319-0014/2848 is now LOADING
>> 16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
>> app-20160201065319-0014/2848 is now RUNNING
>> 
>>
>>
>>
>> Thanks,
>> Prabhu Joseph
>>
>>
>>
>


Re: Failed to 'collect_set' with dataset in spark 1.6

2016-02-01 Thread Alexandr Dzhagriev
Hello again,

Also I've tried the following snippet with concat_ws:

val dataset = sc.parallelize(Seq(
  RecordExample(1, "apple"),
  RecordExample(1, "banana"),
  RecordExample(2, "orange"))
).toDS().groupBy($"a").agg(concat_ws(",", $"b").as[String])

dataset.take(10).foreach(println)


which also fails

Exception in thread "main" org.apache.spark.sql.AnalysisException:
expression 'b' is neither present in the group by, nor is it an aggregate
function. Add to group by or wrap in first() (or first_value) if you don't
care which value you get.;
at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
at
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$
1.org
$apache$spark$sql$catalyst$analysis$CheckAnalysis$class$$anonfun$$checkValidAggregateExpression$1(CheckAnalysis.scala:130)

Thanks, Alex.
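
A note on the AnalysisException above: it says `b` is neither grouped nor inside an aggregate function. concat_ws works on the values of a single row, so on its own it gives the analyzer no way to reduce the many `b` values of a group to one. The intended result needs two steps: an aggregate that collects the values per key, then a join of the collected values. The plain-Scala sketch below shows only that shape on an in-memory collection; it does not use Spark, and `ConcatAfterGrouping`, `Rec`, `collectByKey` and `joinByKey` are hypothetical names. The analogous Spark expression would presumably wrap an aggregate such as collect_list inside concat_ws, but that has not been verified here against 1.6.

```scala
object ConcatAfterGrouping {
  final case class Rec(a: Int, b: String)

  /** Step 1, the aggregate: collect every `b` that shares the same `a`. */
  def collectByKey(records: Seq[Rec]): Map[Int, Seq[String]] =
    records.groupBy(_.a).map { case (key, rows) => key -> rows.map(_.b) }

  /** Step 2, the per-group join: turn each collected group into one string. */
  def joinByKey(records: Seq[Rec], sep: String): Map[Int, String] =
    collectByKey(records).map { case (key, values) => key -> values.mkString(sep) }

  def main(args: Array[String]): Unit = {
    // Same sample rows as in the snippet above.
    val records = Seq(Rec(1, "apple"), Rec(1, "banana"), Rec(2, "orange"))
    joinByKey(records, ",").toSeq.sortBy(_._1).foreach(println)
  }
}
```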

On Mon, Feb 1, 2016 at 6:03 PM, Alexandr Dzhagriev  wrote:

> Hi Ted,
>
> That doesn't help either, as one method delegates to the other as far as I
> can see:
>
> def collect_list(columnName: String): Column = 
> collect_list(Column(columnName))
>
>
> Thanks, Alex
>
> On Mon, Feb 1, 2016 at 5:55 PM, Ted Yu  wrote:
>
>> bq. agg(collect_list("b")
>>
>> Have you tried:
>>
>> agg(collect_list($"b")
>>
>> On Mon, Feb 1, 2016 at 8:50 AM, Alexandr Dzhagriev 
>> wrote:
>>
>>> Hello,
>>>
>>> I'm trying to run the following example code:
>>>
>>> import org.apache.spark.sql.hive.HiveContext
>>> import org.apache.spark.{SparkContext, SparkConf}
>>> import org.apache.spark.sql.functions._
>>>
>>>
>>> case class RecordExample(a: Int, b: String)
>>>
>>> object ArrayExample {
>>>   def main(args: Array[String]) {
>>> val conf = new SparkConf()
>>>
>>> val sc = new SparkContext(conf)
>>> val sqlContext = new HiveContext(sc)
>>>
>>> import sqlContext.implicits._
>>>
>>> val dataset = sc.parallelize(Seq(RecordExample(1, "apple"), 
>>> RecordExample(2, "orange"))).toDS()
>>>
>>> dataset.groupBy($"a").agg(collect_list("b").as[List[String]])
>>>
>>> dataset.collect()
>>>
>>>   }
>>>
>>> }
>>>
>>>
>>> and it fails with the following (please see the whole stack trace below):
>>>
>>>  Exception in thread "main" java.lang.ClassCastException:
>>> org.apache.spark.sql.types.ArrayType cannot be cast to
>>> org.apache.spark.sql.types.StructType
>>>
>>>
>>> Could someone please point me to the proper way to do that or confirm
>>> it's a bug?
>>>
>>> Thank you and here is the whole stacktrace:
>>>
>>> Exception in thread "main" java.lang.ClassCastException:
>>> org.apache.spark.sql.types.ArrayType cannot be cast to
>>> org.apache.spark.sql.types.StructType
>>> at org.apache.spark.sql.catalyst.expressions.GetStructField.org
>>> $apache$spark$sql$catalyst$expressions$GetStructField$$field$lzycompute(complexTypeExtractors.scala:107)
>>> at org.apache.spark.sql.catalyst.expressions.GetStructField.org
>>> $apache$spark$sql$catalyst$expressions$GetStructField$$field(complexTypeExtractors.scala:107)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.GetStructField.dataType(complexTypeExtractors.scala:109)
>>> at
>>> org.apache.spark.sql.catalyst.analysis.ResolveUpCast$$anonfun$apply$24.applyOrElse(Analyzer.scala:1214)
>>> at
>>> org.apache.spark.sql.catalyst.analysis.ResolveUpCast$$anonfun$apply$24.applyOrElse(Analyzer.scala:1211)
>>> at
>>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243)
>>> at
>>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243)
>>> at
>>> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:53)
>>> at
>>> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:242)
>>> at
>>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:248)
>>> at
>>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:248)
>>> at
>>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:265)
>>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
>>> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
>>> at
>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
>>> at
>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
>>> at
>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
>>> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308)
>>> at scala.collection.AbstractIterator.to(Iterator.scala:1194)
>>> at
>>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:300)
>>> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1194)
>>> at
>>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:287)
>>> 

Re: Spark Caching Kafka Metadata

2016-02-01 Thread Benjamin Han
Is there another way to create topics from Spark? Is there any reason the
above code snippet would still produce this error? I've dumbly inserted
waits and retries for testing, but that still doesn't consistently work,
even after waiting several minutes.

On Fri, Jan 29, 2016 at 8:29 AM, Cody Koeninger  wrote:

> The kafka direct stream doesn't do any explicit caching.  I haven't looked
> through the underlying simple consumer code in the kafka project in detail,
> but I doubt it does either.
>
> Honestly, I'd recommend not using auto created topics (it makes it too
> easy to pollute your topics if someone fat-fingers something when
> interacting with kafka), and instead explicitly creating topics before
> using them.
>
> If you're trying to create the topic in your spark job right before using
> it with direct stream, I can see how there might possibly be a race
> condition - you're using the ZK api, but the direct stream is talking only
> to the broker api.
>
> On Thu, Jan 28, 2016 at 6:07 PM, asdf zxcv 
> wrote:
>
>> Does Spark cache which kafka topics exist? A service incorrectly assumes
>> all the relevant topics exist, even if they are empty, causing it to fail.
>> Fortunately the service is automatically restarted and by default, kafka
>> creates the topic after it is requested.
>>
>> I'm trying to create the topic if it doesn't exist using
>> AdminUtils.createTopic:
>>
>>   val zkClient = new ZkClient("localhost:2181", 1, 1,
>> ZKStringSerializer)
>>   while (!AdminUtils.topicExists(zkClient, topic)) {
>> AdminUtils.createTopic(zkClient, topic, 1, 1, new Properties())
>>   }
>>
>> But I still get "Error getting partition metadata for 'topic-name'.
>> Does the topic exist?" when I execute KafkaUtils.createDirectStream
>>
>> I've also tried to implement a retry with a wait such that the retry
>> should occur after Kafka has created the requested topic with
>> auto.create.topics.enable = true, but this still doesn't work
>> consistently.
>>
>> This is a bit frustrating to debug as well since the topic is
>> successfully created about 50% of the time, other times I get message "Does
>> the topic exist?". My thinking is that Spark may be caching the list of
>> extant kafka topics, ignoring that I've added a new one. Is this the case?
>> Am I missing something?
>>
>>
>> Ben
>>
>
>
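
The quoted loop calls createTopic repeatedly with no pause and no upper bound, and it checks existence through ZooKeeper while the direct stream talks to the brokers. A bounded wait between creating the topic and starting the stream might look like the plain-Scala sketch below. `RetryUntil` and `retryUntil` are hypothetical names, and the condition passed in is left to the caller: ideally it would be a check that goes through the broker API, which is an assumption based on the race described above and not something this thread confirms will fix the problem.

```scala
import scala.annotation.tailrec

object RetryUntil {
  /** Polls `condition` up to `maxAttempts` times, sleeping `delayMs` between attempts.
    * Returns true as soon as the condition holds, false once the attempts run out. */
  @tailrec
  def retryUntil(maxAttempts: Int, delayMs: Long)(condition: () => Boolean): Boolean =
    if (maxAttempts <= 0) false
    else if (condition()) true
    else {
      // No point sleeping after the final failed attempt.
      if (maxAttempts > 1) Thread.sleep(delayMs)
      retryUntil(maxAttempts - 1, delayMs)(condition)
    }

  def main(args: Array[String]): Unit = {
    // Stand-in condition for illustration: becomes true on the third poll.
    var polls = 0
    val ready = retryUntil(maxAttempts = 5, delayMs = 100L) { () => polls += 1; polls >= 3 }
    println(s"ready=$ready after $polls polls")
  }
}
```

Returning false instead of looping forever lets the job fail with a clear message when the topic never becomes visible.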


Re: java.nio.channels.ClosedChannelException in Spark Streaming KafKa Direct

2016-02-01 Thread Cody Koeninger
That indicates a problem in network communication between the executor and
the kafka broker.  Have you done any network troubleshooting?
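
As a first troubleshooting step, a plain TCP connection attempt from an executor host to each broker can rule out firewalls and wrong ports. The sketch below is a minimal illustration in plain Scala; `BrokerReachability` and `canConnect` are hypothetical names, and the host and port in `main` are placeholders (9092 is assumed here as the usual Kafka default).

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

object BrokerReachability {
  /** True if a TCP connection to host:port can be opened within `timeoutMs`. */
  def canConnect(host: String, port: Int, timeoutMs: Int = 3000): Boolean = {
    val socket = new Socket()
    try Try(socket.connect(new InetSocketAddress(host, port), timeoutMs)).isSuccess
    finally socket.close()
  }

  def main(args: Array[String]): Unit = {
    // Placeholder broker address; replace with the brokers the job is configured with.
    val host = if (args.nonEmpty) args(0) else "localhost"
    val port = if (args.length > 1) args(1).toInt else 9092
    println(s"$host:$port reachable = ${canConnect(host, port)}")
  }
}
```

A successful connect only shows the port is open from that machine. It does not prove that the host names the brokers advertise resolve correctly from the executors, which is worth checking separately.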



On Mon, Feb 1, 2016 at 9:59 AM, SRK  wrote:

> Hi,
>
> I see the following error in Spark Streaming with Kafka Direct. I think
> that this error is related to the Kafka topic. Any suggestions on how to
> avoid this error would be of great help.
>
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
> at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:413)
> at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.insertAll(BypassMergeSortShuffleWriter.java:99)
> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
>


Re: Failed to 'collect_set' with dataset in spark 1.6

2016-02-01 Thread Ted Yu
bq. agg(collect_list("b")

Have you tried:

agg(collect_list($"b")

On Mon, Feb 1, 2016 at 8:50 AM, Alexandr Dzhagriev  wrote:

> Hello,
>
> I'm trying to run the following example code:
>
> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.{SparkContext, SparkConf}
> import org.apache.spark.sql.functions._
>
>
> case class RecordExample(a: Int, b: String)
>
> object ArrayExample {
>   def main(args: Array[String]) {
> val conf = new SparkConf()
>
> val sc = new SparkContext(conf)
> val sqlContext = new HiveContext(sc)
>
> import sqlContext.implicits._
>
> val dataset = sc.parallelize(Seq(RecordExample(1, "apple"), 
> RecordExample(2, "orange"))).toDS()
>
> dataset.groupBy($"a").agg(collect_list("b").as[List[String]])
>
> dataset.collect()
>
>   }
>
> }
>
>
> and it fails with the following (please see the whole stack trace below):
>
>  Exception in thread "main" java.lang.ClassCastException:
> org.apache.spark.sql.types.ArrayType cannot be cast to
> org.apache.spark.sql.types.StructType
>
>
> Could someone please point me to the proper way to do that or confirm it's
> a bug?
>
> Thank you and here is the whole stacktrace:
>
> Exception in thread "main" java.lang.ClassCastException:
> org.apache.spark.sql.types.ArrayType cannot be cast to
> org.apache.spark.sql.types.StructType
> at org.apache.spark.sql.catalyst.expressions.GetStructField.org
> $apache$spark$sql$catalyst$expressions$GetStructField$$field$lzycompute(complexTypeExtractors.scala:107)
> at org.apache.spark.sql.catalyst.expressions.GetStructField.org
> $apache$spark$sql$catalyst$expressions$GetStructField$$field(complexTypeExtractors.scala:107)
> at
> org.apache.spark.sql.catalyst.expressions.GetStructField.dataType(complexTypeExtractors.scala:109)
> at
> org.apache.spark.sql.catalyst.analysis.ResolveUpCast$$anonfun$apply$24.applyOrElse(Analyzer.scala:1214)
> at
> org.apache.spark.sql.catalyst.analysis.ResolveUpCast$$anonfun$apply$24.applyOrElse(Analyzer.scala:1211)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243)
> at
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:53)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:242)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:248)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:248)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:265)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308)
> at scala.collection.AbstractIterator.to(Iterator.scala:1194)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:300)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1194)
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:287)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1194)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:305)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:248)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:248)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:248)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:265)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308)
> at scala.collection.AbstractIterator.to(Iterator.scala:1194)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:300)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1194)
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:287)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1194)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:305)
> at
> 

Re: Spark streaming and ThreadLocal

2016-02-01 Thread N B
Is each partition guaranteed to execute in a single thread in a worker?

Thanks
N B
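
The `mapPartitions` suggestion quoted below amounts to building the helper object once per partition and reusing it for every record in that partition. A plain-Python sketch of that idea, with no Spark involved; `SomeClass`, `somefunc` and `map_partitions` are stand-ins invented for illustration, and partitions are modelled as simple lists:

```python
class SomeClass:
    """Stand-in for an expensive-to-create helper; counts how often it is built."""
    instances = 0

    def __init__(self):
        SomeClass.instances += 1


def somefunc(record, helper):
    # Placeholder for the real per-record work that needs the helper.
    return record * 2


def map_partitions(partitions, func):
    """Apply `func` to each partition's iterator, like a local mapPartitions."""
    return [list(func(iter(part))) for part in partitions]


def process_partition(records):
    helper = SomeClass()  # created once per partition, not once per record
    for record in records:
        yield somefunc(record, helper)


result = map_partitions([[1, 2, 3], [4, 5]], process_partition)
print(result, SomeClass.instances)
```

Five records are processed here, but the helper is constructed only once for each of the two partitions.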


On Fri, Jan 29, 2016 at 6:53 PM, Shixiong(Ryan) Zhu  wrote:

> I see. Then you should use `mapPartitions` rather than using ThreadLocal.
> E.g.,
>
> dstream.mapPartitions { iter =>
>   // one SomeClass instance per partition, shared by all of its records
>   val d = new SomeClass()
>   iter.map { p =>
>     somefunc(p, d)
>   }
> }
>
>
> On Fri, Jan 29, 2016 at 5:29 PM, N B  wrote:
>
>> Well won't the code in lambda execute inside multiple threads in the
>> worker because it has to process many records? I would just want to have a
>> single copy of SomeClass instantiated per thread rather than once per each
>> record being processed. That was what triggered this thought anyways.
>>
>> Thanks
>> NB
>>
>>
>> On Fri, Jan 29, 2016 at 5:09 PM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> It looks weird. Why don't you just pass "new SomeClass()" to
>>> "somefunc"? You don't need to use ThreadLocal if there are no multiple
>>> threads in your codes.
>>>
>>> On Fri, Jan 29, 2016 at 4:39 PM, N B  wrote:
>>>
>>>> Fixed a typo in the code to avoid any confusion. Please comment on
>>>> the code below...
>>>>
>>>> dstream.map( p -> {
>>>>   ThreadLocal<SomeClass> d = new ThreadLocal<SomeClass>() {
>>>>     public SomeClass initialValue() { return new SomeClass(); }
>>>>   };
>>>>   somefunc(p, d.get());
>>>>   d.remove();
>>>>   return p;
>>>> });
>>>>
>>>> On Fri, Jan 29, 2016 at 4:32 PM, N B  wrote:
>>>>
>>>>> So this use of ThreadLocal will be inside the code of a function
>>>>> executing on the workers i.e. within a call from one of the lambdas. Would
>>>>> it just look like this then:
>>>>>
>>>>> dstream.map( p -> {
>>>>>   ThreadLocal<SomeClass> d = new ThreadLocal<SomeClass>() {
>>>>>     public SomeClass initialValue() { return new SomeClass(); }
>>>>>   };
>>>>>   somefunc(p, d.get());
>>>>>   d.remove();
>>>>>   return p;
>>>>> });
>>>>>
>>>>> Will this make sure that all threads inside the worker clean up the
>>>>> ThreadLocal once they are done with processing this task?
>>>>>
>>>>> Thanks
>>>>> NB
>>>>>
>>>>>
>>>>> On Fri, Jan 29, 2016 at 1:00 PM, Shixiong(Ryan) Zhu <
>>>>> shixi...@databricks.com> wrote:
>>>>>
>>>>>> Spark Streaming uses threadpools so you need to remove ThreadLocal
>>>>>> when it's not used.
>>>>>>
>>>>>> On Fri, Jan 29, 2016 at 12:55 PM, N B  wrote:
>>>>>>
>>>>>>> Thanks for the response Ryan. So I would say that it is in fact the
>>>>>>> purpose of a ThreadLocal i.e. to have a copy of the variable as long as
>>>>>>> the thread lives. I guess my concern is around usage of threadpools and
>>>>>>> whether Spark streaming will internally create many threads that rotate
>>>>>>> between tasks on purpose thereby holding onto ThreadLocals that may
>>>>>>> actually never be used again.
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> On Fri, Jan 29, 2016 at 12:12 PM, Shixiong(Ryan) Zhu <
>>>>>>> shixi...@databricks.com> wrote:
>>>>>>>
>>>>>>>> Of course. If you use a ThreadLocal in a long living thread and
>>>>>>>> forget to remove it, it's definitely a memory leak.
>>>>>>>>
>>>>>>>> On Thu, Jan 28, 2016 at 9:31 PM, N B  wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> Does anyone know if there are any potential pitfalls associated
>>>>>>>>> with using ThreadLocal variables in a Spark streaming application? One
>>>>>>>>> thing I have seen mentioned in the context of app servers that use
>>>>>>>>> thread pools is that ThreadLocals can leak memory. Could this happen in
>>>>>>>>> Spark streaming also?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Nikunj
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


RE: Using Java spring injection with spark

2016-02-01 Thread Sambit Tripathy (RBEI/EDS1)
Hi Harsh,

I still do not understand your problem completely. Is this what you are
talking about?
http://stackoverflow.com/questions/30053449/use-spring-together-with-spark



Best regards
Sambit Tripathy


From: HARSH TAKKAR [mailto:takkarha...@gmail.com]
Sent: Monday, February 01, 2016 10:28 PM
To: Sambit Tripathy (RBEI/EDS1) ; 
user@spark.apache.org
Subject: Re: Using Java spring injection with spark

Hi Sambit
My app is basically a cron which checks the db for a job that is scheduled
and needs to be executed, and submits the job to Spark using the Spark Java
API. This app is written with the Spring framework at its core.
Each job has a set of tasks which need to be executed in order.
> We have implemented a chain of responsibility pattern to do so, and we
> persist the status of each snapshot step in MySQL after the step result has
> been persisted to a CSV file, so that if the job breaks midway we know which
> file to pick up and resume processing from.
> I basically wanted to inject the snapshot steps with the JDBC layer and other
> dependencies through Spring auto-wiring.
Will using Spring in this use case adversely affect performance and, as you
mentioned, will it cause serialization errors?


On Tue, Feb 2, 2016 at 1:16 AM Sambit Tripathy (RBEI/EDS1) wrote:


1.  It depends on what you want to do. Don’t worry about singletons and
wiring the beans, as that is pretty much taken care of by the Spark framework
itself. In fact, if you do so, you will run into issues like serialization
errors.



2.  You can write your code in Scala/Python using the spark shell or a
notebook like IPython or Zeppelin, or, if you have written an application in
Scala/Java using the Spark API, you can create a jar and run it using
spark-submit.
From: HARSH TAKKAR [mailto:takkarha...@gmail.com]
Sent: Monday, February 01, 2016 10:00 AM
To: user@spark.apache.org
Subject: Re: Using Java spring injection with spark


Hi

Please can anyone reply on this.

On Mon, 1 Feb 2016, 4:28 p.m. HARSH TAKKAR wrote:
Hi
I am new to Apache Spark and big data analytics. Before starting to code
against Spark data frames and RDDs, I just wanted to confirm the following:
1. Can we create an implementation of java.api.Function as a singleton bean
using the Spring framework, and can it be injected into other classes using
autowiring?
2. What is the best way to submit jobs to Spark: using the API or using the
shell script?
Looking forward to your help,
Kind Regards
Harsh


Spark Streaming with Kafka - batch DStreams in memory

2016-02-01 Thread p pathiyil
Hi,

Are there any ways to store DStreams / RDD read from Kafka in memory to be
processed at a later time ? What we need to do is to read data from Kafka,
process it to be keyed by some attribute that is present in the Kafka
messages, and write out the data related to each key when we have
accumulated enough data for that key to write out a file that is close to
the HDFS block size, say 64MB. We are looking at ways to avoid writing out
some file of the entire Kafka content periodically and then later run a
second job to read those files and split them out to another set of files
as necessary.

Thanks.
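
The accumulate-then-flush logic described above can be sketched in plain Python, independent of Spark or Kafka. The class name `KeyedBuffer`, the `flush` callback and the tiny threshold are all hypothetical; a real job would also need to bound total memory and survive restarts, which this sketch ignores.

```python
from collections import defaultdict


class KeyedBuffer:
    """Accumulate records per key and flush a key once it reaches a size threshold."""

    def __init__(self, threshold_bytes, flush):
        self.threshold_bytes = threshold_bytes
        self.flush = flush  # callback: flush(key, records)
        self.records = defaultdict(list)
        self.sizes = defaultdict(int)

    def add(self, key, payload):
        self.records[key].append(payload)
        self.sizes[key] += len(payload)
        if self.sizes[key] >= self.threshold_bytes:
            # Hand the accumulated records to the writer and reset this key.
            self.flush(key, self.records.pop(key))
            del self.sizes[key]


written = []
buf = KeyedBuffer(threshold_bytes=10,
                  flush=lambda key, recs: written.append((key, recs)))
for key, payload in [("a", b"12345"), ("b", b"123"), ("a", b"67890"), ("b", b"4")]:
    buf.add(key, payload)

print(written)
```

Only key `a` reaches the 10-byte threshold here, so only its records are handed to the callback; key `b` stays buffered. In the scenario above the threshold would be on the order of the HDFS block size rather than 10 bytes.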


How to calculate weighted degrees in GraphX

2016-02-01 Thread Balachandar R.A.
I am new to GraphX and am exploring the example flight data analysis found
online:
http://www.sparktutorials.net/analyzing-flight-data:-a-gentle-introduction-to-graphx-in-spark

I tried calculating inDegrees (to understand how many flights come into an
airport), but the value I see corresponds to the number of unique routes
coming into the airport. This means that if there is more than one flight
between a particular source and destination, let's say 5, I am seeing only 1
for this route. What I want is something along the lines of weighted degrees,
where the weight associated with each edge is taken into account when
calculating in-degrees. This weight value is in the edge._attr parameter,
which is not considered when computing degrees. Here is my code block:

val inDegrees = graph.inDegrees.join(airportVertices)
  .sortBy(_._2._1, ascending = false).take(10)
val outDegrees = graph.outDegrees.join(airportVertices)
  .sortBy(_._2._1, ascending = false).take(10)

// Top 10 in-degrees
println("Top 10 indegrees - > " )
inDegrees.foreach{println}

// Top 10 out-degrees
println("Top 10 outDegrees -> " )
outDegrees.foreach{println}

Can someone point me to the right link or give me a hint to solve this?

regards

Bala
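
One way to think about the weighted in-degree asked for here is as a sum of edge weights grouped by destination vertex, rather than a count of edges. The sketch below shows only that arithmetic in plain Python; it is not GraphX code, and the airport codes and flight counts are invented for illustration. In GraphX, `aggregateMessages` is the usual tool for this kind of per-vertex sum, but that is not shown here.

```python
from collections import defaultdict

# (source, destination, weight) triples; weight = number of flights on the route.
# The routes and counts are made up for illustration.
edges = [
    ("SFO", "JFK", 5),
    ("LAX", "JFK", 2),
    ("JFK", "SFO", 3),
    ("LAX", "SFO", 1),
]


def weighted_in_degrees(edge_list):
    """Sum edge weights per destination vertex."""
    totals = defaultdict(int)
    for _src, dst, weight in edge_list:
        totals[dst] += weight
    return dict(totals)


def plain_in_degrees(edge_list):
    """Count incoming edges per destination vertex, ignoring weights."""
    counts = defaultdict(int)
    for _src, dst, _weight in edge_list:
        counts[dst] += 1
    return dict(counts)


weighted = weighted_in_degrees(edges)
plain = plain_in_degrees(edges)
top = sorted(weighted.items(), key=lambda kv: kv[1], reverse=True)[:10]
print(top)
```

With this toy data both airports have a plain in-degree of 2, while the weighted version separates them by total incoming flights.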


Re: unsubscribe email

2016-02-01 Thread Kevin Mellott
Take a look at the first section on http://spark.apache.org/community.html.
You basically just need to send an email from the aliased email to
user-unsubscr...@spark.apache.org. If you cannot log into that email
directly, then I'd recommend using a mail client that allows for the
"send-as" functionality (such as Gmail).

On Mon, Feb 1, 2016 at 4:38 PM, Eduardo Costa Alfaia  wrote:

> Hi Guys,
> How could I unsubscribe the email e.costaalf...@studenti.unibs.it, that
> is an alias from my email e.costaalf...@unibs.it and it is registered in
> the mail list .
>
> Thanks
>
> *Eduardo Costa Alfaia*
> *PhD Student Telecommunication Engineering*
> *Università degli Studi di Brescia-UNIBS*
>
>
> Informativa sulla Privacy: http://www.unibs.it/node/8155


Re: Getting the size of a broadcast variable

2016-02-01 Thread Takeshi Yamamuro
Hi,

Currently, there is no way to check the size other than looking through the
INFO logs on the driver, e.g.:

16/02/02 14:51:53 INFO BlockManagerInfo: Added rdd_2_12 in memory on
localhost:58536 (size: 40.0 B, free: 510.7 MB)
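
If a rough estimate is enough, one option is to measure the pickled size of the value on the driver before broadcasting it. This is a sketch under the assumption that the pickled length is a reasonable proxy for what PySpark ships; the actual size in the block manager may differ (for example because of compression), and `lookup` is a made-up example value.

```python
import pickle


def pickled_size_bytes(value):
    """Return the length in bytes of the pickled form of `value`."""
    return len(pickle.dumps(value, protocol=pickle.HIGHEST_PROTOCOL))


lookup = {i: str(i) for i in range(1000)}
size = pickled_size_bytes(lookup)
print("approx. %d bytes" % size)
```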



On Tue, Feb 2, 2016 at 8:20 AM, apu mishra . rr 
wrote:

> How can I determine the size (in bytes) of a broadcast variable? Do I need
> to use the .dump method and then look at the size of the result, or is
> there an easier way?
>
> Using PySpark with Spark 1.6.
>
> Thanks!
>
> Apu
>



-- 
---
Takeshi Yamamuro


Re: Using Java spring injection with spark

2016-02-01 Thread HARSH TAKKAR
Hi Sambit

My app is basically a cron which checks the db for a job that is scheduled
and needs to be executed, and submits the job to Spark using the Spark Java
API. This app is written with the Spring framework at its core.

Each job has a set of tasks which need to be executed in order.
> We have implemented a chain of responsibility pattern to do so, and we
persist the status of each snapshot step in MySQL after the step result has
been persisted to a CSV file, so that if the job breaks midway we know which
file to pick up and resume processing from.

> I basically wanted to inject the snapshot steps with the JDBC layer and other
dependencies through Spring auto-wiring.

Will using Spring in this use case adversely affect performance and, as you
mentioned, will it cause serialization errors?


On Tue, Feb 2, 2016 at 1:16 AM Sambit Tripathy (RBEI/EDS1) <
sambit.tripa...@in.bosch.com> wrote:

>
>
> 1.  It depends on what you want to do. Don’t worry about singletons
> and wiring the beans, as that is pretty much taken care of by the Spark
> framework itself. In fact, if you do so, you will run into issues like
> serialization errors.
>
>
>
> 2.  You can write your code in Scala/Python using the spark shell
> or a notebook like IPython or Zeppelin, or, if you have written an
> application in Scala/Java using the Spark API, you can create a jar and run
> it using spark-submit.
>
> *From:* HARSH TAKKAR [mailto:takkarha...@gmail.com]
> *Sent:* Monday, February 01, 2016 10:00 AM
> *To:* user@spark.apache.org
> *Subject:* Re: Using Java spring injection with spark
>
>
>
> Hi
>
> Please can anyone reply on this.
>
>
>
> On Mon, 1 Feb 2016, 4:28 p.m. HARSH TAKKAR  wrote:
>
> Hi
>
> I am new to Apache Spark and big data analytics. Before starting to code
> against Spark data frames and RDDs, I just wanted to confirm the following:
>
> 1. Can we create an implementation of java.api.Function as a singleton
> bean using the Spring framework, and can it be injected into other classes
> using autowiring?
>
> 2. What is the best way to submit jobs to Spark: using the API or using
> the shell script?
>
> Looking forward to your help,
>
> Kind Regards
>
> Harsh
>
>


Is there some open source tools which implements draggable widget and make the app runing in a form of DAG ?

2016-02-01 Thread zml张明磊
Hello,

I have been looking for such tools without success. So, as the title says,
are there any open source tools which implement draggable widgets and make
the app run in the form of a DAG-like workflow?

Thanks,
Minglei.


can we do column bind of 2 dataframes in spark R? similar to cbind in R?

2016-02-01 Thread Devesh Raj Singh
Hi,

I want to merge 2 dataframes column-wise in SparkR, similar to cbind in R. We
have "unionAll" for rbind, but I could not find anything for cbind in SparkR.

-- 
Warm regards,
Devesh.


What is the correct way to reset a Linux in Cluster?

2016-02-01 Thread pengzhang130
Hi All,

I have a Spark cluster running on 3 Linux machines. One machine runs the
master, and the other two run stand-by masters. Each machine also has 2
worker instances running.

I found that when the machine hosting the Spark master is reset or its
network goes down, the application freezes and sometimes cannot recover. So I
wonder if any of you know of officially recommended steps to recover in these
situations:

1) resetting the Linux machine running the Spark master?
2) resetting a Linux machine running Spark slaves?
3) recovering from the network going down on the machine running the Spark master?
4) recovering from the network going down on a machine running Spark slaves?

Thanks
Peng




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-is-the-correct-way-to-reset-a-Linux-in-Cluster-tp26129.html



Re: how to covert millisecond time to SQL timeStamp

2016-02-01 Thread Ted Yu
See related thread on using Joda DateTime:
http://search-hadoop.com/m/q3RTtSfi342nveex1=RE+NPE+when+using+Joda+DateTime

On Mon, Feb 1, 2016 at 7:44 PM, Kevin Mellott 
wrote:

> I've had pretty good success using Joda-Time for date/time manipulations
> within Spark applications. You may be able to use the *DateTime* constructor
> below, if you are starting with milliseconds.
>
> DateTime
>
> public DateTime(long instant)
>
> Constructs an instance set to the milliseconds from 1970-01-01T00:00:00Z
> using ISOChronology in the default time zone.
> Parameters: instant - the milliseconds from 1970-01-01T00:00:00Z
>
> On Mon, Feb 1, 2016 at 5:51 PM, Andy Davidson <
> a...@santacruzintegration.com> wrote:
>
>> What little I know about working with timestamps is based on
>> https://databricks.com/blog/2015/09/16/spark-1-5-dataframe-api-highlights-datetimestring-handling-time-intervals-and-udafs.html
>>
>> Using the example of dates formatted into human friend strings ->
>> timeStamps I was able to figure out how to convert Epoch times to
>> timestamps. The same trick did not work for millisecond times.
>>
>> Any suggestions would be greatly appreciated.
>>
>>
>> Andy
>>
>> Working with epoch times
>> 
>>
>> ref: http://www.epochconverter.com/
>>
>> Epoch timestamp: 1456050620
>>
>> Timestamp in milliseconds: 1456050620000
>>
>> Human time (GMT): Sun, 21 Feb 2016 10:30:20 GMT
>>
>> Human time (your time zone): 2/21/2016, 2:30:20 AM
>>
>>
>> # Epoch time stamp example
>>
>> data = [
>>
>>   ("1456050620", "1456050621", 1),
>>
>>   ("1456050622", "14560506203", 2),
>>
>>   ("14560506204", "14560506205", 3)]
>>
>> df = sqlContext.createDataFrame(data, ["start_time", "end_time", "id"])
>>
>> ​
>>
>> # convert epoch time strings in to spark timestamps
>>
>> df = df.select(
>>
>>   df.start_time.cast("long").alias("start_time"),
>>
>>   df.end_time.cast("long").alias("end_time"),
>>
>>   df.id)
>>
>> df.printSchema()
>>
>> df.show(truncate=False)
>>
>> ​
>>
>> # convert longs to timestamps
>>
>> df = df.select(
>>
>>   df.start_time.cast("timestamp").alias("start_time"),
>>
>>   df.end_time.cast("timestamp").alias("end_time"),
>>
>>   df.id)
>>
>> df.printSchema()
>>
>> df.show(truncate=False)
>>
>> ​
>>
>> root
>>  |-- start_time: long (nullable = true)
>>  |-- end_time: long (nullable = true)
>>  |-- id: long (nullable = true)
>>
>> +-----------+-----------+---+
>> |start_time |end_time   |id |
>> +-----------+-----------+---+
>> |1456050620 |1456050621 |1  |
>> |1456050622 |14560506203|2  |
>> |14560506204|14560506205|3  |
>> +-----------+-----------+---+
>>
>> root
>>  |-- start_time: timestamp (nullable = true)
>>  |-- end_time: timestamp (nullable = true)
>>  |-- id: long (nullable = true)
>>
>> +---------------------+---------------------+---+
>> |start_time           |end_time             |id |
>> +---------------------+---------------------+---+
>> |2016-02-21 02:30:20.0|2016-02-21 02:30:21.0|1  |
>> |2016-02-21 02:30:22.0|2431-05-28 02:03:23.0|2  |
>> |2431-05-28 02:03:24.0|2431-05-28 02:03:25.0|3  |
>> +---------------------+---------------------+---+
>>
>>
>> In [21]:
>>
>> # working with millisecond times
>>
>> data = [
>>
>>   ("1456050620000", "1456050620000", 1)]
>>
>> df = sqlContext.createDataFrame(data, ["start_time", "end_time", "id"])
>>
>> ​
>>
>> # convert epoch time strings in to spark timestamps
>>
>> df = df.select(
>>
>>   df.start_time.cast("long").alias("start_time"),
>>
>>   df.end_time.cast("long").alias("end_time"),
>>
>>   df.id)
>>
>> df.printSchema()
>>
>> df.show(truncate=False)
>>
>> ​
>>
>> # convert longs to timestamps
>>
>> df = df.select(
>>
>>   df.start_time.cast("timestamp").alias("start_time"),
>>
>>   df.end_time.cast("timestamp").alias("end_time"),
>>
>>   df.id)
>>
>> df.printSchema()
>>
>> df.show(truncate=False)
>>
>> root
>>  |-- start_time: long (nullable = true)
>>  |-- end_time: long (nullable = true)
>>  |-- id: long (nullable = true)
>>
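>> +-------------+-------------+---+
>> |start_time   |end_time     |id |
>> +-------------+-------------+---+
>> |1456050620000|1456050620000|1  |
>> +-------------+-------------+---+
>>
>> root
>>  |-- start_time: timestamp (nullable = true)
>>  |-- end_time: timestamp (nullable = true)
>>  |-- id: long (nullable = true)
>>
>> +----------------------+----------------------+---+
>> |start_time            |end_time              |id |
>> +----------------------+----------------------+---+
>> |48110-05-29 10:33:20.0|48110-05-29 10:33:20.0|1  |
>> +----------------------+----------------------+---+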
>>
>>
>>
>


Re: Explaination for info shown in UI

2016-02-01 Thread Yogesh Mahajan
The number of jobs depends on the number of output operations (print,
foreachRDD, saveAs*Files) and the number of RDD actions in those output
operations.

For example:
dstream1.foreachRDD { rdd => rdd.count }  // ONE Spark job per batch
dstream1.foreachRDD { rdd => { rdd.count ; rdd.count } }  // TWO Spark jobs per batch
dstream1.foreachRDD { rdd => rdd.count } ; dstream2.foreachRDD { rdd => rdd.count }  // TWO Spark jobs per batch

Regards,
Yogesh Mahajan
SnappyData Inc (snappydata.io)

On Thu, Jan 28, 2016 at 4:30 PM, Sachin Aggarwal  wrote:

> Hi
>
> I am executing a streaming wordcount with kafka
> with one test topic with 2 partition
> my cluster have three spark executors
>
> Each batch is of 10 sec
>
> for every batch(ex below * batch time 02:51:00*) I see 3 entry in spark
> UI , as shown below below
>
> my questions:-
> 1) As label says jobId for first column, does spark submits 3 jobs for
> each batch ?
> 2) I tried decreasing executers/nodes the job count is also getting
> changed what is the relation with no of  executors?
> 3) only one job actually executes the stage rest two shows skipped why
> other jobs got created?
>
> Job Id | Description | Submitted | Duration | Stages: Succeeded/Total | Tasks (for all stages): Succeeded/Total
> 221 | Streaming job from [output operation 0, batch time 02:51:00] print at StreamingWordCount.scala:54 | 2016/01/28 02:51:00 | 46 ms | 1/1 (1 skipped) | 1/1 (3 skipped)
> 220 | Streaming job from [output operation 0, batch time 02:51:00] print at StreamingWordCount.scala:54 | 2016/01/28 02:51:00 | 47 ms | 1/1 (1 skipped) | 4/4 (3 skipped)
> 219 | Streaming job from [output operation 0, batch time 02:51:00] print at StreamingWordCount.scala:54 | 2016/01/28 02:51:00 | 48 ms | 2/2 | 4/4
>
> --
>
> Thanks & Regards
>
> Sachin Aggarwal
> 7760502772
>


Re: how to covert millisecond time to SQL timeStamp

2016-02-01 Thread VISHNU SUBRAMANIAN
HI ,

If you need a data frame specific solution , you can try the below

df.select(from_unixtime(col("max(utcTimestamp)")/1000))
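
The key step in the snippet above is dividing the millisecond value by 1000 before treating it as epoch seconds. A minimal plain-Python sketch of that arithmetic, outside Spark, using the epoch value from the quoted message below (`1456050620`, which that message gives as Sun, 21 Feb 2016 10:30:20 GMT); the helper name `millis_to_utc` is made up for illustration:

```python
from datetime import datetime, timezone


def millis_to_utc(epoch_millis):
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    # Dividing by 1000 turns milliseconds into (fractional) seconds.
    return datetime.fromtimestamp(epoch_millis / 1000, tz=timezone.utc)


epoch_seconds = 1456050620           # value quoted in the thread
epoch_millis = epoch_seconds * 1000  # the same instant in milliseconds

print(millis_to_utc(epoch_millis).isoformat())
```

Skipping the division and casting the millisecond value directly is what lands the quoted example tens of thousands of years in the future.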

On Tue, 2 Feb 2016 at 09:44 Ted Yu  wrote:

> See related thread on using Joda DateTime:
> http://search-hadoop.com/m/q3RTtSfi342nveex1=RE+NPE+when+using+Joda+DateTime
>
> On Mon, Feb 1, 2016 at 7:44 PM, Kevin Mellott 
> wrote:
>
>> I've had pretty good success using Joda-Time for date/time manipulations
>> within Spark applications. You may be able to use the *DateTime* constructor
>> below, if you are starting with milliseconds.
>>
>> DateTime
>>
>> public DateTime(long instant)
>>
>> Constructs an instance set to the milliseconds from 1970-01-01T00:00:00Z
>> using ISOChronology in the default time zone.
>> Parameters: instant - the milliseconds from 1970-01-01T00:00:00Z
>>
>> On Mon, Feb 1, 2016 at 5:51 PM, Andy Davidson <
>> a...@santacruzintegration.com> wrote:
>>
>>> What little I know about working with timestamps is based on
>>> https://databricks.com/blog/2015/09/16/spark-1-5-dataframe-api-highlights-datetimestring-handling-time-intervals-and-udafs.html
>>>
>>> Using the example of dates formatted into human friend strings ->
>>> timeStamps I was able to figure out how to convert Epoch times to
>>> timestamps. The same trick did not work for millisecond times.
>>>
>>> Any suggestions would be greatly appreciated.
>>>
>>>
>>> Andy
>>>
>>> Working with epoch times
>>> 
>>>
>>> ref: http://www.epochconverter.com/
>>>
>>> Epoch timestamp: 1456050620
>>>
>>> Timestamp in milliseconds: 1456050620000
>>>
>>> Human time (GMT): Sun, 21 Feb 2016 10:30:20 GMT
>>>
>>> Human time (your time zone): 2/21/2016, 2:30:20 AM
>>>
>>>
>>> # Epoch time stamp example
>>>
>>> data = [
>>>
>>>   ("1456050620", "1456050621", 1),
>>>
>>>   ("1456050622", "14560506203", 2),
>>>
>>>   ("14560506204", "14560506205", 3)]
>>>
>>> df = sqlContext.createDataFrame(data, ["start_time", "end_time", "id"])
>>>
>>> ​
>>>
>>> # convert epoch time strings in to spark timestamps
>>>
>>> df = df.select(
>>>
>>>   df.start_time.cast("long").alias("start_time"),
>>>
>>>   df.end_time.cast("long").alias("end_time"),
>>>
>>>   df.id)
>>>
>>> df.printSchema()
>>>
>>> df.show(truncate=False)
>>>
>>> ​
>>>
>>> # convert longs to timestamps
>>>
>>> df = df.select(
>>>
>>>   df.start_time.cast("timestamp").alias("start_time"),
>>>
>>>   df.end_time.cast("timestamp").alias("end_time"),
>>>
>>>   df.id)
>>>
>>> df.printSchema()
>>>
>>> df.show(truncate=False)
>>>
>>> ​
>>>
>>> root
>>>  |-- start_time: long (nullable = true)
>>>  |-- end_time: long (nullable = true)
>>>  |-- id: long (nullable = true)
>>>
>>> +-----------+-----------+---+
>>> |start_time |end_time   |id |
>>> +-----------+-----------+---+
>>> |1456050620 |1456050621 |1  |
>>> |1456050622 |14560506203|2  |
>>> |14560506204|14560506205|3  |
>>> +-----------+-----------+---+
>>>
>>> root
>>>  |-- start_time: timestamp (nullable = true)
>>>  |-- end_time: timestamp (nullable = true)
>>>  |-- id: long (nullable = true)
>>>
>>> +---------------------+---------------------+---+
>>> |start_time           |end_time             |id |
>>> +---------------------+---------------------+---+
>>> |2016-02-21 02:30:20.0|2016-02-21 02:30:21.0|1  |
>>> |2016-02-21 02:30:22.0|2431-05-28 02:03:23.0|2  |
>>> |2431-05-28 02:03:24.0|2431-05-28 02:03:25.0|3  |
>>> +---------------------+---------------------+---+
>>>
>>>
>>> In [21]:
>>>
>>> # working with millisecond times
>>>
>>> data = [
>>>
>>>   ("1456050620000", "1456050620000", 1)]
>>>
>>> df = sqlContext.createDataFrame(data, ["start_time", "end_time", "id"])
>>>
>>> ​
>>>
>>> # convert epoch time strings in to spark timestamps
>>>
>>> df = df.select(
>>>
>>>   df.start_time.cast("long").alias("start_time"),
>>>
>>>   df.end_time.cast("long").alias("end_time"),
>>>
>>>   df.id)
>>>
>>> df.printSchema()
>>>
>>> df.show(truncate=False)
>>>
>>> ​
>>>
>>> # convert longs to timestamps
>>>
>>> df = df.select(
>>>
>>>   df.start_time.cast("timestamp").alias("start_time"),
>>>
>>>   df.end_time.cast("timestamp").alias("end_time"),
>>>
>>>   df.id)
>>>
>>> df.printSchema()
>>>
>>> df.show(truncate=False)
>>>
>>> root
>>>  |-- start_time: long (nullable = true)
>>>  |-- end_time: long (nullable = true)
>>>  |-- id: long (nullable = true)
>>>
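>>> +-------------+-------------+---+
>>> |start_time   |end_time     |id |
>>> +-------------+-------------+---+
>>> |1456050620000|1456050620000|1  |
>>> +-------------+-------------+---+
>>>
>>> root
>>>  |-- start_time: timestamp (nullable = true)
>>>  |-- end_time: timestamp (nullable = true)
>>>  |-- id: long (nullable = true)
>>>
>>> +----------------------+----------------------+---+
>>> |start_time            |end_time              |id |
>>> +----------------------+----------------------+---+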
>>> 

Re: how to covert millisecond time to SQL timeStamp

2016-02-01 Thread Kevin Mellott
I've had pretty good success using Joda-Time for date/time manipulations
within Spark applications. You may be able to use the *DateTime* constructor
below, if you are starting with milliseconds.

DateTime

public DateTime(long instant)

Constructs an instance set to the milliseconds from 1970-01-01T00:00:00Z
using ISOChronology in the default time zone.
Parameters: instant - the milliseconds from 1970-01-01T00:00:00Z

On Mon, Feb 1, 2016 at 5:51 PM, Andy Davidson  wrote:

> What little I know about working with timestamps is based on
> https://databricks.com/blog/2015/09/16/spark-1-5-dataframe-api-highlights-datetimestring-handling-time-intervals-and-udafs.html
>
> Using the example of dates formatted into human friend strings ->
> timeStamps I was able to figure out how to convert Epoch times to
> timestamps. The same trick did not work for millisecond times.
>
> Any suggestions would be greatly appreciated.
>
>
> Andy
>
> Working with epoch times
> 
>
> ref: http://www.epochconverter.com/
>
> Epoch timestamp: 1456050620
>
> Timestamp in milliseconds: 1456050620000
>
> Human time (GMT): Sun, 21 Feb 2016 10:30:20 GMT
>
> Human time (your time zone): 2/21/2016, 2:30:20 AM
>
>
> # Epoch time stamp example
>
> data = [
>
>   ("1456050620", "1456050621", 1),
>
>   ("1456050622", "14560506203", 2),
>
>   ("14560506204", "14560506205", 3)]
>
> df = sqlContext.createDataFrame(data, ["start_time", "end_time", "id"])
>
> ​
>
> # convert epoch time strings in to spark timestamps
>
> df = df.select(
>
>   df.start_time.cast("long").alias("start_time"),
>
>   df.end_time.cast("long").alias("end_time"),
>
>   df.id)
>
> df.printSchema()
>
> df.show(truncate=False)
>
> ​
>
> # convert longs to timestamps
>
> df = df.select(
>
>   df.start_time.cast("timestamp").alias("start_time"),
>
>   df.end_time.cast("timestamp").alias("end_time"),
>
>   df.id)
>
> df.printSchema()
>
> df.show(truncate=False)
>
> ​
>
> root
>  |-- start_time: long (nullable = true)
>  |-- end_time: long (nullable = true)
>  |-- id: long (nullable = true)
>
> +-----------+-----------+---+
> |start_time |end_time   |id |
> +-----------+-----------+---+
> |1456050620 |1456050621 |1  |
> |1456050622 |14560506203|2  |
> |14560506204|14560506205|3  |
> +-----------+-----------+---+
>
> root
>  |-- start_time: timestamp (nullable = true)
>  |-- end_time: timestamp (nullable = true)
>  |-- id: long (nullable = true)
>
> +---------------------+---------------------+---+
> |start_time           |end_time             |id |
> +---------------------+---------------------+---+
> |2016-02-21 02:30:20.0|2016-02-21 02:30:21.0|1  |
> |2016-02-21 02:30:22.0|2431-05-28 02:03:23.0|2  |
> |2431-05-28 02:03:24.0|2431-05-28 02:03:25.0|3  |
> +---------------------+---------------------+---+
>
>
> In [21]:
>
> # working with millisecond times
>
> data = [
>
>   ("1456050620000", "1456050620000", 1)]
>
> df = sqlContext.createDataFrame(data, ["start_time", "end_time", "id"])
>
> ​
>
> # convert epoch time strings in to spark timestamps
>
> df = df.select(
>
>   df.start_time.cast("long").alias("start_time"),
>
>   df.end_time.cast("long").alias("end_time"),
>
>   df.id)
>
> df.printSchema()
>
> df.show(truncate=False)
>
> ​
>
> # convert longs to timestamps
>
> df = df.select(
>
>   df.start_time.cast("timestamp").alias("start_time"),
>
>   df.end_time.cast("timestamp").alias("end_time"),
>
>   df.id)
>
> df.printSchema()
>
> df.show(truncate=False)
>
> root
>  |-- start_time: long (nullable = true)
>  |-- end_time: long (nullable = true)
>  |-- id: long (nullable = true)
>
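> +-------------+-------------+---+
> |start_time   |end_time     |id |
> +-------------+-------------+---+
> |1456050620000|1456050620000|1  |
> +-------------+-------------+---+
>
> root
>  |-- start_time: timestamp (nullable = true)
>  |-- end_time: timestamp (nullable = true)
>  |-- id: long (nullable = true)
>
> +----------------------+----------------------+---+
> |start_time            |end_time              |id |
> +----------------------+----------------------+---+
> |48110-05-29 10:33:20.0|48110-05-29 10:33:20.0|1  |
> +----------------------+----------------------+---+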
>
>
>


Re: Failed to 'collect_set' with dataset in spark 1.6

2016-02-01 Thread Alexandr Dzhagriev
Good to know, thanks.

On Mon, Feb 1, 2016 at 6:57 PM, Ted Yu  wrote:

> Got around the previous error by adding:
>
> scala> implicit val kryoEncoder = Encoders.kryo[RecordExample]
> kryoEncoder: org.apache.spark.sql.Encoder[RecordExample] = class[value[0]:
> binary]
>
> On Mon, Feb 1, 2016 at 9:55 AM, Alexandr Dzhagriev 
> wrote:
>
>> Hi,
>>
>> That's another thing: that the Record case class should be outside. I ran
>> it as spark-submit.
>>
>> Thanks, Alex.
>>
>> On Mon, Feb 1, 2016 at 6:41 PM, Ted Yu  wrote:
>>
>>> Running your sample in spark-shell built in master branch, I got:
>>>
>>> scala> val dataset = sc.parallelize(Seq(RecordExample(1, "apple"),
>>> RecordExample(2, "orange"))).toDS()
>>> org.apache.spark.sql.AnalysisException: Unable to generate an encoder
>>> for inner class `RecordExample` without access to the scope that this class
>>> was defined in. Try moving this class out of its parent class.;
>>>   at
>>> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$3.applyOrElse(ExpressionEncoder.scala:316)
>>>   at
>>> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$3.applyOrElse(ExpressionEncoder.scala:312)
>>>   at
>>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:262)
>>>   at
>>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:262)
>>>   at
>>> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>>>   at
>>> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:261)
>>>   at
>>> org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:251)
>>>   at
>>> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolve(ExpressionEncoder.scala:312)
>>>   at org.apache.spark.sql.Dataset.<init>(Dataset.scala:80)
>>>   at org.apache.spark.sql.Dataset.<init>(Dataset.scala:91)
>>>   at org.apache.spark.sql.SQLContext.createDataset(SQLContext.scala:488)
>>>   at
>>> org.apache.spark.sql.SQLImplicits.rddToDatasetHolder(SQLImplicits.scala:71)
>>>   ... 53 elided
>>>
>>> On Mon, Feb 1, 2016 at 9:09 AM, Alexandr Dzhagriev 
>>> wrote:
>>>
>>>> Hello again,
>>>>
>>>> Also I've tried the following snippet with concat_ws:
>>>>
>>>> val dataset = sc.parallelize(Seq(
>>>>   RecordExample(1, "apple"),
>>>>   RecordExample(1, "banana"),
>>>>   RecordExample(2, "orange"))
>>>> ).toDS().groupBy($"a").agg(concat_ws(",", $"b").as[String])
>>>>
>>>> dataset.take(10).foreach(println)
>>>>
>>>> which also fails:
>>>>
>>>> Exception in thread "main" org.apache.spark.sql.AnalysisException:
>>>> expression 'b' is neither present in the group by, nor is it an aggregate
>>>> function. Add to group by or wrap in first() (or first_value) if you don't
>>>> care which value you get.;
>>>> at
>>>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
>>>> at
>>>> org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
>>>> at
>>>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$class$$anonfun$$checkValidAggregateExpression$1(CheckAnalysis.scala:130)
>>>>
>>>> Thanks, Alex.
>>>>
>>>> On Mon, Feb 1, 2016 at 6:03 PM, Alexandr Dzhagriev wrote:
>>>>
>>>>> Hi Ted,
>>>>>
>>>>> That doesn't help either; as far as I can see, one method simply
>>>>> delegates to the other:
>>>>>
>>>>> def collect_list(columnName: String): Column =
>>>>>   collect_list(Column(columnName))
>>>>>
>>>>>
>>>>> Thanks, Alex
>>>>>
>>>>> On Mon, Feb 1, 2016 at 5:55 PM, Ted Yu wrote:
>>>>>
>>>>>> bq. agg(collect_list("b")
>>>>>>
>>>>>> Have you tried:
>>>>>>
>>>>>> agg(collect_list($"b")
>>>>>>
>>>>>> On Mon, Feb 1, 2016 at 8:50 AM, Alexandr Dzhagriev wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I'm trying to run the following example code:
>>>>>>>
>>>>>>> import org.apache.spark.sql.hive.HiveContext
>>>>>>> import org.apache.spark.{SparkContext, SparkConf}
>>>>>>> import org.apache.spark.sql.functions._
>>>>>>>
>>>>>>>
>>>>>>> case class RecordExample(a: Int, b: String)
>>>>>>>
>>>>>>> object ArrayExample {
>>>>>>>   def main(args: Array[String]) {
>>>>>>>     val conf = new SparkConf()
>>>>>>>
>>>>>>>     val sc = new SparkContext(conf)
>>>>>>>     val sqlContext = new HiveContext(sc)
>>>>>>>
>>>>>>>     import sqlContext.implicits._
>>>>>>>
>>>>>>>     val dataset = sc.parallelize(Seq(RecordExample(1, "apple"),
>>>>>>>       RecordExample(2, "orange"))).toDS()
>>>>>>>
>>>>>>>     dataset.groupBy($"a").agg(collect_list("b").as[List[String]])
>>>>>>>
>>>>>>>     dataset.collect()
>>>>>>>
>>>>>>>   }
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> and it fails with the following (please see the whole stack trace
>>>>>>> below):
>>>>>>>
>>>>>>>  Exception in thread "main" java.lang.ClassCastException:
>>>>>>>
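
For reference, the result the snippets in this thread are after (all values of b collected per key a) can be spelled out in plain Python. This is only a sketch of the intended grouping semantics with hypothetical helper names; it does not use Spark and does not address the encoder or analysis errors themselves.

```python
from collections import defaultdict


def collect_list_by_key(records):
    """Group (a, b) pairs by a, keeping every b in arrival order."""
    grouped = defaultdict(list)
    for a, b in records:
        grouped[a].append(b)
    return dict(grouped)


def collect_set_by_key(records):
    """Same grouping, but with duplicate b values dropped."""
    return {a: set(bs) for a, bs in collect_list_by_key(records).items()}


# Same sample rows as the concat_ws snippet in the thread.
records = [(1, "apple"), (1, "banana"), (2, "orange")]
print(collect_list_by_key(records))
# {1: ['apple', 'banana'], 2: ['orange']}
```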

Re: Can't view executor logs in web UI on Windows

2016-02-01 Thread Ted Yu
I did a brief search but didn't find a relevant JIRA either.

You can create a JIRA and submit a pull request for the fix.

Cheers

> On Feb 1, 2016, at 5:13 AM, Mark Pavey wrote:
> 
> I am running Spark on Windows. When I try to view the Executor logs in the UI
> I get the following error:
> 
> HTTP ERROR 500
> 
> Problem accessing /logPage/. Reason:
> 
>Server Error
> Caused by:
> 
> java.net.URISyntaxException: Illegal character in path at index 1:
> .\work/app-20160129154716-0038/2/
>at java.net.URI$Parser.fail(Unknown Source)
>at java.net.URI$Parser.checkChars(Unknown Source)
>at java.net.URI$Parser.parseHierarchical(Unknown Source)
>at java.net.URI$Parser.parse(Unknown Source)
>at java.net.URI.<init>(Unknown Source)
>at org.apache.spark.deploy.worker.ui.LogPage.getLog(LogPage.scala:141)
>at org.apache.spark.deploy.worker.ui.LogPage.render(LogPage.scala:78)
>at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:79)
>at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:79)
>at org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:69)
>at javax.servlet.http.HttpServlet.service(HttpServlet.java:735)
>at javax.servlet.http.HttpServlet.service(HttpServlet.java:848)
>at
> org.spark-project.jetty.servlet.ServletHolder.handle(ServletHolder.java:684)
>at
> org.spark-project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501)
>at
> org.spark-project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086)
>at
> org.spark-project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:428)
>at
> org.spark-project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020)
>at
> org.spark-project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
>at
> org.spark-project.jetty.server.handler.GzipHandler.handle(GzipHandler.java:264)
>at
> org.spark-project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:255)
>at
> org.spark-project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
>at org.spark-project.jetty.server.Server.handle(Server.java:370)
>at
> org.spark-project.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494)
>at
> org.spark-project.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:971)
>at
> org.spark-project.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:1033)
>at org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:644)
>at
> org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
>at
> org.spark-project.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
>at
> org.spark-project.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:667)
>at
> org.spark-project.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52)
>at
> org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
>at
> org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
>at java.lang.Thread.run(Unknown Source)
> 
> 
> 
> Looking at the source code for
> org.apache.spark.deploy.worker.ui.LogPage.getLog reveals the following:
> - At line 141 the constructor of java.net.URI is called with the path to
> the log directory as a String argument. This string
> (".\work/app-20160129154716-0038/2/" in example above) contains a backslash,
> which is an illegal character for the URI constructor.
> - The component of the path containing the backslash is created at line 71
> by calling the getPath method on a java.io.File object. Because it is
> running on Windows it uses the default Windows file separator, which is a
> backslash.
> 
> I am using Spark 1.5.1 but the source code appears unchanged in 1.6.0.
> 
> I haven't been able to find an open issue for this, but if there is one I
> could possibly submit a pull request for it.
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Can-t-view-executor-logs-in-web-UI-on-Windows-tp26122.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
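
The backslash problem described in the quoted message can be illustrated outside Spark. The sketch below is plain Python, not the Scala code in LogPage, and the helper name is hypothetical; it only shows that normalising a Windows-style relative path to forward slashes removes the character that the URI constructor rejects.

```python
from pathlib import PureWindowsPath


def to_forward_slashes(windows_path):
    """Return the path with '/' as the only separator."""
    # PureWindowsPath treats both '\\' and '/' as separators on any OS,
    # so this behaves the same on Linux and on Windows.
    return PureWindowsPath(windows_path).as_posix()


# The path string reported in the error message above.
log_dir = r".\work/app-20160129154716-0038/2/"
print(to_forward_slashes(log_dir))
```

On the JVM side, `java.io.File.toURI()` is one standard way to obtain a valid URI from a File; whether that is the right fix for LogPage is not something this thread establishes.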




Re: Spark Executor retries infinitely

2016-02-01 Thread Ted Yu
After a brief search, I haven't found a config knob for controlling the
retry count.

According to
http://www.oracle.com/technetwork/articles/java/g1gc-1984535.html , default
value for -XX:ParallelGCThreads= seems to be 8.
This seems to explain why you got the VM initialization error.

FYI

On Mon, Feb 1, 2016 at 4:16 AM, Prabhu Joseph wrote:

> Hi All,
>
>   When a Spark job (Spark 1.5.2) is submitted with a single executor and
> the user passes wrong JVM arguments via
> spark.executor.extraJavaOptions, the first executor fails. But the job
> keeps retrying, creating a new executor and failing every time, until
> CTRL-C is pressed. Is there a configuration to limit the retry attempts?
>
> *Example:*
>
> ./spark-submit --class SimpleApp --master "spark://10.10.72.145:7077"
> --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails
> -XX:+PrintGCTimeStamps -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35
> -XX:ConcGCThreads=16" /SPARK/SimpleApp.jar
>
> Executor fails with
>
> Error occurred during initialization of VM
> Can't have more ConcGCThreads than ParallelGCThreads.
>
> But the job does not exit; it keeps creating executors and retrying.
> ..
> 16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: *Granted executor ID
> app-20160201065319-0014/2846* on hostPort 10.10.72.145:36558 with 12
> cores, 2.0 GB RAM
> 16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
> app-20160201065319-0014/2846 is now LOADING
> 16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
> app-20160201065319-0014/2846 is now RUNNING
> 16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
> app-20160201065319-0014/2846 is now EXITED (Command exited with code 1)
> 16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Executor
> app-20160201065319-0014/2846 removed: Command exited with code 1
> 16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Asked to remove
> non-existent executor 2846
> 16/02/01 06:54:28 INFO AppClient$ClientEndpoint: *Executor added:
> app-20160201065319-0014/2847* on worker-20160131230345-10.10.72.145-36558
> (10.10.72.145:36558) with 12 cores
> 16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Granted executor ID
> app-20160201065319-0014/2847 on hostPort 10.10.72.145:36558 with 12
> cores, 2.0 GB RAM
> 16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
> app-20160201065319-0014/2847 is now LOADING
> 16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
> app-20160201065319-0014/2847 is now EXITED (Command exited with code 1)
> 16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Executor
> app-20160201065319-0014/2847 removed: Command exited with code 1
> 16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Asked to remove
> non-existent executor 2847
> 16/02/01 06:54:28 INFO AppClient$ClientEndpoint:* Executor added:
> app-20160201065319-0014/2848* on worker-20160131230345-10.10.72.145-36558
> (10.10.72.145:36558) with 12 cores
> 16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Granted executor ID
> app-20160201065319-0014/2848 on hostPort 10.10.72.145:36558 with 12
> cores, 2.0 GB RAM
> 16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
> app-20160201065319-0014/2848 is now LOADING
> 16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
> app-20160201065319-0014/2848 is now RUNNING
> 
>
>
>
> Thanks,
> Prabhu Joseph
>
>
>
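
The constraint behind the VM initialization error in this thread (ConcGCThreads must not exceed ParallelGCThreads) can be checked before submitting a job. The following is a minimal sketch with hypothetical helper names; the fallback of 8 is only the value suggested in the reply above, and the real JVM default depends on the machine.

```python
import re

# Assumption taken from the reply above ("seems to be 8"); the actual JVM
# default depends on the machine, so treat this as a placeholder.
ASSUMED_PARALLEL_GC_THREADS = 8


def xx_int_flag(java_opts, name):
    """Return the integer value of -XX:<name>=<n> in java_opts, or None."""
    match = re.search(r"-XX:" + re.escape(name) + r"=(\d+)", java_opts)
    return int(match.group(1)) if match else None


def gc_threads_consistent(java_opts,
                          default_parallel=ASSUMED_PARALLEL_GC_THREADS):
    """Return True unless ConcGCThreads exceeds ParallelGCThreads."""
    conc = xx_int_flag(java_opts, "ConcGCThreads")
    if conc is None:
        return True  # nothing to check
    parallel = xx_int_flag(java_opts, "ParallelGCThreads")
    if parallel is None:
        parallel = default_parallel
    return conc <= parallel


# The options string from the spark-submit example above.
opts = ("-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseG1GC "
        "-XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=16")
print(gc_threads_consistent(opts))  # False
```

A check like this only catches this one misconfiguration; it does not stop the executor relaunch loop itself.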


java.nio.channels.ClosedChannelException in Spark Streaming KafKa Direct

2016-02-01 Thread SRK
Hi,

I see the following error in Spark Streaming with Kafka Direct. I think
this error is related to the Kafka topic. Any suggestions on how to avoid
it would be of great help.

java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
at
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:413)
at
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.insertAll(BypassMergeSortShuffleWriter.java:99)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-nio-channels-ClosedChannelException-in-Spark-Streaming-KafKa-Direct-tp26124.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
