This is an automated email from the ASF dual-hosted git repository.
lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir.git
The following commit(s) were added to refs/heads/master by this push:
new 69729e3 [BAHIR-203] Manual acknowledge PubSub messages
69729e3 is described below
commit 69729e304062152687ee3a0d20a4d51e3feb8fbe
Author: Lukasz Antoniak <[email protected]>
AuthorDate: Thu Apr 11 08:14:02 2019 +0200
[BAHIR-203] Manual acknowledge PubSub messages
---
.../streaming/pubsub/PubsubInputDStream.scala | 36 ++++++++++++---
.../spark/streaming/pubsub/PubsubUtils.scala | 27 ++++++++++-
.../spark/streaming/pubsub/PubsubStreamSuite.scala | 53 +++++++++++-----------
.../spark/streaming/pubsub/PubsubTestUtils.scala | 9 ++++
4 files changed, 90 insertions(+), 35 deletions(-)
diff --git
a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala
b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala
index e769f2e..1e4d8fa 100644
---
a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala
+++
b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala
@@ -51,11 +51,12 @@ class PubsubInputDStream(
val topic: Option[String],
val subscription: String,
val credential: SparkGCPCredentials,
- val _storageLevel: StorageLevel
+ val _storageLevel: StorageLevel,
+ val autoAcknowledge: Boolean
) extends ReceiverInputDStream[SparkPubsubMessage](_ssc) {
override def getReceiver(): Receiver[SparkPubsubMessage] = {
- new PubsubReceiver(project, topic, subscription, credential, _storageLevel)
+ new PubsubReceiver(project, topic, subscription, credential,
_storageLevel, autoAcknowledge)
}
}
@@ -68,6 +69,7 @@ class PubsubInputDStream(
class SparkPubsubMessage() extends Externalizable {
private[pubsub] var message = new PubsubMessage
+ private[pubsub] var ackId: String = _
def getData(): Array[Byte] = message.decodeData()
@@ -75,6 +77,8 @@ class SparkPubsubMessage() extends Externalizable {
def getMessageId(): String = message.getMessageId
+ def getAckId(): String = ackId
+
def getPublishTime(): String = message.getPublishTime
override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException
{
@@ -93,6 +97,14 @@ class SparkPubsubMessage() extends Externalizable {
out.write(idBuff)
}
+ ackId match {
+ case null => out.writeInt(-1)
+ case id =>
+ val ackIdBuff = Utils.serialize(ackId)
+ out.writeInt(ackIdBuff.length)
+ out.write(ackIdBuff)
+ }
+
message.getPublishTime match {
case null => out.writeInt(-1)
case time =>
@@ -135,6 +147,14 @@ class SparkPubsubMessage() extends Externalizable {
}
in.readInt() match {
+ case -1 => ackId = null
+ case ackIdLength =>
+ val ackIdBuff = new Array[Byte](ackIdLength)
+ in.readFully(ackIdBuff)
+ ackId = Utils.deserialize(ackIdBuff)
+ }
+
+ in.readInt() match {
case -1 => message.setPublishTime(null)
case publishTimeLength =>
val publishTimeBuff = new Array[Byte](publishTimeLength)
@@ -202,7 +222,8 @@ class PubsubReceiver(
topic: Option[String],
subscription: String,
credential: SparkGCPCredentials,
- storageLevel: StorageLevel)
+ storageLevel: StorageLevel,
+ autoAcknowledge: Boolean)
extends Receiver[SparkPubsubMessage](storageLevel) {
val APP_NAME = "sparkstreaming-pubsub-receiver"
@@ -261,13 +282,16 @@ class PubsubReceiver(
.map(x => {
val sm = new SparkPubsubMessage
sm.message = x.getMessage
+ sm.ackId = x.getAckId
sm
})
.iterator)
- val ackRequest = new AcknowledgeRequest()
- ackRequest.setAckIds(receivedMessages.map(x => x.getAckId).asJava)
- client.projects().subscriptions().acknowledge(subscriptionFullName,
ackRequest).execute()
+ if (autoAcknowledge) {
+ val ackRequest = new AcknowledgeRequest()
+ ackRequest.setAckIds(receivedMessages.map(x => x.getAckId).asJava)
+ client.projects().subscriptions().acknowledge(subscriptionFullName,
ackRequest).execute()
+ }
backoff = INIT_BACKOFF
} catch {
case e: GoogleJsonResponseException =>
diff --git
a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala
b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala
index b4f02b9..05214c3 100644
---
a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala
+++
b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala
@@ -40,6 +40,7 @@ object PubsubUtils {
* @param subscription Subscription name to subscribe to
* @param credentials SparkGCPCredentials to use for authenticating
* @param storageLevel RDD storage level
+ * @param autoAcknowledge Whether to automatically acknowledge incoming messages
* @return
*/
def createStream(
@@ -48,7 +49,8 @@ object PubsubUtils {
topic: Option[String],
subscription: String,
credentials: SparkGCPCredentials,
- storageLevel: StorageLevel): ReceiverInputDStream[SparkPubsubMessage] = {
+ storageLevel: StorageLevel,
+ autoAcknowledge: Boolean = true):
ReceiverInputDStream[SparkPubsubMessage] = {
ssc.withNamedScope("pubsub stream") {
new PubsubInputDStream(
@@ -57,7 +59,8 @@ object PubsubUtils {
topic,
subscription,
credentials,
- storageLevel)
+ storageLevel,
+ autoAcknowledge)
}
}
@@ -84,6 +87,26 @@ object PubsubUtils {
* Create an input stream that receives messages pushed by a Pub/Sub
publisher
* using given credential
*
+ * Throws a not-found exception if the subscription doesn't exist
+ *
+ * @param jssc JavaStreamingContext object
+ * @param project Google cloud project id
+ * @param subscription Subscription name to subscribe to
+ * @param credentials SparkGCPCredentials to use for authenticating
+ * @param storageLevel RDD storage level
+ * @param autoAcknowledge Whether to automatically acknowledge incoming messages
+ * @return
+ */
+ def createStream(jssc: JavaStreamingContext, project: String, subscription:
String,
+ credentials: SparkGCPCredentials, storageLevel: StorageLevel,
autoAcknowledge: Boolean
+ ): JavaReceiverInputDStream[SparkPubsubMessage] = {
+ createStream(jssc.ssc, project, None, subscription, credentials,
storageLevel, autoAcknowledge)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a Pub/Sub
publisher
+ * using given credential
+ *
* If the subscription doesn't exist, create subscription by the given name.
* Note: This Receiver will only receive the message arrived after the
subscription created.
*
diff --git
a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala
b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala
index 8f499cb..b02f21d 100644
---
a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala
+++
b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.ConditionalSparkFunSuite
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
class PubsubStreamSuite extends ConditionalSparkFunSuite with Eventually with
BeforeAndAfter {
@@ -44,11 +45,11 @@ class PubsubStreamSuite extends ConditionalSparkFunSuite
with Eventually with Be
private val subForCreateName: String = s"${topicName}_create_me"
- private var ssc: StreamingContext = null
- private var pubsubTestUtils: PubsubTestUtils = null
- private var topicFullName: String = null
- private var subscriptionFullName: String = null
- private var subForCreateFullName: String = null
+ private var ssc: StreamingContext = _
+ private var pubsubTestUtils: PubsubTestUtils = _
+ private var topicFullName: String = _
+ private var subscriptionFullName: String = _
+ private var subForCreateFullName: String = _
override def beforeAll(): Unit = {
runIf(PubsubTestUtils.shouldRunTest) {
@@ -89,35 +90,30 @@ class PubsubStreamSuite extends ConditionalSparkFunSuite
with Eventually with Be
PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2)
}
- testIf("pubsub input stream", PubsubTestUtils.shouldRunTest) {
+ testIf("pubsub input stream", () => PubsubTestUtils.shouldRunTest()) {
val receiveStream = PubsubUtils.createStream(
ssc, PubsubTestUtils.projectId, Some(topicName), subscriptionName,
PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2)
+ sendReceiveMessages(receiveStream)
+ }
- @volatile var receiveMessages: List[SparkPubsubMessage] = List()
- receiveStream.foreachRDD { rdd =>
- if (rdd.collect().length > 0) {
- receiveMessages = receiveMessages ::: List(rdd.first)
- receiveMessages
- }
- }
-
- ssc.start()
-
- eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
- val sendMessages = pubsubTestUtils.generatorMessages(10)
- pubsubTestUtils.publishData(topicFullName, sendMessages)
- assert(sendMessages.map(m => new String(m.getData))
- .contains(new String(receiveMessages(0).getData)))
-
assert(sendMessages.map(_.getAttributes).contains(receiveMessages(0).getAttributes))
- }
+ testIf("manual acknowledgement", () => PubsubTestUtils.shouldRunTest()) {
+ val receiveStream = PubsubUtils.createStream(
+ ssc, PubsubTestUtils.projectId, Some(topicName), subscriptionName,
+ PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2,
autoAcknowledge = false)
+ sendReceiveMessages(receiveStream)
+ ssc.stop()
+ assert(pubsubTestUtils.receiveData(subscriptionFullName, 10).nonEmpty)
}
- testIf("pubsub input stream, create pubsub", PubsubTestUtils.shouldRunTest) {
+ testIf("create subscription", () => PubsubTestUtils.shouldRunTest()) {
val receiveStream = PubsubUtils.createStream(
ssc, PubsubTestUtils.projectId, Some(topicName), subForCreateName,
PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2)
+ sendReceiveMessages(receiveStream)
+ }
+ private def sendReceiveMessages(receiveStream:
ReceiverInputDStream[SparkPubsubMessage]): Unit = {
@volatile var receiveMessages: List[SparkPubsubMessage] = List()
receiveStream.foreachRDD { rdd =>
if (rdd.collect().length > 0) {
@@ -131,9 +127,12 @@ class PubsubStreamSuite extends ConditionalSparkFunSuite
with Eventually with Be
eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
val sendMessages = pubsubTestUtils.generatorMessages(10)
pubsubTestUtils.publishData(topicFullName, sendMessages)
- assert(sendMessages.map(m => new String(m.getData))
- .contains(new String(receiveMessages(0).getData)))
-
assert(sendMessages.map(_.getAttributes).contains(receiveMessages(0).getAttributes))
+ assert(
+ sendMessages.map(m => new String(m.getData()))
+ .contains(new String(receiveMessages.head.getData()))
+ )
+
assert(sendMessages.map(_.getAttributes()).contains(receiveMessages.head.getAttributes()))
+ assert(receiveMessages.head.getAckId() != null)
}
}
}
diff --git
a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala
b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala
index 39597ca..c5ca5ee 100644
---
a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala
+++
b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala
@@ -23,6 +23,8 @@ import com.google.api.services.pubsub.Pubsub
import com.google.api.services.pubsub.Pubsub.Builder
import com.google.api.services.pubsub.model.PublishRequest
import com.google.api.services.pubsub.model.PubsubMessage
+import com.google.api.services.pubsub.model.PullRequest
+import com.google.api.services.pubsub.model.ReceivedMessage
import com.google.api.services.pubsub.model.Subscription
import com.google.api.services.pubsub.model.Topic
import com.google.cloud.hadoop.util.RetryHttpInitializer
@@ -62,6 +64,13 @@ private[pubsub] class PubsubTestUtils extends Logging {
client.projects().topics().publish(topic, publishRequest).execute()
}
+ def receiveData(subscription: String, maxMsgs: Integer):
List[ReceivedMessage] = {
+ val pullRequest = new
PullRequest().setMaxMessages(maxMsgs).setReturnImmediately(false)
+ val pullResponse = client.projects().subscriptions().pull(subscription,
pullRequest).execute()
+ val returnedMessages = pullResponse.getReceivedMessages
+ if (returnedMessages != null) returnedMessages.asScala.toList else List()
+ }
+
def removeSubscription(subscription: String): Unit = {
client.projects().subscriptions().delete(subscription).execute()
}