This is an automated email from the ASF dual-hosted git repository.

lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir.git


The following commit(s) were added to refs/heads/master by this push:
     new 69729e3  [BAHIR-203] Manual acknowledge PubSub messages
69729e3 is described below

commit 69729e304062152687ee3a0d20a4d51e3feb8fbe
Author: Lukasz Antoniak <[email protected]>
AuthorDate: Thu Apr 11 08:14:02 2019 +0200

    [BAHIR-203] Manual acknowledge PubSub messages
---
 .../streaming/pubsub/PubsubInputDStream.scala      | 36 ++++++++++++---
 .../spark/streaming/pubsub/PubsubUtils.scala       | 27 ++++++++++-
 .../spark/streaming/pubsub/PubsubStreamSuite.scala | 53 +++++++++++-----------
 .../spark/streaming/pubsub/PubsubTestUtils.scala   |  9 ++++
 4 files changed, 90 insertions(+), 35 deletions(-)

diff --git 
a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala
 
b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala
index e769f2e..1e4d8fa 100644
--- 
a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala
+++ 
b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala
@@ -51,11 +51,12 @@ class PubsubInputDStream(
     val topic: Option[String],
     val subscription: String,
     val credential: SparkGCPCredentials,
-    val _storageLevel: StorageLevel
+    val _storageLevel: StorageLevel,
+    val autoAcknowledge: Boolean
 ) extends ReceiverInputDStream[SparkPubsubMessage](_ssc) {
 
   override def getReceiver(): Receiver[SparkPubsubMessage] = {
-    new PubsubReceiver(project, topic, subscription, credential, _storageLevel)
+    new PubsubReceiver(project, topic, subscription, credential, 
_storageLevel, autoAcknowledge)
   }
 }
 
