Repository: bahir Updated Branches: refs/heads/master aecd5fd9f -> fb752570c
[BAHIR-66] Switch to Java binding for ZeroMQ Initially, I just wanted to implement an integration test for BAHIR-66. Google pointed me to JeroMQ, which provides an official ZeroMQ binding for Java and does not require native libraries. I have decided to give it a try, but quickly realized that the akka-zeromq module (a transitive dependency from current Bahir master) is not compatible with JeroMQ. Actually, the Akka team also wanted to move to JeroMQ (akka/akka#13856), but in the end decided to remove the akka-zeromq project completely (akka/akka#15864, https://www.lightbend.com/blog/akka-roadmap-update-2014). Having in mind that akka-zeromq does not support the latest version of the ZeroMQ protocol and further development may be delayed, I have decided to refactor the streaming-zeromq implementation and leverage JeroMQ. With the change we receive various benefits, such as support for PUB-SUB and PUSH-PULL messaging patterns and the ability to bind the socket on either end of the communication channel (see test cases), subscription to multiple channels, etc. JeroMQ seems pretty reliable and reconnection is handled out-of-the-box. Actually, we could even start the ZeroMQ subscriber trying to connect to a remote socket before the other end has created and bound the socket. While I tried to preserve backward compatibility of method signatures, there was no easy way to support the Akka API and business logic that users could put there (e.g. akka.actor.ActorSystem). 
Closes #71 Project: http://git-wip-us.apache.org/repos/asf/bahir/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/fb752570 Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/fb752570 Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/fb752570 Branch: refs/heads/master Commit: fb752570c7ac817b414c738e05b751dd5864feb6 Parents: aecd5fd Author: Lukasz Antoniak <[email protected]> Authored: Tue Nov 27 06:58:42 2018 -0800 Committer: Luciano Resende <[email protected]> Committed: Fri Nov 30 11:10:49 2018 +0100 ---------------------------------------------------------------------- streaming-zeromq/README.md | 17 +- .../streaming/zeromq/ZeroMQWordCount.scala | 112 ++++---- streaming-zeromq/pom.xml | 14 +- .../streaming/zeromq/ZeroMQInputDStream.scala | 123 +++++++++ .../spark/streaming/zeromq/ZeroMQReceiver.scala | 55 ---- .../spark/streaming/zeromq/ZeroMQUtils.scala | 190 ++++++------- .../streaming/LocalJavaStreamingContext.java | 4 +- .../streaming/zeromq/JavaZeroMQStreamSuite.java | 60 ++-- .../src/test/resources/log4j.properties | 9 +- .../streaming/zeromq/ZeroMQStreamSuite.scala | 272 ++++++++++++++++--- 10 files changed, 565 insertions(+), 291 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir/blob/fb752570/streaming-zeromq/README.md ---------------------------------------------------------------------- diff --git a/streaming-zeromq/README.md b/streaming-zeromq/README.md index 952bb4f..8ced539 100644 --- a/streaming-zeromq/README.md +++ b/streaming-zeromq/README.md @@ -1,3 +1,4 @@ +# Spark Streaming ZeroMQ Connector A library for reading data from [ZeroMQ](http://zeromq.org/) using Spark Streaming. @@ -27,13 +28,23 @@ This library is cross-published for Scala 2.10 and Scala 2.11, so users should r ## Examples +Review end-to-end examples at [ZeroMQ Examples](https://github.com/apache/bahir/tree/master/streaming-zeromq/examples). 
### Scala API - val lines = ZeroMQUtils.createStream(ssc, ...) + import org.apache.spark.streaming.zeromq.ZeroMQUtils + + val lines = ZeroMQUtils.createTextStream( + ssc, "tcp://server:5555", true, Seq("my-topic".getBytes) + ) ### Java API - JavaDStream<String> lines = ZeroMQUtils.createStream(jssc, ...); + import org.apache.spark.storage.StorageLevel; + import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; + import org.apache.spark.streaming.zeromq.ZeroMQUtils; -See end-to-end examples at [ZeroMQ Examples](https://github.com/apache/bahir/tree/master/streaming-zeromq/examples) \ No newline at end of file + JavaReceiverInputDStream<String> test1 = ZeroMQUtils.createJavaStream( + ssc, "tcp://server:5555", true, Arrays.asList("my-topic".getBytes()), + StorageLevel.MEMORY_AND_DISK_SER_2() + ); \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bahir/blob/fb752570/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala ---------------------------------------------------------------------- diff --git a/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala b/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala index 00fd815..24284ec 100644 --- a/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala +++ b/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala @@ -15,105 +15,117 @@ * limitations under the License. 
*/ -// scalastyle:off println awaitresult package org.apache.spark.examples.streaming.zeromq -import scala.concurrent.Await -import scala.concurrent.duration.Duration import scala.language.implicitConversions +import scala.util.Random -import akka.actor.ActorSystem -import akka.actor.actorRef2Scala -import akka.util.ByteString -import akka.zeromq._ -import akka.zeromq.Subscribe import org.apache.log4j.{Level, Logger} +import org.zeromq.ZContext +import org.zeromq.ZMQ +import org.zeromq.ZMQException +import org.zeromq.ZMsg import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.streaming.zeromq._ +import org.apache.spark.streaming.zeromq.ZeroMQUtils /** - * A simple publisher for demonstration purposes, repeatedly publishes random Messages - * every one second. + * Simple publisher for demonstration purposes, + * repeatedly publishes random messages every one second. */ object SimpleZeroMQPublisher { - def main(args: Array[String]): Unit = { if (args.length < 2) { - System.err.println("Usage: SimpleZeroMQPublisher <zeroMQUrl> <topic> ") + // scalastyle:off println + System.err.println("Usage: SimpleZeroMQPublisher <zeroMqUrl> <topic>") + // scalastyle:on println System.exit(1) } val Seq(url, topic) = args.toSeq - val acs: ActorSystem = ActorSystem() - - val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url)) - implicit def stringToByteString(x: String): ByteString = ByteString(x) - val messages: List[ByteString] = List("words ", "may ", "count ") - while (true) { - Thread.sleep(1000) - pubSocket ! 
ZMQMessage(ByteString(topic) :: messages) - } - Await.result(acs.whenTerminated, Duration.Inf) + val context = new ZContext + val socket = context.createSocket(ZMQ.PUB) + socket.bind(url) + + val zmqThread = new Thread(new Runnable { + def run() { + val messages = List("words", "may", "count infinitely") + val random = new Random + while (!Thread.currentThread.isInterrupted) { + try { + Thread.sleep(random.nextInt(1000)) + val msg = new ZMsg + msg.add(topic.getBytes) + msg.add(messages(random.nextInt(messages.size)).getBytes) + msg.send(socket) + } catch { + case e: ZMQException if ZMQ.Error.ETERM.getCode == e.getErrorCode => + Thread.currentThread.interrupt() + case e: InterruptedException => + case e: Throwable => throw e + } + } + } + }) + + sys.addShutdownHook( { + context.destroy() + zmqThread.interrupt() + zmqThread.join() + } ) + + zmqThread.start() } } -// scalastyle:off /** - * A sample wordcount with ZeroMQStream stream - * - * To work with zeroMQ, some native libraries have to be installed. - * Install zeroMQ (release 2.1) core libraries. [ZeroMQ Install guide] - * (http://www.zeromq.org/intro:get-the-software) + * Sample word count with ZeroMQ stream. * - * Usage: ZeroMQWordCount <zeroMQurl> <topic> - * <zeroMQurl> and <topic> describe where zeroMq publisher is running. 
+ * Usage: ZeroMQWordCount <zeroMqUrl> <topic> + * <zeroMqUrl> describes where ZeroMQ publisher is running + * <topic> defines logical message type * - * To run this example locally, you may run publisher as + * To run this example locally, you may start publisher as: * `$ bin/run-example \ * org.apache.spark.examples.streaming.zeromq.SimpleZeroMQPublisher tcp://127.0.0.1:1234 foo` - * and run the example as + * and run the example as: * `$ bin/run-example \ * org.apache.spark.examples.streaming.zeromq.ZeroMQWordCount tcp://127.0.0.1:1234 foo` */ -// scalastyle:on object ZeroMQWordCount { def main(args: Array[String]) { if (args.length < 2) { - System.err.println("Usage: ZeroMQWordCount <zeroMQurl> <topic>") + // scalastyle:off println + System.err.println("Usage: ZeroMQWordCount <zeroMqUrl> <topic>") + // scalastyle:on println System.exit(1) } - // Set logging level if log4j not configured (override by adding log4j.properties to classpath) - if (!Logger.getRootLogger.getAllAppenders.hasMoreElements) { - Logger.getRootLogger.setLevel(Level.WARN) - } + // Set logging level if log4j not configured (override by adding log4j.properties to classpath). + Logger.getRootLogger.setLevel(Level.WARN) val Seq(url, topic) = args.toSeq val sparkConf = new SparkConf().setAppName("ZeroMQWordCount") - // check Spark configuration for master URL, set it to local if not configured + // Check Spark configuration for master URL, set it to local if not present. if (!sparkConf.contains("spark.master")) { sparkConf.setMaster("local[2]") } - // Create the context and set the batch size + // Create the context and set the batch size. val ssc = new StreamingContext(sparkConf, Seconds(2)) - def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] = x.map(_.utf8String).iterator - - // For this stream, a zeroMQ publisher should be running. 
- val lines = ZeroMQUtils.createStream( - ssc, - url, - Subscribe(topic), - bytesToStringIterator _) + val lines = ZeroMQUtils.createTextStream( + ssc, url, true, Seq(topic.getBytes) + ) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + wordCounts.print() + ssc.start() ssc.awaitTermination() } } -// scalastyle:on println awaitresult + http://git-wip-us.apache.org/repos/asf/bahir/blob/fb752570/streaming-zeromq/pom.xml ---------------------------------------------------------------------- diff --git a/streaming-zeromq/pom.xml b/streaming-zeromq/pom.xml index b95c118..0587c6e 100644 --- a/streaming-zeromq/pom.xml +++ b/streaming-zeromq/pom.xml @@ -46,6 +46,11 @@ <scope>provided</scope> </dependency> <dependency> + <groupId>org.zeromq</groupId> + <artifactId>jeromq</artifactId> + <version>0.4.3</version> + </dependency> + <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.binary.version}</artifactId> <version>${spark.version}</version> @@ -53,15 +58,6 @@ <scope>test</scope> </dependency> <dependency> - <groupId>org.apache.bahir</groupId> - <artifactId>spark-streaming-akka_${scala.binary.version}</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>${akka.group}</groupId> - <artifactId>akka-zeromq_${scala.binary.version}</artifactId> - </dependency> - <dependency> <groupId>org.scalacheck</groupId> <artifactId>scalacheck_${scala.binary.version}</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/bahir/blob/fb752570/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQInputDStream.scala ---------------------------------------------------------------------- diff --git a/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQInputDStream.scala b/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQInputDStream.scala new file mode 100644 index 0000000..ec2ec3a --- /dev/null +++ 
b/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQInputDStream.scala @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.zeromq + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +import org.zeromq.ZContext +import org.zeromq.ZMQ +import org.zeromq.ZMQException +import org.zeromq.ZMsg + +import org.apache.spark.internal.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.apache.spark.streaming.receiver.Receiver + +/** + * ZeroMQ receive stream. 
+ */ +private[streaming] +class ZeroMQInputDStream[T: ClassTag]( + ssc: StreamingContext, + publisherUrl: String, + connect: Boolean, + topics: Seq[Array[Byte]], + bytesToObjects: Array[Array[Byte]] => Iterable[T], + storageLevel: StorageLevel) + extends ReceiverInputDStream[T](ssc) with Logging { + + override def getReceiver(): Receiver[T] = { + new ZeroMQReceiver(publisherUrl, connect, topics, bytesToObjects, storageLevel) + } +} + +private[zeromq] +class ZeroMQReceiver[T: ClassTag]( + publisherUrl: String, + connect: Boolean, + topics: Seq[Array[Byte]], + bytesToObjects: Array[Array[Byte]] => Iterable[T], + storageLevel: StorageLevel) + extends Receiver[T](storageLevel) { + + private var receivingThread: Thread = _ + + override def onStart(): Unit = { + receivingThread = new Thread("zeromq-receiver-" + publisherUrl) { + override def run() { + subscribe() + } + } + receivingThread.start() + } + + def subscribe(): Unit = { + val context = new ZContext + + // JeroMQ requires to create and destroy socket in the same thread. + // Socket API is not thread-safe. + val socket = context.createSocket(ZMQ.SUB) + topics.foreach(socket.subscribe) + socket.setReceiveTimeOut(1000) + if (connect) { + socket.connect(publisherUrl) + } else { + socket.bind(publisherUrl) + } + + try { + while (!isStopped()) { + receiveLoop(socket) + } + } finally { + // Context will take care of destructing all associated sockets. + context.close() + } + } + + def receiveLoop(socket: ZMQ.Socket): Unit = { + try { + val message = ZMsg.recvMsg(socket) + if (message != null) { + val frames = new ArrayBuffer[Array[Byte]] + message.asScala.foreach(f => frames.append(f.getData)) + bytesToObjects(frames.toArray).foreach(store) + } + } catch { + case e: ZMQException => + if (e.getErrorCode != zmq.ZError.ETERM + && e.getErrorCode != zmq.ZError.EINTR) { + // 1) Context was terminated. It means that we have just closed the context + // from a different thread, while trying to receive new message. 
+ // Error is expected and can happen in normal situation. + // Reference: http://zguide.zeromq.org/java:interrupt. + // 2) System call interrupted. + throw e + } + } + } + + override def onStop(): Unit = { + receivingThread.join() + } +} + http://git-wip-us.apache.org/repos/asf/bahir/blob/fb752570/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala ---------------------------------------------------------------------- diff --git a/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala deleted file mode 100644 index 4f6f006..0000000 --- a/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.zeromq - -import scala.reflect.ClassTag - -import akka.util.ByteString -import akka.zeromq._ - -import org.apache.spark.internal.Logging -import org.apache.spark.streaming.akka.ActorReceiver - -/** - * A receiver to subscribe to ZeroMQ stream. 
- */ -private[streaming] class ZeroMQReceiver[T: ClassTag]( - publisherUrl: String, - subscribe: Subscribe, - bytesToObjects: Seq[ByteString] => Iterator[T]) - extends ActorReceiver with Logging { - - override def preStart(): Unit = { - ZeroMQExtension(context.system) - .newSocket(SocketType.Sub, Listener(self), Connect(publisherUrl), subscribe) - } - - def receive: Receive = { - - case Connecting => logInfo("connecting ...") - - case m: ZMQMessage => - logDebug("Received message for:" + m.frame(0)) - - // We ignore first frame for processing as it is the topic - val bytes = m.frames.tail - store(bytesToObjects(bytes)) - - case Closed => logInfo("received closed ") - } -} http://git-wip-us.apache.org/repos/asf/bahir/blob/fb752570/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala ---------------------------------------------------------------------- diff --git a/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala index 1784d6e..2f7b645 100644 --- a/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala +++ b/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala @@ -17,147 +17,127 @@ package org.apache.spark.streaming.zeromq +import java.lang.{Iterable => JIterable} +import java.util.{List => JList} + import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag -import akka.actor.{ActorSystem, Props, SupervisorStrategy} -import akka.util.ByteString -import akka.zeromq.Subscribe +import org.zeromq.ZMQ -import org.apache.spark.api.java.function.{Function => JFunction, Function0 => JFunction0} +import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.akka.{ActorReceiver, 
AkkaUtils} import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} import org.apache.spark.streaming.dstream.ReceiverInputDStream object ZeroMQUtils { + val textMessageConverter: Array[Array[Byte]] => Iterable[String] = + (bytes: Array[Array[Byte]]) => { + // First frame typically contains topic name, so we skip it to extract only payload. + val result = new ArrayBuffer[String]() + for (i <- 1 until bytes.length) { + result.append(new String(bytes(i), ZMQ.CHARSET)) + } + result + } + /** - * Create an input stream that receives messages pushed by a zeromq publisher. - * @param ssc StreamingContext object - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe Topic to subscribe to - * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic - * and each frame has sequence of byte thus it needs the converter - * (which might be deserializer of bytes) to translate from sequence - * of sequence of bytes, where sequence refer to a frame - * and sub sequence refer to its payload. - * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2. - * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will - * be shut down when the receiver is stopping (default: - * ActorReceiver.defaultActorSystemCreator) - * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy) + * Create an input stream that receives messages pushed by a ZeroMQ publisher. + * @param ssc Streaming context + * @param publisherUrl URL of remote ZeroMQ publisher + * @param connect When positive, connector will try to establish connectivity with remote server. + * Otherwise, it attempts to create and bind local socket. 
+ * @param topics List of topics to subscribe + * @param messageConverter ZeroMQ stream publishes sequence of frames for each topic + * and each frame has sequence of byte thus it needs the converter + * (which might be deserializer of bytes) to translate from sequence + * of sequence of bytes, where sequence refer to a frame + * and sub sequence refer to its payload. First frame typically + * contains message envelope, which corresponds to topic name. + * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2. */ def createStream[T: ClassTag]( ssc: StreamingContext, publisherUrl: String, - subscribe: Subscribe, - bytesToObjects: Seq[ByteString] => Iterator[T], - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, - actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator, - supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultSupervisorStrategy + connect: Boolean, + topics: Seq[Array[Byte]], + messageConverter: Array[Array[Byte]] => Iterable[T], + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): ReceiverInputDStream[T] = { - AkkaUtils.createStream( - ssc, - Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)), - "ZeroMQReceiver", - storageLevel, - actorSystemCreator, - supervisorStrategy) + ssc.withNamedScope("ZeroMQ stream") { + new ZeroMQInputDStream(ssc, publisherUrl, connect, topics, messageConverter, storageLevel) + } } /** - * Create an input stream that receives messages pushed by a zeromq publisher. 
- * @param jssc JavaStreamingContext object - * @param publisherUrl Url of remote ZeroMQ publisher - * @param subscribe Topic to subscribe to - * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each - * frame has sequence of byte thus it needs the converter(which might be - * deserializer of bytes) to translate from sequence of sequence of bytes, - * where sequence refer to a frame and sub sequence refer to its payload. - * @param storageLevel Storage level to use for storing the received objects - * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will - * be shut down when the receiver is stopping. - * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy) + * Create text input stream that receives messages pushed by a ZeroMQ publisher. + * @param ssc Streaming context + * @param publisherUrl URL of remote ZeroMQ publisher + * @param connect When positive, connector will try to establish connectivity with remote server. + * Otherwise, it attempts to create and bind local socket. + * @param topics List of topics to subscribe + * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2. 
*/ - def createStream[T]( - jssc: JavaStreamingContext, + def createTextStream( + ssc: StreamingContext, publisherUrl: String, - subscribe: Subscribe, - bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], - storageLevel: StorageLevel, - actorSystemCreator: JFunction0[ActorSystem], - supervisorStrategy: SupervisorStrategy - ): JavaReceiverInputDStream[T] = { - implicit val cm: ClassTag[T] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - val fn = - (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala - createStream[T]( - jssc.ssc, - publisherUrl, - subscribe, - fn, - storageLevel, - () => actorSystemCreator.call(), - supervisorStrategy) + connect: Boolean, + topics: Seq[Array[Byte]], + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): ReceiverInputDStream[String] = { + createStream[String](ssc, publisherUrl, connect, topics, textMessageConverter, storageLevel) } /** - * Create an input stream that receives messages pushed by a zeromq publisher. - * @param jssc JavaStreamingContext object - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe Topic to subscribe to - * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each - * frame has sequence of byte thus it needs the converter(which might be - * deserializer of bytes) to translate from sequence of sequence of bytes, - * where sequence refer to a frame and sub sequence refer to its payload. - * @param storageLevel RDD storage level. + * Create an input stream that receives messages pushed by a ZeroMQ publisher. + * @param jssc Java streaming context + * @param publisherUrl URL of remote ZeroMQ publisher + * @param connect When positive, connector will try to establish connectivity with remote server. + * Otherwise, it attempts to create and bind local socket. 
+ * @param topics List of topics to subscribe + * @param messageConverter ZeroMQ stream publishes sequence of frames for each topic and each + * frame has sequence of byte thus it needs the converter (which might be + * deserializer of bytes) to translate from sequence of sequence of bytes, + * where sequence refer to a frame and sub sequence refer to its payload. + * First frame typically contains message envelope, which corresponds + * to topic name. + * @param storageLevel Storage level to use for persisting received objects */ - def createStream[T]( + def createJavaStream[T]( jssc: JavaStreamingContext, publisherUrl: String, - subscribe: Subscribe, - bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], + connect: Boolean, + topics: JList[Array[Byte]], + messageConverter: JFunction[Array[Array[Byte]], JIterable[T]], storageLevel: StorageLevel ): JavaReceiverInputDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - val fn = - (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala - createStream[T]( - jssc.ssc, - publisherUrl, - subscribe, - fn, - storageLevel) + val fn = (x: Array[Array[Byte]]) => + messageConverter.call(x).iterator().asScala.toIterable + createStream(jssc.ssc, publisherUrl, connect, topics.asScala, fn, storageLevel) } /** - * Create an input stream that receives messages pushed by a zeromq publisher. - * @param jssc JavaStreamingContext object - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe Topic to subscribe to - * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each - * frame has sequence of byte thus it needs the converter(which might - * be deserializer of bytes) to translate from sequence of sequence of - * bytes, where sequence refer to a frame and sub sequence refer to its - * payload. + * Create text input stream that receives messages pushed by a ZeroMQ publisher. 
+ * @param jssc Java streaming context + * @param publisherUrl URL of remote ZeroMQ publisher + * @param connect When positive, connector will try to establish connectivity with remote server. + * Otherwise, it attempts to create and bind local socket. + * @param topics List of topics to subscribe + * @param storageLevel Storage level to use for persisting received objects */ - def createStream[T]( + def createTextJavaStream( jssc: JavaStreamingContext, publisherUrl: String, - subscribe: Subscribe, - bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]] - ): JavaReceiverInputDStream[T] = { - implicit val cm: ClassTag[T] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - val fn = - (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala - createStream[T]( - jssc.ssc, - publisherUrl, - subscribe, - fn) + connect: Boolean, + topics: JList[Array[Byte]], + storageLevel: StorageLevel + ): JavaReceiverInputDStream[String] = { + createStream(jssc.ssc, publisherUrl, connect, topics.asScala, + textMessageConverter, storageLevel + ) } } http://git-wip-us.apache.org/repos/asf/bahir/blob/fb752570/streaming-zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java ---------------------------------------------------------------------- diff --git a/streaming-zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming-zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java index cfedb5a..f9cee96 100644 --- a/streaming-zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java +++ b/streaming-zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java @@ -23,12 +23,11 @@ import org.junit.After; import org.junit.Before; public abstract class LocalJavaStreamingContext { - protected transient JavaStreamingContext ssc; @Before public void setUp() { - SparkConf conf = new SparkConf() + final SparkConf conf = new 
SparkConf() .setMaster("local[2]") .setAppName("test") .set("spark.streaming.clock", "org.apache.spark.util.ManualClock"); @@ -42,3 +41,4 @@ public abstract class LocalJavaStreamingContext { ssc = null; } } + http://git-wip-us.apache.org/repos/asf/bahir/blob/fb752570/streaming-zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java ---------------------------------------------------------------------- diff --git a/streaming-zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/streaming-zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java index 9fd0424..7a32972 100644 --- a/streaming-zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java +++ b/streaming-zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java @@ -17,47 +17,39 @@ package org.apache.spark.streaming.zeromq; -import akka.actor.ActorSystem; -import akka.actor.SupervisorStrategy; -import akka.util.ByteString; -import akka.zeromq.Subscribe; import org.junit.Test; import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.Function0; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.LocalJavaStreamingContext; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; +import zmq.ZMQ; -public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext { - - @Test // tests the API, does not actually test data receiving - public void testZeroMQStream() { - String publishUrl = "abc"; - Subscribe subscribe = new Subscribe((ByteString)null); - Function<byte[][], Iterable<String>> bytesToObjects = new BytesToObjects(); - Function0<ActorSystem> actorSystemCreator = new ActorSystemCreatorForTest(); - - JavaReceiverInputDStream<String> test1 = ZeroMQUtils.<String>createStream( - ssc, publishUrl, subscribe, bytesToObjects); - JavaReceiverInputDStream<String> test2 = ZeroMQUtils.<String>createStream( - ssc, 
publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2()); - JavaReceiverInputDStream<String> test3 = ZeroMQUtils.<String>createStream( - ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), - actorSystemCreator, SupervisorStrategy.defaultStrategy()); - } -} +import java.util.Arrays; -class BytesToObjects implements Function<byte[][], Iterable<String>> { - @Override - public Iterable<String> call(byte[][] bytes) throws Exception { - return null; - } +public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext { + @Test + public void testZeroMQAPICompatibility() { + // Test the API, but do not exchange any messages. + final String publishUrl = "tcp://localhost:5555"; + final String topic = "topic1"; + final Function<byte[][], Iterable<String>> messageConverter = + new Function<byte[][], Iterable<String>>() { + @Override + public Iterable<String> call(byte[][] bytes) throws Exception { + // Skip topic name and assume that each message contains only one frame. 
+ return Arrays.asList(new String(bytes[1], ZMQ.CHARSET)); + } + }; + + JavaReceiverInputDStream<String> test1 = ZeroMQUtils.createJavaStream( + ssc, publishUrl, true, Arrays.asList(topic.getBytes()), messageConverter, + StorageLevel.MEMORY_AND_DISK_SER_2() + ); + JavaReceiverInputDStream<String> test2 = ZeroMQUtils.createTextJavaStream( + ssc, publishUrl, true, Arrays.asList(topic.getBytes()), + StorageLevel.MEMORY_AND_DISK_SER_2() + ); + } } -class ActorSystemCreatorForTest implements Function0<ActorSystem> { - @Override - public ActorSystem call() { - return null; - } -} http://git-wip-us.apache.org/repos/asf/bahir/blob/fb752570/streaming-zeromq/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/streaming-zeromq/src/test/resources/log4j.properties b/streaming-zeromq/src/test/resources/log4j.properties index 75e3b53..bcb37d2 100644 --- a/streaming-zeromq/src/test/resources/log4j.properties +++ b/streaming-zeromq/src/test/resources/log4j.properties @@ -15,8 +15,13 @@ # limitations under the License. 
# -# Set everything to be logged to the file target/unit-tests.log -log4j.rootCategory=INFO, file +log4j.rootCategory=INFO, console, file + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.out +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.conversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n + log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.append=true log4j.appender.file.file=target/unit-tests.log http://git-wip-us.apache.org/repos/asf/bahir/blob/fb752570/streaming-zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala ---------------------------------------------------------------------- diff --git a/streaming-zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/streaming-zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala index bac2679..547c948 100644 --- a/streaming-zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala +++ b/streaming-zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala @@ -17,47 +17,257 @@ package org.apache.spark.streaming.zeromq -import akka.actor.SupervisorStrategy -import akka.util.ByteString -import akka.zeromq.Subscribe +import scala.collection.mutable + +import org.scalatest.BeforeAndAfter +import org.scalatest.concurrent.Eventually +import org.scalatest.time +import org.scalatest.time.Span +import org.zeromq.Utils +import org.zeromq.ZContext +import org.zeromq.ZMQ +import org.zeromq.ZMsg import org.apache.spark.SparkFunSuite import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.ReceiverInputDStream -class ZeroMQStreamSuite extends SparkFunSuite { - - val batchDuration = Seconds(1) +class ZeroMQStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter { + private val publishUrl 
= "tcp://localhost:" + Utils.findOpenPort() + private val topic1 = "topic1" + private val topic2 = "topic2" + private val messageConverter = (bytes: Array[Array[Byte]]) => { + if (bytes(0) == null || bytes(0).length == 0) { + // Just to test that topic name is correctly populated. + // Should never happen, but it will cause test to fail. + // Assertions are not serializable. + Seq() + } else { + Seq(new String(bytes(1), ZMQ.CHARSET)) + } + } - private val master: String = "local[2]" + private var ssc: StreamingContext = _ + private var zeroContext: ZContext = _ + private var zeroSocket: ZMQ.Socket = _ - private val framework: String = this.getClass.getSimpleName + before { + ssc = new StreamingContext("local[2]", this.getClass.getSimpleName, Seconds(1)) + } - test("zeromq input stream") { - val ssc = new StreamingContext(master, framework, batchDuration) - val publishUrl = "abc" - val subscribe = new Subscribe(null.asInstanceOf[ByteString]) - val bytesToObjects = (bytes: Seq[ByteString]) => null.asInstanceOf[Iterator[String]] + after { + if (zeroContext != null) { + zeroContext.close() + zeroContext = null + } + zeroSocket = null + if (ssc != null) { + ssc.stop() + ssc = null + } + } - // tests the API, does not actually test data receiving - val test1: ReceiverInputDStream[String] = - ZeroMQUtils.createStream( - ssc, publishUrl, subscribe, bytesToObjects, actorSystemCreator = () => null) + test("Input stream API") { + // Test the API, but do not exchange any messages. 
+ val test1: ReceiverInputDStream[String] = ZeroMQUtils.createStream( + ssc, publishUrl, true, Seq(topic1.getBytes), messageConverter + ) val test2: ReceiverInputDStream[String] = ZeroMQUtils.createStream( - ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2, () => null) - val test3: ReceiverInputDStream[String] = ZeroMQUtils.createStream( - ssc, publishUrl, subscribe, bytesToObjects, - StorageLevel.MEMORY_AND_DISK_SER_2, () => null, SupervisorStrategy.defaultStrategy) - val test4: ReceiverInputDStream[String] = - ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects) - val test5: ReceiverInputDStream[String] = ZeroMQUtils.createStream( - ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2) - val test6: ReceiverInputDStream[String] = ZeroMQUtils.createStream( - ssc, publishUrl, subscribe, bytesToObjects, - StorageLevel.MEMORY_AND_DISK_SER_2, supervisorStrategy = SupervisorStrategy.defaultStrategy) - - // TODO: Actually test data receiving. A real test needs the native ZeroMQ library - ssc.stop() + ssc, publishUrl, true, Seq(topic1.getBytes), messageConverter, + StorageLevel.MEMORY_AND_DISK_SER_2 + ) + val test3: ReceiverInputDStream[String] = ZeroMQUtils.createTextStream( + ssc, publishUrl, true, Seq(topic1.getBytes) + ) + } + + test("Publisher Bind(), Subscriber Connect()") { + zeroContext = new ZContext + zeroSocket = zeroContext.createSocket(ZMQ.PUB) + zeroSocket.bind(publishUrl) + + val receiveStream = ZeroMQUtils.createStream( + ssc, publishUrl, true, Seq(ZMQ.SUBSCRIPTION_ALL), messageConverter + ) + + @volatile var receivedMessages: mutable.Set[String] = mutable.Set() + receiveStream.foreachRDD { rdd => + for (element <- rdd.collect()) { + receivedMessages += element + } + receivedMessages + } + + ssc.start() + + checkAllReceived( + Map("Hello, World!" -> topic1, "Hello, ZeroMQ!" 
-> topic2), + receivedMessages + ) + } + + test("Publisher Connect(), Subscriber Bind()") { + val receiveStream = ZeroMQUtils.createStream( + ssc, publishUrl, false, Seq(ZMQ.SUBSCRIPTION_ALL), messageConverter + ) + + @volatile var receivedMessages: mutable.Set[String] = mutable.Set() + receiveStream.foreachRDD { rdd => + for (element <- rdd.collect()) { + receivedMessages += element + } + receivedMessages + } + + ssc.start() + + zeroContext = new ZContext + zeroSocket = zeroContext.createSocket(ZMQ.PUB) + zeroSocket.connect(publishUrl) + + checkAllReceived( + Map("Hello, World!" -> topic1, "Hello, ZeroMQ!" -> topic2), + receivedMessages + ) + } + + test("Filter by topic") { + zeroContext = new ZContext + zeroSocket = zeroContext.createSocket(ZMQ.PUB) + zeroSocket.bind(publishUrl) + + val receiveStream = ZeroMQUtils.createStream( + ssc, publishUrl, true, Seq(topic1.getBytes, topic2.getBytes), messageConverter + ) + + @volatile var receivedMessages: Set[String] = Set() + receiveStream.foreachRDD { rdd => + for (element <- rdd.collect()) { + receivedMessages += element + } + receivedMessages + } + + ssc.start() + + eventually(timeout(Span(5, time.Seconds)), interval(Span(500, time.Millis))) { + val payload1 = "Hello, World!" + val payload2 = "Hello, 0MQ!" + + // First message should not be picked up. + val msg1 = new ZMsg + msg1.add("wrong-topic".getBytes) + msg1.add("Bye, World!".getBytes) + msg1.send(zeroSocket) + + // Second message should be received. + val msg2 = new ZMsg + msg2.add(topic1.getBytes) + msg2.add(payload1.getBytes) + msg2.send(zeroSocket) + + // Third message should be received. 
+ val msg3 = new ZMsg + msg3.add(topic2.getBytes) + msg3.add(payload2.getBytes) + msg3.send(zeroSocket) + + assert(receivedMessages.size == 2) + assert(Set(payload1, payload2).equals(receivedMessages)) + } + } + + test("Multiple frame message") { + zeroContext = new ZContext + zeroSocket = zeroContext.createSocket(ZMQ.PUB) + zeroSocket.bind(publishUrl) + + val receiveStream = ZeroMQUtils.createTextStream( + ssc, publishUrl, true, Seq(topic1.getBytes, topic2.getBytes) + ) + + @volatile var receivedMessages: Set[String] = Set() + receiveStream.foreachRDD { rdd => + for (element <- rdd.collect()) { + receivedMessages += element + } + receivedMessages + } + + ssc.start() + + eventually(timeout(Span(5, time.Seconds)), interval(Span(500, time.Millis))) { + val part1 = "first line" + val part2 = "second line" + + val msg = new ZMsg + msg.add(topic1.getBytes) + msg.add(part1.getBytes) + msg.add(part2.getBytes) + msg.send(zeroSocket) + + assert(receivedMessages.size == 2) + assert(Set(part1, part2).equals(receivedMessages)) + } + } + + test("Reconnection") { + zeroContext = new ZContext + zeroSocket = zeroContext.createSocket(ZMQ.PUB) + zeroSocket.bind(publishUrl) + + val receiveStream = ZeroMQUtils.createStream( + ssc, publishUrl, true, Seq(ZMQ.SUBSCRIPTION_ALL), messageConverter + ) + + @volatile var receivedMessages: mutable.Set[String] = mutable.Set() + receiveStream.foreachRDD { rdd => + for (element <- rdd.collect()) { + receivedMessages += element + } + receivedMessages + } + + ssc.start() + + checkAllReceived( + Map("Hello, World!" -> topic1, "Hello, ZeroMQ!" -> topic2), + receivedMessages + ) + + // Terminate bounded socket (server). + zeroContext.close() + zeroSocket = null + + Thread.sleep(2000) + + // Create new socket without stopping Spark stream. 
+ zeroContext = new ZContext + zeroSocket = zeroContext.createSocket(ZMQ.PUB) + zeroSocket.bind(publishUrl) + + receivedMessages.clear() + + checkAllReceived( + Map("Apache Spark" -> topic1, "Apache Kafka" -> topic2), + receivedMessages + ) + } + + def checkAllReceived( + publishMessages: Map[String, String], + receivedMessages: mutable.Set[String]): Unit = { + eventually(timeout(Span(5, time.Seconds)), interval(Span(500, time.Millis))) { + for ((k, v) <- publishMessages) { + val msg = new ZMsg + msg.add(v.getBytes) + msg.add(k.getBytes) + msg.send(zeroSocket) + } + assert(receivedMessages.size == publishMessages.size) + assert(publishMessages.keySet.equals(receivedMessages)) + } } } +
