[ 
https://issues.apache.org/jira/browse/BAHIR-66?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701736#comment-16701736
 ] 

ASF GitHub Bot commented on BAHIR-66:
-------------------------------------

Github user lresende commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/71#discussion_r237029598
  
    --- Diff: 
streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala
 ---
    @@ -15,105 +15,123 @@
      * limitations under the License.
      */
     
    -// scalastyle:off println awaitresult
     package org.apache.spark.examples.streaming.zeromq
     
     import scala.concurrent.Await
     import scala.concurrent.duration.Duration
     import scala.language.implicitConversions
    +import scala.util.Random
     
    -import akka.actor.ActorSystem
    -import akka.actor.actorRef2Scala
    -import akka.util.ByteString
    -import akka.zeromq._
    -import akka.zeromq.Subscribe
     import org.apache.log4j.{Level, Logger}
    +import org.zeromq.ZContext
    +import org.zeromq.ZMQ
    +import org.zeromq.ZMQException
    +import org.zeromq.ZMsg
     
     import org.apache.spark.SparkConf
     import org.apache.spark.streaming.{Seconds, StreamingContext}
    -import org.apache.spark.streaming.zeromq._
    +import org.apache.spark.streaming.zeromq.ZeroMQUtils
     
     /**
    - * A simple publisher for demonstration purposes, repeatedly publishes 
random Messages
    - * every one second.
    + * Simple publisher for demonstration purposes,
    + * repeatedly publishes random messages every one second.
      */
     object SimpleZeroMQPublisher {
    -
       def main(args: Array[String]): Unit = {
         if (args.length < 2) {
    -      System.err.println("Usage: SimpleZeroMQPublisher <zeroMQUrl> <topic> 
")
    +      // scalastyle:off println
    +      System.err.println("Usage: SimpleZeroMQPublisher <zeroMqUrl> 
<topic>")
    +      // scalastyle:on println
           System.exit(1)
         }
     
         val Seq(url, topic) = args.toSeq
    -    val acs: ActorSystem = ActorSystem()
    -
    -    val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, 
Bind(url))
    -    implicit def stringToByteString(x: String): ByteString = ByteString(x)
    -    val messages: List[ByteString] = List("words ", "may ", "count ")
    -    while (true) {
    -      Thread.sleep(1000)
    -      pubSocket ! ZMQMessage(ByteString(topic) :: messages)
    -    }
    -    Await.result(acs.whenTerminated, Duration.Inf)
    +    val context = new ZContext
    +    val socket = context.createSocket(ZMQ.PUB)
    +    socket.bind(url)
    +
    +    val zmqThread = new Thread(new Runnable {
    +      def run() {
    +        val messages = List("words", "may", "count infinitely")
    +        val random = new Random
    +        while (!Thread.currentThread.isInterrupted) {
    +          try {
    +            Thread.sleep(random.nextInt(1000))
    +            val msg1 = new ZMsg
    +            msg1.add(topic.getBytes)
    +            msg1.add(messages(random.nextInt(messages.size)).getBytes)
    +            msg1.send(socket)
    +          } catch {
    +            case e: ZMQException if ZMQ.Error.ETERM.getCode == 
e.getErrorCode =>
    +              Thread.currentThread.interrupt()
    +            case e: InterruptedException =>
    +            case e: Throwable => throw e
    +          }
    +        }
    +      }
    +    })
    +
    +    sys.addShutdownHook( {
    +      context.destroy()
    +      zmqThread.interrupt()
    +      zmqThread.join()
    +    } )
    +
    +    zmqThread.start()
       }
     }
     
    -// scalastyle:off
     /**
    - * A sample wordcount with ZeroMQStream stream
    - *
    - * To work with zeroMQ, some native libraries have to be installed.
    - * Install zeroMQ (release 2.1) core libraries. [ZeroMQ Install guide]
    - * (http://www.zeromq.org/intro:get-the-software)
    + * Sample word count with ZeroMQ stream.
      *
    - * Usage: ZeroMQWordCount <zeroMQurl> <topic>
    - *   <zeroMQurl> and <topic> describe where zeroMq publisher is running.
    + * Usage: ZeroMQWordCount <zeroMqUrl> <topic>
    + *   <zeroMqUrl> describes where ZeroMQ publisher is running
    + *   <topic> defines logical message type
      *
    - * To run this example locally, you may run publisher as
    + * To run this example locally, you may start publisher as:
      *    `$ bin/run-example \
      *      org.apache.spark.examples.streaming.zeromq.SimpleZeroMQPublisher 
tcp://127.0.0.1:1234 foo`
    - * and run the example as
    + * and run the example as:
      *    `$ bin/run-example \
      *      org.apache.spark.examples.streaming.zeromq.ZeroMQWordCount 
tcp://127.0.0.1:1234 foo`
      */
    -// scalastyle:on
     object ZeroMQWordCount {
       def main(args: Array[String]) {
         if (args.length < 2) {
    -      System.err.println("Usage: ZeroMQWordCount <zeroMQurl> <topic>")
    +      // scalastyle:off println
    +      System.err.println("Usage: ZeroMQWordCount <zeroMqUrl> <topic>")
    +      // scalastyle:on println
           System.exit(1)
         }
     
    -    // Set logging level if log4j not configured (override by adding 
log4j.properties to classpath)
    -    if (!Logger.getRootLogger.getAllAppenders.hasMoreElements) {
    -      Logger.getRootLogger.setLevel(Level.WARN)
    -    }
    +    // Set logging level if log4j not configured (override by adding 
log4j.properties to classpath).
    +    Logger.getRootLogger.setLevel(Level.WARN)
     
         val Seq(url, topic) = args.toSeq
         val sparkConf = new SparkConf().setAppName("ZeroMQWordCount")
     
    -    // check Spark configuration for master URL, set it to local if not 
configured
    +    // Check Spark configuration for master URL, set it to local if not 
present.
         if (!sparkConf.contains("spark.master")) {
           sparkConf.setMaster("local[2]")
         }
     
    -    // Create the context and set the batch size
    +    // Create the context and set the batch size.
         val ssc = new StreamingContext(sparkConf, Seconds(2))
     
    -    def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] = 
x.map(_.utf8String).iterator
    +    def bytesToString(bytes: Array[Array[Byte]]) = {
    +      Seq(new String(bytes(1), zmq.ZMQ.CHARSET))
    --- End diff --
    
    How about exposing a const from ZeroMQUtils instead of exposing something 
from the java api ? this would make it more flexible and independent. 


> Add test that ZeroMQ streaming connector can receive data
> ---------------------------------------------------------
>
>                 Key: BAHIR-66
>                 URL: https://issues.apache.org/jira/browse/BAHIR-66
>             Project: Bahir
>          Issue Type: Sub-task
>          Components: Spark Streaming Connectors
>            Reporter: Christian Kadner
>            Priority: Major
>              Labels: test
>
> Add test cases that verify that the *ZeroMQ streaming connector* can receive 
> streaming data.
> See [BAHIR-63|https://issues.apache.org/jira/browse/BAHIR-63]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to