[ https://issues.apache.org/jira/browse/BAHIR-66?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703987#comment-16703987 ]
ASF GitHub Bot commented on BAHIR-66: ------------------------------------- Github user lukasz-antoniak commented on a diff in the pull request: https://github.com/apache/bahir/pull/71#discussion_r237691415 --- Diff: streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala --- @@ -15,105 +15,123 @@ * limitations under the License. */ -// scalastyle:off println awaitresult package org.apache.spark.examples.streaming.zeromq import scala.concurrent.Await import scala.concurrent.duration.Duration import scala.language.implicitConversions +import scala.util.Random -import akka.actor.ActorSystem -import akka.actor.actorRef2Scala -import akka.util.ByteString -import akka.zeromq._ -import akka.zeromq.Subscribe import org.apache.log4j.{Level, Logger} +import org.zeromq.ZContext +import org.zeromq.ZMQ +import org.zeromq.ZMQException +import org.zeromq.ZMsg import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.streaming.zeromq._ +import org.apache.spark.streaming.zeromq.ZeroMQUtils /** - * A simple publisher for demonstration purposes, repeatedly publishes random Messages - * every one second. + * Simple publisher for demonstration purposes, + * repeatedly publishes random messages every one second. */ object SimpleZeroMQPublisher { - def main(args: Array[String]): Unit = { if (args.length < 2) { - System.err.println("Usage: SimpleZeroMQPublisher <zeroMQUrl> <topic> ") + // scalastyle:off println + System.err.println("Usage: SimpleZeroMQPublisher <zeroMqUrl> <topic>") + // scalastyle:on println System.exit(1) } val Seq(url, topic) = args.toSeq - val acs: ActorSystem = ActorSystem() - - val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url)) - implicit def stringToByteString(x: String): ByteString = ByteString(x) - val messages: List[ByteString] = List("words ", "may ", "count ") - while (true) { - Thread.sleep(1000) - pubSocket ! 
ZMQMessage(ByteString(topic) :: messages) - } - Await.result(acs.whenTerminated, Duration.Inf) + val context = new ZContext + val socket = context.createSocket(ZMQ.PUB) + socket.bind(url) + + val zmqThread = new Thread(new Runnable { + def run() { + val messages = List("words", "may", "count infinitely") + val random = new Random + while (!Thread.currentThread.isInterrupted) { + try { + Thread.sleep(random.nextInt(1000)) + val msg1 = new ZMsg + msg1.add(topic.getBytes) + msg1.add(messages(random.nextInt(messages.size)).getBytes) + msg1.send(socket) + } catch { + case e: ZMQException if ZMQ.Error.ETERM.getCode == e.getErrorCode => + Thread.currentThread.interrupt() + case e: InterruptedException => + case e: Throwable => throw e + } + } + } + }) + + sys.addShutdownHook( { + context.destroy() + zmqThread.interrupt() + zmqThread.join() + } ) + + zmqThread.start() } } -// scalastyle:off /** - * A sample wordcount with ZeroMQStream stream - * - * To work with zeroMQ, some native libraries have to be installed. - * Install zeroMQ (release 2.1) core libraries. [ZeroMQ Install guide] - * (http://www.zeromq.org/intro:get-the-software) + * Sample word count with ZeroMQ stream. * - * Usage: ZeroMQWordCount <zeroMQurl> <topic> - * <zeroMQurl> and <topic> describe where zeroMq publisher is running. 
+ * Usage: ZeroMQWordCount <zeroMqUrl> <topic> + * <zeroMqUrl> describes where ZeroMQ publisher is running + * <topic> defines logical message type * - * To run this example locally, you may run publisher as + * To run this example locally, you may start publisher as: * `$ bin/run-example \ * org.apache.spark.examples.streaming.zeromq.SimpleZeroMQPublisher tcp://127.0.0.1:1234 foo` - * and run the example as + * and run the example as: * `$ bin/run-example \ * org.apache.spark.examples.streaming.zeromq.ZeroMQWordCount tcp://127.0.0.1:1234 foo` */ -// scalastyle:on object ZeroMQWordCount { def main(args: Array[String]) { if (args.length < 2) { - System.err.println("Usage: ZeroMQWordCount <zeroMQurl> <topic>") + // scalastyle:off println + System.err.println("Usage: ZeroMQWordCount <zeroMqUrl> <topic>") + // scalastyle:on println System.exit(1) } - // Set logging level if log4j not configured (override by adding log4j.properties to classpath) - if (!Logger.getRootLogger.getAllAppenders.hasMoreElements) { - Logger.getRootLogger.setLevel(Level.WARN) - } + // Set logging level if log4j not configured (override by adding log4j.properties to classpath). + Logger.getRootLogger.setLevel(Level.WARN) val Seq(url, topic) = args.toSeq val sparkConf = new SparkConf().setAppName("ZeroMQWordCount") - // check Spark configuration for master URL, set it to local if not configured + // Check Spark configuration for master URL, set it to local if not present. if (!sparkConf.contains("spark.master")) { sparkConf.setMaster("local[2]") } - // Create the context and set the batch size + // Create the context and set the batch size. val ssc = new StreamingContext(sparkConf, Seconds(2)) - def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] = x.map(_.utf8String).iterator + def bytesToString(bytes: Array[Array[Byte]]) = { + Seq(new String(bytes(1), zmq.ZMQ.CHARSET)) --- End diff -- Added a test message converter supporting the most common scenario. 
> Add test that ZeroMQ streaming connector can receive data > --------------------------------------------------------- > > Key: BAHIR-66 > URL: https://issues.apache.org/jira/browse/BAHIR-66 > Project: Bahir > Issue Type: Sub-task > Components: Spark Streaming Connectors > Reporter: Christian Kadner > Priority: Major > Labels: test > > Add test cases that verify that the *ZeroMQ streaming connector* can receive > streaming data. > See [BAHIR-63|https://issues.apache.org/jira/browse/BAHIR-63] -- This message was sent by Atlassian JIRA (v7.6.3#76005)