Re: Spark Streaming Suggestion

2015-09-16 Thread srungarapu vamsi
@David, I am going through the articles you have shared. Will message you
if I need any help. Thanks.

@Ayan, Yes, it looks like I can get everything done with Spark Streaming.
In fact we already have Storm in the architecture, sanitizing the data and
dumping it into cassandra. Now I have some new requirements for which Spark
Streaming is the right tool. I just wanted to see if there can be a smooth
marriage between the existing Storm and Spark Streaming.

Thanks for the inputs.
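For reference, a minimal sketch of the second approach discussed below (Spark Streaming reading the raw device payloads from Kafka with the direct API and writing per-minute unique counts to Cassandra). The broker, topic, keyspace, table, and wire-format names are assumptions, not from the original thread, and the aggregation logic is illustrative only:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.streaming._

object DeviceUniqueCounts {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("device-unique-counts")
    val ssc = new StreamingContext(conf, Seconds(60))

    // Approach 2: read the raw device payloads straight from Kafka (direct API)
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")   // assumed broker
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("device-events"))                         // assumed topic

    // assumed wire format: "deviceId,minute,n1 n2 n3 ..."
    val uniquePerMinute = stream.map(_._2.split(",", 3))
      .map(f => ((f(0), f(1)), f(2).split(" ").toSet))
      .reduceByKey(_ ++ _)                                            // merge sets -> unique numbers
      .map { case ((device, minute), nums) => (device, minute, nums.size) }

    // write the per-device, per-minute unique counts back to Cassandra
    uniquePerMinute.saveToCassandra("metrics", "unique_per_minute",   // assumed keyspace/table
      SomeColumns("device_id", "minute", "unique_count"))

    ssc.start()
    ssc.awaitTermination()
  }
}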

On Wed, Sep 16, 2015 at 2:30 AM, ayan guha  wrote:

> I think you need to make up your mind about storm vs spark. Using both in
> this context does not make much sense to me.
> On 15 Sep 2015 22:54, "David Morales"  wrote:
>
>> Hi there,
>>
>> This is exactly our goal in Stratio Sparkta, a real-time aggregation
>> engine fully developed with spark streaming (and fully open source).
>>
>> Take a look at:
>>
>>
>>- the docs: http://docs.stratio.com/modules/sparkta/development/
>>- the repository: https://github.com/Stratio/sparkta
>>- and some slides explaining how sparkta was born and what it makes:
>>http://www.slideshare.net/Stratio/strata-sparkta
>>
>>
>> Feel free to ask us anything about the project.
>>
>> 2015-09-15 8:10 GMT+02:00 srungarapu vamsi :
>>
>>> The batch approach I had implemented takes about 10 minutes to complete
>>> all the pre-computation tasks for one hour's worth of data. When I went
>>> through my code, I figured out that the most time-consuming tasks are
>>> the ones which read data from cassandra and the places where I perform
>>> sparkContext.union(Array[RDD]).
>>> Now the ask is to get the pre-computation tasks near real time, so I am
>>> exploring the streaming approach.
>>>
>>> My pre-computation tasks not only include finding the unique
>>> numbers for a given device every minute, every hour, every day, but also
>>> include the following tasks:
>>> 1. Find the number of unique numbers across a set of devices every
>>> minute, every hour, every day
>>> 2. Find the number of unique numbers which occur commonly across
>>> a set of devices every minute, every hour, every day
>>> 3. Find (total time a number occurred across a set of devices)/(total
>>> unique numbers occurred across the set of devices)
>>> The above-mentioned pre-computation tasks are just a few of what I will
>>> be needing, and there are many more coming towards me :)
>>> I see all these problems need more of a data-parallel approach, and hence I
>>> am interested in doing this on the Spark Streaming end.
>>>
>>>
>>> On Tue, Sep 15, 2015 at 11:04 AM, Jörn Franke 
>>> wrote:
>>>
 Why did you not stay with the batch approach? For me the architecture
 looks very complex for a simple thing you want to achieve. Why don't you
 process the data already in storm ?

 On Tue, Sep 15, 2015 at 6:20 AM, srungarapu vamsi <
 srungarapu1...@gmail.com> wrote:

> I am pretty new to spark. Please suggest a better model for the
> following use case.
>
> I have a few (about 1500) devices in the field which keep emitting about
> 100KB of data every minute. The nature of the data sent by the devices is just
> a list of numbers.
> As of now, we have Storm in the architecture, which receives this
> data, sanitizes it and writes it to cassandra.
> Now, I have a requirement to process this data. The processing
> includes finding the unique numbers emitted by one or more devices for every
> minute, every hour, every day, every month.
> I had implemented this processing part as a batch job execution and
> now I am interested in making it a streaming application, i.e. calculating
> the processed data as and when devices emit the data.
>
> I have the following two approaches:
> 1. Storm writes the actual data to cassandra and writes a message on
> the Kafka bus that data corresponding to device D and minute M has been
> written to cassandra.
>
> Then Spark Streaming reads this message from kafka, then reads the
> data of device D at minute M from cassandra and starts processing the
> data.
>
> 2. Storm writes the data to both cassandra and kafka; spark reads the
> actual data from kafka, processes the data and writes to cassandra.
> The second approach avoids the additional hit of reading from cassandra
> every minute a device has written data to cassandra, at the cost of
> putting the actual heavy messages instead of light events on kafka.
>
> I am a bit confused between the two approaches. Please suggest which one
> is better, and if both are bad, how can I handle this use case?
>
>
> --
> /Vamsi
>

>>>
>>>
>>> --
>>> /Vamsi
>>>
>>
>>
>>
>> --
>>
>> David Morales de Frías  ::  +34 607 010 411 :: @dmoralesdf
>> 
>>
>>
>> 

Idle time between jobs

2015-09-16 Thread patcharee

Hi,

I am using Spark 1.5. I have a Spark application which is divided into
several jobs. I noticed from the Event Timeline in the Spark History UI that
there was idle time between jobs. See below: job 1 was submitted at
11:20:49 and finished at 11:20:52, but job 2 was not submitted until 16 s
later (at 11:21:08). I wonder what is going on during those 16 seconds? Any suggestions?


Job Id  Description                                     Submitted            Duration
2       saveAsTextFile at GenerateHistogram.scala:143   2015/09/16 11:21:08  0.7 s
1       collect at GenerateHistogram.scala:132          2015/09/16 11:20:49  2 s
0       count at GenerateHistogram.scala:129            2015/09/16 11:20:41  9 s

Below is log

15/09/16 11:20:52 INFO DAGScheduler: Job 1 finished: collect at 
GenerateHistogram.scala:132, took 2.221756 s
15/09/16 11:21:08 INFO deprecation: mapred.tip.id is deprecated. 
Instead, use mapreduce.task.id
15/09/16 11:21:08 INFO deprecation: mapred.task.id is deprecated. 
Instead, use mapreduce.task.attempt.id
15/09/16 11:21:08 INFO deprecation: mapred.task.is.map is deprecated. 
Instead, use mapreduce.task.ismap
15/09/16 11:21:08 INFO deprecation: mapred.task.partition is deprecated. 
Instead, use mapreduce.task.partition
15/09/16 11:21:08 INFO deprecation: mapred.job.id is deprecated. 
Instead, use mapreduce.job.id
15/09/16 11:21:08 INFO FileOutputCommitter: File Output Committer 
Algorithm version is 1
15/09/16 11:21:08 INFO SparkContext: Starting job: saveAsTextFile at 
GenerateHistogram.scala:143
15/09/16 11:21:08 INFO DAGScheduler: Got job 2 (saveAsTextFile at 
GenerateHistogram.scala:143) with 1 output partitions
15/09/16 11:21:08 INFO DAGScheduler: Final stage: ResultStage 
2(saveAsTextFile at GenerateHistogram.scala:143)


BR,
Patcharee





Re: Managing scheduling delay in Spark Streaming

2015-09-16 Thread Dibyendu Bhattacharya
Hi Michal,

If you use https://github.com/dibbhatt/kafka-spark-consumer, it comes
with its own built-in back-pressure mechanism. By default this is disabled;
you need to enable it to use this feature with this consumer. It
controls the rate based on the Scheduling Delay at runtime.
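For comparison, the Spark 1.5 back-pressure and the static rate caps mentioned in this thread can be switched on through configuration. A minimal sketch (property names as documented for the 1.5 line; the numeric values are only illustrative):

val conf = new org.apache.spark.SparkConf()
  // Spark 1.5+: adapt the ingestion rate to the observed scheduling delay / batch times
  .set("spark.streaming.backpressure.enabled", "true")
  // optional static upper bounds
  .set("spark.streaming.receiver.maxRate", "10000")            // records/sec per receiver
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")    // direct Kafka stream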

Regards,
Dibyendu

On Wed, Sep 16, 2015 at 12:32 PM, Akhil Das 
wrote:

> I had a workaround for exactly the same scenario
> http://apache-spark-developers-list.1001551.n3.nabble.com/SparkStreaming-Workaround-for-BlockNotFound-Exceptions-td12096.html
>
> Apart from that, if you are using this consumer
> https://github.com/dibbhatt/kafka-spark-consumer it also has a built-in
> rate limiting, Also in Spark 1.5.0 they have a rate limiting/back-pressure
> (haven't tested it on production though).
>
>
>
> Thanks
> Best Regards
>
> On Tue, Sep 15, 2015 at 11:56 PM, Michal Čizmazia 
> wrote:
>
>> Hi,
>>
>> I have a Reliable Custom Receiver storing messages into Spark. Is there
>> way how to prevent my receiver from storing more messages into Spark when
>> the Scheduling Delay reaches a certain threshold?
>>
>> Possible approaches:
>> #1 Does Spark block on the Receiver.store(messages) call to prevent
>> storing more messages and overflowing the system?
>> #2 How to obtain the Scheduling Delay in the Custom Receiver, so that I
>> can implement the feature.
>>
>> Thanks,
>>
>> Mike
>>
>>
>


Re: How to avoid shuffle errors for a large join ?

2015-09-16 Thread Reynold Xin
Only SQL and DataFrame for now.

We are thinking about how to apply that to a more general distributed
collection based API, but it's not in 1.5.
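For readers hitting the same errors on the 1.4.x/1.5.x line, the knobs mentioned further down in this thread can be set like this. A sketch only: the partition count and fraction are illustrative values, spark.shuffle.memoryFraction is the legacy shuffle setting of that era (default 0.2), and the sort-merge-join flag name is as used in 1.4/1.5:

val conf = new org.apache.spark.SparkConf()
  .set("spark.sql.shuffle.partitions", "2000")        // more, smaller shuffle partitions for the join
  .set("spark.shuffle.memoryFraction", "0.4")         // legacy 1.4.x/1.5.x setting, default 0.2
  .set("spark.sql.planner.sortMergeJoin", "true")     // 1.4.x: enable sort-merge join explicitly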

On Sat, Sep 5, 2015 at 11:56 AM, Gurvinder Singh  wrote:

> On 09/05/2015 11:22 AM, Reynold Xin wrote:
> > Try increase the shuffle memory fraction (by default it is only 16%).
> > Again, if you run Spark 1.5, this will probably run a lot faster,
> > especially if you increase the shuffle memory fraction ...
> Hi Reynold,
>
> Does 1.5 have better join/cogroup performance for the RDD case too, or
> only for SQL?
>
> - Gurvinder
> >
> > On Tue, Sep 1, 2015 at 8:13 AM, Thomas Dudziak  > > wrote:
> >
> > While it works with sort-merge-join, it takes about 12h to finish
> > (with 1 shuffle partitions). My hunch is that the reason for
> > that is this:
> >
> > INFO ExternalSorter: Thread 3733 spilling in-memory map of 174.9 MB
> > to disk (62 times so far)
> >
> > (and lots more where this comes from).
> >
> > On Sat, Aug 29, 2015 at 7:17 PM, Reynold Xin  > > wrote:
> >
> > Can you try 1.5? This should work much, much better in 1.5 out
> > of the box.
> >
> > For 1.4, I think you'd want to turn on sort-merge-join, which is
> > off by default. However, the sort-merge join in 1.4 can still
> > trigger a lot of garbage, making it slower. SMJ performance is
> > probably 5x - 1000x better in 1.5 for your case.
> >
> >
> > On Thu, Aug 27, 2015 at 6:03 PM, Thomas Dudziak
> > > wrote:
> >
> > I'm getting errors like "Removing executor with no recent
> > heartbeats" & "Missing an output location for shuffle"
> > errors for a large SparkSql join (1bn rows/2.5TB joined with
> > 1bn rows/30GB) and I'm not sure how to configure the job to
> > avoid them.
> >
> > The initial stage completes fine with some 30k tasks on a
> > cluster with 70 machines/10TB memory, generating about 6.5TB
> > of shuffle writes, but then the shuffle stage first waits
> > 30min in the scheduling phase according to the UI, and then
> > dies with the mentioned errors.
> >
> > I can see in the GC logs that the executors reach their
> > memory limits (32g per executor, 2 workers per machine) and
> > can't allocate any more stuff in the heap. Fwiw, the top 10
> > in the memory use histogram are:
> >
> >                num     #instances          #bytes  class name
> >                -----------------------------------------------
> >                  1:      249139595     11958700560  scala.collection.immutable.HashMap$HashMap1
> >                  2:      251085327      8034730464  scala.Tuple2
> >                  3:      243694737      5848673688  java.lang.Float
> >                  4:      231198778      5548770672  java.lang.Integer
> >                  5:       72191585      4298521576  [Lscala.collection.immutable.HashMap;
> >                  6:       72191582      2310130624  scala.collection.immutable.HashMap$HashTrieMap
> >                  7:       74114058      1778737392  java.lang.Long
> >                  8:        6059103       779203840  [Ljava.lang.Object;
> >                  9:        5461096       174755072  scala.collection.mutable.ArrayBuffer
> >                 10:          34749        70122104  [B
> >
> > Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):
> >
> > spark.core.connection.ack.wait.timeout 600
> > spark.executor.heartbeatInterval   60s
> > spark.executor.memory  32g
> > spark.mesos.coarse false
> > spark.network.timeout  600s
> > spark.shuffle.blockTransferService netty
> > spark.shuffle.consolidateFiles true
> > spark.shuffle.file.buffer  1m
> > spark.shuffle.io.maxRetries6
> > spark.shuffle.manager  sort
> >
> > The join is currently configured with
> > spark.sql.shuffle.partitions=1000 but that doesn't seem to
> > help. Would increasing the partitions help ? Is there a
> > formula to determine an approximate partitions number value
> > for a join ?
> > Any help with this job would be appreciated !
> >
> > cheers,
> > Tom
> >
> >
> >
> >
>
>


RE: Getting parent RDD

2015-09-16 Thread Samya MAITI
Hi Akhil,

I suppose this will give me the transformed msg & not the original msg.

I need the data corresponding to msgStream & not wordCountPair.

As per my understanding, we need to keep a copy of incoming stream (not sure 
how), so as to refer to that in catch block.

Regards,
Sam

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Wednesday, September 16, 2015 12:24 PM
To: Samya MAITI 
Cc: user@spark.apache.org
Subject: Re: Getting parent RDD

​How many RDDs are you having in that stream? If its a single RDD then you 
could do a .foreach and log the message, something like:


val ssc = 
val msgStream = .   //SparkKafkaDirectAPI
val wordCountPair = TransformStream.transform(msgStream)
wordCountPair.foreach(msg =>
  try {
    //Some action that causes exception
  } catch {
    case ex1 : Exception => {
      // *How to get hold of the msgStream, so that I can log the
      // actual message that caused the exception.*
      Log.error("Whoops! This message :=>" + msg)
    }
  }
)


Thanks
Best Regards

On Tue, Sep 15, 2015 at 9:13 PM, Samya 
> wrote:
Hi Team

I have the below situation.

val ssc = 
val msgStream = .   //SparkKafkaDirectAPI
val wordCountPair = TransformStream.transform(msgStream)
/wordCountPair.foreachRDD(rdd =>
  try{
//Some action that causes exception
  }catch {
case ex1 : Exception => {
   // *How to get hold of the msgStream, so that I can log the
actual message that caused the exception.*
  }
)/


Regards,
Sam



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Getting-parent-RDD-tp24701.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Getting parent RDD

2015-09-16 Thread Akhil Das
If I understand it correctly, then in your transform function make it
return DStream[(OldMsg, TransformedMsg)], and then in the try...catch you can
access ._1 for the oldMsg and ._2 for the transformedMsg.
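A minimal sketch of that idea (the word-count style transform and the logging call are assumptions, not the original TransformStream code): carry the original message alongside the derived value so the catch block can still log it.

import org.apache.spark.streaming.dstream.DStream

// Assumed transform: emit the original message together with whatever is derived from it.
def transform(msgStream: DStream[String]): DStream[(String, (String, Int))] =
  msgStream.flatMap { msg =>
    msg.split(" ").map(word => (msg, (word, 1)))      // (originalMsg, (word, count))
  }

def process(msgStream: DStream[String]): Unit =
  transform(msgStream).foreachRDD { rdd =>
    rdd.foreach { case (originalMsg, derived) =>
      try {
        // some action that may throw, using `derived`
      } catch {
        case ex: Exception =>
          // the raw message travelled with the record, so it can be logged here
          System.err.println(s"Failed on message: $originalMsg (${ex.getMessage})")
      }
    }
  }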

Thanks
Best Regards

On Wed, Sep 16, 2015 at 2:46 PM, Samya MAITI 
wrote:

> Hi Akhil,
>
>
>
> I suppose this will give me the transformed msg & not the original msg.
>
>
>
> I need the data corresponding to *msgStream* & not *wordCountPair*.
>
>
>
> As per my understanding, we need to keep a copy of incoming stream (not
> sure how), so as to refer to that in catch block.
>
>
>
> Regards,
>
> Sam
>
>
>
> *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com]
> *Sent:* Wednesday, September 16, 2015 12:24 PM
> *To:* Samya MAITI 
> *Cc:* user@spark.apache.org
> *Subject:* Re: Getting parent RDD
>
>
>
> ​How many RDDs are you having in that stream? If its a single RDD then
> you could do a .foreach and log the message, something like:
>
>
>
>
>
> val ssc = 
> val msgStream = .   //SparkKafkaDirectAPI
> val wordCountPair = TransformStream.transform(msgStream)
> /wordCountPair.foreach(
>
> ​msg​
>
> =>
>   try{
> //Some action that causes exception
>   }catch {
> case ex1 : Exception => {
>// *How to get hold of the msgStream, so that I can log the
> actual message that caused the exception.*
>
> ​ Log.error("Whoops! This message :=>" + msg)​
>
>
>   }
>
> )/​
>
>
>
>
> Thanks
>
> Best Regards
>
>
>
> On Tue, Sep 15, 2015 at 9:13 PM, Samya  wrote:
>
> Hi Team
>
> I have the below situation.
>
> val ssc = 
> val msgStream = .   //SparkKafkaDirectAPI
> val wordCountPair = TransformStream.transform(msgStream)
> /wordCountPair.foreachRDD(rdd =>
>   try{
> //Some action that causes exception
>   }catch {
> case ex1 : Exception => {
>// *How to get hold of the msgStream, so that I can log the
> actual message that caused the exception.*
>   }
> )/
>
>
> Regards,
> Sam
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Getting-parent-RDD-tp24701.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>


Re: Managing scheduling delay in Spark Streaming

2015-09-16 Thread Akhil Das
I had a workaround for exactly the same scenario
http://apache-spark-developers-list.1001551.n3.nabble.com/SparkStreaming-Workaround-for-BlockNotFound-Exceptions-td12096.html

Apart from that, if you are using this consumer
https://github.com/dibbhatt/kafka-spark-consumer it also has built-in
rate limiting. Also, Spark 1.5.0 has rate limiting/back-pressure
(I haven't tested it in production though).



Thanks
Best Regards

On Tue, Sep 15, 2015 at 11:56 PM, Michal Čizmazia  wrote:

> Hi,
>
> I have a Reliable Custom Receiver storing messages into Spark. Is there
> way how to prevent my receiver from storing more messages into Spark when
> the Scheduling Delay reaches a certain threshold?
>
> Possible approaches:
> #1 Does Spark block on the Receiver.store(messages) call to prevent
> storing more messages and overflowing the system?
> #2 How to obtain the Scheduling Delay in the Custom Receiver, so that I
> can implement the feature.
>
> Thanks,
>
> Mike
>
>


Re: Getting parent RDD

2015-09-16 Thread Akhil Das
​How many RDDs are you having in that stream? If its a single RDD then you
could do a .foreach and log the message, something like:


val ssc = 
val msgStream = .   //SparkKafkaDirectAPI
val wordCountPair = TransformStream.transform(msgStream)
wordCountPair.foreach(msg =>
  try {
    //Some action that causes exception
  } catch {
    case ex1 : Exception => {
      // *How to get hold of the msgStream, so that I can log the
      // actual message that caused the exception.*
      Log.error("Whoops! This message :=>" + msg)
    }
  }
)


Thanks
Best Regards

On Tue, Sep 15, 2015 at 9:13 PM, Samya  wrote:

> Hi Team
>
> I have the below situation.
>
> val ssc = 
> val msgStream = .   //SparkKafkaDirectAPI
> val wordCountPair = TransformStream.transform(msgStream)
> /wordCountPair.foreachRDD(rdd =>
>   try{
> //Some action that causes exception
>   }catch {
> case ex1 : Exception => {
>// *How to get hold of the msgStream, so that I can log the
> actual message that caused the exception.*
>   }
> )/
>
>
> Regards,
> Sam
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Getting-parent-RDD-tp24701.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Difference between sparkDriver and "executor ID driver"

2015-09-16 Thread Hemant Bhanawat
1. When you call new SparkContext(), the spark driver is started, which
internally creates an Akka ActorSystem that registers on this port.

2. Since you are running in local mode, the starting of an executor is
short-circuited and an Executor object is created in the same process (see
LocalEndpoint). This Executor object logs this message with the executor ID
"driver".

On Wed, Sep 16, 2015 at 9:44 AM, Muler  wrote:

> I'm running Spark in local mode and getting these two log messages who
> appear to be similar. I want to understand what each is doing:
>
>
>1. [main] util.Utils (Logging.scala:logInfo(59)) - Successfully
>started service 'sparkDriver' on port 60782.
>2. [main] executor.Executor (Logging.scala:logInfo(59)) - Starting
>executor ID driver on host localhost
>
> 1. is created using:
>
> val actorSystemName = if (isDriver) driverActorSystemName else
> executorActorSystemName
>
> val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf,
> securityManager)
> val actorSystem = rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem
>
> 2. is created when:
>
>  _taskScheduler.start()
>
>
> What is the difference and what does each do?
>
>
>


Spark streaming on spark-standalone/ yarn inside Spring XD

2015-09-16 Thread Vignesh Radhakrishnan
Hi, I am trying to run a Spark processor on Spring XD for a streaming operation.

The Spark processor module on Spring XD works when Spark is pointing to local.
The processor fails to run when we point Spark to Spark standalone (running on
the same machine) or yarn-client. Is it possible to run a Spark processor on
Spark standalone or yarn inside Spring XD, or is Spark local the only option
here?

The processor module is:

class WordCount extends Processor[String, (String, Int)] {

  def process(input: ReceiverInputDStream[String]): DStream[(String, Int)] = {
  val words = input.flatMap(_.split(" "))
  val pairs = words.map(word => (word, 1))
  val wordCounts = pairs.reduceByKey(_ + _)
  wordCounts
  }

  @SparkConfig
  def properties : Properties = {
val props = new Properties()
// Any specific Spark configuration properties would go here.
// These properties always get the highest precedence
//props.setProperty("spark.master", "spark://a.b.c.d:7077")
props.setProperty("spark.master", "spark://abcd.hadoop.ambari:7077")
props
  }

}

Below is the error log that I get:

// Error Log

2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 
container.DeploymentListener - Deploying module 'log' for stream 
'spark-streaming-word-count'
2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 
container.DeploymentListener - Deploying module [ModuleDescriptor@6dbc4f81 
moduleName = 'log', moduleLabel = 'log', group = 'spark-streaming-word-count', 
sourceChannelName = [null], sinkChannelName = [null], index = 2, type = sink, 
parameters = map[[empty]], children = list[[empty]]]
2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 
container.DeploymentListener - Path cache event: 
path=/deployments/modules/allocated/4ff3ba84-e6ca-47dd-894f-aa92bdbb3e06/spark-streaming-word-count.processor.processor.1,
 type=CHILD_ADDED
2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 
container.DeploymentListener - Deploying module 'processor' for stream 
'spark-streaming-word-count'
2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 
container.DeploymentListener - Deploying module [ModuleDescriptor@5e16dafb 
moduleName = 'scala-word-count', moduleLabel = 'processor', group = 
'spark-streaming-word-count', sourceChannelName = [null], sinkChannelName = 
[null], index = 1, type = processor, parameters = map[[empty]], children = 
list[[empty]]]
2015-09-16T14:28:49+0530 1.2.0.RELEASE WARN DeploymentsPathChildrenCache-0 
util.NativeCodeLoader - Unable to load native-hadoop library for your 
platform... using builtin-java classes where applicable
2015-09-16T14:28:49+0530 1.2.0.RELEASE WARN 
sparkDriver-akka.actor.default-dispatcher-3 remote.ReliableDeliverySupervisor - 
Association with remote system [akka.tcp://sparkMaster@abcd.hadoop.ambari:7077] 
has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2015-09-16T14:29:09+0530 1.2.0.RELEASE WARN 
sparkDriver-akka.actor.default-dispatcher-4 remote.ReliableDeliverySupervisor - 
Association with remote system [akka.tcp://sparkMaster@abcd.hadoop.ambari:7077] 
has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2015-09-16T14:29:18+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 
container.DeploymentListener - Path cache event: 
path=/deployments/modules/allocated/8d07cdba-557e-458a-9225-b90e5a5778ce/spark-streaming-word-count.source.http.1,
 type=CHILD_ADDED
2015-09-16T14:29:18+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 
container.DeploymentListener - Deploying module 'http' for stream 
'spark-streaming-word-count'
2015-09-16T14:29:18+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 
container.DeploymentListener - Deploying module [ModuleDescriptor@610e43b0 
moduleName = 'http', moduleLabel = 'http', group = 
'spark-streaming-word-count', sourceChannelName = [null], sinkChannelName = 
[null], index = 0, type = source, parameters = map[[empty]], children = 
list[[empty]]]
2015-09-16T14:29:19+0530 1.2.0.RELEASE INFO DeploymentSupervisor-0 
zk.ZKStreamDeploymentHandler - Deployment status for stream 
'spark-streaming-word-count': DeploymentStatus{state=failed,error(s)=Deployment 
of module 'ModuleDeploymentKey{stream='spark-streaming-word-count', 
type=processor, label='processor'}' to container 
'4ff3ba84-e6ca-47dd-894f-aa92bdbb3e06' timed out after 3 ms}
2015-09-16T14:29:29+0530 1.2.0.RELEASE WARN 
sparkDriver-akka.actor.default-dispatcher-4 remote.ReliableDeliverySupervisor - 
Association with remote system [akka.tcp://sparkMaster@abcd.hadoop.ambari:7077] 
has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2015-09-16T14:29:49+0530 1.2.0.RELEASE ERROR 
sparkDriver-akka.actor.default-dispatcher-3 cluster.SparkDeploySchedulerBackend 
- Application has been killed. Reason: All 

How to update python code in memory

2015-09-16 Thread Margus Roo

Hi

For example, I submitted python code to the cluster:
bin/spark-submit --master spark://nn1:7077 SocketListen.py
Now I discovered that I have to change something in SocketListen.py.
One way is to stop the old job and submit a new one.
Is there a way to change the code on the worker machines so that there is no
need to submit new code?


--
Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480





Re: How to recovery DStream from checkpoint directory?

2015-09-16 Thread Akhil Das
You can't really recover from checkpoint if you alter the code. A better
approach would be to use some sort of external storage (like a db or
zookeeper etc) to keep the state (the indexes etc) and then when you deploy
new code they can be easily recovered.

Thanks
Best Regards

On Wed, Sep 16, 2015 at 3:52 PM, Bin Wang  wrote:

> I'd like to know if there is a way to recovery dstream from checkpoint.
>
> Because I stores state in DStream, I'd like the state to be recovered when
> I restart the application and deploy new code.
>


Re: How to recovery DStream from checkpoint directory?

2015-09-16 Thread Bin Wang
Will StreamingContext.getOrCreate do this work? What kind of code change will
make it unable to load?
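For reference, the usual recovery pattern is the factory form below (a sketch; the checkpoint directory is an assumed path). Recovery only works when the DStream graph built by the factory is compatible with the one that wrote the checkpoint, which is why code changes usually break it:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///checkpoints/my-app"   // assumed path

// Called only when no usable checkpoint exists; otherwise the graph is rebuilt from the checkpoint.
def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("stateful-app")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // ... define the DStream graph, e.g. updateStateByKey, here ...
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()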

On Wed, Sep 16, 2015 at 8:20 PM, Akhil Das wrote:

> You can't really recover from checkpoint if you alter the code. A better
> approach would be to use some sort of external storage (like a db or
> zookeeper etc) to keep the state (the indexes etc) and then when you deploy
> new code they can be easily recovered.
>
> Thanks
> Best Regards
>
> On Wed, Sep 16, 2015 at 3:52 PM, Bin Wang  wrote:
>
>> I'd like to know if there is a way to recovery dstream from checkpoint.
>>
>> Because I stores state in DStream, I'd like the state to be recovered
>> when I restart the application and deploy new code.
>>
>
>


How to recovery DStream from checkpoint directory?

2015-09-16 Thread Bin Wang
I'd like to know if there is a way to recover a DStream from a checkpoint.

Because I store state in the DStream, I'd like the state to be recovered when
I restart the application and deploy new code.


RE: application failed on large dataset

2015-09-16 Thread java8964
Can you try "nio" instead of "netty"?
Set "spark.shuffle.blockTransferService" to "nio" and give it a try.
Yong
From: z.qian...@gmail.com
Date: Wed, 16 Sep 2015 03:21:02 +
Subject: Re: application failed on large dataset
To: java8...@hotmail.com; user@spark.apache.org

Hi,
After checking the yarn logs, all the error stacks look like the one below:

15/09/15 19:58:23 ERROR shuffle.OneForOneBlockFetcher: Failed while starting block fetches
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)

It seems that some error occurs when trying to fetch the block, and after several retries the executor just dies with such an error. And to answer your question, I did not see any executor restart during the job.
PS: the operator I am using during that stage is rdd.glom().mapPartitions().

On Tue, Sep 15, 2015 at 11:44 PM, java8964 wrote:



When you saw this error, does any executor die due to whatever error?
Do you check to see if any executor restarts during your job?
It is hard to help you just with the stack trace. You need to tell us the whole 
picture when your jobs are running.
Yong

From: qhz...@apache.org
Date: Tue, 15 Sep 2015 15:02:28 +
Subject: Re: application failed on large dataset
To: user@spark.apache.org

has anyone met the same problems?

On Mon, Sep 14, 2015 at 9:07 PM, 周千昊 wrote:
Hi, community
I am facing a strange problem: all executors do not respond, and then all of
them fail with the ExecutorLostFailure. When I look into the yarn logs, they
are full of exceptions like this:

15/09/14 04:35:33 ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks (after 3 retries)
java.io.IOException: Failed to connect to host/ip:port
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: host/ip:port
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    ... 1 more

The strange thing is that, if I reduce the input size, the problems just
disappear.

Spark Thrift Server JDBC Drivers

2015-09-16 Thread Daniel Haviv
Hi,
Are there any free JDBC drivers for Thrift?
The only ones I could find are Simba's, which require a license.

Thanks,
Daniel


Re: application failed on large dataset

2015-09-16 Thread 周千昊
Hi,
 I have switched 'spark.shuffle.blockTransferService' to 'nio', but the
problem still exists. However, the stack trace is a little bit different:
PART one:
15/09/16 06:20:32 ERROR executor.Executor: Exception in task 1.2 in stage
15.0 (TID 5341)
java.io.IOException: Failed without being ACK'd
at
org.apache.spark.network.nio.ConnectionManager$MessageStatus.failWithoutAck(ConnectionManager.scala:72)
at
org.apache.spark.network.nio.ConnectionManager$$anonfun$removeConnection$3.apply(ConnectionManager.scala:533)
at
org.apache.spark.network.nio.ConnectionManager$$anonfun$removeConnection$3.apply(ConnectionManager.scala:531)
at scala.collection.immutable.List.foreach(List.scala:318)
at
org.apache.spark.network.nio.ConnectionManager.removeConnection(ConnectionManager.scala:531)
at
org.apache.spark.network.nio.ConnectionManager$$anonfun$addListeners$3.apply(ConnectionManager.scala:510)
at
org.apache.spark.network.nio.ConnectionManager$$anonfun$addListeners$3.apply(ConnectionManager.scala:510)
at
org.apache.spark.network.nio.Connection.callOnCloseCallback(Connection.scala:162)
at
org.apache.spark.network.nio.Connection.close(Connection.scala:130)
at
org.apache.spark.network.nio.ConnectionManager$$anonfun$stop$1.apply(ConnectionManager.scala:1000)
at
org.apache.spark.network.nio.ConnectionManager$$anonfun$stop$1.apply(ConnectionManager.scala:1000)
at
scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
at
scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at
scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:107)
at
org.apache.spark.network.nio.ConnectionManager.stop(ConnectionManager.scala:1000)
at
org.apache.spark.network.nio.NioBlockTransferService.close(NioBlockTransferService.scala:78)
at
org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1228)
at org.apache.spark.SparkEnv.stop(SparkEnv.scala:100)
at org.apache.spark.executor.Executor.stop(Executor.scala:144)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:113)
at org.apache.spark.rpc.akka.AkkaRpcEnv.org
$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:177)
at
org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:126)
at org.apache.spark.rpc.akka.AkkaRpcEnv.org
$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:197)
at
org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:125)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
at
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
at
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at
org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at
org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:92)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

PART two:
15/09/16 06:14:36 INFO nio.ConnectionManager: Removing SendingConnection to
ConnectionManagerId()
15/09/16 06:14:36 INFO nio.ConnectionManager: Removing ReceivingConnection
to ConnectionManagerId()
15/09/16 06:14:36 ERROR nio.ConnectionManager: Corresponding
SendingConnection to ConnectionManagerId() not found
15/09/16 06:14:36 INFO nio.ConnectionManager: Key not valid ?
sun.nio.ch.SelectionKeyImpl@3011c7c9
15/09/16 06:14:36 INFO 

Re: Spark wastes a lot of space (tmp data) for iterative jobs

2015-09-16 Thread Ali Hadian
Thanks for your response, Alexis. 

I have seen this page, but its suggested solutions do not work and the tmp 
space still grows linearly after unpersisting RDDs and calling System.gc() 
in each iteration.

I think it might be due to one of the following reasons:

1. System.gc() does not directly invoke the garbage collector, but it just 
requests JVM to run GC, and JVM usually postpones it until memory is almost 
filled. However, since we are just running out of hard-disk space (not 
memory space), GC does not run; therefore the finalize() methods for the 
intermediate RDDs are not triggered.

2. System.gc() is only executed on the driver, but not on the workers (Is it 
how it works??!!)

Any suggestions?

Kind regards
Ali Hadian


-Original Message-
From: Alexis Gillain 
To: Ali Hadian 
Cc: spark users 
Date: Wed, 16 Sep 2015 12:05:35 +0800
Subject: Re: Spark wastes a lot of space (tmp data) for iterative jobs

You can try system.gc() considering that checkpointing is enabled by default 
in graphx :

https://forums.databricks.com/questions/277/how-do-i-avoid-the-no-space-left-on-device-error.html

2015-09-15 22:42 GMT+08:00 Ali Hadian :
Hi!
We are executing the PageRank example from the Spark java examples package 
on a very large input graph. The code is available here. (Spark's github 
repo).
During the execution, the framework generates huge amount of intermediate 
data per each iteration (i.e. the contribs RDD). The intermediate data is 
temporary, but Spark does not clear the intermediate data of previous 
iterations. That is to say, if we are in the middle of 20th iteration, all 
of the temporary data of all previous iterations (iteration 0 to 19) are 
still kept in the tmp  directory. As a result, the tmp directory grows 
linearly.
It seems rational to keep the data from only the previous iteration, because 
if the current iteration fails, the job can be continued using the 
intermediate data from the previous iteration. Anyways, why does it keep the 
intermediate data for ALL previous iterations???
How can we enforce Spark to clear these intermediate data during the 
execution of job?

Kind regards, 
Ali hadian




--
Alexis GILLAIN

Spark on YARN / aws - executor lost on node restart

2015-09-16 Thread Adrian Tanase
Hi all,

We’re using spark streaming (1.4.0), deployed on AWS through yarn. It’s a 
stateful app that reads from kafka (with the new direct API) and we’re 
checkpointing to HDFS.

During some resilience testing, we restarted one of the machines and brought it 
back online. During the offline period, the Yarn cluster would not have 
resources to re-create the missing executor.
After starting all the services on the machine, it correctly joined the Yarn 
cluster, however the spark streaming app does not seem to notice that the 
resources are back and has not re-created the missing executor.

The app is correctly running with 6 out of 7 executors; however, it's running
under capacity.
If we manually kill the driver and re-submit the app to yarn, all the state is
correctly recreated from the checkpoint and all 7 executors are online again –
however, this seems like a brutal workaround.

So, here are some questions:

  *   Isn't the driver supposed to auto-heal after a machine is completely lost 
and then comes back after some time?
  *   Are any configuration settings that influence how spark driver should 
poll yarn to check back on resources being available again?
  *   Is there a tool one can run to “force” the driver to re-create missing 
workers/executors?

Lastly, another issue was that the driver also crashed and yarn successfully 
restarted it – I’m not sure yet if it’s because of some retry setting or 
another exception, will post the logs after I recreate the problem.

Thanks in advance for any ideas,
-adrian


How to calculate average from multiple values

2015-09-16 Thread diplomatic Guru
 have a mapper that emit key/value pairs(composite keys and composite
values separated by comma).

e.g

*key:* a,b,c,d *Value:* 1,2,3,4,5

*key:* a1,b1,c1,d1 *Value:* 5,4,3,2,1

...

...

*key:* a,b,c,d *Value:* 5,4,3,2,1


I could easily SUM these values using reduceByKey.

e.g.

reduceByKey(new Function2<String, String, String>() {

@Override
public String call(String value1, String value2) {
String oldValue[] = value1.toString().split(",");
String newValue[] = value2.toString().split(",");

int iFirst = Integer.parseInt(oldValue[0]) +
Integer.parseInt(newValue[0]);
int iSecond = Integer.parseInt(oldValue[1]) +
Integer.parseInt(newValue[1]);
int iThird = Integer.parseInt(oldValue[2]) +
Integer.parseInt(newValue[2]);
int iFourth = Integer.parseInt(oldValue[3]) +
Integer.parseInt(newValue[3]);
int iFifth = Integer.parseInt(oldValue[4]) +
Integer.parseInt(newValue[4]);

return iFirst  + "," + iSecond + ","
+ iThird+ "," + iFourth+ "," + iFifth;

}
});

But the problem is how do I find the average of just one of these values. Let's
assume I want to SUM iFirst, iSecond, iThird and iFourth, but I want to find the
average of iFifth. How do I do it? With simple key/value pairs I could
use the mapValues function, but I'm not sure how I could do it with my example.
Please advise.
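One common way to handle this, sketched below in Scala since the thread has no reply yet (`pairs` stands for the RDD of composite key / composite value strings from the question, an assumption): carry a record count through the reduce so the average of the fifth field can be taken at the end.

// pairs: RDD[(String, String)] with values like "v1,v2,v3,v4,v5" (assumed from the question)
val withCount = pairs.mapValues { v =>
  val f = v.split(",").map(_.toInt)
  (f(0), f(1), f(2), f(3), f(4), 1)                 // append a count of 1 per record
}

val reduced = withCount.reduceByKey { (a, b) =>
  (a._1 + b._1, a._2 + b._2, a._3 + b._3, a._4 + b._4, a._5 + b._5, a._6 + b._6)
}

// sums for the first four fields, average for the fifth
val result = reduced.mapValues { case (s1, s2, s3, s4, s5, n) =>
  (s1, s2, s3, s4, s5.toDouble / n)
}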


Re: Spark Streaming application code change and stateful transformations

2015-09-16 Thread Ofir Kerker
Thanks Cody!
The 2nd solution is safer but seems wasteful :/
I'll try to optimize it by keeping, in addition to the 'last-complete-hour',
the corresponding offsets that bound the incomplete data, so I can
fast-forward only the last couple of hours in the worst case.
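A sketch of that optimization (the offset values would come from wherever the 'last-complete-hour' marker is persisted; the topic, broker and `ssc` are assumptions):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// offsets persisted next to the 'last-complete-hour' marker, loaded from the DB (values illustrative)
val fromOffsets: Map[TopicAndPartition, Long] = Map(
  TopicAndPartition("events", 0) -> 123456L,
  TopicAndPartition("events", 1) -> 123789L
)

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")   // assumed broker

// ssc is the application's StreamingContext (assumed to exist);
// restart consumption at the stored offsets instead of replaying the whole retention period
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))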

On Mon, Sep 14, 2015 at 22:14 Cody Koeninger  wrote:

> Solution 2 sounds better to me.  You aren't always going to have graceful
> shutdowns.
>
> On Mon, Sep 14, 2015 at 1:49 PM, Ofir Kerker 
> wrote:
>
>> Hi,
>> My Spark Streaming application consumes messages (events) from Kafka every
>> 10 seconds using the direct stream approach and aggregates these messages
>> into hourly aggregations (to answer analytics questions like: "How many
>> users from Paris visited page X between 8PM to 9PM") and save the data to
>> Cassandra.
>>
>> I was wondering if there's a good practice for handling a code change in a
>> Spark Streaming applications that uses stateful transformations
>> (updateStateByKey for example) because the new application code will not
>> be
>> able to use the data that was checkpointed by the former application.
>> I have thought of a few solutions for this issue and was hoping some of
>> you
>> have some experience with such case and can suggest other solutions or
>> feedback my suggested solutions:
>> *Solution #1*: On a graceful shutdown, in addition to the current Kafka
>> offsets, persist the current aggregated data into Cassandra tables
>> (different than the regular aggregation tables) that would allow reading
>> them easily when the new application starts in order to build the initial
>> state.
>> *Solution #2*: When an hour is "complete" (i.e not expecting more events
>> with the timestamp of this hour), update somewhere persistent (DB / shared
>> file) the last-complete-hour. This will allow me, when the new application
>> starts, to read all the events from Kafka from the beginning of retention
>> period (last X hours) and ignore events from timestamp smaller or equal
>> than
>> the last-complete-hour.
>>
>> I'll be happy to get your feedback!
>>
>> Thanks,
>> Ofir
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-application-code-change-and-stateful-transformations-tp24692.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


RE: application failed on large dataset

2015-09-16 Thread java8964
This sounds like a memory issue.
Do you have GC output enabled? When this is happening, are your executors doing
full GCs? How long is the full GC?
Yong
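For anyone following that suggestion, executor GC logging can be switched on like this (a sketch; the flags are standard HotSpot options):

val conf = new org.apache.spark.SparkConf()
  .set("spark.executor.extraJavaOptions",
       "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")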

From: qhz...@apache.org
Date: Wed, 16 Sep 2015 13:52:25 +
Subject: Re: application failed on large dataset
To: java8...@hotmail.com; user@spark.apache.org

Hi, I have switch 'spark.shuffle.blockTransferService' to 'nio'. But the 
problem still exists. However the stack trace is a little bit different:PART 
one:15/09/16 06:20:32 ERROR executor.Executor: Exception in task 1.2 in stage 
15.0 (TID 5341)java.io.IOException: Failed without being ACK'dat 
org.apache.spark.network.nio.ConnectionManager$MessageStatus.failWithoutAck(ConnectionManager.scala:72)
at 
org.apache.spark.network.nio.ConnectionManager$$anonfun$removeConnection$3.apply(ConnectionManager.scala:533)
at 
org.apache.spark.network.nio.ConnectionManager$$anonfun$removeConnection$3.apply(ConnectionManager.scala:531)
at scala.collection.immutable.List.foreach(List.scala:318)at 
org.apache.spark.network.nio.ConnectionManager.removeConnection(ConnectionManager.scala:531)
at 
org.apache.spark.network.nio.ConnectionManager$$anonfun$addListeners$3.apply(ConnectionManager.scala:510)
at 
org.apache.spark.network.nio.ConnectionManager$$anonfun$addListeners$3.apply(ConnectionManager.scala:510)
at 
org.apache.spark.network.nio.Connection.callOnCloseCallback(Connection.scala:162)
at org.apache.spark.network.nio.Connection.close(Connection.scala:130)  
  at 
org.apache.spark.network.nio.ConnectionManager$$anonfun$stop$1.apply(ConnectionManager.scala:1000)
at 
org.apache.spark.network.nio.ConnectionManager$$anonfun$stop$1.apply(ConnectionManager.scala:1000)
at 
scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
at 
scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)  
  at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)at 
scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:107)at 
org.apache.spark.network.nio.ConnectionManager.stop(ConnectionManager.scala:1000)
at 
org.apache.spark.network.nio.NioBlockTransferService.close(NioBlockTransferService.scala:78)
at org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1228)  
  at org.apache.spark.SparkEnv.stop(SparkEnv.scala:100)at 
org.apache.spark.executor.Executor.stop(Executor.scala:144)at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:113)
at 
org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:177)
at 
org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:126)
at 
org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:197)
at 
org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:125)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)   
 at 
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)   
 at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)  
  at 
org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)at 
org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:92)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)at 
akka.actor.ActorCell.invoke(ActorCell.scala:487)at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)at 
akka.dispatch.Mailbox.run(Mailbox.scala:220)at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
   at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
PART two:15/09/16 06:14:36 INFO nio.ConnectionManager: Removing 
SendingConnection to ConnectionManagerId()15/09/16 

Re: spark performance - executor computing time

2015-09-16 Thread Robin East
Is this repeatable? Do you always get one or two executors that are 6 times as
slow? It could be that some of your tasks have more work to do (maybe you are
filtering some records out?). If it's always one particular worker node, is there
something about the machine configuration (e.g. CPU speed) that means the
processing takes longer?

—
Robin East
Spark GraphX in Action Michael S Malak and Robin East
http://www.manning.com/books/spark-graphx-in-action 


> On 15 Sep 2015, at 12:35, patcharee  wrote:
> 
> Hi,
> 
> I was running a job (on Spark 1.5 + Yarn + java 8). In a stage that lookup 
> (org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:873)) 
> there was an executor that took the executor computing time > 6 times of 
> median. This executor had almost the same shuffle read size and low gc time 
> as others.
> 
> What can impact the executor computing time? Any suggestions what parameters 
> I should monitor/configure?
> 
> BR,
> Patcharee
> 
> 
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 



Spark Cassandra Filtering

2015-09-16 Thread Ashish Soni
Hi ,

How can I pass a dynamic value to the filter in the function below instead of
hardcoding it?
I have an existing RDD and I would like to use data from it in the filter, so
instead of doing .where("name=?","Anna") I want to do
.where("name=?", someobject.value).

Please help

JavaRDD<String> rdd3 = javaFunctions(sc).cassandraTable("test", "people",
        mapRowTo(Person.class))
    .where("name=?", "Anna").map(new Function<Person, String>() {
        @Override
        public String call(Person person) throws Exception {
            return person.toString();
        }
    });
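For what it's worth, the bind parameter of where() is an ordinary argument, so a variable works directly. A sketch in the connector's Scala API (`someObject` stands for whatever holds the runtime value; keyspace and table names are taken from the question):

import com.datastax.spark.connector._

val dynamicName = someObject.value        // assumed holder of the runtime value

val rdd3 = sc.cassandraTable("test", "people")
  .where("name = ?", dynamicName)
  .map(_.toString)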


Re: Spark wastes a lot of space (tmp data) for iterative jobs

2015-09-16 Thread Alexis Gillain
Ok, just realized you don't use the mllib pagerank.

You must use checkpointing, as pointed out in the databricks url.

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicGraphCheckpointer.scala

Due to lineage, Spark doesn't erase the shuffle files.
When you do:
contrib = link.join(rank)
rank = contrib.map(...)
contrib = link.join(rank)
I think Spark doesn't erase the shuffle files of the first join because
they are still part of the lineage of the second contrib through rank.

Have a look at this : https://www.youtube.com/watch?v=1MWxIUoIYFA
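A minimal illustration of that idea for the plain-RDD PageRank loop (`links`, `initialRanks`, `numIterations` and the checkpoint path are assumptions): checkpoint the ranks RDD every few iterations so the lineage, and with it the old shuffle files, can actually be dropped.

import org.apache.spark.rdd.RDD

// assumed to exist already, as in the PageRank example:
//   links: RDD[(String, Seq[String])]     -- page -> outgoing neighbours
//   initialRanks: RDD[(String, Double)]   -- page -> starting rank
//   sc, numIterations
sc.setCheckpointDir("hdfs:///tmp/pagerank-checkpoints")   // assumed path

var ranks: RDD[(String, Double)] = initialRanks
for (i <- 1 to numIterations) {
  val contribs = links.join(ranks).values.flatMap { case (neighbours, rank) =>
    neighbours.map(dst => (dst, rank / neighbours.size))
  }
  ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)

  if (i % 5 == 0) {      // every few iterations, cut the lineage so old shuffle files can be dropped
    ranks.cache()
    ranks.checkpoint()
    ranks.count()        // force materialization so the checkpoint is actually written
  }
}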

2015-09-16 22:16 GMT+08:00 Ali Hadian :

> Thanks for your response, Alexis.
>
> I have seen this page, but its suggested solutions do not work and the tmp
> space still grows linearly after unpersisting RDDs and calling
> System.gc() in each iteration.
>
> I think it might be due to one of the following reasons:
>
> 1. System.gc() does not directly invoke the garbage collector, but it just
> requests JVM to run GC, and JVM usually postpones it until memory is
> almost filled. However, since we are just running out of hard-disk space
> (not memory space), GC does not run; therefore the finalize() methods for
> the intermediate RDDs are not triggered.
>
>
> 2. System.gc() is only executed on the driver, but not on the workers (Is
> it how it works??!!)
>
> Any suggestions?
>
> Kind regards
> Ali Hadian
>
> -Original Message-
> From: Alexis Gillain 
> To: Ali Hadian 
> Cc: spark users 
> Date: Wed, 16 Sep 2015 12:05:35 +0800
> Subject: Re: Spark wastes a lot of space (tmp data) for iterative jobs
>
> You can try system.gc() considering that checkpointing is enabled by
> default in graphx :
>
>
> https://forums.databricks.com/questions/277/how-do-i-avoid-the-no-space-left-on-device-error.html
>
> 2015-09-15 22:42 GMT+08:00 Ali Hadian < had...@comp.iust.ac.ir>:
>
>> Hi!
>> We are executing the PageRank example from the Spark java examples
>> package on a very large input graph. The code is available here
>> .
>> (Spark's github repo).
>> During the execution, the framework generates huge amount of intermediate
>> data per each iteration (i.e. the *contribs* RDD). The intermediate data
>> is temporary, but Spark does not clear the intermediate data of previous
>> iterations. That is to say, if we are in the middle of 20th iteration, all
>> of the temporary data of all previous iterations (iteration 0 to 19) are
>> still kept in the *tmp*  directory. As a result, the tmp directory grows
>> linearly.
>> It seems rational to keep the data from only the previous iteration,
>> because if the current iteration fails, the job can be continued using the
>> intermediate data from the previous iteration. Anyways, why does it keep
>> the intermediate data for ALL previous iterations???
>> How can we enforce Spark to clear these intermediate data * during* the
>> execution of job?
>>
>> Kind regards,
>> Ali hadian
>>
>>
>
>
>
> --
> Alexis GILLAIN
>
>


-- 
Alexis GILLAIN


Re: mappartition's FlatMapFunction help

2015-09-16 Thread Ankur Srivastava
Good to know it worked for you.

CC'ed user group so that the thread reaches a closure.

Thanks
Ankur

On Wed, Sep 16, 2015 at 6:13 AM, Thiago Diniz 
wrote:

> Nailed it.
>
> Thank you!
>
> 2015-09-15 14:39 GMT-03:00 Ankur Srivastava :
>
>> Hi,
>>
>> The signatures are perfect. I also tried same code on eclipse and for
>> some reason eclipse did not import java.util.Iterator. Once I imported it,
>> it is fine. Might be same issue with NetBeans.
>>
>> Thanks
>> Ankur
>>
>> On Tue, Sep 15, 2015 at 10:11 AM, dinizthiagobr 
>> wrote:
>>
>>> Can't get this one to work and I have no idea why.
>>>
>>> JavaPairRDD<String, Iterable<String>> lel = gen.groupByKey();
>>>
>>> JavaRDD<String> partitions = lel.mapPartitions(
>>> new FlatMapFunction<Iterator<Tuple2<String, Iterable<String>>>, String>() {
>>>   public Iterable<String> call(Iterator<Tuple2<String, Iterable<String>>> it) {
>>>//return whatever
>>>}
>>> });
>>>
>>> Netbeans complains about mapPartitions not being applicable for the used
>>> arguments.
>>>
>>> Any idea what's wrong?
>>>
>>> Thank you.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/mappartition-s-FlatMapFunction-help-tp24702.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Incorrect results with spark sql

2015-09-16 Thread gpatcham
Hi,

I'm trying to query on hive view using spark and it is giving different
rowcounts when compared to hive.

here is the view definition in hive

create view test_hive_view as
select col1 , col2 from tab1
left join tab2 on tab1.col1 = tab2.col1
left join tab3 on tab1.col1 = tab3.col1
where col1 in ('Processed')
union
select col1 , col2 from tab4
left join tab5 on tab4.col1 = tab5.col1
left join tab6 on tab4.col4 = tab6.col1
where col1 in ('Processed')


I'm doing select count(1) from test_hive_view where col1 = :some_value.
Spark and Hive are giving me different results.

Spark gives different results only when I query the view.

Can anyone point me to where the issue is? I'm using Spark 1.3, Hive 1.1 and
Hadoop 2.6.

Thanks
Giri



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Incorrect-results-with-spark-sql-tp24716.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Spark SQL 'create table' options

2015-09-16 Thread Dan LaBar
The SQL programming guide provides an example

for creating a table using Spark SQL:

CREATE TEMPORARY TABLE parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
  path "examples/src/main/resources/people.parquet"
)

SELECT * FROM parquetTable

However, I don’t see where the list of options are fully documented. The JDBC
section

lists a few specific options beyond just path. Are there other options for
different types of “storage” methods?

For instance, if I am saving a table to S3, how do I specify to use server
side encryption? How do I overwrite existing files? Can I set the number of
partitions?

Thanks,
Dan
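
For reference, here is a sketch of how OPTIONS are forwarded to a data source,
mirroring the JDBC example in the guide's JDBC section; the url and dbtable
values are placeholders, and the available keys depend on the chosen data
source:

    // Sketch only: options are passed through to the underlying data source.
    // For the built-in JDBC source the documented keys include url and dbtable
    // (placeholder values below).
    sqlContext.sql("""
      CREATE TEMPORARY TABLE jdbcTable
      USING org.apache.spark.sql.jdbc
      OPTIONS (
        url "jdbc:postgresql:dbserver",
        dbtable "schema.tablename"
      )
    """)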


problem with a very simple word count program

2015-09-16 Thread huajun
Hi.
I have a problem with this very simple word count program. The program works
fine for thousands of similar files in the dataset but is very slow for these
first 28 or so. The files are about 50 to 100 MB each, and the program
processes 28 other similar files in about 30 sec. These first 28 files,
however, take 30 min.
The data in these files should not be the problem, because if I combine all
the files into one bigger file, it is processed in about 30 sec.

I am running Spark in local mode (with > 100 GB memory), it just uses
100% CPU (one core) most of the time (for this troubled case), and no network
traffic is involved.

Any obvious (or non-obvious) errors?

def process(file : String) : RDD[(String, Int)] = {
  val rdd = sc.textFile(file)
  val words = rdd.flatMap( x=> x.split(" ") );

  words.map( x=> (x,1)).reduceByKey( (x,y) => (x+y) )
}

val file = "medline15n0001.xml"
var keep = process(file)

for (i <- 2 to 28) {
  val file = if (i < 10) "medline15n000" + i + ".xml" 
 else "medline15n00" + i + ".xml"
  
  val result = process(file)
  keep = result.union(keep);
}
keep = keep.reduceByKey( (x,y) => (x+y) )
keep.saveAsTextFile("results")

Thanks.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/problem-with-a-very-simple-word-count-program-tp24715.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: problem with a very simple word count program

2015-09-16 Thread Shawn Carroll
Your loop is deciding the files to process and then unioning the data on each
iteration. If you change it to load all the files at the same time and let
Spark sort it out, it should be much faster.

Untested:

 val rdd = sc.textFile("medline15n00*.xml")
 val words = rdd.flatMap( x => x.split(" ") )
 val counts = words.map( x => (x, 1) ).reduceByKey( (x, y) => x + y )
 counts.saveAsTextFile("results")



shawn.c.carr...@gmail.com
Software Engineer
Soccer Referee

On Wed, Sep 16, 2015 at 2:07 PM, huajun  wrote:

> Hi.
> I have a problem with this very simple word count rogram. The program works
> fine for
> thousands of similar files in the dataset but is very slow for these first
> 28 or so.
> The files are about 50 to 100 MB each
> and the program process other similar 28 files in about 30sec. These first
> 28 files, however, take 30min.
> This should not be a problem with the data in these files, as if I combine
> all the files into one
> bigger file, it will be processed in about 30sec.
>
> I am running spark in local mode (with > 100GB memory) and it is just use
> 100% CPU (one core) most of time (for this troubled case) and no network
> traffic is involved.
>
> Any obvious (or non-obvious) errors?
>
> def process(file : String) : RDD[(String, Int)] = {
>   val rdd = sc.textFile(file)
>   val words = rdd.flatMap( x=> x.split(" ") );
>
>   words.map( x=> (x,1)).reduceByKey( (x,y) => (x+y) )
> }
>
> val file = "medline15n0001.xml"
> var keep = process(file)
>
> for (i <- 2 to 28) {
>   val file = if (i < 10) "medline15n000" + i + ".xml"
>  else "medline15n00" + i + ".xml"
>
>   val result = process(file)
>   keep = result.union(keep);
> }
> keep = keep.reduceByKey( (x,y) => (x+y) )
> keep.saveAsTextFile("results")
>
> Thanks.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/problem-with-a-very-simple-word-count-program-tp24715.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


SparkR - calling as.vector() with rdd dataframe causes error

2015-09-16 Thread ekraffmiller
Hi,
I have a library of clustering algorithms that I'm trying to run in the
SparkR interactive shell. (I am working on a proof of concept for a document
classification tool.) Each algorithm takes a term document matrix in the
form of a dataframe.  When I pass the method a local dataframe, the
clustering algorithm works correctly, but when I pass it a Spark RDD, it
gives an error trying to coerce the data into a vector.  Here is the code
that I'm calling within SparkR:

# get matrix from a file
file <-
"/Applications/spark-1.5.0-bin-hadoop2.6/examples/src/main/resources/matrix.csv"

#read it into variable
 raw_data <- read.csv(file,sep=',',header=FALSE)

#convert to a local dataframe
localDF = data.frame(raw_data)

# create the rdd
rdd  <- createDataFrame(sqlContext,localDF)

#call the algorithm with the localDF - this works
result <- galileo(localDF, model='hclust',dist='euclidean',link='ward',K=5)

#call with the rdd - this produces error
result <- galileo(rdd, model='hclust',dist='euclidean',link='ward',K=5)

Error in as.vector(data) : 
  no method for coercing this S4 class to a vector


I get the same error if I try to directly call as.vector(rdd) as well.

Is there a reason why this works for localDF and not rdd?  Should I be doing
something else to coerce the object into a vector?

Thanks,
Ellen



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-calling-as-vector-with-rdd-dataframe-causes-error-tp24717.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: why when I double the number of workers, ml LogisticRegression fitting time is not reduced in half?

2015-09-16 Thread Robineast
In principle yes, however it depends on whether your application is actually
utilising the extra resources. Use the Task metrics available in the
application UI (usually available from the driver machine on port 4040) to
find out.

--
Robin East
Spark GraphX in Action - Michael S Malak and Robin East
Manning Publications
http://www.manning.com/books/spark-graphx-in-action




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/why-when-I-double-the-number-of-workers-ml-LogisticRegression-fitting-time-is-not-reduced-in-half-tp24714p24718.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark streaming on spark-standalone/ yarn inside Spring XD

2015-09-16 Thread Tathagata Das
I would check the following.

See if your setup (spark master, etc.) is correct for running simple
applications in Yarn/Standalone, like the SparkPi example.
If that does not work then the problem is elsewhere. If that works, then
the problem could be in the Spring XD.

On Wed, Sep 16, 2015 at 5:01 AM, Vignesh Radhakrishnan  wrote:

> Hi,  I am trying to run a Spark processor on Spring XD for streaming
> operation.
>
>
>
> The spark processor module on Spring XD works when spark is pointing to
> local. The processor fails to run when we point spark to spark standalone
> (running on the same machine) or yarn-client.  *Is it possible to run
> spark processor on spark standalone or yarn inside spring XD or is spark
> local the only option here ?*
>
>
>
> The processor module is:
>
>
>
> class WordCount extends Processor[String, (String, Int)] {
>
>
>
>   def process(input: ReceiverInputDStream[String]): DStream[(String, Int)]
> = {
>
>   val words = input.flatMap(_.split(" "))
>
>   val pairs = words.map(word => (word, 1))
>
>   val wordCounts = pairs.reduceByKey(_ + _)
>
>   wordCounts
>
>   }
>
>
>
>   @SparkConfig
>
>   def properties : Properties = {
>
> val props = new Properties()
>
> // Any specific Spark configuration properties would go here.
>
> // These properties always get the highest precedence
>
> //props.setProperty("spark.master", "spark://a.b.c.d:7077")
>
> *props.setProperty("spark.master", "spark://abcd.hadoop.ambari:7077")*
>
> props
>
>   }
>
>
>
> }
>
>
>
> Below is the error log that I get:
>
>
>
> // Error Log
>
> 
>
> 2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0
> container.DeploymentListener - Deploying module 'log' for stream
> 'spark-streaming-word-count'
>
> 2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0
> container.DeploymentListener - Deploying module [ModuleDescriptor@6dbc4f81
> moduleName = 'log', moduleLabel = 'log', group =
> 'spark-streaming-word-count', sourceChannelName = [null], sinkChannelName =
> [null], index = 2, type = sink, parameters = map[[empty]], children =
> list[[empty]]]
>
> 2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0
> container.DeploymentListener - Path cache event:
> path=/deployments/modules/allocated/4ff3ba84-e6ca-47dd-894f-aa92bdbb3e06/spark-streaming-word-count.processor.processor.1,
> type=CHILD_ADDED
>
> 2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0
> container.DeploymentListener - Deploying module 'processor' for stream
> 'spark-streaming-word-count'
>
> 2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0
> container.DeploymentListener - Deploying module [ModuleDescriptor@5e16dafb
> moduleName = 'scala-word-count', moduleLabel = 'processor', group =
> 'spark-streaming-word-count', sourceChannelName = [null], sinkChannelName =
> [null], index = 1, type = processor, parameters = map[[empty]], children =
> list[[empty]]]
>
> 2015-09-16T14:28:49+0530 1.2.0.RELEASE WARN DeploymentsPathChildrenCache-0
> util.NativeCodeLoader - Unable to load native-hadoop library for your
> platform... using builtin-java classes where applicable
>
> 2015-09-16T14:28:49+0530 1.2.0.RELEASE WARN
> sparkDriver-akka.actor.default-dispatcher-3
> remote.ReliableDeliverySupervisor - Association with remote system
> [akka.tcp://sparkMaster@abcd.hadoop.ambari:7077] has failed, address is
> now gated for [5000] ms. Reason is: [Disassociated].
>
> 2015-09-16T14:29:09+0530 1.2.0.RELEASE WARN
> sparkDriver-akka.actor.default-dispatcher-4
> remote.ReliableDeliverySupervisor - Association with remote system
> [akka.tcp://sparkMaster@abcd.hadoop.ambari:7077] has failed, address is
> now gated for [5000] ms. Reason is: [Disassociated].
>
> 2015-09-16T14:29:18+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0
> container.DeploymentListener - Path cache event:
> path=/deployments/modules/allocated/8d07cdba-557e-458a-9225-b90e5a5778ce/spark-streaming-word-count.source.http.1,
> type=CHILD_ADDED
>
> 2015-09-16T14:29:18+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0
> container.DeploymentListener - Deploying module 'http' for stream
> 'spark-streaming-word-count'
>
> 2015-09-16T14:29:18+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0
> container.DeploymentListener - Deploying module [ModuleDescriptor@610e43b0
> moduleName = 'http', moduleLabel = 'http', group =
> 'spark-streaming-word-count', sourceChannelName = [null], sinkChannelName =
> [null], index = 0, type = source, parameters = map[[empty]], children =
> list[[empty]]]
>
> 2015-09-16T14:29:19+0530 1.2.0.RELEASE INFO DeploymentSupervisor-0
> zk.ZKStreamDeploymentHandler - Deployment status for stream
> 'spark-streaming-word-count':
> DeploymentStatus{state=failed,error(s)=Deployment of module
> 

Re: Spark streaming on spark-standalone/ yarn inside Spring XD

2015-09-16 Thread Vignesh Radhakrishnan
Yes, it is TD. I'm able to run word count etc. on Spark standalone/YARN when
it's not integrated with Spring XD.
But the same breaks when it is used as a processor on Spring XD. I was trying
to get an opinion on whether it's doable or whether it's something that's not
supported at the moment.

On 16 Sep 2015 23:50, Tathagata Das  wrote:
I would check the following.

See if your setup (spark master, etc.) is correct for running simple 
applications in Yarn/Standalone, like the SparkPi example.
If that does not work then the problem is elsewhere. If that works, then the 
problem could be in the Spring XD.

On Wed, Sep 16, 2015 at 5:01 AM, Vignesh Radhakrishnan 
> wrote:
Hi,  I am trying to run a Spark processor on Spring XD for streaming operation.

The spark processor module on Spring XD works when spark is pointing to local. 
The processor fails to run when we point spark to spark standalone (running on 
the same machine) or yarn-client.  Is it possible to run spark processor on 
spark standalone or yarn inside spring XD or is spark local the only option 
here ?

The processor module is:

class WordCount extends Processor[String, (String, Int)] {

  def process(input: ReceiverInputDStream[String]): DStream[(String, Int)] = {
  val words = input.flatMap(_.split(" "))
  val pairs = words.map(word => (word, 1))
  val wordCounts = pairs.reduceByKey(_ + _)
  wordCounts
  }

  @SparkConfig
  def properties : Properties = {
val props = new Properties()
// Any specific Spark configuration properties would go here.
// These properties always get the highest precedence
//props.setProperty("spark.master", "spark://a.b.c.d:7077")
props.setProperty("spark.master", "spark://abcd.hadoop.ambari:7077")
props
  }

}

Below is the error log that I get:

// Error Log

2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 
container.DeploymentListener - Deploying module 'log' for stream 
'spark-streaming-word-count'
2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 
container.DeploymentListener - Deploying module [ModuleDescriptor@6dbc4f81 
moduleName = 'log', moduleLabel = 'log', group = 'spark-streaming-word-count', 
sourceChannelName = [null], sinkChannelName = [null], index = 2, type = sink, 
parameters = map[[empty]], children = list[[empty]]]
2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 
container.DeploymentListener - Path cache event: 
path=/deployments/modules/allocated/4ff3ba84-e6ca-47dd-894f-aa92bdbb3e06/spark-streaming-word-count.processor.processor.1,
 type=CHILD_ADDED
2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 
container.DeploymentListener - Deploying module 'processor' for stream 
'spark-streaming-word-count'
2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 
container.DeploymentListener - Deploying module [ModuleDescriptor@5e16dafb 
moduleName = 'scala-word-count', moduleLabel = 'processor', group = 
'spark-streaming-word-count', sourceChannelName = [null], sinkChannelName = 
[null], index = 1, type = processor, parameters = map[[empty]], children = 
list[[empty]]]
2015-09-16T14:28:49+0530 1.2.0.RELEASE WARN DeploymentsPathChildrenCache-0 
util.NativeCodeLoader - Unable to load native-hadoop library for your 
platform... using builtin-java classes where applicable
2015-09-16T14:28:49+0530 1.2.0.RELEASE WARN 
sparkDriver-akka.actor.default-dispatcher-3 remote.ReliableDeliverySupervisor - 
Association with remote system [akka.tcp://sparkMaster@abcd.hadoop.ambari:7077] 
has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2015-09-16T14:29:09+0530 1.2.0.RELEASE WARN 
sparkDriver-akka.actor.default-dispatcher-4 remote.ReliableDeliverySupervisor - 
Association with remote system [akka.tcp://sparkMaster@abcd.hadoop.ambari:7077] 
has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2015-09-16T14:29:18+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 
container.DeploymentListener - Path cache event: 
path=/deployments/modules/allocated/8d07cdba-557e-458a-9225-b90e5a5778ce/spark-streaming-word-count.source.http.1,
 type=CHILD_ADDED
2015-09-16T14:29:18+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 
container.DeploymentListener - Deploying module 'http' for stream 
'spark-streaming-word-count'
2015-09-16T14:29:18+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 
container.DeploymentListener - Deploying module [ModuleDescriptor@610e43b0 
moduleName = 'http', moduleLabel = 'http', group = 
'spark-streaming-word-count', sourceChannelName = [null], sinkChannelName = 
[null], index = 0, type = source, parameters = map[[empty]], children = 
list[[empty]]]
2015-09-16T14:29:19+0530 1.2.0.RELEASE INFO DeploymentSupervisor-0 
zk.ZKStreamDeploymentHandler - Deployment status for 

unpersist RDD from another thread

2015-09-16 Thread Paul Weiss
Hi,

What is the behavior when calling rdd.unpersist() from a different thread
while another thread is using that rdd?  Below is a simple case for this:

1) create rdd and load data
2) call rdd.cache() to bring data into memory
3) create another thread and pass rdd for a long computation
4) call rdd.unpersist while 3. is still running

Questions:

* Will the computation in 3) finish properly even if unpersist was called
on the rdd while running?
* What happens if a part of the computation fails and the rdd needs to
reconstruct based on DAG lineage, will this still work even though
unpersist has been called?

thanks,
-paul


DataFrame repartition not repartitioning

2015-09-16 Thread Steve Annessa
Hello,

I'm trying to load in an Avro file and write it out as Parquet. I would
like to have enough partitions to properly parallelize on. When I do the
simple load and save I get 1 partition out. I thought I would be able to
use repartition like the following:

val avroFile =
sqlContext.read.format("com.databricks.spark.avro").load(inFile)
avroFile.repartition(10)
avroFile.save(outFile, "parquet")

However, the saved file is still a single partition in the directory.

What am I missing?

Thanks,

-- Steve


why when I double the number of workers, ml LogisticRegression fitting time is not reduced in half?

2015-09-16 Thread julia
Hi all,

I run the following LogisticRegression code (ml classification class) with
14 and 28 workers respectively (2 cores/worker, 12G/worker), and the fitting
times are almost the same: 11.25 vs 10.39 minutes for 14 & 28 workers.
Shouldn't the fitting time be cut roughly in half?
DataFrame 'train' has 3,654,390 rows and 175 columns.

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
elasticNetParam = 0.5
regParam = 0.00077
lr = LogisticRegression(featuresCol = "features", labelCol = "label",
elasticNetParam = elasticNetParam, regParam = regParam, maxIter = 1000)
pipeline = Pipeline(stages=[lr])
model = lr.fit(train)

I've also tried CrossValidator class with 10-fold and a grid of 100 values
for regParam, a process that takes much longer, and the reduction in time
with 28 instead of 14 workers is on the same magnitude (with 28 it takes
0.85 of the time it took for 14).

Thank you very much.
Any insight would be much appreciated!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/why-when-I-double-the-number-of-workers-ml-LogisticRegression-fitting-time-is-not-reduced-in-half-tp24714.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to update python code in memory

2015-09-16 Thread Davies Liu
Short answer is No.

On Wed, Sep 16, 2015 at 4:06 AM, Margus Roo  wrote:
> Hi
>
> For example, I submitted Python code to the cluster:
> in/spark-submit --master spark://nn1:7077 SocketListen.py
> Now I discovered that I have to change something in SocketListen.py.
> One way is to stop the old job and submit a new one.
> Is there a way to change the code on the worker machines so that there is
> no need to submit new code?
>
> --
> Margus (margusja) Roo
> http://margus.roo.ee
> skype: margusja
> +372 51 480
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkR - calling as.vector() with rdd dataframe causes error

2015-09-16 Thread ekraffmiller
Also, just for completeness, matrix.csv contains:
1,2,3
4,5,6
7,8,9



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-calling-as-vector-with-rdd-dataframe-causes-error-tp24717p24719.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Suggested Method for Execution of Periodic Actions

2015-09-16 Thread Bryan Jeffrey
Hello.

I have a streaming job that is processing data.  I process a stream of
events, taking actions when I see anomalous events.  I also keep a count of
events observed, using updateStateByKey to maintain a map of type to count.
I would like to periodically (every 5 minutes) write the results of my
counts to a database.  Is there a built-in mechanism or established
pattern for executing periodic jobs in Spark Streaming?

Regards,

Bryan Jeffrey


Re: DataFrame repartition not repartitioning

2015-09-16 Thread Silvio Fiorito
You just need to assign it to a new variable:

val avroFile = sqlContext.read.format("com.databricks.spark.avro").load(inFile)
val repart = avroFile.repartition(10)
repart.save(outFile, "parquet")

From: Steve Annessa
Date: Wednesday, September 16, 2015 at 2:08 PM
To: "user@spark.apache.org"
Subject: DataFrame repartition not repartitioning

Hello,

I'm trying to load in an Avro file and write it out as Parquet. I would like to 
have enough partitions to properly parallelize on. When I do the simple load 
and save I get 1 partition out. I thought I would be able to use repartition 
like the following:

val avroFile = sqlContext.read.format("com.databricks.spark.avro").load(inFile)
avroFile.repartition(10)
avroFile.save(outFile, "parquet")

However, the saved file is still a single partition in the directory.

What am I missing?

Thanks,

-- Steve


Re: unpersist RDD from another thread

2015-09-16 Thread Paul Weiss
So in order to not incur any performance issues I should really wait for
all usage of the rdd to complete before calling unpersist, correct?

On Wed, Sep 16, 2015 at 4:08 PM, Tathagata Das 
wrote:

> unpredictable. I think it will be safe (as in nothing should fail), but
> the performance will be unpredictable (some partition may use cache, some
> may not be able to use the cache).
>
> On Wed, Sep 16, 2015 at 1:06 PM, Paul Weiss 
> wrote:
>
>> Hi,
>>
>> What is the behavior when calling rdd.unpersist() from a different thread
>> while another thread is using that rdd.  Below is a simple case for this:
>>
>> 1) create rdd and load data
>> 2) call rdd.cache() to bring data into memory
>> 3) create another thread and pass rdd for a long computation
>> 4) call rdd.unpersist while 3. is still running
>>
>> Questions:
>>
>> * Will the computation in 3) finish properly even if unpersist was called
>> on the rdd while running?
>> * What happens if a part of the computation fails and the rdd needs to
>> reconstruct based on DAG lineage, will this still work even though
>> unpersist has been called?
>>
>> thanks,
>> -paul
>>
>
>


Re: problem with a very simple word count program

2015-09-16 Thread Alexander Krasheninnikov
Collect all your per-file RDDs into a List, then call
context.union(context.emptyRdd(), YOUR_LIST) once. Otherwise, with a large
number of elements chained through union, you will get a stack overflow
exception.
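
For illustration, a minimal Scala sketch of that approach, assuming per-file
word-count RDDs like those in the original program; the unionAll helper below
is a hypothetical name, not an existing API:

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    // Sketch: build one RDD per file, then union them in a single call
    // instead of chaining .union() inside a loop.
    def unionAll(sc: SparkContext, files: Seq[String]): RDD[(String, Int)] = {
      val perFile: Seq[RDD[(String, Int)]] = files.map { file =>
        sc.textFile(file)
          .flatMap(_.split(" "))
          .map(word => (word, 1))
          .reduceByKey(_ + _)
      }
      // One union over the whole sequence keeps the lineage flat,
      // followed by a final merge of the per-file counts.
      sc.union(perFile).reduceByKey(_ + _)
    }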

On Wed, Sep 16, 2015 at 10:17 PM, Shawn Carroll 
wrote:

> Your loop is deciding the files to process and then you are unioning the
> data on each iteration. If you change it to load all the files at the same
> time and let spark sort it out you should be much faster.
>
> Untested:
>
>  val rdd = sc.textFile("medline15n00*.xml")
>  val words = rdd.flatMap( x=> x.split(" ") );
>  words.map( x=> (x,1)).reduceByKey( (x,y) => (x+y) )
>  words.saveAsTextFile("results")
>
>
>
> shawn.c.carr...@gmail.com
> Software Engineer
> Soccer Referee
>
> On Wed, Sep 16, 2015 at 2:07 PM, huajun  wrote:
>
>> Hi.
>> I have a problem with this very simple word count rogram. The program
>> works
>> fine for
>> thousands of similar files in the dataset but is very slow for these first
>> 28 or so.
>> The files are about 50 to 100 MB each
>> and the program process other similar 28 files in about 30sec. These first
>> 28 files, however, take 30min.
>> This should not be a problem with the data in these files, as if I combine
>> all the files into one
>> bigger file, it will be processed in about 30sec.
>>
>> I am running spark in local mode (with > 100GB memory) and it is just use
>> 100% CPU (one core) most of time (for this troubled case) and no network
>> traffic is involved.
>>
>> Any obvious (or non-obvious) errors?
>>
>> def process(file : String) : RDD[(String, Int)] = {
>>   val rdd = sc.textFile(file)
>>   val words = rdd.flatMap( x=> x.split(" ") );
>>
>>   words.map( x=> (x,1)).reduceByKey( (x,y) => (x+y) )
>> }
>>
>> val file = "medline15n0001.xml"
>> var keep = process(file)
>>
>> for (i <- 2 to 28) {
>>   val file = if (i < 10) "medline15n000" + i + ".xml"
>>  else "medline15n00" + i + ".xml"
>>
>>   val result = process(file)
>>   keep = result.union(keep);
>> }
>> keep = keep.reduceByKey( (x,y) => (x+y) )
>> keep.saveAsTextFile("results")
>>
>> Thanks.
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/problem-with-a-very-simple-word-count-program-tp24715.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Suggested Method for Execution of Periodic Actions

2015-09-16 Thread Ted Yu
bq. and check if 5 minutes have passed

What if the duration for the window is longer than 5 minutes ?

Cheers

On Wed, Sep 16, 2015 at 1:25 PM, Adrian Tanase  wrote:

> If you don't need the counts in betweem the DB writes, you could simply
> use a 5 min window for the updateStateByKey and use foreachRdd on the
> resulting DStream.
>
> Even simpler, you could use reduceByKeyAndWindow directly.
>
> Lastly, you could keep a variable on the driver and check if 5 minutes
> have passed
> in foreachRdd on the original DStream, even if the batch duration is
> shorter.
>
> Also, remember to cleanup the state in your updateStateByKey function or
> it will grow unbounded. I still believe one of the builtin ByKey functions
> are a simpler strategy.
>
> hope this helps.
>
> -adrian
>
> Sent from my iPhone
>
> > On 16 Sep 2015, at 22:33, Bryan Jeffrey  wrote:
> >
> > Hello.
> >
> > I have a streaming job that is processing data.  I process a stream of
> events, taking actions when I see anomalous events.  I also keep a count
> events observed using updateStateByKey to maintain a map of type to count.
> I would like to periodically (every 5 minutes) write the results of my
> counts to a database.  Is there a built in mechanism or established pattern
> to execute periodic jobs in spark streaming?
> >
> > Regards,
> >
> > Bryan Jeffrey
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: unpersist RDD from another thread

2015-09-16 Thread Tathagata Das
Yes.
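
A minimal sketch of that ordering, with longComputation as a hypothetical
stand-in for whatever the other thread does with the RDD:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration.Duration
    import scala.concurrent.ExecutionContext.Implicits.global
    import org.apache.spark.rdd.RDD

    // Sketch: cache, hand the RDD to another thread, and only unpersist
    // after that thread has finished using it.
    def computeThenRelease[T](rdd: RDD[T])(longComputation: RDD[T] => Long): Long = {
      rdd.cache()
      val running = Future { longComputation(rdd) }    // work runs on another thread
      val answer = Await.result(running, Duration.Inf) // wait for all usage to finish
      rdd.unpersist(blocking = true)                   // now it is safe to drop the cache
      answer
    }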

On Wed, Sep 16, 2015 at 1:12 PM, Paul Weiss  wrote:

> So in order to not incur any performance issues I should really wait for
> all usage of the rdd to complete before calling unpersist, correct?
>
> On Wed, Sep 16, 2015 at 4:08 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> unpredictable. I think it will be safe (as in nothing should fail), but
>> the performance will be unpredictable (some partition may use cache, some
>> may not be able to use the cache).
>>
>> On Wed, Sep 16, 2015 at 1:06 PM, Paul Weiss 
>> wrote:
>>
>>> Hi,
>>>
>>> What is the behavior when calling rdd.unpersist() from a different
>>> thread while another thread is using that rdd.  Below is a simple case for
>>> this:
>>>
>>> 1) create rdd and load data
>>> 2) call rdd.cache() to bring data into memory
>>> 3) create another thread and pass rdd for a long computation
>>> 4) call rdd.unpersist while 3. is still running
>>>
>>> Questions:
>>>
>>> * Will the computation in 3) finish properly even if unpersist was
>>> called on the rdd while running?
>>> * What happens if a part of the computation fails and the rdd needs to
>>> reconstruct based on DAG lineage, will this still work even though
>>> unpersist has been called?
>>>
>>> thanks,
>>> -paul
>>>
>>
>>
>


Issue with writing Dataframe to Vertica through JDBC

2015-09-16 Thread Divya Ravichandran
> I get the following stack trace with this issue; does anybody have any clue?
> I am running Spark on YARN in cluster mode.
>
>
>
>
>
> 15/09/16 16:30:28 INFO spark.SparkContext: Starting job: jdbc at
AssetMetadataToVertica.java:114
>
> 15/09/16 16:30:28 INFO scheduler.DAGScheduler: Got job 0 (jdbc at
AssetMetadataToVertica.java:114) with 1 output partitions
>
> 15/09/16 16:30:28 INFO scheduler.DAGScheduler: Final stage: ResultStage
0(jdbc at AssetMetadataToVertica.java:114)
>
> 15/09/16 16:30:28 INFO scheduler.DAGScheduler: Parents of final stage:
List()
>
> 15/09/16 16:30:28 INFO scheduler.DAGScheduler: Missing parents: List()
>
> 15/09/16 16:30:28 INFO scheduler.DAGScheduler: Submitting ResultStage 0
(MapPartitionsRDD[4] at jdbc at AssetMetadataToVertica.java:114), which has
no missing parents
>
> 15/09/16 16:30:28 INFO storage.MemoryStore: ensureFreeSpace(9736) called
with curMem=294749, maxMem=515553361
>
> 15/09/16 16:30:28 INFO storage.MemoryStore: Block broadcast_1 stored as
values in memory (estimated size 9.5 KB, free 491.4 MB)
>
> 15/09/16 16:30:28 INFO storage.MemoryStore: ensureFreeSpace(4926) called
with curMem=304485, maxMem=515553361
>
> 15/09/16 16:30:28 INFO storage.MemoryStore: Block broadcast_1_piece0
stored as bytes in memory (estimated size 4.8 KB, free 491.4 MB)
>
> 15/09/16 16:30:28 INFO storage.BlockManagerInfo: Added broadcast_1_piece0
in memory on 10.140.104.95:39784 (size: 4.8 KB, free: 491.6 MB)
>
> 15/09/16 16:30:28 INFO spark.SparkContext: Created broadcast 1 from
broadcast at DAGScheduler.scala:861
>
> 15/09/16 16:30:28 INFO scheduler.DAGScheduler: Submitting 1 missing tasks
from ResultStage 0 (MapPartitionsRDD[4] at jdbc at
AssetMetadataToVertica.java:114)
>
> 15/09/16 16:30:28 INFO cluster.YarnClusterScheduler: Adding task set 0.0
with 1 tasks
>
> 15/09/16 16:30:28 INFO scheduler.TaskSetManager: Starting task 0.0 in
stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 2249 bytes)
>
> 15/09/16 16:30:29 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
0.0 (TID 0, localhost): java.lang.ClassCastException:
org.apache.spark.scheduler.ResultTask cannot be cast to
org.apache.spark.scheduler.Task
>
> at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
>
> at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
> at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
> at java.lang.Thread.run(Thread.java:744)
>
>
>
> 15/09/16 16:30:29 INFO scheduler.TaskSetManager: Starting task 0.1 in
stage 0.0 (TID 1, localhost, PROCESS_LOCAL, 2249 bytes)
>
> 15/09/16 16:30:30 INFO scheduler.TaskSetManager: Lost task 0.1 in stage
0.0 (TID 1) on executor localhost: java.lang.ClassCastException
(org.apache.spark.scheduler.ResultTask cannot be cast to
org.apache.spark.scheduler.Task) [duplicate 1]
>
> 15/09/16 16:30:30 INFO scheduler.TaskSetManager: Starting task 0.2 in
stage 0.0 (TID 2, localhost, PROCESS_LOCAL, 2249 bytes)
>
> 15/09/16 16:30:30 INFO scheduler.TaskSetManager: Lost task 0.2 in stage
0.0 (TID 2) on executor localhost: java.lang.ClassCastException
(org.apache.spark.scheduler.ResultTask cannot be cast to
org.apache.spark.scheduler.Task) [duplicate 2]
>
> 15/09/16 16:30:30 INFO scheduler.TaskSetManager: Starting task 0.3 in
stage 0.0 (TID 3, localhost, PROCESS_LOCAL, 2249 bytes)
>
> 15/09/16 16:30:30 INFO scheduler.TaskSetManager: Lost task 0.3 in stage
0.0 (TID 3) on executor localhost: java.lang.ClassCastException
(org.apache.spark.scheduler.ResultTask cannot be cast to
org.apache.spark.scheduler.Task) [duplicate 3]
>
> 15/09/16 16:30:30 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0
failed 4 times; aborting job
>
> 15/09/16 16:30:30 INFO cluster.YarnClusterScheduler: Removed TaskSet 0.0,
whose tasks have all completed, from pool
>
> 15/09/16 16:30:30 INFO cluster.YarnClusterScheduler: Cancelling stage 0
>
> 15/09/16 16:30:30 INFO scheduler.DAGScheduler: ResultStage 0 (jdbc at
AssetMetadataToVertica.java:114) failed in 1.901 s
>
> 15/09/16 16:30:30 INFO scheduler.DAGScheduler: Job 0 failed: jdbc at
AssetMetadataToVertica.java:114, took 2.058078 s
>
> 15/09/16 16:30:30 ERROR yarn.ApplicationMaster: User class threw
exception: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task
0.3 in stage 0.0 (TID 3, localhost): java.lang.ClassCastException:
org.apache.spark.scheduler.ResultTask cannot be cast to
org.apache.spark.scheduler.Task
>
> at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
>
> at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
> at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
> at java.lang.Thread.run(Thread.java:744)
>
>
>
> Driver stacktrace:
>
> org.apache.spark.SparkException: Job aborted due to stage 

Re: Suggested Method for Execution of Periodic Actions

2015-09-16 Thread Adrian Tanase
If you don't need the counts in between the DB writes, you could simply use a 5
min window for the updateStateByKey and use foreachRdd on the resulting DStream.

Even simpler, you could use reduceByKeyAndWindow directly.

Lastly, you could keep a variable on the driver and check if 5 minutes have 
passed
in foreachRdd on the original DStream, even if the batch duration is shorter.
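
A minimal Scala sketch of that driver-side timer approach, assuming the state
DStream is small enough to collect on the driver; saveToDatabase is a
hypothetical placeholder for your own persistence code:

    import org.apache.spark.streaming.dstream.DStream

    // Hypothetical stand-in for the real database write.
    def saveToDatabase(rows: Array[(String, Long)]): Unit = ()

    def writeEveryFiveMinutes(counts: DStream[(String, Long)]): Unit = {
      val intervalMs = 5 * 60 * 1000L
      var lastWrite = System.currentTimeMillis()   // driver-side state

      counts.foreachRDD { rdd =>
        // foreachRDD runs on the driver once per batch, so checking a
        // driver-local timestamp here is safe.
        val now = System.currentTimeMillis()
        if (now - lastWrite >= intervalMs) {
          lastWrite = now
          saveToDatabase(rdd.collect())
        }
      }
    }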

Also, remember to cleanup the state in your updateStateByKey function or it 
will grow unbounded. I still believe one of the builtin ByKey functions are a 
simpler strategy.

hope this helps.

-adrian

Sent from my iPhone

> On 16 Sep 2015, at 22:33, Bryan Jeffrey  wrote:
> 
> Hello.
> 
> I have a streaming job that is processing data.  I process a stream of 
> events, taking actions when I see anomalous events.  I also keep a count 
> events observed using updateStateByKey to maintain a map of type to count.  I 
> would like to periodically (every 5 minutes) write the results of my counts 
> to a database.  Is there a built in mechanism or established pattern to 
> execute periodic jobs in spark streaming? 
> 
> Regards,
> 
> Bryan Jeffrey

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark streaming on spark-standalone/ yarn inside Spring XD

2015-09-16 Thread Tathagata Das
I am not at all familiar with how Spring XD works, so it is hard to say.

On Wed, Sep 16, 2015 at 12:12 PM, Vignesh Radhakrishnan <
vignes...@altiux.com> wrote:

> Yes, it is TD. I'm able to run word count etc on spark standalone/ yarn
> when it's not integrated with spring xd.
> But the same breaks when used as processor on spring. Was trying to get an
> opinion on whether it's doable or it's something that's not supported at
> the moment
> On 16 Sep 2015 23:50, Tathagata Das  wrote:
> I would check the following.
>
> See if your setup (spark master, etc.) is correct for running simple
> applications in Yarn/Standalone, like the SparkPi example.
> If that does not work then the problem is elsewhere. If that works, then
> the problem could be in the Spring XD.
>
> On Wed, Sep 16, 2015 at 5:01 AM, Vignesh Radhakrishnan <
> vignes...@altiux.com> wrote:
>
>> Hi,  I am trying to run a Spark processor on Spring XD for streaming
>> operation.
>>
>>
>>
>> The spark processor module on Spring XD works when spark is pointing to
>> local. The processor fails to run when we point spark to spark standalone
>> (running on the same machine) or yarn-client.  *Is it possible to run
>> spark processor on spark standalone or yarn inside spring XD or is spark
>> local the only option here ?*
>>
>>
>>
>> The processor module is:
>>
>>
>>
>> class WordCount extends Processor[String, (String, Int)] {
>>
>>
>>
>>   def process(input: ReceiverInputDStream[String]): DStream[(String,
>> Int)] = {
>>
>>   val words = input.flatMap(_.split(" "))
>>
>>   val pairs = words.map(word => (word, 1))
>>
>>   val wordCounts = pairs.reduceByKey(_ + _)
>>
>>   wordCounts
>>
>>   }
>>
>>
>>
>>   @SparkConfig
>>
>>   def properties : Properties = {
>>
>> val props = new Properties()
>>
>> // Any specific Spark configuration properties would go here.
>>
>> // These properties always get the highest precedence
>>
>> //props.setProperty("spark.master", "spark://a.b.c.d:7077")
>>
>> *props.setProperty("spark.master", "spark://abcd.hadoop.ambari:7077")*
>>
>> props
>>
>>   }
>>
>>
>>
>> }
>>
>>
>>
>> Below is the error log that I get:
>>
>>
>>
>> // Error Log
>>
>> 
>>
>> 2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO
>> DeploymentsPathChildrenCache-0 container.DeploymentListener - Deploying
>> module 'log' for stream 'spark-streaming-word-count'
>>
>> 2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO
>> DeploymentsPathChildrenCache-0 container.DeploymentListener - Deploying
>> module [ModuleDescriptor@6dbc4f81 moduleName = 'log', moduleLabel =
>> 'log', group = 'spark-streaming-word-count', sourceChannelName = [null],
>> sinkChannelName = [null], index = 2, type = sink, parameters =
>> map[[empty]], children = list[[empty]]]
>>
>> 2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO
>> DeploymentsPathChildrenCache-0 container.DeploymentListener - Path cache
>> event:
>> path=/deployments/modules/allocated/4ff3ba84-e6ca-47dd-894f-aa92bdbb3e06/spark-streaming-word-count.processor.processor.1,
>> type=CHILD_ADDED
>>
>> 2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO
>> DeploymentsPathChildrenCache-0 container.DeploymentListener - Deploying
>> module 'processor' for stream 'spark-streaming-word-count'
>>
>> 2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO
>> DeploymentsPathChildrenCache-0 container.DeploymentListener - Deploying
>> module [ModuleDescriptor@5e16dafb moduleName = 'scala-word-count',
>> moduleLabel = 'processor', group = 'spark-streaming-word-count',
>> sourceChannelName = [null], sinkChannelName = [null], index = 1, type =
>> processor, parameters = map[[empty]], children = list[[empty]]]
>>
>> 2015-09-16T14:28:49+0530 1.2.0.RELEASE WARN
>> DeploymentsPathChildrenCache-0 util.NativeCodeLoader - Unable to load
>> native-hadoop library for your platform... using builtin-java classes where
>> applicable
>>
>> 2015-09-16T14:28:49+0530 1.2.0.RELEASE WARN
>> sparkDriver-akka.actor.default-dispatcher-3
>> remote.ReliableDeliverySupervisor - Association with remote system
>> [akka.tcp://sparkMaster@abcd.hadoop.ambari:7077] has failed, address is
>> now gated for [5000] ms. Reason is: [Disassociated].
>>
>> 2015-09-16T14:29:09+0530 1.2.0.RELEASE WARN
>> sparkDriver-akka.actor.default-dispatcher-4
>> remote.ReliableDeliverySupervisor - Association with remote system
>> [akka.tcp://sparkMaster@abcd.hadoop.ambari:7077] has failed, address is
>> now gated for [5000] ms. Reason is: [Disassociated].
>>
>> 2015-09-16T14:29:18+0530 1.2.0.RELEASE INFO
>> DeploymentsPathChildrenCache-0 container.DeploymentListener - Path cache
>> event:
>> path=/deployments/modules/allocated/8d07cdba-557e-458a-9225-b90e5a5778ce/spark-streaming-word-count.source.http.1,
>> type=CHILD_ADDED
>>
>> 2015-09-16T14:29:18+0530 1.2.0.RELEASE INFO
>> DeploymentsPathChildrenCache-0 container.DeploymentListener - Deploying
>> module 'http' for stream 'spark-streaming-word-count'
>>
>> 

Re: Suggested Method for Execution of Periodic Actions

2015-09-16 Thread Adrian Tanase
The window can be larger; the batch/slide interval has to be smaller (assuming
every 5-10 secs?).
Most of the built-in functions take a separate window/slide parameter that you
can override, as long as it's a multiple of the streaming context's batch
interval.
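
A small sketch of that in Scala, assuming a DStream of (eventType, 1L) pairs
and a streaming context with a short batch interval (e.g. 10 seconds), so the
5-minute window and slide durations are multiples of it:

    import org.apache.spark.streaming.Minutes
    import org.apache.spark.streaming.dstream.DStream

    def fiveMinuteCounts(events: DStream[(String, Long)]): DStream[(String, Long)] =
      events.reduceByKeyAndWindow(
        (a: Long, b: Long) => a + b, // reduce function
        Minutes(5),                  // window duration
        Minutes(5))                  // slide duration: one result per 5-minute window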

Sent from my iPhone

On 16 Sep 2015, at 23:30, Ted Yu 
> wrote:

bq. and check if 5 minutes have passed

What if the duration for the window is longer than 5 minutes ?

Cheers

On Wed, Sep 16, 2015 at 1:25 PM, Adrian Tanase 
> wrote:
If you don't need the counts in betweem the DB writes, you could simply use a 5 
min window for the updateStateByKey and use foreachRdd on the resulting DStream.

Even simpler, you could use reduceByKeyAndWindow directly.

Lastly, you could keep a variable on the driver and check if 5 minutes have 
passed
in foreachRdd on the original DStream, even if the batch duration is shorter.

Also, remember to cleanup the state in your updateStateByKey function or it 
will grow unbounded. I still believe one of the builtin ByKey functions are a 
simpler strategy.

hope this helps.

-adrian

Sent from my iPhone

> On 16 Sep 2015, at 22:33, Bryan Jeffrey 
> > wrote:
>
> Hello.
>
> I have a streaming job that is processing data.  I process a stream of 
> events, taking actions when I see anomalous events.  I also keep a count 
> events observed using updateStateByKey to maintain a map of type to count.  I 
> would like to periodically (every 5 minutes) write the results of my counts 
> to a database.  Is there a built in mechanism or established pattern to 
> execute periodic jobs in spark streaming?
>
> Regards,
>
> Bryan Jeffrey

-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.org




parquet error

2015-09-16 Thread Chengi Liu
Hi,
  I have a Spark cluster set up and I am trying to write data to S3 in
Parquet format.
Here is what I am doing:

df = sqlContext.load('test', 'com.databricks.spark.avro')

df.saveAsParquetFile("s3n://test")

But I get some nasty error:

Py4JJavaError: An error occurred while calling o29.saveAsParquetFile.

: org.apache.spark.SparkException: Job aborted.

at
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.insert(commands.scala:166)

at
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.run(commands.scala:139)

at
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)

at
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)

at
org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)

at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)

at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)

at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)

at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)

at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:950)

at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:950)

at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:336)

at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)

at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:135)

at org.apache.spark.sql.DataFrame.saveAsParquetFile(DataFrame.scala:1508)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)

at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)

at py4j.Gateway.invoke(Gateway.java:259)

at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)

at py4j.commands.CallCommand.execute(CallCommand.java:79)

at py4j.GatewayConnection.run(GatewayConnection.java:207)

at java.lang.Thread.run(Thread.java:744)

Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 3 in stage 0.0 failed 4 times, most recent failure: Lost task
3.3 in stage 0.0 (TID 12, srv-110-29.720.rdio):
org.apache.spark.SparkException: Task failed while writing rows.

at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org
$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:191)

at
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)

at
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)

at org.apache.spark.scheduler.Task.run(Task.scala:70)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:744)

Caused by: java.lang.VerifyError: Bad type on operand stack

Exception Details:

  Location:


org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.initialize(Ljava/net/URI;Lorg/apache/hadoop/conf/Configuration;)V
@38: invokespecial

  Reason:

Type 'org/jets3t/service/security/AWSCredentials' (current frame,
stack[3]) is not assignable to
'org/jets3t/service/security/ProviderCredentials'

  Current Frame:

bci: @38

flags: { }

locals: { 'org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore',
'java/net/URI', 'org/apache/hadoop/conf/Configuration',
'org/apache/hadoop/fs/s3/S3Credentials',
'org/jets3t/service/security/AWSCredentials' }

stack: { 'org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore',
uninitialized 32, uninitialized 32,
'org/jets3t/service/security/AWSCredentials' }

  Bytecode:

000: bb00 3159 b700 324e 2d2b 2cb6 0034 bb00

010: 3659 2db6 003a 2db6 003d b700 403a 042a

020: bb00 4259 1904 b700 45b5 0047 a700 0b3a

030: 042a 1904 b700 4f2a 2c12 5103 b600 55b5

040: 0057 2a2c 1259 1400 5ab6 005f 1400 1eb8

050: 0065 b500 672a 2c12 6914 001e b600 5f14

060: 001e b800 65b5 006b 2a2c 126d b600 71b5

070: 0073 2abb 0075 592b b600 78b7 007b b500

080: 7db1

  Exception Handler Table:

bci [14, 44] => handler: 47

  Stackmap Table:


full_frame(@47,{Object[#2],Object[#73],Object[#75],Object[#49]},{Object[#47]})

same_frame(@55)



And in S3, I see something like test$folder?

I am not sure how to fix this.

Any ideas?

Thanks


Stopping criteria for gradient descent

2015-09-16 Thread Nishanth P S
Hi,

I am running LogisticRegressionWithSGD in Spark 1.4.1 and it always takes
100 iterations to train (which is the default); it never meets the
convergence criteria. Shouldn't the convergence criteria for SGD be based
on the difference in log loss, or the difference in accuracy on a held-out
test set?

Code for convergence criteria:
https://github.com/apache/spark/blob/c0e9ff1588b4d9313cc6ec6e00e5c7663eb67910/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala#L251

Thanks,
Nishanth


spark-submit chronos issue

2015-09-16 Thread Saurabh Malviya (samalviy)
Hi,

I am facing a strange issue while using Chronos: the job is not able to find
the Main class when invoking spark-submit through Chronos.

The issue I identified is the colons in the task name.

Env -Chronos scheduled job on mesos

/tmp/mesos/slaves/20150911-070325-218147008-5050-30275-S4/frameworks/20150911-070325-218147008-5050-30275-0483/executors/ct:144244800:0:hbaseConnTest:/runs/6f9ddfc8-944c-4648-b29e-d4c9182f2292/spark-1.4.1-bin-custom-spark'


/tmp/mesos/slaves/20150911-070325-218147008-5050-30275-S4/frameworks/20150911-070325-218147008-5050-30275-0483/executors/ct:144244800:0:hbaseConnTest:/runs/6f9ddfc8-944c-4648-b29e-d4c9182f2292/spark-1.4.1-bin-custom-spark/bin/spark-submit


Error: Could not find or load main class org.apache.spark.launcher.Main



I found that if I rename the above highlighted folder and remove those ":" characters, it works.



Has anyone faced a similar problem using Chronos? Let me know if there is a
quick workaround or if there is another underlying issue.





Thanks,

Saurabh



Re: RE: spark sql hook

2015-09-16 Thread r7raul1...@163.com
Example:
change "select * from test.table" to "select * from production.table"



r7raul1...@163.com
 
From: Cheng, Hao
Date: 2015-09-17 11:05
To: r7raul1...@163.com; user
Subject: RE: spark sql hook
Catalyst TreeNode is a very fundamental API; I'm not sure what kind of hook you
need. A concrete example would be more helpful for understanding your
requirement.
 
Hao
 
From: r7raul1...@163.com [mailto:r7raul1...@163.com] 
Sent: Thursday, September 17, 2015 10:54 AM
To: user
Subject: spark sql hook
 
 
I want to modify some SQL tree nodes before execution. I can do this with a
Hive hook in Hive. Does Spark support such a hook? Any advice?


r7raul1...@163.com


Table is modified by DataFrameWriter

2015-09-16 Thread guoqing0...@yahoo.com.hk
Hi all,
I found the table structure was modified when using DataFrameWriter.jdbc to
save the content of a DataFrame:
 
sqlContext.sql("select '2015-09-17',count(1) from 
test").write.jdbc(url,test,properties)

table structure before saving:
app_key text
t_amount bigint(20)

saved:
_c0 text
_c1 bigint(20)

Is there any way to just save the fields in sequence and not alter the table?

Thanks!
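
A sketch of one possible workaround, assuming the root cause is that the
unaliased expressions get generated column names (_c0, _c1) and that the writer
recreates the table: alias the columns to match the existing schema and append
rather than overwrite (the table and column names are taken from the example
above):

    import org.apache.spark.sql.SaveMode

    // Alias the expressions so the DataFrame columns match the target table.
    val df = sqlContext.sql(
      "select '2015-09-17' as app_key, count(1) as t_amount from test")

    // Append into the existing table instead of letting the writer recreate it.
    df.write.mode(SaveMode.Append).jdbc(url, "test", properties)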


Lost tasks in Spark SQL join jobs

2015-09-16 Thread Gang Bai
Hi all,

I’m joining two tables on a specific attribute. The job is like
`sqlContext.sql(“SELECT * FROM tableA LEFT JOIN tableB on
tableA.uuid=tableB.uuid”)`, where tableA and tableB are two temp tables, both
of which are around 100 GB in size and neither of which is skewed on 'uuid’.

As I run the application, I constantly see two sets of errors in the logs:

One is like:

15/09/17 11:06:50 WARN TaskSetManager: Lost task 2946.0 in stage 1.0 (TID 1228, 
10.39.2.93): java.io.FileNotFoundException: 
/data2/hadoop/local/usercache/megatron/appcache/application_1435099124107_3613186/blockmgr-4761cb8d-0dbd-4832-98ef-e64a787e09d4/2f/shuffle_1_2946_0.data
 (No such file or directory)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.(FileOutputStream.java:221)
at 
org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:130)
at 
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:201)
at 
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5$$anonfun$apply$2.apply(ExternalSorter.scala:759)
at 
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5$$anonfun$apply$2.apply(ExternalSorter.scala:758)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5.apply(ExternalSorter.scala:758)
at 
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5.apply(ExternalSorter.scala:754)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:754)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:71)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

and the other is like:

5/09/17 11:06:50 ERROR YarnScheduler: Lost executor 925 on 10.39.7.87: remote 
Akka client disassociated
15/09/17 11:06:50 INFO TaskSetManager: Re-queueing tasks for 925 from TaskSet 
1.0
15/09/17 11:06:50 WARN ReliableDeliverySupervisor: Association with remote 
system [akka.tcp://sparkExecutor@10.39.7.87:52148] has failed, address is now 
gated for [5000] ms. Reason is: [Disassociated].
15/09/17 11:06:50 WARN TaskSetManager: Lost task 1321.0 in stage 1.0 (TID 1142, 
10.39.7.87): ExecutorLostFailure (executor 925 lost)
15/09/17 11:06:50 INFO DAGScheduler: Executor lost: 925 (epoch 1659)
15/09/17 11:06:50 INFO BlockManagerMasterActor: Trying to remove executor 925 
from BlockManagerMaster.
15/09/17 11:06:50 INFO BlockManagerMasterActor: Removing block manager 
BlockManagerId(925, 10.39.7.87, 51494)
15/09/17 11:06:50 INFO BlockManagerMaster: Removed 925 successfully in 
removeExecutor

And increasing the number of executors and the executor memory didn’t help.
This seems like a very basic SQL use case, so my question is: how do I solve
this issue?

Thanks,
Gang



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: application failed on large dataset

2015-09-16 Thread 周千昊
Indeed, the operation in this stage is quite memory-consuming.
We are trying to enable the PrintGCDetails option and see what is going on.
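
For reference, a sketch of one way to turn that on, assuming the standard
HotSpot flags and Spark's executor extraJavaOptions setting (flag names may
differ on other JVMs):

    import org.apache.spark.SparkConf

    // Ask the executor JVMs to print GC details into their stdout/stderr logs.
    val conf = new SparkConf()
      .set("spark.executor.extraJavaOptions",
           "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps")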

On Wed, Sep 16, 2015 at 11:47 PM, java8964 wrote:

> This sounds like a memory issue.
>
> Do you enable the GC output? When this is happening, are your executors
> doing full gc? How long is the full gc?
>
> Yong
>
> --
> From: qhz...@apache.org
> Date: Wed, 16 Sep 2015 13:52:25 +
>
> Subject: Re: application failed on large dataset
> To: java8...@hotmail.com; user@spark.apache.org
>
> Hi,
>  I have switch 'spark.shuffle.blockTransferService' to 'nio'. But the
> problem still exists. However the stack trace is a little bit different:
> PART one:
> 15/09/16 06:20:32 ERROR executor.Executor: Exception in task 1.2 in stage
> 15.0 (TID 5341)
> java.io.IOException: Failed without being ACK'd
> at
> org.apache.spark.network.nio.ConnectionManager$MessageStatus.failWithoutAck(ConnectionManager.scala:72)
> at
> org.apache.spark.network.nio.ConnectionManager$$anonfun$removeConnection$3.apply(ConnectionManager.scala:533)
> at
> org.apache.spark.network.nio.ConnectionManager$$anonfun$removeConnection$3.apply(ConnectionManager.scala:531)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at
> org.apache.spark.network.nio.ConnectionManager.removeConnection(ConnectionManager.scala:531)
> at
> org.apache.spark.network.nio.ConnectionManager$$anonfun$addListeners$3.apply(ConnectionManager.scala:510)
> at
> org.apache.spark.network.nio.ConnectionManager$$anonfun$addListeners$3.apply(ConnectionManager.scala:510)
> at
> org.apache.spark.network.nio.Connection.callOnCloseCallback(Connection.scala:162)
> at
> org.apache.spark.network.nio.Connection.close(Connection.scala:130)
> at
> org.apache.spark.network.nio.ConnectionManager$$anonfun$stop$1.apply(ConnectionManager.scala:1000)
> at
> org.apache.spark.network.nio.ConnectionManager$$anonfun$stop$1.apply(ConnectionManager.scala:1000)
> at
> scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
> at
> scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
> at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at
> scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:107)
> at
> org.apache.spark.network.nio.ConnectionManager.stop(ConnectionManager.scala:1000)
> at
> org.apache.spark.network.nio.NioBlockTransferService.close(NioBlockTransferService.scala:78)
> at
> org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1228)
> at org.apache.spark.SparkEnv.stop(SparkEnv.scala:100)
> at org.apache.spark.executor.Executor.stop(Executor.scala:144)
> at
> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:113)
> at org.apache.spark.rpc.akka.AkkaRpcEnv.org
> $apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:177)
> at
> org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:126)
> at org.apache.spark.rpc.akka.AkkaRpcEnv.org
> $apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:197)
> at
> org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:125)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at
> org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
> at
> org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
> at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at
> org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
> at
> org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:92)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> at
> 
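
To get the GC output Yong asks about above, one option is to pass GC-logging
flags to the executors. A minimal sketch (not from the original thread; the
flags are standard HotSpot options) using spark.executor.extraJavaOptions:

  import org.apache.spark.SparkConf

  // Minimal sketch: enable GC logging on the executors so full-GC pauses show
  // up in the executor logs. The same value can also be set in
  // spark-defaults.conf or passed with --conf to spark-submit.
  val conf = new SparkConf()
    .set("spark.executor.extraJavaOptions",
         "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")

With that in place, the executor logs show how long each full GC pauses the
JVM, which helps distinguish a GC stall from a genuine network failure.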

spark sql hook

2015-09-16 Thread r7raul1...@163.com

I want to modify some SQL tree nodes before execution. I can do this with a Hive
hook in Hive. Does Spark support such a hook? Any advice?


r7raul1...@163.com


Re: Re: Table is modified by DataFrameWriter

2015-09-16 Thread Josh Rosen
What are your JDBC properties configured to? Do you have overwrite mode
enabled?

On Wed, Sep 16, 2015 at 7:39 PM, guoqing0...@yahoo.com.hk <
guoqing0...@yahoo.com.hk> wrote:

> Spark-1.4.1
>
>
> *From:* Ted Yu 
> *Date:* 2015-09-17 10:29
> *To:* guoqing0...@yahoo.com.hk
> *CC:* user 
> *Subject:* Re: Table is modified by DataFrameWriter
> Can you tell us which release you were using ?
>
> Thanks
>
>
>
> On Sep 16, 2015, at 7:11 PM, "guoqing0...@yahoo.com.hk" <
> guoqing0...@yahoo.com.hk> wrote:
>
> Hi all,
> I found the table structure was modified when using DataFrameWriter.jdbc
> to save the contents of a DataFrame:
>
> sqlContext.sql("select '2015-09-17',count(1) from
> test").write.jdbc(url,test,properties)
>
> table structure before saving:
> app_key text
> t_amount bigint(20)
>
> saved:
> _c0 text
> _c1 bigint(20)
>
> Is there any way to just save the fields in sequence and not alter the
> table?
>
> Thanks!
>
>


Re: Iterating over JavaRDD

2015-09-16 Thread Ted Yu
How about using this method:

  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def mapToDouble[R](f: DoubleFunction[T]): JavaDoubleRDD = {
    new JavaDoubleRDD(rdd.map(x => f.call(x).doubleValue()))
  }
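
As a rough sketch of the whole computation (Scala shown for brevity; X and its
field are made-up names, and the JavaRDD API mirrors this with map(Function) /
reduce(Function2), or mapToDouble(DoubleFunction) followed by sum()):

  import org.apache.spark.{SparkConf, SparkContext}

  case class X(values: Seq[Int])               // hypothetical element type

  val sc = new SparkContext(new SparkConf().setAppName("sum-example"))
  val rdd = sc.parallelize(Seq(X(Seq(1, 2)), X(Seq(3, 4))))

  // 1. map each element to its own partial sum,
  // 2. reduce the partial sums into a single total.
  val total: Int = rdd.map(x => x.values.sum).reduce(_ + _)   // 10 here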

On Wed, Sep 16, 2015 at 8:30 PM, Tapan Sharma 
wrote:

> Hi All,
>
> I am a newbie and want to achieve the following scenario.
> I would like to iterate over a JavaRDD of a complex type (a user-defined
> class, say X).
> I need to calculate the sum of the integers stored in X and return it.
> Which JavaRDD method do I need to call?
>
> Thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Iterating-over-JavaRDD-tp24723.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


RE: RE: spark sql hook

2015-09-16 Thread Cheng, Hao
Probably a workable solution is to create your own SQLContext by extending the
HiveContext class, override the `analyzer`, and add your own rule to do the
hacking.
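
A rough sketch of that idea follows. It assumes the Spark 1.4/1.5 internals
(analyzer, catalog, functionRegistry, conf, extendedResolutionRules); because
`analyzer` is declared protected[sql], the subclass normally has to live under
an org.apache.spark.sql package to compile, and the rule body here is only a
placeholder:

  package org.apache.spark.sql.hive

  import org.apache.spark.SparkContext
  import org.apache.spark.sql.catalyst.analysis.Analyzer
  import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
  import org.apache.spark.sql.catalyst.rules.Rule

  // Placeholder rule: walk the logical plan before resolution and rewrite the
  // nodes you care about (e.g. swap the "test" database for "production" in
  // unresolved relations).
  object RewriteTestTables extends Rule[LogicalPlan] {
    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      case other => other   // put the real rewrite cases here
    }
  }

  class CustomHiveContext(sc: SparkContext) extends HiveContext(sc) {
    // Prepend the custom rule; a real implementation should also keep the
    // Hive-specific rules that HiveContext's own analyzer registers here.
    override protected[sql] lazy val analyzer: Analyzer =
      new Analyzer(catalog, functionRegistry, conf) {
        override val extendedResolutionRules = RewriteTestTables :: Nil
      }
  }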

From: r7raul1...@163.com [mailto:r7raul1...@163.com]
Sent: Thursday, September 17, 2015 11:08 AM
To: Cheng, Hao; user
Subject: Re: RE: spark sql hook

Example:
Change `select * from test.table` to `select * from production.table`.


r7raul1...@163.com

From: Cheng, Hao
Date: 2015-09-17 11:05
To: r7raul1...@163.com; 
user
Subject: RE: spark sql hook
Catalyst TreeNode is a very fundamental API, and I'm not sure what kind of hook you
need. A concrete example would be more helpful for understanding your requirement.

Hao

From: r7raul1...@163.com [mailto:r7raul1...@163.com]
Sent: Thursday, September 17, 2015 10:54 AM
To: user
Subject: spark sql hook


I want to modify some SQL tree nodes before execution. I can do this with a Hive
hook in Hive. Does Spark support such a hook? Any advice?

r7raul1...@163.com


Re: RE: spark sql hook

2015-09-16 Thread r7raul1...@163.com

Thank you


r7raul1...@163.com
 
From: Cheng, Hao
Date: 2015-09-17 12:32
To: r7raul1...@163.com; user
Subject: RE: RE: spark sql hook
Probably a workable solution is to create your own SQLContext by extending the
HiveContext class, override the `analyzer`, and add your own rule to do the
hacking.
 
From: r7raul1...@163.com [mailto:r7raul1...@163.com] 
Sent: Thursday, September 17, 2015 11:08 AM
To: Cheng, Hao; user
Subject: Re: RE: spark sql hook
 
Example:
Change `select * from test.table` to `select * from production.table`.
 


r7raul1...@163.com
 
From: Cheng, Hao
Date: 2015-09-17 11:05
To: r7raul1...@163.com; user
Subject: RE: spark sql hook
Catalyst TreeNode is a very fundamental API, and I'm not sure what kind of hook you
need. A concrete example would be more helpful for understanding your requirement.
 
Hao
 
From: r7raul1...@163.com [mailto:r7raul1...@163.com] 
Sent: Thursday, September 17, 2015 10:54 AM
To: user
Subject: spark sql hook
 
 
I want to modify some SQL tree nodes before execution. I can do this with a Hive
hook in Hive. Does Spark support such a hook? Any advice?


r7raul1...@163.com


Re: Table is modified by DataFrameWriter

2015-09-16 Thread Ted Yu
Can you tell us which release you were using ?

Thanks



> On Sep 16, 2015, at 7:11 PM, "guoqing0...@yahoo.com.hk" 
>  wrote:
> 
> Hi all,
> I found the table structure was modified when using DataFrameWriter.jdbc to
> save the contents of a DataFrame:
>  
> sqlContext.sql("select '2015-09-17',count(1) from 
> test").write.jdbc(url,test,properties)
> 
> table structure before saving:
> app_key text
> t_amount bigint(20)
> 
> saved:
> _c0 text
> _c1 bigint(20)
> 
> Is there any way to just save the fields in sequence and not alter the
> table?
> 
> Thanks!


RE: spark sql hook

2015-09-16 Thread Cheng, Hao
Catalyst TreeNode is a very fundamental API, and I'm not sure what kind of hook you
need. A concrete example would be more helpful for understanding your requirement.

Hao

From: r7raul1...@163.com [mailto:r7raul1...@163.com]
Sent: Thursday, September 17, 2015 10:54 AM
To: user
Subject: spark sql hook


I want to modify some SQL tree nodes before execution. I can do this with a Hive
hook in Hive. Does Spark support such a hook? Any advice?

r7raul1...@163.com


Iterating over JavaRDD

2015-09-16 Thread Tapan Sharma
Hi All,

I am a newbie and want to achieve the following scenario.
I would like to iterate over a JavaRDD of a complex type (a user-defined class,
say X).
I need to calculate the sum of the integers stored in X and return it.
Which JavaRDD method do I need to call?

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Iterating-over-JavaRDD-tp24723.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to recovery DStream from checkpoint directory?

2015-09-16 Thread Bin Wang
And here is another question: if I load the DStream from the database every
time I start the job, will that data be loaded again when the job fails and
auto-restarts? If so, both the checkpoint data and the database data would be
loaded; won't that be a problem?
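
For reference, a minimal sketch of the StreamingContext.getOrCreate pattern
mentioned below (the checkpoint path, app name and batch interval are made up).
The factory function runs only when no usable checkpoint exists, so on a
checkpoint-based restart the database load inside it would be skipped:

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val checkpointDir = "hdfs:///tmp/app-checkpoint"   // hypothetical path

  def createContext(): StreamingContext = {
    // Only invoked when no checkpoint is found; on recovery the DStream graph
    // (including its state) is rebuilt from the checkpoint instead.
    val conf = new SparkConf().setAppName("recoverable-app")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)
    // build the DStream graph here, e.g. seed the initial state from the DB
    ssc
  }

  val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
  ssc.start()
  ssc.awaitTermination()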



Bin Wang wrote on Wednesday, September 16, 2015 at 8:40 PM:

> Will StreamingContext.getOrCreate do this work? What kind of code change
> would make it unable to load?
>
> Akhil Das wrote on Wednesday, September 16, 2015 at 20:20:
>
>> You can't really recover from a checkpoint if you alter the code. A better
>> approach would be to use some sort of external storage (like a DB or
>> ZooKeeper) to keep the state (the indexes, etc.), so that when you deploy
>> new code it can easily be recovered.
>>
>> Thanks
>> Best Regards
>>
>> On Wed, Sep 16, 2015 at 3:52 PM, Bin Wang  wrote:
>>
>>> I'd like to know if there is a way to recover a DStream from a checkpoint.
>>>
>>> Because I store state in the DStream, I'd like the state to be recovered
>>> when I restart the application and deploy new code.
>>>
>>
>>


Re: Re: Table is modified by DataFrameWriter

2015-09-16 Thread guoqing0...@yahoo.com.hk
I tried SaveMode.Append and SaveMode.Overwrite, and the output table was
modified in both cases.
Are the _c0 and _c1 names automatically generated from the DataFrame schema?
In my scenario I just want to flush the data from the DataFrame to the RDBMS
when both have the same structure, but I found the column names were modified.
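
One possible workaround to sketch (assuming the target table's columns are
app_key and t_amount, as in the original message, and reusing the url and
properties from that snippet): alias the select expressions so the DataFrame
schema carries the intended column names instead of the generated _c0/_c1.

  // Hedged sketch: give the computed columns explicit names so the DataFrame
  // schema matches the existing table before writing.
  val df = sqlContext.sql(
    "select '2015-09-17' as app_key, count(1) as t_amount from test")

  // Append into the existing table; whether the table still gets recreated
  // depends on the save mode and Spark version, so verify against your setup.
  df.write.mode("append").jdbc(url, "test", properties)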

 
From: Josh Rosen
Date: 2015-09-17 11:42
To: guoqing0...@yahoo.com.hk
CC: Ted Yu; user
Subject: Re: Re: Table is modified by DataFrameWriter
What are your JDBC properties configured to? Do you have overwrite mode enabled?

On Wed, Sep 16, 2015 at 7:39 PM, guoqing0...@yahoo.com.hk 
 wrote:
Spark-1.4.1

 
From: Ted Yu
Date: 2015-09-17 10:29
To: guoqing0...@yahoo.com.hk
CC: user
Subject: Re: Table is modified by DataFrameWriter
Can you tell us which release you were using ?

Thanks



On Sep 16, 2015, at 7:11 PM, "guoqing0...@yahoo.com.hk" 
 wrote:

Hi all,
I found the table structure was modified when using DataFrameWriter.jdbc to save
the contents of a DataFrame:
 
sqlContext.sql("select '2015-09-17',count(1) from 
test").write.jdbc(url,test,properties)

table structure before saving:
app_key text
t_amount bigint(20)

saved:
_c0 text
_c1 bigint(20)

Is there any way to just save the fields in sequence and not alter the table?

Thanks!



Re: Re: Table is modified by DataFrameWriter

2015-09-16 Thread guoqing0...@yahoo.com.hk
Spark-1.4.1

 
From: Ted Yu
Date: 2015-09-17 10:29
To: guoqing0...@yahoo.com.hk
CC: user
Subject: Re: Table is modified by DataFrameWriter
Can you tell us which release you were using ?

Thanks



On Sep 16, 2015, at 7:11 PM, "guoqing0...@yahoo.com.hk" 
 wrote:

Hi all,
I found the table structure was modified when using DataFrameWriter.jdbc to save
the contents of a DataFrame:
 
sqlContext.sql("select '2015-09-17',count(1) from 
test").write.jdbc(url,test,properties)

table structure before saving:
app_key text
t_amount bigint(20)

saved:
_c0 text
_c1 bigint(20)

Is there any way to just save the fields in sequence and not alter the table?

Thanks!


Re: Spark Thrift Server JDBC Drivers

2015-09-16 Thread Dan LaBar
I'm running Spark in EMR and using the JDBC driver provided by AWS.
I don't know if it will work outside of EMR, but it's worth a try.

I've also used the ODBC driver from Hortonworks.

Regards,
Dan

On Wed, Sep 16, 2015 at 8:34 AM, Daniel Haviv <
daniel.ha...@veracity-group.com> wrote:

> Hi,
> Are there any free JDBC drivers for Thrift?
> The only ones I could find are Simba's, which require a license.
>
> Thanks,
> Daniel
>