Re: Simple but faster data streaming

2015-04-03 Thread Tathagata Das
I am afraid not. The whole point of Spark Streaming is to make it easy to
do complicated processing on streaming data while interoperating with core
Spark, MLlib, and SQL, without the operational overhead of maintaining 4
different systems. As a slight cost of achieving that unification, there may
be some overhead compared to specialized systems that are designed to do one
specific thing.

If you have to do something simple that could have been done using Flume,
then the resources needed by the Spark Streaming program shouldn't be too
high. Can you provide more details?

TD

On Thu, Apr 2, 2015 at 11:51 AM, Harut Martirosyan 
harut.martiros...@gmail.com wrote:

 Hi guys.

 Is there a more lightweight way of stream processing with Spark? What we
 want is a simpler way, preferably with no scheduling, which just streams
 the data to multiple destinations.

 We extensively use Spark Core, SQL, Streaming, and GraphX, so Spark is our
 main tool and we don't want to add new things to the stack like Storm or
 Flume. On the other hand, the same streaming workload takes much more
 resources than our previous setup with Flume, especially if we have multiple
 destinations (which triggers multiple actions/scheduling).


 --
 RGRDZ Harut



Re: Spark Streaming FileStream Nested File Support

2015-04-03 Thread Tathagata Das
Yes, it can definitely be added. I just haven't gotten around to doing it :)
There are proposals for this that you can try:
https://github.com/apache/spark/pull/2765/files . Do take a look and review
it at some point.

On Fri, Apr 3, 2015 at 1:08 PM, Adam Ritter adamge...@gmail.com wrote:

 That doesn't seem like a good solution, unfortunately, as I would need
 this to work in a production environment.  Do you know why the limitation
 exists for FileInputDStream in the first place?  Unless I'm missing
 something important about how some of the internals work, I don't see why
 this feature couldn't be added at some point.

 On Fri, Apr 3, 2015 at 12:47 PM, Tathagata Das t...@databricks.com
 wrote:

 A sort-of-hacky workaround is to use a queueStream, where you can manually
 create RDDs (using sparkContext.hadoopFile) and insert them into the queue.
 Note that this is for testing only, as queueStream does not work with driver
 fault recovery.

 TD

 On Fri, Apr 3, 2015 at 12:23 PM, adamgerst adamge...@gmail.com wrote:

 So after pulling my hair out for a bit trying to convert one of my
 standard
 spark jobs to streaming I found that FileInputDStream does not support
 nested folders (see the brief mention here

 http://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources
 the fileStream method returns a FileInputDStream).  So before, for my
 standard job, I was reading from say

 s3n://mybucket/2015/03/02/*log

 And could also modify it to simply get an entire month's worth of logs.
 Since the logs are split up based upon their date, when the batch ran for
 the day, I simply passed in a parameter of the date to make sure I was
 reading the correct data.

 But since I want to turn this job into a streaming job I need to simply do
 something like

 s3n://mybucket/*log

 This would totally work fine if it were a standard Spark application, but
 fails for streaming.  Is there any way I can get around this limitation?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-FileStream-Nested-File-Support-tp22370.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: WordCount example

2015-04-03 Thread Tathagata Das
How many cores are present in the workers allocated to the standalone cluster
spark://ip-10-241-251-232:7077?
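
For context on why the core count matters: as far as I understand it, every
receiver (the socket receiver in this case) keeps one core busy for as long
as the application runs, so the application needs more cores than it has
receivers. That would also explain why local[2] works. A minimal Java sketch
of that arithmetic follows; CoreBudget and its methods are made up for
illustration and are not part of Spark.

```java
/** Illustrative only: CoreBudget is a hypothetical helper, not a Spark class. */
public class CoreBudget {

    /** Cores left for batch processing once every receiver has pinned one core. */
    public static int coresLeftForProcessing(int totalCores, int receivers) {
        if (totalCores < 0 || receivers < 0) {
            throw new IllegalArgumentException("counts must be non-negative");
        }
        return Math.max(0, totalCores - receivers);
    }

    /** A streaming app can make progress only if at least one core is left over. */
    public static boolean canMakeProgress(int totalCores, int receivers) {
        return coresLeftForProcessing(totalCores, receivers) > 0;
    }

    public static void main(String[] args) {
        // local[2] with one socket receiver: one core receives, one processes.
        System.out.println(canMakeProgress(2, 1));
        // An app that was granted no cores cannot process anything.
        System.out.println(canMakeProgress(0, 1));
    }
}
```

If the standalone workers offer fewer free cores than this check requires,
the application can register with the master and still never start receiving.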


On Fri, Apr 3, 2015 at 2:18 PM, Mohit Anchlia mohitanch...@gmail.com
wrote:

 If I use local[2] instead of spark://ip-10-241-251-232:7077 this
 seems to work. I don't understand why, though, because when I
 give spark://ip-10-241-251-232:7077 the application seems to bootstrap
 successfully; it just doesn't create a socket on port ?


 On Fri, Mar 27, 2015 at 10:55 AM, Mohit Anchlia mohitanch...@gmail.com
 wrote:

 I checked the ports using netstat and don't see any connections
 established on that port. Logs show only this:

 15/03/27 13:50:48 INFO Master: Registering app NetworkWordCount
 15/03/27 13:50:48 INFO Master: Registered app NetworkWordCount with ID
 app-20150327135048-0002

 Spark ui shows:

 Running Applications
 ID: app-20150327135048-0002
     (http://54.69.225.94:8080/app?appId=app-20150327135048-0002)
 Name: NetworkWordCount
     (http://ip-10-241-251-232.us-west-2.compute.internal:4040/)
 Cores: 0
 Memory per Node: 512.0 MB
 Submitted Time: 2015/03/27 13:50:48
 User: ec2-user
 State: WAITING
 Duration: 33 s
 The code looks like it is being executed:

 java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077

 public static void doWork(String masterUrl) {
     SparkConf conf = new SparkConf().setMaster(masterUrl).setAppName("NetworkWordCount");
     JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
     // The port argument is missing in the archived message.
     JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", );
     System.out.println("Successfully created connection");
     mapAndReduce(lines);
     jssc.start(); // Start the computation
     jssc.awaitTermination(); // Wait for the computation to terminate
 }

 public static void main(String... args) {
     doWork(args[0]);
 }
 And output of the java program after submitting the task:

 java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077
 Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties
 15/03/27 13:50:46 INFO SecurityManager: Changing view acls to: ec2-user
 15/03/27 13:50:46 INFO SecurityManager: Changing modify acls to: ec2-user
 15/03/27 13:50:46 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions: Set(ec2-user);
 users with modify permissions: Set(ec2-user)
 15/03/27 13:50:46 INFO Slf4jLogger: Slf4jLogger started
 15/03/27 13:50:46 INFO Remoting: Starting remoting
 15/03/27 13:50:47 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://sparkdri...@ip-10-241-251-232.us-west-2.compute.internal
 :60184]
 15/03/27 13:50:47 INFO Utils: Successfully started service 'sparkDriver'
 on port 60184.
 15/03/27 13:50:47 INFO SparkEnv: Registering MapOutputTracker
 15/03/27 13:50:47 INFO SparkEnv: Registering BlockManagerMaster
 15/03/27 13:50:47 INFO DiskBlockManager: Created local directory at
 /tmp/spark-local-20150327135047-5399
 15/03/27 13:50:47 INFO MemoryStore: MemoryStore started with capacity 3.5
 GB
 15/03/27 13:50:47 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 15/03/27 13:50:47 INFO HttpFileServer: HTTP File server directory is
 /tmp/spark-7e26df49-1520-4c77-b411-c837da59fa5b
 15/03/27 13:50:47 INFO HttpServer: Starting HTTP Server
 15/03/27 13:50:47 INFO Utils: Successfully started service 'HTTP file
 server' on port 57955.
 15/03/27 13:50:47 INFO Utils: Successfully started service 'SparkUI' on
 port 4040.
 15/03/27 13:50:47 INFO SparkUI: Started SparkUI at
 http://ip-10-241-251-232.us-west-2.compute.internal:4040
 15/03/27 13:50:47 INFO AppClient$ClientActor: Connecting to master
 spark://ip-10-241-251-232:7077...
 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: Connected to Spark
 cluster with app ID app-20150327135048-0002
 15/03/27 13:50:48 INFO NettyBlockTransferService: Server created on 58358
 15/03/27 13:50:48 INFO BlockManagerMaster: Trying to register BlockManager
 15/03/27 13:50:48 INFO BlockManagerMasterActor: Registering block manager
 ip-10-241-251-232.us-west-2.compute.internal:58358 with 3.5 GB RAM,
 BlockManagerId(driver, ip-10-241-251-232.us-west-2.compute.internal,
 58358)
 15/03/27 13:50:48 INFO BlockManagerMaster: Registered BlockManager
 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: SchedulerBackend is
 ready for scheduling beginning after reached minRegisteredResourcesRatio:
 0.0
 15/03/27 13:50:48 INFO ReceiverTracker: ReceiverTracker started
 15/03/27 13:50:48 INFO ForEachDStream: metadataCleanupDelay = -1
 15/03/27 13:50:48 INFO ShuffledDStream: metadataCleanupDelay = -1
 15/03/27 13:50:48 INFO MappedDStream: metadataCleanupDelay = -1
 15/03/27 13:50:48 INFO FlatMappedDStream: metadataCleanupDelay = -1
 15/03/27 13:50:48 INFO SocketInputDStream: metadataCleanupDelay = -1
 15/03/27 13:50:48 INFO SocketInputDStream: Slide time = 1000 ms
 15/03/27 13:50:48 INFO SocketInputDStream: Storage level =
 

Re: Spark + Kinesis

2015-04-03 Thread Tathagata Das
Just remove "provided" for spark-streaming-kinesis-asl:

libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"

On Fri, Apr 3, 2015 at 12:45 PM, Vadim Bichutskiy 
vadim.bichuts...@gmail.com wrote:

 Thanks. So how do I fix it?

 On Fri, Apr 3, 2015 at 3:43 PM, Kelly, Jonathan jonat...@amazon.com
 wrote:

   spark-streaming-kinesis-asl is not part of the Spark distribution on
 your cluster, so you cannot have it be just a provided dependency.  This
 is also why the KCL and its dependencies were not included in the assembly
 (but yes, they should be).


  ~ Jonathan Kelly

   From: Vadim Bichutskiy vadim.bichuts...@gmail.com
 Date: Friday, April 3, 2015 at 12:26 PM
 To: Jonathan Kelly jonat...@amazon.com
 Cc: user@spark.apache.org user@spark.apache.org
 Subject: Re: Spark + Kinesis

   Hi all,

  Good news! I was able to create a Kinesis consumer and assemble it into
 an uber jar following
 http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
 and example
 https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
 .

  However when I try to spark-submit it I get the following exception:

  Exception in thread "main" java.lang.NoClassDefFoundError:
 com/amazonaws/auth/AWSCredentialsProvider

  Do I need to include the KCL dependency in build.sbt? Here's what it
 looks like currently:

 import AssemblyKeys._
 name := "Kinesis Consumer"
 version := "1.0"
 organization := "com.myconsumer"
 scalaVersion := "2.11.5"

 libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
 libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
 libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" % "provided"

 assemblySettings
 jarName in assembly := "consumer-assembly.jar"
 assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala=false)

  Any help appreciated.

  Thanks,
 Vadim

 On Thu, Apr 2, 2015 at 1:15 PM, Kelly, Jonathan jonat...@amazon.com
 wrote:

  It looks like you're attempting to mix Scala versions, so that's going
 to cause some problems.  If you really want to use Scala 2.11.5, you must
 also use Spark package versions built for Scala 2.11 rather than 2.10.
 Anyway, that's not quite the correct way to specify Scala dependencies in
 build.sbt.  Instead of placing the Scala version after the artifactId (like
 spark-core_2.10), what you actually want is to use just spark-core with
 two percent signs before it.  Using two percent signs will make it use the
 version of Scala that matches your declared scalaVersion.  For example:

  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"

  libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"

  libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"

  I think that may get you a little closer, though I think you're
 probably going to run into the same problems I ran into in this thread:
 https://www.mail-archive.com/user@spark.apache.org/msg23891.html  I
 never really got an answer for that, and I temporarily moved on to other
 things for now.


  ~ Jonathan Kelly

   From: 'Vadim Bichutskiy' vadim.bichuts...@gmail.com
 Date: Thursday, April 2, 2015 at 9:53 AM
 To: user@spark.apache.org user@spark.apache.org
 Subject: Spark + Kinesis

   Hi all,

  I am trying to write an Amazon Kinesis consumer Scala app that
 processes data in the
 Kinesis stream. Is this the correct way to specify *build.sbt*:

  ---
 import AssemblyKeys._
 name := "Kinesis Consumer"
 version := "1.0"
 organization := "com.myconsumer"
 scalaVersion := "2.11.5"
 libraryDependencies ++= Seq(
   "org.apache.spark" % "spark-core_2.10" % "1.3.0" % "provided",
   "org.apache.spark" % "spark-streaming_2.10" % "1.3.0",
   "org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.3.0")
 assemblySettings
 jarName in assembly := "consumer-assembly.jar"
 assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala=false)


  In project/assembly.sbt I have only the following line:

  addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")

  I am using sbt 0.13.7. I adapted Example 7.7 in the Learning Spark
 book.

  Thanks,
 Vadim






Re: Spark Streaming FileStream Nested File Support

2015-04-03 Thread Tathagata Das
A sort-of-hacky workaround is to use a queueStream, where you can manually
create RDDs (using sparkContext.hadoopFile) and insert them into the queue.
Note that this is for testing only, as queueStream does not work with driver
fault recovery.
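
The part of this workaround that FileInputDStream does not do for you is
finding new files in nested folders before each batch. Below is a rough Java
sketch of just that listing step, under some loud assumptions: it walks the
local filesystem with java.nio as a stand-in (a real job reading s3n:// paths
would have to list through the Hadoop FileSystem API instead), and
NestedLogLister, newFiles and the "last seen" timestamp are hypothetical names
invented for this example. The returned paths are what you would then load
with sparkContext.hadoopFile and push into the queue.

```java
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Illustrative only: NestedLogLister is a hypothetical helper, not part of Spark. */
public class NestedLogLister {

    /**
     * Recursively lists regular files under root whose file name matches the glob
     * and whose modification time is strictly newer than sinceMillis.
     */
    public static List<Path> newFiles(Path root, String glob, long sinceMillis) throws IOException {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        try (Stream<Path> paths = Files.walk(root)) {
            return paths
                .filter(Files::isRegularFile)                          // skip directories
                .filter(p -> matcher.matches(p.getFileName()))         // e.g. "*log"
                .filter(p -> p.toFile().lastModified() > sinceMillis)  // only unseen files
                .sorted()                                              // stable order per batch
                .collect(Collectors.toList());
        }
    }
}
```

Calling newFiles(root, "*log", lastSeenMillis) once per batch interval and
remembering the newest modification time seen so far gives roughly the "pick
up new files" behaviour, but none of that bookkeeping survives a driver
restart, which is the same caveat as for queueStream itself.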

TD

On Fri, Apr 3, 2015 at 12:23 PM, adamgerst adamge...@gmail.com wrote:

 So after pulling my hair out for a bit trying to convert one of my standard
 spark jobs to streaming I found that FileInputDStream does not support
 nested folders (see the brief mention here

 http://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources
 the fileStream method returns a FileInputDStream).  So before, for my
 standard job, I was reading from say

 s3n://mybucket/2015/03/02/*log

 And could also modify it to simply get an entire month's worth of logs.
 Since the logs are split up based upon their date, when the batch ran for
 the day, I simply passed in a parameter of the date to make sure I was
 reading the correct data.

 But since I want to turn this job into a streaming job I need to simply do
 something like

 s3n://mybucket/*log

 This would totally work fine if it were a standard Spark application, but
 fails for streaming.  Is there any way I can get around this limitation?







Re: Spark Streaming Worker runs out of inodes

2015-04-02 Thread Tathagata Das
Are you saying that even with the spark.cleaner.ttl set your files are not
getting cleaned up?

TD

On Thu, Apr 2, 2015 at 8:23 AM, andrem amesa...@gmail.com wrote:

 Apparently Spark Streaming 1.3.0 is not cleaning up its internal files and
 the worker nodes eventually run out of inodes.
 We see tons of old shuffle_*.data and *.index files that are never deleted.
 How do we get Spark to remove these files?

 We have a simple standalone app with one RabbitMQ receiver and a two-node
 cluster (2 x r3.large AWS instances).
 The batch interval is 10 minutes, after which we process data and write
 results to the DB. No windowing or state management is used.

 I've pored over the documentation and tried setting the following
 properties, but they have not helped.
 As a workaround we're using a cron script that periodically cleans up old
 files, but this has a bad smell to it.

 SPARK_WORKER_OPTS in spark-env.sh on every worker node
   spark.worker.cleanup.enabled true
   spark.worker.cleanup.interval
   spark.worker.cleanup.appDataTtl

 Also tried on the driver side:
   spark.cleaner.ttl
   spark.shuffle.consolidateFiles true



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Worker-runs-out-of-inodes-tp22355.html




Re: Spark Streaming 1.3 Kafka Direct Streams

2015-04-01 Thread Tathagata Das
We should be able to support that use case in the direct API. It may be as
simple as allowing users to pass in a function that returns the set of
topic+partitions to read from.
That is, a function (Time) => Set[TopicAndPartition]. This gets called every
batch interval before the offsets are decided. This would allow users to
add topics, delete topics, and modify partitions on the fly.
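
To make the shape of that idea concrete, here is a rough Java sketch. It is
only an illustration of the proposal, not an existing Spark or Kafka API:
TopicSelectorSketch, tp and selector are invented names, a plain
(topic, partition) map entry stands in for TopicAndPartition, the batch time
is a bare millisecond value, and the topic names are made up.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.LongFunction;

/** Illustrative only: none of these names exist in Spark or Kafka. */
public class TopicSelectorSketch {

    /** Builds a (topic, partition) pair; stands in for Kafka's TopicAndPartition. */
    public static Map.Entry<String, Integer> tp(String topic, int partition) {
        return new SimpleImmutableEntry<>(topic, partition);
    }

    /**
     * Example selector: always reads "events" partitions 0 and 1, and adds
     * "audit" partition 0 once the batch time reaches switchAtMillis.
     */
    public static LongFunction<Set<Map.Entry<String, Integer>>> selector(long switchAtMillis) {
        return batchTimeMillis -> {
            Set<Map.Entry<String, Integer>> parts = new LinkedHashSet<>();
            parts.add(tp("events", 0));
            parts.add(tp("events", 1));
            if (batchTimeMillis >= switchAtMillis) {
                parts.add(tp("audit", 0));
            }
            return parts;
        };
    }

    public static void main(String[] args) {
        LongFunction<Set<Map.Entry<String, Integer>>> f = selector(3000L);
        // The stream would call the function once per batch, before deciding offsets.
        for (long t = 1000L; t <= 4000L; t += 1000L) {
            System.out.println(t + " -> " + f.apply(t));
        }
    }
}
```

Because the function is re-evaluated every batch, the set of partitions can
grow or shrink over time without restarting the stream.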

What do you think Cody?




On Wed, Apr 1, 2015 at 11:57 AM, Neelesh neele...@gmail.com wrote:

 Thanks Cody!

 On Wed, Apr 1, 2015 at 11:21 AM, Cody Koeninger c...@koeninger.org
 wrote:

 If you want to change topics from batch to batch, you can always just
 create a KafkaRDD repeatedly.

 The streaming code as it stands assumes a consistent set of topics,
 though.  The implementation is private, so you can't subclass it without
 building your own Spark.

 On Wed, Apr 1, 2015 at 1:09 PM, Neelesh neele...@gmail.com wrote:

 Thanks Cody, that was really helpful.  I have a much better
 understanding now. One last question: Kafka topics are initialized once
 in the driver. Is there an easy way of adding/removing topics on the fly?
 KafkaRDD#getPartitions() seems to be computed only once, and there is no way
 of refreshing them.

 Thanks again!

 On Wed, Apr 1, 2015 at 10:01 AM, Cody Koeninger c...@koeninger.org
 wrote:

 https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md

 The kafka consumers run in the executors.

 On Wed, Apr 1, 2015 at 11:18 AM, Neelesh neele...@gmail.com wrote:

 With receivers, it was pretty obvious which code ran where: each
 receiver occupied a core and ran on the workers. However, with the new
 Kafka direct input streams, it's hard for me to understand where the code
 that's reading from the Kafka brokers runs. Does it run on the driver (I hope
 not), or does it run on the workers?

 Any help appreciated
 thanks!
 -neelesh








Re: Spark Streaming and JMS

2015-04-01 Thread Tathagata Das
It's not a built-in component of Spark. However, there is a spark-package for
an Apache Camel receiver, which can integrate with JMS:
http://spark-packages.org/package/synsys/spark

I have not tried it, but do check it out.

TD

On Wed, Apr 1, 2015 at 4:38 AM, danila danila.erma...@gmail.com wrote:

 Hi Tathagata

 Do you know if a JMS Receiver was introduced during the last year as a
 standard Spark component, or is somebody developing it?


 Regards
 Danila



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-and-JMS-tp5371p22337.html




Re: Size of arbitrary state managed via DStream updateStateByKey

2015-04-01 Thread Tathagata Das
In its current state, yes, there will be performance issues. It can be done
much more efficiently, and we are working on doing that.

TD

On Wed, Apr 1, 2015 at 7:49 AM, Vinoth Chandar vin...@uber.com wrote:

 Hi all,

 As I understand from docs and talks, the streaming state is kept in memory
 as an RDD (optionally checkpointable to disk). SPARK-2629 hints that this
 in-memory structure is not indexed efficiently?

 I am wondering how my performance would be if the streaming state does not
 fit in memory (say 100GB state over 10GB total RAM), and I did random
 updates to different keys via updateStateByKey? (Would throwing in SSDs
 help?)

 I am picturing that some kind of performance degradation would happen, akin
 to Linux/InnoDB buffer cache thrashing. But if someone can demystify this,
 that would be awesome.

 Thanks
 Vinoth




Re: Spark Streaming 1.3 Kafka Direct Streams

2015-04-01 Thread Tathagata Das
The challenge of opening up these internal classes to the public (even with
the Developer API tag) is that it prevents us from making non-trivial changes
without breaking API compatibility for all those who have subclassed them.
It's a trade-off that is hard to optimize. That's why we favor exposing more
optional parameters in the stable API (KafkaUtils), so that we can maintain
binary compatibility with user code while still being able to make
non-trivial changes internally.

That said, it may be worthwhile to actually take an optional compute
function as a parameter through KafkaUtils, as Cody suggested: (Time,
current offsets, kafka metadata, etc) => Option[KafkaRDD]. It is worth
thinking about its implications in the context of driver restarts, etc. (as
that function will get called again on restart, and a different return value
from before can screw up the semantics).

TD

On Wed, Apr 1, 2015 at 12:28 PM, Neelesh neele...@gmail.com wrote:

 +1 for subclassing. It's more flexible if we can subclass the
 implementation classes.
  On Apr 1, 2015 12:19 PM, Cody Koeninger c...@koeninger.org wrote:

 As I said in the original ticket, I think the implementation classes
 should be exposed so that people can subclass and override compute() to
 suit their needs.

 Just adding a function from Time => Set[TopicAndPartition] wouldn't be
 sufficient for some of my current production use cases.

 compute() isn't really a function from Time => Option[KafkaRDD], it's a
 function from (Time, current offsets, kafka metadata, etc) =>
 Option[KafkaRDD]

 I think it's more straightforward to give access to that additional state
 via subclassing than it is to add in more callbacks for every possible use
 case.




 On Wed, Apr 1, 2015 at 2:01 PM, Tathagata Das t...@databricks.com
 wrote:

 We should be able to support that use case in the direct API. It may be
 as simple as allowing users to pass in a function that returns the set
 of topic+partitions to read from.
 That is, a function (Time) => Set[TopicAndPartition]. This gets called every
 batch interval before the offsets are decided. This would allow users to
 add topics, delete topics, and modify partitions on the fly.

 What do you think Cody?




 On Wed, Apr 1, 2015 at 11:57 AM, Neelesh neele...@gmail.com wrote:

 Thanks Cody!

 On Wed, Apr 1, 2015 at 11:21 AM, Cody Koeninger c...@koeninger.org
 wrote:

 If you want to change topics from batch to batch, you can always just
 create a KafkaRDD repeatedly.

 The streaming code as it stands assumes a consistent set of topics,
 though.  The implementation is private, so you can't subclass it without
 building your own Spark.

 On Wed, Apr 1, 2015 at 1:09 PM, Neelesh neele...@gmail.com wrote:

 Thanks Cody, that was really helpful.  I have a much better
 understanding now. One last question: Kafka topics are initialized once
 in the driver. Is there an easy way of adding/removing topics on the fly?
 KafkaRDD#getPartitions() seems to be computed only once, and there is no way
 of refreshing them.

 Thanks again!

 On Wed, Apr 1, 2015 at 10:01 AM, Cody Koeninger c...@koeninger.org
 wrote:


 https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md

 The kafka consumers run in the executors.

 On Wed, Apr 1, 2015 at 11:18 AM, Neelesh neele...@gmail.com wrote:

 With receivers, it was pretty obvious which code ran where: each
 receiver occupied a core and ran on the workers. However, with the new
 Kafka direct input streams, it's hard for me to understand where the code
 that's reading from the Kafka brokers runs. Does it run on the driver (I hope
 not), or does it run on the workers?

 Any help appreciated
 thanks!
 -neelesh










Re: Could not compute split, block not found in Spark Streaming Simple Application

2015-03-27 Thread Tathagata Das
If it is deterministically reproducible, could you generate full DEBUG
level logs from the driver and the workers and send them to me? Basically I
want to trace through what is happening to the block that is not being
found.
Also, which cluster manager are you using: Spark Standalone, Mesos or YARN?


On Fri, Mar 27, 2015 at 10:09 AM, Saiph Kappa saiph.ka...@gmail.com wrote:

 Hi,

 I am just running this simple example with
 machineA: 1 master + 1 worker
 machineB: 1 worker
 «
 val ssc = new StreamingContext(sparkConf, Duration(1000))

 val rawStreams = (1 to numStreams).map(_ =>
   ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER)).toArray
 val union = ssc.union(rawStreams)

 union.filter(line => Random.nextInt(1) == 0).map(line => {
   var sum = BigInt(0)
   line.toCharArray.foreach(chr => sum += chr.toInt)
   fib2(sum)
   sum
 }).reduceByWindow(_+_, Seconds(1),Seconds(1)).map(s => s"### result: $s").print()
 »

 And I'm getting the following exceptions:

 Log from machineB
 «
 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 132
 15/03/27 16:21:35 INFO Executor: Running task 0.0 in stage 27.0 (TID 132)
 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 134
 15/03/27 16:21:35 INFO Executor: Running task 2.0 in stage 27.0 (TID 134)
 15/03/27 16:21:35 INFO TorrentBroadcast: Started reading broadcast
 variable 24
 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 136
 15/03/27 16:21:35 INFO Executor: Running task 4.0 in stage 27.0 (TID 136)
 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 138
 15/03/27 16:21:35 INFO Executor: Running task 6.0 in stage 27.0 (TID 138)
 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 140
 15/03/27 16:21:35 INFO Executor: Running task 8.0 in stage 27.0 (TID 140)
 15/03/27 16:21:35 INFO MemoryStore: ensureFreeSpace(1886) called with
 curMem=47117, maxMem=280248975
 15/03/27 16:21:35 INFO MemoryStore: Block broadcast_24_piece0 stored as
 bytes in memory (estimated size 1886.0 B, free 267.2 MB)
 15/03/27 16:21:35 INFO BlockManagerMaster: Updated info of block
 broadcast_24_piece0
 15/03/27 16:21:35 INFO TorrentBroadcast: Reading broadcast variable 24
 took 19 ms
 15/03/27 16:21:35 INFO MemoryStore: ensureFreeSpace(3104) called with
 curMem=49003, maxMem=280248975
 15/03/27 16:21:35 INFO MemoryStore: Block broadcast_24 stored as values in
 memory (estimated size 3.0 KB, free 267.2 MB)
 15/03/27 16:21:35 ERROR Executor: Exception in task 8.0 in stage 27.0 (TID
 140)
 java.lang.Exception: Could not compute split, block input-0-1427473262420
 not found
 at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:701)
 15/03/27 16:21:35 ERROR Executor: Exception in task 6.0 in stage 27.0 (TID
 138)
 java.lang.Exception: Could not compute split, block input-0-1427473262418
 not found
 at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at 

Re: spark streaming driver hang

2015-03-27 Thread Tathagata Das
Do you have the logs of the driver? Does that give any exceptions?

TD

On Fri, Mar 27, 2015 at 12:24 PM, Chen Song chen.song...@gmail.com wrote:

 I ran a spark streaming job.

 100 executors
 30G heap per executor
 4 cores per executor

 The version I used is 1.3.0-cdh5.1.0.

 The job is reading from a directory on HDFS (with files incoming
 continuously) and does some join on the data. I set batch interval to be 15
 minutes and the job worked fine in the first few batches.

 However, it just stalled after 7-8 batches. Below are some symptoms.

 * In the Spark UI, every tab worked fine except the Streaming tab. When I
 clicked on it, it just hung forever.
 * I did not see any GC activity on the driver.
 * Nothing was printed in the driver log.

 Has anyone seen this before?

 --
 Chen Song




Re: Strange JavaDeserialization error - java.lang.ClassNotFoundException: org/apache/spark/storage/StorageLevel

2015-03-27 Thread Tathagata Das
Does it fail with just Spark jobs (using storage levels) on non-coarse mode?

TD

On Fri, Mar 27, 2015 at 4:39 AM, Ondrej Smola ondrej.sm...@gmail.com
wrote:

 More info

 when using *spark.mesos.coarse* everything works as expected. I think
 this must be a bug in spark-mesos integration.


 2015-03-27 9:23 GMT+01:00 Ondrej Smola ondrej.sm...@gmail.com:

 It happens only when a StorageLevel with 1 replica is used
 (StorageLevel.MEMORY_ONLY_2, StorageLevel.MEMORY_AND_DISK_2);
 StorageLevel.MEMORY_ONLY and StorageLevel.MEMORY_AND_DISK work. The problem
 must clearly be somewhere between Mesos and Spark. From the console I see
 that Spark is trying to replicate to nodes (the nodes show up in Mesos
 active tasks), but they always fail with ClassNotFoundE.

 2015-03-27 0:52 GMT+01:00 Tathagata Das t...@databricks.com:

 Could you try running a simpler Spark Streaming program with a receiver
 (maybe socketStream) and see if that works?

 TD

 On Thu, Mar 26, 2015 at 2:08 PM, Ondrej Smola ondrej.sm...@gmail.com
 wrote:

 Hi, thanks for the reply.

 Yes, I have a custom receiver, but it has simple logic: pop ids from a
 Redis queue, load docs based on those ids from Elasticsearch, and store them
 in Spark. No classloader modifications. I am running multiple Spark batch
 jobs (with user-supplied partitioning) and they have no problems; debugging
 in local mode shows no errors.

 2015-03-26 21:47 GMT+01:00 Tathagata Das t...@databricks.com:

 Here are a few steps to debug.

 1. Try using replication from a Spark job: sc.parallelize(1 to 100,
 100).persist(StorageLevel.MEMORY_ONLY_2).count()
 2. If step 1 works, then we know that there is probably nothing wrong
 with the Spark installation, and the problem is probably in the threads
 related to the receivers receiving the data. Are you writing a custom
 receiver? Are you somehow playing around with the class loader in the
 custom receiver?

 TD


 On Thu, Mar 26, 2015 at 10:59 AM, Ondrej Smola ondrej.sm...@gmail.com
  wrote:

 Hi,

 I am running Spark Streaming v1.3.0 (running inside Docker) on Mesos
 0.21.1. Spark Streaming is started using Marathon: a Docker container gets
 deployed and starts streaming (from a custom Actor). The Spark binary is
 located on a shared GlusterFS volume. Data is streamed from
 Elasticsearch/Redis. When a new batch arrives, Spark tries to replicate it
 but fails with the following error:

 15/03/26 14:50:00 INFO MemoryStore: Block broadcast_0 of size 2840
 dropped from memory (free 278017782)
 15/03/26 14:50:00 INFO BlockManager: Removing block broadcast_0_piece0
 15/03/26 14:50:00 INFO MemoryStore: Block broadcast_0_piece0 of size
 1658 dropped from memory (free 278019440)
 15/03/26 14:50:00 INFO BlockManagerMaster: Updated info of block
 broadcast_0_piece0
 15/03/26 14:50:00 ERROR TransportRequestHandler: Error while invoking
 RpcHandler#receive() on RPC id 7178767328921933569
 java.lang.ClassNotFoundException:
 org/apache/spark/storage/StorageLevel
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:344)
 at
 org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:65)
 at
 java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
 at
 java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
 at
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68)
 at
 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:88)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:65)
 at
 org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
 at
 org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
 at
 org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
 at
 org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
 at
 io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
 at
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
 at
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
 at
 io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
 at
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
 at
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
 at
 io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
 at
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead

Re: Strange JavaDeserialization error - java.lang.ClassNotFoundException: org/apache/spark/storage/StorageLevel

2015-03-27 Thread Tathagata Das
Seems like a bug, could you file a JIRA?

@Tim: Patrick said you look at Mesos-related issues. Could you take
a look at this? Thanks!

TD

On Fri, Mar 27, 2015 at 1:25 PM, Ondrej Smola ondrej.sm...@gmail.com
wrote:

 Yes, only when using fine grained mode and replication 
 (StorageLevel.MEMORY_ONLY_2
 etc).

 2015-03-27 19:06 GMT+01:00 Tathagata Das t...@databricks.com:

 Does it fail with just Spark jobs (using storage levels) on non-coarse
 mode?

 TD

 On Fri, Mar 27, 2015 at 4:39 AM, Ondrej Smola ondrej.sm...@gmail.com
 wrote:

 More info

 when using *spark.mesos.coarse* everything works as expected. I think
 this must be a bug in spark-mesos integration.


 2015-03-27 9:23 GMT+01:00 Ondrej Smola ondrej.sm...@gmail.com:

 It happens only when StorageLevel is used with 1 replica (
 StorageLevel.MEMORY_ONLY_2,StorageLevel.MEMORY_AND_DISK_2) ,
 StorageLevel.MEMORY_ONLY ,StorageLevel.MEMORY_AND_DISK works - the
 problems must be clearly somewhere between mesos-spark . From console I see
 that spark is trying to replicate to nodes - nodes show up in Mesos active
 tasks ... but they always fail with ClassNotFoundE.

 2015-03-27 0:52 GMT+01:00 Tathagata Das t...@databricks.com:

 Could you try running a simpler spark streaming program with receiver
 (may be socketStream) and see if that works.

 TD

 On Thu, Mar 26, 2015 at 2:08 PM, Ondrej Smola ondrej.sm...@gmail.com
 wrote:

 Hi thanks for reply,

 yes I have custom receiver - but it has simple logic .. pop ids from
 redis queue - load docs based on ids from elastic and store them in 
 spark.
 No classloader modifications. I am running multiple Spark batch jobs 
 (with
 user supplied partitioning) and they have no problems, debug in local 
 mode
 show no errors.

 2015-03-26 21:47 GMT+01:00 Tathagata Das t...@databricks.com:

 Here are a few steps to debug.

 1. Try using replication from a Spark job: sc.parallelize(1 to 100,
 100).persist(StorageLevel.MEMORY_ONLY_2).count()
 2. If step 1 works, then we know that there is probably nothing wrong
 with the Spark installation, and the problem is probably in the threads
 related to the receivers receiving the data. Are you writing a custom
 receiver? Are you somehow playing around with the class loader in the
 custom receiver?

 TD


 On Thu, Mar 26, 2015 at 10:59 AM, Ondrej Smola 
 ondrej.sm...@gmail.com wrote:

 Hi,

 I am running spark streaming v 1.3.0 (running inside Docker) on
 Mesos 0.21.1. Spark streaming is started using Marathon - docker 
 container
 gets deployed and starts streaming (from custom Actor). Spark binary is
 located on shared GlusterFS volume. Data is streamed from
 Elasticsearch/Redis. When new batch arrives Spark tries to replicate 
 it but
 fails with following error :

 15/03/26 14:50:00 INFO MemoryStore: Block broadcast_0 of size 2840
 dropped from memory (free 278017782)
 15/03/26 14:50:00 INFO BlockManager: Removing block
 broadcast_0_piece0
 15/03/26 14:50:00 INFO MemoryStore: Block broadcast_0_piece0 of
 size 1658 dropped from memory (free 278019440)
 15/03/26 14:50:00 INFO BlockManagerMaster: Updated info of block
 broadcast_0_piece0
 15/03/26 14:50:00 ERROR TransportRequestHandler: Error while
 invoking RpcHandler#receive() on RPC id 7178767328921933569
 java.lang.ClassNotFoundException:
 org/apache/spark/storage/StorageLevel
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:344)
 at
 org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:65)
 at
 java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
 at
 java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
 at
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68)
 at
 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:88)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:65)
 at
 org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
 at
 org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
 at
 org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
 at
 org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
 at
 io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
 at
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
 at
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
 at
 io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103

Re: foreachRDD execution

2015-03-26 Thread Tathagata Das
Yes, that is the correct understanding. There are undocumented parameters
that allow that, but I do not recommend using those :)

TD

On Wed, Mar 25, 2015 at 6:57 AM, Luis Ángel Vicente Sánchez 
langel.gro...@gmail.com wrote:

 I have a simple and probably dumb question about foreachRDD.

 We are using spark streaming + cassandra to compute concurrent users every
 5min. Our batch size is 10secs and our block interval is 2.5secs.

 At the end of the world we are using foreachRDD to join the data in the
 RDD with existing data in Cassandra, update the counters and then save it
 back to Cassandra.

 To the best of my understanding, in this scenario, spark streaming
 produces one RDD every 10secs and foreachRDD executes them sequentially,
 that is, foreachRDD would never run in parallel.

 Am I right?

 Regards,

 Luis





Re: MappedStream vs Transform API

2015-03-17 Thread Tathagata Das
That's not super essential, and hence hasn't been done till now. Even in
core Spark there are MappedRDD, etc., even though all of them can be
implemented by MapPartitionsRDD (I may have the name wrong). So it's nice to
maintain the consistency: MappedDStream creates MappedRDDs. :)
Though this does not eliminate the possibility that we will do it. Maybe in
the future, if we find that maintaining these different DStreams is becoming a
maintenance burden (it isn't yet), we may collapse them to use transform.
We did so in the Python API for exactly this reason.
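
To make the equivalence being discussed concrete, here is a small toy model in
plain Scala. It does not use Spark at all: ToyStream is a hypothetical stand-in
for a DStream, and an ordinary Seq stands in for each batch's RDD. It only
illustrates that a dedicated map and a map built on a generic transform hook
produce the same result.

```scala
// Toy model only: ToyStream is a hypothetical stand-in for DStream,
// and a plain Seq stands in for the RDD of each batch.
final case class ToyStream[T](batches: Seq[Seq[T]]) {
  // Generic batch-to-batch hook, analogous in spirit to DStream.transform.
  def transform[U](f: Seq[T] => Seq[U]): ToyStream[U] =
    ToyStream(batches.map(f))

  // Dedicated implementation, playing the role of MappedDStream.
  def mapDirect[U](f: T => U): ToyStream[U] =
    ToyStream(batches.map(batch => batch.map(f)))

  // The same operation expressed through the generic transform hook.
  def mapViaTransform[U](f: T => U): ToyStream[U] =
    transform(batch => batch.map(f))
}

object ToyStreamDemo {
  def main(args: Array[String]): Unit = {
    val stream = ToyStream(Seq(Seq(1, 2, 3), Seq(4, 5)))
    val direct = stream.mapDirect(_ * 10)
    val viaTransform = stream.mapViaTransform(_ * 10)
    println(direct == viaTransform) // prints true
  }
}
```

In this model the dedicated method adds nothing functionally; any remaining
difference is about naming and readability, which matches the point made above.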

If you are interested in contributing to Spark Streaming, I can point you
to a number of issues where your contributions will be more valuable.

TD

On Tue, Mar 17, 2015 at 1:56 AM, madhu phatak phatak@gmail.com wrote:

 Hi,
  Thank you for the  response.

  Can I give a PR to use transform for all the functions like map,flatMap
 etc so they are consistent with other API's?.

 Regards,
 Madhukara Phatak
 http://datamantra.io/

 On Mon, Mar 16, 2015 at 11:42 PM, Tathagata Das t...@databricks.com
 wrote:

 It's mostly for legacy reasons. First we had added all the MappedDStream,
 etc. and then later we realized we need to expose something that is more
 generic for arbitrary RDD-RDD transformations. It can be easily replaced.
 However, there is a slight value in having MappedDStream, for developers to
 learn about DStreams.

 TD

 On Mon, Mar 16, 2015 at 3:37 AM, madhu phatak phatak@gmail.com
 wrote:

 Hi,
  Thanks for the response. I understand that part. But I am asking why
 the internal implementation using a subclass when it can use an existing
 api? Unless there is a real difference, it feels like code smell to me.


 Regards,
 Madhukara Phatak
 http://datamantra.io/

 On Mon, Mar 16, 2015 at 2:14 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  I think these two ways are both OK for you to write streaming job,
 `transform` is a more general way for you to transform from one DStream to
 another if there’s no related DStream API (but have related RDD API). But
 using map maybe more straightforward and easy to understand.



 Thanks

 Jerry



 *From:* madhu phatak [mailto:phatak@gmail.com]
 *Sent:* Monday, March 16, 2015 4:32 PM
 *To:* user@spark.apache.org
 *Subject:* MappedStream vs Transform API



 Hi,

   Current implementation of map function in spark streaming looks as
 below.



   def map[U: ClassTag](mapFunc: T => U): DStream[U] = {
     new MappedDStream(this, context.sparkContext.clean(mapFunc))
   }

  It creates an instance of MappedDStream which is a subclass of
 DStream.



 The same function can be also implemented using transform API



 def map[U: ClassTag](mapFunc: T => U): DStream[U] =
   this.transform(rdd => {
     rdd.map(mapFunc)
   })



 Both implementation looks same. If they are same, is there any
 advantage having a subclass of DStream?. Why can't we just use transform
 API?





 Regards,
 Madhukara Phatak
 http://datamantra.io/







Re: why generateJob is a private API?

2015-03-16 Thread Tathagata Das
It was not really meant to be public and overridden, because anything you
want to do to generate jobs from RDDs can be done using DStream.foreachRDD.

On Sun, Mar 15, 2015 at 11:14 PM, madhu phatak phatak@gmail.com wrote:

 Hi,
  I am trying to create a simple subclass of DStream.  If I understand
 correctly, I should override *compute* for lazy operations and *generateJob*
 for actions. But when I try to override generateJob, it gives an error saying
 the method is private to the streaming package. Is my approach correct, or am
 I missing something?


 Regards,
 Madhukara Phatak
 http://datamantra.io/



Re: MappedStream vs Transform API

2015-03-16 Thread Tathagata Das
It's mostly for legacy reasons. First we had added all the MappedDStream,
etc. and then later we realized we need to expose something that is more
generic for arbitrary RDD-RDD transformations. It can be easily replaced.
However, there is a slight value in having MappedDStream, for developers to
learn about DStreams.

TD

On Mon, Mar 16, 2015 at 3:37 AM, madhu phatak phatak@gmail.com wrote:

 Hi,
  Thanks for the response. I understand that part. But I am asking why the
 internal implementation using a subclass when it can use an existing api?
 Unless there is a real difference, it feels like code smell to me.


 Regards,
 Madhukara Phatak
 http://datamantra.io/

 On Mon, Mar 16, 2015 at 2:14 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  I think these two ways are both OK for you to write streaming job,
 `transform` is a more general way for you to transform from one DStream to
 another if there’s no related DStream API (but have related RDD API). But
 using map maybe more straightforward and easy to understand.



 Thanks

 Jerry



 *From:* madhu phatak [mailto:phatak@gmail.com]
 *Sent:* Monday, March 16, 2015 4:32 PM
 *To:* user@spark.apache.org
 *Subject:* MappedStream vs Transform API



 Hi,

   Current implementation of map function in spark streaming looks as
 below.



   def map[U: ClassTag](mapFunc: T => U): DStream[U] = {
     new MappedDStream(this, context.sparkContext.clean(mapFunc))
   }

  It creates an instance of MappedDStream which is a subclass of DStream.



 The same function can be also implemented using transform API



 def map[U: ClassTag](mapFunc: T => U): DStream[U] =
   this.transform(rdd => {
     rdd.map(mapFunc)
   })



 Both implementation looks same. If they are same, is there any advantage
 having a subclass of DStream?. Why can't we just use transform API?





 Regards,
 Madhukara Phatak
 http://datamantra.io/





Re: problems with spark-streaming-kinesis-asl and sbt assembly (different file contents found)

2015-03-16 Thread Tathagata Das
If you are creating an assembly, make sure spark-streaming is marked as
provided. spark-streaming is already part of the Spark installation, so it
will be present at run time. That might solve some of these, maybe!?
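
As a rough sketch of that suggestion, a build.sbt fragment might look like the
following. The module names and the 1.3.0 version are the ones mentioned in
this thread; which modules the cluster installation actually ships is an
assumption to check against your own deployment.

```scala
// build.sbt (fragment): Spark modules assumed to be shipped by the cluster
// are marked "provided", so sbt-assembly leaves them out of the fat jar.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "1.3.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided",
  // The Kinesis connector is not part of the Spark installation, so it stays
  // a regular compile-scope dependency and is bundled into the assembly.
  "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"
)
```

Note that a provided scope on spark-streaming does not by itself remove copies
pulled in transitively by other compile-scope dependencies.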

TD

On Mon, Mar 16, 2015 at 11:30 AM, Kelly, Jonathan jonat...@amazon.com
wrote:

  I'm attempting to use the Spark Kinesis Connector, so I've added the
 following dependency in my build.sbt:

  libraryDependencies += "org.apache.spark" %%
 "spark-streaming-kinesis-asl" % "1.3.0"

  My app works fine with sbt run, but I can't seem to get sbt assembly
 to work without failing with different file contents found errors due
 to different versions of various packages getting pulled in to the
 assembly.  This only occurs when I've added spark-streaming-kinesis-asl as
 a dependency. sbt assembly works fine otherwise.

  Here are the conflicts that I see:

  com.esotericsoftware.kryo:kryo:2.21
  com.esotericsoftware.minlog:minlog:1.2

  com.google.guava:guava:15.0
  org.apache.spark:spark-network-common_2.10:1.3.0

  (Note: The conflict is with javac.sh; why is this even getting included?)
 org.apache.spark:spark-streaming-kinesis-asl_2.10:1.3.0
 org.apache.spark:spark-streaming_2.10:1.3.0
 org.apache.spark:spark-core_2.10:1.3.0
 org.apache.spark:spark-network-common_2.10:1.3.0
 org.apache.spark:spark-network-shuffle_2.10:1.3.0

  (Note: I'm actually using my own custom-built version of Spark-1.3.0
 where I've upgraded to v1.9.24 of the AWS Java SDK, but that has nothing to
 do with all of these conflicts, as I upgraded the dependency *because* I
 was getting all of these conflicts with the Spark 1.3.0 artifacts from the
 central repo.)
  com.amazonaws:aws-java-sdk-s3:1.9.24
  net.java.dev.jets3t:jets3t:0.9.3

  commons-collections:commons-collections:3.2.1
  commons-beanutils:commons-beanutils:1.7.0
  commons-beanutils:commons-beanutils-core:1.8.0

  commons-logging:commons-logging:1.1.3
 org.slf4j:jcl-over-slf4j:1.7.10

  (Note: The conflict is with a few package-info.class files, which seems
 really silly.)
 org.apache.hadoop:hadoop-yarn-common:2.4.0
 org.apache.hadoop:hadoop-yarn-api:2.4.0

  (Note: The conflict is with org/apache/spark/unused/UnusedStubClass.class,
 which seems even more silly.)
  org.apache.spark:spark-streaming-kinesis-asl_2.10:1.3.0
  org.apache.spark:spark-streaming_2.10:1.3.0
  org.apache.spark:spark-core_2.10:1.3.0
  org.apache.spark:spark-network-common_2.10:1.3.0
  org.spark-project.spark:unused:1.0.0 (?!?!?!)
  org.apache.spark:spark-network-shuffle_2.10:1.3.0

  I can get rid of some of the conflicts by using excludeAll() to exclude
 artifacts with organization = org.apache.hadoop or organization =
 org.apache.spark and name = spark-streaming, and I might be able to
 resolve a few other conflicts this way, but the bottom line is that this is
 way more complicated than it should be, so either something is really
 broken or I'm just doing something wrong.

  Many of these don't even make sense to me.  For example, the very first
 conflict is between classes in com.esotericsoftware.kryo:kryo:2.21 and in
 com.esotericsoftware.minlog:minlog:1.2, but the former *depends* upon the
 latter, so ???  It seems wrong to me that one package would contain
 different versions of the same classes that are included in one of its
 dependencies.  I guess it doesn't make too much difference though if I
 could only get my assembly to include/exclude the right packages.  I of
 course don't want any of the spark or hadoop dependencies included (other
 than spark-streaming-kinesis-asl itself), but I want all of
 spark-streaming-kinesis-asl's dependencies included (such as the AWS Java
 SDK and its dependencies).  That doesn't seem to be possible without what I
 imagine will become an unruly and fragile exclusion list though.


  Thanks,

 Jonathan



Re: problems with spark-streaming-kinesis-asl and sbt assembly (different file contents found)

2015-03-16 Thread Tathagata Das
Can you give us your SBT project? Minus the source code if you don't wish
to expose it.

TD

On Mon, Mar 16, 2015 at 12:54 PM, Kelly, Jonathan jonat...@amazon.com
wrote:

   Yes, I do have the following dependencies marked as provided:

  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" %
 "provided"
 libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.3.0" %
 "provided"
 libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.3.0" %
 "provided"
 libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" %
 "provided"

  However, spark-streaming-kinesis-asl has a compile time dependency on
 spark-streaming, so I think that causes it and its dependencies to be
 pulled into the assembly.  I expected that simply excluding spark-streaming
 in the spark-streaming-kinesis-asl dependency would solve this problem, but
 it does not.  That is, this doesn't work either:

  libraryDependencies += "org.apache.spark" %%
 "spark-streaming-kinesis-asl" % "1.3.0" exclude("org.apache.spark",
 "spark-streaming")

  As I mentioned originally, the following solved some but not all
 conflicts:

  libraryDependencies += "org.apache.spark" %%
 "spark-streaming-kinesis-asl" % "1.3.0" excludeAll(
   ExclusionRule(organization = "org.apache.hadoop"),
   ExclusionRule(organization = "org.apache.spark", name =
 "spark-streaming")
 )

  (Note that ExclusionRule(organization = org.apache.spark) without the
 name attribute does not work because that apparently causes it to exclude
 even spark-streaming-kinesis-asl.)


  Jonathan Kelly

 Elastic MapReduce - SDE

 Port 99 (SEA35) 08.220.C2

   From: Tathagata Das t...@databricks.com
 Date: Monday, March 16, 2015 at 12:45 PM
 To: Jonathan Kelly jonat...@amazon.com
 Cc: user@spark.apache.org user@spark.apache.org
 Subject: Re: problems with spark-streaming-kinesis-asl and sbt assembly
 (different file contents found)

   If you are creating an assembly, make sure spark-streaming is marked as
 provided. spark-streaming is already part of the spark installation so will
 be present at run time. That might solve some of these, may be!?

  TD

 On Mon, Mar 16, 2015 at 11:30 AM, Kelly, Jonathan jonat...@amazon.com
 wrote:

  I'm attempting to use the Spark Kinesis Connector, so I've added the
 following dependency in my build.sbt:

  libraryDependencies += "org.apache.spark" %%
 "spark-streaming-kinesis-asl" % "1.3.0"

  My app works fine with sbt run, but I can't seem to get sbt
 assembly to work without failing with different file contents found
 errors due to different versions of various packages getting pulled in to
 the assembly.  This only occurs when I've added spark-streaming-kinesis-asl
 as a dependency. sbt assembly works fine otherwise.

  Here are the conflicts that I see:

  com.esotericsoftware.kryo:kryo:2.21
 com.esotericsoftware.minlog:minlog:1.2

  com.google.guava:guava:15.0
 org.apache.spark:spark-network-common_2.10:1.3.0

  (Note: The conflict is with javac.sh; why is this even getting
 included?)
 org.apache.spark:spark-streaming-kinesis-asl_2.10:1.3.0
 org.apache.spark:spark-streaming_2.10:1.3.0
 org.apache.spark:spark-core_2.10:1.3.0
 org.apache.spark:spark-network-common_2.10:1.3.0
 org.apache.spark:spark-network-shuffle_2.10:1.3.0

  (Note: I'm actually using my own custom-built version of Spark-1.3.0
 where I've upgraded to v1.9.24 of the AWS Java SDK, but that has nothing to
 do with all of these conflicts, as I upgraded the dependency *because* I
 was getting all of these conflicts with the Spark 1.3.0 artifacts from the
 central repo.)
 com.amazonaws:aws-java-sdk-s3:1.9.24
 net.java.dev.jets3t:jets3t:0.9.3

  commons-collections:commons-collections:3.2.1
 commons-beanutils:commons-beanutils:1.7.0
 commons-beanutils:commons-beanutils-core:1.8.0

  commons-logging:commons-logging:1.1.3
 org.slf4j:jcl-over-slf4j:1.7.10

  (Note: The conflict is with a few package-info.class files, which seems
 really silly.)
 org.apache.hadoop:hadoop-yarn-common:2.4.0
 org.apache.hadoop:hadoop-yarn-api:2.4.0

  (Note: The conflict is with org/apache/spark/unused/UnusedStubClass.class,
 which seems even more silly.)
 org.apache.spark:spark-streaming-kinesis-asl_2.10:1.3.0
 org.apache.spark:spark-streaming_2.10:1.3.0
 org.apache.spark:spark-core_2.10:1.3.0
 org.apache.spark:spark-network-common_2.10:1.3.0
 org.spark-project.spark:unused:1.0.0 (?!?!?!)
 org.apache.spark:spark-network-shuffle_2.10:1.3.0

  I can get rid of some of the conflicts by using excludeAll() to exclude
 artifacts with organization = org.apache.hadoop or organization =
 org.apache.spark and name = spark-streaming, and I might be able to
 resolve a few other conflicts this way, but the bottom line is that this is
 way more complicated than it should be, so either something is really
 broken or I'm just doing something wrong.

  Many of these don't even make sense to me.  For example, the very first
 conflict is between classes in com.esotericsoftware.kryo:kryo:2.21 and in
 com.esotericsoftware.minlog:minlog:1.2

RE: Handling worker batch processing during driver shutdown

2015-03-13 Thread Tathagata Das
Are you running the code before or after closing the SparkContext? It must
run after stopping the StreamingContext (without stopping the SparkContext)
and before stopping the SparkContext.

Cc'ing Sean. He may have more insights.
On Mar 13, 2015 11:19 AM, Jose Fernandez jfernan...@sdl.com wrote:

  Thanks, I am using 1.2.0 so it looks like I am affected by the bug you
 described.



 It also appears that the shutdown hook doesn’t work correctly when the
 driver is running in YARN. According to the logs it looks like the
 SparkContext is closed and the code you suggested is never executed and
 fails silently.



 I really appreciate your help, but it looks like I’m back to the drawing
 board on this one.



 *From:* Tathagata Das [mailto:t...@databricks.com]
 *Sent:* Thursday, March 12, 2015 7:53 PM
 *To:* Jose Fernandez
 *Cc:* user@spark.apache.org
 *Subject:* Re: Handling worker batch processing during driver shutdown







 What version of Spark are you using? You may be hitting a known but
 already fixed bug where the receivers would not get the stop signal, so
 (stopGracefully = true) would wait indefinitely for the receivers to stop.
 Try setting stopGracefully to false and see if it works.

 This bug should have been solved in spark 1.2.1



 https://issues.apache.org/jira/browse/SPARK-5035



 TD



 On Thu, Mar 12, 2015 at 7:48 PM, Jose Fernandez jfernan...@sdl.com
 wrote:

 Thanks for the reply!



 Theoretically I should be able to do as you suggest as I follow the pool
 design pattern from the documentation, but I don’t seem to be able to run
 any code after .stop() is called.



   override def main(args: Array[String]) {
     // setup
     val ssc = new StreamingContext(sparkConf, Seconds(streamTime))
     val inputStreams = (1 to numReceivers).map(i =>
       ssc.receiverStream(custom receiver))
     val messages = ssc.union(inputStreams)

     messages.foreachRDD { rdd =>
       rdd.foreachPartition { p =>
         val indexer = Indexer.getInstance()

         p.foreach(Indexer.process(_) match {
           case Some(entry) => indexer.index(entry)
           case None =>
         })

         Indexer.returnInstance(indexer)
       }
     }

     messages.print()

     sys.ShutdownHookThread {
       logInfo("** Shutdown hook triggered **")
       ssc.stop(false, true)
       logInfo("** Shutdown finished **")
       ssc.stop(true)
     }

     ssc.start()
     ssc.awaitTermination()
   }



 The first shutdown log message is always displayed, but the second message
 never does. I’ve tried multiple permutations of the stop function calls and
 even used try/catch around it. I’m running in yarn-cluster mode using Spark
 1.2 on CDH 5.3. I stop the application with yarn application -kill appID.





 *From:* Tathagata Das [mailto:t...@databricks.com]
 *Sent:* Thursday, March 12, 2015 1:29 PM
 *To:* Jose Fernandez
 *Cc:* user@spark.apache.org
 *Subject:* Re: Handling worker batch processing during driver shutdown



 Can you access the batcher directly? That is, is there a handle to
 get access to the batchers on the executors by running a task on that
 executor? If so, after the StreamingContext has been stopped (not the
 SparkContext), you can use `sc.makeRDD()` to run a dummy task like
 this.



 sc.makeRDD(1 to 1000, 1000).foreach { x =>
   Batcher.get().flush()
 }



 With large number of tasks and no other jobs running in the system, at
 least one task will run in each executor and therefore will flush the
 batcher.



 TD



 On Thu, Mar 12, 2015 at 12:27 PM, Jose Fernandez jfernan...@sdl.com
 wrote:

 Hi folks,



 I have a shutdown hook in my driver which stops the streaming context
 cleanly. This is great as workers can finish their current processing unit
 before shutting down. Unfortunately each worker contains a batch processor
 which only flushes every X entries. We’re indexing to different indices in
 elasticsearch and using the bulk index request for performance. As far as
 Spark is concerned, once data is added to the batcher it is considered
 processed, so our workers are being shut down with data still in the
 batcher.



 Is there any way to coordinate the shutdown with the workers? I haven’t
 had any luck searching for a solution online. I would appreciate any
 suggestions you may have.



 Thanks :)




Re: Unable to connect

2015-03-13 Thread Tathagata Das
Are you running the driver program (that is, your application process) on
your desktop and trying to run it on the cluster in EC2?
It could very well be a hostname mismatch of some kind, due to all the
public hostname, private hostname, private IP, and firewall craziness of EC2.
You probably have to try different combinations of hostname and IP. First
of all, you could try submitting applications from within one of the nodes
inside EC2 to narrow down the problem. Additionally, see the Configuration
page in the Spark Standalone documentation to configure which hostname is
used (for binding) by the Spark Standalone daemons.

TD

On Fri, Mar 13, 2015 at 2:41 PM, Mohit Anchlia mohitanch...@gmail.com
wrote:

 I am running spark streaming standalone in ec2 and I am trying to run
 wordcount example from my desktop. The program is unable to connect to the
 master, in the logs I see, which seems to be an issue with hostname.

 15/03/13 17:37:44 ERROR EndpointWriter: dropping message [class
 akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://
 sparkMaster@54.69.22.4:7077/]] arriving at [akka.tcp://
 sparkMaster@54.69.22.4:7077] inbound addresses are
 [akka.tcp://sparkMaster@ip-10-241-251-232:7077]




Re: Using rdd methods with Dstream

2015-03-13 Thread Tathagata Das
Is the number of top K elements you want to keep small? That is, is K
small? In which case, you can
1. Either do it in the driver on the array:
DStream.foreachRDD ( rdd => {
   val topK = rdd.top(K) ;
   // use top K
})

2. Or, you can use the topK to create another RDD using sc.makeRDD:

DStream.transform ( rdd => {
   val topK = rdd.top(K) ;
   rdd.context.makeRDD(topK, numPartitions)
})
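
For intuition on why a top-K over a small K is cheap compared with
repartitioning to one partition and sorting, here is a toy model in plain
Scala, with no Spark involved. Each inner Seq stands in for one partition, and
TopKSketch and its methods are hypothetical names. It sketches the general
idea of taking K candidates per partition and merging them, not Spark's actual
implementation.

```scala
// Toy model: each inner Seq stands in for one partition of an RDD.
object TopKSketch {
  // Keep only the k largest elements of one partition.
  def topOfPartition(partition: Seq[Long], k: Int): Seq[Long] =
    partition.sorted(Ordering[Long].reverse).take(k)

  // Merge the per-partition candidates, keeping the k largest overall.
  // At most 2 * k elements are compared at each merge step.
  def topK(partitions: Seq[Seq[Long]], k: Int): Seq[Long] =
    partitions
      .map(p => topOfPartition(p, k))
      .foldLeft(Seq.empty[Long])((acc, next) => topOfPartition(acc ++ next, k))

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq(5L, 1L, 9L), Seq(7L, 3L), Seq(8L, 2L, 6L))
    println(topK(partitions, 3).mkString(", ")) // prints 9, 8, 7
  }
}
```

Only K elements per partition ever need to be brought together, which is why
the result comfortably fits in the driver when K is small.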

TD



On Fri, Mar 13, 2015 at 5:58 AM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid
wrote:

 Hi,

 Earlier my code was like the following, but slow due to the repartition. I
 want the top K of each window in a stream.

 val counts = keyAndValues.map(x =>
 math.round(x._3.toDouble)).countByValueAndWindow(Seconds(4), Seconds(4))
 val topCounts = counts.repartition(1).map(_.swap).transform(rdd =>
 rdd.sortByKey(false)).map(_.swap).mapPartitions(rdd => rdd.take(10))

 so I thought to use dstream.transform(rdd => rdd.top()), but this returns an
 Array rather than an RDD. I have to perform further steps on the topCounts
 dstream.

 [ERROR]  found   : Array[(Long, Long)]
 [ERROR]  required: org.apache.spark.rdd.RDD[?]
 [ERROR] val topCounts = counts.transform(rdd => rdd.top(10))


 Regards,
 Laeeq


   On Friday, March 13, 2015 1:47 PM, Sean Owen so...@cloudera.com wrote:


 Hm, aren't you able to use the SparkContext here? DStream operations
 happen on the driver. So you can parallelize() the result?

 take() won't work as it's not the same as top()

 On Fri, Mar 13, 2015 at 11:23 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:
  Like this?
 
  dstream.repartition(1).mapPartitions(it => it.take(5))
 
 
 
  Thanks
  Best Regards
 
  On Fri, Mar 13, 2015 at 4:11 PM, Laeeq Ahmed 
 laeeqsp...@yahoo.com.invalid
  wrote:
 
  Hi,
 
  I normally use dstream.transform whenever I need to use methods which are
  available in the RDD API but not in the streaming API, e.g.
  dstream.transform(x => x.sortByKey(true))
 
  But there are other RDD methods which return types other than RDD, e.g.
  dstream.transform(x => x.top(5)); top here returns an Array.
 
  In the second scenario, how can I return an RDD rather than an array, so
  that I can perform further steps on the dstream?
  Regards,
  Laeeq

 
 








Re: Partitioning

2015-03-13 Thread Tathagata Das
If you want to access the keys in an RDD that is partitioned by key, then you
can use RDD.mapPartitions(), which gives you access to the whole partition
as an Iterator[(key, value)]. You have the option of maintaining the
partitioning information or not by setting the preservesPartitioning flag in
mapPartitions (see docs). But use it at your own risk. If you modify the
keys and yet preserve partitioning, the partitioning will not make sense
any more, as the hashes of the keys have changed.
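
To make that warning concrete, here is a minimal plain-Java sketch with no Spark dependency. It assumes hash partitioning of the usual form, where a key's partition is its hash code modulo the number of partitions, shifted to be non-negative. The class and method names are made up for illustration and are not Spark APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; not part of Spark.
final class HashPartitionSketch {

    // Partition index for a key: hashCode modulo numPartitions, made non-negative.
    static int partitionOf(Object key, int numPartitions) {
        int mod = key.hashCode() % numPartitions;
        return mod < 0 ? mod + numPartitions : mod;
    }

    // Distribute keys into numPartitions buckets by hash.
    static List<List<Integer>> partition(List<Integer> keys, int numPartitions) {
        List<List<Integer>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            parts.add(new ArrayList<>());
        }
        for (Integer k : keys) {
            parts.get(partitionOf(k, numPartitions)).add(k);
        }
        return parts;
    }

    // True if every key sits in the partition that its hash points to.
    static boolean isConsistent(List<List<Integer>> parts) {
        for (int p = 0; p < parts.size(); p++) {
            for (Integer k : parts.get(p)) {
                if (partitionOf(k, parts.size()) != p) {
                    return false;
                }
            }
        }
        return true;
    }

    // Multiply every key by 10 but leave each key in its old partition,
    // which mimics modifying keys while claiming the partitioning is preserved.
    static List<List<Integer>> rewriteKeysKeepingLayout(List<List<Integer>> parts) {
        List<List<Integer>> out = new ArrayList<>();
        for (List<Integer> part : parts) {
            List<Integer> changed = new ArrayList<>();
            for (Integer k : part) {
                changed.add(k * 10);
            }
            out.add(changed);
        }
        return out;
    }
}
```

After rewriteKeysKeepingLayout, a lookup by key would go to the partition the new hash points to and miss the record, which is the risk described above.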

TD



On Fri, Mar 13, 2015 at 2:26 PM, Mohit Anchlia mohitanch...@gmail.com
wrote:

 I am trying to find documentation on partitioning, which I can't seem to
 find. I am looking at Spark Streaming and was wondering how it partitions
 RDDs in a multi-node environment. Where are the keys defined that are used
 for partitioning? For instance, in the example below the keys seem to be
 implicit:

 Which one is the key and which one is the value? Or is it called a flatMap
 because there are no keys?

 // Split each line into words
 JavaDStream<String> words = lines.flatMap(
   new FlatMapFunction<String, String>() {
     @Override public Iterable<String> call(String x) {
       return Arrays.asList(x.split(" "));
     }
   });


 And are keys available inside of Function2 in case they are required for a
 given use case?


 JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
   new Function2<Integer, Integer, Integer>() {
     @Override public Integer call(Integer i1, Integer i2) throws Exception {
       return i1 + i2;
     }
   });







Re: Partitioning

2015-03-13 Thread Tathagata Das
If you want to learn how Spark partitions data based on keys,
here is a recent talk about that:
http://www.slideshare.net/databricks/strata-sj-everyday-im-shuffling-tips-for-writing-better-spark-programs?related=1

Of course, you can also read the original Spark paper:
https://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf

On Fri, Mar 13, 2015 at 3:52 PM, Gerard Maas gerard.m...@gmail.com wrote:

 In spark-streaming, the consumers will fetch data and put it into
 'blocks'. Each block becomes a partition of the rdd generated during that
 batch interval.
 The size of each block is controlled by the conf
 'spark.streaming.blockInterval': a block holds the amount of data the
 consumer can collect in that time.

 The number of RDD partitions in a streaming interval will then be: batch
 interval / spark.streaming.blockInterval * # of consumers.
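
As a worked instance of that formula, here is a small plain-Java sketch. The helper name is hypothetical and the numbers are examples only: a 2 second batch interval, a 200 ms block interval, and 3 consumers.

```java
// Hypothetical helper for the arithmetic above; not a Spark API.
final class StreamingPartitionEstimate {

    // Each consumer produces one block per block interval, and each block
    // becomes one partition of the RDD generated for the batch.
    static long partitionsPerBatch(long batchIntervalMs, long blockIntervalMs, int consumers) {
        if (blockIntervalMs <= 0) {
            throw new IllegalArgumentException("blockIntervalMs must be positive");
        }
        return (batchIntervalMs / blockIntervalMs) * consumers;
    }
}
```

With those example values, partitionsPerBatch(2000, 200, 3) gives 10 blocks per consumer times 3 consumers, so 30 partitions per batch. Raising the block interval lowers the partition count for the same batch interval.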

 -kr, Gerard
 On Mar 13, 2015 11:18 PM, Mohit Anchlia mohitanch...@gmail.com wrote:

 I still don't follow how Spark partitions data in a multi-node
 environment. Is there a document on how Spark partitions data? For
 example, in word count, how does Spark distribute words to multiple nodes?

 On Fri, Mar 13, 2015 at 3:01 PM, Tathagata Das t...@databricks.com
 wrote:

 If you want to access the keys in an RDD that is partitioned by key, then
 you can use RDD.mapPartitions(), which gives you access to the whole
 partition as an Iterator[(key, value)]. You have the option of maintaining the
 partitioning information or not by setting the preservesPartitioning flag in
 mapPartitions (see docs). But use it at your own risk. If you modify the
 keys and yet preserve partitioning, the partitioning will not make sense
 any more, as the hashes of the keys have changed.

 TD



 On Fri, Mar 13, 2015 at 2:26 PM, Mohit Anchlia mohitanch...@gmail.com
 wrote:

 I am trying to find documentation on partitioning, which I can't seem to
 find. I am looking at Spark Streaming and was wondering how it partitions
 RDDs in a multi-node environment. Where are the keys defined that are used
 for partitioning? For instance, in the example below the keys seem to be
 implicit:

 Which one is the key and which one is the value? Or is it called a flatMap
 because there are no keys?

 // Split each line into words
 JavaDStream<String> words = lines.flatMap(
   new FlatMapFunction<String, String>() {
     @Override public Iterable<String> call(String x) {
       return Arrays.asList(x.split(" "));
     }
   });


 And are keys available inside of Function2 in case they are required for a
 given use case?


 JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
   new Function2<Integer, Integer, Integer>() {
     @Override public Integer call(Integer i1, Integer i2) throws Exception {
       return i1 + i2;
     }
   });









Re: Using Neo4j with Apache Spark

2015-03-12 Thread Tathagata Das
What is the GraphDatabaseService object that you are using? Instead of creating
it on the driver (outside foreachRDD), can you create it inside
RDD.foreach?

In general, the right pattern for doing this is in the programming guide:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

So you should be doing (sorry for writing in Scala)

dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { iterator =>
    // Create a GraphDatabaseService object, or fetch one from a pool
    // of GraphDatabaseService objects
    // Use it to send the whole partition to Neo4j
    // Destroy the object or release it to the pool
  }
}


On Thu, Mar 12, 2015 at 1:15 AM, Gautam Bajaj gautam1...@gmail.com wrote:

 Neo4j is running externally. It has nothing to do with Spark processes.

 Basically, the problem is that I'm unable to figure out a way to store the
 output of Spark in the database, as Spark Streaming requires the Neo4j Core
 Java API to be serializable as well.

 The answer points to using the REST API, but its performance is really
 poor compared to the Core Java API:
 http://www.rene-pickhardt.de/get-the-full-neo4j-power-by-using-the-core-java-api-for-traversing-your-graph-data-base-instead-of-cypher-query-language/

 On Thu, Mar 12, 2015 at 5:09 PM, Tathagata Das t...@databricks.com
 wrote:

 Well the answers you got there are correct as well.
 Unfortunately I am not familiar with Neo4j enough to comment any more. Is
 the Neo4j graph database running externally (outside Spark cluster), or
 within the driver process, or on all the executors? Can you clarify that?

 TD


 On Thu, Mar 12, 2015 at 12:58 AM, Gautam Bajaj gautam1...@gmail.com
 wrote:

 Alright, I have also asked this question in StackOverflow:
 http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark

 The code there is pretty neat.

 On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das t...@databricks.com
 wrote:

 I am not sure if you realized, but the code snippet is pretty mangled
 in the email we received. It might be a good idea to put the code in
 pastebin or a gist; that is much easier for everyone to read.


 On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r gautam1...@gmail.com
 wrote:

 I'm trying to use Neo4j with Apache Spark Streaming but I am finding
 serializability as an issue.

 Basically, I want Apache Spark to parse and bundle my data in real
 time.
 After, the data has been bundled it should be stored in the database,
 Neo4j.
 However, I am getting this error:

 org.apache.spark.SparkException: Task not serializable
 at

 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at
 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
 at

 org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
 at
 org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
 at twoGrams.Main$4.call(Main.java:102)
 at twoGrams.Main$4.call(Main.java:1)
 at

 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
 at

 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at scala.util.Try$.apply(Try.scala:161)
 at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
 at

 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.io.NotSerializableException:
 org.neo4j.kernel.EmbeddedGraphDatabase
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
 at

 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at

 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at

 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at

 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at

 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547

Re: Using Neo4j with Apache Spark

2015-03-12 Thread Tathagata Das
I am not sure if you realized, but the code snippet is pretty mangled in
the email we received. It might be a good idea to put the code in pastebin
or a gist; that is much easier for everyone to read.


On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r gautam1...@gmail.com wrote:

 I'm trying to use Neo4j with Apache Spark Streaming but I am finding
 serializability as an issue.

 Basically, I want Apache Spark to parse and bundle my data in real time.
 After, the data has been bundled it should be stored in the database,
 Neo4j.
 However, I am getting this error:

 org.apache.spark.SparkException: Task not serializable
 at

 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at
 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
 at
 org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
 at org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
 at twoGrams.Main$4.call(Main.java:102)
 at twoGrams.Main$4.call(Main.java:1)
 at

 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
 at

 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at scala.util.Try$.apply(Try.scala:161)
 at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
 at

 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.io.NotSerializableException:
 org.neo4j.kernel.EmbeddedGraphDatabase
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
 at

 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
 at

 org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
 at

 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
 ... 17 more
 Here is my code:

 output is a stream of type: JavaPairDStream<String, ArrayList<String>>

 output.foreachRDD(
     new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {

         @Override
         public Void call(
                 JavaPairRDD<String, ArrayList<String>> arg0,
                 Time arg1) throws Exception {
             // TODO Auto-generated method stub

             arg0.foreach(
                 new VoidFunction<Tuple2<String, ArrayList<String>>>() {

                     @Override
                     public void call(
                             Tuple2<String, ArrayList<String>> arg0)
                             throws Exception {
                         // TODO Auto-generated method stub
                         try (Transaction tx = graphDB.beginTx()) {

                             if (Neo4jOperations.getHMacFromValue(graphDB, arg0._1) != null)
                                 System.out.println("Already in Database: " + arg0._1);
                             else {
                                 Neo4jOperations.createHMac(graphDB, arg0._1);
                             }
 

Re: Using Neo4j with Apache Spark

2015-03-12 Thread Tathagata Das
Well the answers you got there are correct as well.
Unfortunately I am not familiar with Neo4j enough to comment any more. Is
the Neo4j graph database running externally (outside Spark cluster), or
within the driver process, or on all the executors? Can you clarify that?

TD


On Thu, Mar 12, 2015 at 12:58 AM, Gautam Bajaj gautam1...@gmail.com wrote:

 Alright, I have also asked this question in StackOverflow:
 http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark

 The code there is pretty neat.

 On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das t...@databricks.com
 wrote:

 I am not sure if you realized, but the code snippet is pretty mangled
 in the email we received. It might be a good idea to put the code in
 pastebin or a gist; that is much easier for everyone to read.


 On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r gautam1...@gmail.com
 wrote:

 I'm trying to use Neo4j with Apache Spark Streaming but I am finding
 serializability as an issue.

 Basically, I want Apache Spark to parse and bundle my data in real time.
 After, the data has been bundled it should be stored in the database,
 Neo4j.
 However, I am getting this error:

 org.apache.spark.SparkException: Task not serializable
 at

 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at
 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
 at

 org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
 at
 org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
 at twoGrams.Main$4.call(Main.java:102)
 at twoGrams.Main$4.call(Main.java:1)
 at

 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
 at

 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at scala.util.Try$.apply(Try.scala:161)
 at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
 at

 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.io.NotSerializableException:
 org.neo4j.kernel.EmbeddedGraphDatabase
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
 at

 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at

 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at

 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at
 java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
 at

 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
 at

 org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
 at

 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
 ... 17 more
 Here is my code:

 output is a stream of type: JavaPairDStream<String, ArrayList<String>>

 output.foreachRDD(
     new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {

         @Override
         public Void call(
                 JavaPairRDD<String, ArrayList<String>> arg0,
                 Time arg1) throws Exception {
             // TODO Auto-generated method stub

             arg0.foreach(
                 new VoidFunction<Tuple2<String, ArrayList<String>>>() {

                     @Override
                     public void call(
                             Tuple2<String

Re: Using Neo4j with Apache Spark

2015-03-12 Thread Tathagata Das
Well, that's why I had also suggested using a pool of the GraphDBService
objects :)
Also present in the programming guide link I had given.
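
For what it's worth, the pooling idea can be sketched in plain Java without Spark or Neo4j. SimplePool and its method names are hypothetical; in the Neo4j case the pooled type would be the database handle, and the pool itself would live in a static field on each executor JVM so that it never becomes part of a task closure.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

// Hypothetical generic pool; not an API from Spark or Neo4j.
final class SimplePool<T> {
    private final ConcurrentLinkedQueue<T> idle = new ConcurrentLinkedQueue<>();
    private final Supplier<T> factory;

    SimplePool(Supplier<T> factory) {
        this.factory = factory;
    }

    // Reuse an idle object if one exists, otherwise create a new one.
    T borrow() {
        T obj = idle.poll();
        return obj != null ? obj : factory.get();
    }

    // Hand the object back so that a later task can reuse it.
    void release(T obj) {
        idle.offer(obj);
    }

    // Number of objects currently waiting to be reused.
    int idleCount() {
        return idle.size();
    }
}
```

Each partition would call borrow() at the start and release() at the end, so the expensive object is created once per executor rather than once per partition. A production pool would also need size limits and cleanup of stale objects, which this sketch leaves out.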

TD


On Thu, Mar 12, 2015 at 7:38 PM, Gautam Bajaj gautam1...@gmail.com wrote:

 Thanks a ton! That worked.

 However, this may have a performance issue: for each partition, I'd need
 to restart the server. That was the basic reason I was creating the graphDb
 object outside this loop.

 On Fri, Mar 13, 2015 at 5:34 AM, Tathagata Das t...@databricks.com
 wrote:

 (Putting user@spark back in the to list)

 In the gist, you are creating the graphDB object way outside
 RDD.foreachPartition. As I said last time, create the graphDB object inside
 RDD.foreachPartition. You are creating it outside DStream.foreachRDD
 and then using it from inside rdd.foreachPartition. That brings
 the graphDB object into the task closure, and hence the system tries to
 serialize the graphDB object when it serializes the closure. If you
 create the graphDB object inside RDD.foreachPartition, then the closure
 will not refer to any prior graphDB object and therefore will not try to
 serialize it.
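
The same effect can be reproduced in plain Java, without Spark or Neo4j. The sketch below is only an analogy under stated assumptions: Connection stands in for the non-serializable graphDB object, Task stands in for a closure that must be shipped to another JVM, and all names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical illustration class; not part of Spark or Neo4j.
final class ClosureCaptureSketch {

    // Stand-in for a non-serializable resource such as a database handle.
    static final class Connection {
        void send(String record) {
            // pretend to write the record somewhere
        }
    }

    // A task that must be serializable before it can be sent to another JVM.
    interface Task extends Serializable {
        void run(String record);
    }

    // Returns true if Java serialization accepts the task.
    static boolean canSerialize(Task task) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(task);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    // Problem case: the connection is created outside and captured by the lambda.
    static Task capturingTask() {
        Connection conn = new Connection();
        return record -> conn.send(record);
    }

    // Fixed case: the connection is created inside the task body, so nothing is captured.
    static Task selfContainedTask() {
        return record -> {
            Connection conn = new Connection();
            conn.send(record);
        };
    }
}
```

Here canSerialize(capturingTask()) fails with a NotSerializableException internally, much like the stack trace in this thread, while canSerialize(selfContainedTask()) succeeds because the task carries no reference to a Connection.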

 On Thu, Mar 12, 2015 at 3:46 AM, Gautam Bajaj gautam1...@gmail.com
 wrote:

 Here: https://gist.github.com/d34th4ck3r/0c99d1e9fa288e0cc8ab

 I'll add the flag and send you stack trace, I have meetings now.

 On Thu, Mar 12, 2015 at 6:28 PM, Tathagata Das t...@databricks.com
 wrote:

 Could you show us that version of the code?

 It also helps to turn on the Java flag for extended debug info
 (-Dsun.io.serialization.extendedDebugInfo=true). That will show
 the lineage of objects leading to the non-serializable one.
 On Mar 12, 2015 1:32 AM, Gautam Bajaj gautam1...@gmail.com wrote:

 I tried that too. It results in the same serializability issue.

 The GraphDatabaseService that I'm using comes from GraphDatabaseFactory():
 http://neo4j.com/api_docs/2.0.0/org/neo4j/graphdb/factory/GraphDatabaseFactory.html

 On Thu, Mar 12, 2015 at 5:21 PM, Tathagata Das t...@databricks.com
 wrote:

 What is the GraphDatabaseService object that you are using? Instead of
 creating it on the driver (outside foreachRDD), can you create it
 inside RDD.foreach?

 In general, the right pattern for doing this is in the programming guide:

 http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

 So you should be doing (sorry for writing in Scala)

 dstream.foreachRDD { (rdd, time) =>
   rdd.foreachPartition { iterator =>
     // Create a GraphDatabaseService object, or fetch one from a
     // pool of GraphDatabaseService objects
     // Use it to send the whole partition to Neo4j
     // Destroy the object or release it to the pool
   }
 }


 On Thu, Mar 12, 2015 at 1:15 AM, Gautam Bajaj gautam1...@gmail.com
 wrote:

 Neo4j is running externally. It has nothing to do with Spark
 processes.

 Basically, the problem is that I'm unable to figure out a way to store
 the output of Spark in the database, as Spark Streaming requires the Neo4j
 Core Java API to be serializable as well.

 The answer points to using the REST API, but its performance is
 really poor compared to the Core Java API:
 http://www.rene-pickhardt.de/get-the-full-neo4j-power-by-using-the-core-java-api-for-traversing-your-graph-data-base-instead-of-cypher-query-language/

 On Thu, Mar 12, 2015 at 5:09 PM, Tathagata Das t...@databricks.com
 wrote:

 Well the answers you got there are correct as well.
 Unfortunately I am not familiar with Neo4j enough to comment any
 more. Is the Neo4j graph database running externally (outside Spark
 cluster), or within the driver process, or on all the executors? Can 
 you
 clarify that?

 TD


 On Thu, Mar 12, 2015 at 12:58 AM, Gautam Bajaj 
 gautam1...@gmail.com wrote:

 Alright, I have also asked this question in StackOverflow:
 http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark

 The code there is pretty neat.

 On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das 
 t...@databricks.com wrote:

 I am not sure if you realized, but the code snippet is pretty
 mangled in the email we received. It might be a good idea to put the
 code in pastebin or a gist; that is much easier for everyone to read.


 On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r 
 gautam1...@gmail.com wrote:

 I'm trying to use Neo4j with Apache Spark Streaming but I am
 finding
 serializability as an issue.

 Basically, I want Apache Spark to parse and bundle my data in
 real time.
 After, the data has been bundled it should be stored in the
 database, Neo4j.
 However, I am getting this error:

 org.apache.spark.SparkException: Task not serializable
 at

 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at
 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at
 org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
 at

 org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
 at
 org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45

Re: Handling worker batch processing during driver shutdown

2015-03-12 Thread Tathagata Das
What version of Spark are you using? You may be hitting a known but solved
bug where the receivers would not get the stop signal, so that a stop with
(stopGracefully = true) would wait indefinitely for the receivers to stop. Try
setting stopGracefully to false and see if it works.
This bug should have been fixed in Spark 1.2.1:

https://issues.apache.org/jira/browse/SPARK-5035

TD

On Thu, Mar 12, 2015 at 7:48 PM, Jose Fernandez jfernan...@sdl.com wrote:

  Thanks for the reply!



 Theoretically I should be able to do as you suggest as I follow the pool
 design pattern from the documentation, but I don’t seem to be able to run
 any code after .stop() is called.



   override def main(args: Array[String]) {
     // setup
     val ssc = new StreamingContext(sparkConf, Seconds(streamTime))
     val inputStreams = (1 to numReceivers).map(i =>
       ssc.receiverStream(/* custom receiver */))
     val messages = ssc.union(inputStreams)

     messages.foreachRDD { rdd =>
       rdd.foreachPartition { p =>
         val indexer = Indexer.getInstance()

         p.foreach(Indexer.process(_) match {
           case Some(entry) => indexer.index(entry)
           case None =>
         })

         Indexer.returnInstance(indexer)
       }
     }

     messages.print()

     sys.ShutdownHookThread {
       logInfo("** Shutdown hook triggered **")
       ssc.stop(false, true)
       logInfo("** Shutdown finished **")
       ssc.stop(true)
     }

     ssc.start()
     ssc.awaitTermination()
   }



 The first shutdown log message is always displayed, but the second message
 never is. I’ve tried multiple permutations of the stop function calls and
 even used try/catch around it. I’m running in yarn-cluster mode using Spark
 1.2 on CDH 5.3. I stop the application with yarn application -kill appID.





 *From:* Tathagata Das [mailto:t...@databricks.com]
 *Sent:* Thursday, March 12, 2015 1:29 PM
 *To:* Jose Fernandez
 *Cc:* user@spark.apache.org
 *Subject:* Re: Handling worker batch processing during driver shutdown



 Can you access the batcher directly? That is, is there a handle to
 get access to the batchers on the executors by running a task on that
 executor? If so, after the StreamingContext has been stopped (not the
 SparkContext), you can use `sc.makeRDD()` to run a dummy task like
 this.

 sc.makeRDD(1 to 1000, 1000).foreach { x =>
   Batcher.get().flush()
 }

 With a large number of tasks and no other jobs running in the system, at
 least one task will run in each executor and therefore will flush the
 batcher.



 TD



 On Thu, Mar 12, 2015 at 12:27 PM, Jose Fernandez jfernan...@sdl.com
 wrote:

 Hi folks,



 I have a shutdown hook in my driver which stops the streaming context
 cleanly. This is great as workers can finish their current processing unit
 before shutting down. Unfortunately each worker contains a batch processor
 which only flushes every X entries. We’re indexing to different indices in
 elasticsearch and using the bulk index request for performance. As far as
 Spark is concerned, once data is added to the batcher it is considered
 processed, so our workers are being shut down with data still in the
 batcher.



 Is there any way to coordinate the shutdown with the workers? I haven’t
 had any luck searching for a solution online. I would appreciate any
 suggestions you may have.



 Thanks :)




   http://www.sdl.com/innovate/sanfran



 SDL PLC confidential, all rights reserved. If you are not the intended
 recipient of this mail SDL requests and requires that you delete it without
 acting upon or copying any of its contents, and we further request that you
 advise us.

 SDL PLC is a public limited company registered in England and Wales.
 Registered number: 02675207.
 Registered address: Globe House, Clivemont Road, Maidenhead, Berkshire SL6
 7DY, UK.



  This message has been scanned for malware by Websense. www.websense.com








Re: Writing to a single file from multiple executors

2015-03-12 Thread Tathagata Das
If you use DStream.saveAsHadoopFiles (or equivalent RDD ops) with the
appropriate output format (for Avro) then each partition of the RDDs will
be written to a different file. However there is probably going to be a
large number of small files and you may have to run a separate compaction
phase to coalesce them into larger files.
On Mar 12, 2015 9:47 AM, Maiti, Samya samya.ma...@philips.com wrote:

  Hi TD,

  I want to append my records to an Avro file which will later be used for
  querying.

  Having a single file is not mandatory for us, but then how can we make
  the executors append the Avro data to multiple files?

  Thanks,
 Sam
  On Mar 12, 2015, at 4:09 AM, Tathagata Das t...@databricks.com wrote:

  Why do you have to write a single file?



 On Wed, Mar 11, 2015 at 1:00 PM, SamyaMaiti samya.maiti2...@gmail.com
 wrote:

 Hi Experts,

 I have a scenario wherein I want to write to an Avro file from a
 streaming job that reads data from Kafka.

 But the issue is that there are multiple executors, and when they all try
 to write to a given file I get a concurrent exception.

 One way to mitigate the issue is to repartition and have a single writer task,
 but as my data is huge that is not a feasible option.

 Any suggestions welcomed.

 Regards,
 Sam



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Writing-to-a-single-file-from-multiple-executors-tp22003.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




 --
 The information contained in this message may be confidential and legally
 protected under applicable law. The message is intended solely for the
 addressee(s). If you are not the intended recipient, you are hereby
 notified that any use, forwarding, dissemination, or reproduction of this
 message is strictly prohibited and may be unlawful. If you are not the
 intended recipient, please contact the sender by return e-mail and destroy
 all copies of the original message.



Re: Handling worker batch processing during driver shutdown

2015-03-12 Thread Tathagata Das
Can you access the batcher directly? That is, is there a handle to get
access to the batchers on the executors by running a task on that executor?
If so, after the StreamingContext has been stopped (not the SparkContext),
you can use `sc.makeRDD()` to run a dummy task like this.

sc.makeRDD(1 to 1000, 1000).foreach { x =>
   Batcher.get().flush()
}

With a large number of tasks and no other jobs running in the system, at
least one task will run in each executor and therefore will flush the
batcher.
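
For readers unfamiliar with the batcher being discussed, here is a minimal plain-Java sketch of what such a per-JVM object might look like. Only Batcher.get().flush() comes from this thread; the threshold of 3, the add() method, and the counters are assumptions made for illustration, and the real bulk index request is replaced by a counter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical per-JVM batcher; only get() and flush() are named in the thread.
final class Batcher {
    // Assumed flush threshold ("every X entries" in the question).
    private static final Batcher INSTANCE = new Batcher(3);

    private final int threshold;
    private final List<String> pending = new ArrayList<>();
    private int flushedCount = 0;

    private Batcher(int threshold) {
        this.threshold = threshold;
    }

    // One shared instance per JVM, so any task on the executor can reach it.
    static Batcher get() {
        return INSTANCE;
    }

    // Buffer an entry and send a bulk request once the threshold is reached.
    synchronized void add(String entry) {
        pending.add(entry);
        if (pending.size() >= threshold) {
            flush();
        }
    }

    // Send whatever is buffered, even if the batch is not full.
    synchronized void flush() {
        flushedCount += pending.size(); // stand-in for the real bulk index call
        pending.clear();
    }

    synchronized int pendingCount() {
        return pending.size();
    }

    synchronized int flushedCount() {
        return flushedCount;
    }
}
```

After five add() calls with this threshold, three entries have been sent and two are still buffered. Those two are the data that would be lost at shutdown unless something calls flush(), which is what the dummy task is for.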

TD

On Thu, Mar 12, 2015 at 12:27 PM, Jose Fernandez jfernan...@sdl.com wrote:

  Hi folks,



 I have a shutdown hook in my driver which stops the streaming context
 cleanly. This is great as workers can finish their current processing unit
 before shutting down. Unfortunately each worker contains a batch processor
 which only flushes every X entries. We’re indexing to different indices in
 elasticsearch and using the bulk index request for performance. As far as
 Spark is concerned, once data is added to the batcher it is considered
 processed, so our workers are being shut down with data still in the
 batcher.



 Is there any way to coordinate the shutdown with the workers? I haven’t
 had any luck searching for a solution online. I would appreciate any
 suggestions you may have.



 Thanks :)






Re: Efficient Top count in each window

2015-03-12 Thread Tathagata Das
Why are you repartitioning to 1 partition? That will obviously be slow, since
you are converting a distributed operation into a single-node operation.
Also consider using RDD.top(). If you define the ordering right (based on
the count), you will get the top K across all partitions without doing a
shuffle for sortByKey. Much cheaper.
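
To illustrate why this is cheaper, here is a minimal plain-Java sketch of the
general idea (it is not Spark code and not Spark's implementation; the class
TopKSketch and its methods are hypothetical names made up for this example).
Each partition keeps only its own top K in a bounded heap, and only those
small per-partition results are merged, so the full data set is never sorted
or moved to a single node.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical helper, for illustration only; not part of Spark.
final class TopKSketch {

    // A (value, count) pair, like the output of countByValueAndWindow.
    static final class Count {
        final long value;
        final long count;

        Count(long value, long count) {
            this.value = value;
            this.count = count;
        }

        @Override
        public String toString() {
            return "(" + value + ", " + count + ")";
        }
    }

    // Orders pairs by count, smallest first.
    static final Comparator<Count> BY_COUNT = (a, b) -> Long.compare(a.count, b.count);

    // Keeps the k pairs with the highest counts from one partition,
    // using a min-heap that never holds more than k elements.
    static List<Count> topOfPartition(List<Count> partition, int k) {
        PriorityQueue<Count> heap = new PriorityQueue<>(BY_COUNT);
        for (Count c : partition) {
            heap.offer(c);
            if (heap.size() > k) {
                heap.poll(); // evict the current smallest count
            }
        }
        List<Count> result = new ArrayList<>(heap);
        result.sort(BY_COUNT.reversed()); // highest count first
        return result;
    }

    // Merges the per-partition winners; only k items per partition are combined.
    static List<Count> topK(List<List<Count>> partitions, int k) {
        List<Count> candidates = new ArrayList<>();
        for (List<Count> partition : partitions) {
            candidates.addAll(topOfPartition(partition, k));
        }
        return topOfPartition(candidates, k);
    }

    public static void main(String[] args) {
        List<List<Count>> partitions = Arrays.asList(
                Arrays.asList(new Count(1, 5), new Count(2, 9), new Count(3, 1)),
                Arrays.asList(new Count(4, 7), new Count(5, 3)));
        System.out.println(topK(partitions, 2));
    }
}
```

In a real job the per-partition step would run on the executors and only the
merge would happen on the driver, which is why no repartition(1) is needed.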

TD



On Thu, Mar 12, 2015 at 11:06 AM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid
wrote:

 Hi,

 I have a streaming application where I am computing the top 10 counts in each
 window, which seems slow. Is there an efficient way to do this?

 val counts = keyAndValues.map(x =>
   math.round(x._3.toDouble)).countByValueAndWindow(Seconds(4), Seconds(4))
 val topCounts = counts.repartition(1).map(_.swap).transform(rdd =>
   rdd.sortByKey(false)).map(_.swap).mapPartitions(rdd => rdd.take(10))

 Regards,
 Laeeq




Re: Writing to a single file from multiple executors

2015-03-11 Thread Tathagata Das
Why do you have to write a single file?



On Wed, Mar 11, 2015 at 1:00 PM, SamyaMaiti samya.maiti2...@gmail.com
wrote:

 Hi Experts,

 I have a scenario wherein I want to write to an Avro file from a streaming
 job that reads data from Kafka.

 The issue is that there are multiple executors, and when they all try to
 write to a given file I get a concurrent exception.

 One way to mitigate the issue is to repartition and have a single writer
 task, but as my data is huge that is not a feasible option.

 Any suggestions welcomed.

 Regards,
 Sam



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Writing-to-a-single-file-from-multiple-executors-tp22003.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark Streaming recover from Checkpoint with Spark SQL

2015-03-11 Thread Tathagata Das
Can you show us the code that you are using?

This might help. It is a quick preview of the updated streaming programming
guide for 1.3, which will be published soon.
http://people.apache.org/~tdas/spark-1.3.0-temp-docs/streaming-programming-guide.html#dataframe-and-sql-operations

TD

On Wed, Mar 11, 2015 at 12:20 PM, Marius Soutier mps@gmail.com wrote:

 Forgot to mention, it works when using .foreachRDD(_.saveAsTextFile(“”)).

  On 11.03.2015, at 18:35, Marius Soutier mps@gmail.com wrote:
 
  Hi,
 
  I’ve written a Spark Streaming job that inserts into a Parquet table, using
 stream.foreachRDD(_.insertInto(“table”, overwrite = true)). Now I’ve added
 checkpointing; everything works fine when starting from scratch. When
 starting from a checkpoint, however, the job doesn’t work and produces the
 following exception in the foreachRDD:
 
  ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error running
 job streaming job 142609383 ms.2
  org.apache.spark.SparkException: RDD transformations and actions can
 only be invoked by the driver, not inside of other transformations; for
 example, rdd1.map(x => rdd2.values.count() * x) is invalid because the
 values transformation and count action cannot be performed inside of the
 rdd1.map transformation. For more information, see SPARK-5063.
     at org.apache.spark.rdd.RDD.sc(RDD.scala:90)
     at org.apache.spark.rdd.RDD.<init>(RDD.scala:143)
     at org.apache.spark.sql.SchemaRDD.<init>(SchemaRDD.scala:108)
     at org.apache.spark.sql.SQLContext.createSchemaRDD(SQLContext.scala:114)
     at MyStreamingJob$$anonfun$createComputation$3.apply(MyStreamingJob:167)
     at MyStreamingJob$$anonfun$createComputation$3.apply(MyStreamingJob:167)
     at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:535)
     at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:535)
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 
 
 
 
  Cheers
  - Marius
 






Re: Compilation error

2015-03-10 Thread Tathagata Das
If you are using tools like SBT/Maven/Gradle/etc., they figure out all the
transitive dependencies and include them in the class path. I haven't
touched Eclipse in years, so I am not sure off the top of my head what's
going on instead. If you only downloaded the spark-streaming_2.10.jar, then
that is indeed insufficient and you have to download all the transitive
dependencies. Maybe you should create a Maven project inside Eclipse?

TD

On Tue, Mar 10, 2015 at 11:00 AM, Mohit Anchlia mohitanch...@gmail.com
wrote:

 How do I do that? I haven't used Scala before.

 Also, linking page doesn't mention that:

 http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#linking

 On Tue, Mar 10, 2015 at 10:57 AM, Sean Owen so...@cloudera.com wrote:

 It means you do not have Scala library classes in your project classpath.

 On Tue, Mar 10, 2015 at 5:54 PM, Mohit Anchlia mohitanch...@gmail.com
 wrote:
  I am trying out the streaming example as documented, and I am using Spark
  1.2.1 streaming from Maven for Java.

  When I add this code I get a compilation error, and Eclipse is not able to
  recognize Tuple2. I also don't see any import for the scala.Tuple2 class.

  http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#a-quick-example

  private void map(JavaReceiverInputDStream<String> lines) {
    JavaDStream<String> words = lines.flatMap(
      new FlatMapFunction<String, String>() {
        @Override public Iterable<String> call(String x) {
          return Arrays.asList(x.split(" "));
        }
      });

    // Count each word in each batch
    JavaPairDStream<String, Integer> pairs = words.map(
      new PairFunction<String, String, Integer>() {
        @Override public Tuple2<String, Integer> call(String s) throws Exception {
          return new Tuple2<String, Integer>(s, 1);
        }
      });
  }





Re: Compilation error

2015-03-10 Thread Tathagata Das
You have to include Scala libraries in the Eclipse dependencies.

TD

On Tue, Mar 10, 2015 at 10:54 AM, Mohit Anchlia mohitanch...@gmail.com
wrote:

 I am trying out the streaming example as documented, and I am using Spark
 1.2.1 streaming from Maven for Java.

 When I add this code I get a compilation error, and Eclipse is not able to
 recognize Tuple2. I also don't see any import for the scala.Tuple2 class.

 http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#a-quick-example

 private void map(JavaReceiverInputDStream<String> lines) {
   JavaDStream<String> words = lines.flatMap(
     new FlatMapFunction<String, String>() {
       @Override public Iterable<String> call(String x) {
         return Arrays.asList(x.split(" "));
       }
     });

   // Count each word in each batch
   JavaPairDStream<String, Integer> pairs = words.map(
     new PairFunction<String, String, Integer>() {
       @Override public Tuple2<String, Integer> call(String s) throws Exception {
         return new Tuple2<String, Integer>(s, 1);
       }
     });
 }



Re: Why spark master consumes 100% CPU when we kill a spark streaming app?

2015-03-10 Thread Tathagata Das
Do you have event logging enabled?
That could be the problem. When event logging is enabled, the Master
aggressively tries to recreate the web UI of the completed job from the
event logs, which causes the Master to stall.
I created a JIRA for this.
https://issues.apache.org/jira/browse/SPARK-6270

On Tue, Mar 10, 2015 at 7:10 PM, Xuelin Cao xuelincao2...@gmail.com wrote:


 Hey,

  Recently, we found in our cluster that when we kill a Spark
 streaming app, the whole cluster cannot respond for 10 minutes.

  We investigated the master node and found that the master process
 consumes 100% CPU when we kill the Spark streaming app.

  How could this happen? Has anyone had a similar problem before?







Re: Compilation error

2015-03-10 Thread Tathagata Das
See if you can import scala libraries in your project.

On Tue, Mar 10, 2015 at 11:32 AM, Mohit Anchlia mohitanch...@gmail.com
wrote:

 I am using maven and my dependency looks like this, but this doesn't seem
 to be working

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.2.1</version>
    </dependency>
  </dependencies>








Re: Spark Streaming input data source list

2015-03-09 Thread Tathagata Das
Spark Streaming has StreamingContext.socketStream()
http://spark.apache.org/docs/1.2.1/api/java/org/apache/spark/streaming/StreamingContext.html#socketStream(java.lang.String,
int, scala.Function1, org.apache.spark.storage.StorageLevel,
scala.reflect.ClassTag)

TD

On Mon, Mar 9, 2015 at 11:37 AM, Cui Lin cui@hds.com wrote:

  Dear all,

  Could you send me a list for input data source that spark streaming
 could support?
 My list is HDFS, Kafka, textfile?…

  I am wondering if spark streaming could directly read data from certain
 port (443 e.g.) that my devices directly send to?



  Best regards,

  Cui Lin



Re: Spark Streaming input data source list

2015-03-09 Thread Tathagata Das
Link to custom receiver guide
https://spark.apache.org/docs/latest/streaming-custom-receivers.html

On Mon, Mar 9, 2015 at 5:55 PM, Shao, Saisai saisai.s...@intel.com wrote:

  Hi Lin,



 AFAIK, currently there’s no built-in receiver API for RDBMSs, but you can
 write your own custom receiver to get data from an RDBMS; for the details
 you can refer to the docs.



 Thanks

 Jerry



 *From:* Cui Lin [mailto:cui@hds.com]
 *Sent:* Tuesday, March 10, 2015 8:36 AM
 *To:* Tathagata Das
 *Cc:* user@spark.apache.org
 *Subject:* Re: Spark Streaming input data source list



 Tathagata,



 Thanks for your quick response. The link is helpful to me.

 Do you know of any API for streaming data from an RDBMS?





 Best regards,



 Cui Lin








Re: Spark Streaming Switchover Time

2015-03-06 Thread Tathagata Das
It is probably the time taken by the system to figure out that the worker
is down. Could you look in the logs to see what happens when you kill the
worker?

TD

On Wed, Mar 4, 2015 at 6:20 AM, Nastooh Avessta (navesta) nave...@cisco.com
 wrote:

  Indeed. And am wondering if this switchover time can be decreased.

 Cheers,



 [image: http://www.cisco.com/web/europe/images/email/signature/logo05.jpg]

 *Nastooh Avessta*
 ENGINEER.SOFTWARE ENGINEERING
 nave...@cisco.com
 Phone: *+1 604 647 1527*

 *Cisco Systems Limited*
 595 Burrard Street, Suite 2123 Three Bentall Centre, PO Box 49121
 VANCOUVER
 BRITISH COLUMBIA
 V7X 1J1
 CA
 Cisco.com http://www.cisco.com/



 [image: Think before you print.]Think before you print.

 This email may contain confidential and privileged material for the sole
 use of the intended recipient. Any review, use, distribution or disclosure
 by others is strictly prohibited. If you are not the intended recipient (or
 authorized to receive for the recipient), please contact the sender by
 reply email and delete all copies of this message.

 For corporate legal information go to:
 http://www.cisco.com/web/about/doing_business/legal/cri/index.html

 Cisco Systems Canada Co, 181 Bay St., Suite 3400, Toronto, ON, Canada, M5J
 2T3. Phone: 416-306-7000; Fax: 416-306-7099. *Preferences
 http://www.cisco.com/offer/subscribe/?sid=000478326 - Unsubscribe
 http://www.cisco.com/offer/unsubscribe/?sid=000478327 – Privacy
 http://www.cisco.com/web/siteassets/legal/privacy.html*



 *From:* Tathagata Das [mailto:t...@databricks.com]
 *Sent:* Tuesday, March 03, 2015 11:11 PM

 *To:* Nastooh Avessta (navesta)
 *Cc:* user@spark.apache.org
 *Subject:* Re: Spark Streaming Switchover Time



 I am confused. Are you killing the 1st worker node to see whether the
 system restarts the receiver on the second worker?



 TD



 On Tue, Mar 3, 2015 at 10:49 PM, Nastooh Avessta (navesta) 
 nave...@cisco.com wrote:

 This is the time that it takes for the driver to start receiving data once
 again, from the 2nd worker, when the 1st worker, where streaming thread
 was initially running, is shutdown.

 Cheers,






 *From:* Tathagata Das [mailto:t...@databricks.com]
 *Sent:* Tuesday, March 03, 2015 10:24 PM
 *To:* Nastooh Avessta (navesta)
 *Cc:* user@spark.apache.org
 *Subject:* Re: Spark Streaming Switchover Time



 Can you elaborate on what is this switchover time?



 TD



 On Tue, Mar 3, 2015 at 9:57 PM, Nastooh Avessta (navesta) 
 nave...@cisco.com wrote:

 Hi

 On a standalone, Spark 1.0.0, with 1 master and 2 workers, operating in
 client mode, running a udp streaming application, I am noting around 2
 second elapse time on switchover, upon shutting down the streaming worker,
 where streaming window length is 1 sec. I am wondering what parameters are
 available to the developer to shorten this switchover time.

 Cheers,




Re: Spark Streaming - Duration 1s not matching reality

2015-03-05 Thread Tathagata Das
Hint: print() just gives a sample of what is in the data, and does not
force the processing of all the data (only the first partition of the RDD
is computed to get 10 items). count() actually processes all the data. This
is all due to lazy evaluation: if you don't need to use all the data, don't
compute all the data :)
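
The same effect can be reproduced outside Spark. The sketch below uses plain
Java streams as a stand-in (an assumption made purely for illustration; it is
not Spark code, and LazyEvalSketch and its methods are hypothetical names). A
counter inside map() only advances for the elements that the final operation
actually pulls through the pipeline, which is why a counter placed in a
DStream map appears to stall when only a sample is printed.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

// Hypothetical demo class, for illustration only; not part of Spark.
final class LazyEvalSketch {

    // Maps n inputs but only asks for the first `take` results, loosely
    // analogous to print() showing a sample. Returns how many inputs were
    // actually passed through the map function.
    static int mappedWhenSampling(int n, int take) {
        AtomicInteger calls = new AtomicInteger();
        IntStream.range(0, n)
                .map(x -> {
                    calls.incrementAndGet(); // side effect: count the work done
                    return x * 2;
                })
                .limit(take)
                .toArray();
        return calls.get();
    }

    // Same pipeline, but every result is consumed, loosely analogous to an
    // action such as count() that has to touch all of the data.
    static int mappedWhenConsumingAll(int n) {
        AtomicInteger calls = new AtomicInteger();
        IntStream.range(0, n)
                .map(x -> {
                    calls.incrementAndGet();
                    return x * 2;
                })
                .sum();
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("mapped when sampling 10 of 1000: "
                + mappedWhenSampling(1000, 10));
        System.out.println("mapped when consuming all 1000: "
                + mappedWhenConsumingAll(1000));
    }
}
```

For benchmarking, this suggests measuring with an output operation that
forces every record to be processed, and treating the timing of a sampled
print as a lower bound only.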

HTH

TD

On Thu, Mar 5, 2015 at 3:10 PM, eleroy ele...@msn.com wrote:

 Hello,

 Getting started with Spark.
 Got JavaNetworkWordcount working on a 3 node cluster. netcat on  with a
 infinite loop printing random numbers 0-100

 With a duration of 1sec, I do see a list of (word, count) values every
 second. The list is limited to 10 values (as per the docs)

 The count is ~6000 counts per number. I assume that since my input is
 random
 numbers from 0 to 100, and i count 6000 for each, the distribution being
 homogeneous, that would mean 600,000 values are being ingested.
 I switch to using a constant number, and then I'm seeing between 200,000
 and
 2,000,000 counts, but the console response is erratic: it's not 1sec
 anymore, it's sometimes 2, sometimes more, and sometimes much faster...

 I am looking to do 1-to-1 processing (one value outputs one result) so I
 replaced the flatMap function with a map function, and do my calculation.

 Now I'd like to know how many events I was able to process but it's not
 clear at all:
 If I use print, it's fast again (1sec) but I only see the first 10 results.
 I was trying to add a counter... and realized the counter only seems to
 increment by 11 each time

 This is very confusing... It looks like the counter is only incremented on
 the elements affected by the print statement... so does that mean the other
 values are not even calculated until requested?

 If i use .count() on the output RDD, then I do see a realistic count, but
 then it doesn't take 1sec anymore: it's more 4 to 5sec to get 600,000 -
 1,000,000 events counted.

 I'm not sure where to get from here or how to benchmark the time to
 actually
 process the events

 Any hint or useful link would be appreciated.
 Thanks for your help.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Duration-1s-not-matching-reality-tp21938.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Why can't Spark Streaming recover from the checkpoint directory when using a third party library for processingmulti-line JSON?

2015-03-04 Thread Tathagata Das
That could be a corner case bug. How do you add the 3rd party library to
the class path of the driver? Through spark-submit? Could you give the
command you used?

TD

On Wed, Mar 4, 2015 at 12:42 AM, Emre Sevinc emre.sev...@gmail.com wrote:

 I've also tried the following:

 Configuration hadoopConfiguration = new Configuration();
 hadoopConfiguration.set("multilinejsoninputformat.member", "itemSet");

 JavaStreamingContext ssc =
     JavaStreamingContext.getOrCreate(checkpointDirectory, hadoopConfiguration,
         factory, false);


 but I still get the same exception.

 Why doesn't getOrCreate ignore that Hadoop configuration part (which
 normally works, e.g. when not recovering)?

 --
 Emre


 On Tue, Mar 3, 2015 at 3:36 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 Hello,

 I have a Spark Streaming application (that uses Spark 1.2.1) that listens
 to an input directory, and when new JSON files are copied to that directory
 processes them, and writes them to an output directory.

 It uses a 3rd party library to process the multi-line JSON files (
 https://github.com/alexholmes/json-mapreduce). You can see the relevant
 part of the streaming application at:

   https://gist.github.com/emres/ec18ee264e4eb0dd8f1a

 When I run this application locally, it works perfectly fine. But then I
 wanted to test whether it could recover from failure, e.g. if I stopped it
 right in the middle of processing some files. I started the streaming
 application, copied 100 files to the input directory, and hit Ctrl+C when
 it had already processed about 50 files:

 ...
 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to process : 1
 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to process : 1
 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to process : 1
 [Stage 0:== (65 + 4) / 100]
 ^C

 Then I started the application again, expecting that it could recover
 from the checkpoint. For a while it started to read files again and then
 gave an exception:

 ...
 2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to process : 1
 2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to process : 1
 2015-03-03 15:06:39 WARN  SchemaValidatorDriver:145 -
  * * * hadoopConfiguration: itemSet
 2015-03-03 15:06:40 ERROR Executor:96 - Exception in task 0.0 in stage 0.0 (TID 0)
 java.io.IOException: Missing configuration value for multilinejsoninputformat.member
     at com.alexholmes.json.mapreduce.MultiLineJsonInputFormat.createRecordReader(MultiLineJsonInputFormat.java:30)
     at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
     at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
     at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
     at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
     at org.apache.spark.scheduler.Task.run(Task.scala:56)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
     at java.lang.Thread.run(Thread.java:745)

 Since the exception refers to a missing configuration value
 "multilinejsoninputformat.member", I think it is about the following line:

 ssc.ssc().sc().hadoopConfiguration().set(
     "multilinejsoninputformat.member", "itemSet");

 This is why I also log its value, and as you can see above, just before the
 exception is thrown in the recovery process, the log shows that
 "multilinejsoninputformat.member" is set to "itemSet". But somehow it is not
 found during the recovery.
 This exception happens only when it tries to recover from a previously
 interrupted run.

 I've also tried moving the above line into the createContext method,
 but still had the same exception.

 Why is that?

 And how can I work around it?

 --
 Emre Sevinç
 http://www.bigindustries.be/




 

Re: Is FileInputDStream returned by fileStream method a reliable receiver?

2015-03-04 Thread Tathagata Das
The file stream does not use a receiver. Maybe that was not clear in the
programming guide. I am updating it for the 1.3 release right now, and I will
make it clearer.
The file stream has full reliability. Read this in the programming guide.
http://spark.apache.org/docs/latest/streaming-programming-guide.html#semantics-with-files-as-input-source

On Wed, Mar 4, 2015 at 2:14 AM, Emre Sevinc emre.sev...@gmail.com wrote:

 Is FileInputDStream returned by fileStream method a reliable receiver?

 In the Spark Streaming Guide it says:

   There can be two kinds of data sources based on their *reliability*.
 Sources (like Kafka and Flume) allow the transferred data to be
 acknowledged. If the system receiving data from these *reliable* sources
 acknowledge the received data correctly, it can be ensured that no data
 gets lost due to any kind of failure. This leads to two kinds of receivers.

1. *Reliable Receiver* - A *reliable receiver* correctly acknowledges
a reliable source that the data has been received and stored in Spark with
replication.
2. *Unreliable Receiver* - These are receivers for sources that do not
support acknowledging. Even for reliable sources, one may implement an
unreliable receiver that do not go into the complexity of acknowledging
correctly.


 So I wonder whether the receivers for HDFS (and local file system) are
 reliable, e.g. when I'm using fileStream method to process files in a
 directory locally or on HDFS?


 --
 Emre Sevinç



Re: Why different numbers of partitions give different results for the same computation on the same dataset?

2015-03-03 Thread Tathagata Das
You can use DStream.transform() to apply arbitrary RDD transformations to
the RDDs generated by a DStream.

val coalescedDStream = myDStream.transform { _.coalesce(...) }



On Tue, Mar 3, 2015 at 1:47 PM, Saiph Kappa saiph.ka...@gmail.com wrote:

 Sorry I made a mistake in my code. Please ignore my question number 2.
 Different numbers of partitions give *the same* results!


 On Tue, Mar 3, 2015 at 7:32 PM, Saiph Kappa saiph.ka...@gmail.com wrote:

 Hi,

 I have a spark streaming application, running on a single node,
 consisting mainly of map operations. I perform repartitioning to control
 the number of CPU cores that I want to use. The code goes like this:

 val ssc = new StreamingContext(sparkConf, Seconds(5))
 val distFile = ssc.textFileStream("/home/myuser/spark-example/dump")
 val words = distFile.repartition(cores.toInt).flatMap(_.split(" "))
   .filter(_.length  3)

 val wordCharValues = words.map(word => {
   var sum = 0
   word.toCharArray.foreach(c => {sum += c.toInt})
   sum.toDouble / word.length.toDouble
 }).foreachRDD(rdd => {
   println("MEAN: " + rdd.mean())
 })


 I have 2 questions:
 1) How can I use coalesce in this code instead of repartition?

 2) Why, using the same dataset (which is a small file processed within a
 single batch), the result that I obtain for the mean varies with the number
 of partitions? If I don't call the repartition method, the result is always
 the same for every execution, as it should be. But repartitioning for
 instance in 2 partitions gives a different mean value than using 8
 partitions. I really don't understand why given that my code is
 deterministic. Can someone enlighten me on this?

 Thanks.





Re: Spark Streaming Switchover Time

2015-03-03 Thread Tathagata Das
Can you elaborate on what this switchover time is?

TD

On Tue, Mar 3, 2015 at 9:57 PM, Nastooh Avessta (navesta) nave...@cisco.com
 wrote:

  Hi

 On a standalone, Spark 1.0.0, with 1 master and 2 workers, operating in
 client mode, running a udp streaming application, I am noting around 2
 second elapse time on switchover, upon shutting down the streaming worker,
 where streaming window length is 1 sec. I am wondering what parameters are
 available to the developer to shorten this switchover time.

 Cheers,








Re: Spark Streaming Switchover Time

2015-03-03 Thread Tathagata Das
I am confused. Are you killing the 1st worker node to see whether the
system restarts the receiver on the second worker?

TD

On Tue, Mar 3, 2015 at 10:49 PM, Nastooh Avessta (navesta) 
nave...@cisco.com wrote:

  This is the time that it takes for the driver to start receiving data
 once again, from the 2nd worker, when the 1st worker, where streaming
 thread was initially running, is shutdown.

 Cheers,






 *From:* Tathagata Das [mailto:t...@databricks.com]
 *Sent:* Tuesday, March 03, 2015 10:24 PM
 *To:* Nastooh Avessta (navesta)
 *Cc:* user@spark.apache.org
 *Subject:* Re: Spark Streaming Switchover Time



 Can you elaborate on what this switchover time is?



 TD



 On Tue, Mar 3, 2015 at 9:57 PM, Nastooh Avessta (navesta) 
 nave...@cisco.com wrote:

 Hi

 On a standalone Spark 1.0.0 cluster with 1 master and 2 workers, operating
 in client mode and running a UDP streaming application, I am seeing roughly
 2 seconds of elapsed time on switchover after shutting down the streaming
 worker, where the streaming window length is 1 sec. I am wondering what
 parameters are available to the developer to shorten this switchover time.

 Cheers,










Re: Race Condition in Streaming Thread

2015-02-27 Thread Tathagata Das
Are you sure the multiple invocations are not from previous runs of the
program?

TD

On Fri, Feb 27, 2015 at 12:16 PM, Nastooh Avessta (navesta) 
nave...@cisco.com wrote:

  Hi

 Under Spark 1.0.0, standalone, client mode, I am trying to invoke a
 3rd-party UDP traffic generator from the streaming thread. The excerpt is
 as follows:

 …

 do {
   try {
     p = Runtime.getRuntime().exec(Prog);
     socket.receive(packet);
     output.clear();
     kryo.writeObject(output, packet);
     store(output);

 …

 The program has a test to check for an existing instance, e.g. [ $(pidof
 Prog) ] && exit. This code runs fine, i.e., the 3rd-party application is
 invoked, data is received, analyzed on the driver, etc. The problem arises
 when I test redundancy and fault tolerance. Specifically, when I manually
 terminate Prog, multiple invocations are observed upon recovery. This
 could be due to multiple threads getting through [ $(pidof Prog) ] &&
 exit. However, I was hoping to avoid this problem by adding a semaphore,
 as follows:

 …

 do {
   try {
     sem.acquire();
     p = Runtime.getRuntime().exec(Prog);
     sem.release();
   } catch (IOException ioe) {
     //ioe.printStackTrace();
     break;
   }
   socket.receive(packet);
   //InetAddress returnIPAddress = packet.getAddress();
   returnPort = packet.getPort();
   output.clear();
   kryo.writeObject(output, packet);
   store(output);

 …



 However, I am still seeing multiple invocations of Prog, upon recovery. I
 have also experimented with the following configuration parameters, to no
 avail:

   sparkConf.set("spark.cores.max", args[1]);
   sparkConf.set("spark.task.cpus", args[2]);
   sparkConf.set("spark.default.parallelism", args[2]);

 with args={(1,1),(2,1), (1,2),…}

 Any thoughts?

 Cheers,








Re: Race Condition in Streaming Thread

2015-02-27 Thread Tathagata Das
It wasn't clear from the snippet what's going on. Can you provide the
whole Receiver code?

TD

On Fri, Feb 27, 2015 at 12:37 PM, Nastooh Avessta (navesta) 
nave...@cisco.com wrote:

  I am, as I issue killall -9 Prog, prior to testing.

 Cheers,






 *From:* Tathagata Das [mailto:t...@databricks.com]
 *Sent:* Friday, February 27, 2015 12:29 PM
 *To:* Nastooh Avessta (navesta)
 *Cc:* user@spark.apache.org
 *Subject:* Re: Race Condition in Streaming Thread



 Are you sure the multiple invocations are not from previous runs of the
 program?



 TD



 On Fri, Feb 27, 2015 at 12:16 PM, Nastooh Avessta (navesta) 
 nave...@cisco.com wrote:

 Hi

 Under Spark 1.0.0, standalone, client mode, I am trying to invoke a
 3rd-party UDP traffic generator from the streaming thread. The excerpt is
 as follows:

 …

 do {
   try {
     p = Runtime.getRuntime().exec(Prog);
     socket.receive(packet);
     output.clear();
     kryo.writeObject(output, packet);
     store(output);

 …

 The program has a test to check for an existing instance, e.g. [ $(pidof
 Prog) ] && exit. This code runs fine, i.e., the 3rd-party application is
 invoked, data is received, analyzed on the driver, etc. The problem arises
 when I test redundancy and fault tolerance. Specifically, when I manually
 terminate Prog, multiple invocations are observed upon recovery. This
 could be due to multiple threads getting through [ $(pidof Prog) ] &&
 exit. However, I was hoping to avoid this problem by adding a semaphore,
 as follows:

 …

 do {
   try {
     sem.acquire();
     p = Runtime.getRuntime().exec(Prog);
     sem.release();
   } catch (IOException ioe) {
     //ioe.printStackTrace();
     break;
   }
   socket.receive(packet);
   //InetAddress returnIPAddress = packet.getAddress();
   returnPort = packet.getPort();
   output.clear();
   kryo.writeObject(output, packet);
   store(output);

 …



 However, I am still seeing multiple invocations of Prog, upon recovery. I
 have also experimented with the following configuration parameters, to no
 avail:

   sparkConf.set("spark.cores.max", args[1]);
   sparkConf.set("spark.task.cpus", args[2]);
   sparkConf.set("spark.default.parallelism", args[2]);

 with args={(1,1),(2,1), (1,2),…}

 Any thoughts?

 Cheers,




Re: throughput in the web console?

2015-02-26 Thread Tathagata Das
If you have one receiver, and you are doing only map-like operations, then
the processing will primarily happen on one machine. To use all the
machines, either receive in parallel with multiple receivers, or spread out
the computation by explicitly repartitioning the received streams
(DStream.repartition) with sufficient partitions to load balance across
more machines.
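
As a rough sketch of the two options above (not from the original reply;
the host, port, receiver count, and partition count are placeholders, and a
socket source stands in for whatever receiver you actually use):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SpreadLoadSketch {
  def main(args: Array[String]): Unit = {
    // Master URL is expected to come from spark-submit.
    val conf = new SparkConf().setAppName("SpreadLoadSketch")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Option 1: several receivers, merged into one DStream.
    // Each receiver occupies one core, so the cluster needs more cores than receivers.
    val streams = (1 to 4).map(_ => ssc.socketTextStream("localhost", 9999))
    val unioned = ssc.union(streams)

    // Option 2: explicitly repartition the received data so that the
    // map-like work is spread over more machines.
    val spread = unioned.repartition(16)

    spread.map(_.length).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The two options can be combined, as here, or used separately; repartitioning
adds a shuffle, so it only pays off when the per-record work is heavy enough.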

TD

On Thu, Feb 26, 2015 at 9:52 AM, Saiph Kappa saiph.ka...@gmail.com wrote:

 One more question: while processing the exact same batch, I noticed that
 giving more CPUs to the worker does not decrease the duration of the batch.
 I tried this with 4 and 8 CPUs. With only 1 CPU the duration increased,
 but apart from that the values were pretty similar whether I was using
 4, 6, or 8 CPUs.

 On Thu, Feb 26, 2015 at 5:35 PM, Saiph Kappa saiph.ka...@gmail.com
 wrote:

 By setting spark.eventLog.enabled to true it is possible to see the
 application UI after the application has finished its execution, however
 the Streaming tab is no longer visible.

 For measuring the duration of batches in the code I am doing something
 like this:
 «wordCharValues.foreachRDD(rdd => {
 val startTick = System.currentTimeMillis()
 val result = rdd.take(1)
 val timeDiff = System.currentTimeMillis() - startTick»

 But my question is: is it possible to see the rate/throughput
 (records/sec) when I have a stream to process log files that appear in a
 folder?



 On Thu, Feb 26, 2015 at 1:36 AM, Tathagata Das t...@databricks.com
 wrote:

 Yes. # tuples processed in a batch = sum of all the tuples received by
 all the receivers.

 In the screenshot, there was a batch with 69.9K records, and there was a
 batch which took 1 s 473 ms. These may be the same batch, or they may be
 different batches.

 TD

 On Wed, Feb 25, 2015 at 10:11 AM, Josh J joshjd...@gmail.com wrote:

 If I'm using the kafka receiver, can I assume the number of records
 processed in the batch is the sum of the number of records processed by the
 kafka receiver?

 So in the screen shot attached the max rate of tuples processed in a
 batch is 42.7K + 27.2K = 69.9K tuples processed in a batch with a max
 processing time of 1 second 473 ms?

 On Wed, Feb 25, 2015 at 8:48 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 By throughput you mean Number of events processed etc?

 [image: Inline image 1]

 The Streaming tab already has these statistics.



 Thanks
 Best Regards

 On Wed, Feb 25, 2015 at 9:59 PM, Josh J joshjd...@gmail.com wrote:


 On Wed, Feb 25, 2015 at 7:54 AM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 For SparkStreaming applications, there is already a tab called
 Streaming which displays the basic statistics.


 Would I just need to extend this tab to add the throughput?





 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: Integrating Spark Streaming with Reactive Mongo

2015-02-26 Thread Tathagata Das
Hey Mike,

I quickly looked through the example and I found a major performance issue.
You are collecting the RDDs to the driver and then sending them to Mongo in
a foreach. Why not do a distributed push to Mongo?

WHAT YOU HAVE
val mongoConnection = ...

WHAT YOU SHOULD DO

rdd.foreachPartition { iterator =>
   val connection = createConnection()
   iterator.foreach { ... push partition using connection ...  }
}
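
A slightly fuller sketch of the same pattern, under loud assumptions:
`SinkConnection` is a hypothetical stand-in for a real database client (it
is not a Reactive Mongo API), and the local master and sample data are only
there to make the snippet self-contained.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical stand-in for a real database client; NOT a Reactive Mongo API.
class SinkConnection {
  def send(record: String): Unit = println(s"sent: $record")
  def close(): Unit = ()
}

object DistributedPushSketch {
  def createConnection(): SinkConnection = new SinkConnection

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DistributedPushSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(Seq("a", "b", "c", "d"), numSlices = 2)

    rdd.foreachPartition { iterator =>
      // Created on the executor, once per partition, so the connection
      // itself never has to be serialized or shipped from the driver.
      val connection = createConnection()
      try iterator.foreach(connection.send)
      finally connection.close()
    }

    sc.stop()
  }
}
```

In a streaming job the same block would sit inside
dstream.foreachRDD { rdd => ... }, so every batch is pushed from the
executors rather than collected to the driver.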


On Thu, Feb 26, 2015 at 1:25 PM, Mike Trienis mike.trie...@orcsol.com
wrote:

 Hi All,

 I have Spark Streaming set up to write data to a replicated MongoDB
 database and would like to understand if there would be any issues using
 the Reactive Mongo library to write directly to MongoDB. My stack is
 Apache Spark sitting on top of Cassandra for the datastore, so my thinking
 is that the MongoDB connector for Hadoop will not be particularly useful
 for me since I'm not using HDFS. Is there anything that I'm missing?

 Here is an example of code that I'm planning on using as a starting point
 for my implementation.

 LogAggregator
 https://github.com/chimpler/blog-spark-streaming-log-aggregation/blob/master/src/main/scala/com/chimpler/sparkstreaminglogaggregation/LogAggregator.scala

 Thanks, Mike.



Re: throughput in the web console?

2015-02-25 Thread Tathagata Das
Yes. # tuples processed in a batch = sum of all the tuples received by all
the receivers.

In the screenshot, there was a batch with 69.9K records, and there was a
batch which took 1 s 473 ms. These may be the same batch, or they may be
different batches.

TD

On Wed, Feb 25, 2015 at 10:11 AM, Josh J joshjd...@gmail.com wrote:

 If I'm using the kafka receiver, can I assume the number of records
 processed in the batch is the sum of the number of records processed by the
 kafka receiver?

 So in the screen shot attached the max rate of tuples processed in a batch
 is 42.7K + 27.2K = 69.9K tuples processed in a batch with a max processing
 time of 1 second 473 ms?

 On Wed, Feb 25, 2015 at 8:48 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 By throughput you mean Number of events processed etc?

 [image: Inline image 1]

 The Streaming tab already has these statistics.



 Thanks
 Best Regards

 On Wed, Feb 25, 2015 at 9:59 PM, Josh J joshjd...@gmail.com wrote:


 On Wed, Feb 25, 2015 at 7:54 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 For SparkStreaming applications, there is already a tab called
 Streaming which displays the basic statistics.


 Would I just need to extend this tab to add the throughput?








Re: Re: Many Receiver vs. Many threads per Receiver

2015-02-25 Thread Tathagata Das
Spark Streaming has a new Kafka direct stream, to be released as an
experimental feature with 1.3. It uses a low-level consumer. Not sure if
it satisfies your purpose.
If you want more control, it's best to create your own Receiver with the
low-level Kafka API.
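
For reference, a minimal sketch of the direct stream, assuming the API as it
appears in the spark-streaming-kafka module for Spark 1.3; the broker list
and topic name are placeholders, not values from this thread.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DirectStreamSketch")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Placeholder brokers and topic; the direct stream talks to the brokers
    // directly instead of going through ZooKeeper-based receivers.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
    val topics = Set("myTopic")

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // Each element is a (key, value) pair; count the values per batch.
    messages.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

With this approach there is no receiver; each Kafka partition maps to one
RDD partition, so the parallelism follows the topic layout.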

TD

On Tue, Feb 24, 2015 at 12:09 AM, bit1...@163.com bit1...@163.com wrote:

 Thanks Akhil.
 Not sure whether the low-level consumer
 (https://github.com/dibbhatt/kafka-spark-consumer) will be officially
 supported by Spark Streaming. So far, I don't see it mentioned/documented
 in the Spark Streaming programming guide.

 --
 bit1...@163.com


 *From:* Akhil Das ak...@sigmoidanalytics.com
 *Date:* 2015-02-24 16:21
 *To:* bit1...@163.com
 *CC:* user user@spark.apache.org
 *Subject:* Re: Many Receiver vs. Many threads per Receiver
 I believe when you go with 1, it will distribute the consumers across your
 cluster (possibly on 6 machines), but I still don't see a way to tell
 which partition each one will consume from. If you are looking for a
 consumer where you can specify the partition details, then you are
 better off with the low-level consumer:
 https://github.com/dibbhatt/kafka-spark-consumer



 Thanks
 Best Regards

 On Tue, Feb 24, 2015 at 9:36 AM, bit1...@163.com bit1...@163.com wrote:

 Hi,
 I am experimenting with the Spark Streaming and Kafka integration. To read
 messages from Kafka in parallel, there are basically two ways:
 1. Create many Receivers, like (1 to 6).map(_ => KafkaUtils.createStream).
 2. Specify many threads when calling KafkaUtils.createStream, like val
 topicMap = Map("myTopic" -> 6); this will create one receiver with 6
 reading threads.

 My question is which option is better. Option 2 sounds better to me
 because it saves a lot of cores (one Receiver, one core), but I learned
 from somewhere else that option 1 is better, so I would like to ask how
 you guys would elaborate on this. Thanks

 --
 bit1...@163.com





Re: Why must the dstream.foreachRDD(...) parameter be serializable?

2015-02-23 Thread Tathagata Das
There are different kinds of checkpointing going on. updateStateByKey
requires RDD checkpointing, which can be enabled just by calling
sparkContext.setCheckpointDir. But that does not enable Spark
Streaming driver checkpoints, which are necessary for recovering from
driver failures. Those are enabled only by streamingContext.checkpoint(...),
which internally calls sparkContext.setCheckpointDir and also enables other
stuff.
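
A minimal sketch of the driver-checkpoint variant, for illustration only:
the checkpoint path, host, and port are placeholders, and the word count is
just an example of state kept with updateStateByKey.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
// Needed on Spark 1.2 and earlier for pair-DStream operations such as updateStateByKey.
import org.apache.spark.streaming.StreamingContext._

object CheckpointSketch {
  // Placeholder path; use a fault-tolerant file system in a real deployment.
  val checkpointDir = "hdfs:///tmp/streaming-checkpoint"

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("CheckpointSketch")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Enables driver (metadata) checkpointing and also sets the RDD checkpoint dir.
    ssc.checkpoint(checkpointDir)

    val counts = ssc.socketTextStream("localhost", 9999)
      .map(word => (word, 1))
      .updateStateByKey[Int] { (values: Seq[Int], state: Option[Int]) =>
        Some(values.sum + state.getOrElse(0))
      }
    counts.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Rebuilds the context from the checkpoint if one exists, else creates it fresh.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Because the whole DStream graph is serialized into the driver checkpoint,
everything referenced from the functions passed to the DStream operations
has to be serializable, which is the exception discussed in this thread.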

TD

On Mon, Feb 23, 2015 at 1:28 AM, Tobias Pfeiffer t...@preferred.jp wrote:

 Sean,

 thanks for your message!

 On Mon, Feb 23, 2015 at 6:03 PM, Sean Owen so...@cloudera.com wrote:

 What I haven't investigated is whether you can enable checkpointing
 for the state in updateStateByKey separately from this mechanism,
 which is exactly your question. What happens if you set a checkpoint
 dir, but do *not* use StreamingContext.getOrCreate, but *do* call
 DStream.checkpoint?


 I didn't even use StreamingContext.getOrCreate(), just calling
 streamingContext.checkpoint(...) blew everything up. Well, blew up in the
 sense that actor.OneForOneStrategy will print the stack trace of
 the java.io.NotSerializableException every couple of seconds and
 something is not going right with execution (I think).

 BUT, indeed, just calling sparkContext.setCheckpointDir seems to be
 sufficient for updateStateByKey! Looking at what
 streamingContext.checkpoint() does, I don't get why ;-) and I am not sure
 that this is a robust solution, but in fact that seems to work!

 Thanks a lot,
 Tobias




Re: Query data in Spark RRD

2015-02-23 Thread Tathagata Das
You could build a REST API, but you may have issues if you want to return
arbitrary binary data. A more complex but more robust alternative is to use
an RPC library such as Akka or Thrift.

TD

On Mon, Feb 23, 2015 at 12:45 AM, Nikhil Bafna nikhil.ba...@flipkart.com
wrote:


 Tathagata - Yes, I'm thinking on that line.

 The problem is how to send the query to the backend. Bundle an HTTP
 server into a Spark Streaming job that will accept the parameters?

 --
 Nikhil Bafna

 On Mon, Feb 23, 2015 at 2:04 PM, Tathagata Das t...@databricks.com
 wrote:

 You will have to build a split infrastructure: a front end that takes the
 queries from the UI and sends them to the backend, and a backend (running
 the Spark Streaming app) that actually runs the queries on tables created
 in the contexts. The RPCs necessary between the frontend and backend will
 need to be implemented by you.

 On Sat, Feb 21, 2015 at 11:57 PM, Nikhil Bafna nikhil.ba...@flipkart.com
  wrote:


 Yes. As I understand it, it would allow me to write SQL to query a
 Spark context. But the query needs to be specified within a deployed job.

 What I want is to be able to run multiple dynamic queries specified at
 runtime from a dashboard.



 --
 Nikhil Bafna

 On Sat, Feb 21, 2015 at 8:37 PM, Ted Yu yuzhih...@gmail.com wrote:

 Have you looked at
 http://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.sql.SchemaRDD
 ?

 Cheers

 On Sat, Feb 21, 2015 at 4:24 AM, Nikhil Bafna 
 nikhil.ba...@flipkart.com wrote:


 Hi.

 My use case is building a realtime monitoring system over
 multi-dimensional data.

 The way I'm planning to go about it is to use Spark Streaming to store
 aggregated count over all dimensions in 10 sec interval.

 Then, from a dashboard, I would be able to specify a query over some
 dimensions, which will need re-aggregation from the already computed job.

 My query is, how can I run dynamic queries over data in schema RDDs?

 --
 Nikhil Bafna








Re: Periodic Broadcast in Apache Spark Streaming

2015-02-23 Thread Tathagata Das
You could do something like this.


def rddTransformationUsingBroadcast(rdd: RDD[...]): RDD[...] = {

   // get the reference to a broadcast variable, new or existing.
   val broadcastToUse = getBroadcast()

   rdd.map { ..  } // use broadcast variable
}


dstream.transform(rddTransformationUsingBroadcast)


The function `rddTransformationUsingBroadcast` will be called at every
batch interval, which will call `getBroadcast` every time. That's an
arbitrary function returning a broadcast variable, so you can update what
broadcast variable to use whenever you want.
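
One possible shape for such a `getBroadcast`, as a sketch only: the
`RefreshableBroadcast` object, the one-minute refresh interval, and the
`latestRate` callback are all assumptions made for illustration, not part
of Spark or of the code above.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper; lives on the driver and hands out the current broadcast.
object RefreshableBroadcast {
  @volatile private var current: Broadcast[Double] = _
  private var lastRefreshMs = 0L
  private val refreshIntervalMs = 60000L // assumed refresh period

  def getBroadcast(sc: SparkContext, latestValue: () => Double): Broadcast[Double] = synchronized {
    val now = System.currentTimeMillis()
    if (current == null || now - lastRefreshMs >= refreshIntervalMs) {
      // Drop executor copies of the old value, then broadcast the new one.
      if (current != null) current.unpersist()
      current = sc.broadcast(latestValue())
      lastRefreshMs = now
    }
    current
  }

  // The transform function runs on the driver once per batch, so the
  // broadcast reference is looked up again for every batch.
  def scaleByRate(stream: DStream[Double], latestRate: () => Double): DStream[Double] =
    stream.transform { rdd =>
      val rate = getBroadcast(rdd.sparkContext, latestRate)
      rdd.map(x => x * rate.value)
    }
}
```

Only the broadcast handle is captured by the map closure, so tasks read
whichever value was current when their batch was set up.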

TD



On Wed, Feb 18, 2015 at 6:11 AM, aanilpala aanilp...@gmail.com wrote:

 I am implementing a stream learner for text classification. There are some
 single-valued parameters in my implementation that need to be updated as
 new stream items arrive. For example, I want to change the learning rate
 as new predictions are made. However, I doubt that there is a way to
 broadcast variables after the initial broadcast. So what happens if I need
 to broadcast a variable every time I update it? If there is a way to do
 it, or a workaround for what I want to accomplish in Spark Streaming, I'd
 be happy to hear about it.

 Thanks in advance.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Periodic-Broadcast-in-Apache-Spark-Streaming-tp21703.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: How to diagnose could not compute split errors and failed jobs?

2015-02-23 Thread Tathagata Das
Could you find the executor logs on the executor where that task was
scheduled? That may provide more information on what caused the error.
Also take a look at where the block in question was stored, and where the
task was scheduled.
You will need to enable log4j INFO level logs for this debugging.

TD

On Thu, Feb 19, 2015 at 10:38 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Not quite sure, but this could be the case: one of your executors is stuck
 in a GC pause while another one asks it for data, and hence the request
 times out, ending in that exception. You can try increasing the Akka
 frame size and the ack wait timeout as follows:

   .set("spark.core.connection.ack.wait.timeout","600")
   .set("spark.akka.frameSize","50")


 Thanks
 Best Regards

 On Fri, Feb 20, 2015 at 6:21 AM, Tim Smith secs...@gmail.com wrote:

 My streaming app runs fine for a few hours and then starts spewing Could
 not compute split, block input-xx-xxx not found errors. After this,
 jobs start to fail and batches start to pile up.

 My question isn't so much about why this error but rather, how do I trace
 what leads to this error? I am using disk+memory for storage so shouldn't
 be a case of data loss resulting from memory overrun.

 15/02/18 22:04:49 ERROR JobScheduler: Error running job streaming job
 142429705 ms.28
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 3
 in stage 247644.0 failed 64 times, most recent failure: Lost task 3.63 in
 stage 247644.0 (TID 3705290, node-dn1-16-test.abcdefg.com):
 java.lang.Exception: Could not compute split, block input-28-1424297042500
 not found
 at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 Driver stacktrace:
 at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
 at scala.Option.foreach(Option.scala:236)
 at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
 at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 Thanks,

 Tim





Re: Query data in Spark RRD

2015-02-23 Thread Tathagata Das
You will have to build a split infrastructure: a front end that takes the
queries from the UI and sends them to the backend, and a backend (running
the Spark Streaming app) that actually runs the queries on tables created
in the contexts. The RPCs necessary between the frontend and backend will
need to be implemented by you.

On Sat, Feb 21, 2015 at 11:57 PM, Nikhil Bafna nikhil.ba...@flipkart.com
wrote:


 Yes. As I understand it, it would allow me to write SQL to query a Spark
 context. But the query needs to be specified within a deployed job.

 What I want is to be able to run multiple dynamic queries specified at
 runtime from a dashboard.



 --
 Nikhil Bafna

 On Sat, Feb 21, 2015 at 8:37 PM, Ted Yu yuzhih...@gmail.com wrote:

 Have you looked at
 http://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.sql.SchemaRDD
 ?

 Cheers

 On Sat, Feb 21, 2015 at 4:24 AM, Nikhil Bafna nikhil.ba...@flipkart.com
 wrote:


 Hi.

 My use case is building a realtime monitoring system over
 multi-dimensional data.

 The way I'm planning to go about it is to use Spark Streaming to store
 aggregated count over all dimensions in 10 sec interval.

 Then, from a dashboard, I would be able to specify a query over some
 dimensions, which will need re-aggregation from the already computed job.

 My query is, how can I run dynamic queries over data in schema RDDs?

 --
 Nikhil Bafna






Re: spark streaming window operations on a large window size

2015-02-23 Thread Tathagata Das
The default persistence level is MEMORY_AND_DISK, so the LRU policy would
spill the blocks to disk, and the streaming app will not fail. However,
since data will constantly be read in and out of disk as windows are
processed, the performance won't be great. So it is best to have sufficient
memory to keep all the window data in memory.
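
If you would rather state the storage level explicitly than rely on the
default, a sketch could look like the following; the 60-minute window and
5-minute slide are made-up numbers, and the slide has to be a multiple of
the batch interval.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Minutes
import org.apache.spark.streaming.dstream.DStream

object LargeWindowSketch {
  // Builds a large sliding window whose blocks may spill to disk
  // instead of being dropped when memory runs short.
  def largeWindow[T](events: DStream[T]): DStream[T] = {
    val windowed = events.window(Minutes(60), Minutes(5))
    // Must be called before the StreamingContext is started.
    windowed.persist(StorageLevel.MEMORY_AND_DISK_SER)
    windowed
  }
}
```

The serialized level trades some CPU for a smaller memory footprint, which
tends to matter more as the window grows.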

TD

On Mon, Feb 23, 2015 at 8:26 AM, Shao, Saisai saisai.s...@intel.com wrote:

 I don't think Spark Streaming currently supports window operations on data
 beyond its available memory. Internally, Spark Streaming keeps all the
 data that belongs to the effective window in memory; if the memory is not
 enough, BlockManager will discard blocks using an LRU policy, so something
 unexpected will occur.

 Thanks
 Jerry

 -Original Message-
 From: avilevi3 [mailto:avile...@gmail.com]
 Sent: Monday, February 23, 2015 12:57 AM
 To: user@spark.apache.org
 Subject: spark streaming window operations on a large window size

 Hi guys,

 Does Spark Streaming support window operations on a sliding window whose
 data is larger than the available memory?
 we would like to
 currently we are using kafka as input, but we could change that if needed.

 thanks
 Avi



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-window-operations-on-a-large-window-size-tp21764.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Accumulator in SparkUI for streaming

2015-02-23 Thread Tathagata Das
Unless I am unaware of some recent changes, the SparkUI shows stages and
jobs, not accumulator results. And the UI is not designed to be pluggable
for showing user-defined stuff.

TD

On Fri, Feb 20, 2015 at 12:25 AM, Tim Smith secs...@gmail.com wrote:

 On Spark 1.2:

 I am trying to capture # records read from a kafka topic:

 val inRecords = ssc.sparkContext.accumulator(0, "InRecords")

 ..

 kInStreams.foreach( k =>
 {

  k.foreachRDD ( rdd =>  inRecords += rdd.count().toInt  )
  inRecords.value


 Question is how do I get the accumulator to show up in the UI? I tried
 inRecords.value but that didn't help. Pretty sure it isn't showing up in
 Stage metrics.

 What's the trick here? collect?

 Thanks,

 Tim




Re: Re: About FlumeUtils.createStream

2015-02-23 Thread Tathagata Das
Distributed among cluster nodes.

On Mon, Feb 23, 2015 at 8:45 PM, bit1...@163.com bit1...@163.com wrote:

 Hi, Akhil,Tathagata,

 This leads me to another question. For the Spark Streaming and Kafka
 integration, if there is more than one Receiver in the cluster, such as
   val streams = (1 to 6).map ( _ => KafkaUtils.createStream(ssc,
 zkQuorum, group, topicMap).map(_._2) ),
 will these Receivers stay on one cluster node, or will they be
 distributed among the cluster nodes?

 --
 bit1...@163.com


 *From:* Akhil Das ak...@sigmoidanalytics.com
 *Date:* 2015-02-24 12:58
 *To:* Tathagata Das t...@databricks.com
 *CC:* user user@spark.apache.org; bit1129 bit1...@163.com
 *Subject:* Re: About FlumeUtils.createStream

 I see, thanks for the clarification TD.
 On 24 Feb 2015 09:56, Tathagata Das t...@databricks.com wrote:

 Akhil, that is incorrect.

 Spark will listen on the given port for Flume to push data into it.
 When in local mode, it will listen on localhost:
 When in some kind of cluster, instead of localhost you will have to give
 the hostname of the cluster node where you want Flume to forward the data.
 Spark will launch the Flume receiver on that node (assuming the hostname
 matching is correct), and listen on port , for receiving data from Flume.
 So only the configured machine will listen on port .

 I suggest trying the other stream, FlumeUtils.createPollingStream. More
 details here.
 http://spark.apache.org/docs/latest/streaming-flume-integration.html



 On Sat, Feb 21, 2015 at 12:17 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Spark won't listen on  mate. It basically means you have a Flume
 source running at port  of your localhost. And when you submit your
 application in standalone mode, workers will consume data from that port.

 Thanks
 Best Regards

 On Sat, Feb 21, 2015 at 9:22 AM, bit1...@163.com bit1...@163.com
 wrote:


 Hi,
 In the spark streaming application, I write the code, 
 FlumeUtils.createStream(ssc,localhost,),which
 means spark will listen on the  port, and wait for Flume Sink to write
 to it.
 My question is:  when I submit the application to the Spark Standalone
 cluster, will  be opened only on the Driver Machine or all the workers
 will also open the  port and wait for the Flume data?

 --






Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?

2015-02-23 Thread Tathagata Das
In case this mystery has not been solved: DStream.print() essentially does
an RDD.take(10) on each RDD, which computes only a subset of the partitions
in the RDD. But collect() forces the evaluation of all the partitions. Since
you are writing to JSON in the mapI() function, this could be the reason.
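
If the side effects inside the map function must happen for every record,
the output operation has to touch every partition. A minimal sketch, assuming
a count per batch is an acceptable way to do that; ForceEvaluation and
forceFullEvaluation are hypothetical names.

```scala
import org.apache.spark.streaming.dstream.DStream

object ForceEvaluation {
  // Registers an output operation that runs a job over all partitions of
  // every batch RDD, unlike print(), which only needs the first few records.
  def forceFullEvaluation[T](stream: DStream[T]): Unit =
    stream.foreachRDD { rdd =>
      val n = rdd.count()   // count() evaluates every partition
      println(s"batch contained $n records")
    }
}
```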

TD

On Wed, Feb 18, 2015 at 7:15 AM, Imran Rashid iras...@cloudera.com wrote:

 so if you only change this line:


 https://gist.github.com/emres/0fb6de128baea099e741#file-mymoduledriver-java-L137

 to

 json.print()

 it processes 16 files instead?  I am totally perplexed.  My only
 suggestions to help debug are
 (1) see what happens when you get rid of MyModuleWorker completely --
 change MyModuleDriver#process to just
 inStream.print()
 and see what happens

 (2) stick a bunch of printlns into MyModuleWorker#call

 (3) turn on DEBUG logging
 for org.apache.spark.streaming.dstream.FileInputDStream

 my gut instinct is that something else is flaky about the file input
 stream (eg., it makes some assumption about the file system which maybe
 aren't valid in your case, it has a bunch of caveats), and that it has just
 happened to work sometimes with your foreachRdd and failed sometimes with
 print.

 Sorry I am not a lot of help in this case, hope this leads you down the
 right track or somebody else can help out.

 Imran


 On Wed, Feb 18, 2015 at 2:28 AM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 Hello Imran,

 (a) I know that all 20 files are processed when I use foreachRDD, because
 I can see the processed files in the output directory. (My application
 logic writes them to an output directory after they are processed, *but*
 that writing operation does not happen in foreachRDD, below you can see the
 URL that includes my code and clarifies this).

 (b) I know only 16 files are processed because in the output directory I
 see only 16 files processed. I wait for minutes and minutes and no more
 files appear in the output directory. When I see only 16 files are
 processed and Spark Streaming went to the mode of idly watching the input
 directory, and then if I copy a few more files, they are also processed.

 (c) Sure, you can see part of my code in the following gist:
 https://gist.github.com/emres/0fb6de128baea099e741
  It might seem a little convoluted at first, because my application
 is divided into two classes, a Driver class (setting up things and
 initializing them), and a Worker class (that implements the core
 functionality). I've also put the relevant methods from the my utility
 classes for completeness.

 I am as perplexed as you are as to why forcing the output via foreachRDD
 ended up in different behaviour compared to simply using print() method.

 Kind regards,
 Emre



 On Tue, Feb 17, 2015 at 4:23 PM, Imran Rashid iras...@cloudera.com
 wrote:

 Hi Emre,

 there shouldn't be any difference in which files get processed w/
 print() vs. foreachRDD().  In fact, if you look at the definition of
 print(), it is just calling foreachRDD() underneath.  So there is something
 else going on here.

 We need a little more information to figure out exactly what is going
 on.  (I think Sean was getting at the same thing ...)

 (a) how do you know that when you use foreachRDD, all 20 files get
 processed?

 (b) How do you know that only 16 files get processed when you print()?
 Do you know the other files are being skipped, or maybe they are just
 stuck somewhere?  eg., suppose you start w/ 20 files, and you see 16 get
 processed ... what happens after you add a few more files to the
 directory?  Are they processed immediately, or are they never processed
 either?

 (c) Can you share any more code of what you are doing to the dstreams
 *before* the print() / foreachRDD()?  That might give us more details about
 what the difference is.

 I can't see how .count.println() would be different than just println(),
 but maybe I am missing something also.

 Imran

 On Mon, Feb 16, 2015 at 7:49 AM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 Sean,

 In this case, I've been testing the code on my local machine and using
 Spark locally, so I all the log output was available on my terminal. And
 I've used the .print() method to have an output operation, just to force
 Spark execute.

 And I was not using foreachRDD, I was only using print() method on a
 JavaDStream object, and it was working fine for a few files, up to 16 (and
 without print() it did not do anything because there were no output
 operations).

 To sum it up, in my case:

  - Initially, use .print() and no foreachRDD: processes up to 16 files
 and does not do anything for the remaining 4.
  - Remove .print() and use foreachRDD: processes all of the 20 files.

 Maybe, as in Akhil Das's suggestion, using .count.print() might also
 have fixed my problem, but I'm satisfied with foreachRDD approach for now.
 (Though it is still a mystery to me why using .print() had a difference,
 maybe my mental model of Spark is wrong, I thought no matter what output
 

Re: Write ahead Logs and checkpoint

2015-02-23 Thread Tathagata Das
Exactly, that is the reason.

To avoid that, in the to-be-released Spark 1.3 we have added a new Kafka API
(called direct stream) which does not use Zookeeper at all to keep track of
progress, and maintains offsets within Spark Streaming. That can guarantee
that all records are received exactly once. It's experimental for now, but we
will make it stable. Please try it out.
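
A minimal sketch of creating such a direct stream, assuming the
KafkaUtils.createDirectStream API as it ships in the spark-streaming-kafka
module of Spark 1.3 and String keys and values; DirectKafka, directStream
and the brokers parameter are hypothetical names.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectKafka {
  // brokers is a comma-separated host:port list of Kafka brokers (not Zookeeper).
  def directStream(ssc: StreamingContext,
                   brokers: String,
                   topics: Set[String]): InputDStream[(String, String)] = {
    val kafkaParams = Map("metadata.broker.list" -> brokers)
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)
  }
}
```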

TD

On Mon, Feb 23, 2015 at 9:41 PM, V Dineshkumar developer.dines...@gmail.com
 wrote:

 Hi,

 My Spark Streaming application is pulling data from Kafka. To prevent data
 loss I have implemented WAL and enabled checkpointing. On killing my
 application and restarting it I am now able to prevent data loss, but
 I am getting duplicate messages.

 Is it because the application got killed before it was able to checkpoint the
 current processing state?
 If yes, how do I tackle the duplicate messages?

 Thanks,
 Dinesh



Re: Write ahead Logs and checkpoint

2015-02-23 Thread Tathagata Das
I think it will not be affected. We ignore the offsets stored anywhere
outside Spark Streaming. It is the fact that progress information was being
stored in two different places (SS and Kafka/ZK) that was causing
inconsistencies and duplicates.

TD

On Mon, Feb 23, 2015 at 11:27 PM, Felix C felixcheun...@hotmail.com wrote:

  Kafka 0.8.2 has built-in offset management, how would that affect direct
 stream in spark?
 Please see KAFKA-1012

 --- Original Message ---

 From: Tathagata Das t...@databricks.com
 Sent: February 23, 2015 9:53 PM
 To: V Dineshkumar developer.dines...@gmail.com
 Cc: user user@spark.apache.org
 Subject: Re: Write ahead Logs and checkpoint

  Exactly, that is the reason.

  To avoid that, in the to-be-released Spark 1.3 we have added a new Kafka
 API (called direct stream) which does not use Zookeeper at all to keep
 track of progress, and maintains offsets within Spark Streaming. That can
 guarantee that all records are received exactly once. It's experimental for
 now, but we will make it stable. Please try it out.

  TD

 On Mon, Feb 23, 2015 at 9:41 PM, V Dineshkumar 
 developer.dines...@gmail.com wrote:

 Hi,

  My Spark Streaming application is pulling data from Kafka. To prevent
 data loss I have implemented WAL and enabled checkpointing. On killing my
 application and restarting it I am now able to prevent data loss, but
 I am getting duplicate messages.

  Is it because the application got killed before it was able to checkpoint
 the current processing state?
 If yes, how do I tackle the duplicate messages?

  Thanks,
 Dinesh





Re: Any sample code for Kafka consumer

2015-02-22 Thread Tathagata Das
Spark Streaming already directly supports Kafka
http://spark.apache.org/docs/latest/streaming-programming-guide.html#advanced-sources

Is there any reason why that is not sufficient?

TD

On Sun, Feb 22, 2015 at 5:18 PM, mykidong mykid...@gmail.com wrote:

 In java, you can see this example:
 https://github.com/mykidong/spark-kafka-simple-consumer-receiver

 - Kidong.

 -- Original Message --
 From: icecreamlc [via Apache Spark User List]
 To: mykidong
 Sent: 2015-02-21 11:16:37 AM
 Subject: Any sample code for Kafka consumer


 Dear all,

 Do you have any sample code that consume data from Kafka server using
 spark streaming? Thanks a lot!!!






Re: In a Spark Streaming application, what might be the potential causes for util.AkkaUtils: Error sending message in 1 attempts and java.util.concurrent.TimeoutException: Futures timed out and

2015-02-19 Thread Tathagata Das
What version of Spark are you using?

TD

On Thu, Feb 19, 2015 at 2:45 AM, Emre Sevinc emre.sev...@gmail.com wrote:

 Hello,

 We have a Spark Streaming application that watches an input directory, and
 as files are copied there the application reads them and sends the contents
 to a RESTful web service, receives a response and writes some contents to an
 output directory.

 When testing the application by copying a few thousand files at once to
 its input directory, we have realized that after having processed about
 3800 files, it produces messages like the following in the log file:

 15/02/19 10:22:06 INFO storage.MemoryStore: Block broadcast_17935 of size
 9960 dropped from memory (free 447798720)
 15/02/19 10:22:55 WARN util.AkkaUtils: Error sending message in 1 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
 at
 scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:107)
 at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
 at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)

 and then the Spark Streaming application dies.

 What might be the potential causes to check for such errors?

 Below you can see last few lines before it dies:


 15/02/19 10:22:03 INFO broadcast.TorrentBroadcast: Started reading
 broadcast variable 12894
 15/02/19 10:22:04 INFO storage.MemoryStore: ensureFreeSpace(20978) called
 with curMem=107884847, maxMem=556038881
 15/02/19 10:22:04 INFO storage.MemoryStore: Block broadcast_12894_piece0
 stored as bytes in memory (estimated size 20.5 KB, free 427.4 MB)
 15/02/19 10:22:04 INFO storage.BlockManagerMaster: Updated info of block
 broadcast_12894_piece0
 15/02/19 10:22:04 INFO broadcast.TorrentBroadcast: Reading broadcast
 variable 12894 took 460 ms
 15/02/19 10:22:04 INFO storage.MemoryStore: ensureFreeSpace(347363) called
 with curMem=107905825, maxMem=556038881
 15/02/19 10:22:04 INFO storage.MemoryStore: Block broadcast_12894 stored
 as values in memory (estimated size 339.2 KB, free 427.0 MB)
 15/02/19 10:22:04 INFO storage.MemoryStore: ensureFreeSpace(1079) called
 with curMem=108253188, maxMem=556038881
 15/02/19 10:22:04 INFO storage.MemoryStore: Block rdd_30466_35 stored as
 bytes in memory (estimated size 1079.0 B, free 427.0 MB)
 15/02/19 10:22:04 INFO storage.BlockManagerMaster: Updated info of block
 rdd_30466_35
 15/02/19 10:22:05 INFO storage.MemoryStore: ensureFreeSpace(5) called with
 curMem=108254267, maxMem=556038881
 15/02/19 10:22:05 INFO storage.MemoryStore: Block rdd_30467_35 stored as
 bytes in memory (estimated size 5.0 B, free 427.0 MB)
 15/02/19 10:22:05 INFO storage.BlockManagerMaster: Updated info of block
 rdd_30467_35
 15/02/19 10:22:05 INFO executor.Executor: Finished task 35.0 in stage
 351.0 (TID 12229). 2353 bytes result sent to driver
 15/02/19 10:22:06 INFO storage.BlockManager: Removing broadcast 17935
 15/02/19 10:22:06 INFO storage.BlockManager: Removing block
 broadcast_17935_piece0
 15/02/19 10:22:06 INFO storage.MemoryStore: Block broadcast_17935_piece0
 of size 4151 dropped from memory (free 447788760)
 15/02/19 10:22:06 INFO storage.BlockManagerMaster: Updated info of block
 broadcast_17935_piece0
 15/02/19 10:22:06 INFO storage.BlockManager: Removing block broadcast_17935
 15/02/19 10:22:06 INFO storage.MemoryStore: Block broadcast_17935 of size
 9960 dropped from memory (free 447798720)
 15/02/19 10:22:55 WARN util.AkkaUtils: Error sending message in 1 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
 at
 scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:107)
 at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
 at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)
 15/02/19 10:23:28 WARN util.AkkaUtils: Error sending message in 2 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
 at
 scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:107)
 at 

Re: spark-local dir running out of space during long ALS run

2015-02-16 Thread Tathagata Das
Correct, brute-force cleanup is not useful. Since Spark 1.0, Spark can
automatically clean up files based on which RDDs are in use or have been
garbage collected by the JVM. That would be the best way, but it depends on
the JVM GC characteristics. If you force a GC periodically in the driver, that
might help you get rid of files on the workers that are not needed.
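
Forcing a periodic GC in the driver could look roughly like the sketch below.
It uses only the JDK; PeriodicGc and start are hypothetical names and the
interval is up to you. Note that System.gc() is only a request to the JVM and
has no effect if explicit GC is disabled (for example with
-XX:+DisableExplicitGC).

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

object PeriodicGc {
  // Starts a daemon thread that requests a JVM garbage collection at a
  // fixed interval. Call this once in the driver program.
  def start(intervalSeconds: Long): ScheduledExecutorService = {
    val scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "periodic-gc")
        t.setDaemon(true)   // do not keep the driver alive just for this thread
        t
      }
    })
    scheduler.scheduleAtFixedRate(
      new Runnable { def run(): Unit = System.gc() },
      intervalSeconds, intervalSeconds, TimeUnit.SECONDS)
    scheduler
  }
}
```

Calling PeriodicGc.start(600) near the start of the driver would request a GC
every ten minutes; shutdownNow() on the returned scheduler stops it.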

TD

On Mon, Feb 16, 2015 at 12:27 AM, Antony Mayi antonym...@yahoo.com.invalid
wrote:

 spark.cleaner.ttl is not the right way - seems to be really designed for
 streaming. although it keeps the disk usage under control it also causes
 loss of rdds and broadcasts that are required later leading to crash.

 is there any other way?
 thanks,
 Antony.


   On Sunday, 15 February 2015, 21:42, Antony Mayi antonym...@yahoo.com
 wrote:



 spark.cleaner.ttl ?


   On Sunday, 15 February 2015, 18:23, Antony Mayi antonym...@yahoo.com
 wrote:



 Hi,

 I am running a bigger ALS on Spark 1.2.0 on YARN (CDH 5.3.0) - ALS is using
 about 3 billion ratings and I am doing several trainImplicit() runs in a
 loop within one Spark session. I have a four-node cluster with 3TB of disk
 space on each. Before starting the job, less than 8% of the disk space is
 used. While the ALS is running I can see the disk usage rapidly growing,
 mainly because of files being stored
 under 
 yarn/local/usercache/user/appcache/application_XXX_YYY/spark-local-ZZZ-AAA.
 After about 10 hours the disk usage hits 90% and YARN kills the particular
 containers.

 am I missing doing some cleanup somewhere while looping over the several
 trainImplicit() calls? taking 4*3TB of disk space seems immense.

 thanks for any help,
 Antony.








Re: NaiveBayes classifier causes ShuffleDependency class cast exception

2015-02-13 Thread Tathagata Das
You cannot have two SparkContexts active in the same JVM at the same time.
Just create one SparkContext and then use it for both purposes.
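
In Scala, sharing one context between the streaming and the batch/MLlib parts
could be sketched as follows. SingleContext and create are hypothetical
names; the StreamingContext constructor that takes an existing SparkContext
is part of Spark Streaming.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SingleContext {
  // Builds the one SparkContext for this JVM and a StreamingContext on top of it.
  def create(appName: String, master: String, batchSeconds: Long): (SparkContext, StreamingContext) = {
    val conf = new SparkConf().setAppName(appName).setMaster(master)
    val sc = new SparkContext(conf)                            // used for batch / MLlib work
    val ssc = new StreamingContext(sc, Seconds(batchSeconds))  // reuses sc instead of creating a second context
    (sc, ssc)
  }
}
```

With this setup spark.driver.allowMultipleContexts does not need to be set.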

TD

On Fri, Feb 6, 2015 at 8:49 PM, VISHNU SUBRAMANIAN 
johnfedrickena...@gmail.com wrote:

 Can you try creating just a single spark context  and then try your code.
 If you want to use it for streaming pass the same sparkcontext object
 instead of conf.

 Note: Instead of just replying to me , try to use reply to all so that the
 post is visible for the community . That way you can expect immediate
 responses.


 On Fri, Feb 6, 2015 at 6:09 AM, aanilpala aanilp...@gmail.com wrote:

 I have the following code:


 SparkConf conf = new
 SparkConf().setAppName("streamer").setMaster("local[2]");
 conf.set("spark.driver.allowMultipleContexts", "true");
 JavaStreamingContext ssc = new JavaStreamingContext(conf, new
 Duration(batch_interval));
 ssc.checkpoint("/tmp/spark/checkpoint");

 SparkConf conf2 = new
 SparkConf().setAppName("classifier").setMaster("local[1]");
 conf2.set("spark.driver.allowMultipleContexts", "true");
 JavaSparkContext sc = new JavaSparkContext(conf);

 JavaReceiverInputDStream<String> stream =
 ssc.socketTextStream("localhost", );

 // String to Tuple3 Conversion
 JavaDStream<Tuple3<Long, String, String>> tuple_stream =
 stream.map(new Function<String, Tuple3<Long, String, String>>() {
  ... });

 JavaPairDStream<Integer, DictionaryEntry>
 raw_dictionary_stream =
 tuple_stream.filter(new Function<Tuple3<Long, String, String>,
 Boolean>()
 {

 @Override
 public Boolean call(Tuple3<Long, String, String>
 tuple) throws Exception {
 if((tuple._1()/Time.scaling_factor %
 training_interval) < training_dur)
 NaiveBayes.train(sc.parallelize(training_set).rdd());

 return true;
 }


 }).

 I am working on a text mining project and I want to use
 NaiveBayesClassifier
 of MLlib to classify some stream items. So, I have two Spark contexts one
 of
 which is a streaming context. The call to NaiveBayes.train causes the
 following exception.

 Any ideas?


  Exception in thread main org.apache.spark.SparkException: Job aborted
 due
 to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure:
 Lost task 0.0 in stage 0.0 (TID 0, localhost):
 java.lang.ClassCastException:
 org.apache.spark.SparkContext$$anonfun$runJob$4 cannot be cast to
 org.apache.spark.ShuffleDependency
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:60)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 Driver stacktrace:
 at
 org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
 at

 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at

 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
 at scala.Option.foreach(Option.scala:236)
 at

 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
 at

 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
 at

 org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
 at akka.actor.ActorCell.invoke(ActorCell.scala:487)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
 at akka.dispatch.Mailbox.run(Mailbox.scala:220)
 at

 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at

 

Re: Interact with streams in a non-blocking way

2015-02-13 Thread Tathagata Das
Here is an example of how you can do it. Let's say myDStream contains the
data that you may want to asynchronously query, say, using Spark SQL.

val sqlContext = new SQLContext(streamingContext.sparkContext)
import sqlContext.createSchemaRDD   // implicit case-class RDD to SchemaRDD conversion (Spark 1.2)

myDStream.foreachRDD { rdd =>   // rdd is an RDD of a case class
   sqlContext.registerRDDAsTable(rdd, "mytable")
}

This will make sure that the table mytable always refers to the latest
RDD generated by the DStream.
Then from a different thread you can asynchronously query

sqlContext.sql("select * from mytable")

Hope this helps.

TD



On Fri, Feb 13, 2015 at 3:59 AM, Sean Owen so...@cloudera.com wrote:

 Sure it's possible, but you would use Streaming to update some shared
 state, and create another service that accessed that shared state too.

 On Fri, Feb 13, 2015 at 11:57 AM, Tamas Jambor jambo...@gmail.com wrote:
  Thanks for the reply, I am trying to setup a streaming as a service
  approach, using the framework that is used for spark-jobserver. for that
 I
  would need to handle asynchronous  operations that are initiated from
  outside of the stream. Do you think it is not possible?
 
  On Fri Feb 13 2015 at 10:14:18 Sean Owen so...@cloudera.com wrote:
 
  You call awaitTermination() in the main thread, and indeed it blocks
  there forever. From there Spark Streaming takes over, and is invoking
  the operations you set up. Your operations have access to the data of
  course. That's the model; you don't make external threads that reach
  in to Spark Streaming's objects, but can easily create operations that
  take whatever actions you want and invoke them in Streaming.
 
  On Fri, Feb 13, 2015 at 10:04 AM, jamborta jambo...@gmail.com wrote:
   Hi all,
  
   I am trying to come up with a workflow where I can query streams
   asynchronously. The problem I have is a ssc.awaitTermination() line
   blocks
   the whole thread, so it is not straightforward to me whether it is
   possible
   to get hold of objects from streams once they are started. any
   suggestion on
   what is the best way to implement this?
  
   thanks,
  
  
  




Re: Concurrent batch processing

2015-02-12 Thread Tathagata Das
So you have come across spark.streaming.concurrentJobs already :)
Yeah, that is an undocumented feature that does allow multiple output
operations to be submitted in parallel. However, this is not made public for
the exact reasons that you realized - the semantics in the case of stateful
operations are not clear. It is semantically safe to enable; however, it may
cause redundant computations, as the next batch's jobs may recompute some RDDs
twice rather than using the cached values of other RDDs.
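
For completeness, the property is set like any other Spark configuration
value. This is only a sketch: the value 2 is an arbitrary illustration, and
ConcurrentJobsConf and build are hypothetical names.

```scala
import org.apache.spark.SparkConf

object ConcurrentJobsConf {
  // spark.streaming.concurrentJobs is undocumented; 2 is just an example value.
  def build(appName: String): SparkConf =
    new SparkConf()
      .setAppName(appName)
      .set("spark.streaming.concurrentJobs", "2")
}
```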

In general, only in a very few cases is it useful to increase this
concurrency. If batch processing time > batch interval, then you need to
use more resources, and parallelize the ingestion and processing enough to
utilize those resources efficiently.
The spikes that you see even though average hardware utilization is low
probably indicate that the parallelization of the Spark Streaming jobs is
insufficient. There are a bunch of optimizations that can be done.
http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-processing-time-of-each-batch
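
One of the simpler options is to repartition the received data before the
expensive processing, so that more tasks run in parallel. A sketch, assuming
DStream.repartition is acceptable for your workload (it adds a shuffle);
Parallelize, spread and numPartitions are hypothetical names.

```scala
import org.apache.spark.streaming.dstream.DStream

object Parallelize {
  // Redistributes each batch over numPartitions partitions so that the
  // downstream stages can use more cores at once.
  def spread[T](stream: DStream[T], numPartitions: Int): DStream[T] =
    stream.repartition(numPartitions)
}
```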

If you have already done this, can you tell me more about what sort of
utilization and spikes you see, and what sort of parallelization you have
already done?

TD

On Thu, Feb 12, 2015 at 12:09 PM, Matus Faro matus.f...@kik.com wrote:

 I've been experimenting with my configuration for couple of days and
 gained quite a bit of power through small optimizations, but it may very
 well be something I'm doing crazy that is causing this problem.

 To give a little bit of a background, I am in the early stages of a
 project that consumes a stream of data in the order of 100,000 per second
 that requires processing over a sliding window over one day (ideally a
 week). Spark Streaming is a good candidate but I want to make sure I squash
 any performance issues ahead of time before I commit.

 With a 5 second batch size, in 40 minutes, the processing time is also 5
 seconds. I see the CPU spikes over two seconds out of five. I assume the
 sliding window operation is very expensive in this case and that's the root
 cause of this effect.

 I should've done a little bit more research before I posted, I just came
 across a post about an undocumented property spark.streaming.concurrentJobs
 that I am about to try. I'm still confused how exactly this works with a
 sliding window where the result of one batch depends on the other. I assume
 the concurrency can only be achieved up until the window action is
 executed. Either way, I am going to give this a try and post back here if
 that doesn't work.

 Thanks!



 On Thu, Feb 12, 2015 at 2:55 PM, Arush Kharbanda 
 ar...@sigmoidanalytics.com wrote:

 It could depend on the nature of your application but spark streaming
 would use spark internally and concurrency should be there what is your use
 case?


 Are you sure that your configuration is good?


 On Fri, Feb 13, 2015 at 1:17 AM, Matus Faro matus.f...@kik.com wrote:

 Hi,

 Please correct me if I'm wrong, in Spark Streaming, next batch will
 not start processing until the previous batch has completed. Is there
 any way to be able to start processing the next batch if the previous
 batch is taking longer to process than the batch interval?

 The problem I am facing is that I don't see a hardware bottleneck in
 my Spark cluster, but Spark is not able to handle the amount of data I
 am pumping through (batch processing time is longer than batch
 interval). What I'm seeing is spikes of CPU, network and disk IO usage
 which I assume are due to different stages of a job, but on average,
 the hardware is under utilized. Concurrency in batch processing would
 allow the average batch processing time to be greater than batch
 interval while fully utilizing the hardware.

 Any ideas on what can be done? One option I can think of is to split
 the application into multiple applications running concurrently and
 dividing the initial stream of data between those applications.
 However, I would have to lose the benefits of having a single
 application.

 Thank you,
 Matus





 --


 *Arush Kharbanda* || Technical Teamlead

 ar...@sigmoidanalytics.com || www.sigmoidanalytics.com





Re: How to broadcast a variable read from a file in yarn-cluster mode?

2015-02-12 Thread Tathagata Das
Can you give me the whole logs?

TD

On Tue, Feb 10, 2015 at 10:48 AM, Jon Gregg jonrgr...@gmail.com wrote:

 OK that worked and getting close here ... the job ran successfully for a
 bit and I got output for the first couple buckets before getting a
 java.lang.Exception: Could not compute split, block input-0-1423593163000
 not found error.

 So I bumped up the memory at the command line from 2 gb to 5 gb, ran it
 again ... this time I got around 8 successful outputs before erroring.

 Bumped up the memory from 5 gb to 10 gb ... got around 15 successful
 outputs before erroring.


 I'm not persisting or caching anything except for the broadcast IP table
 and another broadcast small user agents list used for the same type of
 filtering, and both files are tiny.  The Hadoop cluster is nearly empty
 right now and has more than enough available memory to handle this job.  I
 am connecting to Kafka as well and so there's a lot of data coming through
 as my index is trying to catch up to the current date, but yarn-client mode
 has several times in the past few weeks been able to catch up to the
 current date and run successfully for days without issue.

 My guess is memory isn't being cleared after each bucket?  Relevant
 portion of the log below.


 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593134400 on
 phd40010023.na.com:1 in memory (size: 50.1 MB, free: 10.2 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593135400 on
 phd40010023.na.com:1 in memory (size: 24.9 MB, free: 10.2 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593136400 on
 phd40010023.na.com:1 in memory (size: 129.0 MB, free: 10.3 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593137000 on
 phd40010023.na.com:1 in memory (size: 112.4 MB, free: 10.4 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593137200 on
 phd40010023.na.com:1 in memory (size: 481.0 B, free: 10.4 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593137800 on
 phd40010023.na.com:1 in memory (size: 44.6 MB, free: 10.5 GB)
 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 754 from persistence list
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593138400 on
 phd40010023.na.com:1 in memory (size: 95.8 MB, free: 10.6 GB)
 15/02/10 13:34:54 INFO BlockManager: Removing RDD 754
 15/02/10 13:34:54 INFO MapPartitionsRDD: Removing RDD 753 from persistence
 list
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593138600 on
 phd40010023.na.com:1 in memory (size: 123.2 MB, free: 10.7 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593138800 on
 phd40010023.na.com:1 in memory (size: 5.2 KB, free: 10.7 GB)
 15/02/10 13:34:54 INFO BlockManager: Removing RDD 753
 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 750 from persistence list
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593139600 on
 phd40010023.na.com:1 in memory (size: 106.4 MB, free: 10.8 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593139800 on
 phd40010023.na.com:1 in memory (size: 107.0 MB, free: 10.9 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-142359314 on
 phd40010023.na.com:1 in memory (size: 59.5 MB, free: 10.9 GB)
 15/02/10 13:34:54 INFO BlockManager: Removing RDD 750
 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 749 from persistence list
 15/02/10 13:34:54 INFO DAGScheduler: Missing parents: List(Stage 117,
 Stage 114, Stage 115, Stage 116)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593140200 on
 phd40010023.na.com:1 in memory (size: 845.0 B, free: 10.9 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593140600 on
 phd40010023.na.com:1 in memory (size: 19.2 MB, free: 11.0 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593140800 on
 phd40010023.na.com:1 in memory (size: 492.0 B, free: 11.0 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593141000 on
 phd40010023.na.com:1 in memory (size: 1018.0 B, free: 11.0 GB)
 15/02/10 13:34:54 INFO BlockManager: Removing RDD 749
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593142400 on
 phd40010023.na.com:1 in memory (size: 48.6 MB, free: 11.0 GB)
 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 767 from persistence list
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593142600 on
 phd40010023.na.com:1 in memory (size: 4.9 KB, free: 11.0 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593142800 on
 phd40010023.na.com:1 in memory (size: 780.0 B, free: 11.0 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593143800 on
 phd40010023.na.com:1 in memory (size: 847.0 B, free: 11.0 GB)
 15/02/10 13:34:54 INFO BlockManager: Removing RDD 767
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593144400 on
 phd40010023.na.com:1 in memory (size: 43.7 MB, 

Re: Streaming scheduling delay

2015-02-12 Thread Tathagata Das
) on executor nodedn1-22-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 54]
 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.55 in stage 16291.0
(TID 1042745) on executor nodedn1-18-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 55]
 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.56 in stage 16291.0
(TID 1042754) on executor nodedn1-17-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 56]
 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.57 in stage 16291.0
(TID 1042758) on executor nodedn1-17-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 57]
 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.58 in stage 16291.0
(TID 1042762) on executor nodedn1-12-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 58]
 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.59 in stage 16291.0
(TID 1042766) on executor nodedn1-23-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 59]
 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.60 in stage 16291.0
(TID 1042774) on executor nodedn1-20-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 60]
 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.61 in stage 16291.0
(TID 1042779) on executor nodedn1-13-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 61]
 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.62 in stage 16291.0
(TID 1042789) on executor nodedn1-20-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 62]
 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.63 in stage 16291.0
(TID 1042793) on executor nodedn1-15-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 63]
 org.apache.spark.SparkException: Job aborted due to stage failure: Task
54 in stage 16291.0 failed 64 times, most recent failure: Lost task 54.63
in stage 16291.0 (TID 1042793, nodedn1-15-acme.com): java.lang.Exception:
Could not compute split, block input-4-1423758372200 not found
 Exception in thread main org.apache.spark.SparkException: Job aborted
due to stage failure: Task 54 in stage 16291.0 failed 64 times, most recent
failure: Lost task 54.63 in stage 16291.0 (TID 1042793, nodedn1-15-acme.com):
java.lang.Exception: Could not compute split, block input-4-1423758372200
not found


 Thanks for looking into it.





 On Thu, Feb 12, 2015 at 8:10 PM, Tathagata Das t...@databricks.com
wrote:

 Hey Tim,

 Let me get the key points.
 1. If you are not writing back to Kafka, the delay is stable? That is,
instead of foreachRDD { // write to kafka }  if you do dstream.count,
then the delay is stable. Right?
 2. If so, then Kafka is the bottleneck. Is it the number of partitions
that you spoke of in the second mail that determines the parallelism
in writes? Is it stable with 30 partitions?

 Regarding the block exception, could you give me a trace of info level
logging that leads to this error? Basically I want to trace the lifecycle of
the block.

 TD



 On Thu, Feb 12, 2015 at 6:29 PM, Tim Smith secs...@gmail.com wrote:

 Hi Gerard,

 Great write-up and really good guidance in there.

 I have to be honest, I don't know why but setting # of partitions for
each dStream to a low number (5-10) just causes the app to choke/crash.
Setting it to 20 gets the app going but with not so great delays. Bump it
up to 30 and I start winning the war where processing time is consistently
below batch time window (20 seconds) except for a batch every few batches
where the compute time spikes 10x the usual.

 Following your guide, I took out some logInfo statements I had in the
app but didn't seem to make much difference :(

 With a higher time window (20 seconds), I got the app to run stably for
a few hours but then ran into the dreaded java.lang.Exception: Could not
compute split, block input-0-1423761240800 not found. Wonder if I need to
add RDD persistence back?

 Also, I am reaching out to Virdata with some ProServ inquiries.

 Thanks





 On Thu, Feb 12, 2015 at 4:30 AM, Gerard Maas gerard.m...@gmail.com
wrote:

 Hi Tim,

 From this:  There are 5 kafka receivers and each incoming stream is
split into 40 partitions  I suspect that you're creating too many tasks
for Spark to process on time.
 Could you try some of the 'knobs' I describe here to see if that would
help?

 http://www.virdata.com/tuning-spark/

 -kr, Gerard.

 On Thu, Feb 12, 2015 at 8:44 AM, Tim Smith secs...@gmail.com wrote:

 Just read the thread Are these numbers abnormal for spark
streaming? and I think I am seeing similar results - that is - increasing
the window seems to be the trick

Re: Spark streaming job throwing ClassNotFound exception when recovering from checkpointing

2015-02-12 Thread Tathagata Das
Could you come up with a minimal example through which I can reproduce the
problem?

On Tue, Feb 10, 2015 at 12:30 PM, conor fennell.co...@gmail.com wrote:

 I am getting the following error when I kill the spark driver and restart
 the job:

 15/02/10 17:31:05 INFO CheckpointReader: Attempting to load checkpoint from
 file

 hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-142358910.bk
 15/02/10 17:31:05 WARN CheckpointReader: Error reading checkpoint from
 file

 hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-142358910.bk
 java.io.IOException: java.lang.ClassNotFoundException:
 com.example.spark.streaming.reporting.live.jobs.Bucket
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:988)
 at
 org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:176)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at

 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at

 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)



 Spark version is 1.2.0

 The streaming job is executing every 10 seconds with the following steps:

   1. Consuming JSON from a kafka topic called journeys and converting to
   case classes
   2. Filters resulting journeys stream based on a time attribute being set
   3. FlatMaps journeys into separate time buckets 15, 60 and 1440 minutes
   e.g. (HOUR1234569000, ActiveState(HOUR, 1234569000,
 hyperLogLog(journey
   id), 360) )
   4. ReduceByKey adding hyperloglogs
   5. UpdateStateByKey to add to previous states hyperloglog
   6. Then output results to Cassandra


 I have pasted in a sample app below to mimic the problem and put all
 classes
 into one file, it is also attached here  SampleJob.scala
 
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n21582/SampleJob.scala
 

 To get around the issue for the moment, I have removed the Bucket class and
 stopped passing in a bucket array to the ActiveJourney class.
 And instead I hard code all the time buckets I need in the ActiveJourney
 class; this approach works and recovers from checkpointing but is not
 extensible.

 Can the Spark gurus explain why I get that ClassNotFound exception?

 Need any more information, please let me know.

 Much thanks,
 Conor



 package com.example.spark.streaming.reporting.live.jobs
 import java.util.Date
 import scala.Array.canBuildFrom
 import scala.collection.mutable.MutableList
 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.Seconds
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.kafka.KafkaUtils
 import org.json4s.DefaultFormats
 import org.json4s.jackson.JsonMethods.parse
 import org.json4s.jvalue2extractable
 import org.json4s.string2JsonInput
 import com.example.spark.streaming.utils.MilliSecondUtils
 import com.example.spark.streaming.utils.constants.ColumnFamilies
 import com.example.spark.streaming.utils.constants.Constants
 import com.example.spark.streaming.utils.constants.Milliseconds
 import com.example.spark.streaming.utils.constants.SparkConfig
 import com.datastax.spark.connector.SomeColumns
 import com.datastax.spark.connector.streaming.toDStreamFunctions
 import com.datastax.spark.connector.toNamedColumnRef
 import com.twitter.algebird.HLL
 import com.twitter.algebird.HyperLogLogMonoid
 // Json parsing classes
 case class Journey(o: Option[JourneyCommand], o2: Option[JourneyDetails])
 case class JourneyDetails(_id: String)
 case class JourneyCommand($set: Option[JourneySet])
 case class JourneySet(awayAt: Date)
 // Class not found bucket
 case class Bucket(val bucketType: String, val roundDown: (Long) => Long,
   val columnFamily: String, val size: Long, val maxIntervals: Int)

 // used for updateStateByKey
 case class ActiveState(var bucketType: String, var time: Long,
   var hyperLogLog: HLL, var ttl: Int)

 object SampleJob {
   private final val Name = this.getClass().getSimpleName()
   def main(args: Array[String]) {
     if (args.length < 8) {
       System.err.println(s"Usage: $Name <environment> <zkQuorum> <group> " +
         "<topics> <numThreads> <hdfsUri> <cassandra> <intervalSeconds>")
       System.exit(1)
     }
     System.out.print(args)
     val Array(environment, zkQuorum, group, topics, numThreads, hdfsUri,
       cassandra, intervalSeconds) = args
     val checkpointDirectory = hdfsUri + "/reporting/" + Name +
       getClass().getPackage().getImplementationVersion()
     def functionToCreateContext(): StreamingContext = {

       // how many buckets
       val fifteen = Bucket("QUARTER_HOUR",
         MilliSecondUtils.roundDownToNearestQuarterHour,
         ColumnFamilies.Visits_15, Milliseconds.FifteenMinutes, 90)
       val hour = Bucket("HOUR", MilliSecondUtils.roundDownToNearestHour,
         ColumnFamilies.Visits_60, Milliseconds.Hour, 360)
  val day 

Re: streaming joining multiple streams

2015-02-12 Thread Tathagata Das
Sorry for the late response. With the amount of data you are planning to join,
any system would take time. However, between Hive's MapReduce joins,
Spark's basic shuffle, and Spark SQL's join, the latter wins hands down.
Furthermore, with the APIs of Spark and Spark Streaming, you will have to
do strictly less work to set up the infrastructure that you want to build.

Yes, Spark Streaming currently does not support providing your own timer,
because the logic to handle delays etc. is pretty complex and specific to
each application. Usually that logic can be implemented on top of the
windowing solution that Spark Streaming already provides.
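
To make the event-time idea concrete, here is a minimal sketch in plain Java
(no Spark dependency) of the join described in the question: events from two
streams are matched on hashkey when their own timestamps are at most 30
minutes apart. The Event and EventTimeJoin classes are hypothetical names
invented for this illustration, and the 30 minute bound is taken from the
question. In a real job this per-key logic would presumably run on the output
of a windowed join or cogroup; that wiring is not shown here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event type: (time, hashkey, payload), time in epoch milliseconds.
final class Event {
    final long time;
    final String hashKey;
    final String payload;

    Event(long time, String hashKey, String payload) {
        this.time = time;
        this.hashKey = hashKey;
        this.payload = payload;
    }
}

final class EventTimeJoin {
    // Maximum allowed gap between matching events (assumed: 30 minutes).
    static final long MAX_GAP_MS = 30L * 60L * 1000L;

    // Joins two batches on hashKey, keeping pairs whose event times differ by
    // at most MAX_GAP_MS. Each result is rendered as "time,hashkey,foo1,foo2",
    // using the time of the stream1 event.
    static List<String> join(List<Event> stream1, List<Event> stream2) {
        // Index stream2 by key so each stream1 event only scans same-key candidates.
        Map<String, List<Event>> byKey = new HashMap<>();
        for (Event e : stream2) {
            byKey.computeIfAbsent(e.hashKey, k -> new ArrayList<>()).add(e);
        }
        List<String> joined = new ArrayList<>();
        for (Event left : stream1) {
            List<Event> candidates = byKey.get(left.hashKey);
            if (candidates == null) {
                continue; // no event with this key in stream2
            }
            for (Event right : candidates) {
                if (Math.abs(left.time - right.time) <= MAX_GAP_MS) {
                    joined.add(left.time + "," + left.hashKey + ","
                            + left.payload + "," + right.payload);
                }
            }
        }
        return joined;
    }
}
```

The comparison uses the timestamps carried by the events, not the wall clock,
which is the part that Spark Streaming's clock-driven windows do not provide
on their own.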

TD



On Thu, Feb 5, 2015 at 7:37 AM, Zilvinas Saltys zilvinas.sal...@gmail.com
wrote:

 The challenge I have is this. There's two streams of data where an event
 might look like this in stream1: (time, hashkey, foo1) and in stream2:
 (time, hashkey, foo2)
 The result after joining should be (time, hashkey, foo1, foo2) .. The join
 happens on hashkey and the time difference can be ~30 mins between events.
 The amount of data is enormous .. hundreds of billions of events per
 month. I need not only join the existing history data but continue to do so
 with incoming data (comes in batches not really streamed)

 For now I was thinking to implement this in MapReduce and sliding windows
 .. I'm wondering if spark can actually help me with this sort of challenge?
 How would a join of two huge streams of historic data would actually
 happen internally within spark and would it be more efficient than let's
 say hive map reduce stream join of two big tables?

 I also saw spark streaming has windowing support but it seems you cannot
 provide your own timer? As in I cannot make the time be derived from events
 itself rather than having an actual clock running.

 Thanks,



Re: Streaming scheduling delay

2015-02-12 Thread Tathagata Das
Hey Tim,

Let me get the key points.
1. If you are not writing back to Kafka, the delay is stable? That is,
instead of foreachRDD { // write to kafka }  if you do dstream.count,
then the delay is stable. Right?
2. If so, then Kafka is the bottleneck. Is it the number of partitions that
you spoke of in the second mail that determines the parallelism in
writes? Is it stable with 30 partitions?

Regarding the block exception, could you give me a trace of info level
logging that leads to this error? Basically I want to trace the lifecycle of
the block.
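
For readers following the delay numbers in this thread, here is a small
plain-Java sketch of how scheduling delay can accumulate when batches are
processed one at a time. It is only a queueing model under assumed numbers
(a 20 second batch interval, 15 second normal batches, and an occasional
batch that takes 10x as long); the SchedulingDelaySim class is hypothetical
and nothing here is measured from the application being discussed.

```java
final class SchedulingDelaySim {
    // Returns the scheduling delay (ms) seen by the last batch, assuming batch i
    // becomes ready at i * batchIntervalMs and batches run strictly in order.
    static long finalSchedulingDelay(long batchIntervalMs, long[] processingMs) {
        long previousFinish = 0L; // when the previous batch finished processing
        long delay = 0L;
        for (int i = 0; i < processingMs.length; i++) {
            long arrival = (long) i * batchIntervalMs;
            // A batch cannot start before it arrives or before its predecessor ends.
            long start = Math.max(arrival, previousFinish);
            delay = start - arrival;
            previousFinish = start + processingMs[i];
        }
        return delay;
    }
}
```

With these assumed numbers, one 150 second batch among 15 second batches
leaves the next batch waiting 130 seconds, and each following normal batch
only wins back 5 seconds. If such spikes recur every few batches, the average
processing time exceeds the batch interval and the delay keeps growing even
though most batches finish well inside the window.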

On Thu, Feb 12, 2015 at 6:29 PM, Tim Smith secs...@gmail.com wrote:

 Hi Gerard,

 Great write-up and really good guidance in there.

 I have to be honest, I don't know why but setting # of partitions for each
 dStream to a low number (5-10) just causes the app to choke/crash. Setting
 it to 20 gets the app going but with not so great delays. Bump it up to 30
 and I start winning the war where processing time is consistently below
 batch time window (20 seconds) except for a batch every few batches where
 the compute time spikes 10x the usual.

 Following your guide, I took out some logInfo statements I had in the
 app but didn't seem to make much difference :(

 With a higher time window (20 seconds), I got the app to run stably for a
 few hours but then ran into the dreaded java.lang.Exception: Could not
 compute split, block input-0-1423761240800 not found. Wonder if I need to
 add RDD persistence back?

 Also, I am reaching out to Virdata with some ProServ inquiries.

 Thanks





 On Thu, Feb 12, 2015 at 4:30 AM, Gerard Maas gerard.m...@gmail.com
 wrote:

 Hi Tim,

 From this:  There are 5 kafka receivers and each incoming stream is
 split into 40 partitions  I suspect that you're creating too many tasks
 for Spark to process on time.
 Could you try some of the 'knobs' I describe here to see if that would
 help?

 http://www.virdata.com/tuning-spark/

 -kr, Gerard.

 On Thu, Feb 12, 2015 at 8:44 AM, Tim Smith secs...@gmail.com wrote:

 Just read the thread Are these numbers abnormal for spark streaming?
 and I think I am seeing similar results - that is - increasing the window
 seems to be the trick here. I will have to monitor for a few hours/days
 before I can conclude (there are so many knobs/dials).



 On Wed, Feb 11, 2015 at 11:16 PM, Tim Smith secs...@gmail.com wrote:

 On Spark 1.2 (have been seeing this behaviour since 1.0), I have a
 streaming app that consumes data from Kafka and writes it back to Kafka
 (different topic). My big problem has been Total Delay. While execution
 time is usually below the window size (in seconds), the total delay ranges
 from minutes to hours (keeps going up).

 For a little while, I thought I had solved the issue by bumping up the
 driver memory. Then I expanded my Kafka cluster to add more nodes and the
 issue came up again. I tried a few things to smoke out the issue and
 something tells me the driver is the bottleneck again:

 1) From my app, I took out the entire write-out-to-kafka piece. Sure
 enough, execution, scheduling delay and hence total delay fell to sub
 second. This assured me that whatever processing I do before writing back
 to kafka isn't the bottleneck.

 2) In my app, I had RDD persistence set at different points but my code
 wasn't really re-using any RDDs so I took out all explicit persist()
 statements. And added, spar...unpersist to true in the context. After
 this, it doesn't seem to matter how much memory I give my executor, the
 total delay seems to be in the same range. I tried per executor memory from
 2G to 12G with no change in total delay so executors aren't memory starved.
 Also, in the SparkUI, under the Executors tab, all executors show 0/1060MB
 used when per executor memory is set to 2GB, for example.

 3) Input rate in the kafka consumer restricts spikes in incoming data.

 4) Tried FIFO and FAIR but didn't make any difference.

 5) Adding executors beyond a certain point seems useless (I guess
 excess ones just sit idle).

 At any given point in time, the SparkUI shows only one batch pending
 processing. So with just one batch pending processing, why would the
 scheduling delay run into minutes/hours if execution time is within the
 batch window duration? There aren't any failed stages or jobs.

 Right now, I have 100 executors ( i have tried setting executors from
 50-150), each with 2GB and 4 cores and the driver running with 16GB. There
 are 5 kafka receivers and each incoming stream is split into 40 partitions.
 Per receiver, input rate is restricted to 2 messages per second.

 Can anyone help me with clues or areas to look into, for
 troubleshooting the issue?

 One nugget I found buried in the code says:
 The scheduler delay includes the network delay to send the task to the
 worker machine and to send back the result (but not the time to fetch the
 task result, if it needed to be fetched from the block manager on the
 worker).

 

Re: StreamingContext getOrCreate with queueStream

2015-02-05 Thread Tathagata Das
I don't think your screenshots came through in the email. Nonetheless,
queueStream will not work with getOrCreate. It's mainly for testing (by
generating your own RDDs) and not really useful for production usage (where
you really need checkpoint-based recovery).

TD

On Thu, Feb 5, 2015 at 4:12 PM, pnpritchard nicholas.pritch...@falkonry.com
 wrote:

 I am trying to use the StreamingContext getOrCreate method in my app.

 I started by running the example ( RecoverableNetworkWordCount
 
 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
 
 ), which worked as expected. However, when I modified that example to use
 /queueStream/ rather than /socketTextStream/ for its input, then things
 broke down. I first ran it with an empty checkpoint directory, then
 restarted the app and got a NPE (copied below).

 Is this a known limitation of using queueStream? Am I assuming something by
 using it? Thanks in advance, for any advice!



 FYI, I changed line 73 in the example to be:




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/StreamingContext-getOrCreate-with-queueStream-tp21528.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: how to send JavaDStream RDD using foreachRDD using Java

2015-02-02 Thread Tathagata Das
Hello Sachin,

While Akhil's solution is correct, it is not sufficient for your use case.
RDD.foreach (that Akhil is using) will run on the workers, but you are
creating the Producer object on the driver. This will not work: a producer
created on the driver cannot be used from the workers/executors. The best way
to do what you want to do is to use rdd.foreachPartition. Inside the
function supplied to RDD.foreachPartition, create the producer, send the
whole partition, and close the producer. I am on my phone, so I am not able
to generate Java code.
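
A minimal sketch of the per-partition pattern described above, written in
plain Java so it runs without Spark or Kafka on the classpath. RecordProducer,
InMemoryProducer and PartitionWriter are hypothetical stand-ins invented for
this illustration; they are not Kafka or Spark classes. The point is only the
shape: create the producer inside the per-partition function, send every
record of that partition, and close the producer when done.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for a Kafka producer.
interface RecordProducer {
    void send(String message);
    void close();
}

// Test double that remembers what was sent and whether it was closed.
final class InMemoryProducer implements RecordProducer {
    final List<String> sent = new ArrayList<>();
    boolean closed = false;

    @Override
    public void send(String message) {
        sent.add(message);
    }

    @Override
    public void close() {
        closed = true;
    }
}

final class PartitionWriter {
    // Body to run once per partition: the producer is created here, on the
    // worker, instead of being created on the driver and shipped over.
    // Returns the number of records sent.
    static int writePartition(Iterator<String> records,
                              Supplier<RecordProducer> producerFactory) {
        RecordProducer producer = producerFactory.get();
        int count = 0;
        try {
            while (records.hasNext()) {
                producer.send(records.next());
                count++;
            }
        } finally {
            producer.close(); // always release the connection, even on failure
        }
        return count;
    }
}
```

In a Spark job the same body would go inside the function passed to
JavaRDD.foreachPartition, which receives an Iterator over the partition's
records; the factory would then have to be something the executors can
construct themselves.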

TD

On Mon, Feb 2, 2015 at 11:38 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Here you go:

 JavaDStream<String> textStream =
 ssc.textFileStream("/home/akhld/sigmoid/");

 textStream.foreachRDD(new Function<JavaRDD<String>, Void>() {

     @Override
     public Void call(JavaRDD<String> rdd) throws Exception {
         // TODO Auto-generated method stub
         rdd.foreach(new VoidFunction<String>() {

             @Override
             public void call(String stringData) throws Exception {
                 // Use this data!
                 System.out.println("W00t!! Data :" + stringData);
             }
         });
         return null;
     }
 });

 Thanks
 Best Regards

 On Sun, Feb 1, 2015 at 9:08 PM, sachin Singh sachin.sha...@gmail.com
 wrote:

 Hi I want to send streaming data to kafka topic,
 I am having RDD data which I converted in JavaDStream ,now I want to send
 it
 to kafka topic, I don't want kafka sending code, just I need foreachRDD
 implementation, my code is look like as
 public void publishtoKafka(ITblStream t)
 {
     MyTopicProducer MTP =
         ProducerFactory.createProducer(hostname + ":" + port);
     JavaDStream<?> rdd = (JavaDStream<?>) t.getRDD();

     rdd.foreachRDD(new Function<String, String>() {
         @Override
         public Void call(JavaRDD<String> rdd) throws Exception {
             KafkaUtils.sendDataAsString(MTP, topicName, String RDDData);
             return null;
         }
     });
     log.debug("sent to kafka: --");

 }

 here myTopicproducer will create producer which is working fine
 KafkaUtils.sendDataAsString is method which will publish data to kafka
 topic
 is also working fine,

 I have only one problem I am not able to convert JavaDStream rdd as string
 using foreach or foreachRDD finally I need String message from rdds,
 kindly
 suggest java code only and I dont want to use anonymous classes, Please
 send
 me only the part to send JavaDStream RDD using foreachRDD using Function
 Call

 Thanks in advance,



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/how-to-send-JavaDStream-RDD-using-foreachRDD-using-Java-tp21456.html





Re: Error when Spark streaming consumes from Kafka

2015-02-02 Thread Tathagata Das
This is an issue that is hard to resolve without rearchitecting the whole
Kafka Receiver. There are some workarounds worth looking into.

http://mail-archives.apache.org/mod_mbox/kafka-users/201312.mbox/%3CCAFbh0Q38qQ0aAg_cj=jzk-kbi8xwf+1m6xlj+fzf6eetj9z...@mail.gmail.com%3E

On Mon, Feb 2, 2015 at 1:07 PM, Greg Temchenko s...@dicefield.com wrote:

 Hi,

 This seems not fixed yet.
 I filed an issue in jira: https://issues.apache.org/jira/browse/SPARK-5505

 Greg



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-Spark-streaming-consumes-from-Kafka-tp19570p21471.html




Re: Build error

2015-01-30 Thread Tathagata Das
That is a known issue uncovered last week. It fails on certain
environments, not on Jenkins which is our testing environment.
There is already a PR up to fix it. For now you can build using mvn
package -DskipTests
TD

On Fri, Jan 30, 2015 at 8:59 PM, Andrew Musselman 
andrew.mussel...@gmail.com wrote:

 Off master, got this error; is that typical?

 ---
  T E S T S
 ---
 Running org.apache.spark.streaming.mqtt.JavaMQTTStreamSuite
 Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 2.495 sec
 - in org.apache.spark.streaming.mqtt.JavaMQTTStreamSuite

 Results :




 Tests run: 1, Failures: 0, Errors: 0, Skipped: 0

 [INFO]
 [INFO] --- scalatest-maven-plugin:1.0:test (test) @
 spark-streaming-mqtt_2.10 ---
 Discovery starting.
 Discovery completed in 498 milliseconds.
 Run starting. Expected test count is: 1
 MQTTStreamSuite:
 - mqtt input stream *** FAILED ***
   org.eclipse.paho.client.mqttv3.MqttException: Too many publishes in
 progress
   at
 org.eclipse.paho.client.mqttv3.internal.ClientState.send(ClientState.java:432)
   at
 org.eclipse.paho.client.mqttv3.internal.ClientComms.internalSend(ClientComms.java:121)
   at
 org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:139)
   at org.eclipse.paho.client.mqttv3.MqttTopic.publish(MqttTopic.java:107)
   at
 org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$publishData$1.apply(MQTTStreamSuite.scala:125)
   at
 org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$publishData$1.apply(MQTTStreamSuite.scala:124)
   at scala.collection.immutable.Range.foreach(Range.scala:141)
   at
 org.apache.spark.streaming.mqtt.MQTTStreamSuite.publishData(MQTTStreamSuite.scala:124)
   at
 org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$3.apply$mcV$sp(MQTTStreamSuite.scala:78)
   at
 org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$3.apply(MQTTStreamSuite.scala:66)
   ...
 Exception in thread Thread-20 org.apache.spark.SparkException: Job
 cancelled because SparkContext was shut down
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:690)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:689)
 at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
 at
 org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:689)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1384)
 at org.apache.spark.util.EventLoop.stop(EventLoop.scala:81)
 at
 org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1319)
 at org.apache.spark.SparkContext.stop(SparkContext.scala:1250)
 at
 org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:510)
 at
 org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:485)
 at
 org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$2.apply$mcV$sp(MQTTStreamSuite.scala:59)
 at
 org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$2.apply(MQTTStreamSuite.scala:57)
 at
 org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$2.apply(MQTTStreamSuite.scala:57)
 at org.scalatest.BeforeAndAfter$class.runTest(BeforeAndAfter.scala:210)
 at
 org.apache.spark.streaming.mqtt.MQTTStreamSuite.runTest(MQTTStreamSuite.scala:38)
 at
 org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
 at
 org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
 at
 org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
 at
 org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
 at org.scalatest.SuperEngine.org
 $scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
 at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
 at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
 at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
 at org.scalatest.Suite$class.run(Suite.scala:1424)
 at org.scalatest.FunSuite.org
 $scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
 at
 org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
 at
 org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
 at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
 at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
 at org.apache.spark.streaming.mqtt.MQTTStreamSuite.org
 $scalatest$BeforeAndAfter$$super$run(MQTTStreamSuite.scala:38)
 at org.scalatest.BeforeAndAfter$class.run(BeforeAndAfter.scala:241)
 at
 org.apache.spark.streaming.mqtt.MQTTStreamSuite.run(MQTTStreamSuite.scala:38)
 at 

Re: reduceByKeyAndWindow, but using log timestamps instead of clock seconds

2015-01-28 Thread Tathagata Das
Ohhh nice! It would be great if you can share some code with us soon. It is
indeed a very complicated problem and there is probably no single
solution that fits all use cases. So having one way of doing things
would be a great reference. Looking forward to that!
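
As one possible reference point, here is a minimal plain-Java sketch of
timestamp-based tumbling windows that keeps a partial window open across
batches. It is not the code mentioned in this thread; EventTimeWindowCounter
is a hypothetical class written for this illustration, and it assumes
non-negative timestamps that arrive in order, the same simplification
described in the quoted message below.

```java
import java.util.ArrayList;
import java.util.List;

final class EventTimeWindowCounter {
    private final long windowMs;
    private long currentWindowStart = -1L; // -1 means no window is open yet
    private long currentCount = 0L;

    EventTimeWindowCounter(long windowMs) {
        this.windowMs = windowMs;
    }

    // Feeds one batch of event timestamps (ms, assumed non-decreasing across
    // calls) and returns "windowStart=count" for each window this batch closed.
    List<String> addBatch(long[] timestamps) {
        List<String> completed = new ArrayList<>();
        for (long ts : timestamps) {
            long windowStart = (ts / windowMs) * windowMs;
            if (currentWindowStart >= 0 && windowStart != currentWindowStart) {
                // A later window has started, so the open one can be marked complete.
                completed.add(currentWindowStart + "=" + currentCount);
                currentCount = 0L;
            }
            currentWindowStart = windowStart;
            currentCount++;
        }
        return completed;
    }
}
```

Note that a window is only reported once an event from a later window shows
up, so an idle stream leaves the last window open indefinitely, and windows
that received no events are never reported. Handling out-of-order events
would additionally need some timeout or lateness bound, which this sketch
does not attempt.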

On Wed, Jan 28, 2015 at 4:52 PM, Tobias Pfeiffer t...@preferred.jp wrote:
 Hi,

 On Thu, Jan 29, 2015 at 1:54 AM, YaoPau jonrgr...@gmail.com wrote:

 My thinking is to maintain state in an RDD and update it and persist it
 with
 each 2-second pass, but this also seems like it could get messy.  Any
 thoughts or examples that might help me?


 I have just implemented some timestamp-based windowing on DStreams (can't
 share the code now, but will be published a couple of months ahead),
 although with the assumption that items are in correct order. The main
 challenge (rather technical) was to keep proper state across RDD boundaries
 and to tell the state you can mark this partial window from the last
 interval as 'complete' now without shuffling too much data around. For
 example, if there are some empty intervals, you don't know when the next
 item to go into the partial window will arrive, or if there will be one at
 all. I guess if you want to have out-of-order tolerance, that will become
 even trickier, as you need to define and think about some timeout for
 partial windows in your state...

 Tobias






Re: Error reporting/collecting for users

2015-01-28 Thread Tathagata Das
You could use foreachRDD to do the operations and then inside the
foreach create an accumulator to gather all the errors together

dstream.foreachRDD { rdd =>

   val accumulator = new Accumulator[]

   rdd.map { . }.count   // whatever operation that is error prone

   // gather all errors from accumulator and show on webpage, push to
   // database?
}
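
The same shape can be sketched in plain Java, without Spark, to show what
"gather all the errors" might look like per batch. ErrorCollector is a
hypothetical class invented for this illustration, and its synchronized list
merely plays the role that a Spark accumulator would play in a real job
(updated while records are processed, read afterwards on the driver).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class ErrorCollector {
    // Thread-safe list standing in for the accumulator in this sketch.
    private final List<String> errors =
            Collections.synchronizedList(new ArrayList<String>());

    // The "error prone operation": parse each record as an integer and sum them.
    // A bad record is noted in the error list instead of failing the whole batch.
    int sumValid(List<String> records) {
        int sum = 0;
        for (String record : records) {
            try {
                sum += Integer.parseInt(record.trim());
            } catch (NumberFormatException e) {
                errors.add("bad record: " + record);
            }
        }
        return sum;
    }

    // Returns the errors gathered so far and resets the list, e.g. once per batch.
    List<String> drainErrors() {
        synchronized (errors) {
            List<String> copy = new ArrayList<>(errors);
            errors.clear();
            return copy;
        }
    }
}
```

After each batch, the drained list is what would be shown on a web page or
pushed to a database.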


On Tue, Jan 27, 2015 at 9:34 PM, Tobias Pfeiffer t...@preferred.jp wrote:
 Hi,

 On Wed, Jan 28, 2015 at 1:45 PM, Soumitra Kumar kumar.soumi...@gmail.com
 wrote:

 It is a Streaming application, so how/when do you plan to access the
 accumulator on driver?


 Well... maybe there would be some user command or web interface showing the
 errors that have happened during processing...?

 Thanks
 Tobias





Re: How to make spark partition sticky, i.e. stay with node?

2015-01-23 Thread Tathagata Das
Hello mingyu,

That is a reasonable way of doing this. Spark Streaming does not natively
support sticky partitions because Spark launches tasks based on data
locality. If there is no locality (for example, reduce tasks can run
anywhere), the location is assigned randomly. So the cogroup or join
introduces locality, which forces the Spark scheduler to be sticky.
Another way to achieve this is updateStateByKey, which
internally uses cogroup but presents a nicer streaming-like API for
per-key stateful operations.

TD

On Fri, Jan 23, 2015 at 8:23 AM, mingyu mingyut...@gmail.com wrote:
 I found a workaround.
 I can make my auxiliary data a RDD. Partition it and cache it.
 Later, I can cogroup it with other RDDs and Spark will try to keep the
 cached RDD partitions where they are and not shuffle them.



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-spark-partition-sticky-i-e-stay-with-node-tp21322p21338.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: Are these numbers abnormal for spark streaming?

2015-01-22 Thread Tathagata Das
This is not normal. It's a huge scheduling delay!! Can you tell me more
about the application?
- cluster setup, number of receivers, what the computation is, etc.
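
As a rough sanity check on the figures in the quoted report (a
back-of-the-envelope sketch, not a diagnosis): a 2-second batch interval
over 18 h 24 min 34 s generates about 33137 batches, of which 16482 were
processed. The helper below turns that gap into seconds of queued work. The
object and method names are made up for illustration, and it assumes one
batch is generated per interval and batches are processed one at a time.

```scala
object BacklogEstimate {
  /** Seconds of input still waiting behind the batches processed so far. */
  def backlogSeconds(elapsedSeconds: Long,
                     batchIntervalSeconds: Long,
                     processedBatches: Long): Long = {
    // One batch is generated per interval, regardless of how fast they are processed.
    val generatedBatches = elapsedSeconds / batchIntervalSeconds
    (generatedBatches - processedBatches) * batchIntervalSeconds
  }
}

// 18 h 24 min 34 s = 66274 s
// BacklogEstimate.backlogSeconds(66274L, 2L, 16482L) == 33310, i.e. about 9 h 15 min
```

That is close to the reported scheduling delay, which is what you would
expect when batches take roughly 4 to 5 seconds to process against a
2-second batch interval.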

On Thu, Jan 22, 2015 at 3:11 AM, Ashic Mahtab as...@live.com wrote:

 Hate to do this...but...erm...bump? Would really appreciate input from
 others using Streaming. Or at least some docs that would tell me if these
 are expected or not.

 --
 From: as...@live.com
 To: user@spark.apache.org
 Subject: Are these numbers abnormal for spark streaming?
 Date: Wed, 21 Jan 2015 11:26:31 +


 Hi Guys,
 I've got Spark Streaming set up for a low data rate system (using spark's
 features for analysis, rather than high throughput). Messages are coming in
 throughout the day, at around 1-20 per second (finger in the air
 estimate...not analysed yet).  In the spark streaming UI for the
 application, I'm getting the following after 17 hours.

 Streaming

- *Started at: *Tue Jan 20 16:58:43 GMT 2015
- *Time since start: *18 hours 24 minutes 34 seconds
- *Network receivers: *2
- *Batch interval: *2 seconds
- *Processed batches: *16482
- *Waiting batches: *1



 Statistics over last 100 processed batches

 Receiver Statistics

 Columns: Receiver, Status, Location, Records in last batch
 [2015/01/21 11:23:18], Minimum rate [records/sec], Median rate
 [records/sec], Maximum rate [records/sec], Last Error

 RmqReceiver-0  ACTIVE  F   144727  -
 RmqReceiver-1  ACTIVE  BR  124726  -
 Batch Processing Statistics

 Metric            Last batch       Minimum          25th percentile  Median           75th percentile  Maximum
 Processing Time   3 s 994 ms       157 ms           4 s 16 ms        4 s 961 ms       5 s 3 ms         5 s 171 ms
 Scheduling Delay  9 h 15 min 4 s   9 h 10 min 54 s  9 h 11 min 56 s  9 h 12 min 57 s  9 h 14 min 5 s   9 h 15 min 4 s
 Total Delay       9 h 15 min 8 s   9 h 10 min 58 s  9 h 12 min       9 h 13 min 2 s   9 h 14 min 10 s  9 h 15 min 8 s


 Are these normal? I was wondering what the scheduling delay and total
 delay terms mean, and whether it's normal for them to be 9 hours.

 I've got a standalone spark master and 4 spark nodes. The streaming app
 has been given 4 cores, and it's using 1 core per worker node. The
 streaming app is submitted from a 5th machine, and that machine has nothing
 but the driver running. The worker nodes are running alongside Cassandra
 (and reading and writing to it).

 Any insights would be appreciated.

 Regards,
 Ashic.



Re: Big performance difference between client and cluster deployment mode; is this expected?

2014-12-31 Thread Tathagata Das
What are your spark-submit commands in both cases? Is it Spark Standalone or
YARN (both support client and cluster)? Accordingly, what is the number of
executors/cores requested?

TD

On Wed, Dec 31, 2014 at 10:36 AM, Enno Shioji eshi...@gmail.com wrote:

 Also the job was deployed from the master machine in the cluster.

 On Wed, Dec 31, 2014 at 6:35 PM, Enno Shioji eshi...@gmail.com wrote:

 Oh sorry that was an edit mistake. The code is essentially:

  val msgStream = kafkaStream
    .map { case (k, v) => v }
    .map(DatatypeConverter.printBase64Binary)
    .saveAsTextFile("s3n://some.bucket/path", classOf[LzoCodec])

 I.e. there is essentially no original code (I was calling saveAsTextFile
 in a save function but that was just a remnant from previous debugging).



 On Wed, Dec 31, 2014 at 6:21 PM, Sean Owen so...@cloudera.com wrote:

 -dev, +user

 A decent guess: Does your 'save' function entail collecting data back
 to the driver? and are you running this from a machine that's not in
 your Spark cluster? Then in client mode you're shipping data back to a
 less-nearby machine, compared to with cluster mode. That could explain
 the bottleneck.

 On Wed, Dec 31, 2014 at 4:12 PM, Enno Shioji eshi...@gmail.com wrote:
  Hi,
 
  I have a very, very simple streaming job. When I deploy this on the
 exact
  same cluster, with the exact same parameters, I see big (40%)
 performance
  difference between client and cluster deployment mode. This seems
 a bit
  surprising.. Is this expected?
 
  The streaming job is:
 
  val msgStream = kafkaStream
    .map { case (k, v) => v }
    .map(DatatypeConverter.printBase64Binary)
    .foreachRDD(save)
    .saveAsTextFile("s3n://some.bucket/path", classOf[LzoCodec])
 
  I tried several times, but the job deployed with client mode can only
  write at 60% throughput of the job deployed with cluster mode and
 this
  happens consistently. I'm logging at INFO level, but my application
 code
  doesn't log anything so it's only Spark logs. The logs I see in
 client
  mode doesn't seem like a crazy amount.
 
  The setup is:
  spark-ec2 [...] \
--copy-aws-credentials \
--instance-type=m3.2xlarge \
-s 2 launch test_cluster
 
  And all the deployment was done from the master machine.
 






Re: Spark Streaming: HiveContext within Custom Actor

2014-12-30 Thread Tathagata Das
I am not sure that can be done. Receivers are designed to be run only
on the executors/workers, whereas a SQLContext (for using Spark SQL)
can only be defined on the driver.


On Mon, Dec 29, 2014 at 6:45 PM, sranga sra...@gmail.com wrote:
 Hi

 Could Spark-SQL be used from within a custom actor that acts as a receiver
 for a streaming application? If yes, what is the recommended way of passing
 the SparkContext to the actor?
 Thanks for your help.


 - Ranga



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-HiveContext-within-Custom-Actor-tp20892.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: word count aggregation

2014-12-30 Thread Tathagata Das
For windows that large (1 hour), you will probably also have to
increase the batch interval for efficiency.

TD

On Mon, Dec 29, 2014 at 12:16 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
 You can use reduceByKeyAndWindow for that. Here's a pretty clean example
 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala

 Thanks
 Best Regards

 On Mon, Dec 29, 2014 at 1:30 PM, Hoai-Thu Vuong thuv...@gmail.com wrote:

 dear user of spark

 I've got a program that streams a folder: when a new file is created in this
 folder, I count the words that appear in the document and update the counts (I used
 StatefulNetworkWordCount to do it). It works like a charm. However, I would
 like to know the difference between the top 10 words now and the top 10 one
 hour earlier. How could I do that? I tried to use windowDuration, but it does
 not seem to work.






Re: Kafka + Spark streaming

2014-12-30 Thread Tathagata Das
1. Of course, a single block / partition has many Kafka messages, and
from different Kafka topics interleaved together. The message count is
not related to the block count. Any message received within a
particular block interval will go in the same block.

2. Yes, the receiver will be started on another worker.
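
To make point 1 concrete: a receiver cuts its input into blocks by time, not
by message count, so the number of blocks (and hence partitions) per batch
depends only on the two intervals. A small sketch, assuming the documented
default of 200 ms for spark.streaming.blockInterval; the helper name is
hypothetical.

```scala
object BlockMath {
  /** Blocks produced by one receiver per batch; each block becomes one partition of the batch's RDD. */
  def blocksPerBatch(batchIntervalMs: Long, blockIntervalMs: Long = 200L): Long =
    batchIntervalMs / blockIntervalMs
}

// BlockMath.blocksPerBatch(2000L) == 10, however many Kafka messages arrived in those 2 seconds
```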

TD


On Tue, Dec 30, 2014 at 2:19 PM, SamyaMaiti samya.maiti2...@gmail.com wrote:
 Hi Experts,

 Few general Queries :

 1. Can a single block/partition in an RDD have more than one Kafka message, or
 will there be one and only one Kafka message per block? More broadly,
 is the message count related to the block in any way, or is it just that any
 message received within a particular block interval will go in the same
 block?

 2. If a worker goes down which runs the Receiver for Kafka, Will the
 receiver be restarted on some other worker?

 Regards,
 Sam



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Spark-streaming-tp20914.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: SPARK-streaming app running 10x slower on YARN vs STANDALONE cluster

2014-12-30 Thread Tathagata Das
That is kind of expected due to data locality. Though you should see
some tasks running on the other executors as the data gets replicated to
other nodes, which can then run tasks based on locality. You have
two solutions:

1. kafkaStream.repartition() to explicitly repartition the received
data across the cluster.
2. Create multiple kafka streams and union them together.

See 
http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-processing-time-of-each-batch

On Tue, Dec 30, 2014 at 1:43 AM, Mukesh Jha me.mukesh@gmail.com wrote:
 Thanks Sandy, It was the issue with the no of cores.

 Another issue I was facing is that tasks are not getting distributed evenly
 among all executors and are running on the NODE_LOCAL locality level i.e.
 all the tasks are running on the same executor where my kafkareceiver(s) are
 running even though other executors are idle.

 I configured spark.locality.wait=50 instead of the default 3000 ms, which
 forced the task rebalancing among nodes, let me know if there is a better
 way to deal with this.


 On Tue, Dec 30, 2014 at 12:09 AM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Makes sense. I've also tried it in standalone mode where all 3 workers and the
 driver were running on the same 8 core box and the results were similar.

 Anyways I will share the results in YARN mode with 8 core yarn containers.

 On Mon, Dec 29, 2014 at 11:58 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 When running in standalone mode, each executor will be able to use all 8
 cores on the box.  When running on YARN, each executor will only have access
 to 2 cores.  So the comparison doesn't seem fair, no?

 -Sandy

 On Mon, Dec 29, 2014 at 10:22 AM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Nope, I am setting 5 executors with 2 cores each. Below is the command
 that I'm using to submit in YARN mode. This starts up 5 executor nodes and
 a driver as per the Spark application master UI.

 spark-submit --master yarn-cluster --num-executors 5 --driver-memory
 1024m --executor-memory 1024m --executor-cores 2 --class
 com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar vm.cloud.com:2181/kafka
 spark-yarn avro 1 5000

 On Mon, Dec 29, 2014 at 11:45 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 *oops, I mean are you setting --executor-cores to 8

 On Mon, Dec 29, 2014 at 10:15 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Are you setting --num-executors to 8?

 On Mon, Dec 29, 2014 at 10:13 AM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Sorry Sandy, The command is just for reference but I can confirm that
 there are 4 executors and a driver as shown in the spark UI page.

 Each of these machines is a 8 core box with ~15G of ram.

 On Mon, Dec 29, 2014 at 11:23 PM, Sandy Ryza
 sandy.r...@cloudera.com wrote:

 Hi Mukesh,

 Based on your spark-submit command, it looks like you're only
 running with 2 executors on YARN.  Also, how many cores does each
 machine have?

 -Sandy

 On Mon, Dec 29, 2014 at 4:36 AM, Mukesh Jha
 me.mukesh@gmail.com wrote:

 Hello Experts,
 I'm benchmarking Spark on YARN
 (https://spark.apache.org/docs/latest/running-on-yarn.html) vs a standalone
 Spark cluster (https://spark.apache.org/docs/latest/spark-standalone.html).
 I have a standalone cluster with 3 executors, and a Spark app
 running on YARN with 4 executors as shown below.

 The Spark job running inside YARN is 10x slower than the one
 running on the standalone cluster (even though YARN has more
 workers). In both cases all the executors are in the same datacenter,
 so there shouldn't be any latency. On YARN each 5sec batch is reading data
 from Kafka and processing it in 5sec, and on the standalone cluster each 5sec
 batch is getting processed in 0.4sec.
 Also, in YARN mode the executors are not getting used evenly:
 vm-13 and vm-14 are running most of the tasks, whereas in standalone
 mode all the executors are running tasks.

 Do I need to set up some configuration to evenly distribute the
 tasks? Also, do you have any pointers on the reasons the YARN job is 10x
 slower than the standalone job?
 Any suggestion is greatly appreciated, Thanks in advance.

 YARN (5 workers + driver)

 Executor ID  Address                RDD Blocks  Memory Used   DU     AT  FT  CT    TT    TT      Input  ShuffleRead  ShuffleWrite
 1            vm-18.cloud.com:51796  0           0.0B/530.3MB  0.0 B  1   0   16    17    634 ms  0.0 B  2047.0 B     1710.0 B
 2            vm-13.cloud.com:57264  0           0.0B/530.3MB  0.0 B  0   0   1427  1427  5.5 m   0.0 B  0.0 B        0.0 B
 3            vm-14.cloud.com:54570  0           0.0B/530.3MB  0.0 B  0   0   1379  1379  5.2 m   0.0 B  1368.0 B     2.8 KB
 4            vm-11.cloud.com:56201  0           0.0B/530.3MB  0.0 B  0   0   10    10    625 ms  0.0 B  1368.0 B     1026.0 B
 5            vm-5.cloud.com:42958   0           0.0B/530.3MB  0.0 B  0   0   22    22    632 ms  0.0 B  1881.0 B     2.8 KB
 driver       vm.cloud.com:51847     0           0.0B/530.0MB  0.0 B  0   0   0     0     0 ms    0.0 B  0.0 B        0.0 B

 /homext/spark/bin/spark-submit
 --master yarn-cluster 

Re: Gradual slow down of the Streaming job (getCallSite at DStream.scala:294)

2014-12-30 Thread Tathagata Das
Which version of Spark Streaming are you using?

When the batch processing time increases to 15-20 seconds, could you
compare the task times with the task times when the application
has just launched? Basically, is the increase from 6 seconds to 15-20
seconds caused by an increase in computation or by GC, etc.?

On Tue, Dec 30, 2014 at 1:41 PM, RK prk...@yahoo.com.invalid wrote:
 Here is the code for my streaming job.

 ~~
 val sparkConf = new SparkConf().setAppName("SparkStreamingJob")
 sparkConf.set("spark.serializer",
   "org.apache.spark.serializer.KryoSerializer")
 sparkConf.set("spark.default.parallelism", "100")
 sparkConf.set("spark.shuffle.consolidateFiles", "true")
 sparkConf.set("spark.speculation", "true")
 sparkConf.set("spark.speculation.interval", "5000")
 sparkConf.set("spark.speculation.quantile", "0.9")
 sparkConf.set("spark.speculation.multiplier", "3")
 sparkConf.set("spark.mesos.coarse", "true")
 sparkConf.set("spark.executor.extraJavaOptions", "-verbose:gc " +
   "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseConcMarkSweepGC")
 sparkConf.set("spark.shuffle.manager", "SORT")

 val ssc = new StreamingContext(sparkConf, Seconds(10))
 ssc.checkpoint(checkpointDir)

 val topics = "trace"
 val numThreads = 1
 val topicMap = topics.split(",").map((_, numThreads)).toMap

 val kafkaPartitions = 20
 val kafkaDStreams = (1 to kafkaPartitions).map { _ =>
   KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
 }

 val lines = ssc.union(kafkaDStreams)
 val words = lines.map(line => doSomething_1(line))
 val filteredWords = words.filter(word => word != "test")
 val groupedWords = filteredWords.map(word => (word, 1))

 val windowedWordCounts = groupedWords.reduceByKeyAndWindow(_ + _, _ - _,
   Seconds(30), Seconds(10))
 val windowedWordsFiltered = windowedWordCounts.filter { case (word, count) =>
   count > 50 }
 val finalResult = windowedWordsFiltered.foreachRDD(words =>
   doSomething_2(words))

 ssc.start()
 ssc.awaitTermination()
 ~~

 I am running this job on a 9-slave AWS EC2 cluster where each slave node has
 32 vCPUs and 60GB memory.

 When I start this job, the processing time is usually around 5 - 6 seconds
 for the 10-second batch and the scheduling delay is around 0 seconds or a
 few ms. However, after the job has run for 6 - 8 hours, the processing time
 increases to 15 - 20 seconds and the scheduling delay grows to 4 - 6
 hours.

 When I look at the completed stages, I see that the time taken for
 getCallSite at DStream.scala:294 keeps increasing as time passes by. It goes
 from around 2 seconds to more than a few minutes.

 Clicking on +details next to this stage description shows the following
 execution trace.
 org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1088)
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:294)
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
 scala.Option.orElse(Option.scala:257)
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:285)
 org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
 org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
 org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
 scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
 org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
 org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
 org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
 scala.util.Try$.apply(Try.scala:161)
 org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:221)
 org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)

 When I click on one of these slow stages that executed after 6 - 8 hours, I
 find the following information for individual tasks inside.
 - All tasks seem to execute with PROCESS_LOCAL locality.
 - Quite a few of these tasks seem to spend anywhere between 30 - 80% of
 their time in GC. Although, when I look at the total memory usage on each of
 the slave nodes under executors information, I see that the usage is only
 around 200MB out of 20GB available.

 Even after a few hours, the map stages (val groupedWords =
 filteredWords.map(word => (word, 1))) seem to have consistent times as
 during the start of the job which seems to indicate 

Re: Help with updateStateByKey

2014-12-18 Thread Tathagata Das
Another starting point for playing with updateStateByKey is the example
StatefulNetworkWordCount. See the streaming examples directory in the
Spark repository.
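
For the question quoted below, here is a minimal sketch of an update function
with the shape updateStateByKey expects: it receives the new values for one
key plus that key's previous state, and the key itself is not part of either
argument. The type alias and function name are hypothetical, and this assumes
the stream is keyed by IP with (requestPage, time, time) values, without the
intermediate groupByKey.

```scala
object SessionState {
  type Hit = (String, Long, Long) // (requestPage, time, time)

  /** Appends this batch's hits for one IP to the hits accumulated so far. */
  def updateHits(newHits: Seq[Hit], state: Option[Vector[Hit]]): Option[Vector[Hit]] =
    Some(state.getOrElse(Vector.empty[Hit]) ++ newHits)
}

// With Spark Streaming on the classpath and a checkpoint directory set via
// ssc.checkpoint(...), this would be wired in roughly as:
//   val perIp = ipTimeStamp.updateStateByKey[Vector[SessionState.Hit]](SessionState.updateHits _)
```

Returning None from the update function drops the state for that key, which
is one way to expire a session that has gone quiet.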

TD



On Thu, Dec 18, 2014 at 6:07 AM, Pierce Lamb
richard.pierce.l...@gmail.com wrote:
 I am trying to run stateful Spark Streaming computations over (fake)
 apache web server logs read from Kafka. The goal is to sessionize
 the web traffic similar to this blog post:
 http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/

 The only difference is that I want to sessionize each page the IP
 hits, instead of the entire session. I was able to do this reading
 from a file of fake web traffic using Spark in batch mode, but now I
 want to do it in a streaming context.

 Log files are read from Kafka and parsed into K/V pairs of

 (String, (String, Long, Long)) or

 (IP, (requestPage, time, time))

 I then call groupByKey() on this K/V pair. In batch mode, this would
 produce a:

 (String, CollectionBuffer((String, Long, Long), ...) or

 (IP, CollectionBuffer((requestPage, time, time), ...)

 In a StreamingContext, it produces a:

 (String, ArrayBuffer((String, Long, Long), ...) like so:

 (183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))

 However, as the next microbatch (DStream) arrives, this information is
 discarded. Ultimately what I want is for that ArrayBuffer to fill up
 over time as a given IP continues to interact and to run some
 computations on its data to sessionize the page time. I believe the
 operator to make that happen is updateStateByKey. I'm having some
 trouble with this operator (I'm new to both Spark and Scala); any help
 is appreciated.

 Thus far:

 val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)


 def updateGroupByKey(
   a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
   b: Option[(String, ArrayBuffer[(String, Long, Long)])]
 ): Option[(String, ArrayBuffer[(String, Long, Long)])] = {

 }






Re: Spark Streaming Python APIs?

2014-12-18 Thread Tathagata Das
A more updated version of the streaming programming guide is here

http://people.apache.org/~tdas/spark-1.2-temp/streaming-programming-guide.html

Please refer to this until we make the official release of Spark 1.2

TD

On Tue, Dec 16, 2014 at 3:50 PM, smallmonkey...@hotmail.com
smallmonkey...@hotmail.com wrote:
 Hi Zhu:
 maybe there is no Python API for Spark Streaming
 baishuo
 
 smallmonkey...@hotmail.com


 From: Xiaoyong Zhu
 Date: 2014-12-15 10:52
 To: user@spark.apache.org
 Subject: Spark Streaming Python APIs?

 Hi spark experts



 Are there any Python APIs for Spark Streaming? I didn’t find the Python APIs
 in Spark Streaming programming guide..

 http://spark.apache.org/docs/latest/streaming-programming-guide.html



 Xiaoyong






Re: Read data from SparkStreaming from Java socket.

2014-12-12 Thread Tathagata Das
Yes, socketTextStream starts a TCP client that tries to connect to a
TCP server (localhost: in your case). If there is a server running
on that port that can send data to connected TCP connections, then you
will receive data in the stream.

Did you check out the quick example in the streaming programming guide?
http://spark.apache.org/docs/latest/streaming-programming-guide.html
That has instructions to start a netcat server on port  and send
data to spark streaming through that.
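
Since socketTextStream is the client side, the Java or Scala program has to
be the listening side. A minimal sketch of such a server using only java.net
follows; the object and method names are hypothetical, and it serves a single
connection and then returns.

```scala
import java.io.PrintWriter
import java.net.ServerSocket

object LineServer {
  /** Waits for one client on `server`, sends each line newline-terminated, then closes the client. */
  def serveOnce(server: ServerSocket, lines: Seq[String]): Unit = {
    val client = server.accept() // blocks until a client (e.g. socketTextStream) connects
    try {
      val out = new PrintWriter(client.getOutputStream, true) // autoflush on println
      lines.foreach(line => out.println(line))
    } finally {
      client.close()
    }
  }
}

// Usage sketch: listen on the same port that is passed to socketTextStream.
//   LineServer.serveOnce(new ServerSocket(port), Seq("hello world"))
```

socketTextStream splits its input on newlines, so each println above arrives
as one record in the stream.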

TD

On Fri, Dec 12, 2014 at 9:54 PM, Akhil Das ak...@sigmoidanalytics.com wrote:
 socketTextStream is Socket client which will read from a TCP ServerSocket.

 Thanks
 Best Regards

 On Fri, Dec 12, 2014 at 7:21 PM, Guillermo Ortiz konstt2...@gmail.com
 wrote:

 I don't understand what Spark Streaming's socketTextStream is waiting for...
 is it like a server, so you just have to send data from a client? Or
 what is it expecting?

 2014-12-12 14:19 GMT+01:00 Akhil Das ak...@sigmoidanalytics.com:
  I have created a Serversocket program which you can find over here
  https://gist.github.com/akhld/4286df9ab0677a555087 It simply listens to
  the
  given port and when the client connects, it will send the contents of
  the
  given file. I'm attaching the executable jar also, you can run the jar
  as:
 
  java -jar SocketBenchmark.jar student 12345 io
 
  Here student is the file which will be sent to the client whoever
  connects
  on 12345, i have it tested and is working with SparkStreaming
  (socketTextStream).
 
 
  Thanks
  Best Regards
 
  On Fri, Dec 12, 2014 at 6:25 PM, Guillermo Ortiz konstt2...@gmail.com
  wrote:
 
  Hi,
 
  I'm a newbie with Spark,, I'm just trying to use SparkStreaming and
  filter some data sent with a Java Socket but it's not working... it
  works when I use ncat
 
  Why is it not working??
 
  My sparkcode is just this:
  val sparkConf = new
  SparkConf().setMaster("local[2]").setAppName("Test")
  val ssc = new StreamingContext(sparkConf, Seconds(5))
  val lines = ssc.socketTextStream("localhost", )
  val errorLines = lines.filter(_.contains("hello"))
  errorLines.print()
 
  I created a client socket which sends data to that port, but it could
  connect any address, I guess that Spark doesn't work like a
  serverSocket... what's the way to send data from a socket with Java to
  be able to read from socketTextStream??
 
 
 




Re: Key not valid / already cancelled using Spark Streaming

2014-12-11 Thread Tathagata Das
Following Gerard's thoughts, here are possible things that could be happening.

1. Is there another process in the background that is deleting files
in the directory where you are trying to write? It seems like the
temporary file generated by one of the tasks is getting deleted before
it is renamed to the final output file. I suggest trying not to write
to S3 but rather just count and print (with the rest of the computation
staying exactly the same) and seeing if the error still occurs. That would
narrow down the culprit to what Gerard suggested.
2. Do you have speculative execution turned on? If so, could you turn
it off and try?

TD

On Thu, Dec 11, 2014 at 1:42 AM, Gerard Maas gerard.m...@gmail.com wrote:
 If the timestamps in the logs are to be trusted, it looks like your driver is
 dying with that java.io.FileNotFoundException, and therefore the workers
 lose their connection and close down.

 -kr, Gerard.

 On Thu, Dec 11, 2014 at 7:39 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Try to add the following to the sparkConf

   .set("spark.core.connection.ack.wait.timeout", "6000")

   .set("spark.akka.frameSize", "60")

 Used to face that issue with spark 1.1.0

 Thanks
 Best Regards

 On Thu, Dec 11, 2014 at 2:14 AM, Flávio Santos
 bar...@chaordicsystems.com wrote:

 Dear Spark'ers,

 I'm trying to run a simple job using Spark Streaming (version 1.1.1) and
 YARN (yarn-cluster), but unfortunately I'm facing many issues. In short, my
 job does the following:
 - Consumes a specific Kafka topic
 - Writes its content to S3 or HDFS

 Records in Kafka are in the form:
 {"key": "someString"}

 This is important because I use the value of "key" to define the output
 file name in S3.
 Here are the Spark and Kafka parameters I'm using:

 val sparkConf = new SparkConf()
   .setAppName("MyDumperApp")
   .set("spark.task.maxFailures", "100")
   .set("spark.hadoop.validateOutputSpecs", "false")
   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   .set("spark.executor.extraJavaOptions", "-XX:+UseCompressedOops")
 val kafkaParams = Map(
   "zookeeper.connect" -> zkQuorum,
   "zookeeper.session.timeout.ms" -> "1",
   "rebalance.backoff.ms" -> "8000",
   "rebalance.max.retries" -> "10",
   "group.id" -> group,
   "auto.offset.reset" -> "largest"
 )


 My application is the following:

 KafkaUtils.createStream[String, String, StringDecoder,
 StringDecoder](ssc, kafkaParams, Map(topic -> 1),
 StorageLevel.MEMORY_AND_DISK_SER_2)
   .foreachRDD((rdd, time) =>
     rdd.map {
       case (_, line) =>
         val json = parse(line)
         val key = extract(json, "key").getOrElse("key_not_found")
         (key, dateFormatter.format(time.milliseconds)) -> line
     }
       .partitionBy(new HashPartitioner(10))
       .saveAsHadoopFile[KeyBasedOutput[(String,String),
 String]]("s3://BUCKET", classOf[BZip2Codec])
   )


 And the last piece:

 class KeyBasedOutput[T >: Null, V <: AnyRef] extends
 MultipleTextOutputFormat[T , V] {
   override protected def generateFileNameForKeyValue(key: T, value: V,
 leaf: String) = key match {
     case (myKey, batchId) =>
       "somedir" + "/" + myKey + "/" +
         "prefix-" + myKey + "_" + batchId + "_" + leaf
   }
   override protected def generateActualKey(key: T, value: V) = null
 }


 I use batch sizes of 5 minutes with checkpoints activated.
 The job fails nondeterministically (I think it never ran longer than ~5
 hours). I have no clue why, it simply fails.
 Please find below the exceptions thrown by my application.

 I really appreciate any kind of hint.
 Thank you very much in advance.

 Regards,
 -- Flávio

  Executor 1

 2014-12-10 19:05:15,150 INFO  [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
 2014-12-10 19:05:15,201 INFO  [Thread-6] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(98665) called with curMem=194463488, maxMem=4445479895
 2014-12-10 19:05:15,202 INFO  [Thread-6] storage.MemoryStore (Logging.scala:logInfo(59)) - Block input-0-1418238315000 stored as bytes in memory (estimated size 96.4 KB, free 4.0 GB)
 2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
 2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-1] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
 2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
 2014-12-10 19:05:16,649 INFO  [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
 

Re: Spark steaming : work with collect() but not without collect()

2014-12-11 Thread Tathagata Das
What does process do? Maybe when this process function is run in
the Spark executor, it triggers some static initialization
that fails, causing this exception. From the Oracle documentation:
an ExceptionInInitializerError is thrown to indicate that an exception
occurred during evaluation of a static initializer or the initializer
for a static variable.

TD

On Thu, Dec 11, 2014 at 1:36 AM, Gerard Maas gerard.m...@gmail.com wrote:
 Have you tried with kafkaStream.foreachRDD(rdd => {rdd.foreach(...)})?
 Would that make a difference?


 On Thu, Dec 11, 2014 at 10:24 AM, david david...@free.fr wrote:

 Hi,

   We use the following Spark Streaming code to collect and process Kafka
 events:

 kafkaStream.foreachRDD(rdd => {
   rdd.collect().foreach(event => {
     process(event._1, event._2)
   })
 })

 This works fine.

 But without the collect() call, the following exception is raised for
 the call to the process function:
 Loss was due to java.lang.ExceptionInInitializerError


   We attempted to rewrite it like this, but the same exception is raised:

 kafkaStream.foreachRDD(rdd => {
   rdd.foreachPartition(iter =>
     iter.foreach(event => {
       process(event._1, event._2)
     })
   )
 })


 Can anybody explain why this happens and how to solve this issue?

 Thanks

 Regards






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-steaming-work-with-collect-but-not-without-collect-tp20622.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Session for connections?

2014-12-11 Thread Tathagata Das
You could create a lazily initialized singleton factory and connection
pool. Whenever an executor starts running the first task that needs to
push out data, it will create the connection pool as a singleton, and
subsequent tasks running on that executor will reuse the connection
pool. You will also have to shut down the connections intelligently,
because there is no obvious way to shut them down. You could use a
usage timeout: shut down a connection after it has not been used for
10 x batch interval.
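
A rough sketch of that idea in plain Scala, with a single shared
connection standing in for a real pool. Connection and ConnectionHolder
are hypothetical names, and the timestamps are passed in explicitly only
to make the idle timeout easy to exercise. A Scala object is created at
most once per JVM, so each executor would end up with its own instance
the first time a task touches it.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for an expensive client (RabbitMQ channel, Cassandra session, ...).
final class Connection(val id: Int) {
  @volatile private var open = true
  def send(msg: String): Unit = require(open, "connection is closed")
  def close(): Unit = open = false
  def isOpen: Boolean = open
}

// One instance per JVM. Nothing is connected until the first call to get().
object ConnectionHolder {
  private val created = new AtomicInteger(0)
  private var current: Option[Connection] = None
  private var lastUsedMs = 0L

  // Returns the shared connection, creating it on first use or after an idle shutdown.
  def get(nowMs: Long = System.currentTimeMillis()): Connection = synchronized {
    val conn = current.getOrElse {
      val c = new Connection(created.incrementAndGet())
      current = Some(c)
      c
    }
    lastUsedMs = nowMs
    conn
  }

  // Closes the connection if it has been unused for at least maxIdleMs; returns true if it closed one.
  def closeIfIdle(maxIdleMs: Long, nowMs: Long = System.currentTimeMillis()): Boolean = synchronized {
    current match {
      case Some(c) if nowMs - lastUsedMs >= maxIdleMs =>
        c.close()
        current = None
        true
      case _ => false
    }
  }

  def createdCount: Int = created.get()
}
```

In a streaming job, the body of foreachPartition would call
ConnectionHolder.get() instead of building a factory and connection
itself, and a background timer on each executor could call closeIfIdle
with something like 10 x the batch interval.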

TD

On Thu, Dec 11, 2014 at 4:28 AM, Ashic Mahtab as...@live.com wrote:
 Hi,
 I was wondering if there's any way of having long running session type
 behaviour in spark. For example, let's say we're using Spark Streaming to
 listen to a stream of events. Upon receiving an event, we process it, and if
 certain conditions are met, we wish to send a message to rabbitmq. Now,
 rabbit clients have the concept of a connection factory, from which you
 create a connection, from which you create a channel. You use the channel to
 get a queue, and finally the queue is what you publish messages on.

 Currently, what I'm doing can be summarised as :

 dstream.foreachRDD(x => x.foreachPartition(y => {
   val factory = ..
   val connection = ...
   val channel = ...
   val queue = channel.declareQueue(...);

   y.foreach(z => Processor.Process(z, queue));

   // clean up the queue stuff
 }));

 I'm doing the same thing for using Cassandra, etc. Now in these cases, the
 session initiation is expensive, so doing it per message is not a good idea.
 However, I can't find a way to say "hey... do this once and only once per
 worker".

 Is there a better pattern to do this?

 Regards,
 Ashic.




Re: KafkaUtils explicit acks

2014-12-11 Thread Tathagata Das
I am updating the docs right now. Here is a staged copy that you can
have a sneak peek at. This will be part of Spark 1.2.

http://people.apache.org/~tdas/spark-1.2-temp/streaming-programming-guide.html

The updated fault-tolerance section tries to simplify the explanation
of when and what data can be lost, and how to prevent that using the
new experimental feature of write ahead logs.
Any feedback will be much appreciated.
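
For reference, a minimal sketch of turning the write ahead log on for
receivers, assuming Spark 1.2 with spark-core and spark-streaming on the
classpath. WalSetup, buildContext, the app name and the 10 second batch
interval are placeholders; the configuration key is the one introduced
for this feature in Spark 1.2.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WalSetup {
  // checkpointDir should point at a fault-tolerant file system such as HDFS.
  // The master URL is expected to come from spark-submit.
  def buildContext(checkpointDir: String): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("kafka-to-hbase") // placeholder name
      // Turns on the receiver write ahead log (experimental in Spark 1.2).
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(10)) // batch interval chosen arbitrarily
    // The log files are written under the checkpoint directory.
    ssc.checkpoint(checkpointDir)
    ssc
  }
}
```

With the log enabled, received data is also written to the checkpoint
directory, so in-memory replication of the received blocks becomes less
important.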

TD

On Wed, Dec 10, 2014 at 2:42 AM,  francois.garil...@typesafe.com wrote:
 [sorry for the botched half-message]

 Hi Mukesh,

 There’s been some great work on Spark Streaming reliability lately.
 https://www.youtube.com/watch?v=jcJq3ZalXD8
 Look at the links from:
 https://issues.apache.org/jira/browse/SPARK-3129

 I’m not aware of any doc yet (did I miss something ?) but you can look at
 the ReliableKafkaReceiver’s test suite:

 external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala

 —
 FG


 On Wed, Dec 10, 2014 at 11:17 AM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Hello Guys,

 Any insights on this?
 In case I wasn't clear enough, my question is: how can I use a Kafka consumer
 and not lose any data in case of failures with spark-streaming?

 On Tue, Dec 9, 2014 at 2:53 PM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Hello Experts,

 I'm working on a Spark app which reads data from Kafka and persists it in
 HBase.

 The Spark documentation states [1] that in case of worker failure we can
 lose some data. If so, how can I make my Kafka stream more reliable?
 I have seen there is a simple consumer [2], but I'm not sure if it has
 been used/tested extensively.

 I was wondering if there is a way to explicitly acknowledge the kafka
 offsets once they are replicated in memory of other worker nodes (if it's
 not already done) to tackle this issue.

 Any help is appreciated in advance.


 [1] Using any input source that receives data through a network - For
 network-based data sources like Kafka and Flume, the received input data is
 replicated in memory between nodes of the cluster (default replication
 factor is 2). So if a worker node fails, then the system can recompute the
 lost data from the leftover copy of the input data. However, if the worker
 node where a network receiver was running fails, then a tiny bit of data may
 be lost, that is, the data received by the system but not yet replicated to
 other node(s). The receiver will be started on a different node and it will
 continue to receive data.
 [2] https://github.com/dibbhatt/kafka-spark-consumer

 Txz,

 Mukesh Jha




 --


 Thanks & Regards,

 Mukesh Jha






Re: Session for connections?

2014-12-11 Thread Tathagata Das
Also, this is covered in the streaming programming guide in bits and pieces.
http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

On Thu, Dec 11, 2014 at 4:55 AM, Ashic Mahtab as...@live.com wrote:
 That makes sense. I'll try that.

 Thanks :)

 From: tathagata.das1...@gmail.com
 Date: Thu, 11 Dec 2014 04:53:01 -0800
 Subject: Re: Session for connections?
 To: as...@live.com
 CC: user@spark.apache.org


 You could create a lazily initialized singleton factory and connection
 pool. Whenever an executor starts running the first task that needs to
 push out data, it will create the connection pool as a singleton, and
 subsequent tasks running on that executor will reuse the connection
 pool. You will also have to shut down the connections intelligently,
 because there is no obvious way to shut them down. You could use a
 usage timeout: shut down a connection after it has not been used for
 10 x batch interval.

 TD

 On Thu, Dec 11, 2014 at 4:28 AM, Ashic Mahtab as...@live.com wrote:
  Hi,
  I was wondering if there's any way of having long running session type
  behaviour in spark. For example, let's say we're using Spark Streaming
  to listen to a stream of events. Upon receiving an event, we process it,
  and if certain conditions are met, we wish to send a message to rabbitmq.
  Now, rabbit clients have the concept of a connection factory, from which
  you create a connection, from which you create a channel. You use the
  channel to get a queue, and finally the queue is what you publish
  messages on.
 
  Currently, what I'm doing can be summarised as :
 
  dstream.foreachRDD(x => x.foreachPartition(y => {
    val factory = ..
    val connection = ...
    val channel = ...
    val queue = channel.declareQueue(...);

    y.foreach(z => Processor.Process(z, queue));

    // clean up the queue stuff
  }));
 
  I'm doing the same thing for using Cassandra, etc. Now in these cases,
  the session initiation is expensive, so doing it per message is not a
  good idea. However, I can't find a way to say "hey... do this once and
  only once per worker".
 
  Is there a better pattern to do this?
 
  Regards,
  Ashic.




Re: Locking for shared RDDs

2014-12-11 Thread Tathagata Das
Aditya, I think your mental model of Spark Streaming is a little
off the mark. Unlike traditional streaming systems, where any kind of
state is mutable, Spark Streaming is built on Spark's immutable RDDs.
Streaming data is received and divided into immutable blocks, which
form immutable RDDs, and transformations then form new immutable RDDs.
It's best that you first read the Spark paper and then the Spark
Streaming paper to understand the model. Once you understand that, you
will realize that since everything is immutable, the question of
consistency does not even arise :)
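
As a loose analogy in plain Scala (no Spark involved), with immutable
Vectors standing in for the per-batch RDDs: running stats are derived as
a new immutable value per batch rather than by mutating shared state, so
there is nothing to lock. Stats, ImmutableBatches and runningStats are
made-up names for this sketch; in Spark Streaming the comparable
operation would be something like updateStateByKey.

```scala
// Immutable running statistics: add() returns a new value instead of mutating.
final case class Stats(count: Long, sum: Double) {
  def add(batch: Seq[Double]): Stats = Stats(count + batch.size, sum + batch.sum)
  def mean: Double = if (count == 0) 0.0 else sum / count
}

object ImmutableBatches {
  // Each batch is an immutable collection; each step yields a fresh Stats value.
  def runningStats(batches: Seq[Seq[Double]]): Seq[Stats] =
    batches.scanLeft(Stats(0L, 0.0))((stats, batch) => stats.add(batch)).tail

  def main(args: Array[String]): Unit = {
    val batches = Vector(Vector(1.0, 2.0), Vector(3.0), Vector(4.0, 5.0, 6.0))
    runningStats(batches).foreach(s => println(s"count=${s.count} mean=${s.mean}"))
  }
}
```

Every intermediate Stats value stays valid after the next batch arrives,
which is the property that makes recomputation after a failure safe.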

TD

On Mon, Dec 8, 2014 at 9:44 PM, Raghavendra Pandey
raghavendra.pan...@gmail.com wrote:
 You don't need to worry about locks as such, since one thread/worker is
 exclusively responsible for one partition of the RDD. You can use the
 Accumulator variables that Spark provides to get the state updates.


 On Mon Dec 08 2014 at 8:14:28 PM aditya.athalye adbrihadarany...@gmail.com
 wrote:

 I am relatively new to Spark. I am planning to use Spark Streaming for my
 OLAP use case, but I would like to know how RDDs are shared between
 multiple workers.
 If I need to constantly compute some stats on the streaming data,
 presumably shared state would have to be updated serially by different
 Spark workers. Is this managed by Spark automatically, or does the
 application need to ensure distributed locks are acquired?

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Locking-for-shared-RDDs-tp20578.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Is there an efficient way to append new data to a registered Spark SQL Table?

2014-12-11 Thread Tathagata Das
First of all, how long do you want to keep doing this? The data is
going to grow without bound, so it is going to get too big for any
cluster to handle. If all that is within bounds, then
try the following.

- Maintain a global variable holding the current RDD that stores all the
log data. We are going to keep updating this variable.
- Every batch interval, take the new data, union it with the earlier
unified RDD (in the global variable), and update the global variable.
If you want SQL queries on this data, then you will have to
re-register this new RDD as the named table.
- With this approach the number of partitions is going to increase
rapidly. So periodically take the unified RDD and repartition it to a
smaller set of partitions. This messes up the ordering of data, but
you may be fine with that if your queries are order-agnostic. Also,
periodically checkpoint this RDD; otherwise the lineage is going to
grow indefinitely and everything will start getting slower.
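
The steps above could be sketched roughly as follows, using only core
RDD and DStream calls (union, repartition, cache, checkpoint, foreachRDD)
and assuming spark-core and spark-streaming are on the classpath.
UnifiedLog, attach, compactEvery and targetPartitions are made-up names,
and the log lines are kept as plain strings. The table re-registration is
left as a comment because the exact call depends on the Spark SQL version.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Driver-side holder for the ever-growing RDD of log lines.
// Assumes sc.setCheckpointDir(...) was called before the stream is started.
object UnifiedLog {
  @volatile private var unified: Option[RDD[String]] = None
  private var batches = 0L

  def current: Option[RDD[String]] = unified

  def attach(logs: DStream[String], compactEvery: Int, targetPartitions: Int): Unit = {
    // The function passed to foreachRDD runs on the driver, once per batch interval.
    logs.foreachRDD { (batch: RDD[String]) =>
      batches += 1
      // Union the new batch with everything seen so far.
      var next = unified.map(_.union(batch)).getOrElse(batch)
      val compacting = batches % compactEvery == 0
      if (compacting) {
        // Shrink the partition count; this shuffles, so ordering is not preserved.
        next = next.repartition(targetPartitions)
      }
      next.cache()
      if (compacting) {
        // Truncates the lineage; must be requested before the first action on this RDD.
        next.checkpoint()
      }
      next.count() // forces the cache (and the checkpoint, if requested) to materialize
      unified = Some(next)
      // If SQL access is needed, re-register `next` as the named table here.
    }
  }
}
```

This sketch never unpersists older unified RDDs, so a real job would also
need to release them once the new one is materialized.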

Hope this helps.

TD

On Mon, Dec 8, 2014 at 6:29 PM, Xuelin Cao xuelin...@yahoo.com.invalid wrote:

 Hi,

   I'm wondering whether there is an efficient way to continuously
 append new data to a registered Spark SQL table.

   This is what I want:
   I want to build an ad-hoc query service over a JSON-formatted system log.
 Naturally, the system log is generated continuously. I will use Spark
 Streaming to consume the system log as my input, and I want to find a way to
 efficiently append the new data to an existing Spark SQL table. Furthermore,
 I want the whole table to be cached in memory/Tachyon.

   It looks like Spark SQL supports the INSERT method, but only for
 Parquet files. In addition, it is inefficient to insert a single row at a
 time.

   I do know that somebody has built a system similar to what I want (an
 ad-hoc query service over a growing system log). So there must be an
 efficient way. Does anyone know?





