This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git
The following commit(s) were added to refs/heads/main by this push:
new fde163d63 Google Cloud Pub/Sub gRPC: subscribe() must not echo
initial-only StreamingPullRequest fields on keepalive ticks (#1625)
fde163d63 is described below
commit fde163d634dec7275979d619423e6f769b725f63
Author: Haruhiko Nishi <[email protected]>
AuthorDate: Wed May 13 19:54:44 2026 +0900
Google Cloud Pub/Sub gRPC: subscribe() must not echo initial-only
StreamingPullRequest fields on keepalive ticks (#1625)
* Google Cloud Pub/Sub gRPC: subscribe() must not echo initial-only
StreamingPullRequest fields on keepalive ticks
* `subscribe(...)` was clearing only `subscription` and
`streamAckDeadlineSeconds` on the keepalive request, leaving `clientId`,
`maxOutstandingMessages`, and `maxOutstandingBytes` from the initial one.
Pub/Sub rejects those three on subsequent requests with `INVALID_ARGUMENT`, so
any caller setting `maxOutstandingMessages` loses the whole stream about a
second after start. Fix is one line on each DSL: use
`StreamingPullRequest.defaultInstance` (Scala) / `getDefaultInstance` (Java) for
the keepalive request [...]
* Google Cloud Pub/Sub gRPC: replace Thread.sleep with eventually in the
keepalive-tick test
`Thread.sleep(500)` replaced with `eventually(timeout(Span(5, Seconds)),
interval(Span(50, Millis)))` polling the captured queue.
* Google Cloud Pub/Sub gRPC: use top-level imports in the test stub
Dropped the fully qualified `pekko.stream.scaladsl.` prefixes and the trait-local `import pekko.NotUsed` in favour of top-level imports.
---
.../pubsub/grpc/javadsl/GooglePubSub.scala | 11 ++-
.../pubsub/grpc/scaladsl/GooglePubSub.scala | 13 +--
.../pubsub/grpc/AutoExtendAckDeadlinesSpec.scala | 95 ++++++++++++++++++++--
3 files changed, 101 insertions(+), 18 deletions(-)
diff --git
a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala
b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala
index edf0f0af8..1e717a69f 100644
---
a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala
+++
b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala
@@ -66,10 +66,13 @@ object GooglePubSub {
.fromMaterializer { (mat, attr) =>
val cancellable = new CompletableFuture[Cancellable]()
- val subsequentRequest = request.toBuilder
- .setSubscription("")
- .setStreamAckDeadlineSeconds(0)
- .build()
+ // Don't echo initial-only fields on keepalive requests. Pub/Sub
allows only
+ // ackIds, modifyDeadlineSeconds, and modifyDeadlineAckIds on
subsequent
+ // StreamingPullRequests; anything else from the initial request
(subscription,
+ // streamAckDeadlineSeconds, clientId, maxOutstandingMessages,
maxOutstandingBytes)
+ // gets back INVALID_ARGUMENT. defaultInstance clears them all at once
and stays
+ // safe if the proto grows more initial-only fields.
+ val subsequentRequest = StreamingPullRequest.getDefaultInstance
subscriber(mat, attr).client
.streamingPull(
diff --git
a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/scaladsl/GooglePubSub.scala
b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/scaladsl/GooglePubSub.scala
index d0746242d..4408f8503 100644
---
a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/scaladsl/GooglePubSub.scala
+++
b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/scaladsl/GooglePubSub.scala
@@ -66,9 +66,13 @@ object GooglePubSub {
.fromMaterializer { (mat, attr) =>
val cancellable = Promise[Cancellable]()
- val subsequentRequest = request
- .withSubscription("")
- .withStreamAckDeadlineSeconds(0)
+ // Don't echo initial-only fields on keepalive requests. Pub/Sub
allows only
+ // ackIds, modifyDeadlineSeconds, and modifyDeadlineAckIds on
subsequent
+ // StreamingPullRequests; anything else from the initial request
(subscription,
+ // streamAckDeadlineSeconds, clientId, maxOutstandingMessages,
maxOutstandingBytes)
+ // gets back INVALID_ARGUMENT. defaultInstance clears them all at once
and stays
+ // safe if the proto grows more initial-only fields.
+ val subsequentRequest = StreamingPullRequest.defaultInstance
subscriber(mat, attr).client
.streamingPull(
@@ -76,8 +80,7 @@ object GooglePubSub {
.single(request)
.concat(
Source
- .tick(0.seconds, pollInterval, ())
- .map(_ => subsequentRequest)
+ .tick(0.seconds, pollInterval, subsequentRequest)
.mapMaterializedValue(cancellable.success)))
.mapConcat(_.receivedMessages.toVector)
.mapMaterializedValue(_ => cancellable.future)
diff --git
a/google-cloud-pub-sub-grpc/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/AutoExtendAckDeadlinesSpec.scala
b/google-cloud-pub-sub-grpc/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/AutoExtendAckDeadlinesSpec.scala
index 1038927df..a4e0eb565 100644
---
a/google-cloud-pub-sub-grpc/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/AutoExtendAckDeadlinesSpec.scala
+++
b/google-cloud-pub-sub-grpc/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/AutoExtendAckDeadlinesSpec.scala
@@ -18,26 +18,30 @@
package org.apache.pekko.stream.connectors.googlecloud.pubsub.grpc
import org.apache.pekko
-import pekko.Done
+import pekko.{ Done, NotUsed }
import pekko.actor.ActorSystem
-import pekko.stream.scaladsl.{ Sink, Source }
+import pekko.stream.scaladsl.{ Keep, Sink, Source }
import pekko.stream.connectors.googlecloud.pubsub.grpc.scaladsl.{
GooglePubSub, GrpcSubscriber, PubSubAttributes }
import com.google.protobuf.ByteString
import com.google.pubsub.v1.pubsub._
+import org.apache.pekko.stream._
+import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration._
-
+import scala.jdk.CollectionConverters._
import org.scalatest.BeforeAndAfterAll
-import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.concurrent.{ Eventually, ScalaFutures }
import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.{ Millis, Seconds, Span }
import org.scalatest.wordspec.AnyWordSpec
class AutoExtendAckDeadlinesSpec
extends AnyWordSpec
with Matchers
with BeforeAndAfterAll
- with ScalaFutures {
+ with ScalaFutures
+ with Eventually {
implicit val system: ActorSystem = ActorSystem("AutoExtendAckDeadlinesSpec")
implicit val patience: PatienceConfig = PatienceConfig(10.seconds,
100.millis)
@@ -156,17 +160,90 @@ class AutoExtendAckDeadlinesSpec
}
}
+ "GooglePubSub.subscribe" should {
+ "send only allowed fields on subsequent StreamingPullRequest messages" in {
+ // Regression test: the keepalive tick must NOT echo initial-only fields
+ // (clientId, maxOutstandingMessages, maxOutstandingBytes) back to the
server,
+ // which would otherwise return INVALID_ARGUMENT on the first tick.
+ val captured = new ConcurrentLinkedQueue[StreamingPullRequest]()
+ val testSubscriber = new GrpcSubscriber(new
CapturingStreamingPullClient(captured)(system))
+
+ val initial = StreamingPullRequest()
+ .withSubscription(subscription)
+ .withStreamAckDeadlineSeconds(60)
+ .withClientId("test-client")
+ .withMaxOutstandingMessages(100L)
+ .withMaxOutstandingBytes(10485760L)
+
+ val cancellableFut = GooglePubSub
+ .subscribe(initial, 100.millis)
+ .withAttributes(PubSubAttributes.subscriber(testSubscriber))
+ .toMat(Sink.ignore)(Keep.left)
+ .run()
+
+ // Wait until the initial request plus at least 2 keepalive ticks have
landed,
+ // then cancel.
+ eventually(timeout(Span(5, Seconds)), interval(Span(50, Millis))) {
+ captured.size should be >= 3
+ }
+ cancellableFut.futureValue.cancel()
+
+ val first = captured.poll()
+ first should not be null
+ withClue("initial request must carry caller-supplied fields verbatim: ")
{
+ first.subscription shouldBe subscription
+ first.streamAckDeadlineSeconds shouldBe 60
+ first.clientId shouldBe "test-client"
+ first.maxOutstandingMessages shouldBe 100L
+ first.maxOutstandingBytes shouldBe 10485760L
+ }
+
+ val subsequent = Iterator
+ .continually(Option(captured.poll()))
+ .takeWhile(_.isDefined)
+ .flatten
+ .toList
+ subsequent should not be empty
+ subsequent.zipWithIndex.foreach { case (req, idx) =>
+ withClue(s"subsequent request #${idx + 1} must be the default
instance: ") {
+ req.subscription shouldBe ""
+ req.streamAckDeadlineSeconds shouldBe 0
+ req.clientId shouldBe ""
+ req.maxOutstandingMessages shouldBe 0L
+ req.maxOutstandingBytes shouldBe 0L
+ }
+ }
+ }
+ }
+
override def afterAll(): Unit =
system.terminate()
}
+/**
+ * Stub that captures every StreamingPullRequest sent on the client → server
stream
+ * and keeps the response stream open indefinitely (so the polling tick keeps
firing).
+ */
+class CapturingStreamingPullClient(captured:
ConcurrentLinkedQueue[StreamingPullRequest])(
+ implicit sys: ActorSystem) extends TestSubscriberClientBase {
+ private implicit val mat: Materializer = Materializer.matFromSystem(sys)
+
+ override def streamingPull(
+ in: Source[StreamingPullRequest, NotUsed])
+ : Source[StreamingPullResponse, NotUsed] =
+ Source
+ .maybe[StreamingPullResponse]
+ .mapMaterializedValue { _ =>
+ in.runForeach(req => captured.add(req))
+ NotUsed
+ }
+}
+
/**
* Base stub for [[SubscriberClient]] that provides default implementations
* for all methods. Tests override only the methods they need.
*/
trait TestSubscriberClientBase extends SubscriberClient {
- import pekko.NotUsed
-
override def createSubscription(in: Subscription): Future[Subscription] = ???
override def getSubscription(in: GetSubscriptionRequest):
Future[Subscription] = ???
override def updateSubscription(in: UpdateSubscriptionRequest):
Future[Subscription] = ???
@@ -176,8 +253,8 @@ trait TestSubscriberClientBase extends SubscriberClient {
override def acknowledge(in: AcknowledgeRequest):
Future[com.google.protobuf.empty.Empty] = ???
override def pull(in: PullRequest): Future[PullResponse] = ???
override def streamingPull(
- in: pekko.stream.scaladsl.Source[StreamingPullRequest, NotUsed])
- : pekko.stream.scaladsl.Source[StreamingPullResponse, NotUsed] = ???
+ in: Source[StreamingPullRequest, NotUsed])
+ : Source[StreamingPullResponse, NotUsed] = ???
override def modifyPushConfig(in: ModifyPushConfigRequest):
Future[com.google.protobuf.empty.Empty] = ???
override def getSnapshot(in: GetSnapshotRequest): Future[Snapshot] = ???
override def listSnapshots(in: ListSnapshotsRequest):
Future[ListSnapshotsResponse] = ???
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]