A few high-level suggestions.

1. I recommend using the new Receiver API in the almost-released Spark 1.0
(see branch-1.0 / master branch on GitHub). It's a slightly better version
of the earlier NetworkReceiver: it hides away the BlockGenerator (which
previously had to be started and stopped manually, unnecessarily) and adds
lifecycle management methods like stop, restart and reportError to deal
with errors while receiving data. It also adds the ability to write custom
receivers from Java. Take a look at this example of writing a custom
receiver in the new API:
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala
I am updating the custom receiver guide right now
(https://github.com/apache/spark/pull/652).
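
For concreteness, here is a rough sketch of what a JMS receiver could look
like in the new API. The JMS plumbing itself is a placeholder (I haven't
used JMS, so receiveMessages below just marks where the real
connect/consume logic would go); the point is the lifecycle: onStart()
returns quickly and does the work on its own thread, store() hands records
to Spark, and restart() asks Spark to relaunch the receiver after an error.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class JMSReceiver(jmsURI: String)
  extends Receiver[String](StorageLevel.MEMORY_ONLY_SER) {

  override def onStart() {
    // Must not block: do the actual receiving on a separate thread.
    new Thread("JMS Receiver") {
      override def run() { receiveMessages() }
    }.start()
  }

  override def onStop() {
    // A real implementation would close the JMS connection opened in onStart().
  }

  private def receiveMessages() {
    try {
      while (!isStopped()) {
        val message: String = ???   // placeholder: get the next message from jmsURI
        store(message)              // hand the record to Spark
      }
    } catch {
      case t: Throwable =>
        // Tell Spark to stop this receiver and start it again after a delay.
        restart("Error receiving from " + jmsURI, t)
    }
  }
}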

2. Once you have created a JMSReceiver class by extending
NetworkReceiver/Receiver (as sketched above), you can create a DStream from
the receiver with

val jmsStream = ssc.networkStream(new JMSReceiver("...."))
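
Put together, a minimal driver program could look roughly like this (the
app name, batch interval and JMS URI are just placeholders; in the 1.0 API
the same call is also available as ssc.receiverStream):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("JmsStreamingApp")
val ssc = new StreamingContext(conf, Seconds(1))

// Create the input stream from the custom receiver and do something with it.
val jmsStream = ssc.networkStream(
  new JMSReceiver("jms:sonicmq://localhost:2506/queue?destination=SampleQ1"))
jmsStream.print()

ssc.start()
ssc.awaitTermination()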

3. As far as I understand from the docs of akka.camel.Consumer
(http://doc.akka.io/api/akka/2.3.2/index.html#akka.camel.Consumer), it is
essentially a specialized Akka actor. For Akka actors there is
ssc.actorStream, where you can specify your own actor class, and you get
actor supervision (and therefore error handling, etc.) with that. See the
ActorWordCount example - old style using NetworkReceiver
(https://github.com/apache/spark/blob/branch-0.9/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala),
or new style using Receiver
(https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala).
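
If you go that route, a hedged sketch of the actor-based receiver could
look like the following. It assumes akka-camel (and the Camel JMS component
for your broker) is available on the executors, mixes Spark's ActorHelper
into the Camel Consumer so that store() pushes each message body into
Spark, and the endpoint URI is only an example.

import akka.actor.Props
import akka.camel.{CamelMessage, Consumer}
import org.apache.spark.streaming.receiver.ActorHelper

class JmsConsumerActor(uri: String) extends Consumer with ActorHelper {
  def endpointUri = uri

  def receive = {
    case msg: CamelMessage => store(msg.body.toString)  // push the message body to Spark
    case _                 => // ignore anything else
  }
}

// Supervision comes from the actor system, so a failed consumer can be restarted.
val jmsStream = ssc.actorStream[String](
  Props(new JmsConsumerActor("jms:sonicmq://localhost:2506/queue?destination=SampleQ1")),
  "jms-consumer")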

I haven't personally played around with JMS before, so I can't comment much
on JMS-specific intricacies.

TD



On Mon, May 5, 2014 at 5:31 AM, Patrick McGloin
<mcgloin.patr...@gmail.com> wrote:

> Hi all,
>
> Is there a "best practice" for subscribing to JMS with Spark Streaming?  I
> have searched but not found anything conclusive.
>
> In the absence of a standard practice the solution I was thinking of was
> to use Akka + Camel (akka.camel.Consumer) to create a subscription for a
> Spark Streaming Custom Receiver.  So the actor would look something like
> this:
>
> class JmsReceiver(jmsURI: String) extends NetworkReceiver[String] with
> Consumer {
>   //e.g. "jms:sonicmq://localhost:2506/queue?destination=SampleQ1"
>   def endpointUri = jmsURI
>   lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY_SER)
>
>   protected override def onStart() {
>     blockGenerator.start
>   }
>
>   def receive = {
>     case msg: CamelMessage => { blockGenerator += msg.body }
>     case _                 => { /* ... */ }
>   }
>
>   protected override def onStop() {
>     blockGenerator.stop
>   }
> }
>
> And then in the main application create receivers like this:
>
> val ssc = new StreamingContext(...)
> object tascQueue extends JmsReceiver[String](ssc) {
> override def getReceiver():JmsReceiver[String] = {
>  new JmsReceiver("jms:sonicmq://localhost:2506/queue?destination=TascQueue")
>  }
> }
> ssc.registerInputStream(tascQueue)
>
> Is this the best way to go?
>
> Best regards,
> Patrick
>