@@ -68,6 +69,7 @@ class PubsubInputDStream(
 class SparkPubsubMessage() extends Externalizable {
 
   private[pubsub] var message = new PubsubMessage
+  private[pubsub] var ackId: String = _
 
   def getData(): Array[Byte] = message.decodeData()
 
@@ -75,6 +77,8 @@ class SparkPubsubMessage() extends Externalizable {
 
   def getMessageId(): String = message.getMessageId
 
+  def getAckId(): String = ackId
+
   def getPublishTime(): String = message.getPublishTime
 
   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException 
{
@@ -93,6 +97,14 @@ class SparkPubsubMessage() extends Externalizable {
         out.write(idBuff)
     }
 
+    ackId match {
+      case null => out.writeInt(-1)
+      case id =>
+        val ackIdBuff = Utils.serialize(ackId)
+        out.writeInt(ackIdBuff.length)
+        out.write(ackIdBuff)
+    }
+
     message.getPublishTime match {
       case null => out.writeInt(-1)
       case time =>
@@ -135,6 +147,14 @@ class SparkPubsubMessage() extends Externalizable {
     }
 
     in.readInt() match {
+      case -1 => ackId = null
+      case ackIdLength =>
+        val ackIdBuff = new Array[Byte](ackIdLength)
+        in.readFully(ackIdBuff)
+        ackId = Utils.deserialize(ackIdBuff)
+    }
+
+    in.readInt() match {
       case -1 => message.setPublishTime(null)
       case publishTimeLength =>
         val publishTimeBuff = new Array[Byte](publishTimeLength)
@@ -202,7 +222,8 @@ class PubsubReceiver(
     topic: Option[String],
     subscription: String,
     credential: SparkGCPCredentials,
-    storageLevel: StorageLevel)
+    storageLevel: StorageLevel,
+    autoAcknowledge: Boolean)
     extends Receiver[SparkPubsubMessage](storageLevel) {
 
   val APP_NAME = "sparkstreaming-pubsub-receiver"
@@ -261,13 +282,16 @@ class PubsubReceiver(
             .map(x => {
               val sm = new SparkPubsubMessage
               sm.message = x.getMessage
+              sm.ackId = x.getAckId
               sm
             })
             .iterator)
 
-        val ackRequest = new AcknowledgeRequest()
-        ackRequest.setAckIds(receivedMessages.map(x => x.getAckId).asJava)
-        client.projects().subscriptions().acknowledge(subscriptionFullName, 
ackRequest).execute()
+        if (autoAcknowledge) {
+          val ackRequest = new AcknowledgeRequest()
+          ackRequest.setAckIds(receivedMessages.map(x => x.getAckId).asJava)
+          client.projects().subscriptions().acknowledge(subscriptionFullName, 
ackRequest).execute()
+        }
         backoff = INIT_BACKOFF
       } catch {
         case e: GoogleJsonResponseException =>
diff --git 
a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala
 
b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala
index b4f02b9..05214c3 100644
--- 
a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala
+++ 
b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala
@@ -40,6 +40,7 @@ object PubsubUtils {
    * @param subscription    Subscription name to subscribe to
    * @param credentials     SparkGCPCredentials to use for authenticating
    * @param storageLevel    RDD storage level
+   * @param autoAcknowledge Auto acknowledge incoming messages
    * @return
    */
   def createStream(
@@ -48,7 +49,8 @@ object PubsubUtils {
       topic: Option[String],
       subscription: String,
       credentials: SparkGCPCredentials,
-      storageLevel: StorageLevel): ReceiverInputDStream[SparkPubsubMessage] = {
+      storageLevel: StorageLevel,
+      autoAcknowledge: Boolean = true): 
ReceiverInputDStream[SparkPubsubMessage] = {
     ssc.withNamedScope("pubsub stream") {
 
       new PubsubInputDStream(
@@ -57,7 +59,8 @@ object PubsubUtils {
         topic,
         subscription,
         credentials,
-        storageLevel)
+        storageLevel,
+        autoAcknowledge)
     }
   }
 
@@ -84,6 +87,26 @@ object PubsubUtils {
    * Create an input stream that receives messages pushed by a Pub/Sub 
publisher
    * using given credential
    *
+   * Throw not found exception if the subscription doesn't exist
+   *
+   * @param jssc         JavaStreamingContext object
+   * @param project      Google cloud project id
+   * @param subscription Subscription name to subscribe to
+   * @param credentials  SparkGCPCredentials to use for authenticating
+   * @param storageLevel RDD storage level
+   * @param autoAcknowledge Auto acknowledge incoming messages
+   * @return
+   */
+  def createStream(jssc: JavaStreamingContext, project: String, subscription: 
String,
+      credentials: SparkGCPCredentials, storageLevel: StorageLevel, 
autoAcknowledge: Boolean
+      ): JavaReceiverInputDStream[SparkPubsubMessage] = {
+    createStream(jssc.ssc, project, None, subscription, credentials, 
storageLevel, autoAcknowledge)
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a Pub/Sub 
publisher
+   * using given credential
+   *
    * If the subscription doesn't exist, create subscription by the given name.
    * Note: This Receiver will only receive the message arrived after the 
subscription created.
    *
diff --git 
a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala
 
b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala
index 8f499cb..b02f21d 100644
--- 
a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala
+++ 
b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.ConditionalSparkFunSuite
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.Seconds
 import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
 
 class PubsubStreamSuite extends ConditionalSparkFunSuite with Eventually with 
BeforeAndAfter {
 
@@ -44,11 +45,11 @@ class PubsubStreamSuite extends ConditionalSparkFunSuite 
with Eventually with Be
 
   private val subForCreateName: String = s"${topicName}_create_me"
 
-  private var ssc: StreamingContext = null
-  private var pubsubTestUtils: PubsubTestUtils = null
-  private var topicFullName: String = null
-  private var subscriptionFullName: String = null
-  private var subForCreateFullName: String = null
+  private var ssc: StreamingContext = _
+  private var pubsubTestUtils: PubsubTestUtils = _
+  private var topicFullName: String = _
+  private var subscriptionFullName: String = _
+  private var subForCreateFullName: String = _
 
   override def beforeAll(): Unit = {
     runIf(PubsubTestUtils.shouldRunTest) {
@@ -89,35 +90,30 @@ class PubsubStreamSuite extends ConditionalSparkFunSuite 
with Eventually with Be
       PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2)
   }
 
-  testIf("pubsub input stream", PubsubTestUtils.shouldRunTest) {
+  testIf("pubsub input stream", () => PubsubTestUtils.shouldRunTest()) {
     val receiveStream = PubsubUtils.createStream(
       ssc, PubsubTestUtils.projectId, Some(topicName), subscriptionName,
       PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2)
+    sendReceiveMessages(receiveStream)
+  }
 
-    @volatile var receiveMessages: List[SparkPubsubMessage] = List()
-    receiveStream.foreachRDD { rdd =>
-      if (rdd.collect().length > 0) {
-        receiveMessages = receiveMessages ::: List(rdd.first)
-        receiveMessages
-      }
-    }
-
-    ssc.start()
-
-    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
-      val sendMessages = pubsubTestUtils.generatorMessages(10)
-      pubsubTestUtils.publishData(topicFullName, sendMessages)
-      assert(sendMessages.map(m => new String(m.getData))
-          .contains(new String(receiveMessages(0).getData)))
-      
assert(sendMessages.map(_.getAttributes).contains(receiveMessages(0).getAttributes))
-    }
+  testIf("manual acknowledgement", () => PubsubTestUtils.shouldRunTest()) {
+    val receiveStream = PubsubUtils.createStream(
+      ssc, PubsubTestUtils.projectId, Some(topicName), subscriptionName,
+      PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2, 
autoAcknowledge = false)
+    sendReceiveMessages(receiveStream)
+    ssc.stop()
+    assert(pubsubTestUtils.receiveData(subscriptionFullName, 10).nonEmpty)
   }
 
-  testIf("pubsub input stream, create pubsub", PubsubTestUtils.shouldRunTest) {
+  testIf("create subscription", () => PubsubTestUtils.shouldRunTest()) {
     val receiveStream = PubsubUtils.createStream(
       ssc, PubsubTestUtils.projectId, Some(topicName), subForCreateName,
       PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2)
+    sendReceiveMessages(receiveStream)
+  }
 
+  private def sendReceiveMessages(receiveStream: 
ReceiverInputDStream[SparkPubsubMessage]): Unit = {
     @volatile var receiveMessages: List[SparkPubsubMessage] = List()
     receiveStream.foreachRDD { rdd =>
       if (rdd.collect().length > 0) {
@@ -131,9 +127,12 @@ class PubsubStreamSuite extends ConditionalSparkFunSuite 
with Eventually with Be
     eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
       val sendMessages = pubsubTestUtils.generatorMessages(10)
       pubsubTestUtils.publishData(topicFullName, sendMessages)
-      assert(sendMessages.map(m => new String(m.getData))
-          .contains(new String(receiveMessages(0).getData)))
-      
assert(sendMessages.map(_.getAttributes).contains(receiveMessages(0).getAttributes))
+      assert(
+        sendMessages.map(m => new String(m.getData()))
+          .contains(new String(receiveMessages.head.getData()))
+      )
+      
assert(sendMessages.map(_.getAttributes()).contains(receiveMessages.head.getAttributes()))
+      assert(receiveMessages.head.getAckId() != null)
     }
   }
 }
diff --git 
a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala
 
b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala
index 39597ca..c5ca5ee 100644
--- 
a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala
+++ 
b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala
@@ -23,6 +23,8 @@ import com.google.api.services.pubsub.Pubsub
 import com.google.api.services.pubsub.Pubsub.Builder
 import com.google.api.services.pubsub.model.PublishRequest
 import com.google.api.services.pubsub.model.PubsubMessage
+import com.google.api.services.pubsub.model.PullRequest
+import com.google.api.services.pubsub.model.ReceivedMessage
 import com.google.api.services.pubsub.model.Subscription
 import com.google.api.services.pubsub.model.Topic
 import com.google.cloud.hadoop.util.RetryHttpInitializer
@@ -62,6 +64,13 @@ private[pubsub] class PubsubTestUtils extends Logging {
     client.projects().topics().publish(topic, publishRequest).execute()
   }
 
+  def receiveData(subscription: String, maxMsgs: Integer): 
List[ReceivedMessage] = {
+    val pullRequest = new 
PullRequest().setMaxMessages(maxMsgs).setReturnImmediately(false)
+    val pullResponse = client.projects().subscriptions().pull(subscription, 
pullRequest).execute()
+    val returnedMessages = pullResponse.getReceivedMessages
+    if (returnedMessages != null) returnedMessages.asScala.toList else List()
+  }
+
   def removeSubscription(subscription: String): Unit = {
     client.projects().subscriptions().delete(subscription).execute()
   }

Reply via email to the commits mailing list.