This is an automated email from the ASF dual-hosted git repository.
lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir.git
The following commit(s) were added to refs/heads/master by this push:
new b681d69 [MINOR] Handle case when no messages from pubsub
b681d69 is described below
commit b681d691c28d2623bc8de047a15ed805e15987c8
Author: Grzegorz Lyczba <[email protected]>
AuthorDate: Fri May 17 00:53:05 2019 +0200
[MINOR] Handle case when no messages from pubsub
Method getReceivedMessages returns NULL when there is no message in
a subscription. Store for processing in Spark and prepare the ACK request
only when, at least, one message is ready for processing.
---
.../spark/streaming/pubsub/PubsubInputDStream.scala | 16 +++++++++-------
1 file changed, 9 insertions(+), 7 deletions(-)
diff --git
a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala
b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala
index 1e4d8fa..7357d23 100644
---
a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala
+++
b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala
@@ -22,7 +22,6 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
-import com.google.api.client.auth.oauth2.Credential
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport
import com.google.api.client.googleapis.json.GoogleJsonResponseException
import com.google.api.client.json.jackson2.JacksonFactory
@@ -277,8 +276,9 @@ class PubsubReceiver(
try {
val pullResponse =
client.projects().subscriptions().pull(subscriptionFullName,
pullRequest).execute()
- val receivedMessages = pullResponse.getReceivedMessages.asScala.toList
- store(receivedMessages
+ val receivedMessages = pullResponse.getReceivedMessages
+ if (receivedMessages != null) {
+ store(receivedMessages.asScala.toList
.map(x => {
val sm = new SparkPubsubMessage
sm.message = x.getMessage
@@ -287,10 +287,12 @@ class PubsubReceiver(
})
.iterator)
- if (autoAcknowledge) {
- val ackRequest = new AcknowledgeRequest()
- ackRequest.setAckIds(receivedMessages.map(x => x.getAckId).asJava)
- client.projects().subscriptions().acknowledge(subscriptionFullName,
ackRequest).execute()
+ if (autoAcknowledge) {
+ val ackRequest = new AcknowledgeRequest()
+ ackRequest.setAckIds(receivedMessages.asScala.map(x =>
x.getAckId).asJava)
+ client.projects().subscriptions().acknowledge(subscriptionFullName,
+ ackRequest).execute()
+ }
}
backoff = INIT_BACKOFF
} catch {