A few high-level suggestions.

1. I recommend using the new Receiver API in the almost-released Spark 1.0 (see branch-1.0 / the master branch on GitHub). It's a slightly better version of the earlier NetworkReceiver: it hides away the BlockGenerator (which unnecessarily had to be started and stopped manually) and adds lifecycle-management methods like stop, restart and reportError to deal with errors while receiving data. It also adds the ability to write custom receivers in Java. Take a look at this example <https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala> of writing a custom receiver with the new API. I am updating the custom receiver guide right now (https://github.com/apache/spark/pull/652).
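To make points 1 and 2 more concrete, here is a rough, untested sketch of what a JMS receiver on the Spark 1.0 Receiver API could look like. It assumes the standard javax.jms API is on the classpath; `JmsTextReceiver` and its `makeFactory` parameter (a serializable function that builds your vendor's ConnectionFactory, e.g. SonicMQ's, on the worker) are hypothetical names for illustration. onStart only registers an asynchronous MessageListener, so it returns immediately, and connection failures are handed to restart.

```scala
import javax.jms.{Connection, ConnectionFactory, ExceptionListener, JMSException, Message, MessageListener, Session, TextMessage}

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Untested sketch of a JMS queue receiver on the Spark 1.0 Receiver API.
// `makeFactory` is a hypothetical, serializable function that builds your
// vendor-specific javax.jms.ConnectionFactory on the worker.
class JmsTextReceiver(makeFactory: () => ConnectionFactory, queueName: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  // The connection only exists on the worker, so it is never serialized.
  @transient @volatile private var connection: Connection = _

  def onStart(): Unit = {
    try {
      val conn = makeFactory().createConnection()
      connection = conn // remember it right away so onStop can always close it
      // If the broker drops the connection, ask Spark to restart this receiver.
      conn.setExceptionListener(new ExceptionListener {
        def onException(e: JMSException): Unit = restart("JMS connection failed", e)
      })
      val session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)
      val consumer = session.createConsumer(session.createQueue(queueName))
      // Messages are delivered on a JMS thread, so onStart itself does not block.
      consumer.setMessageListener(new MessageListener {
        def onMessage(msg: Message): Unit = msg match {
          case text: TextMessage => store(text.getText)
          case _ => () // non-text messages are skipped in this sketch
        }
      })
      conn.start()
    } catch {
      case e: JMSException => restart("Could not connect to JMS queue " + queueName, e)
    }
  }

  def onStop(): Unit = {
    val conn = connection
    connection = null
    if (conn != null) {
      try conn.close() catch { case _: JMSException => () } // best-effort close
    }
  }
}
```

With that, something like `ssc.receiverStream(new JmsTextReceiver(() => ..., "TascQueue"))` should give you a DStream[String]; as far as I know, `networkStream` still accepts a Receiver in 1.0 as a deprecated alias. The replicated MEMORY_AND_DISK_SER_2 level is a deliberate choice here: with AUTO_ACKNOWLEDGE the broker considers a message consumed once it is delivered, so a second in-memory copy reduces the chance of losing data if a worker dies.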
2. Once you create a JMSReceiver class by extending NetworkReceiver/Receiver, you can create a DStream out of the receiver with

val jmsStream = ssc.networkStream(new JMSReceiver("...."))

3. As far as I understand from the docs of akka.camel.Consumer <http://doc.akka.io/api/akka/2.3.2/index.html#akka.camel.Consumer>, it is essentially a specialized Akka actor. For Akka actors there is ssc.actorStream, where you can specify your own actor class, and you get actor supervision (and therefore error handling, etc.) with that. See the ActorWordCount example, either old style using NetworkReceiver <https://github.com/apache/spark/blob/branch-0.9/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala> or new style using Receiver <https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala>.

I haven't personally played around with JMS before, so I can't comment much on JMS-specific intricacies.

TD

On Mon, May 5, 2014 at 5:31 AM, Patrick McGloin <mcgloin.patr...@gmail.com> wrote:
> Hi all,
>
> Is there a "best practice" for subscribing to JMS with Spark Streaming? I
> have searched but not found anything conclusive.
>
> In the absence of a standard practice the solution I was thinking of was
> to use Akka + Camel (akka.camel.Consumer) to create a subscription for a
> Spark Streaming Custom Receiver. So the actor would look something like
> this:
>
> class JmsReceiver(jmsURI: String) extends NetworkReceiver[String] with Consumer {
>   //e.g. "jms:sonicmq://localhost:2506/queue?destination=SampleQ1"
>   def endpointUri = jmsURI
>   lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY_SER)
>
>   protected override def onStart() {
>     blockGenerator.start
>   }
>
>   def receive = {
>     case msg: CamelMessage => { blockGenerator += msg.body }
>     case _ => { /* ... */ }
>   }
>
>   protected override def onStop() {
>     blockGenerator.stop
>   }
> }
>
> And then in the main application create receivers like this:
>
> val ssc = new StreamingContext(...)
> object tascQueue extends JmsReceiver[String](ssc) {
>   override def getReceiver():JmsReceiver[String] = {
>     new JmsReceiver("jms:sonicmq://localhost:2506/queue?destination=TascQueue")
>   }
> }
> ssc.registerInputStream(tascQueue)
>
> Is this the best way to go?
>
> Best regards,
> Patrick
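For point 3, here is an equally untested sketch of the actorStream route, which keeps the akka.camel.Consumer idea but lets Spark start and supervise the actor. It assumes Spark 1.0 (where actor-based receivers mix in ActorHelper and call store), an akka-camel dependency whose version matches the Akka that Spark bundles, and a Camel `jms` component already registered with your broker's ConnectionFactory. `CamelJmsActor`, the app name and the `jms:queue:TascQueue` URI are hypothetical.

```scala
import akka.actor.Props
import akka.camel.{CamelMessage, Consumer}

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.ActorHelper

// Untested sketch: a Camel consumer actor that hands each message body to
// Spark via ActorHelper.store. Assumes a Camel "jms" component has already
// been registered (with your broker's ConnectionFactory) in the actor
// system's CamelExtension; that setup is not shown here.
class CamelJmsActor(uri: String) extends Consumer with ActorHelper {
  def endpointUri: String = uri

  def receive: Receive = {
    case msg: CamelMessage => store(msg.bodyAs[String])
  }
}

object CamelJmsStreaming {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CamelJmsStreaming")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Spark creates this actor on a worker and supervises it; the URI is hypothetical.
    val lines = ssc.actorStream[String](
      Props(new CamelJmsActor("jms:queue:TascQueue")), "CamelJmsReceiver")

    lines.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Compared with a hand-written Receiver, this gives up direct control over the JMS session and acknowledgement in exchange for Akka's supervision, which actorStream configures through its supervisorStrategy parameter.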