Re: Simple but faster data streaming
I am afraid not. The whole point of Spark Streaming is to make it easy to do complicated processing on streaming data while interoperating with core Spark, MLlib and SQL, without the operational overhead of maintaining four different systems. As a slight cost of achieving that unification, there may be some overhead compared to specialized systems that are designed to do one specific thing. If you have to do something simple that could have been done using Flume, then the resources needed by the Spark Streaming program shouldn't be too high. Can you provide more details? TD On Thu, Apr 2, 2015 at 11:51 AM, Harut Martirosyan harut.martiros...@gmail.com wrote: Hi guys. Is there a more lightweight way of stream processing with Spark? What we want is a simpler way, preferably with no scheduling, which just streams the data to multiple destinations. We extensively use Spark Core, SQL, Streaming and GraphX, so it's our main tool, and we don't want to add new things to the stack like Storm or Flume. On the other hand, it really takes much more resources for the same streaming than our previous setup with Flume, especially when we have multiple destinations (which trigger multiple actions/scheduling). -- RGRDZ Harut
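Each output operation on a DStream runs as its own job, so one output per destination means one job per destination. One way to avoid that is to write to every destination from a single output action, such as one `foreachRDD { rdd => rdd.foreachPartition(...) }`. The sketch below is plain Scala and covers only the per-partition part. `Sink`, `InMemorySink` and `FanOut` are hypothetical stand-ins for real destination clients; they are not Spark APIs.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical destination interface; a real one might wrap a JDBC
// connection, an HTTP client or a message-queue producer.
trait Sink {
  def write(record: String): Unit
  def flush(): Unit = ()
}

// Hypothetical in-memory sink, used here only to make the sketch testable.
class InMemorySink extends Sink {
  val records = ArrayBuffer.empty[String]
  def write(record: String): Unit = records += record
}

object FanOut {
  // Walks the partition's iterator exactly once and hands each record to
  // every sink, so N destinations cost one pass instead of N separate jobs.
  def writePartition(records: Iterator[String], sinks: Seq[Sink]): Int = {
    var count = 0
    records.foreach { r =>
      sinks.foreach(_.write(r))
      count += 1
    }
    sinks.foreach(_.flush())
    count
  }
}
```

Inside a streaming job, this might be called as `stream.foreachRDD(rdd => rdd.foreachPartition(it => FanOut.writePartition(it, openSinks())))`. Here `openSinks()` is a hypothetical function that creates the sink clients on the executor.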
Re: Spark Streaming FileStream Nested File Support
Yes, it can definitely be added. We just haven't gotten around to doing it :) There are proposals for this that you can try: https://github.com/apache/spark/pull/2765/files . Have you reviewed it at some point? On Fri, Apr 3, 2015 at 1:08 PM, Adam Ritter adamge...@gmail.com wrote: Unfortunately that doesn't seem like a good solution, as I need this to work in a production environment. Do you know why the limitation exists for FileInputDStream in the first place? Unless I'm missing something important about how some of the internals work, I don't see why this feature couldn't be added at some point. On Fri, Apr 3, 2015 at 12:47 PM, Tathagata Das t...@databricks.com wrote: A sort-of-hacky workaround is to use a queueStream, where you can manually create RDDs (using sparkContext.hadoopFile) and insert them into the queue. Note that this is for testing only, as queueStream does not work with driver fault recovery. TD On Fri, Apr 3, 2015 at 12:23 PM, adamgerst adamge...@gmail.com wrote: So after pulling my hair out for a bit trying to convert one of my standard Spark jobs to streaming, I found that FileInputDStream does not support nested folders (see the brief mention here: http://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources ; the fileStream method returns a FileInputDStream). For my standard job, I was reading from, say, s3n://mybucket/2015/03/02/*log and could also modify the path to get an entire month's worth of logs. Since the logs are split up by date, when the batch ran for the day I simply passed in the date as a parameter to make sure I was reading the correct data. But to turn this job into a streaming job, I need to use a path like s3n://mybucket/*log , which would work fine in a standard Spark application but fails for streaming. Is there any way I can get around this limitation? 
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-FileStream-Nested-File-Support-tp22370.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
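TD's queueStream workaround requires the driver to decide which files make up each batch. Because these logs are laid out by date, a small helper can rebuild the same glob the batch job used. The sketch below is plain Scala. The `*log` suffix and the bucket prefix come from the example paths in this thread; `DatedLogPaths` and its methods are hypothetical names.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

object DatedLogPaths {
  private val dayFormat = DateTimeFormatter.ofPattern("yyyy/MM/dd")

  // Glob for one day's logs, e.g. s3n://mybucket/2015/03/02/*log
  def dailyPath(root: String, day: LocalDate): String =
    s"$root/${day.format(dayFormat)}/*log"

  // One glob per day of the month that contains `day`.
  def monthPaths(root: String, day: LocalDate): Seq[String] = {
    val first = day.withDayOfMonth(1)
    (0 until first.lengthOfMonth).map(i => dailyPath(root, first.plusDays(i.toLong)))
  }
}
```

In a streaming job, each new day's path could be turned into an RDD with `sc.textFile(path)` and pushed onto the `scala.collection.mutable.Queue[RDD[String]]` passed to `ssc.queueStream(queue)`. As TD notes, this approach does not survive driver failure.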
Re: WordCount example
How many cores are present in the workers allocated to the standalone cluster spark://ip-10-241-251-232:7077? On Fri, Apr 3, 2015 at 2:18 PM, Mohit Anchlia mohitanch...@gmail.com wrote: If I use local[2] instead of URL: spark://ip-10-241-251-232:7077, this seems to work. I don't understand why, though, because when I give spark://ip-10-241-251-232:7077 the application seems to bootstrap successfully; it just doesn't create a socket on port ? On Fri, Mar 27, 2015 at 10:55 AM, Mohit Anchlia mohitanch...@gmail.com wrote: I checked the ports using netstat and don't see any connections established on that port. Logs show only this: 15/03/27 13:50:48 INFO Master: Registering app NetworkWordCount 15/03/27 13:50:48 INFO Master: Registered app NetworkWordCount with ID app-20150327135048-0002 Spark UI shows, under Running Applications: ID: app-20150327135048-0002 (http://54.69.225.94:8080/app?appId=app-20150327135048-0002), Name: NetworkWordCount (http://ip-10-241-251-232.us-west-2.compute.internal:4040/), Cores: 0, Memory per Node: 512.0 MB, Submitted Time: 2015/03/27 13:50:48, User: ec2-user, State: WAITING, Duration: 33 s. The code is launched with java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077 and looks like this: public static void doWork(String masterUrl){ SparkConf conf = new SparkConf().setMaster(masterUrl).setAppName("NetworkWordCount"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1)); JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", ); System.out.println("Successfully created connection"); mapAndReduce(lines); jssc.start(); // Start the computation jssc.awaitTermination(); // Wait for the computation to terminate } public static void main(String ...args){ doWork(args[0]); } And the output of the Java program after submitting the task: java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077 Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/03/27 13:50:46 INFO SecurityManager: Changing view acls to: ec2-user 15/03/27 13:50:46 INFO SecurityManager: Changing modify acls to: ec2-user 15/03/27 13:50:46 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ec2-user); users with modify permissions: Set(ec2-user) 15/03/27 13:50:46 INFO Slf4jLogger: Slf4jLogger started 15/03/27 13:50:46 INFO Remoting: Starting remoting 15/03/27 13:50:47 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkdri...@ip-10-241-251-232.us-west-2.compute.internal :60184] 15/03/27 13:50:47 INFO Utils: Successfully started service 'sparkDriver' on port 60184. 15/03/27 13:50:47 INFO SparkEnv: Registering MapOutputTracker 15/03/27 13:50:47 INFO SparkEnv: Registering BlockManagerMaster 15/03/27 13:50:47 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150327135047-5399 15/03/27 13:50:47 INFO MemoryStore: MemoryStore started with capacity 3.5 GB 15/03/27 13:50:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/03/27 13:50:47 INFO HttpFileServer: HTTP File server directory is /tmp/spark-7e26df49-1520-4c77-b411-c837da59fa5b 15/03/27 13:50:47 INFO HttpServer: Starting HTTP Server 15/03/27 13:50:47 INFO Utils: Successfully started service 'HTTP file server' on port 57955. 
15/03/27 13:50:47 INFO Utils: Successfully started service 'SparkUI' on port 4040. 15/03/27 13:50:47 INFO SparkUI: Started SparkUI at http://ip-10-241-251-232.us-west-2.compute.internal:4040 15/03/27 13:50:47 INFO AppClient$ClientActor: Connecting to master spark://ip-10-241-251-232:7077... 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150327135048-0002 15/03/27 13:50:48 INFO NettyBlockTransferService: Server created on 58358 15/03/27 13:50:48 INFO BlockManagerMaster: Trying to register BlockManager 15/03/27 13:50:48 INFO BlockManagerMasterActor: Registering block manager ip-10-241-251-232.us-west-2.compute.internal:58358 with 3.5 GB RAM, BlockManagerId(driver, ip-10-241-251-232.us-west-2.compute.internal, 58358) 15/03/27 13:50:48 INFO BlockManagerMaster: Registered BlockManager 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0 15/03/27 13:50:48 INFO ReceiverTracker: ReceiverTracker started 15/03/27 13:50:48 INFO ForEachDStream: metadataCleanupDelay = -1 15/03/27 13:50:48 INFO ShuffledDStream: metadataCleanupDelay = -1 15/03/27 13:50:48 INFO MappedDStream: metadataCleanupDelay = -1 15/03/27 13:50:48 INFO FlatMappedDStream: metadataCleanupDelay = -1 15/03/27 13:50:48 INFO SocketInputDStream: metadataCleanupDelay = -1 15/03/27 13:50:48 INFO SocketInputDStream: Slide time = 1000 ms 15/03/27 13:50:48 INFO SocketInputDStream: Storage level =
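The Spark Streaming programming guide says that a receiver-based application must be allocated more cores than it has receivers, because each receiver permanently occupies one core. `local[2]` meets that requirement for a single socket receiver. The Spark UI output quoted above shows the standalone application stuck in the WAITING state. The following is a hypothetical plain-Scala pre-flight check that encodes this rule; `ReceiverCores` and its methods are illustrative names, not Spark APIs.

```scala
object ReceiverCores {
  private val LocalN = """local\[(\d+)\]""".r

  // Cores implied by a local master URL. Returns None for anything else
  // (e.g. spark://host:7077 or local[*]), where the count is decided elsewhere.
  def localCores(master: String): Option[Int] = master match {
    case "local"   => Some(1)
    case LocalN(n) => Some(n.toInt)
    case _         => None
  }

  // Each receiver pins one core, so at least one more is needed to process data.
  def canProcess(allocatedCores: Int, numReceivers: Int): Boolean =
    allocatedCores > numReceivers
}
```

On a standalone cluster, the number that matters is the count the master actually grants (the Cores column in the UI, capped by `spark.cores.max` if that is set), not the number of cores on the machines.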
Re: Spark + Kinesis
Just remove "provided" for spark-streaming-kinesis-asl: libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" On Fri, Apr 3, 2015 at 12:45 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Thanks. So how do I fix it? On Fri, Apr 3, 2015 at 3:43 PM, Kelly, Jonathan jonat...@amazon.com wrote: spark-streaming-kinesis-asl is not part of the Spark distribution on your cluster, so you cannot make it just a "provided" dependency. This is also why the KCL and its dependencies were not included in the assembly (but yes, they should be). ~ Jonathan Kelly From: Vadim Bichutskiy vadim.bichuts...@gmail.com Date: Friday, April 3, 2015 at 12:26 PM To: Jonathan Kelly jonat...@amazon.com Cc: user@spark.apache.org user@spark.apache.org Subject: Re: Spark + Kinesis Hi all, Good news! I was able to create a Kinesis consumer and assemble it into an uber jar following http://spark.apache.org/docs/latest/streaming-kinesis-integration.html and the example https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala . However, when I try to spark-submit it, I get the following exception: Exception in thread "main" java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider Do I need to include the KCL dependency in build.sbt? Here's what it looks like currently: import AssemblyKeys._ name := "Kinesis Consumer" version := "1.0" organization := "com.myconsumer" scalaVersion := "2.11.5" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided" libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided" libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" % "provided" assemblySettings jarName in assembly := "consumer-assembly.jar" assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala=false) Any help appreciated. 
Thanks, Vadim On Thu, Apr 2, 2015 at 1:15 PM, Kelly, Jonathan jonat...@amazon.com wrote: It looks like you're attempting to mix Scala versions, which is going to cause some problems. If you really want to use Scala 2.11.5, you must also use Spark package versions built for Scala 2.11 rather than 2.10. Anyway, that's not quite the correct way to specify Scala dependencies in build.sbt. Instead of placing the Scala version after the artifactId (like "spark-core_2.10"), you want just "spark-core" with two percent signs before it. Using two percent signs makes sbt use the Scala version that matches your declared scalaVersion. For example: libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided" libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided" libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" I think that may get you a little closer, though you're probably going to run into the same problems I ran into in this thread: https://www.mail-archive.com/user@spark.apache.org/msg23891.html I never really got an answer for that, and I have temporarily moved on to other things. ~ Jonathan Kelly From: 'Vadim Bichutskiy' vadim.bichuts...@gmail.com Date: Thursday, April 2, 2015 at 9:53 AM To: user@spark.apache.org user@spark.apache.org Subject: Spark + Kinesis Hi all, I am trying to write an Amazon Kinesis consumer Scala app that processes data in the Kinesis stream. 
Is this the correct way to specify build.sbt? --- import AssemblyKeys._ name := "Kinesis Consumer" version := "1.0" organization := "com.myconsumer" scalaVersion := "2.11.5" libraryDependencies ++= Seq("org.apache.spark" % "spark-core_2.10" % "1.3.0" % "provided", "org.apache.spark" % "spark-streaming_2.10" % "1.3.0" "org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.3.0") assemblySettings jarName in assembly := "consumer-assembly.jar" assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala=false) In project/assembly.sbt I have only the following line: addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0") I am using sbt 0.13.7. I adapted Example 7.7 in the Learning Spark book. Thanks, Vadim
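Putting the advice in this thread together, a build.sbt along the following lines should package the Kinesis connector into the assembly while keeping core Spark "provided". The connector transitively brings in the KCL and the AWS SDK classes such as com.amazonaws.auth.AWSCredentialsProvider. This is a sketch under two assumptions: the cluster runs the standard Spark 1.3.0 build for Scala 2.10, and the sbt-assembly lines are copied unchanged from the build that already assembled successfully.

```scala
// build.sbt (sbt 0.13.7, sbt-assembly 0.13.0 as in the original post)
import AssemblyKeys._

name := "Kinesis Consumer"

version := "1.0"

organization := "com.myconsumer"

// Assumption: the cluster's Spark 1.3.0 was built for Scala 2.10.
scalaVersion := "2.10.4"

// Already present on the cluster, so keep these out of the uber jar.
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"

// NOT provided: the Kinesis connector is not in the Spark distribution,
// so it (and the KCL it depends on) must be packaged into the assembly.
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"

assemblySettings

jarName in assembly := "consumer-assembly.jar"

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
```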
Re: Spark Streaming FileStream Nested File Support
A sort-of-hacky workaround is to use a queueStream, where you can manually create RDDs (using sparkContext.hadoopFile) and insert them into the queue. Note that this is for testing only, as queueStream does not work with driver fault recovery. TD On Fri, Apr 3, 2015 at 12:23 PM, adamgerst adamge...@gmail.com wrote: So after pulling my hair out for a bit trying to convert one of my standard Spark jobs to streaming, I found that FileInputDStream does not support nested folders (see the brief mention here: http://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources ; the fileStream method returns a FileInputDStream). For my standard job, I was reading from, say, s3n://mybucket/2015/03/02/*log and could also modify the path to get an entire month's worth of logs. Since the logs are split up by date, when the batch ran for the day I simply passed in the date as a parameter to make sure I was reading the correct data. But to turn this job into a streaming job, I need to use a path like s3n://mybucket/*log , which would work fine in a standard Spark application but fails for streaming. Is there any way I can get around this limitation?
Re: Spark Streaming Worker runs out of inodes
Are you saying that even with spark.cleaner.ttl set, your files are not getting cleaned up? TD On Thu, Apr 2, 2015 at 8:23 AM, andrem amesa...@gmail.com wrote: Apparently Spark Streaming 1.3.0 is not cleaning up its internal files, and the worker nodes eventually run out of inodes. We see tons of old shuffle_*.data and *.index files that are never deleted. How do we get Spark to remove these files? We have a simple standalone app with one RabbitMQ receiver and a two-node cluster (2 x r3.large AWS instances). The batch interval is 10 minutes, after which we process the data and write results to a DB. No windowing or state management is used. I've pored over the documentation and tried setting the following properties, but they have not helped. As a workaround we're using a cron script that periodically cleans up old files, but this has a bad smell to it. SPARK_WORKER_OPTS in spark-env.sh on every worker node: spark.worker.cleanup.enabled true spark.worker.cleanup.interval spark.worker.cleanup.appDataTtl Also tried on the driver side: spark.cleaner.ttl spark.shuffle.consolidateFiles true
Re: Spark Streaming 1.3 Kafka Direct Streams
We should be able to support that use case in the direct API. It may be as simple as allowing users to pass in a function that returns the set of topic+partitions to read from, that is, a function (Time) => Set[TopicAndPartition]. This would be called every batch interval, before the offsets are decided, and would allow users to add topics, delete topics and modify partitions on the fly. What do you think, Cody? On Wed, Apr 1, 2015 at 11:57 AM, Neelesh neele...@gmail.com wrote: Thanks Cody! On Wed, Apr 1, 2015 at 11:21 AM, Cody Koeninger c...@koeninger.org wrote: If you want to change topics from batch to batch, you can always just create a KafkaRDD repeatedly. The streaming code as it stands assumes a consistent set of topics, though. The implementation is private, so you can't subclass it without building your own Spark. On Wed, Apr 1, 2015 at 1:09 PM, Neelesh neele...@gmail.com wrote: Thanks Cody, that was really helpful. I have a much better understanding now. One last question: Kafka topics are initialized once in the driver. Is there an easy way of adding/removing topics on the fly? KafkaRDD#getPartitions() seems to be computed only once, with no way of refreshing it. Thanks again! On Wed, Apr 1, 2015 at 10:01 AM, Cody Koeninger c...@koeninger.org wrote: https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md The Kafka consumers run in the executors. On Wed, Apr 1, 2015 at 11:18 AM, Neelesh neele...@gmail.com wrote: With receivers, it was pretty obvious which code ran where: each receiver occupied a core and ran on the workers. However, with the new Kafka direct input streams, it's hard for me to understand where the code that reads from the Kafka brokers runs. Does it run on the driver (I hope not), or does it run on the workers? Any help appreciated, thanks! -neelesh
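TD's proposal can be sketched without Spark as a function from batch time to the set of partitions to read, consulted once per batch before offsets are chosen. `BatchTime`, `TopicPartition` and `TopicSelection` below are hypothetical lookalikes that illustrate the idea under discussion. They are not an API that exists in Spark 1.3, whose real counterparts are `org.apache.spark.streaming.Time` and `kafka.common.TopicAndPartition`.

```scala
// Hypothetical stand-ins for org.apache.spark.streaming.Time and
// kafka.common.TopicAndPartition, so the sketch has no dependencies.
final case class BatchTime(ms: Long)
final case class TopicPartition(topic: String, partition: Int)

object TopicSelection {
  type Selector = BatchTime => Set[TopicPartition]

  // Example selector: "events" (2 partitions) is always read, and an
  // "audit" topic is added once the batch time reaches auditStartMs.
  def selector(auditStartMs: Long): Selector = { t =>
    val base = (0 until 2).map(p => TopicPartition("events", p)).toSet
    if (t.ms >= auditStartMs) base + TopicPartition("audit", 0) else base
  }

  // What the proposed stream would do on each batch: ask the selector
  // first, then decide offsets only for the partitions it returns.
  def plan(batches: Seq[BatchTime], select: Selector): Seq[(BatchTime, Set[TopicPartition])] =
    batches.map(t => t -> select(t))
}
```

Because this selector depends only on the batch time, calling it again for the same batch returns the same set. That property matters if the function may be re-invoked after a driver restart.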
Re: Spark Streaming and JMS
It's not a built-in component of Spark. However, there is a spark-package for an Apache Camel receiver, which can integrate with JMS: http://spark-packages.org/package/synsys/spark I have not tried it, but do check it out. TD On Wed, Apr 1, 2015 at 4:38 AM, danila danila.erma...@gmail.com wrote: Hi Tathagata, do you know if a JMS receiver was introduced during the last year as a standard Spark component, or whether somebody is developing one? Regards Danila
Re: Size of arbitrary state managed via DStream updateStateByKey
In the current state, yes, there will be performance issues. It can be done much more efficiently, and we are working on that. TD On Wed, Apr 1, 2015 at 7:49 AM, Vinoth Chandar vin...@uber.com wrote: Hi all, As I understand from the docs and talks, the streaming state is kept in memory as an RDD (optionally checkpointed to disk). SPARK-2629 hints that this in-memory structure is not indexed efficiently? I am wondering how my performance would be if the streaming state does not fit in memory (say 100GB of state over 10GB of total RAM) and I did random updates to different keys via updateStateByKey. (Would throwing in SSDs help?) I am picturing some kind of performance degradation, akin to Linux/InnoDB buffer cache thrashing. But if someone can demystify this, that would be awesome. Thanks Vinoth
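In Spark 1.3, updateStateByKey calls the update function for every key in the state on every batch, not only for keys with new data. Keeping the state small therefore helps regardless of how the state is stored. The function passed to `updateStateByKey` has the shape `(Seq[V], Option[S]) => Option[S]`, and returning `None` drops the key from the state. The plain-Scala sketch below follows that shape. `CounterState` and the three-batch idle limit are illustrative assumptions.

```scala
// Illustrative per-key state: a running count plus the number of
// consecutive batches in which this key received no new values.
final case class CounterState(count: Long, idleBatches: Int)

object StateUpdate {
  val MaxIdleBatches = 3 // assumption: evict keys idle for more than 3 batches

  // Same shape as the function given to updateStateByKey:
  // (new values for this key in the batch, previous state) => new state.
  def update(values: Seq[Int], state: Option[CounterState]): Option[CounterState] =
    (values, state) match {
      case (vs, prev) if vs.nonEmpty =>
        val base = prev.map(_.count).getOrElse(0L)
        Some(CounterState(base + vs.sum, 0))
      case (_, Some(s)) if s.idleBatches + 1 > MaxIdleBatches =>
        None // returning None removes the key from the state
      case (_, Some(s)) =>
        Some(s.copy(idleBatches = s.idleBatches + 1))
      case (_, None) =>
        None
    }
}
```

With Spark, this would be wired up as `pairs.updateStateByKey(StateUpdate.update _)` on a `DStream[(K, Int)]`, with checkpointing enabled via `ssc.checkpoint(dir)`. This bounds how large the state grows, but it does not change TD's point about the current implementation.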
Re: Spark Streaming 1.3 Kafka Direct Streams
The challenge of opening up these internal classes to the public (even with a Developer API tag) is that it prevents us from making non-trivial changes without breaking API compatibility for everyone who has subclassed them. It's a trade-off that is hard to optimize. That's why we favor exposing more optional parameters in the stable API (KafkaUtils): we can maintain binary compatibility with user code while still making non-trivial changes internally. That said, it may be worthwhile to take an optional compute function as a parameter through KafkaUtils, as Cody suggested ((Time, current offsets, kafka metadata, etc) => Option[KafkaRDD]). Its implications need thinking through in the context of driver restarts, etc., since such functions will get called again on restart, and a return value different from before can break the semantics. TD On Wed, Apr 1, 2015 at 12:28 PM, Neelesh neele...@gmail.com wrote: +1 for subclassing. It's more flexible if we can subclass the implementation classes. On Apr 1, 2015 12:19 PM, Cody Koeninger c...@koeninger.org wrote: As I said in the original ticket, I think the implementation classes should be exposed so that people can subclass and override compute() to suit their needs. Just adding a function from Time => Set[TopicAndPartition] wouldn't be sufficient for some of my current production use cases. compute() isn't really a function from Time => Option[KafkaRDD]; it's a function from (Time, current offsets, kafka metadata, etc) => Option[KafkaRDD]. I think it's more straightforward to give access to that additional state via subclassing than to add more callbacks for every possible use case. On Wed, Apr 1, 2015 at 2:01 PM, Tathagata Das t...@databricks.com wrote: We should be able to support that use case in the direct API. It may be as simple as allowing users to pass in a function that returns the set of topic+partitions to read from.
That is, a function (Time) => Set[TopicAndPartition]. This would be called every batch interval, before the offsets are decided, and would allow users to add topics, delete topics and modify partitions on the fly. What do you think, Cody? On Wed, Apr 1, 2015 at 11:57 AM, Neelesh neele...@gmail.com wrote: Thanks Cody! On Wed, Apr 1, 2015 at 11:21 AM, Cody Koeninger c...@koeninger.org wrote: If you want to change topics from batch to batch, you can always just create a KafkaRDD repeatedly. The streaming code as it stands assumes a consistent set of topics, though. The implementation is private, so you can't subclass it without building your own Spark. On Wed, Apr 1, 2015 at 1:09 PM, Neelesh neele...@gmail.com wrote: Thanks Cody, that was really helpful. I have a much better understanding now. One last question: Kafka topics are initialized once in the driver. Is there an easy way of adding/removing topics on the fly? KafkaRDD#getPartitions() seems to be computed only once, with no way of refreshing it. Thanks again! On Wed, Apr 1, 2015 at 10:01 AM, Cody Koeninger c...@koeninger.org wrote: https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md The Kafka consumers run in the executors. On Wed, Apr 1, 2015 at 11:18 AM, Neelesh neele...@gmail.com wrote: With receivers, it was pretty obvious which code ran where: each receiver occupied a core and ran on the workers. However, with the new Kafka direct input streams, it's hard for me to understand where the code that reads from the Kafka brokers runs. Does it run on the driver (I hope not), or does it run on the workers? Any help appreciated, thanks! -neelesh
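TD's caveat is that a user-supplied compute function may be called again for the same batch after a driver restart, so it should be a pure function of its inputs. The plain-Scala sketch below covers only the offset-range part of Cody's `(Time, current offsets, kafka metadata, etc) => Option[KafkaRDD]` idea. `TopicPart`, `OffsetSpan`, `OffsetPlanner` and the per-partition cap are hypothetical. A real version would turn the spans into Spark's own `OffsetRange` values and create an RDD from them, for example with `KafkaUtils.createRDD`.

```scala
// Hypothetical stand-ins; Spark's own OffsetRange lives in
// org.apache.spark.streaming.kafka, but these keep the sketch dependency-free.
final case class TopicPart(topic: String, partition: Int)
final case class OffsetSpan(tp: TopicPart, fromOffset: Long, untilOffset: Long)

object OffsetPlanner {
  // Pure function of its inputs: the same (current, latest, cap) always
  // yields the same spans, so re-running it after a driver restart
  // reproduces the batch instead of silently reading different data.
  // Returns None when no partition has new data (no batch to build).
  def ranges(
      current: Map[TopicPart, Long],
      latest: Map[TopicPart, Long],
      maxPerPartition: Long): Option[Seq[OffsetSpan]] = {
    val spans = current.toSeq
      .sortBy { case (tp, _) => (tp.topic, tp.partition) } // stable ordering
      .flatMap { case (tp, from) =>
        latest.get(tp)
          .filter(_ > from)
          .map(end => OffsetSpan(tp, from, math.min(end, from + maxPerPartition)))
      }
    if (spans.isEmpty) None else Some(spans)
  }
}
```

Sorting the partitions makes even the ordering of the spans independent of `Map` iteration order, which keeps a replayed batch identical to the original.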
Re: Could not compute split, block not found in Spark Streaming Simple Application
If it is deterministically reproducible, could you generate full DEBUG-level logs from the driver and the workers and send them to me? Basically, I want to trace what is happening to the block that is not being found. And can you tell me which cluster manager you are using: Spark Standalone, Mesos or YARN? On Fri, Mar 27, 2015 at 10:09 AM, Saiph Kappa saiph.ka...@gmail.com wrote: Hi, I am just running this simple example with machineA: 1 master + 1 worker, machineB: 1 worker « val ssc = new StreamingContext(sparkConf, Duration(1000)) val rawStreams = (1 to numStreams).map(_ => ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER)).toArray val union = ssc.union(rawStreams) union.filter(line => Random.nextInt(1) == 0).map(line => { var sum = BigInt(0) line.toCharArray.foreach(chr => sum += chr.toInt) fib2(sum) sum }).reduceByWindow(_+_, Seconds(1), Seconds(1)).map(s => s"### result: $s").print() » And I'm getting the following exceptions: Log from machineB « 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 132 15/03/27 16:21:35 INFO Executor: Running task 0.0 in stage 27.0 (TID 132) 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 134 15/03/27 16:21:35 INFO Executor: Running task 2.0 in stage 27.0 (TID 134) 15/03/27 16:21:35 INFO TorrentBroadcast: Started reading broadcast variable 24 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 136 15/03/27 16:21:35 INFO Executor: Running task 4.0 in stage 27.0 (TID 136) 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 138 15/03/27 16:21:35 INFO Executor: Running task 6.0 in stage 27.0 (TID 138) 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 140 15/03/27 16:21:35 INFO Executor: Running task 8.0 in stage 27.0 (TID 140) 15/03/27 16:21:35 INFO MemoryStore: ensureFreeSpace(1886) called with curMem=47117, maxMem=280248975 15/03/27 16:21:35 INFO MemoryStore: Block broadcast_24_piece0 stored as bytes in memory 
(estimated size 1886.0 B, free 267.2 MB) 15/03/27 16:21:35 INFO BlockManagerMaster: Updated info of block broadcast_24_piece0 15/03/27 16:21:35 INFO TorrentBroadcast: Reading broadcast variable 24 took 19 ms 15/03/27 16:21:35 INFO MemoryStore: ensureFreeSpace(3104) called with curMem=49003, maxMem=280248975 15/03/27 16:21:35 INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 3.0 KB, free 267.2 MB) 15/03/27 16:21:35 ERROR Executor: Exception in task 8.0 in stage 27.0 (TID 140) java.lang.Exception: Could not compute split, block input-0-1427473262420 not found at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:701) 15/03/27 
16:21:35 ERROR Executor: Exception in task 6.0 in stage 27.0 (TID 138) java.lang.Exception: Could not compute split, block input-0-1427473262418 not found at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at
Re: spark streaming driver hang
Do you have the driver logs? Do they show any exceptions? TD On Fri, Mar 27, 2015 at 12:24 PM, Chen Song chen.song...@gmail.com wrote: I ran a Spark Streaming job with 100 executors, a 30G heap per executor and 4 cores per executor. The version I used is 1.3.0-cdh5.1.0. The job reads from a directory on HDFS (with files arriving continuously) and does some joins on the data. I set the batch interval to 15 minutes, and the job worked fine for the first few batches. However, it just stalled after 7-8 batches. Below are some symptoms. * In the Spark UI, every tab worked fine except the Streaming tab. When I clicked on it, it just hung forever. * I did not see any GC activity on the driver. * Nothing was printed in the driver log. Has anyone seen this before? -- Chen Song
Re: Strange JavaDeserialization error - java.lang.ClassNotFoundException: org/apache/spark/storage/StorageLevel
Does it fail with just Spark jobs (using storage levels) in non-coarse mode? TD On Fri, Mar 27, 2015 at 4:39 AM, Ondrej Smola ondrej.sm...@gmail.com wrote: More info: when using spark.mesos.coarse, everything works as expected. I think this must be a bug in the Spark-Mesos integration. 2015-03-27 9:23 GMT+01:00 Ondrej Smola ondrej.sm...@gmail.com: It happens only when a StorageLevel with more than 1 replica is used (StorageLevel.MEMORY_ONLY_2, StorageLevel.MEMORY_AND_DISK_2); StorageLevel.MEMORY_ONLY and StorageLevel.MEMORY_AND_DISK work, so the problem must clearly be somewhere between Mesos and Spark. From the console I see that Spark is trying to replicate to nodes; the nodes show up in Mesos active tasks, but they always fail with ClassNotFoundException. 2015-03-27 0:52 GMT+01:00 Tathagata Das t...@databricks.com: Could you try running a simpler Spark Streaming program with a receiver (maybe socketStream) and see if that works? TD On Thu, Mar 26, 2015 at 2:08 PM, Ondrej Smola ondrej.sm...@gmail.com wrote: Hi, thanks for the reply. Yes, I have a custom receiver, but it has simple logic: pop ids from a Redis queue, load docs based on those ids from Elasticsearch and store them in Spark. No classloader modifications. I am running multiple Spark batch jobs (with user-supplied partitioning) and they have no problems; debugging in local mode shows no errors. 2015-03-26 21:47 GMT+01:00 Tathagata Das t...@databricks.com: Here are a few steps to debug. 1. Try using replication from a Spark job: sc.parallelize(1 to 100, 100).persist(StorageLevel.MEMORY_ONLY_2).count() 2. If step 1 works, then we know that there is probably nothing wrong with the Spark installation, and the problem is probably in the threads related to the receivers receiving the data. Are you writing a custom receiver? Are you somehow playing around with the class loader in the custom receiver? TD On Thu, Mar 26, 2015 at 10:59 AM, Ondrej Smola ondrej.sm...@gmail.com wrote: Hi, I am running Spark Streaming v1.3.0 (inside Docker) on Mesos 0.21.1. 
Spark Streaming is started using Marathon: the Docker container gets deployed and starts streaming (from a custom Actor). The Spark binary is located on a shared GlusterFS volume. Data is streamed from Elasticsearch/Redis. When a new batch arrives, Spark tries to replicate it but fails with the following error: 15/03/26 14:50:00 INFO MemoryStore: Block broadcast_0 of size 2840 dropped from memory (free 278017782) 15/03/26 14:50:00 INFO BlockManager: Removing block broadcast_0_piece0 15/03/26 14:50:00 INFO MemoryStore: Block broadcast_0_piece0 of size 1658 dropped from memory (free 278019440) 15/03/26 14:50:00 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0 15/03/26 14:50:00 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 7178767328921933569 java.lang.ClassNotFoundException: org/apache/spark/storage/StorageLevel at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:344) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:65) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:88) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:65) at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97) at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead
Re: Strange JavaDeserialization error - java.lang.ClassNotFoundException: org/apache/spark/storage/StorageLevel
Seems like a bug; could you file a JIRA? @Tim: Patrick said you look after Mesos-related issues. Could you take a look at this? Thanks! TD On Fri, Mar 27, 2015 at 1:25 PM, Ondrej Smola ondrej.sm...@gmail.com wrote: Yes, only when using fine-grained mode and replication (StorageLevel.MEMORY_ONLY_2, etc.). 2015-03-27 19:06 GMT+01:00 Tathagata Das t...@databricks.com: Does it fail with just Spark jobs (using storage levels) in non-coarse (fine-grained) mode? TD On Fri, Mar 27, 2015 at 4:39 AM, Ondrej Smola ondrej.sm...@gmail.com wrote: More info: when using *spark.mesos.coarse*, everything works as expected. I think this must be a bug in the Spark-Mesos integration. 2015-03-27 9:23 GMT+01:00 Ondrej Smola ondrej.sm...@gmail.com: It happens only when a replicated StorageLevel is used (StorageLevel.MEMORY_ONLY_2, StorageLevel.MEMORY_AND_DISK_2); StorageLevel.MEMORY_ONLY and StorageLevel.MEMORY_AND_DISK work, so the problem must clearly be somewhere between Mesos and Spark. From the console I see that Spark is trying to replicate to nodes, and the nodes show up in Mesos active tasks, but they always fail with ClassNotFoundException. 2015-03-27 0:52 GMT+01:00 Tathagata Das t...@databricks.com: Could you try running a simpler Spark Streaming program with a receiver (maybe socketStream) and see if that works? TD On Thu, Mar 26, 2015 at 2:08 PM, Ondrej Smola ondrej.sm...@gmail.com wrote: Hi, thanks for the reply. Yes, I have a custom receiver, but it has simple logic: pop IDs from a Redis queue, load docs based on those IDs from Elasticsearch, and store them in Spark. No classloader modifications. I am running multiple Spark batch jobs (with user-supplied partitioning) and they have no problems; debugging in local mode shows no errors. 2015-03-26 21:47 GMT+01:00 Tathagata Das t...@databricks.com: Here are a few steps to debug. 1. Try using replication from a Spark job: sc.parallelize(1 to 100, 100).persist(StorageLevel.MEMORY_ONLY_2).count() 2.
If this works, then we know that there is probably nothing wrong with the Spark installation, and the problem is probably in the threads related to the receivers receiving the data. Are you writing a custom receiver? Are you somehow playing around with the class loader in the custom receiver? TD On Thu, Mar 26, 2015 at 10:59 AM, Ondrej Smola ondrej.sm...@gmail.com wrote: Hi, I am running Spark Streaming v1.3.0 (inside Docker) on Mesos 0.21.1. Spark Streaming is started using Marathon: the Docker container gets deployed and starts streaming (from a custom Actor). The Spark binary is located on a shared GlusterFS volume. Data is streamed from Elasticsearch/Redis. When a new batch arrives, Spark tries to replicate it but fails with the following error: 15/03/26 14:50:00 INFO MemoryStore: Block broadcast_0 of size 2840 dropped from memory (free 278017782) 15/03/26 14:50:00 INFO BlockManager: Removing block broadcast_0_piece0 15/03/26 14:50:00 INFO MemoryStore: Block broadcast_0_piece0 of size 1658 dropped from memory (free 278019440) 15/03/26 14:50:00 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0 15/03/26 14:50:00 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 7178767328921933569 java.lang.ClassNotFoundException: org/apache/spark/storage/StorageLevel at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:344) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:65) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68) at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:88) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:65) at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103
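Step 1 of TD's checklist can be run as a tiny standalone job. This is a minimal sketch assuming Spark 1.3-era Scala APIs. The object and method names are hypothetical. The commented-out `spark.mesos.coarse` setting is the workaround reported in this thread, not a confirmed fix.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object ReplicationCheck {
  /** Persists 100 one-element partitions with 2x replication and forces evaluation. */
  def replicatedCount(sc: SparkContext): Long =
    sc.parallelize(1 to 100, 100)
      .persist(StorageLevel.MEMORY_ONLY_2) // asks for every block to be stored on two executors
      .count()

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("replication-check")
    // Workaround reported in this thread: coarse-grained Mesos mode did not hit the error.
    // conf.set("spark.mesos.coarse", "true")
    val sc = new SparkContext(conf)
    try println(s"count with MEMORY_ONLY_2 = ${replicatedCount(sc)}")
    finally sc.stop()
  }
}
```

The result narrows the problem either way. If this count succeeds in fine-grained mode while the streaming job still fails, that points at the receiver path, as TD suggests. If it fails with the same ClassNotFoundException, the problem lies in block replication itself.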
Re: foreachRDD execution
Yes, that is the correct understanding. There are undocumented parameters that allow running them in parallel (e.g., spark.streaming.concurrentJobs), but I do not recommend using those :) TD On Wed, Mar 25, 2015 at 6:57 AM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: I have a simple and probably dumb question about foreachRDD. We are using Spark Streaming + Cassandra to compute concurrent users every 5 minutes. Our batch size is 10 seconds and our block interval is 2.5 seconds. At the end of the pipeline we are using foreachRDD to join the data in the RDD with existing data in Cassandra, update the counters, and then save it back to Cassandra. To the best of my understanding, in this scenario Spark Streaming produces one RDD every 10 seconds and foreachRDD executes them sequentially, that is, foreachRDD would never run in parallel. Am I right? Regards, Luis
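By default the job scheduler runs one output job at a time, because `spark.streaming.concurrentJobs` defaults to 1. As a result, a driver-side read-modify-write of the counters inside `foreachRDD` does not race with the same code from the next batch. Below is a minimal sketch of that pattern. It assumes Spark 1.3+ and a per-batch key set small enough to collect on the driver. `CounterStore` and `CounterUpdate` are hypothetical stand-ins for the Cassandra table and connector code in the question, not a real connector API.

```scala
import org.apache.spark.rdd.RDD

/** Hypothetical external store for the counters (a Cassandra table in the question). */
trait CounterStore {
  def read(keys: Set[String]): Map[String, Long]
  def write(counts: Map[String, Long]): Unit
}

object CounterUpdate {
  /**
   * Merges one batch of (key, count) pairs into the store and returns the new totals.
   * Intended to be called as: stream.foreachRDD { rdd => CounterUpdate.update(rdd, store) }
   * The merge runs on the driver; only reduceByKey and collect execute on the cluster.
   */
  def update(batch: RDD[(String, Long)], store: CounterStore): Map[String, Long] = {
    val batchTotals = batch.reduceByKey(_ + _).collect().toMap
    val existing = store.read(batchTotals.keySet)
    val merged = batchTotals.map { case (k, n) => k -> (existing.getOrElse(k, 0L) + n) }
    store.write(merged)
    merged
  }
}
```

If `spark.streaming.concurrentJobs` were raised above 1, two batches could run this read-modify-write at the same time and lose updates. That is one concrete reason to leave the default alone.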
Re: MappedStream vs Transform API
That's not super essential, and hence hasn't been done till now. Even in core Spark there are MappedRDD, etc., even though all of them can be implemented by MapPartitionsRDD. So it's nice to maintain the consistency: MappedDStream creates MappedRDDs. :) Though this does not eliminate the possibility that we will do it. Maybe in the future, if we find that maintaining these different DStreams is becoming a maintenance burden (it isn't yet), we may collapse them to use transform. We did so in the Python API for exactly this reason. If you are interested in contributing to Spark Streaming, I can point you to a number of issues where your contributions will be more valuable. TD On Tue, Mar 17, 2015 at 1:56 AM, madhu phatak phatak@gmail.com wrote: Hi, Thank you for the response. Can I submit a PR to use transform for all the functions like map, flatMap, etc., so they are consistent with the other APIs? Regards, Madhukara Phatak http://datamantra.io/ On Mon, Mar 16, 2015 at 11:42 PM, Tathagata Das t...@databricks.com wrote: It's mostly for legacy reasons. First we added all the MappedDStream, etc., and then later we realized we needed to expose something more generic for arbitrary RDD-to-RDD transformations. It can be easily replaced. However, there is slight value in having MappedDStream, for developers to learn about DStreams. TD On Mon, Mar 16, 2015 at 3:37 AM, madhu phatak phatak@gmail.com wrote: Hi, Thanks for the response. I understand that part. But I am asking why the internal implementation uses a subclass when it can use an existing API. Unless there is a real difference, it feels like a code smell to me. Regards, Madhukara Phatak http://datamantra.io/ On Mon, Mar 16, 2015 at 2:14 PM, Shao, Saisai saisai.s...@intel.com wrote: I think both of these ways are OK for writing a streaming job. `transform` is a more general way to transform one DStream into another when there's no corresponding DStream API (but there is a corresponding RDD API).
But using map may be more straightforward and easier to understand. Thanks Jerry *From:* madhu phatak [mailto:phatak@gmail.com] *Sent:* Monday, March 16, 2015 4:32 PM *To:* user@spark.apache.org *Subject:* MappedStream vs Transform API Hi, The current implementation of the map function in Spark Streaming looks as below. def map[U: ClassTag](mapFunc: T => U): DStream[U] = { new MappedDStream(this, context.sparkContext.clean(mapFunc)) } It creates an instance of MappedDStream, which is a subclass of DStream. The same function can also be implemented using the transform API: def map[U: ClassTag](mapFunc: T => U): DStream[U] = this.transform(rdd => { rdd.map(mapFunc) }) Both implementations look the same. If they are the same, is there any advantage to having a subclass of DStream? Why can't we just use the transform API? Regards, Madhukara Phatak http://datamantra.io/
Re: why generateJob is a private API?
It was not really meant to be public and overridden, because anything you want to do to generate jobs from RDDs can be done using DStream.foreachRDD. On Sun, Mar 15, 2015 at 11:14 PM, madhu phatak phatak@gmail.com wrote: Hi, I am trying to create a simple subclass of DStream. If I understand correctly, I should override *compute* for lazy operations and *generateJob* for actions. But when I try to override generateJob, it gives an error saying the method is private to the streaming package. Is my approach correct, or am I missing something? Regards, Madhukara Phatak http://datamantra.io/
Re: MappedStream vs Transform API
It's mostly for legacy reasons. First we added all the MappedDStream, etc., and then later we realized we needed to expose something more generic for arbitrary RDD-to-RDD transformations. It can be easily replaced. However, there is slight value in having MappedDStream, for developers to learn about DStreams. TD On Mon, Mar 16, 2015 at 3:37 AM, madhu phatak phatak@gmail.com wrote: Hi, Thanks for the response. I understand that part. But I am asking why the internal implementation uses a subclass when it can use an existing API. Unless there is a real difference, it feels like a code smell to me. Regards, Madhukara Phatak http://datamantra.io/ On Mon, Mar 16, 2015 at 2:14 PM, Shao, Saisai saisai.s...@intel.com wrote: I think both of these ways are OK for writing a streaming job. `transform` is a more general way to transform one DStream into another when there's no corresponding DStream API (but there is a corresponding RDD API). But using map may be more straightforward and easier to understand. Thanks Jerry *From:* madhu phatak [mailto:phatak@gmail.com] *Sent:* Monday, March 16, 2015 4:32 PM *To:* user@spark.apache.org *Subject:* MappedStream vs Transform API Hi, The current implementation of the map function in Spark Streaming looks as below. def map[U: ClassTag](mapFunc: T => U): DStream[U] = { new MappedDStream(this, context.sparkContext.clean(mapFunc)) } It creates an instance of MappedDStream, which is a subclass of DStream. The same function can also be implemented using the transform API: def map[U: ClassTag](mapFunc: T => U): DStream[U] = this.transform(rdd => { rdd.map(mapFunc) }) Both implementations look the same. If they are the same, is there any advantage to having a subclass of DStream? Why can't we just use the transform API? Regards, Madhukara Phatak http://datamantra.io/
Re: problems with spark-streaming-kinesis-asl and sbt assembly (different file contents found)
If you are creating an assembly, make sure spark-streaming is marked as provided. spark-streaming is already part of the Spark installation, so it will be present at run time. That might solve some of these, maybe!? TD On Mon, Mar 16, 2015 at 11:30 AM, Kelly, Jonathan jonat...@amazon.com wrote: I'm attempting to use the Spark Kinesis Connector, so I've added the following dependency in my build.sbt: libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" My app works fine with sbt run, but I can't seem to get sbt assembly to work without failing with "different file contents found" errors, due to different versions of various packages getting pulled into the assembly. This only occurs when I've added spark-streaming-kinesis-asl as a dependency; sbt assembly works fine otherwise. Here are the conflicts that I see: com.esotericsoftware.kryo:kryo:2.21 com.esotericsoftware.minlog:minlog:1.2 com.google.guava:guava:15.0 org.apache.spark:spark-network-common_2.10:1.3.0 (Note: The conflict is with javac.sh; why is this even getting included?) org.apache.spark:spark-streaming-kinesis-asl_2.10:1.3.0 org.apache.spark:spark-streaming_2.10:1.3.0 org.apache.spark:spark-core_2.10:1.3.0 org.apache.spark:spark-network-common_2.10:1.3.0 org.apache.spark:spark-network-shuffle_2.10:1.3.0 (Note: I'm actually using my own custom-built version of Spark 1.3.0 where I've upgraded to v1.9.24 of the AWS Java SDK, but that has nothing to do with all of these conflicts, as I upgraded the dependency *because* I was getting all of these conflicts with the Spark 1.3.0 artifacts from the central repo.) com.amazonaws:aws-java-sdk-s3:1.9.24 net.java.dev.jets3t:jets3t:0.9.3 commons-collections:commons-collections:3.2.1 commons-beanutils:commons-beanutils:1.7.0 commons-beanutils:commons-beanutils-core:1.8.0 commons-logging:commons-logging:1.1.3 org.slf4j:jcl-over-slf4j:1.7.10 (Note: The conflict is with a few package-info.class files, which seems really silly.)
org.apache.hadoop:hadoop-yarn-common:2.4.0 org.apache.hadoop:hadoop-yarn-api:2.4.0 (Note: The conflict is with org/apache/spark/unused/UnusedStubClass.class, which seems even more silly.) org.apache.spark:spark-streaming-kinesis-asl_2.10:1.3.0 org.apache.spark:spark-streaming_2.10:1.3.0 org.apache.spark:spark-core_2.10:1.3.0 org.apache.spark:spark-network-common_2.10:1.3.0 org.spark-project.spark:unused:1.0.0 (?!?!?!) org.apache.spark:spark-network-shuffle_2.10:1.3.0 I can get rid of some of the conflicts by using excludeAll() to exclude artifacts with organization = org.apache.hadoop or organization = org.apache.spark and name = spark-streaming, and I might be able to resolve a few other conflicts this way, but the bottom line is that this is way more complicated than it should be, so either something is really broken or I'm just doing something wrong. Many of these don't even make sense to me. For example, the very first conflict is between classes in com.esotericsoftware.kryo:kryo:2.21 and in com.esotericsoftware.minlog:minlog:1.2, but the former *depends* upon the latter, so ??? It seems wrong to me that one package would contain different versions of the same classes that are included in one of its dependencies. I guess it doesn't make too much difference though if I could only get my assembly to include/exclude the right packages. I of course don't want any of the spark or hadoop dependencies included (other than spark-streaming-kinesis-asl itself), but I want all of spark-streaming-kinesis-asl's dependencies included (such as the AWS Java SDK and its dependencies). That doesn't seem to be possible without what I imagine will become an unruly and fragile exclusion list though. Thanks, Jonathan
Re: problems with spark-streaming-kinesis-asl and sbt assembly (different file contents found)
Can you give us your SBT project? Minus the source code, if you don't wish to expose it. TD On Mon, Mar 16, 2015 at 12:54 PM, Kelly, Jonathan jonat...@amazon.com wrote: Yes, I do have the following dependencies marked as provided: libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided" libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.3.0" % "provided" libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.3.0" % "provided" libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided" However, spark-streaming-kinesis-asl has a compile-time dependency on spark-streaming, so I think that causes it and its dependencies to be pulled into the assembly. I expected that simply excluding spark-streaming in the spark-streaming-kinesis-asl dependency would solve this problem, but it does not. That is, this doesn't work either: libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" exclude("org.apache.spark", "spark-streaming") As I mentioned originally, the following solved some but not all conflicts: libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" excludeAll( ExclusionRule(organization = "org.apache.hadoop"), ExclusionRule(organization = "org.apache.spark", name = "spark-streaming") ) (Note that ExclusionRule(organization = "org.apache.spark") without the name attribute does not work, because that apparently causes it to exclude even spark-streaming-kinesis-asl.) Jonathan Kelly Elastic MapReduce - SDE Port 99 (SEA35) 08.220.C2 From: Tathagata Das t...@databricks.com Date: Monday, March 16, 2015 at 12:45 PM To: Jonathan Kelly jonat...@amazon.com Cc: user@spark.apache.org Subject: Re: problems with spark-streaming-kinesis-asl and sbt assembly (different file contents found) If you are creating an assembly, make sure spark-streaming is marked as provided. spark-streaming is already part of the Spark installation, so it will be present at run time.
That might solve some of these, maybe!? TD On Mon, Mar 16, 2015 at 11:30 AM, Kelly, Jonathan jonat...@amazon.com wrote: I'm attempting to use the Spark Kinesis Connector, so I've added the following dependency in my build.sbt: libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" My app works fine with sbt run, but I can't seem to get sbt assembly to work without failing with "different file contents found" errors, due to different versions of various packages getting pulled into the assembly. This only occurs when I've added spark-streaming-kinesis-asl as a dependency; sbt assembly works fine otherwise. Here are the conflicts that I see: com.esotericsoftware.kryo:kryo:2.21 com.esotericsoftware.minlog:minlog:1.2 com.google.guava:guava:15.0 org.apache.spark:spark-network-common_2.10:1.3.0 (Note: The conflict is with javac.sh; why is this even getting included?) org.apache.spark:spark-streaming-kinesis-asl_2.10:1.3.0 org.apache.spark:spark-streaming_2.10:1.3.0 org.apache.spark:spark-core_2.10:1.3.0 org.apache.spark:spark-network-common_2.10:1.3.0 org.apache.spark:spark-network-shuffle_2.10:1.3.0 (Note: I'm actually using my own custom-built version of Spark 1.3.0 where I've upgraded to v1.9.24 of the AWS Java SDK, but that has nothing to do with all of these conflicts, as I upgraded the dependency *because* I was getting all of these conflicts with the Spark 1.3.0 artifacts from the central repo.) com.amazonaws:aws-java-sdk-s3:1.9.24 net.java.dev.jets3t:jets3t:0.9.3 commons-collections:commons-collections:3.2.1 commons-beanutils:commons-beanutils:1.7.0 commons-beanutils:commons-beanutils-core:1.8.0 commons-logging:commons-logging:1.1.3 org.slf4j:jcl-over-slf4j:1.7.10 (Note: The conflict is with a few package-info.class files, which seems really silly.) org.apache.hadoop:hadoop-yarn-common:2.4.0 org.apache.hadoop:hadoop-yarn-api:2.4.0 (Note: The conflict is with org/apache/spark/unused/UnusedStubClass.class, which seems even more silly.)
org.apache.spark:spark-streaming-kinesis-asl_2.10:1.3.0 org.apache.spark:spark-streaming_2.10:1.3.0 org.apache.spark:spark-core_2.10:1.3.0 org.apache.spark:spark-network-common_2.10:1.3.0 org.spark-project.spark:unused:1.0.0 (?!?!?!) org.apache.spark:spark-network-shuffle_2.10:1.3.0 I can get rid of some of the conflicts by using excludeAll() to exclude artifacts with organization = org.apache.hadoop or organization = org.apache.spark and name = spark-streaming, and I might be able to resolve a few other conflicts this way, but the bottom line is that this is way more complicated than it should be, so either something is really broken or I'm just doing something wrong. Many of these don't even make sense to me. For example, the very first conflict is between classes in com.esotericsoftware.kryo:kryo:2.21 and in com.esotericsoftware.minlog:minlog:1.2
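The settings from this thread can be pulled together into one build.sbt. The version below aims to keep Spark and Hadoop out of the assembly while retaining the Kinesis connector and its AWS SDK dependencies. This is a sketch assuming sbt 0.13 with the sbt-assembly plugin and the `_2.10` artifacts shown in the conflict list. Using the Scala-suffixed module name (`spark-streaming_2.10`) in the exclusion is our assumption about why the unsuffixed `exclude("org.apache.spark", "spark-streaming")` had no effect. The thread did not confirm this, and it may not clear every conflict listed above.

```scala
// build.sbt: a sketch, not a verified fix for every conflict in this thread
scalaVersion := "2.10.4" // the conflicting Spark 1.3.0 artifacts above are the _2.10 builds

libraryDependencies ++= Seq(
  // Already on the cluster classpath at run time, so keep them out of the assembly
  "org.apache.spark" %% "spark-core"      % "1.3.0" % "provided",
  "org.apache.spark" %% "spark-sql"       % "1.3.0" % "provided",
  "org.apache.spark" %% "spark-hive"      % "1.3.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided",
  // The Kinesis connector is not part of the Spark distribution, so it stays in the
  // assembly, but its transitive Spark and Hadoop dependencies should not
  ("org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0")
    .exclude("org.apache.spark", "spark-streaming_2.10") // assumption: suffixed module name
    .excludeAll(ExclusionRule(organization = "org.apache.hadoop"))
)
```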
RE: Handling worker batch processing during driver shutdown
Are you running the code before or after closing the Spark context? It must be after stopping the streaming context (without stopping the Spark context) and before stopping the Spark context. Cc'ing Sean; he may have more insights. On Mar 13, 2015 11:19 AM, Jose Fernandez jfernan...@sdl.com wrote: Thanks, I am using 1.2.0, so it looks like I am affected by the bug you described. It also appears that the shutdown hook doesn't work correctly when the driver is running in YARN. According to the logs, it looks like the SparkContext is closed and the code you suggested is never executed and fails silently. I really appreciate your help, but it looks like I'm back to the drawing board on this one. *From:* Tathagata Das [mailto:t...@databricks.com] *Sent:* Thursday, March 12, 2015 7:53 PM *To:* Jose Fernandez *Cc:* user@spark.apache.org *Subject:* Re: Handling worker batch processing during driver shutdown What version of Spark are you using? You may be hitting a known but fixed bug where the receivers would not get the stop signal, and (stopGracefully = true) would wait indefinitely for the receivers to stop. Try setting stopGracefully to false and see if it works. This bug should have been fixed in Spark 1.2.1: https://issues.apache.org/jira/browse/SPARK-5035 TD On Thu, Mar 12, 2015 at 7:48 PM, Jose Fernandez jfernan...@sdl.com wrote: Thanks for the reply! Theoretically I should be able to do as you suggest, as I follow the pool design pattern from the documentation, but I don't seem to be able to run any code after .stop() is called.
    override def main(args: Array[String]) {
      // setup
      val ssc = new StreamingContext(sparkConf, Seconds(streamTime))
      val inputStreams = (1 to numReceivers).map(i => ssc.receiverStream(<custom receiver>))
      val messages = ssc.union(inputStreams)
      messages.foreachRDD { rdd =>
        rdd.foreachPartition { p =>
          val indexer = Indexer.getInstance()
          p.foreach(Indexer.process(_) match {
            case Some(entry) => indexer.index(entry)
            case None =>
          })
          Indexer.returnInstance(indexer)
        }
      }
      messages.print()
      sys.ShutdownHookThread {
        logInfo("** Shutdown hook triggered **")
        ssc.stop(false, true)
        logInfo("** Shutdown finished **")
        ssc.stop(true)
      }
      ssc.start()
      ssc.awaitTermination()
    }
The first shutdown log message is always displayed, but the second one never is. I've tried multiple permutations of the stop function calls and even used try/catch around them. I'm running in yarn-cluster mode using Spark 1.2 on CDH 5.3. I stop the application with yarn application -kill <appID>. *From:* Tathagata Das [mailto:t...@databricks.com] *Sent:* Thursday, March 12, 2015 1:29 PM *To:* Jose Fernandez *Cc:* user@spark.apache.org *Subject:* Re: Handling worker batch processing during driver shutdown Can you access the batcher directly? That is, is there a handle to get access to the batchers on the executors by running a task on that executor? If so, after the StreamingContext has been stopped (not the SparkContext), you can use `sc.makeRDD()` to run a dummy task like this: sc.makeRDD(1 to 1000, 1000).foreach { x => Batcher.get().flush() } With a large number of tasks and no other jobs running in the system, at least one task will run in each executor and will therefore flush the batcher. TD On Thu, Mar 12, 2015 at 12:27 PM, Jose Fernandez jfernan...@sdl.com wrote: Hi folks, I have a shutdown hook in my driver which stops the streaming context cleanly. This is great, as workers can finish their current processing unit before shutting down.
Unfortunately each worker contains a batch processor which only flushes every X entries. We’re indexing to different indices in elasticsearch and using the bulk index request for performance. As far as Spark is concerned, once data is added to the batcher it is considered processed, so our workers are being shut down with data still in the batcher. Is there any way to coordinate the shutdown with the workers? I haven’t had any luck searching for a solution online. I would appreciate any suggestions you may have. Thanks :)
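TD's suggestion has three steps: stop only the StreamingContext, run many tiny tasks so that every executor flushes its local batcher, then stop the SparkContext. The sketch below assumes Spark 1.2.1 or later, where the graceful stop should no longer hang (SPARK-5035, per the thread). `Batcher`, `GracefulShutdown` and their methods are hypothetical; `Batcher` stands in for the Elasticsearch bulk batcher. Reaching every executor is likely but not guaranteed. Jose observed that the SparkContext may already be closed when a YARN shutdown hook runs, so `stopAndFlush` may need to be triggered from somewhere other than a JVM shutdown hook.

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext

/** Hypothetical per-JVM batcher; a real one would send a bulk index request in flush(). */
object Batcher {
  private val buffer = ArrayBuffer.empty[String]
  def add(entry: String): Unit = buffer.synchronized { buffer += entry }
  def pending: Int = buffer.synchronized { buffer.size }
  /** Sends (here: just drops) everything buffered and returns how many entries were flushed. */
  def flush(): Int = buffer.synchronized {
    val n = buffer.size
    buffer.clear()
    n
  }
}

object GracefulShutdown {
  /** Runs many one-element tasks so that, very likely, every executor runs at least one flush. */
  def flushAllExecutors(sc: SparkContext, numTasks: Int = 1000): Unit =
    sc.makeRDD(1 to numTasks, numTasks).foreach(_ => Batcher.flush())

  def stopAndFlush(ssc: StreamingContext): Unit = {
    // Stop receivers and finish queued batches, but keep the SparkContext alive for the flush job.
    ssc.stop(stopSparkContext = false, stopGracefully = true)
    flushAllExecutors(ssc.sparkContext)
    ssc.sparkContext.stop()
  }
}
```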
Re: Unable to connect
Are you running the driver program (that is, your application process) on your desktop and trying to run it on the cluster in EC2? It could very well be a hostname mismatch of some kind, due to all the public hostname, private hostname, private IP, and firewall craziness of EC2. You probably have to try different combinations of hostname and IP. First of all, you could try to submit applications from within one of the nodes inside EC2 to narrow down the problem. Additionally, see the Configuration page in the Spark Standalone documentation to configure which hostname is used (for binding) by the Spark Standalone daemons. TD On Fri, Mar 13, 2015 at 2:41 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am running Spark Streaming standalone in EC2 and I am trying to run the wordcount example from my desktop. The program is unable to connect to the master. In the logs I see the following, which seems to be an issue with the hostname: 15/03/13 17:37:44 ERROR EndpointWriter: dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://sparkMaster@54.69.22.4:7077/]] arriving at [akka.tcp://sparkMaster@54.69.22.4:7077] inbound addresses are [akka.tcp://sparkMaster@ip-10-241-251-232:7077]
Re: Using rdd methods with Dstream
Is the number of top K elements you want to keep small? That is, is K small? In that case, you can:
1. Either do it in the driver on the array:
    dstream.foreachRDD { rdd =>
      val topK = rdd.top(K)
      // use top K
    }
2. Or use the top K to create another RDD using sc.makeRDD:
    dstream.transform { rdd =>
      val topK = rdd.top(K)
      rdd.context.makeRDD(topK, numPartitions)
    }
TD On Fri, Mar 13, 2015 at 5:58 AM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid wrote: Hi, Earlier my code was like the following, but slow due to the repartition. I want the top K of each window in a stream. val counts = keyAndValues.map(x => math.round(x._3.toDouble)).countByValueAndWindow(Seconds(4), Seconds(4)) val topCounts = counts.repartition(1).map(_.swap).transform(rdd => rdd.sortByKey(false)).map(_.swap).mapPartitions(rdd => rdd.take(10)) So I thought to use dstream.transform(rdd => rdd.top()), but this returns an Array rather than an RDD. I have to perform further steps on the topCounts DStream. [ERROR] found : Array[(Long, Long)] [ERROR] required: org.apache.spark.rdd.RDD[?] [ERROR] val topCounts = counts.transform(rdd => rdd.top(10)) Regards, Laeeq On Friday, March 13, 2015 1:47 PM, Sean Owen so...@cloudera.com wrote: Hm, aren't you able to use the SparkContext here? DStream operations happen on the driver, so you can parallelize() the result? take() won't work, as it's not the same as top(). On Fri, Mar 13, 2015 at 11:23 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Like this? dstream.repartition(1).mapPartitions(it => it.take(5)) Thanks Best Regards On Fri, Mar 13, 2015 at 4:11 PM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid wrote: Hi, I normally use dstream.transform whenever I need to use methods which are available in the RDD API but not in the streaming API, e.g. dstream.transform(x => x.sortByKey(true)). But there are other RDD methods which return types other than RDD, e.g. dstream.transform(x => x.top(5)); top here returns an Array.
In the second scenario, how can I return an RDD rather than an Array, so that I can perform further steps on the DStream? Regards, Laeeq
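TD's second option can be wrapped so that it applies to every batch. The sketch below assumes Spark 1.3+ APIs (`RDD.top`, `SparkContext.makeRDD`, `DStream.transform`). `TopK` and its methods are hypothetical names. The `Ordering` is passed explicitly so the caller can rank the question's `(value, count)` pairs by count without the two `swap` calls.

```scala
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

object TopK {
  /** Takes the top k elements on the driver, then wraps them in a small RDD again. */
  def topKAsRdd[T: ClassTag](rdd: RDD[T], k: Int, ord: Ordering[T], numPartitions: Int = 1): RDD[T] =
    rdd.context.makeRDD(rdd.top(k)(ord).toSeq, numPartitions)

  /** Applies topKAsRdd to every batch, so later DStream operations still see an RDD. */
  def topKPerBatch[T: ClassTag](stream: DStream[T], k: Int, ord: Ordering[T]): DStream[T] =
    stream.transform((rdd: RDD[T]) => topKAsRdd(rdd, k, ord))
}
```

For the question's `counts` stream, the call would be `TopK.topKPerBatch(counts, 10, Ordering.by[(Long, Long), Long](_._2))`. `top` keeps a bounded top-k per partition and merges only those on the driver. This avoids shuffling the whole window through `repartition(1)`, which plausibly explains the earlier slowness.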
Re: Partitioning
If you want to access the keys in an RDD that is partitioned by key, then you can use RDD.mapPartitions(), which gives you access to the whole partition as an iterator of (key, value) pairs. You have the option of maintaining the partitioning information or not by setting the preservesPartitioning flag in mapPartitions (see docs). But use it at your own risk: if you modify the keys and yet preserve partitioning, the partitioning no longer makes sense, as the hashes of the keys have changed. TD On Fri, Mar 13, 2015 at 2:26 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am looking for documentation on partitioning, which I can't seem to find. I am looking at Spark Streaming and was wondering how it partitions RDDs in a multi-node environment. Where are the keys that are used for partitioning defined? For instance, in the example below the keys seem to be implicit. Which one is the key and which one is the value? Or is it called a flatMap because there are no keys?
    // Split each line into words
    JavaDStream<String> words = lines.flatMap(
      new FlatMapFunction<String, String>() {
        @Override public Iterable<String> call(String x) {
          return Arrays.asList(x.split(" "));
        }
      });
And are keys available inside Function2 in case they are required for a given use case?
    JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
      new Function2<Integer, Integer, Integer>() {
        @Override public Integer call(Integer i1, Integer i2) throws Exception {
          return i1 + i2;
        }
      });
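Here is the word count in Scala with the partitioning made explicit. This is a sketch assuming Spark 1.3+. `KeyedWordCount`, `countWords` and `scaleCounts` are hypothetical names. It illustrates four points from the reply:

- `flatMap` output has no keys.
- The word becomes the key only after mapping to `(word, 1)` pairs.
- `reduceByKey` with a `HashPartitioner` routes each word by its hash code.
- `preservesPartitioning = true` is only safe when the keys are left unchanged.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

object KeyedWordCount {
  /** Keys only exist once records are (key, value) pairs; here the word becomes the key. */
  def countWords(lines: RDD[String], numPartitions: Int): RDD[(String, Int)] =
    lines
      .flatMap(_.split(" "))    // RDD[String]: no keys yet
      .map(word => (word, 1))   // RDD[(String, Int)]: the word is now the key
      .reduceByKey(new HashPartitioner(numPartitions), (a: Int, b: Int) => a + b) // routed by word.hashCode

  /** Sees each partition as an iterator of pairs; keys are unchanged, so keeping the partitioner is safe. */
  def scaleCounts(counts: RDD[(String, Int)], factor: Int): RDD[(String, Int)] =
    counts.mapPartitions(_.map { case (word, n) => (word, n * factor) }, preservesPartitioning = true)
}
```

On the `Function2` question: the function passed to `reduceByKey` only receives two values that share a key, never the key itself. When the key is needed, work on the (key, value) pairs directly, for example with `mapPartitions` as TD suggests.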
Re: Partitioning
If you want to learn how Spark partitions data based on keys, here is a recent talk about it: http://www.slideshare.net/databricks/strata-sj-everyday-im-shuffling-tips-for-writing-better-spark-programs?related=1 Of course, you can also read the original Spark paper: https://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf On Fri, Mar 13, 2015 at 3:52 PM, Gerard Maas gerard.m...@gmail.com wrote: In Spark Streaming, the consumers fetch data and put it into 'blocks'. Each block becomes a partition of the RDD generated during that batch interval. The size of each block is controlled by the conf 'spark.streaming.blockInterval', that is, the amount of data the consumer can collect in that time. The number of RDD partitions in a streaming interval will then be: batch interval / spark.streaming.blockInterval * number of consumers. -kr, Gerard On Mar 13, 2015 11:18 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I still don't follow how Spark partitions data in a multi-node environment. Is there a document on how Spark partitions data? For example, in the word count example, how does Spark distribute words to multiple nodes?
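Gerard's formula can be written out as a tiny calculation. This is a plain-Java sketch with a hypothetical method name. Integer division assumes the batch interval is a multiple of the block interval, and the result is best read as an estimate (likely an upper bound), since an interval in which a receiver gets no data should not produce a block.

```java
// Hypothetical helper for Gerard's estimate; plain Java, no Spark dependency.
class StreamingPartitionEstimate {

    // Each receiver cuts one block per block interval, and each block becomes
    // one partition of that batch's RDD.
    static long partitionsPerBatch(long batchIntervalMs, long blockIntervalMs, int receivers) {
        if (batchIntervalMs <= 0 || blockIntervalMs <= 0 || receivers < 0) {
            throw new IllegalArgumentException("intervals must be positive and receivers non-negative");
        }
        return (batchIntervalMs / blockIntervalMs) * receivers;
    }
}
```

For example, a 2-second batch with a 200 ms block interval (the default listed in the Spark 1.x configuration docs) and 2 receivers gives partitionsPerBatch(2000, 200, 2) = 20 partitions per batch. Lowering the block interval raises the parallelism of each batch at the cost of more, smaller tasks.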
Re: Using Neo4j with Apache Spark
What is the GraphDatabaseService object that you are using? Instead of creating it on the driver (outside foreachRDD), can you create it inside RDD.foreach? In general, the right pattern for doing this is described in the programming guide: http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd So you should be doing something like this (sorry for writing in Scala):
dstream.foreachRDD((rdd: RDD[_], time: Time) => {
  rdd.foreachPartition(iterator => {
    // Create a GraphDatabaseService object, or fetch it from a pool of GraphDatabaseService objects
    // Use it to send the whole partition to Neo4j
    // Destroy the object or release it to the pool
  })
})
On Thu, Mar 12, 2015 at 1:15 AM, Gautam Bajaj gautam1...@gmail.com wrote: Neo4j is running externally; it has nothing to do with the Spark processes. Basically, the problem is that I'm unable to figure out a way to store the output of Spark in the database, since Spark Streaming requires the Neo4j Core Java API objects to be serializable as well. The answer points to using the REST API, but its performance is really poor compared to the Core Java API: http://www.rene-pickhardt.de/get-the-full-neo4j-power-by-using-the-core-java-api-for-traversing-your-graph-data-base-instead-of-cypher-query-language/
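The "Task not serializable" failure in this thread comes from Spark Java-serializing the task closure before shipping it (the ClosureCleaner.ensureSerializable frame in the reported stack trace). Here is a minimal plain-Java sketch of the difference, assuming a stand-in FakeGraphDb class in place of the real Neo4j handle (all class names are hypothetical). A task object that holds a driver-side handle as a field cannot be serialized, while one that creates the handle inside its per-partition method can.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a driver-side handle such as an embedded graph database
class FakeGraphDb {
    void write(String record) {
        // would talk to the database
    }
}

// Captures a handle created on the driver: serializing the task drags the handle along
class CapturingTask implements Serializable {
    private final FakeGraphDb db;

    CapturingTask(FakeGraphDb db) {
        this.db = db;
    }

    void run(Iterable<String> partition) {
        for (String r : partition) db.write(r);
    }
}

// Creates the handle inside the per-partition call: the task itself has no such field
class PerPartitionTask implements Serializable {
    void run(Iterable<String> partition) {
        FakeGraphDb db = new FakeGraphDb(); // created where the task runs
        for (String r : partition) db.write(r);
    }
}

class ClosureCheck {
    // Mimics the pre-submission check: try to Java-serialize the task object
    static boolean isSerializable(Object task) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(task);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```

The failing case throws NotSerializableException from the same ObjectOutputStream.defaultWriteFields path that appears in the reported trace, just with FakeGraphDb in place of org.neo4j.kernel.EmbeddedGraphDatabase.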
Re: Using Neo4j with Apache Spark
I am not sure if you realized, but the code snippet is pretty mangled in the email we received. It might be a good idea to put the code in a pastebin or gist; that is much, much easier for everyone to read. On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r gautam1...@gmail.com wrote: I'm trying to use Neo4j with Apache Spark Streaming, but I am finding serializability to be an issue. Basically, I want Apache Spark to parse and bundle my data in real time. After the data has been bundled, it should be stored in the database, Neo4j. However, I am getting this error: org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1264) at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297) at org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45) at twoGrams.Main$4.call(Main.java:102) at twoGrams.Main$4.call(Main.java:1) at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282) at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by:
java.io.NotSerializableException: org.neo4j.kernel.EmbeddedGraphDatabase at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) ... 
17 more Here is my code (output is a stream of type JavaPairDStream<String, ArrayList<String>>):
output.foreachRDD(
  new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
    @Override
    public Void call(JavaPairRDD<String, ArrayList<String>> arg0, Time arg1) throws Exception {
      // TODO Auto-generated method stub
      arg0.foreach(
        new VoidFunction<Tuple2<String, ArrayList<String>>>() {
          @Override
          public void call(Tuple2<String, ArrayList<String>> arg0) throws Exception {
            // TODO Auto-generated method stub
            try (Transaction tx = graphDB.beginTx()) {
              if (Neo4jOperations.getHMacFromValue(graphDB, arg0._1) != null)
                System.out.println("Already in Database: " + arg0._1);
              else {
                Neo4jOperations.createHMac(graphDB, arg0._1);
              }
Re: Using Neo4j with Apache Spark
Well, the answers you got there are correct as well. Unfortunately, I am not familiar enough with Neo4j to comment further. Is the Neo4j graph database running externally (outside the Spark cluster), within the driver process, or on all the executors? Can you clarify that? TD On Thu, Mar 12, 2015 at 12:58 AM, Gautam Bajaj gautam1...@gmail.com wrote: Alright, I have also asked this question on StackOverflow: http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark The code there is pretty neat.
Re: Using Neo4j with Apache Spark
Well, that's why I had also suggested using a pool of the GraphDBService objects :) That is also described in the programming guide link I gave. TD On Thu, Mar 12, 2015 at 7:38 PM, Gautam Bajaj gautam1...@gmail.com wrote: Thanks a ton! That worked. However, this may have a performance issue: for each partition I'd need to restart the server, which was the basic reason I was creating the graphDB object outside this loop. On Fri, Mar 13, 2015 at 5:34 AM, Tathagata Das t...@databricks.com wrote: (Putting user@spark back in the To list) In the gist, you are creating the graphDB object way outside RDD.foreachPartition. As I said last time, create the graphDB object inside RDD.foreachPartition. You are creating it outside DStream.foreachRDD and then using it from inside rdd.foreachPartition. That brings the graphDB object into the task closure, and hence the system tries to serialize the graphDB object when it serializes the closure. If you create the graphDB object inside RDD.foreachPartition, the closure will not refer to any prior graphDB object and therefore will not try to serialize one. On Thu, Mar 12, 2015 at 3:46 AM, Gautam Bajaj gautam1...@gmail.com wrote: Here: https://gist.github.com/d34th4ck3r/0c99d1e9fa288e0cc8ab I'll add the flag and send you the stack trace; I have meetings now. On Thu, Mar 12, 2015 at 6:28 PM, Tathagata Das t...@databricks.com wrote: Could you show us that version of the code? It also helps to turn on the Java flag for extended serialization debug info (-Dsun.io.serialization.extendedDebugInfo=true). That will show the chain of objects leading to the non-serializable one. On Mar 12, 2015 1:32 AM, Gautam Bajaj gautam1...@gmail.com wrote: I tried that too. It results in the same serializability issue. The GraphDatabaseService that I'm using comes from GraphDatabaseFactory(): http://neo4j.com/api_docs/2.0.0/org/neo4j/graphdb/factory/GraphDatabaseFactory.html
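The pool TD refers to can be sketched in plain Java as a static, lazily filled pool. Static state lives once per JVM, so on a Spark executor it would be created by the first task that needs it and then reused by later partitions, instead of being rebuilt (or shipped from the driver) for every partition. All names are hypothetical, and ExpensiveHandle stands in for whatever GraphDatabaseService-like object is costly to start; this is a sketch of the pattern, not Neo4j code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an object that is expensive to start (e.g. a graph database service)
class ExpensiveHandle {
    static final AtomicInteger CREATED = new AtomicInteger(); // counts constructions for the demo

    ExpensiveHandle() {
        CREATED.incrementAndGet();
    }

    void write(String record) {
        // would send the record to the external store
    }
}

// Static, lazily filled pool: one per JVM, so it is never part of a serialized closure
final class HandlePool {
    private static final Deque<ExpensiveHandle> IDLE = new ArrayDeque<>();

    private HandlePool() {}

    // Reuse an idle handle if there is one, otherwise create a new one
    static synchronized ExpensiveHandle borrow() {
        ExpensiveHandle h = IDLE.poll();
        return (h != null) ? h : new ExpensiveHandle();
    }

    static synchronized void release(ExpensiveHandle h) {
        IDLE.push(h);
    }
}

class PartitionWriter {
    // What the body of rdd.foreachPartition(...) would do with one partition's iterator
    static int writePartition(Iterator<String> partition) {
        ExpensiveHandle h = HandlePool.borrow();
        int written = 0;
        try {
            while (partition.hasNext()) {
                h.write(partition.next());
                written++;
            }
        } finally {
            HandlePool.release(h); // hand it back instead of shutting it down
        }
        return written;
    }
}
```

Releasing in a finally block returns the handle even if writing a record throws. A production pool would presumably also bound its size and close idle handles when the executor shuts down.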
Re: Handling worker batch processing during driver shutdown
What version of Spark are you using? You may be hitting a known (and since fixed) bug where the receivers would not get the stop signal, so a stop with stopGracefully = true would wait indefinitely for the receivers to stop. Try setting stopGracefully to false and see if it works. This bug should have been fixed in Spark 1.2.1: https://issues.apache.org/jira/browse/SPARK-5035 TD On Thu, Mar 12, 2015 at 7:48 PM, Jose Fernandez jfernan...@sdl.com wrote: Thanks for the reply! Theoretically I should be able to do as you suggest, since I follow the pool design pattern from the documentation, but I don’t seem to be able to run any code after .stop() is called.
override def main(args: Array[String]) {
  // setup
  val ssc = new StreamingContext(sparkConf, Seconds(streamTime))
  val inputStreams = (1 to numReceivers).map(i => ssc.receiverStream(<custom receiver>))
  val messages = ssc.union(inputStreams)
  messages.foreachRDD { rdd =>
    rdd.foreachPartition { p =>
      val indexer = Indexer.getInstance()
      p.foreach(Indexer.process(_) match {
        case Some(entry) => indexer.index(entry)
        case None =>
      })
      Indexer.returnInstance(indexer)
    }
  }
  messages.print()
  sys.ShutdownHookThread {
    logInfo("** Shutdown hook triggered **")
    ssc.stop(false, true)
    logInfo("** Shutdown finished **")
    ssc.stop(true)
  }
  ssc.start()
  ssc.awaitTermination()
}
The first shutdown log message is always displayed, but the second one never is. I’ve tried multiple permutations of the stop function calls and even wrapped them in try/catch. I’m running in yarn-cluster mode using Spark 1.2 on CDH 5.3. I stop the application with yarn application -kill <appID>.
Re: Writing to a single file from multiple executors
If you use DStream.saveAsHadoopFiles (or the equivalent RDD operations) with the appropriate output format (for Avro), then each partition of the RDDs will be written to a different file. However, there will probably be a large number of small files, and you may have to run a separate compaction phase to coalesce them into larger files. On Mar 12, 2015 9:47 AM, Maiti, Samya samya.ma...@philips.com wrote: Hi TD, I want to append my records to an Avro file which will later be used for querying. Having a single file is not mandatory for us, but then how can we make the executors append the Avro data to multiple files? Thanks, Sam
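The separate compaction phase TD mentions is not something Spark Streaming does for you. As one possible approach, here is a plain-Java sketch of only the planning step: walk the small files in order and group them into batches whose total size stays at or below a target, so each batch can later be rewritten as one larger file. The greedy strategy and the class name are assumptions for illustration; reading and merging the actual Avro files is not shown.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical compaction planner: groups file sizes (e.g. in bytes), in their
// original order, into batches whose total stays at or below targetBytes.
class CompactionPlanner {

    static List<List<Long>> plan(List<Long> fileSizes, long targetBytes) {
        List<List<Long>> groups = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentBytes = 0;
        for (long size : fileSizes) {
            // Close the current group when adding this file would overshoot the target
            if (!current.isEmpty() && currentBytes + size > targetBytes) {
                groups.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(size);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }
}
```

With file sizes 10, 20, 30, 40 and 50 (say, in MB) and a 60 MB target, the plan is [10, 20, 30], [40], [50]. A file larger than the target simply ends up in a group of its own.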
Re: Handling worker batch processing during driver shutdown
Can you access the batcher directly? That is, is there a handle that gives access to the batcher on an executor when you run a task on that executor? If so, after the StreamingContext has been stopped (not the SparkContext), you can use `sc.makeRDD()` to run a dummy task like this: sc.makeRDD(1 to 1000, 1000).foreach { x => Batcher.get().flush() } With a large number of tasks and no other jobs running in the system, at least one task will run in each executor and will therefore flush the batcher. TD On Thu, Mar 12, 2015 at 12:27 PM, Jose Fernandez jfernan...@sdl.com wrote: Hi folks, I have a shutdown hook in my driver which stops the streaming context cleanly. This is great, as workers can finish their current processing unit before shutting down. Unfortunately, each worker contains a batch processor which only flushes every X entries. We’re indexing to different indices in Elasticsearch and using the bulk index request for performance. As far as Spark is concerned, once data is added to the batcher it is considered processed, so our workers are being shut down with data still in the batcher. Is there any way to coordinate the shutdown with the workers? I haven’t had any luck searching for a solution online. I would appreciate any suggestions you may have. Thanks :)
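TD's dummy-task trick assumes each executor holds one batcher that can be reached statically and flushed on demand. Here is a minimal plain-Java sketch of such a batcher, assuming a hypothetical Batcher class whose get() and flush() names match TD's snippet, and a Consumer that stands in for the Elasticsearch bulk request. It sends a bulk batch every threshold entries and exposes flush() to push out the remainder at shutdown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical per-executor batcher (not Jose's actual indexer): buffers entries,
// sends them in bulk every `threshold` entries, and can be flushed explicitly.
final class Batcher {
    private static Batcher instance; // at most one per JVM, i.e. per executor

    private final int threshold;
    private final Consumer<List<String>> sink; // stand-in for a bulk index request
    private final List<String> buffer = new ArrayList<>();

    private Batcher(int threshold, Consumer<List<String>> sink) {
        this.threshold = threshold;
        this.sink = sink;
    }

    // Creates the JVM-wide instance on first use; later calls return the same one
    static synchronized Batcher init(int threshold, Consumer<List<String>> sink) {
        if (instance == null) {
            instance = new Batcher(threshold, sink);
        }
        return instance;
    }

    static synchronized Batcher get() {
        if (instance == null) {
            throw new IllegalStateException("Batcher.init(...) has not been called in this JVM");
        }
        return instance;
    }

    synchronized void add(String entry) {
        buffer.add(entry);
        if (buffer.size() >= threshold) {
            flush();
        }
    }

    // Sends whatever is buffered; a no-op when the buffer is empty
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    synchronized int pending() {
        return buffer.size();
    }
}
```

Because flush() is a no-op on an empty buffer, it is harmless if several of the 1000 dummy tasks land on the same executor and call it repeatedly.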
Re: Efficient Top count in each window
Why are you repartitioning to 1 partition? That would obviously be slow: you are converting a distributed operation into a single-node operation. Also consider using RDD.top(). If you define the ordering right (based on the count), you will get the top K across all partitions without doing a shuffle for sortByKey. Much cheaper. TD On Thu, Mar 12, 2015 at 11:06 AM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid wrote: Hi, I have a streaming application where I am doing a top-10 count in each window, which seems slow. Is there an efficient way to do this?
val counts = keyAndValues.map(x => math.round(x._3.toDouble)).countByValueAndWindow(Seconds(4), Seconds(4))
val topCounts = counts.repartition(1).map(_.swap).transform(rdd => rdd.sortByKey(false)).map(_.swap).mapPartitions(rdd => rdd.take(10))
Regards, Laeeq
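RDD.top(k) avoids the global sort by keeping only a bounded set of the best elements per partition and then merging those small results. Here is a plain-Java sketch of that idea for (value, count) pairs, using a size-k min-heap ordered by count. The class name is hypothetical and this is not Spark's implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

class TopByCount {
    // Keeps the k entries with the largest counts; the heap never holds more than k + 1 entries
    static <K> List<Map.Entry<K, Long>> topK(Iterable<Map.Entry<K, Long>> counts, int k) {
        Comparator<Map.Entry<K, Long>> byCount = Map.Entry.comparingByValue();
        PriorityQueue<Map.Entry<K, Long>> heap = new PriorityQueue<>(byCount); // smallest count on top
        for (Map.Entry<K, Long> e : counts) {
            heap.offer(e);
            if (heap.size() > k) {
                heap.poll(); // drop the current smallest
            }
        }
        List<Map.Entry<K, Long>> result = new ArrayList<>(heap);
        result.sort(byCount.reversed()); // largest count first
        return result;
    }
}
```

Each partition only ever holds k candidates, so merging the per-partition results is cheap no matter how large the window is. In the streaming job itself, the equivalent would presumably be something like calling rdd.top(10)(Ordering.by(_._2)) on each RDD of the (value, count) stream returned by countByValueAndWindow.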
Re: Writing to a single file from multiple executors
Why do you have to write a single file? On Wed, Mar 11, 2015 at 1:00 PM, SamyaMaiti samya.maiti2...@gmail.com wrote: Hi Experts, I have a scenario wherein I want to write to an Avro file from a streaming job that reads data from Kafka. But the issue is that there are multiple executors, and when they all try to write to a given file I get a concurrency exception. One way to mitigate the issue is to repartition and have a single writer task, but as my data is huge, that is not a feasible option. Any suggestions are welcome. Regards, Sam
Re: Spark Streaming recover from Checkpoint with Spark SQL
Can you show us the code that you are using? This might help: it is the updated streaming programming guide for 1.3, which will be up soon; this is a quick preview. http://people.apache.org/~tdas/spark-1.3.0-temp-docs/streaming-programming-guide.html#dataframe-and-sql-operations TD On Wed, Mar 11, 2015 at 12:20 PM, Marius Soutier mps@gmail.com wrote: Forgot to mention, it works when using .foreachRDD(_.saveAsTextFile(“”)). On 11.03.2015, at 18:35, Marius Soutier mps@gmail.com wrote: Hi, I’ve written a Spark Streaming job that inserts into a Parquet table, using stream.foreachRDD(_.insertInto(“table”, overwrite = true)). Now I’ve added checkpointing; everything works fine when starting from scratch. When starting from a checkpoint, however, the job doesn’t work and produces the following exception in the foreachRDD: ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error running job streaming job 142609383 ms.2 org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
at org.apache.spark.rdd.RDD.sc(RDD.scala:90) at org.apache.spark.rdd.RDD.<init>(RDD.scala:143) at org.apache.spark.sql.SchemaRDD.<init>(SchemaRDD.scala:108) at org.apache.spark.sql.SQLContext.createSchemaRDD(SQLContext.scala:114) at MyStreamingJob$$anonfun$createComputation$3.apply(MyStreamingJob:167) at MyStreamingJob$$anonfun$createComputation$3.apply(MyStreamingJob:167) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:535) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:535) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) Cheers, Marius
Re: Compilation error
If you are using tools like SBT/Maven/Gradle/etc., they figure out all the recursive dependencies and include them in the classpath. I haven't touched Eclipse in years, so I am not sure off the top of my head what's going on instead. In case you only downloaded spark-streaming_2.10.jar, that is indeed insufficient, and you have to download all the recursive dependencies. Maybe you should create a Maven project inside Eclipse? TD On Tue, Mar 10, 2015 at 11:00 AM, Mohit Anchlia mohitanch...@gmail.com wrote: How do I do that? I haven't used Scala before. Also, the linking page doesn't mention that: http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#linking On Tue, Mar 10, 2015 at 10:57 AM, Sean Owen so...@cloudera.com wrote: It means you do not have the Scala library classes in your project classpath. On Tue, Mar 10, 2015 at 5:54 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am trying out the streaming example as documented, and I am using Spark 1.2.1 streaming from Maven for Java. When I add this code I get a compilation error, and Eclipse is not able to recognize Tuple2. I also don't see any scala.Tuple2 class to import. http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#a-quick-example
private void map(JavaReceiverInputDStream<String> lines) {
  JavaDStream<String> words = lines.flatMap(
    new FlatMapFunction<String, String>() {
      @Override public Iterable<String> call(String x) {
        return Arrays.asList(x.split(" "));
      }
    });
  // Count each word in each batch
  JavaPairDStream<String, Integer> pairs = words.map(
    new PairFunction<String, String, Integer>() {
      @Override public Tuple2<String, Integer> call(String s) throws Exception {
        return new Tuple2<String, Integer>(s, 1);
      }
    });
}
Re: Compilation error
You have to include the Scala libraries in the Eclipse dependencies. TD On Tue, Mar 10, 2015 at 10:54 AM, Mohit Anchlia mohitanch...@gmail.com wrote: I am trying out the streaming example as documented, using Spark 1.2.1 streaming from Maven for Java. When I add this code I get a compilation error, and Eclipse is not able to recognize Tuple2. I also don't see any import scala.Tuple2 class. http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#a-quick-example
    private void map(JavaReceiverInputDStream<String> lines) {
        JavaDStream<String> words = lines.flatMap(
            new FlatMapFunction<String, String>() {
                @Override public Iterable<String> call(String x) {
                    return Arrays.asList(x.split(" "));
                }
            });
        // Count each word in each batch
        JavaPairDStream<String, Integer> pairs = words.map(
            new PairFunction<String, String, Integer>() {
                @Override public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<String, Integer>(s, 1);
                }
            });
    }
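If it is unclear whether the Scala library actually ended up on the classpath the project builds and runs with, a small reflection check can confirm it. This is a plain-Java sketch, not part of Spark; the class name `ClasspathCheck` is hypothetical, and `scala.Tuple2` is simply the class the compiler could not find above (it lives in the Scala standard library, scala-library). Running it as a class inside the same Eclipse project shows whether that project can see Scala classes at all.

```java
public class ClasspathCheck {

    // Returns true if the named class can be loaded by this class's loader.
    // initialize = false: only locate and load the class, do not run its static initializers.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // If this prints false, Tuple2 will not resolve at compile time either.
        System.out.println("scala.Tuple2 on classpath: " + isOnClasspath("scala.Tuple2"));
    }
}
```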
Re: Why spark master consumes 100% CPU when we kill a spark streaming app?
Do you have event logging enabled? That could be the problem. The Master aggressively tries to recreate the web UI of the completed job from the event logs (when event logging is enabled), which causes the Master to stall. I created a JIRA for this. https://issues.apache.org/jira/browse/SPARK-6270 On Tue, Mar 10, 2015 at 7:10 PM, Xuelin Cao xuelincao2...@gmail.com wrote: Hey, Recently we found in our cluster that when we kill a Spark Streaming app, the whole cluster cannot respond for 10 minutes. We investigated the master node and found that the master process consumes 100% CPU when we kill the Spark Streaming app. How could this happen? Has anyone had a similar problem before?
Re: Compilation error
See if you can import the Scala libraries in your project. On Tue, Mar 10, 2015 at 11:32 AM, Mohit Anchlia mohitanch...@gmail.com wrote: I am using Maven and my dependencies look like this, but this doesn't seem to be working:
    <dependencies>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.2.0</version>
      </dependency>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.2.1</version>
      </dependency>
    </dependencies>
On Tue, Mar 10, 2015 at 11:06 AM, Tathagata Das t...@databricks.com wrote: If you are using tools like SBT/Maven/Gradle/etc., they figure out all the recursive dependencies and include them in the classpath. I haven't touched Eclipse in years, so I am not sure off the top of my head what's going on instead. Just in case you only downloaded spark-streaming_2.10.jar, that is indeed insufficient, and you have to download all the recursive dependencies. Maybe you should create a Maven project inside Eclipse? TD On Tue, Mar 10, 2015 at 11:00 AM, Mohit Anchlia mohitanch...@gmail.com wrote: How do I do that? I haven't used Scala before. Also, the linking page doesn't mention that: http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#linking On Tue, Mar 10, 2015 at 10:57 AM, Sean Owen so...@cloudera.com wrote: It means you do not have Scala library classes in your project classpath. On Tue, Mar 10, 2015 at 5:54 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am trying out the streaming example as documented, using Spark 1.2.1 streaming from Maven for Java. When I add this code I get a compilation error, and Eclipse is not able to recognize Tuple2. I also don't see any import scala.Tuple2 class.
http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#a-quick-example
    private void map(JavaReceiverInputDStream<String> lines) {
        JavaDStream<String> words = lines.flatMap(
            new FlatMapFunction<String, String>() {
                @Override public Iterable<String> call(String x) {
                    return Arrays.asList(x.split(" "));
                }
            });
        // Count each word in each batch
        JavaPairDStream<String, Integer> pairs = words.map(
            new PairFunction<String, String, Integer>() {
                @Override public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<String, Integer>(s, 1);
                }
            });
    }
Re: Spark Streaming input data source list
Spark Streaming has StreamingContext.socketStream() http://spark.apache.org/docs/1.2.1/api/java/org/apache/spark/streaming/StreamingContext.html#socketStream(java.lang.String, int, scala.Function1, org.apache.spark.storage.StorageLevel, scala.reflect.ClassTag) TD On Mon, Mar 9, 2015 at 11:37 AM, Cui Lin cui@hds.com wrote: Dear all, Could you send me a list of the input data sources that Spark Streaming supports? My list is HDFS, Kafka, textfile?… I am wondering if Spark Streaming could directly read data from a certain port (443, e.g.) that my devices send to directly? Best regards, Cui Lin
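The converter argument of socketStream (the `scala.Function1` in the signature above) turns the raw `InputStream` read from the socket into an iterator of records. Note that socketStream connects out to the given host and port as a client rather than listening for devices to connect to it. Below is a plain-Java sketch of such a converter for newline-delimited UTF-8 text; it does not use Spark, and the name `LineConverter` is hypothetical. For plain text lines, `socketTextStream` already provides this behaviour.

```java
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class LineConverter {

    // Wraps a byte stream as a lazy iterator of text lines: each line is read
    // only when the consumer asks for it. Read errors surface as UncheckedIOException.
    static Iterator<String> toLines(InputStream in) {
        BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        return reader.lines().iterator();
    }
}
```

Keeping the converter lazy matters for a socket, since the stream never ends while the connection is open; an eager version that collected all lines first would never return.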
Re: Spark Streaming input data source list
Link to the custom receiver guide: https://spark.apache.org/docs/latest/streaming-custom-receivers.html On Mon, Mar 9, 2015 at 5:55 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Lin, AFAIK, there's currently no built-in receiver API for RDBMSs, but you can write your own custom receiver to get data from an RDBMS; for the details you can refer to the docs. Thanks Jerry *From:* Cui Lin [mailto:cui@hds.com] *Sent:* Tuesday, March 10, 2015 8:36 AM *To:* Tathagata Das *Cc:* user@spark.apache.org *Subject:* Re: Spark Streaming input data source list Tathagata, Thanks for your quick response. The link is helpful to me. Do you know of any API for streaming data from an RDBMS? Best regards, Cui Lin *From: *Tathagata Das t...@databricks.com *Date: *Monday, March 9, 2015 at 11:28 AM *To: *Cui Lin cui@hds.com *Cc: *user@spark.apache.org user@spark.apache.org *Subject: *Re: Spark Streaming input data source list Spark Streaming has StreamingContext.socketStream() http://spark.apache.org/docs/1.2.1/api/java/org/apache/spark/streaming/StreamingContext.html#socketStream(java.lang.String, int, scala.Function1, org.apache.spark.storage.StorageLevel, scala.reflect.ClassTag) TD On Mon, Mar 9, 2015 at 11:37 AM, Cui Lin cui@hds.com wrote: Dear all, Could you send me a list of the input data sources that Spark Streaming supports? My list is HDFS, Kafka, textfile?… I am wondering if Spark Streaming could directly read data from a certain port (443, e.g.) that my devices send to directly? Best regards, Cui Lin
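A custom receiver for an RDBMS, as Jerry suggests, often comes down to polling for rows newer than the last one already handed to Spark. The plain-Java sketch below shows only that bookkeeping, under loudly stated assumptions: the in-memory `List` stands in for a table whose row id equals its list index, and the `Consumer` plays the role of the receiver's `store(...)`. In a real receiver the poll would be a JDBC query along the lines of `WHERE id > ?`. The class name `PollingSource` is hypothetical.

```java
import java.util.List;
import java.util.function.Consumer;

public class PollingSource {

    private final List<String> table;      // stand-in for a DB table; row id == list index (assumption)
    private final Consumer<String> store;  // stand-in for Receiver.store(...)
    private int lastSeenId = -1;           // highest row id already emitted

    PollingSource(List<String> table, Consumer<String> store) {
        this.table = table;
        this.store = store;
    }

    // One polling round: emit every row with id > lastSeenId, in id order,
    // and return how many rows were emitted in this round.
    int pollOnce() {
        int emitted = 0;
        for (int id = lastSeenId + 1; id < table.size(); id++) {
            store.accept(table.get(id));
            lastSeenId = id;
            emitted++;
        }
        return emitted;
    }
}
```

In an actual receiver, `pollOnce()` would run in a loop on the thread started from `onStart()`, sleeping between rounds and exiting once `isStopped()` returns true. Persisting `lastSeenId` durably, so a restarted receiver does not re-emit old rows, is left out of this sketch.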
Re: Spark Streaming Switchover Time
It is probably the time taken by the system to figure out that the worker is down. Could you check the logs to see what goes on when you kill the worker? TD On Wed, Mar 4, 2015 at 6:20 AM, Nastooh Avessta (navesta) nave...@cisco.com wrote: Indeed. And I am wondering if this switchover time can be decreased. Cheers, *Nastooh Avessta* ENGINEER.SOFTWARE ENGINEERING nave...@cisco.com Phone: *+1 604 647 1527* *Cisco Systems Limited* 595 Burrard Street, Suite 2123 Three Bentall Centre, PO Box 49121 VANCOUVER BRITISH COLUMBIA V7X 1J1 CA Cisco.com http://www.cisco.com/ This email may contain confidential and privileged material for the sole use of the intended recipient. Any review, use, distribution or disclosure by others is strictly prohibited. If you are not the intended recipient (or authorized to receive for the recipient), please contact the sender by reply email and delete all copies of this message. For corporate legal information go to: http://www.cisco.com/web/about/doing_business/legal/cri/index.html Cisco Systems Canada Co, 181 Bay St., Suite 3400, Toronto, ON, Canada, M5J 2T3. Phone: 416-306-7000; Fax: 416-306-7099. *Preferences http://www.cisco.com/offer/subscribe/?sid=000478326 - Unsubscribe http://www.cisco.com/offer/unsubscribe/?sid=000478327 - Privacy http://www.cisco.com/web/siteassets/legal/privacy.html* *From:* Tathagata Das [mailto:t...@databricks.com] *Sent:* Tuesday, March 03, 2015 11:11 PM *To:* Nastooh Avessta (navesta) *Cc:* user@spark.apache.org *Subject:* Re: Spark Streaming Switchover Time I am confused. Are you killing the 1st worker node to see whether the system restarts the receiver on the second worker?
TD On Tue, Mar 3, 2015 at 10:49 PM, Nastooh Avessta (navesta) nave...@cisco.com wrote: This is the time it takes for the driver to start receiving data again, from the 2nd worker, when the 1st worker, where the streaming thread was initially running, is shut down. Cheers, Nastooh Avessta *From:* Tathagata Das [mailto:t...@databricks.com] *Sent:* Tuesday, March 03, 2015 10:24 PM *To:* Nastooh Avessta (navesta) *Cc:* user@spark.apache.org *Subject:* Re: Spark Streaming Switchover Time Can you elaborate on what this switchover time is?
TD On Tue, Mar 3, 2015 at 9:57 PM, Nastooh Avessta (navesta) nave...@cisco.com wrote: Hi, On a standalone Spark 1.0.0 cluster with 1 master and 2 workers, operating in client mode and running a UDP streaming application, I am noticing around 2 seconds of elapsed time on switchover upon shutting down the streaming worker, where the streaming window length is 1 sec. I am wondering what parameters are available to the developer to shorten this switchover time. Cheers, Nastooh Avessta
Re: Spark Streaming - Duration 1s not matching reality
Hint: print() just gives a sample of what is in the data and does not force processing of all the data (only the first partition of the RDD is computed to get 10 items). count() actually processes all the data. This is all due to lazy evaluation: if you don't need to use all the data, don't compute all the data :) HTH TD On Thu, Mar 5, 2015 at 3:10 PM, eleroy ele...@msn.com wrote: Hello, Getting started with Spark. Got JavaNetworkWordCount working on a 3-node cluster, with netcat running an infinite loop that prints random numbers 0-100. With a duration of 1 sec, I do see a list of (word, count) values every second. The list is limited to 10 values (as per the docs). The count is ~6000 per number. Since my input is random numbers from 0 to 100 and I count ~6000 for each, the distribution being homogeneous, I assume that means 600,000 values are being ingested. I switched to using a constant number, and then I see between 200,000 and 2,000,000 counts, but the console response is erratic: it's not 1 sec anymore; it's sometimes 2, sometimes more, and sometimes much faster... I am looking to do 1-to-1 processing (one value outputs one result), so I replaced the flatMap function with a map function and do my calculation. Now I'd like to know how many events I was able to process, but it's not clear at all: if I use print, it's fast again (1 sec), but I only see the first 10 results. I tried to add a counter... and realized the counter only seems to increment by 11 each time. This is very confusing... It looks like the counter is only incremented on the elements affected by the print statement... so does that mean the other values are not even calculated until requested? If I use .count() on the output RDD, then I do see a realistic count, but then it doesn't take 1 sec anymore: it's more like 4 to 5 sec to get 600,000 - 1,000,000 events counted.
I'm not sure where to go from here or how to benchmark the time it takes to actually process the events. Any hint or useful link would be appreciated. Thanks for your help.
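TD's point can be reproduced without Spark, because `java.util.stream` pipelines are lazy in the same way. In this plain-Java analogy (not Spark code; `LazyDemo` is a hypothetical name), a counter inside `map` shows that asking for only the first few results runs the function only that many times, while a full aggregation runs it on every element. Asking for 11 rather than 10 elements mirrors what `print()` appears to do in Spark 1.x (take 10 + 1 elements so it can tell whether to print "..."), which would fit a counter that grows by 11 per batch.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class LazyDemo {

    // Loosely like print(): ask for only the first n results.
    // Returns how many times the map function actually ran.
    static int touchedByTake(int total, int n) {
        AtomicInteger touched = new AtomicInteger();
        List<Integer> firstN = IntStream.range(0, total)
                .map(x -> { touched.incrementAndGet(); return x * 2; })
                .limit(n)                 // short-circuits: upstream stops after n elements
                .boxed()
                .collect(Collectors.toList());
        return touched.get();
    }

    // Loosely like count(): an aggregation that needs every element.
    // sum() is used instead of count() because, since Java 9, count() may skip
    // the map step entirely when the stream size is known up front.
    static int touchedByFullPass(int total) {
        AtomicInteger touched = new AtomicInteger();
        long sum = IntStream.range(0, total)
                .map(x -> { touched.incrementAndGet(); return x * 2; })
                .asLongStream()
                .sum();
        return touched.get();
    }

    public static void main(String[] args) {
        System.out.println(touchedByTake(600_000, 11));  // 11
        System.out.println(touchedByFullPass(600_000));  // 600000
    }
}
```

The same reasoning suggests benchmarking with an action that touches every record (such as `count()`) rather than `print()`, since only the former measures the full processing cost.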
Re: Why can't Spark Streaming recover from the checkpoint directory when using a third party library for processing multi-line JSON?
That could be a corner-case bug. How do you add the 3rd party library to the classpath of the driver? Through spark-submit? Could you give the command you used? TD On Wed, Mar 4, 2015 at 12:42 AM, Emre Sevinc emre.sev...@gmail.com wrote: I've also tried the following: Configuration hadoopConfiguration = new Configuration(); hadoopConfiguration.set("multilinejsoninputformat.member", "itemSet"); JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDirectory, hadoopConfiguration, factory, false); but I still get the same exception. Why doesn't getOrCreate ignore that Hadoop configuration part (which normally works, e.g. when not recovering)? -- Emre On Tue, Mar 3, 2015 at 3:36 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, I have a Spark Streaming application (using Spark 1.2.1) that listens to an input directory, processes new JSON files when they are copied to that directory, and writes them to an output directory. It uses a 3rd party library to process the multi-line JSON files (https://github.com/alexholmes/json-mapreduce). You can see the relevant part of the streaming application at: https://gist.github.com/emres/ec18ee264e4eb0dd8f1a When I run this application locally, it works perfectly fine. But then I wanted to test whether it could recover from failure, e.g. if I stopped it right in the middle of processing some files. I started the streaming application, copied 100 files to the input directory, and hit Ctrl+C when it had already processed about 50 files:
...
2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1
2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1
2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1
[Stage 0:== (65 + 4) / 100] ^C
Then I started the application again, expecting that it could recover from the checkpoint. For a while it read files again, and then it gave an exception:
...
2015-03-03 15:06:39 INFO FileInputFormat:280 - Total input paths to process : 1
2015-03-03 15:06:39 INFO FileInputFormat:280 - Total input paths to process : 1
2015-03-03 15:06:39 WARN SchemaValidatorDriver:145 - * * * hadoopConfiguration: itemSet
2015-03-03 15:06:40 ERROR Executor:96 - Exception in task 0.0 in stage 0.0 (TID 0)
java.io.IOException: Missing configuration value for multilinejsoninputformat.member
    at com.alexholmes.json.mapreduce.MultiLineJsonInputFormat.createRecordReader(MultiLineJsonInputFormat.java:30)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Since the exception refers to a missing configuration
multilinejsoninputformat.member, I think it is about the following line: ssc.ssc().sc().hadoopConfiguration().set("multilinejsoninputformat.member", "itemSet"); This is why I also log its value, and as you can see above, just before the exception in the recovery process, it shows that multilinejsoninputformat.member is set to itemSet. But somehow it is not found during recovery. This exception happens only when it tries to recover from a previously interrupted run. I've also tried moving the above line into the createContext method, but still got the same exception. Why is that? And how can I work around it? -- Emre Sevinç http://www.bigindustries.be/
Re: Is FileInputDStream returned by fileStream method a reliable receiver?
The file stream does not use a receiver. Maybe that was not clear in the programming guide. I am updating it for the 1.3 release right now, and I will make it clearer. And the file stream has full reliability. Read this in the programming guide: http://spark.apache.org/docs/latest/streaming-programming-guide.html#semantics-with-files-as-input-source On Wed, Mar 4, 2015 at 2:14 AM, Emre Sevinc emre.sev...@gmail.com wrote: Is the FileInputDStream returned by the fileStream method a reliable receiver? In the Spark Streaming Guide it says: There can be two kinds of data sources based on their *reliability*. Sources (like Kafka and Flume) allow the transferred data to be acknowledged. If the system receiving data from these *reliable* sources acknowledges the received data correctly, it can be ensured that no data gets lost due to any kind of failure. This leads to two kinds of receivers. 1. *Reliable Receiver* - A *reliable receiver* correctly acknowledges a reliable source that the data has been received and stored in Spark with replication. 2. *Unreliable Receiver* - These are receivers for sources that do not support acknowledging. Even for reliable sources, one may implement an unreliable receiver that does not go into the complexity of acknowledging correctly. So I wonder whether the receivers for HDFS (and the local file system) are reliable, e.g. when I'm using the fileStream method to process files in a directory locally or on HDFS? -- Emre Sevinç
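A related detail from the same programming guide: the file stream expects files to appear in the monitored directory atomically, for example by moving or renaming them into it, so that a half-written file is never picked up. Below is a plain-Java sketch of that hand-off using `java.nio.file` on a local file system; the staging-directory layout and the name `AtomicDrop` are assumptions for illustration, and HDFS (which has its own rename) is not covered by this code.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class AtomicDrop {

    // Writes `content` to a temp file in `stagingDir`, then atomically moves it into
    // `watchedDir` under `fileName`, so a directory watcher never sees a partial file.
    static Path publish(Path stagingDir, Path watchedDir, String fileName, String content)
            throws IOException {
        Path tmp = Files.createTempFile(stagingDir, "partial-", ".tmp");
        Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));
        // ATOMIC_MOVE fails with AtomicMoveNotSupportedException (instead of silently
        // copying) when the two directories are on different file systems.
        return Files.move(tmp, watchedDir.resolve(fileName), StandardCopyOption.ATOMIC_MOVE);
    }
}
```

Keeping the staging directory on the same file system as the watched one, but outside it, is what makes the move a single rename.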
Re: Why different numbers of partitions give different results for the same computation on the same dataset?
You can use DStream.transform() to apply arbitrary RDD transformations to the RDDs generated by a DStream.
    val coalescedDStream = myDStream.transform { _.coalesce(...) }
On Tue, Mar 3, 2015 at 1:47 PM, Saiph Kappa saiph.ka...@gmail.com wrote: Sorry, I made a mistake in my code. Please ignore my question number 2. Different numbers of partitions give *the same* results! On Tue, Mar 3, 2015 at 7:32 PM, Saiph Kappa saiph.ka...@gmail.com wrote: Hi, I have a Spark Streaming application, running on a single node, consisting mainly of map operations. I perform repartitioning to control the number of CPU cores that I want to use. The code goes like this:
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val distFile = ssc.textFileStream("/home/myuser/spark-example/dump")
    val words = distFile.repartition(cores.toInt).flatMap(_.split(" "))
      .filter(_.length 3)
    val wordCharValues = words.map(word => {
      var sum = 0
      word.toCharArray.foreach(c => {sum += c.toInt})
      sum.toDouble / word.length.toDouble
    }).foreachRDD(rdd => {
      println("MEAN: " + rdd.mean())
    })
I have 2 questions: 1) How can I use coalesce in this code instead of repartition? 2) Why, using the same dataset (a small file processed within a single batch), does the result I obtain for the mean vary with the number of partitions? If I don't call the repartition method, the result is always the same for every execution, as it should be. But repartitioning into, for instance, 2 partitions gives a different mean value than using 8 partitions. I really don't understand why, given that my code is deterministic. Can someone enlighten me on this? Thanks.
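Question 2 turned out to be a bug in Saiph's own code, so the following is not a diagnosis of it. It only illustrates one common way a partition-dependent mean appears: averaging per-partition means instead of merging (sum, count) pairs. Spark's `rdd.mean()` merges per-partition statistics (StatCounter), so it does not have this problem beyond floating-point rounding. Plain Java, no Spark; `PartitionedMean` and the contiguous split are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionedMean {

    // Splits values into `parts` contiguous chunks, standing in for RDD partitions.
    static List<double[]> partition(double[] values, int parts) {
        List<double[]> out = new ArrayList<>();
        int n = values.length;
        for (int i = 0; i < parts; i++) {
            int from = (int) ((long) i * n / parts);
            int to = (int) ((long) (i + 1) * n / parts);
            out.add(Arrays.copyOfRange(values, from, to));
        }
        return out;
    }

    // Partition-independent: combine (sum, count) from every chunk, divide once.
    static double mergedMean(List<double[]> partitions) {
        double sum = 0;
        long count = 0;
        for (double[] p : partitions) {
            for (double x : p) sum += x;
            count += p.length;
        }
        return sum / count;
    }

    // Partition-dependent: gives every non-empty chunk equal weight regardless
    // of its size, so the answer changes with how the data is split.
    static double meanOfMeans(List<double[]> partitions) {
        double total = 0;
        int nonEmpty = 0;
        for (double[] p : partitions) {
            if (p.length == 0) continue;
            double s = 0;
            for (double x : p) s += x;
            total += s / p.length;
            nonEmpty++;
        }
        return total / nonEmpty;
    }

    public static void main(String[] args) {
        double[] values = {1, 2, 3, 4, 10};
        for (int parts = 1; parts <= 3; parts++) {
            List<double[]> ps = partition(values, parts);
            System.out.printf("parts=%d merged=%.4f meanOfMeans=%.4f%n",
                    parts, mergedMean(ps), meanOfMeans(ps));
        }
    }
}
```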
Re: Spark Streaming Switchover Time
Can you elaborate on what this switchover time is? TD On Tue, Mar 3, 2015 at 9:57 PM, Nastooh Avessta (navesta) nave...@cisco.com wrote: Hi, On a standalone Spark 1.0.0 cluster with 1 master and 2 workers, operating in client mode and running a UDP streaming application, I am noticing around 2 seconds of elapsed time on switchover upon shutting down the streaming worker, where the streaming window length is 1 sec. I am wondering what parameters are available to the developer to shorten this switchover time. Cheers, Nastooh Avessta
Re: Spark Streaming Switchover Time
I am confused. Are you killing the 1st worker node to see whether the system restarts the receiver on the second worker? TD On Tue, Mar 3, 2015 at 10:49 PM, Nastooh Avessta (navesta) nave...@cisco.com wrote: This is the time it takes for the driver to start receiving data again, from the 2nd worker, when the 1st worker, where the streaming thread was initially running, is shut down. Cheers, Nastooh Avessta *From:* Tathagata Das [mailto:t...@databricks.com] *Sent:* Tuesday, March 03, 2015 10:24 PM *To:* Nastooh Avessta (navesta) *Cc:* user@spark.apache.org *Subject:* Re: Spark Streaming Switchover Time Can you elaborate on what this switchover time is?
TD On Tue, Mar 3, 2015 at 9:57 PM, Nastooh Avessta (navesta) nave...@cisco.com wrote: Hi, On a standalone Spark 1.0.0 cluster with 1 master and 2 workers, operating in client mode and running a UDP streaming application, I am noticing around 2 seconds of elapsed time on switchover upon shutting down the streaming worker, where the streaming window length is 1 sec. I am wondering what parameters are available to the developer to shorten this switchover time. Cheers, Nastooh Avessta
Re: Race Condition in Streaming Thread
Are you sure the multiple invocations are not from previous runs of the program? TD On Fri, Feb 27, 2015 at 12:16 PM, Nastooh Avessta (navesta) nave...@cisco.com wrote: Hi, Under Spark 1.0.0, standalone, client mode, I am trying to invoke a 3rd party UDP traffic generator from the streaming thread. The excerpt is as follows:
    …
    do{
      try {
        p = Runtime.getRuntime().exec(Prog );
        socket.receive(packet);
        output.clear();
        kryo.writeObject(output, packet);
        store(output);
    …
The program has a test to check for an existing instance, e.g. [ $(pidof Prog) ] && exit. This code runs fine, i.e., the 3rd party application is invoked, data is received, analyzed on the driver, etc. The problem arises when I test redundancy and fault tolerance. Specifically, when I manually terminate Prog, multiple invocations are observed upon recovery. This could be due to multiple threads getting through [ $(pidof Prog) ] && exit. However, I was hoping to avoid this problem by adding semaphores, as follows:
    …
    do{
      try {
        sem.acquire();
        p = Runtime.getRuntime().exec(Prog);
        sem.release();
      }catch(IOException ioe){
        //ioe.printStackTrace();
        break;
      }
      socket.receive(packet);
      //InetAddress returnIPAddress = packet.getAddress();
      returnPort = packet.getPort();
      output.clear();
      kryo.writeObject(output, packet);
      store(output);
    …
However, I am still seeing multiple invocations of Prog upon recovery. I have also experimented with the following configuration parameters, to no avail: sparkConf.set("spark.cores.max", args[1]); sparkConf.set("spark.task.cpus", args[2]); sparkConf.set("spark.default.parallelism", args[2]); with args={(1,1),(2,1), (1,2),…} Any thoughts?
Cheers, Nastooh Avessta
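One thing worth checking in the snippet above: `Runtime.exec` returns as soon as the child process has started, so a semaphore released right after `exec` does not cover the `pidof` check, which runs later inside the child. A `Semaphore` also only serializes threads that share the same instance, and only within one JVM. Below is a plain-Java sketch of a JVM-wide guard that keeps its permit for the child's whole lifetime. `ExclusiveLauncher` is hypothetical, it assumes Java 9+ for `Process.onExit()`, and it does nothing about receivers running on other workers, which would still need an OS-level or external lock.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Semaphore;

public class ExclusiveLauncher {

    // static: one permit shared by every receiver instance in this JVM.
    // A Semaphore created per receiver instance would not serialize anything.
    private static final Semaphore RUNNING = new Semaphore(1);

    // Starts a child only if no child launched through this class is still alive.
    // `start` must return a stage that completes when the child exits,
    // e.g. () -> new ProcessBuilder("Prog").start().onExit()
    static boolean launchIfIdle(Callable<? extends CompletionStage<?>> start) throws Exception {
        if (!RUNNING.tryAcquire()) {
            return false; // a previous child is still running; do not spawn a duplicate
        }
        try {
            // Hold the permit until the child exits, not merely until it has started.
            start.call().whenComplete((result, error) -> RUNNING.release());
            return true;
        } catch (Exception e) {
            RUNNING.release(); // the launch itself failed, so free the slot
            throw e;
        }
    }
}
```

In the receiver loop this would replace the acquire/exec/release sequence, with `Prog` still being the placeholder name from the original message.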
Re: Race Condition in Streaming Thread
It wasn't clear from the snippet what's going on. Can you provide the whole Receiver code? TD On Fri, Feb 27, 2015 at 12:37 PM, Nastooh Avessta (navesta) nave...@cisco.com wrote: I am, as I issue killall -9 Prog prior to testing. Cheers, Nastooh Avessta *From:* Tathagata Das [mailto:t...@databricks.com] *Sent:* Friday, February 27, 2015 12:29 PM *To:* Nastooh Avessta (navesta) *Cc:* user@spark.apache.org *Subject:* Re: Race Condition in Streaming Thread Are you sure the multiple invocations are not from previous runs of the program? TD On Fri, Feb 27, 2015 at 12:16 PM, Nastooh Avessta (navesta) nave...@cisco.com wrote: Hi, Under Spark 1.0.0, standalone, client mode, I am trying to invoke a 3rd party UDP traffic generator from the streaming thread.
The excerpt is as follows: … do { try { p = Runtime.getRuntime().exec(Prog); socket.receive(packet); output.clear(); kryo.writeObject(output, packet); store(output); … The program has a test to check for an existing instance, e.g. [ $(pidof Prog) ] && exit. This code runs fine, i.e., the 3rd-party application is invoked, data is received, analyzed on the driver, etc. The problem arises when I test redundancy and fault tolerance. Specifically, when I manually terminate Prog, multiple invocations are observed upon recovery. This could be due to multiple threads getting through [ $(pidof Prog) ] && exit. However, I was hoping that adding semaphores, as follows, would avoid this problem: … do { try { sem.acquire(); p = Runtime.getRuntime().exec(Prog); sem.release(); } catch (IOException ioe) { //ioe.printStackTrace(); break; } socket.receive(packet); //InetAddress returnIPAddress = packet.getAddress(); returnPort = packet.getPort(); output.clear(); kryo.writeObject(output, packet); store(output); … However, I am still seeing multiple invocations of Prog upon recovery. I have also experimented with the following configuration parameters, to no avail: sparkConf.set("spark.cores.max", args[1]); sparkConf.set("spark.task.cpus", args[2]); sparkConf.set("spark.default.parallelism", args[2]); with args={(1,1),(2,1),(1,2),…} Any thoughts? Cheers, *Nastooh Avessta*
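One plausible reason the semaphore did not help: in the excerpt, exec(Prog) sits inside the receive loop, and every thread still calls it after acquiring the permit, so the semaphore only serializes the launches; the launched copies can still race on the pidof check. Below is a minimal sketch, assuming the receiver can keep the Process handle it started, that performs the "is it running?" check and the launch under one JVM-wide lock. The class and method names are hypothetical, and the guard only covers threads inside one executor JVM: if Spark restarts the receiver on another executor, an OS-level lock (for example a lock file) would still be needed.

```java
import java.util.function.Supplier;

/**
 * Hypothetical helper: starts an external program only if the copy this JVM
 * started earlier is no longer alive. The liveness check and the launch happen
 * under the same lock, so two receiver threads in one JVM cannot both decide
 * to launch. Requires Java 8+ for Process.isAlive().
 */
final class SingleInstanceLauncher {
    private static final Object LOCK = new Object();
    private static Process current; // guarded by LOCK

    private SingleInstanceLauncher() {}

    /** Returns true if this call started a new process, false if one was already running. */
    static boolean ensureRunning(Supplier<Process> starter) {
        synchronized (LOCK) {
            if (current != null && current.isAlive()) {
                return false; // still running: do not launch a second copy
            }
            current = starter.get(); // e.g. wraps new ProcessBuilder("Prog").start()
            return true;
        }
    }
}
```

In the receiver loop this would replace the bare exec call, e.g. `SingleInstanceLauncher.ensureRunning(() -> { try { return new ProcessBuilder("Prog").start(); } catch (IOException e) { throw new UncheckedIOException(e); } });`, which also keeps the launch from being attempted on every received packet.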
Re: throughput in the web console?
If you have one receiver and you are doing only map-like operations, then the processing will primarily happen on one machine. To use all the machines, either receive in parallel with multiple receivers, or spread out the computation by explicitly repartitioning the received streams (DStream.repartition) with sufficient partitions to load balance across more machines. TD On Thu, Feb 26, 2015 at 9:52 AM, Saiph Kappa saiph.ka...@gmail.com wrote: One more question: while processing the exact same batch, I noticed that giving more CPUs to the worker does not decrease the duration of the batch. I tried this with 4 and 8 CPUs. I did notice that with only 1 CPU the duration increased, but apart from that the values were pretty similar whether I was using 4, 6 or 8 CPUs. On Thu, Feb 26, 2015 at 5:35 PM, Saiph Kappa saiph.ka...@gmail.com wrote: By setting spark.eventLog.enabled to true it is possible to see the application UI after the application has finished its execution; however, the Streaming tab is no longer visible. For measuring the duration of batches in the code I am doing something like this: «wordCharValues.foreachRDD(rdd => { val startTick = System.currentTimeMillis() val result = rdd.take(1) val timeDiff = System.currentTimeMillis() - startTick» But my question is: is it possible to see the rate/throughput (records/sec) when I have a stream to process log files that appear in a folder? On Thu, Feb 26, 2015 at 1:36 AM, Tathagata Das t...@databricks.com wrote: Yes. # tuples processed in a batch = sum of all the tuples received by all the receivers. In the screenshot, there was a batch with 69.9K records, and there was a batch which took 1 s 473 ms. These two may be the same batch or different batches. TD On Wed, Feb 25, 2015 at 10:11 AM, Josh J joshjd...@gmail.com wrote: If I'm using the Kafka receiver, can I assume the number of records processed in the batch is the sum of the number of records processed by the Kafka receiver?
So in the attached screenshot, the max number of tuples processed in a batch is 42.7K + 27.2K = 69.9K, with a max processing time of 1 second 473 ms? On Wed, Feb 25, 2015 at 8:48 AM, Akhil Das ak...@sigmoidanalytics.com wrote: By throughput, do you mean number of events processed etc.? The Streaming tab already has these statistics. Thanks Best Regards On Wed, Feb 25, 2015 at 9:59 PM, Josh J joshjd...@gmail.com wrote: On Wed, Feb 25, 2015 at 7:54 AM, Akhil Das ak...@sigmoidanalytics.com wrote: For Spark Streaming applications, there is already a tab called Streaming which displays the basic statistics. Would I just need to extend this tab to add the throughput? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
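Reading throughput off the Streaming tab by hand comes down to two steps from TD's answer: sum the records over all receivers for one batch, then divide by that same batch's processing time. The helper below is a hypothetical sketch of that arithmetic only. It also hints at why timing rdd.take(1), as in the snippet earlier in this thread, may understate a batch's cost: take computes only as many partitions as it needs, while an action such as count() touches every partition.

```java
/**
 * Hypothetical helpers for reading the Streaming tab by hand.
 * Both numbers passed to recordsPerSecond must describe the same batch.
 */
final class BatchThroughput {
    private BatchThroughput() {}

    /** Total records in one batch = sum of the records from every receiver. */
    static long batchRecords(long... recordsPerReceiver) {
        long total = 0;
        for (long r : recordsPerReceiver) {
            total += r;
        }
        return total;
    }

    /** Records per second for a single batch. */
    static double recordsPerSecond(long records, long processingMillis) {
        if (processingMillis <= 0) {
            throw new IllegalArgumentException("processingMillis must be > 0");
        }
        return records * 1000.0 / processingMillis;
    }
}
```

For example, a hypothetical batch of 69,900 records processed in 1,473 ms works out to roughly 47.5K records/s; as TD notes, the two figures in the screenshot may come from different batches, so this is only an illustration of the calculation.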
Re: Integrating Spark Streaming with Reactive Mongo
Hey Mike, I quickly looked through the example and I found a major performance issue. You are collecting the RDDs to the driver and then sending them to Mongo in a foreach. Why not do a distributed push to Mongo? WHAT YOU HAVE val mongoConnection = ... WHAT YOU SHOULD DO rdd.foreachPartition { iterator => val connection = createConnection() iterator.foreach { ... push partition using connection ... } } On Thu, Feb 26, 2015 at 1:25 PM, Mike Trienis mike.trie...@orcsol.com wrote: Hi All, I have Spark Streaming set up to write data to a replicated MongoDB database and would like to understand if there would be any issues using the Reactive Mongo library to write directly to MongoDB. My stack is Apache Spark sitting on top of Cassandra for the datastore, so my thinking is that the MongoDB connector for Hadoop will not be particularly useful for me since I'm not using HDFS. Is there anything that I'm missing? Here is an example of code that I'm planning on using as a starting point for my implementation. LogAggregator https://github.com/chimpler/blog-spark-streaming-log-aggregation/blob/master/src/main/scala/com/chimpler/sparkstreaminglogaggregation/LogAggregator.scala Thanks, Mike.
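TD's "what you should do" pattern opens one connection per partition inside the function passed to rdd.foreachPartition, so the connection is created where the data lives and reused for every record of that partition, instead of funnelling everything through the driver. The JDK-only sketch below models just that shape: partitions are plain lists, and SinkConnection is a hypothetical stand-in for a Reactive Mongo (or any other) client, not a real driver API.

```java
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical sink connection; stands in for a real database client. */
interface SinkConnection extends AutoCloseable {
    void insert(String record);

    @Override
    void close(); // narrowed: no checked exception
}

final class PartitionWriter {
    private PartitionWriter() {}

    /**
     * Models rdd.foreachPartition: one connection per partition, used for every
     * record in that partition, then closed even if a write fails.
     */
    static void writeAll(List<List<String>> partitions, Supplier<SinkConnection> connect) {
        for (List<String> partition : partitions) {
            try (SinkConnection conn = connect.get()) {
                for (String record : partition) {
                    conn.insert(record);
                }
            }
        }
    }
}
```

In the Spark job, the body of the outer loop is what would go inside rdd.foreachPartition { iterator => ... }; creating the connection there (or borrowing it from a per-executor pool) also avoids trying to serialize a driver-side connection into the closure.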
Re: Re: Many Receiver vs. Many threads per Receiver
Spark Streaming has a new Kafka direct stream, to be released as an experimental feature with 1.3. That uses a low-level consumer. Not sure if it satisfies your purpose. If you want more control, it's best to create your own Receiver with the low-level Kafka API. TD On Tue, Feb 24, 2015 at 12:09 AM, bit1...@163.com bit1...@163.com wrote: Thanks Akhil. Not sure whether the low-level consumer https://github.com/dibbhatt/kafka-spark-consumer will be officially supported by Spark Streaming. So far, I don't see it mentioned/documented in the Spark Streaming programming guide. -- bit1...@163.com *From:* Akhil Das ak...@sigmoidanalytics.com *Date:* 2015-02-24 16:21 *To:* bit1...@163.com *CC:* user user@spark.apache.org *Subject:* Re: Many Receiver vs. Many threads per Receiver I believe when you go with 1, it will distribute the consumers across your cluster (possibly on 6 machines), but I still don't see a way to tell which partition each one will consume from, etc. If you are looking for a consumer where you can specify the partition details and all, then you are better off with the low-level consumer. https://github.com/dibbhatt/kafka-spark-consumer Thanks Best Regards On Tue, Feb 24, 2015 at 9:36 AM, bit1...@163.com bit1...@163.com wrote: Hi, I am experimenting with Spark Streaming and Kafka integration. To read messages from Kafka in parallel, there are basically two ways: 1. Create many Receivers like (1 to 6).map(_ => KafkaUtils.createStream). 2. Specify many threads when calling KafkaUtils.createStream, like val topicMap = Map("myTopic" -> 6); this will create one receiver with 6 reading threads. My question is which option is better. Option 2 sounds better to me because it saves a lot of cores (one receiver occupies one core), but I learned from somewhere else that option 1 is better, so I would like to see how you guys elaborate on this. Thanks -- bit1...@163.com
Re: Why must the dstream.foreachRDD(...) parameter be serializable?
There are different kinds of checkpointing going on. updateStateByKey requires RDD checkpointing, which can be enabled by calling sparkContext.setCheckpointDir. But that does not enable Spark Streaming driver checkpoints, which are necessary for recovering from driver failures. Those are enabled only by streamingContext.checkpoint(...), which internally calls sparkContext.setCheckpointDir and also enables other stuff. TD On Mon, Feb 23, 2015 at 1:28 AM, Tobias Pfeiffer t...@preferred.jp wrote: Sean, thanks for your message! On Mon, Feb 23, 2015 at 6:03 PM, Sean Owen so...@cloudera.com wrote: What I haven't investigated is whether you can enable checkpointing for the state in updateStateByKey separately from this mechanism, which is exactly your question. What happens if you set a checkpoint dir, but do *not* use StreamingContext.getOrCreate, but *do* call DStream.checkpoint? I didn't even use StreamingContext.getOrCreate(); just calling streamingContext.checkpoint(...) blew everything up. Well, blew up in the sense that actor.OneForOneStrategy will print the stack trace of the java.io.NotSerializableException every couple of seconds and something is not going right with execution (I think). BUT, indeed, just calling sparkContext.setCheckpointDir seems to be sufficient for updateStateByKey! Looking at what streamingContext.checkpoint() does, I don't get why ;-) and I am not sure that this is a robust solution, but in fact that seems to work! Thanks a lot, Tobias
Re: Query data in Spark RRD
You could build a REST API, but you may have issues if you want to return arbitrary binary data. A more complex but robust alternative is to use an RPC library like Akka, Thrift, etc. TD On Mon, Feb 23, 2015 at 12:45 AM, Nikhil Bafna nikhil.ba...@flipkart.com wrote: Tathagata - Yes, I'm thinking along those lines. The problem is how to send the query to the backend. Bundle an HTTP server into a Spark Streaming job that will accept the parameters? -- Nikhil Bafna On Mon, Feb 23, 2015 at 2:04 PM, Tathagata Das t...@databricks.com wrote: You will have to build a split infrastructure - a front end that takes the queries from the UI and sends them to the backend, and the backend (running the Spark Streaming app) will actually run the queries on tables created in the contexts. The RPCs necessary between the frontend and backend will need to be implemented by you. On Sat, Feb 21, 2015 at 11:57 PM, Nikhil Bafna nikhil.ba...@flipkart.com wrote: Yes. As I understand it, it would allow me to write SQL to query a Spark context. But the query needs to be specified within a deployed job. What I want is to be able to run multiple dynamic queries specified at runtime from a dashboard. -- Nikhil Bafna On Sat, Feb 21, 2015 at 8:37 PM, Ted Yu yuzhih...@gmail.com wrote: Have you looked at http://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.sql.SchemaRDD ? Cheers On Sat, Feb 21, 2015 at 4:24 AM, Nikhil Bafna nikhil.ba...@flipkart.com wrote: Hi. My use case is building a real-time monitoring system over multi-dimensional data. The way I'm planning to go about it is to use Spark Streaming to store aggregated counts over all dimensions in 10-second intervals. Then, from a dashboard, I would be able to specify a query over some dimensions, which will need re-aggregation from the already computed job. My question is, how can I run dynamic queries over data in SchemaRDDs? -- Nikhil Bafna
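As a minimal sketch of the backend half TD describes, assuming the streaming job can publish small, already-aggregated counts to the driver (for example by collecting them inside foreachRDD), the driver could expose them through the JDK's built-in com.sun.net.httpserver server. The class name, the /count path and the key= parameter are hypothetical; as TD notes, an RPC library such as Akka or Thrift is the more robust choice, especially for binary results.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical query backend living in the driver of the streaming app.
 * The streaming job publishes its latest aggregates; a dashboard front end
 * reads them over HTTP, e.g. GET /count?key=country:US
 */
final class AggregateQueryServer {
    private final Map<String, Long> counts = new ConcurrentHashMap<>();

    /** Called by the streaming job after each batch with driver-side aggregates. */
    void publish(String key, long count) {
        counts.put(key, count);
    }

    /** Pure lookup used by the HTTP handler; keys are assumed to be URL-safe. */
    String answer(String rawQuery) {
        if (rawQuery == null) {
            return "missing key";
        }
        for (String pair : rawQuery.split("&")) {
            if (pair.startsWith("key=")) {
                String key = pair.substring("key=".length());
                return String.valueOf(counts.getOrDefault(key, 0L));
            }
        }
        return "missing key";
    }

    /** Starts serving /count on the given port and returns the running server. */
    HttpServer start(int port) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/count", exchange -> {
            byte[] body = answer(exchange.getRequestURI().getRawQuery())
                    .getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        return server;
    }
}
```

Keeping the lookup in a plain method separate from the HTTP plumbing makes it easy to swap the transport later (for example for an RPC endpoint) without touching the query logic.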
Re: Periodic Broadcast in Apache Spark Streaming
You could do something like this. def rddTransformationUsingBroadcast(rdd: RDD[...]): RDD[...] = { val broadcastToUse = getBroadcast() // get the reference to a broadcast variable, new or existing. rdd.map { .. } // use broadcast variable } dstream.transform(rddTransformationUsingBroadcast) The function `rddTransformationUsingBroadcast` will be called at every batch interval, which will call `getBroadcast` every time. That's an arbitrary function returning a broadcast variable, so you can update which broadcast variable to use whenever you want. TD On Wed, Feb 18, 2015 at 6:11 AM, aanilpala aanilp...@gmail.com wrote: I am implementing a stream learner for text classification. There are some single-valued parameters in my implementation that need to be updated as new stream items arrive. For example, I want to change the learning rate as new predictions are made. However, I doubt that there is a way to broadcast variables after the initial broadcast. So what happens if I need to broadcast a variable every time I update it? If there is a way to do it, or a workaround for what I want to accomplish in Spark Streaming, I'd be happy to hear about it. Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Periodic-Broadcast-in-Apache-Spark-Streaming-tp21703.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
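TD's suggestion works because the function passed to transform runs on the driver once per batch, so getBroadcast() is free to hand back a newer broadcast whenever it decides to. Below is a JDK-only sketch of that driver-side decision, assuming a time-based refresh policy (the asker's trigger, new predictions, could replace it). RefreshingHolder is hypothetical; in a Spark job the factory would call sparkContext.broadcast(newValue), and the retire callback could call unpersist() on the old Broadcast.

```java
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
 * Hypothetical driver-side holder modelling getBroadcast(): returns the current
 * value and replaces it once it is older than refreshMillis.
 */
final class RefreshingHolder<T> {
    private final Supplier<T> factory;     // e.g. () -> sc.broadcast(latestParams)
    private final Consumer<T> retire;      // e.g. old -> old.unpersist()
    private final LongSupplier clockMillis;
    private final long refreshMillis;
    private T current;
    private long createdAt;

    RefreshingHolder(Supplier<T> factory, Consumer<T> retire,
                     LongSupplier clockMillis, long refreshMillis) {
        this.factory = factory;
        this.retire = retire;
        this.clockMillis = clockMillis;
        this.refreshMillis = refreshMillis;
    }

    /** Returns the current value, replacing it first if it is missing or stale. */
    synchronized T get() {
        long now = clockMillis.getAsLong();
        if (current == null || now - createdAt >= refreshMillis) {
            T old = current;
            current = factory.get();
            createdAt = now;
            if (old != null) {
                retire.accept(old); // release the previous value only after the new one exists
            }
        }
        return current;
    }
}
```

Passing the clock in as a LongSupplier (System::currentTimeMillis in production) keeps the refresh rule testable without waiting for real time to pass.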
Re: How to diagnose could not compute split errors and failed jobs?
Could you find the executor logs on the executor where that task was scheduled? That may provide more information on what caused the error. Also take a look at where the block in question was stored, and where the task was scheduled. You will need to enable log4j INFO-level logs for this debugging. TD On Thu, Feb 19, 2015 at 10:38 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Not quite sure, but this could be the case: one of your executors is stuck in a GC pause while the other one asks it for the data, and hence the request times out, ending in that exception. You can try increasing the Akka frame size and the ack wait timeout as follows: .set("spark.core.connection.ack.wait.timeout","600") .set("spark.akka.frameSize","50") Thanks Best Regards On Fri, Feb 20, 2015 at 6:21 AM, Tim Smith secs...@gmail.com wrote: My streaming app runs fine for a few hours and then starts spewing "Could not compute split, block input-xx-xxx not found" errors. After this, jobs start to fail and batches start to pile up. My question isn't so much about why this error occurs but rather, how do I trace what leads to this error? I am using disk+memory for storage, so it shouldn't be a case of data loss resulting from memory overrun.
15/02/18 22:04:49 ERROR JobScheduler: Error running job streaming job 142429705 ms.28 org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 247644.0 failed 64 times, most recent failure: Lost task 3.63 in stage 247644.0 (TID 3705290, node-dn1-16-test.abcdefg.com): java.lang.Exception: Could not compute split, block input-28-1424297042500 not found at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) at org.apache.spark.rdd.RDD.iterator(RDD.scala:228) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) 
at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Thanks, Tim
Re: spark streaming window operations on a large window size
The default persistence level is MEMORY_AND_DISK, so under memory pressure the LRU policy would push blocks out to disk rather than drop them, and the streaming app will not fail. However, since data will constantly be read from and written to disk as windows are processed, the performance won't be great. So it is best to have sufficient memory to keep all the window data in memory. TD On Mon, Feb 23, 2015 at 8:26 AM, Shao, Saisai saisai.s...@intel.com wrote: I don't think the current Spark Streaming supports window operations whose data exceeds the available memory. Internally, Spark Streaming keeps all the data belonging to the effective window in memory; if memory is not enough, the BlockManager will discard blocks according to an LRU policy, so something unexpected may occur. Thanks Jerry -Original Message- From: avilevi3 [mailto:avile...@gmail.com] Sent: Monday, February 23, 2015 12:57 AM To: user@spark.apache.org Subject: spark streaming window operations on a large window size Hi guys, does Spark Streaming support window operations on a sliding window whose data is larger than the available memory? We would like to... Currently we are using Kafka as input, but we could change that if needed. Thanks, Avi -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-window-operations-on-a-large-window-size-tp21764.html
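As a rough check on TD's advice to keep all the window data in memory, the volume a window holds can be estimated as below. The symbols and example numbers are illustrative assumptions, not figures from this thread, and serialization and object overhead are ignored: r is the ingest rate in records per second, W the window length in seconds, b the average stored size of a record in bytes, and k the number of stored copies (for example 2 if the input blocks are replicated).

```latex
M_{\mathrm{window}} \approx r \cdot W \cdot b \cdot k
\qquad \text{e.g.} \qquad
10^{4}\,\tfrac{\mathrm{records}}{\mathrm{s}} \cdot 3600\,\mathrm{s} \cdot 200\,\tfrac{\mathrm{B}}{\mathrm{record}} \cdot 1
= 7.2 \times 10^{9}\,\mathrm{B} \approx 7.2\,\mathrm{GB}
```

If that estimate exceeds the executors' storage memory, then, per TD's description, blocks will spill to disk and each window computation will pay for disk reads.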
Re: Accumulator in SparkUI for streaming
Unless I am unaware of some recent changes, the Spark UI shows stages and jobs, not accumulator results, and the UI is not designed to be pluggable for showing user-defined stuff. TD On Fri, Feb 20, 2015 at 12:25 AM, Tim Smith secs...@gmail.com wrote: On Spark 1.2: I am trying to capture # records read from a Kafka topic: val inRecords = ssc.sparkContext.accumulator(0, "InRecords") .. kInStreams.foreach( k => { k.foreachRDD ( rdd => inRecords += rdd.count().toInt ) inRecords.value The question is: how do I get the accumulator to show up in the UI? I tried inRecords.value but that didn't help. Pretty sure it isn't showing up in Stage metrics. What's the trick here? collect? Thanks, Tim
Re: Re: About FlumeUtils.createStream
Distributed among cluster nodes. On Mon, Feb 23, 2015 at 8:45 PM, bit1...@163.com bit1...@163.com wrote: Hi Akhil, Tathagata, this leads me to another question. For the Spark Streaming and Kafka integration, if there is more than one receiver in the cluster, such as val streams = (1 to 6).map(_ => KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)), will these receivers stay on one cluster node, or will they be distributed among the cluster nodes? -- bit1...@163.com *From:* Akhil Das ak...@sigmoidanalytics.com *Date:* 2015-02-24 12:58 *To:* Tathagata Das t...@databricks.com *CC:* user user@spark.apache.org; bit1129 bit1...@163.com *Subject:* Re: About FlumeUtils.createStream I see, thanks for the clarification TD. On 24 Feb 2015 09:56, Tathagata Das t...@databricks.com wrote: Akhil, that is incorrect. Spark will listen on the given port for Flume to push data into it. When in local mode, it will listen on localhost: When in some kind of cluster, instead of localhost you will have to give the hostname of the cluster node where you want Flume to forward the data. Spark will launch the Flume receiver on that node (assuming the hostname matching is correct), and listen on port , for receiving data from Flume. So only the configured machine will listen on port . I suggest trying the other stream, FlumeUtils.createPollingStream. More details here: http://spark.apache.org/docs/latest/streaming-flume-integration.html On Sat, Feb 21, 2015 at 12:17 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Spark won't listen on mate, it basically means you have a Flume source running at port of your localhost, and when you submit your application in standalone mode, workers will consume data from that port.
Thanks Best Regards On Sat, Feb 21, 2015 at 9:22 AM, bit1...@163.com bit1...@163.com wrote: Hi, in the Spark Streaming application, I write the code FlumeUtils.createStream(ssc, "localhost", ), which means Spark will listen on the port and wait for the Flume sink to write to it. My question is: when I submit the application to the Spark standalone cluster, will be opened only on the driver machine, or will all the workers also open the port and wait for the Flume data? --
Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?
In case this mystery has not been solved: DStream.print() essentially does an RDD.take(10) on each RDD, which computes only a subset of the partitions in the RDD, whereas collect forces the evaluation of all the partitions. Since you are writing JSON in the map() function, this could be the reason. TD On Wed, Feb 18, 2015 at 7:15 AM, Imran Rashid iras...@cloudera.com wrote: So if you only change this line: https://gist.github.com/emres/0fb6de128baea099e741#file-mymoduledriver-java-L137 to json.print(), it processes 16 files instead? I am totally perplexed. My only suggestions to help debug are (1) see what happens when you get rid of MyModuleWorker completely -- change MyModuleDriver#process to just inStream.print() and see what happens; (2) stick a bunch of printlns into MyModuleWorker#call; (3) turn on DEBUG logging for org.apache.spark.streaming.dstream.FileInputDStream. My gut instinct is that something else is flaky about the file input stream (e.g., it makes some assumptions about the file system which may not be valid in your case; it has a bunch of caveats), and that it just happened to work sometimes with your foreachRDD and fail sometimes with print. Sorry I am not a lot of help in this case; I hope this leads you down the right track or somebody else can help out. Imran On Wed, Feb 18, 2015 at 2:28 AM, Emre Sevinc emre.sev...@gmail.com wrote: Hello Imran, (a) I know that all 20 files are processed when I use foreachRDD, because I can see the processed files in the output directory. (My application logic writes them to an output directory after they are processed, *but* that writing operation does not happen in foreachRDD; below you can see the URL that includes my code and clarifies this.) (b) I know only 16 files are processed because in the output directory I see only 16 processed files. I wait for minutes and minutes and no more files appear in the output directory.
When only 16 files have been processed and Spark Streaming has gone into the mode of idly watching the input directory, if I copy a few more files, they are also processed. (c) Sure, you can see part of my code in the following gist: https://gist.github.com/emres/0fb6de128baea099e741 It might seem a little convoluted at first, because my application is divided into two classes, a Driver class (setting up things and initializing them) and a Worker class (that implements the core functionality). I've also put the relevant methods from my utility classes for completeness. I am as perplexed as you are as to why forcing the output via foreachRDD resulted in different behaviour compared to simply using the print() method. Kind regards, Emre On Tue, Feb 17, 2015 at 4:23 PM, Imran Rashid iras...@cloudera.com wrote: Hi Emre, there shouldn't be any difference in which files get processed with print() vs. foreachRDD(). In fact, if you look at the definition of print(), it is just calling foreachRDD() underneath. So there is something else going on here. We need a little more information to figure out exactly what is going on. (I think Sean was getting at the same thing ...) (a) How do you know that when you use foreachRDD, all 20 files get processed? (b) How do you know that only 16 files get processed when you print()? Do you know the other files are being skipped, or maybe they are just stuck somewhere? E.g., suppose you start with 20 files, and you see 16 get processed ... what happens after you add a few more files to the directory? Are they processed immediately, or are they never processed either? (c) Can you share any more code of what you are doing to the dstreams *before* the print() / foreachRDD()? That might give us more details about what the difference is. I can't see how .count.println() would be different than just println(), but maybe I am missing something also.
Imran On Mon, Feb 16, 2015 at 7:49 AM, Emre Sevinc emre.sev...@gmail.com wrote: Sean, in this case I've been testing the code on my local machine and using Spark locally, so all the log output was available on my terminal. And I've used the .print() method to have an output operation, just to force Spark to execute. And I was not using foreachRDD; I was only using the print() method on a JavaDStream object, and it was working fine for a few files, up to 16 (and without print() it did not do anything because there were no output operations). To sum it up, in my case: - Initially, use .print() and no foreachRDD: processes up to 16 files and does not do anything for the remaining 4. - Remove .print() and use foreachRDD: processes all of the 20 files. Maybe, as in Akhil Das's suggestion, using .count.print() might also have fixed my problem, but I'm satisfied with the foreachRDD approach for now. (Though it is still a mystery to me why using .print() made a difference; maybe my mental model of Spark is wrong, I thought no matter what output
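TD's explanation can be illustrated with a simplified, JDK-only model: each inner list stands for one partition, the mapper stands for a map() function that writes a JSON file as a side effect, take(n) evaluates partitions one by one until it has n results, and collectAll() evaluates every partition. Real Spark's take scans a growing number of partitions per round, so the exact numbers will differ; LazyPartitions is hypothetical and only shows why side effects inside a transformation run only for the partitions an action actually computes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical model of partial (take) versus full (collect) evaluation. */
final class LazyPartitions<T, R> {
    private final List<List<T>> partitions;
    private final Function<T, R> mapper;
    private int evaluatedCount = 0;

    LazyPartitions(List<List<T>> partitions, Function<T, R> mapper) {
        this.partitions = partitions;
        this.mapper = mapper;
    }

    /** Runs the (possibly side-effecting) mapper over every element of one partition. */
    private List<R> evaluate(List<T> partition) {
        evaluatedCount++;
        List<R> out = new ArrayList<>();
        for (T element : partition) {
            out.add(mapper.apply(element));
        }
        return out;
    }

    /** Like print()'s take(n): stops evaluating partitions once n results are available. */
    List<R> take(int n) {
        List<R> result = new ArrayList<>();
        for (List<T> partition : partitions) {
            if (result.size() >= n) {
                break;
            }
            for (R r : evaluate(partition)) {
                if (result.size() < n) {
                    result.add(r);
                }
            }
        }
        return result;
    }

    /** Like collect(): evaluates every partition. */
    List<R> collectAll() {
        List<R> result = new ArrayList<>();
        for (List<T> partition : partitions) {
            result.addAll(evaluate(partition));
        }
        return result;
    }

    int partitionsEvaluated() {
        return evaluatedCount;
    }
}
```

With 20 files split into 5 partitions of 4, take(10) runs the side effect for 12 files (3 whole partitions) and returns 10 results, while collectAll() runs it for all 20. Doing the file writes inside an output operation that touches every record, rather than inside map(), removes the dependence on which action happens to drive the evaluation.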
Re: Write ahead Logs and checkpoint
Exactly, that is the reason. To avoid that, in the to-be-released Spark 1.3, we have added a new Kafka API (called the direct stream) which does not use ZooKeeper at all to keep track of progress, and maintains offsets within Spark Streaming. That can guarantee that all records are received exactly once. It's experimental for now, but we will make it stable. Please try it out. TD On Mon, Feb 23, 2015 at 9:41 PM, V Dineshkumar developer.dines...@gmail.com wrote: Hi, my Spark Streaming application is pulling data from Kafka. To prevent data loss I have implemented the WAL and enabled checkpointing. On killing my application and restarting it, I am now able to prevent data loss, but I am getting duplicate messages. Is it because the application got killed before it was able to checkpoint the current processing state? If yes, how do I tackle the duplicate messages? Thanks, Dinesh
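The direct stream addresses the receiving side. For the output side, the Spark Streaming + Kafka integration guide notes that results are written exactly once only if the output operation is idempotent, or saves results and offsets in one atomic transaction. Below is a JDK-only sketch of the idempotent variant, assuming records within a Kafka partition arrive in increasing offset order and that a real store can update a row and its partition's offset atomically. OffsetTrackingSink is hypothetical and keeps everything in memory.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sink that stores results together with the highest Kafka offset
 * written per partition, so replaying a batch after a restart skips duplicates.
 * In a real store, the row and the offset update must be one atomic transaction.
 */
final class OffsetTrackingSink {
    private final Map<Integer, Long> lastOffset = new HashMap<>();
    private final List<String> rows = new ArrayList<>();

    /** Writes the record unless this partition already has it; returns true if written. */
    synchronized boolean write(int partition, long offset, String value) {
        Long last = lastOffset.get(partition);
        if (last != null && offset <= last) {
            return false; // already written before the restart: skip the duplicate
        }
        rows.add(value);
        lastOffset.put(partition, offset);
        return true;
    }

    synchronized List<String> rows() {
        return new ArrayList<>(rows);
    }
}
```

Because the check uses only the (partition, offset) pair that the direct stream already exposes for each record, re-running a failed batch becomes harmless rather than producing the duplicates described above.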
Re: Write ahead Logs and checkpoint
I think it will not be affected. We ignore the offsets stored anywhere outside Spark Streaming. It is the fact that progress information was being stored in two different places (Spark Streaming and Kafka/ZooKeeper) that was causing inconsistencies and duplicates. TD On Mon, Feb 23, 2015 at 11:27 PM, Felix C felixcheun...@hotmail.com wrote: Kafka 0.8.2 has built-in offset management; how would that affect the direct stream in Spark? Please see KAFKA-1012 --- Original Message --- From: Tathagata Das t...@databricks.com Sent: February 23, 2015 9:53 PM To: V Dineshkumar developer.dines...@gmail.com Cc: user user@spark.apache.org Subject: Re: Write ahead Logs and checkpoint Exactly, that is the reason. To avoid that, in the to-be-released Spark 1.3, we have added a new Kafka API (called the direct stream) which does not use ZooKeeper at all to keep track of progress, and maintains offsets within Spark Streaming. That can guarantee that all records are received exactly once. It's experimental for now, but we will make it stable. Please try it out. TD On Mon, Feb 23, 2015 at 9:41 PM, V Dineshkumar developer.dines...@gmail.com wrote: Hi, my Spark Streaming application is pulling data from Kafka. To prevent data loss I have implemented the WAL and enabled checkpointing. On killing my application and restarting it, I am now able to prevent data loss, but I am getting duplicate messages. Is it because the application got killed before it was able to checkpoint the current processing state? If yes, how do I tackle the duplicate messages? Thanks, Dinesh
Re: Any sample code for Kafka consumer
Spark Streaming already directly supports Kafka http://spark.apache.org/docs/latest/streaming-programming-guide.html#advanced-sources Is there any reason why that is not sufficient? TD On Sun, Feb 22, 2015 at 5:18 PM, mykidong mykid...@gmail.com wrote: In Java, you can see this example: https://github.com/mykidong/spark-kafka-simple-consumer-receiver - Kidong. -- Original Message -- From: icecreamlc [via Apache Spark User List] To: mykidong Sent: 2015-02-21 11:16:37 AM Subject: Any sample code for Kafka consumer Dear all, do you have any sample code that consumes data from a Kafka server using Spark Streaming? Thanks a lot!!! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Any-sample-code-for-Kafka-consumer-tp21746p21758.html
Re: In a Spark Streaming application, what might be the potential causes for util.AkkaUtils: Error sending message in 1 attempts and java.util.concurrent.TimeoutException: Futures timed out and
What version of Spark are you using? TD On Thu, Feb 19, 2015 at 2:45 AM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, We have a Spark Streaming application that watches an input directory, and as files are copied there the application reads them, sends the contents to a RESTful web service, receives a response and writes some contents to an output directory. When testing the application by copying a few thousand files at once to its input directory, we realized that after having processed about 3800 files, it produces messages such as the following in the log file: 15/02/19 10:22:06 INFO storage.MemoryStore: Block broadcast_17935 of size 9960 dropped from memory (free 447798720) 15/02/19 10:22:55 WARN util.AkkaUtils: Error sending message in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187) at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398) and then the Spark Streaming application dies. What might be the potential causes to check for such errors?
Below you can see the last few lines before it dies: 15/02/19 10:22:03 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 12894 15/02/19 10:22:04 INFO storage.MemoryStore: ensureFreeSpace(20978) called with curMem=107884847, maxMem=556038881 15/02/19 10:22:04 INFO storage.MemoryStore: Block broadcast_12894_piece0 stored as bytes in memory (estimated size 20.5 KB, free 427.4 MB) 15/02/19 10:22:04 INFO storage.BlockManagerMaster: Updated info of block broadcast_12894_piece0 15/02/19 10:22:04 INFO broadcast.TorrentBroadcast: Reading broadcast variable 12894 took 460 ms 15/02/19 10:22:04 INFO storage.MemoryStore: ensureFreeSpace(347363) called with curMem=107905825, maxMem=556038881 15/02/19 10:22:04 INFO storage.MemoryStore: Block broadcast_12894 stored as values in memory (estimated size 339.2 KB, free 427.0 MB) 15/02/19 10:22:04 INFO storage.MemoryStore: ensureFreeSpace(1079) called with curMem=108253188, maxMem=556038881 15/02/19 10:22:04 INFO storage.MemoryStore: Block rdd_30466_35 stored as bytes in memory (estimated size 1079.0 B, free 427.0 MB) 15/02/19 10:22:04 INFO storage.BlockManagerMaster: Updated info of block rdd_30466_35 15/02/19 10:22:05 INFO storage.MemoryStore: ensureFreeSpace(5) called with curMem=108254267, maxMem=556038881 15/02/19 10:22:05 INFO storage.MemoryStore: Block rdd_30467_35 stored as bytes in memory (estimated size 5.0 B, free 427.0 MB) 15/02/19 10:22:05 INFO storage.BlockManagerMaster: Updated info of block rdd_30467_35 15/02/19 10:22:05 INFO executor.Executor: Finished task 35.0 in stage 351.0 (TID 12229). 
2353 bytes result sent to driver 15/02/19 10:22:06 INFO storage.BlockManager: Removing broadcast 17935 15/02/19 10:22:06 INFO storage.BlockManager: Removing block broadcast_17935_piece0 15/02/19 10:22:06 INFO storage.MemoryStore: Block broadcast_17935_piece0 of size 4151 dropped from memory (free 447788760) 15/02/19 10:22:06 INFO storage.BlockManagerMaster: Updated info of block broadcast_17935_piece0 15/02/19 10:22:06 INFO storage.BlockManager: Removing block broadcast_17935 15/02/19 10:22:06 INFO storage.MemoryStore: Block broadcast_17935 of size 9960 dropped from memory (free 447798720) 15/02/19 10:22:55 WARN util.AkkaUtils: Error sending message in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187) at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398) 15/02/19 10:23:28 WARN util.AkkaUtils: Error sending message in 2 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at
Re: spark-local dir running out of space during long ALS run
Correct, brute-force cleanup is not useful. Since Spark 1.0, Spark can automatically clean up files based on which RDDs are still in use or have been garbage-collected by the JVM. That would be the best way, but it depends on the JVM's GC characteristics. If you force a GC periodically in the driver, that might help you get rid of files on the workers that are no longer needed. TD On Mon, Feb 16, 2015 at 12:27 AM, Antony Mayi antonym...@yahoo.com.invalid wrote: spark.cleaner.ttl is not the right way; it seems to be really designed for streaming. Although it keeps the disk usage under control, it also causes the loss of RDDs and broadcasts that are required later, leading to a crash. Is there any other way? Thanks, Antony. On Sunday, 15 February 2015, 21:42, Antony Mayi antonym...@yahoo.com wrote: spark.cleaner.ttl ? On Sunday, 15 February 2015, 18:23, Antony Mayi antonym...@yahoo.com wrote: Hi, I am running a bigger ALS on Spark 1.2.0 on YARN (CDH 5.3.0). ALS is using about 3 billion ratings and I am doing several trainImplicit() runs in a loop within one Spark session. I have a four-node cluster with 3TB of disk space on each node. Before starting the job, less than 8% of the disk space is used. While ALS is running I can see the disk usage growing rapidly, mainly because of files being stored under yarn/local/usercache/user/appcache/application_XXX_YYY/spark-local-ZZZ-AAA. After about 10 hours the disk usage hits 90% and YARN kills the particular containers. Am I missing some cleanup somewhere while looping over the several trainImplicit() calls? Taking 4*3TB of disk space seems immense. Thanks for any help, Antony.
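A minimal JDK-only sketch of TD's suggestion: a daemon thread in the driver that periodically calls System.gc(), so that RDDs the driver no longer references can be noticed by Spark's reference-based cleanup (the ContextCleaner). The class name and the period are hypothetical. System.gc() is only a request to the JVM, so how much this helps depends on the collector in use.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that runs a GC request on a fixed schedule in the driver JVM. */
final class PeriodicDriverGc {
    private final ScheduledExecutorService scheduler;

    PeriodicDriverGc(Runnable gcAction, long period, TimeUnit unit) {
        ThreadFactory daemonFactory = r -> {
            Thread t = new Thread(r, "periodic-driver-gc");
            t.setDaemon(true); // never keeps the driver JVM alive on its own
            return t;
        };
        this.scheduler = Executors.newSingleThreadScheduledExecutor(daemonFactory);
        // First run after one period, then once per period.
        scheduler.scheduleAtFixedRate(gcAction, period, period, unit);
    }

    /** Convenience factory that requests a full GC every `minutes` minutes. */
    static PeriodicDriverGc everyMinutes(long minutes) {
        return new PeriodicDriverGc(System::gc, minutes, TimeUnit.MINUTES);
    }

    void stop() {
        scheduler.shutdownNow();
    }
}
```

In the ALS loop one would start it once before the trainImplicit() iterations, for example with PeriodicDriverGc.everyMinutes(10), and call stop() at the end. The gcAction parameter exists only so the schedule can be exercised without triggering real collections.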
Re: NaiveBayes classifier causes ShuffleDependency class cast exception
You cannot have two SparkContexts active in the same JVM at the same time. Just create one SparkContext and use it for both purposes. TD On Fri, Feb 6, 2015 at 8:49 PM, VISHNU SUBRAMANIAN johnfedrickena...@gmail.com wrote: Can you try creating just a single SparkContext and then try your code? If you want to use it for streaming, pass the same SparkContext object instead of the conf. Note: instead of just replying to me, try to use reply-all so that the post is visible to the community. That way you can expect immediate responses. On Fri, Feb 6, 2015 at 6:09 AM, aanilpala aanilp...@gmail.com wrote: I have the following code: SparkConf conf = new SparkConf().setAppName("streamer").setMaster("local[2]"); conf.set("spark.driver.allowMultipleContexts", "true"); JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(batch_interval)); ssc.checkpoint("/tmp/spark/checkpoint"); SparkConf conf2 = new SparkConf().setAppName("classifier").setMaster("local[1]"); conf2.set("spark.driver.allowMultipleContexts", "true"); JavaSparkContext sc = new JavaSparkContext(conf); JavaReceiverInputDStream<String> stream = ssc.socketTextStream("localhost", ); // String to Tuple3 Conversion JavaDStream<Tuple3<Long, String, String>> tuple_stream = stream.map(new Function<String, Tuple3<Long, String, String>>() { ... }); JavaPairDStream<Integer, DictionaryEntry> raw_dictionary_stream = tuple_stream.filter(new Function<Tuple3<Long, String, String>, Boolean>() { @Override public Boolean call(Tuple3<Long, String, String> tuple) throws Exception { if ((tuple._1() / Time.scaling_factor % training_interval) < training_dur) NaiveBayes.train(sc.parallelize(training_set).rdd()); return true; } }). I am working on a text mining project and I want to use the NaiveBayes classifier of MLlib to classify some stream items. So, I have two Spark contexts, one of which is a streaming context. The call to NaiveBayes.train causes the following exception. Any ideas? 
Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassCastException: org.apache.spark.SparkContext$$anonfun$runJob$4 cannot be cast to org.apache.spark.ShuffleDependency at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:60) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
Re: Interact with streams in a non-blocking way
Here is an example of how you can do it. Let's say myDStream contains the data that you may want to query asynchronously, say using Spark SQL.
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(streamingContext.sparkContext)
import sqlContext.createSchemaRDD // implicit conversion from an RDD of case classes to a SchemaRDD (Spark 1.0-1.2)
myDStream.foreachRDD { rdd =>
  // rdd is an RDD of a case class
  sqlContext.registerRDDAsTable(rdd, "mytable")
}
This will make sure that the table mytable always refers to the latest RDD generated by the DStream. Then from a different thread you can asynchronously query sqlContext.sql("select * from mytable"). Hope this helps. TD On Fri, Feb 13, 2015 at 3:59 AM, Sean Owen so...@cloudera.com wrote: Sure, it's possible, but you would use Streaming to update some shared state, and create another service that accesses that shared state too. On Fri, Feb 13, 2015 at 11:57 AM, Tamas Jambor jambo...@gmail.com wrote: Thanks for the reply. I am trying to set up a streaming-as-a-service approach, using the framework that is used for spark-jobserver. For that I would need to handle asynchronous operations that are initiated from outside of the stream. Do you think it is not possible? On Fri Feb 13 2015 at 10:14:18 Sean Owen so...@cloudera.com wrote: You call awaitTermination() in the main thread, and indeed it blocks there forever. From there Spark Streaming takes over, and is invoking the operations you set up. Your operations have access to the data, of course. That's the model; you don't make external threads that reach into Spark Streaming's objects, but you can easily create operations that take whatever actions you want and invoke them in Streaming. On Fri, Feb 13, 2015 at 10:04 AM, jamborta jambo...@gmail.com wrote: Hi all, I am trying to come up with a workflow where I can query streams asynchronously. The problem I have is that the ssc.awaitTermination() line blocks the whole thread, so it is not clear to me whether it is possible to get hold of objects from streams once they are started. Any suggestion on what is the best way to implement this?
thanks,
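Sean's "shared state" suggestion can be sketched in plain Java. The streaming side publishes each batch's (small) result into an AtomicReference, and any other thread reads the latest complete batch without touching awaitTermination(). LatestBatch is a hypothetical name. In a Spark app, publish(...) would typically be called inside foreachRDD, for example with a collected result, which assumes the result is small enough to bring to the driver.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

/**
 * Hypothetical holder for the most recent batch result. The streaming side
 * publishes; any other thread (e.g. a request handler) queries without blocking.
 */
final class LatestBatch<T> {
    private final AtomicReference<List<T>> latest =
            new AtomicReference<List<T>>(Collections.<T>emptyList());

    /** Called once per batch, e.g. from inside foreachRDD. */
    void publish(List<T> batch) {
        // Copy, then freeze, so readers never observe a list that is still changing.
        latest.set(Collections.unmodifiableList(new ArrayList<T>(batch)));
    }

    /** Safe to call from any thread; always sees one complete batch. */
    <R> R query(Function<List<T>, R> q) {
        return q.apply(latest.get());
    }
}
```

Swapping the whole list atomically plays the same role as re-registering mytable on every batch in TD's example: readers always see one complete batch, never a mix of two.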
Re: Concurrent batch processing
So you have come across spark.streaming.concurrentJobs already :) Yeah, that is an undocumented feature that does allow multiple output operations to be submitted in parallel. However, it is not made public for the exact reasons that you realized: the semantics in the case of stateful operations are not clear. It is semantically safe to enable; however, it may cause redundant computation, as the next batch's jobs may compute some RDDs a second time rather than using the values already cached by the previous batch's RDDs. In general, increasing this concurrency is useful in only a very few cases. If batch processing time > batch interval, then you need to use more resources, and parallelize the ingestion and processing enough to utilize those resources efficiently. The spikes that you see despite low average hardware utilization probably indicate that the parallelization of the Spark Streaming jobs is insufficient. There are a bunch of optimizations that can be done: http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-processing-time-of-each-batch If you have already done this, can you tell me more about what sort of utilization and spikes you see, and what sort of parallelization you have already done? TD On Thu, Feb 12, 2015 at 12:09 PM, Matus Faro matus.f...@kik.com wrote: I've been experimenting with my configuration for a couple of days and gained quite a bit of power through small optimizations, but it may very well be something crazy I'm doing that is causing this problem. To give a little bit of background, I am in the early stages of a project that consumes a stream of data on the order of 100,000 per second and requires processing over a sliding window of one day (ideally a week). Spark Streaming is a good candidate, but I want to make sure I squash any performance issues ahead of time before I commit. With a 5-second batch size, after 40 minutes the processing time is also 5 seconds. I see the CPU spike for two seconds out of every five. 
I assume the sliding window operation is very expensive in this case and that's the root cause of this effect. I should've done a little bit more research before I posted; I just came across a post about an undocumented property, spark.streaming.concurrentJobs, that I am about to try. I'm still confused about how exactly this works with a sliding window, where the result of one batch depends on the other. I assume concurrency can only be achieved up until the window action is executed. Either way, I am going to give this a try and post back here if it doesn't work. Thanks! On Thu, Feb 12, 2015 at 2:55 PM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: It could depend on the nature of your application, but Spark Streaming uses Spark internally, so the concurrency should be there. What is your use case? Are you sure that your configuration is good? On Fri, Feb 13, 2015 at 1:17 AM, Matus Faro matus.f...@kik.com wrote: Hi, Please correct me if I'm wrong: in Spark Streaming, the next batch will not start processing until the previous batch has completed. Is there any way to start processing the next batch if the previous batch is taking longer to process than the batch interval? The problem I am facing is that I don't see a hardware bottleneck in my Spark cluster, but Spark is not able to handle the amount of data I am pumping through (batch processing time is longer than the batch interval). What I'm seeing is spikes of CPU, network and disk IO usage, which I assume are due to different stages of a job, but on average the hardware is underutilized. Concurrency in batch processing would allow the average batch processing time to be greater than the batch interval while fully utilizing the hardware. Any ideas on what can be done? One option I can think of is to split the application into multiple applications running concurrently and divide the initial stream of data between those applications. However, I would then lose the benefits of having a single application. 
Thank you, Matus -- Arush Kharbanda || Technical Teamlead || ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
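Some simple arithmetic shows why TD says that more resources are needed when batch processing time exceeds the batch interval. This sketch assumes one batch job runs at a time (the default, with spark.streaming.concurrentJobs left at 1) and computes each batch's scheduling delay from hypothetical processing times. Batch i is generated at i * interval and cannot start before batch i - 1 finishes.

```java
/** Hypothetical model of scheduling delay when batches run one at a time. Units: seconds. */
final class SchedulingDelay {
    /** Returns how long each batch waits before it can start processing. */
    static long[] delays(long batchInterval, long[] processingTimes) {
        long[] delay = new long[processingTimes.length];
        long previousEnd = 0;
        for (int i = 0; i < processingTimes.length; i++) {
            long arrival = i * batchInterval;            // batch i is generated at this time
            long start = Math.max(arrival, previousEnd); // it must wait for the previous batch
            delay[i] = start - arrival;
            previousEnd = start + processingTimes[i];
        }
        return delay;
    }
}
```

With a 5-second interval, processing times of 6, 6, 6 seconds give delays of 0, 1, 2 seconds, which keep growing without bound. Processing times of 7, 3, 3 give delays of 0, 2, 0: an occasional spike is absorbed as long as later batches finish early.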
Re: How to broadcast a variable read from a file in yarn-cluster mode?
Can you give me the whole logs? TD On Tue, Feb 10, 2015 at 10:48 AM, Jon Gregg jonrgr...@gmail.com wrote: OK, that worked and I'm getting close here... the job ran successfully for a bit and I got output for the first couple of buckets before getting a java.lang.Exception: Could not compute split, block input-0-1423593163000 not found error. So I bumped up the memory at the command line from 2 GB to 5 GB and ran it again... this time I got around 8 successful outputs before erroring. Bumped up the memory from 5 GB to 10 GB... got around 15 successful outputs before erroring. I'm not persisting or caching anything except for the broadcast IP table and another small broadcast user-agents list used for the same type of filtering, and both files are tiny. The Hadoop cluster is nearly empty right now and has more than enough available memory to handle this job. I am connecting to Kafka as well, so there's a lot of data coming through as my index tries to catch up to the current date, but yarn-client mode has several times in the past few weeks been able to catch up to the current date and run successfully for days without issue. My guess is that memory isn't being cleared after each bucket? Relevant portion of the log below.
15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593134400 on phd40010023.na.com:1 in memory (size: 50.1 MB, free: 10.2 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593135400 on phd40010023.na.com:1 in memory (size: 24.9 MB, free: 10.2 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593136400 on phd40010023.na.com:1 in memory (size: 129.0 MB, free: 10.3 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593137000 on phd40010023.na.com:1 in memory (size: 112.4 MB, free: 10.4 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593137200 on phd40010023.na.com:1 in memory (size: 481.0 B, free: 10.4 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593137800 on phd40010023.na.com:1 in memory (size: 44.6 MB, free: 10.5 GB) 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 754 from persistence list 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593138400 on phd40010023.na.com:1 in memory (size: 95.8 MB, free: 10.6 GB) 15/02/10 13:34:54 INFO BlockManager: Removing RDD 754 15/02/10 13:34:54 INFO MapPartitionsRDD: Removing RDD 753 from persistence list 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593138600 on phd40010023.na.com:1 in memory (size: 123.2 MB, free: 10.7 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593138800 on phd40010023.na.com:1 in memory (size: 5.2 KB, free: 10.7 GB) 15/02/10 13:34:54 INFO BlockManager: Removing RDD 753 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 750 from persistence list 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593139600 on phd40010023.na.com:1 in memory (size: 106.4 MB, free: 10.8 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593139800 on phd40010023.na.com:1 in memory (size: 107.0 MB, free: 10.9 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-142359314 on phd40010023.na.com:1 in memory (size: 59.5 MB, free: 10.9 GB) 15/02/10 13:34:54 INFO 
BlockManager: Removing RDD 750 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 749 from persistence list 15/02/10 13:34:54 INFO DAGScheduler: Missing parents: List(Stage 117, Stage 114, Stage 115, Stage 116) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593140200 on phd40010023.na.com:1 in memory (size: 845.0 B, free: 10.9 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593140600 on phd40010023.na.com:1 in memory (size: 19.2 MB, free: 11.0 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593140800 on phd40010023.na.com:1 in memory (size: 492.0 B, free: 11.0 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593141000 on phd40010023.na.com:1 in memory (size: 1018.0 B, free: 11.0 GB) 15/02/10 13:34:54 INFO BlockManager: Removing RDD 749 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593142400 on phd40010023.na.com:1 in memory (size: 48.6 MB, free: 11.0 GB) 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 767 from persistence list 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593142600 on phd40010023.na.com:1 in memory (size: 4.9 KB, free: 11.0 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593142800 on phd40010023.na.com:1 in memory (size: 780.0 B, free: 11.0 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593143800 on phd40010023.na.com:1 in memory (size: 847.0 B, free: 11.0 GB) 15/02/10 13:34:54 INFO BlockManager: Removing RDD 767 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593144400 on phd40010023.na.com:1 in memory (size: 43.7 MB,
Re: Streaming scheduling delay
) on executor nodedn1-22-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 54] 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.55 in stage 16291.0 (TID 1042745) on executor nodedn1-18-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 55] 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.56 in stage 16291.0 (TID 1042754) on executor nodedn1-17-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 56] 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.57 in stage 16291.0 (TID 1042758) on executor nodedn1-17-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 57] 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.58 in stage 16291.0 (TID 1042762) on executor nodedn1-12-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 58] 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.59 in stage 16291.0 (TID 1042766) on executor nodedn1-23-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 59] 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.60 in stage 16291.0 (TID 1042774) on executor nodedn1-20-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 60] 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.61 in stage 16291.0 (TID 1042779) on executor nodedn1-13-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 61] 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.62 in stage 16291.0 (TID 1042789) on executor nodedn1-20-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 62] 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.63 in stage 16291.0 (TID 1042793) on executor 
nodedn1-15-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 63] org.apache.spark.SparkException: Job aborted due to stage failure: Task 54 in stage 16291.0 failed 64 times, most recent failure: Lost task 54.63 in stage 16291.0 (TID 1042793, nodedn1-15-acme.com): java.lang.Exception: Could not compute split, block input-4-1423758372200 not found Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 54 in stage 16291.0 failed 64 times, most recent failure: Lost task 54.63 in stage 16291.0 (TID 1042793, nodedn1-15-acme.com): java.lang.Exception: Could not compute split, block input-4-1423758372200 not found Thanks for looking into it. On Thu, Feb 12, 2015 at 8:10 PM, Tathagata Das t...@databricks.com wrote: Hey Tim, Let me get the key points. 1. If you are not writing back to Kafka, the delay is stable? That is, if you do dstream.count instead of foreachRDD { // write to kafka }, the delay is stable, right? 2. If so, then Kafka is the bottleneck. Is the number of partitions that you spoke of in the second mail what determines the parallelism in writes? Is it stable with 30 partitions? Regarding the block exception, could you give me a trace of INFO-level logging that leads to this error? Basically I want to trace the lifecycle of the block. TD On Thu, Feb 12, 2015 at 6:29 PM, Tim Smith secs...@gmail.com wrote: Hi Gerard, Great write-up and really good guidance in there. I have to be honest, I don't know why, but setting the # of partitions for each DStream to a low number (5-10) just causes the app to choke/crash. Setting it to 20 gets the app going but with not-so-great delays. Bump it up to 30 and I start winning the war, where processing time is consistently below the batch time window (20 seconds) except for one batch every few batches where the compute time spikes to 10x the usual. 
Following your guide, I took out some logInfo statements I had in the app, but it didn't seem to make much difference :( With a higher time window (20 seconds), I got the app to run stably for a few hours but then ran into the dreaded java.lang.Exception: Could not compute split, block input-0-1423761240800 not found. I wonder if I need to add RDD persistence back? Also, I am reaching out to Virdata with some ProServ inquiries. Thanks On Thu, Feb 12, 2015 at 4:30 AM, Gerard Maas gerard.m...@gmail.com wrote: Hi Tim, From this: "There are 5 kafka receivers and each incoming stream is split into 40 partitions" I suspect that you're creating too many tasks for Spark to process on time. Could you try some of the 'knobs' I describe here to see if that would help? http://www.virdata.com/tuning-spark/ -kr, Gerard. On Thu, Feb 12, 2015 at 8:44 AM, Tim Smith secs...@gmail.com wrote: Just read the thread "Are these numbers abnormal for spark streaming?" and I think I am seeing similar results; that is, increasing the window seems to be the trick
Re: Spark streaming job throwing ClassNotFound exception when recovering from checkpointing
Could you come up with a minimal example through which I can reproduce the problem? On Tue, Feb 10, 2015 at 12:30 PM, conor fennell.co...@gmail.com wrote: I am getting the following error when I kill the Spark driver and restart the job: 15/02/10 17:31:05 INFO CheckpointReader: Attempting to load checkpoint from file hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-142358910.bk 15/02/10 17:31:05 WARN CheckpointReader: Error reading checkpoint from file hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-142358910.bk java.io.IOException: java.lang.ClassNotFoundException: com.example.spark.streaming.reporting.live.jobs.Bucket at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:988) at org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:176) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) Spark version is 1.2.0. The streaming job is executing every 10 seconds with the following steps:
1. Consuming JSON from a Kafka topic called journeys and converting it to case classes
2. Filtering the resulting journeys stream based on a time attribute being set
3. FlatMapping journeys into separate time buckets of 15, 60 and 1440 minutes, e.g. ("HOUR1234569000", ActiveState("HOUR", 1234569000, hyperLogLog(journey id), 360))
4. ReduceByKey adding HyperLogLogs
5. UpdateStateByKey to add to the previous state's HyperLogLog
6. Outputting the results to Cassandra
I have pasted in a sample app below to mimic the problem and put all classes into one file; it is also attached here: SampleJob.scala http://apache-spark-user-list.1001560.n3.nabble.com/file/n21582/SampleJob.scala To get around the issue for the moment, I have removed the Bucket class and stopped passing in a bucket array to the ActiveJourney class.
Instead, I hard-code all the time buckets I need in the ActiveJourney class; this approach works and recovers from checkpointing but is not extensible. Can the Spark gurus explain why I get that ClassNotFound exception? If you need any more information, please let me know. Much thanks, Conor
package com.example.spark.streaming.reporting.live.jobs

import java.util.Date
import scala.Array.canBuildFrom
import scala.collection.mutable.MutableList
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
import org.json4s.jvalue2extractable
import org.json4s.string2JsonInput
import com.example.spark.streaming.utils.MilliSecondUtils
import com.example.spark.streaming.utils.constants.ColumnFamilies
import com.example.spark.streaming.utils.constants.Constants
import com.example.spark.streaming.utils.constants.Milliseconds
import com.example.spark.streaming.utils.constants.SparkConfig
import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.streaming.toDStreamFunctions
import com.datastax.spark.connector.toNamedColumnRef
import com.twitter.algebird.HLL
import com.twitter.algebird.HyperLogLogMonoid

// Json parsing classes
case class Journey(o: Option[JourneyCommand], o2: Option[JourneyDetails])
case class JourneyDetails(_id: String)
case class JourneyCommand($set: Option[JourneySet])
case class JourneySet(awayAt: Date)

// Class not found bucket
case class Bucket(val bucketType: String, val roundDown: (Long) => Long, val columnFamily: String, val size: Long, val maxIntervals: Int)

// used for updateStateByKey
case class ActiveState(var bucketType: String, var time: Long, var hyperLogLog: HLL, var ttl: Int)

object SampleJob {
  private final
  val Name = this.getClass().getSimpleName()

  def main(args: Array[String]) {
    if (args.length < 8) {
      System.err.println(s"Usage: $Name environment zkQuorum group topics numThreads hdfsUri cassandra intervalSeconds")
      System.exit(1)
    }
    System.out.print(args)
    val Array(environment, zkQuorum, group, topics, numThreads, hdfsUri, cassandra, intervalSeconds) = args
    val checkpointDirectory = hdfsUri + "/reporting/" + Name + getClass().getPackage().getImplementationVersion()

    def functionToCreateContext(): StreamingContext = {
      // how many buckets
      val fifteen = Bucket("QUARTER_HOUR", MilliSecondUtils.roundDownToNearestQuarterHour, ColumnFamilies.Visits_15, Milliseconds.FifteenMinutes, 90)
      val hour = Bucket("HOUR", MilliSecondUtils.roundDownToNearestHour, ColumnFamilies.Visits_60, Milliseconds.Hour, 360)
      val day
Re: streaming joining multiple streams
Sorry for the late response. With the amount of data you are planning to join, any system would take time. However, between Hive's MapReduce joins, Spark's basic shuffle, and Spark SQL's join, the latter wins hands down. Furthermore, with the APIs of Spark and Spark Streaming, you will have to do strictly less work to set up the infrastructure that you want to build. Yes, Spark Streaming currently does not support providing your own timer, because the logic to handle delays etc. is pretty complex and specific to each application. Usually that logic can be implemented on top of the windowing solution that Spark Streaming already provides. TD On Thu, Feb 5, 2015 at 7:37 AM, Zilvinas Saltys zilvinas.sal...@gmail.com wrote: The challenge I have is this. There are two streams of data, where an event might look like this in stream1: (time, hashkey, foo1) and in stream2: (time, hashkey, foo2). The result after joining should be (time, hashkey, foo1, foo2). The join happens on hashkey and the time difference between events can be ~30 mins. The amount of data is enormous: hundreds of billions of events per month. I need not only to join the existing history data but also to continue doing so with incoming data (it comes in batches, not really streamed). For now I was thinking of implementing this in MapReduce with sliding windows. I'm wondering if Spark can actually help me with this sort of challenge? How would a join of two huge streams of historic data actually happen internally within Spark, and would it be more efficient than, let's say, a Hive MapReduce stream join of two big tables? I also saw that Spark Streaming has windowing support, but it seems you cannot provide your own timer? As in, I cannot make the time be derived from the events themselves rather than from an actual running clock. Thanks,
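Zilvinas's join can be expressed as an equi-join on the key followed by a filter on the time gap. The join condition is the same hashkey, with timestamps up to ~30 minutes apart.

In Spark Streaming, one way to approximate this is to window both keyed DStreams over at least the maximum gap, `join` them, and then filter. The plain-Java sketch below shows only the per-window logic, and all names in it are hypothetical. With sliding windows the same pair can appear in several windows, so a real job would also need to de-duplicate.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** One event: (time, hashkey, payload), as in the question. */
final class Event {
    final long timeMillis;
    final String hashKey;
    final String payload;

    Event(long timeMillis, String hashKey, String payload) {
        this.timeMillis = timeMillis;
        this.hashKey = hashKey;
        this.payload = payload;
    }
}

final class TimeBoundedJoin {
    /**
     * Joins left and right on hashKey and keeps only pairs whose timestamps are
     * at most maxGapMillis apart. Rows are "time,hashkey,foo1,foo2", using the
     * left event's time.
     */
    static List<String> join(List<Event> left, List<Event> right, long maxGapMillis) {
        // Build side: index the right stream by key (a hash join).
        Map<String, List<Event>> rightByKey = new HashMap<>();
        for (Event r : right) {
            rightByKey.computeIfAbsent(r.hashKey, k -> new ArrayList<>()).add(r);
        }
        // Probe side: look up each left event and apply the time-gap filter.
        List<String> rows = new ArrayList<>();
        for (Event l : left) {
            for (Event r : rightByKey.getOrDefault(l.hashKey, Collections.emptyList())) {
                if (Math.abs(l.timeMillis - r.timeMillis) <= maxGapMillis) {
                    rows.add(l.timeMillis + "," + l.hashKey + "," + l.payload + "," + r.payload);
                }
            }
        }
        return rows;
    }
}
```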
Re: Streaming scheduling delay
Hey Tim, Let me get the key points. 1. If you are not writing back to Kafka, the delay is stable? That is, instead of foreachRDD { // write to kafka } if you do dstream.count, then the delay is stable. Right? 2. If so, then Kafka is the bottleneck. Is it the number of partitions that you spoke of in the second mail that determines the parallelism in writes? Is it stable with 30 partitions? Regarding the block exception, could you give me a trace of INFO-level logging that leads to this error? Basically I want to trace the lifecycle of the block. On Thu, Feb 12, 2015 at 6:29 PM, Tim Smith secs...@gmail.com wrote: Hi Gerard, Great write-up and really good guidance in there. I have to be honest, I don't know why, but setting the # of partitions for each dStream to a low number (5-10) just causes the app to choke/crash. Setting it to 20 gets the app going, but with not-so-great delays. Bump it up to 30 and I start winning the war: processing time is consistently below the batch time window (20 seconds), except for one batch every few batches where the compute time spikes to 10x the usual. Following your guide, I took out some logInfo statements I had in the app, but it didn't seem to make much difference :( With a higher time window (20 seconds), I got the app to run stably for a few hours but then ran into the dreaded java.lang.Exception: Could not compute split, block input-0-1423761240800 not found. Wonder if I need to add RDD persistence back? Also, I am reaching out to Virdata with some ProServ inquiries. Thanks On Thu, Feb 12, 2015 at 4:30 AM, Gerard Maas gerard.m...@gmail.com wrote: Hi Tim, From this: "There are 5 kafka receivers and each incoming stream is split into 40 partitions" I suspect that you're creating too many tasks for Spark to process on time. Could you try some of the 'knobs' I describe here to see if that would help? http://www.virdata.com/tuning-spark/ -kr, Gerard.
On Thu, Feb 12, 2015 at 8:44 AM, Tim Smith secs...@gmail.com wrote: Just read the thread "Are these numbers abnormal for spark streaming?" and I think I am seeing similar results - that is - increasing the window seems to be the trick here. I will have to monitor for a few hours/days before I can conclude (there are so many knobs/dials). On Wed, Feb 11, 2015 at 11:16 PM, Tim Smith secs...@gmail.com wrote: On Spark 1.2 (I have been seeing this behaviour since 1.0), I have a streaming app that consumes data from Kafka and writes it back to Kafka (different topic). My big problem has been Total Delay. While execution time is usually below the window size (in seconds), the total delay ranges from minutes to hours (and keeps going up). For a little while, I thought I had solved the issue by bumping up the driver memory. Then I expanded my Kafka cluster to add more nodes and the issue came up again. I tried a few things to smoke out the issue, and something tells me the driver is the bottleneck again: 1) From my app, I took out the entire write-out-to-Kafka piece. Sure enough, execution time, scheduling delay and hence total delay fell to sub-second. This assured me that whatever processing I do before writing back to Kafka isn't the bottleneck. 2) In my app, I had RDD persistence set at different points, but my code wasn't really re-using any RDDs, so I took out all explicit persist() statements and set spar...unpersist to true in the context. After this, it doesn't seem to matter how much memory I give my executors; the total delay stays in the same range. I tried per-executor memory from 2G to 12G with no change in total delay, so the executors aren't memory-starved. Also, in the SparkUI, under the Executors tab, all executors show 0/1060MB used when per-executor memory is set to 2GB, for example. 3) The input rate limit in the Kafka consumer prevents spikes in incoming data. 4) Tried FIFO and FAIR, but it didn't make any difference.
5) Adding executors beyond a certain point seems useless (I guess the excess ones just sit idle). At any given point in time, the SparkUI shows only one batch pending processing. So with just one batch pending processing, why would the scheduling delay run into minutes/hours if execution time is within the batch window duration? There aren't any failed stages or jobs. Right now, I have 100 executors (I have tried setting executors from 50-150), each with 2GB and 4 cores, and the driver running with 16GB. There are 5 Kafka receivers and each incoming stream is split into 40 partitions. Per receiver, input rate is restricted to 2 messages per second. Can anyone help me with clues or areas to look into for troubleshooting the issue? One nugget I found buried in the code says: "The scheduler delay includes the network delay to send the task to the worker machine and to send back the result (but not the time to fetch the task result, if it needed to be fetched from the block manager on the worker)."
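Tim's question is why the scheduling delay keeps growing when execution time is usually within the batch window. A toy model helps explore it.

The sketch below is a hypothetical simplification: it has no receivers, no GC and no cluster effects, and it assumes batches are processed strictly one at a time. Under that model, a single 10x spike leaves a backlog that drains by only (batch interval minus processing time) per batch. If processing consistently exceeds the interval, the delay grows without bound.

```java
/**
 * Simplified model of scheduling delay: batch i is created at
 * i * batchIntervalSec, batches run strictly one after another, and a batch's
 * scheduling delay is how long it waits between creation and start.
 */
final class SchedulingDelaySim {
    static long[] schedulingDelays(long batchIntervalSec, long[] processingSec) {
        long[] delays = new long[processingSec.length];
        long previousFinish = 0;
        for (int i = 0; i < processingSec.length; i++) {
            long created = i * batchIntervalSec;
            long start = Math.max(created, previousFinish); // wait for the previous batch
            delays[i] = start - created;
            previousFinish = start + processingSec[i];
        }
        return delays;
    }
}
```

Take a 20-second interval where one 200-second batch sits among 10-second ones. The next batch waits 180 seconds, and each following batch recovers only 10 seconds of that. A spike every few batches can therefore keep the delay climbing even though most batches finish within the window.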
Re: StreamingContext getOrCreate with queueStream
I don't think your screenshots came through in the email. Nonetheless, queueStream will not work with getOrCreate. It's mainly for testing (by generating your own RDDs) and not really useful for production usage (where you really need checkpoint-based recovery). TD On Thu, Feb 5, 2015 at 4:12 PM, pnpritchard nicholas.pritch...@falkonry.com wrote: I am trying to use the StreamingContext getOrCreate method in my app. I started by running the example (RecoverableNetworkWordCount https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala), which worked as expected. However, when I modified that example to use queueStream rather than socketTextStream for its input, things broke down. I first ran it with an empty checkpoint directory, then restarted the app and got an NPE (copied below). Is this a known limitation of using queueStream? Am I assuming something by using it? Thanks in advance for any advice! FYI, I changed line 73 in the example to be: -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/StreamingContext-getOrCreate-with-queueStream-tp21528.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: how to send JavaDStream RDD using foreachRDD using Java
Hello Sachin, While Akhil's solution is correct, it is not sufficient for your use case. RDD.foreach (which Akhil is using) will run on the workers, but you are creating the Producer object on the driver. This will not work: a producer created on the driver cannot be used from the worker/executor. The best way to do what you want is to use rdd.foreachPartition. Inside the function supplied to RDD.foreachPartition, create the producer, send the whole partition, and close the producer. I am on my phone, so I am not able to write Java code. TD On Mon, Feb 2, 2015 at 11:38 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Here you go:

    JavaDStream<String> textStream = ssc.textFileStream("/home/akhld/sigmoid/");
    textStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
      @Override
      public Void call(JavaRDD<String> rdd) throws Exception {
        // TODO Auto-generated method stub
        rdd.foreach(new VoidFunction<String>() {
          @Override
          public void call(String stringData) throws Exception {
            // Use this data!
            System.out.println("W00t!! Data : " + stringData);
          }
        });
        return null;
      }
    });

Thanks Best Regards On Sun, Feb 1, 2015 at 9:08 PM, sachin Singh sachin.sha...@gmail.com wrote: Hi, I want to send streaming data to a Kafka topic. I have RDD data which I converted into a JavaDStream, and now I want to send it to a Kafka topic. I don't need the Kafka sending code, just the foreachRDD implementation. My code looks like this:

    public void publishtoKafka(ITblStream t) {
      MyTopicProducer MTP = ProducerFactory.createProducer(hostname + ":" + port);
      JavaDStream<?> rdd = (JavaDStream<?>) t.getRDD();
      rdd.foreachRDD(new Function<String, String>() {
        @Override
        public Void call(JavaRDD<String> rdd) throws Exception {
          KafkaUtils.sendDataAsString(MTP, topicName, String RDDData);
          return null;
        }
      });
      log.debug("sent to kafka: --");
    }

Here MyTopicProducer creates the producer, which is working fine, and KafkaUtils.sendDataAsString is the method that publishes data to the Kafka topic, which is also working fine. I have only one problem: I am not able to convert the JavaDStream rdd to a String using foreach or foreachRDD. In the end I need String messages from the RDDs. Kindly suggest Java code only; I don't want to use anonymous classes. Please send me only the part that sends a JavaDStream RDD using foreachRDD with a Function call. Thanks in advance, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-send-JavaDStream-RDD-using-foreachRDD-using-Java-tp21456.html
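TD's suggestion is to create the producer inside the per-partition function, send the whole partition, and then close it. It can be sketched in Java with named classes, since Sachin asked to avoid anonymous classes.

To keep the sketch runnable without Spark or Kafka, `PartitionPublisher` implements `java.util.function.Consumer<Iterator<String>>`. In a Spark job, the same class would instead implement `org.apache.spark.api.java.function.VoidFunction<Iterator<String>>` and be passed to `rdd.foreachPartition(...)` from inside `foreachRDD`. `MessageSink`, `SinkFactory` and `InMemorySink` are hypothetical stand-ins for `MyTopicProducer` and `ProducerFactory`.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a Kafka producer wrapper such as MyTopicProducer. */
interface MessageSink extends AutoCloseable {
    void send(String topic, String message);

    @Override
    void close(); // narrowed: no checked exception
}

/** Creates a sink on whichever machine runs the partition; must be serializable. */
interface SinkFactory extends Serializable {
    MessageSink create();
}

/**
 * Named (non-anonymous) per-partition publisher: opens one sink, sends every
 * record of the partition, then closes the sink. Only the factory, never a
 * live producer, travels with this object.
 */
final class PartitionPublisher implements Consumer<Iterator<String>>, Serializable {
    private static final long serialVersionUID = 1L;
    private final String topic;
    private final SinkFactory sinkFactory;

    PartitionPublisher(String topic, SinkFactory sinkFactory) {
        this.topic = topic;
        this.sinkFactory = sinkFactory;
    }

    @Override
    public void accept(Iterator<String> partition) {
        try (MessageSink sink = sinkFactory.create()) {
            while (partition.hasNext()) {
                sink.send(topic, partition.next());
            }
        }
    }
}

/** Toy in-memory sink for trying the pattern without Kafka. */
final class InMemorySink implements MessageSink {
    static final List<String> SENT = new ArrayList<>();
    static int opened = 0;
    static int closed = 0;

    InMemorySink() {
        opened++;
    }

    @Override
    public void send(String topic, String message) {
        SENT.add(topic + ":" + message);
    }

    @Override
    public void close() {
        closed++;
    }
}
```

Because the factory is captured instead of a producer, nothing holding a live network connection has to be serialized from the driver. Each partition gets its own sink and closes it when done. Opening a producer per partition per batch has a cost, so pooling producers per executor is a common refinement; it is not shown here.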
Re: Error when Spark streaming consumes from Kafka
This is an issue that is hard to resolve without rearchitecting the whole Kafka receiver. There are some workarounds worth looking into: http://mail-archives.apache.org/mod_mbox/kafka-users/201312.mbox/%3CCAFbh0Q38qQ0aAg_cj=jzk-kbi8xwf+1m6xlj+fzf6eetj9z...@mail.gmail.com%3E On Mon, Feb 2, 2015 at 1:07 PM, Greg Temchenko s...@dicefield.com wrote: Hi, This does not seem to be fixed yet. I filed an issue in JIRA: https://issues.apache.org/jira/browse/SPARK-5505 Greg -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-Spark-streaming-consumes-from-Kafka-tp19570p21471.html
Re: Build error
That is a known issue uncovered last week. It fails on certain environments, not on Jenkins which is our testing environment. There is already a PR up to fix it. For now you can build using mvn package -DskipTests TD On Fri, Jan 30, 2015 at 8:59 PM, Andrew Musselman andrew.mussel...@gmail.com wrote: Off master, got this error; is that typical? --- T E S T S --- Running org.apache.spark.streaming.mqtt.JavaMQTTStreamSuite Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 2.495 sec - in org.apache.spark.streaming.mqtt.JavaMQTTStreamSuite Results : Tests run: 1, Failures: 0, Errors: 0, Skipped: 0 [INFO] [INFO] --- scalatest-maven-plugin:1.0:test (test) @ spark-streaming-mqtt_2.10 --- Discovery starting. Discovery completed in 498 milliseconds. Run starting. Expected test count is: 1 MQTTStreamSuite: - mqtt input stream *** FAILED *** org.eclipse.paho.client.mqttv3.MqttException: Too many publishes in progress at org.eclipse.paho.client.mqttv3.internal.ClientState.send(ClientState.java:432) at org.eclipse.paho.client.mqttv3.internal.ClientComms.internalSend(ClientComms.java:121) at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:139) at org.eclipse.paho.client.mqttv3.MqttTopic.publish(MqttTopic.java:107) at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$publishData$1.apply(MQTTStreamSuite.scala:125) at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$publishData$1.apply(MQTTStreamSuite.scala:124) at scala.collection.immutable.Range.foreach(Range.scala:141) at org.apache.spark.streaming.mqtt.MQTTStreamSuite.publishData(MQTTStreamSuite.scala:124) at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$3.apply$mcV$sp(MQTTStreamSuite.scala:78) at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$3.apply(MQTTStreamSuite.scala:66) ... 
Exception in thread Thread-20 org.apache.spark.SparkException: Job cancelled because SparkContext was shut down at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:690) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:689) at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:689) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1384) at org.apache.spark.util.EventLoop.stop(EventLoop.scala:81) at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1319) at org.apache.spark.SparkContext.stop(SparkContext.scala:1250) at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:510) at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:485) at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$2.apply$mcV$sp(MQTTStreamSuite.scala:59) at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$2.apply(MQTTStreamSuite.scala:57) at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$2.apply(MQTTStreamSuite.scala:57) at org.scalatest.BeforeAndAfter$class.runTest(BeforeAndAfter.scala:210) at org.apache.spark.streaming.mqtt.MQTTStreamSuite.runTest(MQTTStreamSuite.scala:38) at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208) at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401) at scala.collection.immutable.List.foreach(List.scala:318) at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) at org.scalatest.SuperEngine.org $scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396) at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483) at 
org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208) at org.scalatest.FunSuite.runTests(FunSuite.scala:1555) at org.scalatest.Suite$class.run(Suite.scala:1424) at org.scalatest.FunSuite.org $scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555) at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212) at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212) at org.scalatest.SuperEngine.runImpl(Engine.scala:545) at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212) at org.apache.spark.streaming.mqtt.MQTTStreamSuite.org $scalatest$BeforeAndAfter$$super$run(MQTTStreamSuite.scala:38) at org.scalatest.BeforeAndAfter$class.run(BeforeAndAfter.scala:241) at org.apache.spark.streaming.mqtt.MQTTStreamSuite.run(MQTTStreamSuite.scala:38) at
Re: reduceByKeyAndWindow, but using log timestamps instead of clock seconds
Ohhh nice! It would be great if you could share some code with us soon. It is indeed a very complicated problem and there is probably no single solution that fits all use cases, so having one way of doing things would be a great reference. Looking forward to that! On Wed, Jan 28, 2015 at 4:52 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Thu, Jan 29, 2015 at 1:54 AM, YaoPau jonrgr...@gmail.com wrote: My thinking is to maintain state in an RDD and update and persist it with each 2-second pass, but this also seems like it could get messy. Any thoughts or examples that might help me? I have just implemented some timestamp-based windowing on DStreams (I can't share the code now, but it will be published in a couple of months), although with the assumption that items arrive in the correct order. The main challenge (rather technical) was to keep proper state across RDD boundaries and to tell the state "you can mark this partial window from the last interval as 'complete' now" without shuffling too much data around. For example, if there are some empty intervals, you don't know when the next item to go into the partial window will arrive, or if there will be one at all. I guess if you want out-of-order tolerance, that will become even trickier, as you need to define and think about some timeout for partial windows in your state... Tobias
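Tobias keeps partial windows as state across RDD boundaries and marks them complete later. That approach can be illustrated with a hypothetical single-machine sketch; it is not his unpublished code.

In the sketch, each record goes to the tumbling window that contains its log timestamp, and open windows are kept between micro-batches. A window is emitted only once the largest timestamp seen so far has passed its end plus an allowed lateness. Records that belong to windows already closed by an earlier batch are counted as dropped. In Spark Streaming, the open-window map would be per-key state carried across batches, for example with `updateStateByKey`.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

final class EventTimeWindows {
    private final long windowMillis;
    private final long allowedLatenessMillis;
    private final TreeMap<Long, Long> openCounts = new TreeMap<>(); // window start -> count
    private long maxSeen = Long.MIN_VALUE;   // largest timestamp accepted so far
    private long watermark = Long.MIN_VALUE; // maxSeen as of the end of the previous batch
    private long droppedLate = 0;

    EventTimeWindows(long windowMillis, long allowedLatenessMillis) {
        this.windowMillis = windowMillis;
        this.allowedLatenessMillis = allowedLatenessMillis;
    }

    private boolean isClosed(long windowStart, long upTo) {
        return upTo != Long.MIN_VALUE && windowStart + windowMillis + allowedLatenessMillis <= upTo;
    }

    /** Processes one micro-batch of log timestamps; returns windows completed by it (start -> count). */
    SortedMap<Long, Long> onBatch(List<Long> timestamps) {
        for (long ts : timestamps) {
            long start = ts - Math.floorMod(ts, windowMillis);
            if (isClosed(start, watermark)) {
                droppedLate++; // window was already closed by an earlier batch
                continue;
            }
            openCounts.merge(start, 1L, Long::sum);
            maxSeen = Math.max(maxSeen, ts);
        }
        SortedMap<Long, Long> completed = new TreeMap<>();
        Iterator<Map.Entry<Long, Long>> it = openCounts.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> e = it.next();
            if (!isClosed(e.getKey(), maxSeen)) {
                break; // windows are ordered by start, so later ones are still open too
            }
            completed.put(e.getKey(), e.getValue());
            it.remove();
        }
        watermark = maxSeen;
        return completed;
    }

    long droppedLate() {
        return droppedLate;
    }
}
```

An empty batch leaves the largest timestamp unchanged, so nothing is emitted. That is the "empty intervals" problem Tobias describes, and a real job would also need a processing-time timeout for partial windows.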
Re: Error reporting/collecting for users
You could use foreachRDD to do the operations and then, inside the foreachRDD, create an accumulator to gather all the errors together:

    dstream.foreachRDD { rdd =>
      val accumulator = new Accumulator[...]
      rdd.map { ... }.count()  // whatever operation is error-prone
      // gather all errors from the accumulator and show them on a webpage, push them to a database?
    }

On Tue, Jan 27, 2015 at 9:34 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Wed, Jan 28, 2015 at 1:45 PM, Soumitra Kumar kumar.soumi...@gmail.com wrote: It is a Streaming application, so how/when do you plan to access the accumulator on the driver? Well... maybe there would be some user command or web interface showing the errors that have happened during processing...? Thanks Tobias
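TD's accumulator idea can be sketched without Spark: run the error-prone operation over a batch, keep the successes, and collect error descriptions instead of failing. In the plain-Java sketch below (names hypothetical), the `errors` list plays the role of the accumulator.

In a Spark job, tasks would add to an accumulator, and the driver would read its value inside `foreachRDD` after the action (such as `count()`) has finished. Spark only guarantees exactly-once accumulator updates when the updates are made inside actions. Updates made inside transformations such as `map` may be applied more than once if tasks are re-run, so the collected error list can contain duplicates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
 * Result of running an error-prone operation over one batch: successful
 * outputs plus a description of every record that failed.
 */
final class ErrorCollectingBatch<O> {
    final List<O> results = new ArrayList<>();
    final List<String> errors = new ArrayList<>();

    static <I, O> ErrorCollectingBatch<O> run(List<I> records, Function<I, O> op) {
        ErrorCollectingBatch<O> batch = new ErrorCollectingBatch<>();
        for (I record : records) {
            try {
                batch.results.add(op.apply(record));
            } catch (RuntimeException e) {
                // Record the failure instead of failing the whole batch.
                batch.errors.add(record + ": " + e.getMessage());
            }
        }
        return batch;
    }
}
```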
Re: How to make spark partition sticky, i.e. stay with node?
Hello Mingyu, That is a reasonable way of doing this. Spark Streaming does not natively support sticky partitions, because Spark launches tasks based on data locality. If there is no locality (for example, reduce tasks can run anywhere), the location is assigned randomly. So the cogroup or join introduces locality, which forces the Spark scheduler to be sticky. Another way to achieve this is to use updateStateByKey, which internally uses cogroup but presents a nicer streaming-like API for per-key stateful operations. TD On Fri, Jan 23, 2015 at 8:23 AM, mingyu mingyut...@gmail.com wrote: I found a workaround. I can make my auxiliary data an RDD, partition it and cache it. Later, I can cogroup it with other RDDs, and Spark will try to keep the cached RDD partitions where they are and not shuffle them. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-spark-partition-sticky-i-e-stay-with-node-tp21322p21338.html
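The stickiness mingyu and TD describe relies on hash partitioning being deterministic. Suppose the cached auxiliary RDD and each batch's keyed data use an equal partitioner (same class and same number of partitions). Then a given key always maps to the same partition index, so cogroup can reuse the cached side's partitions without shuffling them.

The Java sketch below reproduces the rule Spark's `HashPartitioner` applies: a non-negative `hashCode` modulo the partition count, with null keys going to partition 0. It is an illustration, not Spark's class.

```java
/** Key-to-partition rule in the style of Spark's HashPartitioner. */
final class StickyPartitioning {
    static int partitionFor(Object key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (key == null) {
            return 0;
        }
        int rawMod = key.hashCode() % numPartitions;
        // Java's % can be negative for negative hash codes; shift into [0, numPartitions).
        return rawMod < 0 ? rawMod + numPartitions : rawMod;
    }
}
```

The same key lands on the same partition index in every batch. Which executor runs that partition is still up to the scheduler, which, as TD explains, prefers the node that holds the cached block.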
Re: Are these numbers abnormal for spark streaming?
This is not normal. It's a huge scheduling delay!! Can you tell me more about the application? - cluster setup, number of receivers, what's the computation, etc. On Thu, Jan 22, 2015 at 3:11 AM, Ashic Mahtab as...@live.com wrote: Hate to do this...but...erm...bump? Would really appreciate input from others using Streaming. Or at least some docs that would tell me if these are expected or not. -- From: as...@live.com To: user@spark.apache.org Subject: Are these numbers abnormal for spark streaming? Date: Wed, 21 Jan 2015 11:26:31 + Hi Guys, I've got Spark Streaming set up for a low data rate system (using Spark's features for analysis, rather than high throughput). Messages are coming in throughout the day, at around 1-20 per second (finger-in-the-air estimate...not analysed yet). In the Spark Streaming UI for the application, I'm getting the following after 17 hours.

Streaming
- Started at: Tue Jan 20 16:58:43 GMT 2015
- Time since start: 18 hours 24 minutes 34 seconds
- Network receivers: 2
- Batch interval: 2 seconds
- Processed batches: 16482
- Waiting batches: 1

Statistics over last 100 processed batches

Receiver Statistics - Receiver - Status - Location - Records in last batch - [2015/01/21 11:23:18] - Minimum rate - [records/sec] - Median rate - [records/sec] - Maximum rate - [records/sec] - Last Error RmqReceiver-0ACTIVEF 144727-RmqReceiver-1ACTIVEBR 124726-

Batch Processing Statistics
Metric | Last batch | Minimum | 25th percentile | Median | 75th percentile | Maximum
Processing Time | 3 seconds 994 ms | 157 ms | 4 seconds 16 ms | 4 seconds 961 ms | 5 seconds 3 ms | 5 seconds 171 ms
Scheduling Delay | 9 hours 15 minutes 4 seconds | 9 hours 10 minutes 54 seconds | 9 hours 11 minutes 56 seconds | 9 hours 12 minutes 57 seconds | 9 hours 14 minutes 5 seconds | 9 hours 15 minutes 4 seconds
Total Delay | 9 hours 15 minutes 8 seconds | 9 hours 10 minutes 58 seconds | 9 hours 12 minutes | 9 hours 13 minutes 2 seconds | 9 hours 14 minutes 10 seconds | 9 hours 15 minutes 8 seconds

Are these normal?
I was wondering what the scheduling delay and total delay terms are, and if it's normal for them to be 9 hours. I've got a standalone spark master and 4 spark nodes. The streaming app has been given 4 cores, and it's using 1 core per worker node. The streaming app is submitted from a 5th machine, and that machine has nothing but the driver running. The worker nodes are running alongside Cassandra (and reading and writing to it). Any insights would be appreciated. Regards, Ashic.
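The numbers in Ashic's UI are consistent with a simple backlog explanation. The back-of-the-envelope sketch below assumes that one batch is generated every batch interval and that batches are processed one at a time. Under that assumption, the scheduling delay is roughly the number of generated-but-unprocessed batches multiplied by the interval. The helper names are hypothetical.

```java
/**
 * Back-of-the-envelope backlog estimate from streaming UI counters, assuming
 * one batch is generated per interval and batches are processed one at a time.
 */
final class BacklogEstimate {
    static long toSeconds(long hours, long minutes, long seconds) {
        return hours * 3600 + minutes * 60 + seconds;
    }

    /** (batches generated so far - batches processed) * interval, in seconds. */
    static long estimatedDelaySec(long elapsedSec, long batchIntervalSec, long processedBatches) {
        long generated = elapsedSec / batchIntervalSec;
        return (generated - processedBatches) * batchIntervalSec;
    }

    /** Average wall-clock seconds spent per processed batch. */
    static double secondsPerProcessedBatch(long elapsedSec, long processedBatches) {
        return (double) elapsedSec / processedBatches;
    }
}
```

Plugging in the UI values gives the following:
- Inputs: 18 h 24 min 34 s since start, 2-second batches, 16482 processed batches.
- Generated batches: 33137, of which 16655 are not yet processed.
- Estimated delay: 33310 s, about 9 h 15 min, close to the reported 9 h 15 min 4 s.
- Time per processed batch: roughly 4 s of wall-clock time, in line with the 4 to 5 s processing-time percentiles.

Each batch therefore takes about twice the 2-second interval, so under these assumptions the queue can only grow.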
Re: Big performance difference between client and cluster deployment mode; is this expected?
What are your spark-submit commands in both cases? Is it Spark Standalone or YARN (both support client and cluster mode)? Accordingly, what is the number of executors/cores requested? TD On Wed, Dec 31, 2014 at 10:36 AM, Enno Shioji eshi...@gmail.com wrote: Also, the job was deployed from the master machine in the cluster. On Wed, Dec 31, 2014 at 6:35 PM, Enno Shioji eshi...@gmail.com wrote: Oh sorry, that was an edit mistake. The code is essentially:

    val msgStream = kafkaStream
      .map { case (k, v) => v }
      .map(DatatypeConverter.printBase64Binary)
      .saveAsTextFile("s3n://some.bucket/path", classOf[LzoCodec])

I.e. there is essentially no original code (I was calling saveAsTextFile in a save function, but that was just a remnant from previous debugging). On Wed, Dec 31, 2014 at 6:21 PM, Sean Owen so...@cloudera.com wrote: -dev, +user A decent guess: Does your 'save' function entail collecting data back to the driver? And are you running this from a machine that's not in your Spark cluster? Then in client mode you're shipping data back to a less-nearby machine, compared to cluster mode. That could explain the bottleneck. On Wed, Dec 31, 2014 at 4:12 PM, Enno Shioji eshi...@gmail.com wrote: Hi, I have a very, very simple streaming job. When I deploy it on the exact same cluster, with the exact same parameters, I see a big (40%) performance difference between client and cluster deployment mode. This seems a bit surprising. Is this expected? The streaming job is:

    val msgStream = kafkaStream
      .map { case (k, v) => v }
      .map(DatatypeConverter.printBase64Binary)
      .foreachRDD(save)
      .saveAsTextFile("s3n://some.bucket/path", classOf[LzoCodec])

I tried several times, but the job deployed in client mode can only write at 60% of the throughput of the job deployed in cluster mode, and this happens consistently. I'm logging at INFO level, but my application code doesn't log anything, so it's only Spark logs. The logs I see in client mode don't seem like a crazy amount. The setup is:

    spark-ec2 [...] \
      --copy-aws-credentials \
      --instance-type=m3.2xlarge \
      -s 2 launch test_cluster

And all the deployment was done from the master machine.
Re: Spark Streaming: HiveContext within Custom Actor
I am not sure that can be done. Receivers are designed to be run only on the executors/workers, whereas a SQLContext (for using Spark SQL) can only be defined on the driver. On Mon, Dec 29, 2014 at 6:45 PM, sranga sra...@gmail.com wrote: Hi Could Spark-SQL be used from within a custom actor that acts as a receiver for a streaming application? If yes, what is the recommended way of passing the SparkContext to the actor? Thanks for your help. - Ranga -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-HiveContext-within-Custom-Actor-tp20892.html
Re: word count aggregation
For windows that large (1 hour), you will probably also have to increase the batch interval for efficiency. TD On Mon, Dec 29, 2014 at 12:16 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You can use reduceByKeyAndWindow for that. Here's a pretty clean example: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala Thanks Best Regards On Mon, Dec 29, 2014 at 1:30 PM, Hoai-Thu Vuong thuv...@gmail.com wrote: Dear Spark users, I've got a program that streams a folder: when a new file is created in this folder, I count the words that appear in this document and update the counts (I used StatefulNetworkWordCount to do it). It works like a charm. However, I would like to know the difference between the top 10 words now and one hour before. How could I do it? I tried to use windowDuration, but it does not seem to work.
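Akhil's pointer to `reduceByKeyAndWindow` and TD's note about large windows both concern incremental window maintenance. Given an inverse function, as in `reduceByKeyAndWindow(_ + _, _ - _, Minutes(60), slide)`, Spark updates the window by adding the new batch and subtracting the batch that slid out, instead of re-reducing the whole hour. This variant requires checkpointing to be enabled.

The plain-Java sketch below illustrates that idea and then compares the top words of two windows, which is what Hoai-Thu wants. Its names are hypothetical, and windows are counted in batches. Keeping the "one hour before" top list is left to the caller, for example as driver-side state.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Incremental sliding-window word counts: add the newest batch's counts
 * ("+") and subtract the counts of the batch leaving the window ("-").
 */
final class SlidingWordCounts {
    private final int windowBatches;
    private final Deque<Map<String, Long>> batches = new ArrayDeque<>();
    private final Map<String, Long> window = new HashMap<>();

    SlidingWordCounts(int windowBatches) {
        this.windowBatches = windowBatches;
    }

    void addBatch(Map<String, Long> batchCounts) {
        Map<String, Long> copy = new HashMap<>(batchCounts);
        batches.addLast(copy);
        copy.forEach((word, c) -> window.merge(word, c, Long::sum));          // reduce: +
        if (batches.size() > windowBatches) {
            Map<String, Long> expired = batches.removeFirst();
            expired.forEach((word, c) -> window.merge(word, -c, Long::sum));  // inverse reduce: -
            window.values().removeIf(c -> c == 0L);                           // forget words no longer in the window
        }
    }

    /** Top n words in the current window, highest count first, ties broken alphabetically. */
    List<String> topN(int n) {
        List<Map.Entry<String, Long>> entries = new ArrayList<>(window.entrySet());
        entries.sort((a, b) -> {
            int byCount = Long.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });
        List<String> top = new ArrayList<>();
        for (int i = 0; i < Math.min(n, entries.size()); i++) {
            top.add(entries.get(i).getKey());
        }
        return top;
    }

    /** Words in the current top list that were not in an earlier one. */
    static List<String> newcomers(List<String> now, List<String> before) {
        List<String> result = new ArrayList<>(now);
        result.removeAll(before);
        return result;
    }
}
```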
Re: Kafka + Spark streaming
1. Of course, a single block/partition has many Kafka messages, from different Kafka topics interleaved together. The message count is not related to the block count: any message received within a particular block interval will go in the same block. 2. Yes, the receiver will be started on another worker. TD On Tue, Dec 30, 2014 at 2:19 PM, SamyaMaiti samya.maiti2...@gmail.com wrote: Hi Experts, A few general queries: 1. Can a single block/partition in an RDD have more than one Kafka message, or will there be only one Kafka message per block? More broadly, is the message count related to the block in any way, or is it just that any message received within a particular block interval will go in the same block? 2. If a worker that runs the receiver for Kafka goes down, will the receiver be restarted on some other worker? Regards, Sam -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Spark-streaming-tp20914.html
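TD's first answer can be pictured with a simplified model. Messages are grouped by arrival time, so one block holds every message received during that block interval, from any topic. The names below are hypothetical. Real block generation is driven by a timer inside the receiver, not by the aligned time slices used here.

In receiver-based streams, each block becomes one partition of the batch's RDD. With the default `spark.streaming.blockInterval` of 200 ms, a 2-second batch therefore has about 10 partitions per receiver.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

/** A received message: arrival time at the receiver, source topic, and body. */
final class Msg {
    final long arrivalMillis;
    final String topic;
    final String body;

    Msg(long arrivalMillis, String topic, String body) {
        this.arrivalMillis = arrivalMillis;
        this.topic = topic;
        this.body = body;
    }
}

/**
 * Simplified model of receiver block generation: time is cut into fixed
 * block intervals, and every message arriving within the same interval goes
 * into the same block, whatever topic it came from.
 */
final class BlockGrouping {
    static SortedMap<Long, List<String>> toBlocks(List<Msg> messages, long blockIntervalMillis) {
        SortedMap<Long, List<String>> blocks = new TreeMap<>();
        for (Msg m : messages) {
            long blockStart = m.arrivalMillis - Math.floorMod(m.arrivalMillis, blockIntervalMillis);
            blocks.computeIfAbsent(blockStart, k -> new ArrayList<>()).add(m.topic + ":" + m.body);
        }
        return blocks;
    }

    /** Approximate number of blocks (and thus partitions) one receiver produces per batch. */
    static long blocksPerBatch(long batchIntervalMillis, long blockIntervalMillis) {
        return batchIntervalMillis / blockIntervalMillis;
    }
}
```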
Re: SPARK-streaming app running 10x slower on YARN vs STANDALONE cluster
That is kind of expected due to data locality. Still, you should see some tasks running on the other executors, as the data gets replicated to other nodes, which can then run tasks based on locality. You have two solutions: 1. kafkaStream.repartition() to explicitly repartition the received data across the cluster. 2. Create multiple Kafka streams and union them together. See http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-processing-time-of-each-batch On Tue, Dec 30, 2014 at 1:43 AM, Mukesh Jha me.mukesh@gmail.com wrote: Thanks Sandy, it was the issue with the number of cores. Another issue I was facing is that tasks are not getting distributed evenly among all executors and are running at the NODE_LOCAL locality level, i.e. all the tasks are running on the same executor where my Kafka receiver(s) are running, even though other executors are idle. I configured spark.locality.wait=50 instead of the default 3000 ms, which forced task rebalancing among nodes; let me know if there is a better way to deal with this. On Tue, Dec 30, 2014 at 12:09 AM, Mukesh Jha me.mukesh@gmail.com wrote: Makes sense. I've also tried it in standalone mode, where all 3 workers and the driver were running on the same 8-core box, and the results were similar. Anyway, I will share the results in YARN mode with 8-core YARN containers. On Mon, Dec 29, 2014 at 11:58 PM, Sandy Ryza sandy.r...@cloudera.com wrote: When running in standalone mode, each executor will be able to use all 8 cores on the box. When running on YARN, each executor will only have access to 2 cores. So the comparison doesn't seem fair, no? -Sandy On Mon, Dec 29, 2014 at 10:22 AM, Mukesh Jha me.mukesh@gmail.com wrote: Nope, I am setting 5 executors with 2 cores each. Below is the command that I'm using to submit in YARN mode. This starts up 5 executor nodes and a driver, as per the Spark application master UI.
spark-submit --master yarn-cluster --num-executors 5 --driver-memory 1024m --executor-memory 1024m --executor-cores 2 --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar vm.cloud.com:2181/kafka spark-yarn avro 1 5000 On Mon, Dec 29, 2014 at 11:45 PM, Sandy Ryza sandy.r...@cloudera.com wrote: *oops, I mean are you setting --executor-cores to 8? On Mon, Dec 29, 2014 at 10:15 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Are you setting --num-executors to 8? On Mon, Dec 29, 2014 at 10:13 AM, Mukesh Jha me.mukesh@gmail.com wrote: Sorry Sandy, the command is just for reference, but I can confirm that there are 4 executors and a driver, as shown on the Spark UI page. Each of these machines is an 8-core box with ~15G of RAM. On Mon, Dec 29, 2014 at 11:23 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Mukesh, Based on your spark-submit command, it looks like you're only running with 2 executors on YARN. Also, how many cores does each machine have? -Sandy On Mon, Dec 29, 2014 at 4:36 AM, Mukesh Jha me.mukesh@gmail.com wrote: Hello Experts, I'm benchmarking Spark on YARN (https://spark.apache.org/docs/latest/running-on-yarn.html) vs a standalone Spark cluster (https://spark.apache.org/docs/latest/spark-standalone.html). I have a standalone cluster with 3 executors, and a Spark app running on YARN with 4 executors, as shown below. The Spark job running inside YARN is 10x slower than the one running on the standalone cluster (even though YARN has more workers). Also, in both cases all the executors are in the same datacenter, so there shouldn't be any latency. On YARN, each 5 sec batch reads data from Kafka and takes 5 sec to process; on the standalone cluster, each 5 sec batch is processed in 0.4 sec. Also, in YARN mode the executors are not used evenly: vm-13 and vm-14 are running most of the tasks, whereas in standalone mode all the executors are running tasks. Do I need to set up some configuration to distribute the tasks evenly?
Also, do you have any pointers on why the YARN job is 10x slower than the standalone job? Any suggestion is greatly appreciated. Thanks in advance.
YARN (5 workers + driver):
Executor ID  Address                RDD Blocks  Memory Used   Disk Used  Active Tasks  Failed Tasks  Complete Tasks  Total Tasks  Task Time  Input  Shuffle Read  Shuffle Write
1            vm-18.cloud.com:51796  0           0.0B/530.3MB  0.0 B      1             0             16              17           634 ms     0.0 B  2047.0 B      1710.0 B
2            vm-13.cloud.com:57264  0           0.0B/530.3MB  0.0 B      0             0             1427            1427         5.5 m      0.0 B  0.0 B         0.0 B
3            vm-14.cloud.com:54570  0           0.0B/530.3MB  0.0 B      0             0             1379            1379         5.2 m      0.0 B  1368.0 B      2.8 KB
4            vm-11.cloud.com:56201  0           0.0B/530.3MB  0.0 B      0             0             10              10           625 ms     0.0 B  1368.0 B      1026.0 B
5            vm-5.cloud.com:42958   0           0.0B/530.3MB  0.0 B      0             0             22              22           632 ms     0.0 B  1881.0 B      2.8 KB
driver       vm.cloud.com:51847     0           0.0B/530.0MB  0.0 B      0             0             0               0            0 ms       0.0 B  0.0 B         0.0 B
/homext/spark/bin/spark-submit --master yarn-cluster
Re: Gradual slow down of the Streaming job (getCallSite at DStream.scala:294)
Which version of Spark Streaming are you using? When the batch processing time increases to 15-20 seconds, could you compare the task times with the task times from when the application was just launched? Basically, is the increase from 6 seconds to 15-20 seconds caused by an increase in computation, or by GC, etc.? On Tue, Dec 30, 2014 at 1:41 PM, RK prk...@yahoo.com.invalid wrote: Here is the code for my streaming job.
~~
val sparkConf = new SparkConf().setAppName("SparkStreamingJob")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("spark.default.parallelism", "100")
sparkConf.set("spark.shuffle.consolidateFiles", "true")
sparkConf.set("spark.speculation", "true")
sparkConf.set("spark.speculation.interval", "5000")
sparkConf.set("spark.speculation.quantile", "0.9")
sparkConf.set("spark.speculation.multiplier", "3")
sparkConf.set("spark.mesos.coarse", "true")
sparkConf.set("spark.executor.extraJavaOptions", "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseConcMarkSweepGC")
sparkConf.set("spark.shuffle.manager", "SORT")

val ssc = new StreamingContext(sparkConf, Seconds(10))
ssc.checkpoint(checkpointDir)

val topics = "trace"
val numThreads = 1
val topicMap = topics.split(",").map((_, numThreads)).toMap

val kafkaPartitions = 20
val kafkaDStreams = (1 to kafkaPartitions).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
}
val lines = ssc.union(kafkaDStreams)
val words = lines.map(line => doSomething_1(line))
val filteredWords = words.filter(word => word != "test")
val groupedWords = filteredWords.map(word => (word, 1))
val windowedWordCounts = groupedWords.reduceByKeyAndWindow(_ + _, _ - _, Seconds(30), Seconds(10))
val windowedWordsFiltered = windowedWordCounts.filter { case (word, count) => count > 50 }
val finalResult = windowedWordsFiltered.foreachRDD(words => doSomething_2(words))

ssc.start()
ssc.awaitTermination()
~~
I am running this job on a 9-slave AWS EC2 cluster in which each slave node has 32 vCPUs and 60 GB of memory.
When I start this job, the processing time is usually around 5 - 6 seconds for the 10-second batch, and the scheduling delay is around 0 seconds or a few ms. However, after the job runs for 6 - 8 hours, the processing time increases to 15 - 20 seconds and the scheduling delay grows to 4 - 6 hours. When I look at the completed stages, I see that the time taken for getCallSite at DStream.scala:294 keeps increasing as time passes. It goes from around 2 seconds to more than a few minutes. Clicking on +details next to this stage description shows the following execution trace:
org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1088)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:294)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
scala.Option.orElse(Option.scala:257)
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:285)
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
scala.util.Try$.apply(Try.scala:161)
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:221)
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
When I click on one of these slow stages that executed after 6 - 8 hours, I find the following information for the individual tasks inside:
- All tasks seem to execute with PROCESS_LOCAL locality.
- Quite a few of these tasks seem to spend anywhere between 30 - 80% of their time in GC. However, when I look at the total memory usage on each of the slave nodes under the executors information, I see that the usage is only around 200MB out of the 20GB available.
Even after a few hours, the map stages (val groupedWords = filteredWords.map(word => (word, 1))) seem to take about the same time as at the start of the job, which seems to indicate
Re: Help with updateStateByKey
Another good starting point for playing with updateStateByKey is the StatefulNetworkWordCount example; see the streaming examples directory in the Spark repository. TD On Thu, Dec 18, 2014 at 6:07 AM, Pierce Lamb richard.pierce.l...@gmail.com wrote: I am trying to run stateful Spark Streaming computations over (fake) Apache web server logs read from Kafka. The goal is to sessionize the web traffic, similar to this blog post: http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/ The only difference is that I want to sessionize each page the IP hits, instead of the entire session. I was able to do this reading from a file of fake web traffic using Spark in batch mode, but now I want to do it in a streaming context. Log files are read from Kafka and parsed into K/V pairs of (String, (String, Long, Long)), or (IP, (requestPage, time, time)). I then call groupByKey() on this K/V pair. In batch mode, this produces a (String, CollectionBuffer((String, Long, Long), ...)), or (IP, CollectionBuffer((requestPage, time, time), ...)). In a StreamingContext, it produces a (String, ArrayBuffer((String, Long, Long), ...)), like so: (183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000))) However, as the next microbatch (DStream) arrives, this information is discarded. Ultimately, what I want is for that ArrayBuffer to fill up over time as a given IP continues to interact, and to run some computations on its data to sessionize the page time. I believe the operator to make that happen is updateStateByKey. I'm having some trouble with this operator (I'm new to both Spark and Scala); any help is appreciated.
Thus far:
val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)
def updateGroupByKey(
    a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
    b: Option[(String, ArrayBuffer[(String, Long, Long)])]
): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
}
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
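A minimal sketch of an update function for the question above, assuming the input is the (IP, (requestPage, time, time)) stream before groupByKey(). SessionState, updateHits and pageDurations are hypothetical names, and the per-page duration rule (last end time minus first start time) is an illustrative assumption, not the blog post's sessionization logic. It is plain Scala, so it can be tried without a cluster.

```scala
// Hypothetical helper object, not code from the thread.
object SessionState {
  // One page hit, as parsed in the question: (requestPage, startTime, endTime).
  type Hit = (String, Long, Long)

  // Has the shape updateStateByKey expects, (Seq[V], Option[S]) => Option[S],
  // with V = Hit (this batch's new values for one IP) and S = Vector[Hit]
  // (everything accumulated for that IP in earlier batches).
  def updateHits(newHits: Seq[Hit], state: Option[Vector[Hit]]): Option[Vector[Hit]] = {
    val all = state.getOrElse(Vector.empty[Hit]) ++ newHits
    // Returning None tells updateStateByKey to drop the key entirely.
    if (all.isEmpty) None else Some(all)
  }

  // Time spent per page for one IP: last end time minus first start time.
  def pageDurations(hits: Seq[Hit]): Map[String, Long] =
    hits.groupBy(_._1).map { case (page, hs) =>
      page -> (hs.map(_._3).max - hs.map(_._2).min)
    }
}
```

In the streaming job it would be plugged in roughly as `ipTimeStamp.updateStateByKey(SessionState.updateHits _)`, without the preceding groupByKey(), since updateStateByKey already hands the function all of a key's new values for the batch as a Seq. Stateful operations like this need a checkpoint directory (`ssc.checkpoint(...)`). As written, the state for an IP grows forever; a real job would probably return None once a session has been idle long enough.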
Re: Spark Streaming Python APIs?
A more up-to-date version of the streaming programming guide is here: http://people.apache.org/~tdas/spark-1.2-temp/streaming-programming-guide.html Please refer to it until we make the official release of Spark 1.2. TD On Tue, Dec 16, 2014 at 3:50 PM, smallmonkey...@hotmail.com smallmonkey...@hotmail.com wrote: Hi zhu: maybe there is no Python API for Spark Streaming. baishuo smallmonkey...@hotmail.com From: Xiaoyong Zhu Date: 2014-12-15 10:52 To: user@spark.apache.org Subject: Spark Streaming Python APIs? Hi Spark experts, are there any Python APIs for Spark Streaming? I didn't find the Python APIs in the Spark Streaming programming guide. http://spark.apache.org/docs/latest/streaming-programming-guide.html Xiaoyong
Re: Read data from SparkStreaming from Java socket.
Yes, socketTextStream starts a TCP client that tries to connect to a TCP server (localhost: in your case). If a server is running on that port and sends data to connected clients, you will receive that data in the stream. Did you check out the quick example in the streaming programming guide? http://spark.apache.org/docs/latest/streaming-programming-guide.html It has instructions for starting a netcat server on port and sending data to Spark Streaming through it. TD On Fri, Dec 12, 2014 at 9:54 PM, Akhil Das ak...@sigmoidanalytics.com wrote: socketTextStream is a socket client which reads from a TCP ServerSocket. Thanks Best Regards On Fri, Dec 12, 2014 at 7:21 PM, Guillermo Ortiz konstt2...@gmail.com wrote: I don't understand what Spark Streaming's socketTextStream is waiting for... is it like a server, so you just have to send data from a client? Or what is it expecting? 2014-12-12 14:19 GMT+01:00 Akhil Das ak...@sigmoidanalytics.com: I have created a ServerSocket program which you can find here: https://gist.github.com/akhld/4286df9ab0677a555087 It simply listens on the given port and, when a client connects, sends the contents of the given file. I'm also attaching the executable jar; you can run it as: java -jar SocketBenchmark.jar student 12345 io Here student is the file that will be sent to whichever client connects on 12345. I have tested it, and it works with Spark Streaming (socketTextStream). Thanks Best Regards On Fri, Dec 12, 2014 at 6:25 PM, Guillermo Ortiz konstt2...@gmail.com wrote: Hi, I'm a newbie with Spark. I'm just trying to use Spark Streaming to filter some data sent with a Java socket, but it's not working... it works when I use ncat. Why is it not working?
My Spark code is just this:
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("Test")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val lines = ssc.socketTextStream("localhost", )
val errorLines = lines.filter(_.contains("hello"))
errorLines.print()
I created a client socket that sends data to that port, but it couldn't connect to any address. I guess Spark doesn't work like a ServerSocket... What's the way to send data from a Java socket so that socketTextStream can read it?
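To illustrate TD's and Akhil's point that socketTextStream is the client, here is a minimal sketch of the server side in plain Scala (java.net only). LineServer and its command-line arguments are hypothetical; this is not the contents of Akhil's gist. It accepts a connection and writes newline-terminated lines (encoded as UTF-8 here), which the receiver turns into one record per line.

```scala
import java.io.{OutputStreamWriter, PrintWriter}
import java.net.{ServerSocket, Socket}
import java.nio.charset.StandardCharsets

// Hypothetical test server. socketTextStream is the client side: it connects
// to host:port and reads newline-separated text from the socket.
object LineServer {
  // Waits for one client, writes each line followed by '\n', then closes the connection.
  def serveOnce(server: ServerSocket, lines: Seq[String]): Unit = {
    val client: Socket = server.accept() // blocks until the Spark receiver connects
    try {
      val out = new PrintWriter(new OutputStreamWriter(client.getOutputStream, StandardCharsets.UTF_8))
      lines.foreach(line => out.print(line + "\n")) // explicit '\n' rather than the platform separator
      out.flush()
    } finally {
      client.close()
    }
  }

  // Usage: LineServer <port> <line>...  Serves the same lines to each client, one after another.
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(args(0).toInt)
    try {
      while (true) serveOnce(server, args.drop(1).toSeq)
    } finally {
      server.close()
    }
  }
}
```

Started as, say, `LineServer <port> "hello there"`, the Spark side from the question would connect with `ssc.socketTextStream("localhost", <port>)` using the same port. The key difference from the attempt above is that the Java program opens a ServerSocket and waits for Spark to connect, instead of connecting to Spark.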
Re: Key not valid / already cancelled using Spark Streaming
Following Gerard's thoughts, here are possible things that could be happening. 1. Is there another process in the background that is deleting files in the directory you are writing to? It seems like the temporary file generated by one of the tasks is getting deleted before it is renamed to the final output file. I suggest not writing to S3; instead, just count and print (with the rest of the computation staying exactly the same) and see if the error still occurs. That would narrow the culprit down to what Gerard suggested. 2. Do you have speculative execution turned on? If so, could you turn it off and try again? TD On Thu, Dec 11, 2014 at 1:42 AM, Gerard Maas gerard.m...@gmail.com wrote: If the timestamps in the logs are to be trusted, it looks like your driver is dying with that java.io.FileNotFoundException:, and therefore the workers lose their connection and shut down. -kr, Gerard. On Thu, Dec 11, 2014 at 7:39 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Try adding the following to the sparkConf: .set("spark.core.connection.ack.wait.timeout", "6000") .set("spark.akka.frameSize", "60") I used to face that issue with Spark 1.1.0. Thanks Best Regards On Thu, Dec 11, 2014 at 2:14 AM, Flávio Santos bar...@chaordicsystems.com wrote: Dear Spark'ers, I'm trying to run a simple job using Spark Streaming (version 1.1.1) and YARN (yarn-cluster), but unfortunately I'm facing many issues. In short, my job does the following: - Consumes a specific Kafka topic - Writes its content to S3 or HDFS Records in Kafka are in the form: {"key": "someString"} This is important because I use the value of key to define the output file name in S3.
Here are the Spark and Kafka parameters I'm using:
val sparkConf = new SparkConf()
  .setAppName("MyDumperApp")
  .set("spark.task.maxFailures", "100")
  .set("spark.hadoop.validateOutputSpecs", "false")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.executor.extraJavaOptions", "-XX:+UseCompressedOops")
val kafkaParams = Map(
  "zookeeper.connect" -> zkQuorum,
  "zookeeper.session.timeout.ms" -> "1",
  "rebalance.backoff.ms" -> "8000",
  "rebalance.max.retries" -> "10",
  "group.id" -> group,
  "auto.offset.reset" -> "largest"
)
My application is the following:
KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)
  .foreachRDD((rdd, time) =>
    rdd.map { case (_, line) =>
      val json = parse(line)
      val key = extract(json, "key").getOrElse("key_not_found")
      (key, dateFormatter.format(time.milliseconds)) -> line
    }
    .partitionBy(new HashPartitioner(10))
    .saveAsHadoopFile[KeyBasedOutput[(String, String), String]]("s3://BUCKET", classOf[BZip2Codec])
  )
And the last piece:
class KeyBasedOutput[T >: Null, V <: AnyRef] extends MultipleTextOutputFormat[T, V] {
  override protected def generateFileNameForKeyValue(key: T, value: V, leaf: String) = key match {
    case (myKey, batchId) => "somedir" + "/" + myKey + "/" + "prefix-" + myKey + "_" + batchId + "_" + leaf
  }
  override protected def generateActualKey(key: T, value: V) = null
}
I use batch sizes of 5 minutes with checkpoints activated. The job fails nondeterministically (I think it has never run longer than ~5 hours). I have no clue why; it simply fails. Please find below the exceptions thrown by my application. I really appreciate any kind of hint. Thank you very much in advance.
Regards, -- Flávio
Executor 1
2014-12-10 19:05:15,150 INFO [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
2014-12-10 19:05:15,201 INFO [Thread-6] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(98665) called with curMem=194463488, maxMem=4445479895
2014-12-10 19:05:15,202 INFO [Thread-6] storage.MemoryStore (Logging.scala:logInfo(59)) - Block input-0-1418238315000 stored as bytes in memory (estimated size 96.4 KB, free 4.0 GB)
2014-12-10 19:05:16,506 INFO [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
2014-12-10 19:05:16,506 INFO [handle-read-write-executor-1] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
2014-12-10 19:05:16,506 INFO [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
2014-12-10 19:05:16,649 INFO [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
Re: Spark steaming : work with collect() but not without collect()
What does process do? Maybe when the process function runs in a Spark executor, it triggers some static initialization, which fails and causes this exception. According to the Oracle documentation, an ExceptionInInitializerError is thrown to indicate that an exception occurred during evaluation of a static initializer or the initializer for a static variable. TD On Thu, Dec 11, 2014 at 1:36 AM, Gerard Maas gerard.m...@gmail.com wrote: Have you tried with kafkaStream.foreachRDD(rdd => {rdd.foreach(...)})? Would that make a difference? On Thu, Dec 11, 2014 at 10:24 AM, david david...@free.fr wrote: Hi, We use the following Spark Streaming code to collect and process Kafka events:
kafkaStream.foreachRDD(rdd => {
  rdd.collect().foreach(event => {
    process(event._1, event._2)
  })
})
This works fine. But without the collect() function, the following exception is raised for the call to function process: Loss was due to java.lang.ExceptionInInitializerError We attempted to rewrite it like this, but the same exception is raised:
kafkaStream.foreachRDD(rdd => {
  rdd.foreachPartition(iter =>
    iter.foreach(event => {
      process(event._1, event._2)
    })
  )
})
Can anybody explain to us why, and how to solve this issue? Thanks, Regards -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-steaming-work-with-collect-but-not-without-collect-tp20622.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
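To make TD's explanation concrete, here is a minimal pure-Scala sketch (no Spark) of how a failing object initializer surfaces as ExceptionInInitializerError. DriverOnlySettings and the demo.endpoint property are hypothetical stand-ins for whatever process might touch. The idea, if process does depend on something like this: with collect(), process runs on the driver, where initialization succeeds; inside rdd.foreach or foreachPartition it runs in the executor JVM, where the object is initialized from scratch and may fail.

```scala
// Hypothetical stand-in for something `process` touches. Its initializer reads a
// setting that, in this illustration, is present on the driver but not on executors.
object DriverOnlySettings {
  val endpoint: String =
    sys.props.getOrElse("demo.endpoint", throw new IllegalStateException("demo.endpoint is not set"))
}

object InitFailureDemo {
  // Touches DriverOnlySettings and reports what the JVM threw, if anything.
  def touch(): String =
    try {
      val e = DriverOnlySettings.endpoint
      s"ok: $e"
    } catch {
      // First failed attempt: the initializer's exception arrives wrapped in this error.
      case _: ExceptionInInitializerError => "ExceptionInInitializerError"
      // Later attempts in the same JVM: the class is marked as unusable.
      case _: NoClassDefFoundError => "NoClassDefFoundError"
    }
}
```

A side effect worth knowing when reading executor logs: after the first failure, later tasks in the same executor typically report NoClassDefFoundError ("Could not initialize class ...") instead, so the ExceptionInInitializerError with its underlying cause is the one to look for.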
Re: Session for connections?
You could create a lazily initialized singleton factory and connection pool. When an executor starts running the first task that needs to push out data, it will create the connection pool as a singleton, and subsequent tasks running on that executor will reuse the pool. You will also have to shut down the connections intelligently, because there is no obvious point at which to shut them down. You could use a usage timeout, e.g. shut down a connection after it has not been used for 10 x the batch interval. TD On Thu, Dec 11, 2014 at 4:28 AM, Ashic Mahtab as...@live.com wrote: Hi, I was wondering if there's any way of having long-running, session-type behaviour in Spark. For example, let's say we're using Spark Streaming to listen to a stream of events. Upon receiving an event, we process it, and if certain conditions are met, we wish to send a message to RabbitMQ. Now, rabbit clients have the concept of a connection factory, from which you create a connection, from which you create a channel. You use the channel to get a queue, and finally the queue is what you publish messages on. Currently, what I'm doing can be summarised as:
dstream.foreachRDD(x => x.foreachPartition(y => {
  val factory = ...
  val connection = ...
  val channel = ...
  val queue = channel.declareQueue(...)
  y.foreach(z => Processor.Process(z, queue))
  // clean up the queue stuff
}))
I'm doing the same thing for Cassandra, etc. Now in these cases, session initiation is expensive, so doing it per message is not a good idea. However, I can't find a way to say "hey... do this per worker once and only once". Is there a better pattern to do this? Regards, Ashic.
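A minimal sketch of TD's suggestion, in plain Scala with no Spark or RabbitMQ dependency. Connection, LazyConnectionHolder, StdoutConnection and SharedConnection are hypothetical names; a real version would create the RabbitMQ channel (or Cassandra session) inside the factory function. The 5-second batch interval is an assumption used only to pick the idle timeout, and the optional nowMs parameters exist so the idle logic can be tested without sleeping.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

// Hypothetical stand-in for an expensive client (RabbitMQ channel, Cassandra session, ...).
trait Connection {
  def send(msg: String): Unit
  def close(): Unit
}

// Holds at most one connection, created on first use and closed after an idle period.
class LazyConnectionHolder(create: () => Connection, idleTimeoutMs: Long) {
  private var conn: Option[Connection] = None
  private var lastUsedMs: Long = 0L

  // Returns the shared connection, creating it if none is open (first use or after an idle close).
  def get(nowMs: Long = System.currentTimeMillis()): Connection = synchronized {
    val c = conn.getOrElse {
      val fresh = create()
      conn = Some(fresh)
      fresh
    }
    lastUsedMs = nowMs
    c
  }

  // Closes the connection if it has been idle for at least idleTimeoutMs; true if it closed one.
  def closeIfIdle(nowMs: Long = System.currentTimeMillis()): Boolean = synchronized {
    conn match {
      case Some(c) if nowMs - lastUsedMs >= idleTimeoutMs =>
        c.close()
        conn = None
        true
      case _ => false
    }
  }

  // Runs closeIfIdle every periodMs on a daemon thread, so it never blocks JVM shutdown.
  def startReaper(periodMs: Long): ScheduledExecutorService = {
    val daemonFactory = new ThreadFactory {
      def newThread(r: Runnable): Thread = { val t = new Thread(r); t.setDaemon(true); t }
    }
    val ses = Executors.newSingleThreadScheduledExecutor(daemonFactory)
    ses.scheduleAtFixedRate(new Runnable { def run(): Unit = closeIfIdle() }, periodMs, periodMs, TimeUnit.MILLISECONDS)
    ses
  }
}

// Placeholder connection that just prints; replace with real client setup.
class StdoutConnection extends Connection {
  def send(msg: String): Unit = println(msg)
  def close(): Unit = ()
}

// Per-JVM singleton: a top-level object is initialized once per executor JVM, on first access.
object SharedConnection {
  // Assumes a 5-second batch interval; the timeout follows the "10 x batch interval" idea.
  lazy val holder: LazyConnectionHolder = {
    val h = new LazyConnectionHolder(() => new StdoutConnection, idleTimeoutMs = 10 * 5000L)
    h.startReaper(periodMs = 5000L)
    h
  }
}
```

Inside the job, each partition would then fetch the connection at the start of foreachPartition, e.g. `dstream.foreachRDD(rdd => rdd.foreachPartition { events => val conn = SharedConnection.holder.get(); events.foreach(e => conn.send(e.toString)) })`. Because SharedConnection is a top-level object, the closure refers to each executor's own copy rather than serializing a connection from the driver. One caveat of this sketch: the idle clock only advances on get(), so a partition that runs longer than the timeout should call get() again instead of holding on to the reference.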
Re: KafkaUtils explicit acks
I am updating the docs right now. Here is a staged copy that you can take a sneak peek at. This will be part of Spark 1.2. http://people.apache.org/~tdas/spark-1.2-temp/streaming-programming-guide.html The updated fault-tolerance section tries to simplify the explanation of when and what data can be lost, and how to prevent that using the new experimental write-ahead log feature. Any feedback will be much appreciated. TD On Wed, Dec 10, 2014 at 2:42 AM, francois.garil...@typesafe.com wrote: [sorry for the botched half-message] Hi Mukesh, There's been some great work on Spark Streaming reliability lately. https://www.youtube.com/watch?v=jcJq3ZalXD8 Look at the links from: https://issues.apache.org/jira/browse/SPARK-3129 I'm not aware of any docs yet (did I miss something?), but you can look at the ReliableKafkaReceiver's test suite: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala -- FG On Wed, Dec 10, 2014 at 11:17 AM, Mukesh Jha me.mukesh@gmail.com wrote: Hello Guys, Any insights on this? If I'm not clear enough, my question is: how can I use a Kafka consumer with Spark Streaming and not lose any data in case of failures? On Tue, Dec 9, 2014 at 2:53 PM, Mukesh Jha me.mukesh@gmail.com wrote: Hello Experts, I'm working on a Spark app which reads data from Kafka and persists it in HBase. The Spark documentation states the following [1]: in case of worker failure we can lose some data. If so, how can I make my Kafka stream more reliable? I have seen there is a simple consumer [2], but I'm not sure if it has been used/tested extensively. I was wondering if there is a way to explicitly acknowledge the Kafka offsets once the data is replicated in the memory of other worker nodes (if that's not already done) to tackle this issue. Any help is appreciated in advance.
[1] Using any input source that receives data through a network - For network-based data sources like Kafka and Flume, the received input data is replicated in memory between nodes of the cluster (default replication factor is 2). So if a worker node fails, then the system can recompute the lost data from the leftover copy of the input data. However, if the worker node where a network receiver was running fails, then a tiny bit of data may be lost, that is, the data received by the system but not yet replicated to other node(s). The receiver will be started on a different node and it will continue to receive data. [2] https://github.com/dibbhatt/kafka-spark-consumer Txz, Mukesh Jha -- Thanks Regards, Mukesh Jha
Re: Session for connections?
Also, this is covered in the streaming programming guide in bits and pieces. http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd On Thu, Dec 11, 2014 at 4:55 AM, Ashic Mahtab as...@live.com wrote: That makes sense. I'll try that. Thanks :)
Re: Locking for shared RDDs
Aditya, I think your mental model of Spark Streaming is a little off the mark. Unlike traditional streaming systems, where any kind of state is mutable, Spark Streaming is built on Spark's immutable RDDs. Streaming data is received and divided into immutable blocks, which form immutable RDDs, and transformations then form new immutable RDDs. It's best that you first read the Spark paper and then the Spark Streaming paper to understand the model. Once you understand that, you will realize that since everything is immutable, the question of consistency does not even arise :) TD On Mon, Dec 8, 2014 at 9:44 PM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote: You don't need to worry about locks as such, as one thread/worker is exclusively responsible for one partition of the RDD. You can use the Accumulator variables that Spark provides to get the state updates. On Mon Dec 08 2014 at 8:14:28 PM aditya.athalye adbrihadarany...@gmail.com wrote: I am relatively new to Spark. I am planning to use Spark Streaming for my OLAP use case, but I would like to know how RDDs are shared between multiple workers. If I need to constantly compute some stats on the streaming data, presumably shared state would have to be updated serially by different Spark workers. Is this managed by Spark automatically, or does the application need to ensure distributed locks are acquired? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Locking-for-shared-RDDs-tp20578.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
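To make the immutability point concrete, a small sketch of computing stats without shared mutable state or locks: each partition is summarized independently, and the partial summaries are merged with an associative function, which is the shape RDD.reduce and RDD.aggregate expect. Stats is a hypothetical type, and the "partitions" here are plain Scala collections rather than an RDD.

```scala
// Hypothetical summary type: immutable, and merged with an associative function,
// so partial results can be combined in any grouping without locks.
final case class Stats(count: Long, sum: Double, min: Double, max: Double) {
  def mean: Double = if (count == 0) 0.0 else sum / count

  // Combines two summaries into a new one; neither input is modified.
  def merge(other: Stats): Stats =
    if (count == 0) other
    else if (other.count == 0) this
    else Stats(count + other.count, sum + other.sum, math.min(min, other.min), math.max(max, other.max))
}

object Stats {
  val empty: Stats = Stats(0L, 0.0, Double.PositiveInfinity, Double.NegativeInfinity)
  def of(x: Double): Stats = Stats(1L, x, x, x)

  // What a single task would do with its own partition: fold locally, share nothing.
  def ofPartition(xs: Seq[Double]): Stats = xs.foldLeft(empty)((acc, x) => acc.merge(of(x)))
}
```

Assuming a DStream[Double] called values, something like `values.map(Stats.of).reduce(_ merge _)` would yield one Stats per batch; on a single RDD, `rdd.aggregate(Stats.empty)((s, x) => s.merge(Stats.of(x)), _ merge _)` does the same. Case classes are serializable, so Stats can be shipped between the driver and executors.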
Re: Is there an efficient way to append new data to a registered Spark SQL Table?
First of all, how long do you want to keep doing this? The data is going to grow without bound, and eventually it will get too big for any cluster to handle. If all that is within bounds, then try the following:
- Maintain a global variable holding the current RDD that stores all the log data. We are going to keep updating this variable.
- Every batch interval, take the new data, union it with the earlier unified RDD (in the global variable), and update the global variable. If you want SQL queries on this data, you will have to re-register this new RDD as the named table.
- With this approach the number of partitions is going to increase rapidly. So periodically take the unified RDD and repartition it to a smaller set of partitions. This messes up the ordering of the data, but you may be fine with that if your queries are order-agnostic. Also, periodically checkpoint this RDD; otherwise the lineage will grow indefinitely and everything will start getting slower.
Hope this helps. TD On Mon, Dec 8, 2014 at 6:29 PM, Xuelin Cao xuelin...@yahoo.com.invalid wrote: Hi, I'm wondering whether there is an efficient way to continuously append new data to a registered Spark SQL table. This is what I want: I want to build an ad-hoc query service over a JSON-formatted system log. Of course, the system log is continuously generated. I will use Spark Streaming to connect to the system log as my input, and I want to find a way to efficiently append the new data to an existing Spark SQL table. Furthermore, I want the whole table to be cached in memory/Tachyon. It looks like Spark SQL supports the INSERT method, but only for Parquet files. In addition, it is inefficient to insert a single row each time. I know that somebody has built a system similar to what I want (an ad-hoc query service over an ever-growing system log), so there must be an efficient way. Does anyone know?
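TD's recipe can be modeled without Spark to see why the periodic repartition matters: every union keeps all partitions of both inputs, so the partition count grows without bound unless it is compacted. Accumulated and AppendPolicy are hypothetical names, partitions are plain Vectors, and the round-robin split only stands in for RDD.repartition. In the real job, the corresponding steps would be RDD.union, repartition, checkpoint() (after SparkContext.setCheckpointDir), and re-registering the result as the named table (for example with registerTempTable in Spark SQL 1.2); this model leaves out checkpointing and lineage entirely.

```scala
// Spark-free model of the union-then-compact bookkeeping, with partitions as Vectors.
// `batches` counts how many batch unions have been applied so far.
final case class Accumulated[A](partitions: Vector[Vector[A]], batches: Int) {
  def numRecords: Int = partitions.map(_.size).sum
}

object AppendPolicy {
  // Like RDD.union: the result keeps every partition of both inputs.
  def union[A](acc: Accumulated[A], batch: Vector[Vector[A]]): Accumulated[A] =
    acc.copy(partitions = acc.partitions ++ batch, batches = acc.batches + 1)

  // Stand-in for repartition(n): spreads all records over n partitions (round-robin here).
  def repartition[A](acc: Accumulated[A], n: Int): Accumulated[A] = {
    val records = acc.partitions.flatten
    val buckets = Vector.tabulate(n)(i => records.zipWithIndex.collect { case (r, j) if j % n == i => r })
    acc.copy(partitions = buckets)
  }

  // One batch interval: union the new data, and compact every `compactEvery` batches.
  def step[A](acc: Accumulated[A], batch: Vector[Vector[A]], compactEvery: Int, targetPartitions: Int): Accumulated[A] = {
    val unioned = union(acc, batch)
    if (unioned.batches % compactEvery == 0) repartition(unioned, targetPartitions) else unioned
  }
}
```

The compactEvery and targetPartitions values are illustrative; compacting more often keeps the partition count lower at the cost of more shuffles.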