Repository: bahir
Updated Branches:
  refs/heads/master aecd5fd9f -> fb752570c


[BAHIR-66] Switch to Java binding for ZeroMQ

Initially, I just wanted to implement an integration test for BAHIR-66.
Google pointed me to JeroMQ, which provides an official ZeroMQ binding
for Java and does not require native libraries. I decided to give it
a try, but quickly realized that the akka-zeromq module (a transitive
dependency of the current Bahir master) is not compatible with JeroMQ.
The Akka team also wanted to move to JeroMQ (akka/akka#13856), but in
the end decided to remove the akka-zeromq project completely
(akka/akka#15864, https://www.lightbend.com/blog/akka-roadmap-update-2014).

Bearing in mind that akka-zeromq does not support the latest version of
the ZeroMQ protocol and that its further development may be delayed, I
decided to refactor the streaming-zeromq implementation to leverage
JeroMQ. The change brings various benefits, such as support for the
PUB-SUB and PUSH-PULL messaging patterns, the ability to bind the socket
on either end of the communication channel (see test cases),
subscription to multiple topics, etc. JeroMQ seems quite reliable, and
reconnection is handled out of the box. We can even start the ZeroMQ
subscriber before the other end has created and bound its socket. While
I tried to preserve backward compatibility of method signatures, there
was no easy way to support the Akka API and business logic that users
could put there (e.g. akka.actor.ActorSystem).
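The frame convention the new converters rely on can be sketched in plain
Java (class name and sample frames below are illustrative, not part of the
patch): the first frame of each multi-part message carries the topic
envelope, and the remaining frames carry the payload.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the text-converter contract: skip the first
// frame (topic envelope) and decode remaining frames as UTF-8 payload.
public class TextConverterSketch {
    static List<String> convert(byte[][] frames) {
        List<String> payload = new ArrayList<>();
        for (int i = 1; i < frames.length; i++) {   // frame 0 is the topic
            payload.add(new String(frames[i], StandardCharsets.UTF_8));
        }
        return payload;
    }

    public static void main(String[] args) {
        byte[][] message = {
            "my-topic".getBytes(StandardCharsets.UTF_8),  // envelope
            "words".getBytes(StandardCharsets.UTF_8),
            "may count".getBytes(StandardCharsets.UTF_8)
        };
        System.out.println(convert(message));  // [words, may count]
    }
}
```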

Closes #71


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/fb752570
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/fb752570
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/fb752570

Branch: refs/heads/master
Commit: fb752570c7ac817b414c738e05b751dd5864feb6
Parents: aecd5fd
Author: Lukasz Antoniak <[email protected]>
Authored: Tue Nov 27 06:58:42 2018 -0800
Committer: Luciano Resende <[email protected]>
Committed: Fri Nov 30 11:10:49 2018 +0100

----------------------------------------------------------------------
 streaming-zeromq/README.md                      |  17 +-
 .../streaming/zeromq/ZeroMQWordCount.scala      | 112 ++++----
 streaming-zeromq/pom.xml                        |  14 +-
 .../streaming/zeromq/ZeroMQInputDStream.scala   | 123 +++++++++
 .../spark/streaming/zeromq/ZeroMQReceiver.scala |  55 ----
 .../spark/streaming/zeromq/ZeroMQUtils.scala    | 190 ++++++-------
 .../streaming/LocalJavaStreamingContext.java    |   4 +-
 .../streaming/zeromq/JavaZeroMQStreamSuite.java |  60 ++--
 .../src/test/resources/log4j.properties         |   9 +-
 .../streaming/zeromq/ZeroMQStreamSuite.scala    | 272 ++++++++++++++++---
 10 files changed, 565 insertions(+), 291 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/fb752570/streaming-zeromq/README.md
----------------------------------------------------------------------
diff --git a/streaming-zeromq/README.md b/streaming-zeromq/README.md
index 952bb4f..8ced539 100644
--- a/streaming-zeromq/README.md
+++ b/streaming-zeromq/README.md
@@ -1,3 +1,4 @@
+# Spark Streaming ZeroMQ Connector
 
 A library for reading data from [ZeroMQ](http://zeromq.org/) using Spark Streaming.
 
@@ -27,13 +28,23 @@ This library is cross-published for Scala 2.10 and Scala 2.11, so users should r
 
 ## Examples
 
+Review end-to-end examples at [ZeroMQ Examples](https://github.com/apache/bahir/tree/master/streaming-zeromq/examples).
 
 ### Scala API
 
-    val lines = ZeroMQUtils.createStream(ssc, ...)
+    import org.apache.spark.streaming.zeromq.ZeroMQUtils
+
+    val lines = ZeroMQUtils.createTextStream(
+      ssc, "tcp://server:5555", true, Seq("my-topic".getBytes)
+    )
 
 ### Java API
 
-    JavaDStream<String> lines = ZeroMQUtils.createStream(jssc, ...);
+    import org.apache.spark.storage.StorageLevel;
+    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+    import org.apache.spark.streaming.zeromq.ZeroMQUtils;
 
-See end-to-end examples at [ZeroMQ Examples](https://github.com/apache/bahir/tree/master/streaming-zeromq/examples)
\ No newline at end of file
+    JavaReceiverInputDStream<String> test1 = ZeroMQUtils.createTextJavaStream(
+        ssc, "tcp://server:5555", true, Arrays.asList("my-topic".getBytes()),
+        StorageLevel.MEMORY_AND_DISK_SER_2()
+    );
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bahir/blob/fb752570/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala
----------------------------------------------------------------------
diff --git a/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala b/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala
index 00fd815..24284ec 100644
--- a/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala
+++ b/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala
@@ -15,105 +15,117 @@
  * limitations under the License.
  */
 
-// scalastyle:off println awaitresult
 package org.apache.spark.examples.streaming.zeromq
 
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
 import scala.language.implicitConversions
+import scala.util.Random
 
-import akka.actor.ActorSystem
-import akka.actor.actorRef2Scala
-import akka.util.ByteString
-import akka.zeromq._
-import akka.zeromq.Subscribe
 import org.apache.log4j.{Level, Logger}
+import org.zeromq.ZContext
+import org.zeromq.ZMQ
+import org.zeromq.ZMQException
+import org.zeromq.ZMsg
 
 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.zeromq._
+import org.apache.spark.streaming.zeromq.ZeroMQUtils
 
 /**
- * A simple publisher for demonstration purposes, repeatedly publishes random Messages
- * every one second.
+ * Simple publisher for demonstration purposes,
+ * repeatedly publishes random messages every one second.
  */
 object SimpleZeroMQPublisher {
-
   def main(args: Array[String]): Unit = {
     if (args.length < 2) {
-      System.err.println("Usage: SimpleZeroMQPublisher <zeroMQUrl> <topic> ")
+      // scalastyle:off println
+      System.err.println("Usage: SimpleZeroMQPublisher <zeroMqUrl> <topic>")
+      // scalastyle:on println
       System.exit(1)
     }
 
     val Seq(url, topic) = args.toSeq
-    val acs: ActorSystem = ActorSystem()
-
-    val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url))
-    implicit def stringToByteString(x: String): ByteString = ByteString(x)
-    val messages: List[ByteString] = List("words ", "may ", "count ")
-    while (true) {
-      Thread.sleep(1000)
-      pubSocket ! ZMQMessage(ByteString(topic) :: messages)
-    }
-    Await.result(acs.whenTerminated, Duration.Inf)
+    val context = new ZContext
+    val socket = context.createSocket(ZMQ.PUB)
+    socket.bind(url)
+
+    val zmqThread = new Thread(new Runnable {
+      def run() {
+        val messages = List("words", "may", "count infinitely")
+        val random = new Random
+        while (!Thread.currentThread.isInterrupted) {
+          try {
+            Thread.sleep(random.nextInt(1000))
+            val msg = new ZMsg
+            msg.add(topic.getBytes)
+            msg.add(messages(random.nextInt(messages.size)).getBytes)
+            msg.send(socket)
+          } catch {
+            case e: ZMQException if ZMQ.Error.ETERM.getCode == e.getErrorCode =>
+              Thread.currentThread.interrupt()
+            case e: InterruptedException =>
+            case e: Throwable => throw e
+          }
+        }
+      }
+    })
+
+    sys.addShutdownHook {
+      context.destroy()
+      zmqThread.interrupt()
+      zmqThread.join()
+    }
+
+    zmqThread.start()
   }
 }
 
-// scalastyle:off
 /**
- * A sample wordcount with ZeroMQStream stream
- *
- * To work with zeroMQ, some native libraries have to be installed.
- * Install zeroMQ (release 2.1) core libraries. [ZeroMQ Install guide]
- * (http://www.zeromq.org/intro:get-the-software)
+ * Sample word count with ZeroMQ stream.
  *
- * Usage: ZeroMQWordCount <zeroMQurl> <topic>
- *   <zeroMQurl> and <topic> describe where zeroMq publisher is running.
+ * Usage: ZeroMQWordCount <zeroMqUrl> <topic>
+ *   <zeroMqUrl> describes where ZeroMQ publisher is running
+ *   <topic> defines logical message type
  *
- * To run this example locally, you may run publisher as
+ * To run this example locally, you may start publisher as:
  *    `$ bin/run-example \
 *      org.apache.spark.examples.streaming.zeromq.SimpleZeroMQPublisher tcp://127.0.0.1:1234 foo`
- * and run the example as
+ * and run the example as:
  *    `$ bin/run-example \
 *      org.apache.spark.examples.streaming.zeromq.ZeroMQWordCount tcp://127.0.0.1:1234 foo`
  */
-// scalastyle:on
 object ZeroMQWordCount {
   def main(args: Array[String]) {
     if (args.length < 2) {
-      System.err.println("Usage: ZeroMQWordCount <zeroMQurl> <topic>")
+      // scalastyle:off println
+      System.err.println("Usage: ZeroMQWordCount <zeroMqUrl> <topic>")
+      // scalastyle:on println
       System.exit(1)
     }
 
-    // Set logging level if log4j not configured (override by adding log4j.properties to classpath)
-    if (!Logger.getRootLogger.getAllAppenders.hasMoreElements) {
-      Logger.getRootLogger.setLevel(Level.WARN)
-    }
+    // Set logging level if log4j not configured (override by adding log4j.properties to classpath).
+    Logger.getRootLogger.setLevel(Level.WARN)
 
     val Seq(url, topic) = args.toSeq
     val sparkConf = new SparkConf().setAppName("ZeroMQWordCount")
 
-    // check Spark configuration for master URL, set it to local if not configured
+    // Check Spark configuration for master URL, set it to local if not present.
     if (!sparkConf.contains("spark.master")) {
       sparkConf.setMaster("local[2]")
     }
 
-    // Create the context and set the batch size
+    // Create the context and set the batch size.
     val ssc = new StreamingContext(sparkConf, Seconds(2))
 
-    def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] = x.map(_.utf8String).iterator
-
-    // For this stream, a zeroMQ publisher should be running.
-    val lines = ZeroMQUtils.createStream(
-      ssc,
-      url,
-      Subscribe(topic),
-      bytesToStringIterator _)
+    val lines = ZeroMQUtils.createTextStream(
+      ssc, url, true, Seq(topic.getBytes)
+    )
     val words = lines.flatMap(_.split(" "))
     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+
     wordCounts.print()
+
     ssc.start()
     ssc.awaitTermination()
   }
 }
-// scalastyle:on println awaitresult
+

http://git-wip-us.apache.org/repos/asf/bahir/blob/fb752570/streaming-zeromq/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-zeromq/pom.xml b/streaming-zeromq/pom.xml
index b95c118..0587c6e 100644
--- a/streaming-zeromq/pom.xml
+++ b/streaming-zeromq/pom.xml
@@ -46,6 +46,11 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
+      <groupId>org.zeromq</groupId>
+      <artifactId>jeromq</artifactId>
+      <version>0.4.3</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
       <version>${spark.version}</version>
@@ -53,15 +58,6 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>org.apache.bahir</groupId>
-      <artifactId>spark-streaming-akka_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>${akka.group}</groupId>
-      <artifactId>akka-zeromq_${scala.binary.version}</artifactId>
-    </dependency>
-    <dependency>
       <groupId>org.scalacheck</groupId>
       <artifactId>scalacheck_${scala.binary.version}</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/bahir/blob/fb752570/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQInputDStream.scala b/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQInputDStream.scala
new file mode 100644
index 0000000..ec2ec3a
--- /dev/null
+++ b/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQInputDStream.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.zeromq
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.zeromq.ZContext
+import org.zeromq.ZMQ
+import org.zeromq.ZMQException
+import org.zeromq.ZMsg
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.receiver.Receiver
+
+/**
+ * ZeroMQ receive stream.
+ */
+private[streaming]
+class ZeroMQInputDStream[T: ClassTag](
+    ssc: StreamingContext,
+    publisherUrl: String,
+    connect: Boolean,
+    topics: Seq[Array[Byte]],
+    bytesToObjects: Array[Array[Byte]] => Iterable[T],
+    storageLevel: StorageLevel)
+  extends ReceiverInputDStream[T](ssc) with Logging {
+
+  override def getReceiver(): Receiver[T] = {
+    new ZeroMQReceiver(publisherUrl, connect, topics, bytesToObjects, storageLevel)
+  }
+}
+
+private[zeromq]
+class ZeroMQReceiver[T: ClassTag](
+    publisherUrl: String,
+    connect: Boolean,
+    topics: Seq[Array[Byte]],
+    bytesToObjects: Array[Array[Byte]] => Iterable[T],
+    storageLevel: StorageLevel)
+  extends Receiver[T](storageLevel) {
+
+  private var receivingThread: Thread = _
+
+  override def onStart(): Unit = {
+    receivingThread = new Thread("zeromq-receiver-" + publisherUrl) {
+      override def run() {
+        subscribe()
+      }
+    }
+    receivingThread.start()
+  }
+
+  def subscribe(): Unit = {
+    val context = new ZContext
+
+    // JeroMQ requires the socket to be created and destroyed in the same thread;
+    // the socket API is not thread-safe.
+    val socket = context.createSocket(ZMQ.SUB)
+    topics.foreach(socket.subscribe)
+    socket.setReceiveTimeOut(1000)
+    if (connect) {
+      socket.connect(publisherUrl)
+    } else {
+      socket.bind(publisherUrl)
+    }
+
+    try {
+      while (!isStopped()) {
+        receiveLoop(socket)
+      }
+    } finally {
+      // Context will take care of destructing all associated sockets.
+      context.close()
+    }
+  }
+
+  def receiveLoop(socket: ZMQ.Socket): Unit = {
+    try {
+      val message = ZMsg.recvMsg(socket)
+      if (message != null) {
+        val frames = new ArrayBuffer[Array[Byte]]
+        message.asScala.foreach(f => frames.append(f.getData))
+        bytesToObjects(frames.toArray).foreach(store)
+      }
+    } catch {
+      case e: ZMQException =>
+        if (e.getErrorCode != zmq.ZError.ETERM
+          && e.getErrorCode != zmq.ZError.EINTR) {
+          // The two excluded error codes are expected in normal operation:
+          // 1) ETERM: the context was terminated, i.e. we have just closed it
+          //    from a different thread while waiting for a new message.
+          //    Reference: http://zguide.zeromq.org/java:interrupt.
+          // 2) EINTR: the system call was interrupted.
+          // Anything else is rethrown.
+          throw e
+        }
+    }
+  }
+
+  override def onStop(): Unit = {
+    receivingThread.join()
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/bahir/blob/fb752570/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
----------------------------------------------------------------------
diff --git a/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
deleted file mode 100644
index 4f6f006..0000000
--- a/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.zeromq
-
-import scala.reflect.ClassTag
-
-import akka.util.ByteString
-import akka.zeromq._
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.streaming.akka.ActorReceiver
-
-/**
- * A receiver to subscribe to ZeroMQ stream.
- */
-private[streaming] class ZeroMQReceiver[T: ClassTag](
-    publisherUrl: String,
-    subscribe: Subscribe,
-    bytesToObjects: Seq[ByteString] => Iterator[T])
-  extends ActorReceiver with Logging {
-
-  override def preStart(): Unit = {
-    ZeroMQExtension(context.system)
-      .newSocket(SocketType.Sub, Listener(self), Connect(publisherUrl), 
subscribe)
-  }
-
-  def receive: Receive = {
-
-    case Connecting => logInfo("connecting ...")
-
-    case m: ZMQMessage =>
-      logDebug("Received message for:" + m.frame(0))
-
-      // We ignore first frame for processing as it is the topic
-      val bytes = m.frames.tail
-      store(bytesToObjects(bytes))
-
-    case Closed => logInfo("received closed ")
-  }
-}

http://git-wip-us.apache.org/repos/asf/bahir/blob/fb752570/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
----------------------------------------------------------------------
diff --git a/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
index 1784d6e..2f7b645 100644
--- a/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ b/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -17,147 +17,127 @@
 
 package org.apache.spark.streaming.zeromq
 
+import java.lang.{Iterable => JIterable}
+import java.util.{List => JList}
+
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
-import akka.actor.{ActorSystem, Props, SupervisorStrategy}
-import akka.util.ByteString
-import akka.zeromq.Subscribe
+import org.zeromq.ZMQ
 
-import org.apache.spark.api.java.function.{Function => JFunction, Function0 => JFunction0}
+import org.apache.spark.api.java.function.{Function => JFunction}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
 import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
 
 object ZeroMQUtils {
+  val textMessageConverter: Array[Array[Byte]] => Iterable[String] =
+      (bytes: Array[Array[Byte]]) => {
+    // First frame typically contains the topic name, so we skip it and extract only the payload.
+    val result = new ArrayBuffer[String]()
+    for (i <- 1 until bytes.length) {
+      result.append(new String(bytes(i), ZMQ.CHARSET))
+    }
+    result
+  }
+
   /**
-   * Create an input stream that receives messages pushed by a zeromq publisher.
-   * @param ssc StreamingContext object
-   * @param publisherUrl Url of remote zeromq publisher
-   * @param subscribe Topic to subscribe to
-   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic
-   *                       and each frame has sequence of byte thus it needs the converter
-   *                       (which might be deserializer of bytes) to translate from sequence
-   *                       of sequence of bytes, where sequence refer to a frame
-   *                       and sub sequence refer to its payload.
-   * @param storageLevel   RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
-   * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
-   *                           be shut down when the receiver is stopping (default:
-   *                           ActorReceiver.defaultActorSystemCreator)
-   * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy)
+   * Create an input stream that receives messages pushed by a ZeroMQ publisher.
+   * @param ssc Streaming context
+   * @param publisherUrl URL of remote ZeroMQ publisher
+   * @param connect When true, the connector tries to establish a connection with the
+   *                remote server. Otherwise, it attempts to create and bind a local socket.
+   * @param topics List of topics to subscribe to
+   * @param messageConverter A ZeroMQ stream publishes a sequence of frames for each message,
+   *                         and each frame is a sequence of bytes, so a converter (which
+   *                         might be a deserializer) is needed to translate the array of
+   *                         frame payloads into objects. The first frame typically contains
+   *                         the message envelope, which corresponds to the topic name.
+   * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
    */
   def createStream[T: ClassTag](
       ssc: StreamingContext,
       publisherUrl: String,
-      subscribe: Subscribe,
-      bytesToObjects: Seq[ByteString] => Iterator[T],
-      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
-      actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator,
-      supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultSupervisorStrategy
+      connect: Boolean,
+      topics: Seq[Array[Byte]],
+      messageConverter: Array[Array[Byte]] => Iterable[T],
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
     ): ReceiverInputDStream[T] = {
-    AkkaUtils.createStream(
-      ssc,
-      Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
-      "ZeroMQReceiver",
-      storageLevel,
-      actorSystemCreator,
-      supervisorStrategy)
+    ssc.withNamedScope("ZeroMQ stream") {
+      new ZeroMQInputDStream(ssc, publisherUrl, connect, topics, messageConverter, storageLevel)
+    }
   }
 
   /**
-   * Create an input stream that receives messages pushed by a zeromq publisher.
-   * @param jssc JavaStreamingContext object
-   * @param publisherUrl Url of remote ZeroMQ publisher
-   * @param subscribe Topic to subscribe to
-   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
-   *                       frame has sequence of byte thus it needs the converter(which might be
-   *                       deserializer of bytes) to translate from sequence of sequence of bytes,
-   *                       where sequence refer to a frame and sub sequence refer to its payload.
-   * @param storageLevel Storage level to use for storing the received objects
-   * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
-   *                           be shut down when the receiver is stopping.
-   * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy)
+   * Create a text input stream that receives messages pushed by a ZeroMQ publisher.
+   * @param ssc Streaming context
+   * @param publisherUrl URL of remote ZeroMQ publisher
+   * @param connect When true, the connector tries to establish a connection with the
+   *                remote server. Otherwise, it attempts to create and bind a local socket.
+   * @param topics List of topics to subscribe to
+   * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
    */
-  def createStream[T](
-      jssc: JavaStreamingContext,
+  def createTextStream(
+      ssc: StreamingContext,
       publisherUrl: String,
-      subscribe: Subscribe,
-      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
-      storageLevel: StorageLevel,
-      actorSystemCreator: JFunction0[ActorSystem],
-      supervisorStrategy: SupervisorStrategy
-    ): JavaReceiverInputDStream[T] = {
-    implicit val cm: ClassTag[T] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    val fn =
-      (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
-    createStream[T](
-      jssc.ssc,
-      publisherUrl,
-      subscribe,
-      fn,
-      storageLevel,
-      () => actorSystemCreator.call(),
-      supervisorStrategy)
+      connect: Boolean,
+      topics: Seq[Array[Byte]],
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+    ): ReceiverInputDStream[String] = {
+    createStream[String](ssc, publisherUrl, connect, topics, textMessageConverter, storageLevel)
   }
 
   /**
-   * Create an input stream that receives messages pushed by a zeromq publisher.
-   * @param jssc JavaStreamingContext object
-   * @param publisherUrl Url of remote zeromq publisher
-   * @param subscribe Topic to subscribe to
-   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
-   *                       frame has sequence of byte thus it needs the converter(which might be
-   *                       deserializer of bytes) to translate from sequence of sequence of bytes,
-   *                       where sequence refer to a frame and sub sequence refer to its payload.
-   * @param storageLevel RDD storage level.
+   * Create an input stream that receives messages pushed by a ZeroMQ publisher.
+   * @param jssc Java streaming context
+   * @param publisherUrl URL of remote ZeroMQ publisher
+   * @param connect When true, the connector tries to establish a connection with the
+   *                remote server. Otherwise, it attempts to create and bind a local socket.
+   * @param topics List of topics to subscribe to
+   * @param messageConverter A ZeroMQ stream publishes a sequence of frames for each message,
+   *                         and each frame is a sequence of bytes, so a converter (which
+   *                         might be a deserializer) is needed to translate the array of
+   *                         frame payloads into objects. The first frame typically contains
+   *                         the message envelope, which corresponds to the topic name.
+   * @param storageLevel Storage level to use for persisting received objects
    */
-  def createStream[T](
+  def createJavaStream[T](
       jssc: JavaStreamingContext,
       publisherUrl: String,
-      subscribe: Subscribe,
-      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
+      connect: Boolean,
+      topics: JList[Array[Byte]],
+      messageConverter: JFunction[Array[Array[Byte]], JIterable[T]],
       storageLevel: StorageLevel
     ): JavaReceiverInputDStream[T] = {
     implicit val cm: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    val fn =
-      (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
-    createStream[T](
-      jssc.ssc,
-      publisherUrl,
-      subscribe,
-      fn,
-      storageLevel)
+    val fn = (x: Array[Array[Byte]]) =>
+      messageConverter.call(x).iterator().asScala.toIterable
+    createStream(jssc.ssc, publisherUrl, connect, topics.asScala, fn, storageLevel)
   }
 
   /**
-   * Create an input stream that receives messages pushed by a zeromq publisher.
-   * @param jssc JavaStreamingContext object
-   * @param publisherUrl Url of remote zeromq publisher
-   * @param subscribe Topic to subscribe to
-   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
-   *                       frame has sequence of byte thus it needs the converter(which might
-   *                       be deserializer of bytes) to translate from sequence of sequence of
-   *                       bytes, where sequence refer to a frame and sub sequence refer to its
-   *                       payload.
+   * Create a text input stream that receives messages pushed by a ZeroMQ publisher.
+   * @param jssc Java streaming context
+   * @param publisherUrl URL of remote ZeroMQ publisher
+   * @param connect When true, the connector tries to establish a connection with the
+   *                remote server. Otherwise, it attempts to create and bind a local socket.
+   * @param topics List of topics to subscribe to
+   * @param storageLevel Storage level to use for persisting received objects
    */
-  def createStream[T](
+  def createTextJavaStream(
       jssc: JavaStreamingContext,
       publisherUrl: String,
-      subscribe: Subscribe,
-      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
-    ): JavaReceiverInputDStream[T] = {
-    implicit val cm: ClassTag[T] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    val fn =
-      (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
-    createStream[T](
-      jssc.ssc,
-      publisherUrl,
-      subscribe,
-      fn)
+      connect: Boolean,
+      topics: JList[Array[Byte]],
+      storageLevel: StorageLevel
+    ): JavaReceiverInputDStream[String] = {
+    createStream(jssc.ssc, publisherUrl, connect, topics.asScala,
+      textMessageConverter, storageLevel
+    )
   }
 }

http://git-wip-us.apache.org/repos/asf/bahir/blob/fb752570/streaming-zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/streaming-zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming-zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index cfedb5a..f9cee96 100644
--- a/streaming-zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/streaming-zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -23,12 +23,11 @@ import org.junit.After;
 import org.junit.Before;
 
 public abstract class LocalJavaStreamingContext {
-
     protected transient JavaStreamingContext ssc;
 
     @Before
     public void setUp() {
-        SparkConf conf = new SparkConf()
+        final SparkConf conf = new SparkConf()
             .setMaster("local[2]")
             .setAppName("test")
             .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
@@ -42,3 +41,4 @@ public abstract class LocalJavaStreamingContext {
         ssc = null;
     }
 }
+

http://git-wip-us.apache.org/repos/asf/bahir/blob/fb752570/streaming-zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
----------------------------------------------------------------------
diff --git a/streaming-zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/streaming-zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
index 9fd0424..7a32972 100644
--- a/streaming-zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
+++ b/streaming-zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
@@ -17,47 +17,39 @@
 
 package org.apache.spark.streaming.zeromq;
 
-import akka.actor.ActorSystem;
-import akka.actor.SupervisorStrategy;
-import akka.util.ByteString;
-import akka.zeromq.Subscribe;
 import org.junit.Test;
 
 import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function0;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.LocalJavaStreamingContext;
 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import zmq.ZMQ;
 
-public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
-
-  @Test // tests the API, does not actually test data receiving
-  public void testZeroMQStream() {
-    String publishUrl = "abc";
-    Subscribe subscribe = new Subscribe((ByteString)null);
-    Function<byte[][], Iterable<String>> bytesToObjects = new BytesToObjects();
-    Function0<ActorSystem> actorSystemCreator = new ActorSystemCreatorForTest();
-
-    JavaReceiverInputDStream<String> test1 = ZeroMQUtils.<String>createStream(
-      ssc, publishUrl, subscribe, bytesToObjects);
-    JavaReceiverInputDStream<String> test2 = ZeroMQUtils.<String>createStream(
-      ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
-    JavaReceiverInputDStream<String> test3 = ZeroMQUtils.<String>createStream(
-      ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(),
-      actorSystemCreator, SupervisorStrategy.defaultStrategy());
-  }
-}
+import java.util.Arrays;
 
-class BytesToObjects implements Function<byte[][], Iterable<String>> {
-  @Override
-  public Iterable<String> call(byte[][] bytes) throws Exception {
-    return null;
-  }
+public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
+    @Test
+    public void testZeroMQAPICompatibility() {
+        // Test the API, but do not exchange any messages.
+        final String publishUrl = "tcp://localhost:5555";
+        final String topic = "topic1";
+        final Function<byte[][], Iterable<String>> messageConverter =
+                new Function<byte[][], Iterable<String>>() {
+                    @Override
+                    public Iterable<String> call(byte[][] bytes) throws Exception {
+                        // Skip topic name and assume that each message contains only one frame.
+                        return Arrays.asList(new String(bytes[1], ZMQ.CHARSET));
+                    }
+                };
+
+        JavaReceiverInputDStream<String> test1 = ZeroMQUtils.createJavaStream(
+                ssc, publishUrl, true, Arrays.asList(topic.getBytes()), messageConverter,
+                StorageLevel.MEMORY_AND_DISK_SER_2()
+        );
+        JavaReceiverInputDStream<String> test2 = ZeroMQUtils.createTextJavaStream(
+                ssc, publishUrl, true, Arrays.asList(topic.getBytes()),
+                StorageLevel.MEMORY_AND_DISK_SER_2()
+        );
+    }
 }
 
-class ActorSystemCreatorForTest implements Function0<ActorSystem> {
-  @Override
-  public ActorSystem call() {
-    return null;
-  }
-}
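Both test suites rely on ZeroMQ's prefix-based subscription matching: a SUB socket delivers a message when its first frame starts with one of the subscribed byte prefixes, and an empty prefix (`ZMQ.SUBSCRIPTION_ALL`) matches everything. A dependency-free sketch of that rule (class and method names are hypothetical):

```java
import java.util.Arrays;

// Hypothetical sketch of ZeroMQ-style topic matching: a subscription is a
// byte prefix of the first message frame; an empty prefix matches all.
public class SubscriptionFilter {
    public static boolean matches(byte[] topicFrame, byte[][] subscriptions) {
        for (byte[] prefix : subscriptions) {
            if (topicFrame.length >= prefix.length
                    && Arrays.equals(Arrays.copyOfRange(topicFrame, 0, prefix.length), prefix)) {
                return true;
            }
        }
        return false;
    }
}
```

This is why the "Filter by topic" test below expects the `wrong-topic` message to be dropped while `topic1` and `topic2` messages come through.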

http://git-wip-us.apache.org/repos/asf/bahir/blob/fb752570/streaming-zeromq/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/streaming-zeromq/src/test/resources/log4j.properties b/streaming-zeromq/src/test/resources/log4j.properties
index 75e3b53..bcb37d2 100644
--- a/streaming-zeromq/src/test/resources/log4j.properties
+++ b/streaming-zeromq/src/test/resources/log4j.properties
@@ -15,8 +15,13 @@
 # limitations under the License.
 #
 
-# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
+log4j.rootCategory=INFO, console, file
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.conversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=true
 log4j.appender.file.file=target/unit-tests.log

http://git-wip-us.apache.org/repos/asf/bahir/blob/fb752570/streaming-zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/streaming-zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
index bac2679..547c948 100644
--- a/streaming-zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
+++ b/streaming-zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
@@ -17,47 +17,257 @@
 
 package org.apache.spark.streaming.zeromq
 
-import akka.actor.SupervisorStrategy
-import akka.util.ByteString
-import akka.zeromq.Subscribe
+import scala.collection.mutable
+
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time
+import org.scalatest.time.Span
+import org.zeromq.Utils
+import org.zeromq.ZContext
+import org.zeromq.ZMQ
+import org.zeromq.ZMsg
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
 
-class ZeroMQStreamSuite extends SparkFunSuite {
-
-  val batchDuration = Seconds(1)
+class ZeroMQStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter {
+  private val publishUrl = "tcp://localhost:" + Utils.findOpenPort()
+  private val topic1 = "topic1"
+  private val topic2 = "topic2"
+  private val messageConverter = (bytes: Array[Array[Byte]]) => {
+    if (bytes(0) == null || bytes(0).length == 0) {
+      // Just to test that topic name is correctly populated.
+      // Should never happen, but it will cause test to fail.
+      // Assertions are not serializable.
+      Seq()
+    } else {
+      Seq(new String(bytes(1), ZMQ.CHARSET))
+    }
+  }
 
-  private val master: String = "local[2]"
+  private var ssc: StreamingContext = _
+  private var zeroContext: ZContext = _
+  private var zeroSocket: ZMQ.Socket = _
 
-  private val framework: String = this.getClass.getSimpleName
+  before {
+    ssc = new StreamingContext("local[2]", this.getClass.getSimpleName, Seconds(1))
+  }
 
-  test("zeromq input stream") {
-    val ssc = new StreamingContext(master, framework, batchDuration)
-    val publishUrl = "abc"
-    val subscribe = new Subscribe(null.asInstanceOf[ByteString])
-    val bytesToObjects = (bytes: Seq[ByteString]) => null.asInstanceOf[Iterator[String]]
+  after {
+    if (zeroContext != null) {
+      zeroContext.close()
+      zeroContext = null
+    }
+    zeroSocket = null
+    if (ssc != null) {
+      ssc.stop()
+      ssc = null
+    }
+  }
 
-    // tests the API, does not actually test data receiving
-    val test1: ReceiverInputDStream[String] =
-      ZeroMQUtils.createStream(
-        ssc, publishUrl, subscribe, bytesToObjects, actorSystemCreator = () => null)
+  test("Input stream API") {
+    // Test the API, but do not exchange any messages.
+    val test1: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
+      ssc, publishUrl, true, Seq(topic1.getBytes), messageConverter
+    )
     val test2: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
-      ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
-    val test3: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
-      ssc, publishUrl, subscribe, bytesToObjects,
-      StorageLevel.MEMORY_AND_DISK_SER_2, () => null, SupervisorStrategy.defaultStrategy)
-    val test4: ReceiverInputDStream[String] =
-      ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
-    val test5: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
-      ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
-    val test6: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
-      ssc, publishUrl, subscribe, bytesToObjects,
-      StorageLevel.MEMORY_AND_DISK_SER_2, supervisorStrategy = SupervisorStrategy.defaultStrategy)
-
-    // TODO: Actually test data receiving. A real test needs the native ZeroMQ library
-    ssc.stop()
+      ssc, publishUrl, true, Seq(topic1.getBytes), messageConverter,
+      StorageLevel.MEMORY_AND_DISK_SER_2
+    )
+    val test3: ReceiverInputDStream[String] = ZeroMQUtils.createTextStream(
+      ssc, publishUrl, true, Seq(topic1.getBytes)
+    )
+  }
+
+  test("Publisher Bind(), Subscriber Connect()") {
+    zeroContext = new ZContext
+    zeroSocket = zeroContext.createSocket(ZMQ.PUB)
+    zeroSocket.bind(publishUrl)
+
+    val receiveStream = ZeroMQUtils.createStream(
+      ssc, publishUrl, true, Seq(ZMQ.SUBSCRIPTION_ALL), messageConverter
+    )
+
+    @volatile var receivedMessages: mutable.Set[String] = mutable.Set()
+    receiveStream.foreachRDD { rdd =>
+      for (element <- rdd.collect()) {
+        receivedMessages += element
+      }
+      receivedMessages
+    }
+
+    ssc.start()
+
+    checkAllReceived(
+      Map("Hello, World!" -> topic1, "Hello, ZeroMQ!" -> topic2),
+      receivedMessages
+    )
+  }
+
+  test("Publisher Connect(), Subscriber Bind()") {
+    val receiveStream = ZeroMQUtils.createStream(
+      ssc, publishUrl, false, Seq(ZMQ.SUBSCRIPTION_ALL), messageConverter
+    )
+
+    @volatile var receivedMessages: mutable.Set[String] = mutable.Set()
+    receiveStream.foreachRDD { rdd =>
+      for (element <- rdd.collect()) {
+        receivedMessages += element
+      }
+      receivedMessages
+    }
+
+    ssc.start()
+
+    zeroContext = new ZContext
+    zeroSocket = zeroContext.createSocket(ZMQ.PUB)
+    zeroSocket.connect(publishUrl)
+
+    checkAllReceived(
+      Map("Hello, World!" -> topic1, "Hello, ZeroMQ!" -> topic2),
+      receivedMessages
+    )
+  }
+
+  test("Filter by topic") {
+    zeroContext = new ZContext
+    zeroSocket = zeroContext.createSocket(ZMQ.PUB)
+    zeroSocket.bind(publishUrl)
+
+    val receiveStream = ZeroMQUtils.createStream(
+      ssc, publishUrl, true, Seq(topic1.getBytes, topic2.getBytes), messageConverter
+    )
+
+    @volatile var receivedMessages: Set[String] = Set()
+    receiveStream.foreachRDD { rdd =>
+      for (element <- rdd.collect()) {
+        receivedMessages += element
+      }
+      receivedMessages
+    }
+
+    ssc.start()
+
+    eventually(timeout(Span(5, time.Seconds)), interval(Span(500, time.Millis))) {
+      val payload1 = "Hello, World!"
+      val payload2 = "Hello, 0MQ!"
+
+      // First message should not be picked up.
+      val msg1 = new ZMsg
+      msg1.add("wrong-topic".getBytes)
+      msg1.add("Bye, World!".getBytes)
+      msg1.send(zeroSocket)
+
+      // Second message should be received.
+      val msg2 = new ZMsg
+      msg2.add(topic1.getBytes)
+      msg2.add(payload1.getBytes)
+      msg2.send(zeroSocket)
+
+      // Third message should be received.
+      val msg3 = new ZMsg
+      msg3.add(topic2.getBytes)
+      msg3.add(payload2.getBytes)
+      msg3.send(zeroSocket)
+
+      assert(receivedMessages.size == 2)
+      assert(Set(payload1, payload2).equals(receivedMessages))
+    }
+  }
+
+  test("Multiple frame message") {
+    zeroContext = new ZContext
+    zeroSocket = zeroContext.createSocket(ZMQ.PUB)
+    zeroSocket.bind(publishUrl)
+
+    val receiveStream = ZeroMQUtils.createTextStream(
+      ssc, publishUrl, true, Seq(topic1.getBytes, topic2.getBytes)
+    )
+
+    @volatile var receivedMessages: Set[String] = Set()
+    receiveStream.foreachRDD { rdd =>
+      for (element <- rdd.collect()) {
+        receivedMessages += element
+      }
+      receivedMessages
+    }
+
+    ssc.start()
+
+    eventually(timeout(Span(5, time.Seconds)), interval(Span(500, time.Millis))) {
+      val part1 = "first line"
+      val part2 = "second line"
+
+      val msg = new ZMsg
+      msg.add(topic1.getBytes)
+      msg.add(part1.getBytes)
+      msg.add(part2.getBytes)
+      msg.send(zeroSocket)
+
+      assert(receivedMessages.size == 2)
+      assert(Set(part1, part2).equals(receivedMessages))
+    }
+  }
+
+  test("Reconnection") {
+    zeroContext = new ZContext
+    zeroSocket = zeroContext.createSocket(ZMQ.PUB)
+    zeroSocket.bind(publishUrl)
+
+    val receiveStream = ZeroMQUtils.createStream(
+      ssc, publishUrl, true, Seq(ZMQ.SUBSCRIPTION_ALL), messageConverter
+    )
+
+    @volatile var receivedMessages: mutable.Set[String] = mutable.Set()
+    receiveStream.foreachRDD { rdd =>
+      for (element <- rdd.collect()) {
+        receivedMessages += element
+      }
+      receivedMessages
+    }
+
+    ssc.start()
+
+    checkAllReceived(
+      Map("Hello, World!" -> topic1, "Hello, ZeroMQ!" -> topic2),
+      receivedMessages
+    )
+
+    // Terminate bound socket (server).
+    zeroContext.close()
+    zeroSocket = null
+
+    Thread.sleep(2000)
+
+    // Create new socket without stopping Spark stream.
+    zeroContext = new ZContext
+    zeroSocket = zeroContext.createSocket(ZMQ.PUB)
+    zeroSocket.bind(publishUrl)
+
+    receivedMessages.clear()
+
+    checkAllReceived(
+      Map("Apache Spark" -> topic1, "Apache Kafka" -> topic2),
+      receivedMessages
+    )
+  }
+
+  def checkAllReceived(
+      publishMessages: Map[String, String],
+      receivedMessages: mutable.Set[String]): Unit = {
+    eventually(timeout(Span(5, time.Seconds)), interval(Span(500, time.Millis))) {
+      for ((k, v) <- publishMessages) {
+        val msg = new ZMsg
+        msg.add(v.getBytes)
+        msg.add(k.getBytes)
+        msg.send(zeroSocket)
+      }
+      assert(receivedMessages.size == publishMessages.size)
+      assert(publishMessages.keySet.equals(receivedMessages))
+    }
   }
 }
+
