This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git
The following commit(s) were added to refs/heads/main by this push:
new 880ba59 Replace deprecated OverflowStrategy.dropNew in R2dbcReadJournal (#365)
880ba59 is described below
commit 880ba59378945e157c88a3c80bd7b451e1af72f1
Author: PJ Fanning <[email protected]>
AuthorDate: Thu May 7 20:25:14 2026 +0100
Replace deprecated OverflowStrategy.dropNew in R2dbcReadJournal (#365)
* fix: replace deprecated OverflowStrategy.dropNew with dropHead in R2dbcReadJournal
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/e193ad7b-6d57-4550-92f2-f6ad7fa72e89
Co-authored-by: pjfanning <[email protected]>
* fix: add comment explaining why dropHead is acceptable for pub/sub overflow
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/e193ad7b-6d57-4550-92f2-f6ad7fa72e89
Co-authored-by: pjfanning <[email protected]>
* Update comment on OverflowStrategy deprecation
Clarify deprecation notice for OverflowStrategy.dropNew.
* test: add EventsBySlicePubSubOverflowSpec to verify no events lost on pub-sub buffer overflow
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/781d3606-377d-4bff-8497-4f8f6643cb8c
Co-authored-by: pjfanning <[email protected]>
---------
Co-authored-by: copilot-swe-agent[bot] <[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../r2dbc/query/scaladsl/R2dbcReadJournal.scala | 6 +-
.../query/EventsBySlicePubSubOverflowSpec.scala | 123 +++++++++++++++++++++
2 files changed, 128 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
index 2f8de36..d951a66 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
@@ -164,7 +164,11 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
completionMatcher = PartialFunction.empty,
failureMatcher = PartialFunction.empty,
bufferSize = settings.bufferSize,
- overflowStrategy = OverflowStrategy.dropNew)
+ // dropHead drops the oldest buffered event when full; any dropped pub/sub events are
+ // recovered from the database source which is merged below, and deduplication handles
+ // any events received via both paths.
+ // OverflowStrategy.dropNew is long deprecated and removed in Pekko 2.0.0.
+ overflowStrategy = OverflowStrategy.dropHead)
.mapMaterializedValue { ref =>
(minSlice to maxSlice).foreach { slice =>
import pekko.actor.typed.scaladsl.adapter._
diff --git a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubOverflowSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubOverflowSpec.scala
new file mode 100644
index 0000000..2d27c30
--- /dev/null
+++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubOverflowSpec.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.query
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import pekko.actor.typed.ActorSystem
+import pekko.actor.typed.internal.pubsub.TopicImpl
+import pekko.persistence.query.NoOffset
+import pekko.persistence.query.PersistenceQuery
+import pekko.persistence.query.typed.EventEnvelope
+import pekko.persistence.r2dbc.TestActors
+import pekko.persistence.r2dbc.TestActors.Persister.PersistWithAck
+import pekko.persistence.r2dbc.TestConfig
+import pekko.persistence.r2dbc.TestData
+import pekko.persistence.r2dbc.TestDbLifecycle
+import pekko.persistence.r2dbc.internal.PubSub
+import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
+import pekko.stream.testkit.TestSubscriber
+import pekko.stream.testkit.scaladsl.TestSink
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import org.scalatest.wordspec.AnyWordSpecLike
+
+import scala.concurrent.duration._
+
+object EventsBySlicePubSubOverflowSpec {
+ def config: Config =
+ ConfigFactory.load(
+ ConfigFactory
+ .parseString("""
+ pekko.persistence.r2dbc {
+ journal.publish-events = on
+ query {
+ # Small buffer so that the pub-sub overflow strategy is forced to kick in
+ # when the consumer is not pulling (no downstream demand).
+ buffer-size = 3
+ # Enable deduplication so that events delivered via both pub-sub and
+ # the DB recovery source are not emitted twice.
+ deduplicate-capacity = 100
+ }
+ }
+ pekko.actor.testkit.typed.filter-leeway = 20.seconds
+ """)
+ .withFallback(TestConfig.backtrackingDisabledConfig.withFallback(TestConfig.unresolvedConfig))
+ )
+}
+
+class EventsBySlicePubSubOverflowSpec
+ extends ScalaTestWithActorTestKit(EventsBySlicePubSubOverflowSpec.config)
+ with AnyWordSpecLike
+ with TestDbLifecycle
+ with TestData
+ with LogCapturing {
+
+ override def typedSystem: ActorSystem[_] = system
+
+ private val query =
+ PersistenceQuery(testKit.system).readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)
+
+ "EventsBySlices pub-sub" should {
+
+ "not lose events when the pub-sub buffer overflows and the DB source recovers them" in {
+ val entityType = nextEntityType()
+ val persistenceId = nextPid(entityType)
+ val slice = query.sliceForPersistenceId(persistenceId)
+ val persister = spawn(TestActors.Persister(persistenceId))
+ val probe = createTestProbe[Done]()
+ val sinkProbe = TestSink[EventEnvelope[String]]()(system.classicSystem)
+
+ // Start the query but do NOT request any elements (no downstream demand).
+ // The pub-sub actorRef source pushes events into its buffer regardless of
+ // downstream demand. With buffer-size=3 and 20 incoming events the buffer
+ // overflows: OverflowStrategy.dropHead evicts the oldest buffered event on
+ // each overflow, so only the 3 most-recent events survive in the buffer.
+ val result: TestSubscriber.Probe[EventEnvelope[String]] =
+ query
+ .eventsBySlices[String](entityType, slice, slice, NoOffset)
+ .runWith(sinkProbe)
+
+ // Wait for the pub-sub subscription to be established before persisting.
+ val topicStatsProbe = createTestProbe[TopicImpl.TopicStats]()
+ eventually {
+ PubSub(typedSystem).eventTopic[String](entityType, slice) ! TopicImpl.GetTopicStats(topicStatsProbe.ref)
+ topicStatsProbe.receiveMessage().localSubscriberCount shouldBe 1
+ }
+
+ // Persist many more events than the buffer-size (3).
+ // All events are written to the DB and published to pub-sub.
+ // After the 3rd event the pub-sub buffer is full: each subsequent push
+ // evicts the oldest buffered event (dropHead), so the buffer will hold
+ // only the 3 most-recently published events when we start consuming.
+ val eventCount = 20
+ for (i <- 1 to eventCount) {
+ persister ! PersistWithAck(s"e-$i", probe.ref)
+ probe.expectMessage(Done)
+ }
+
+ // Now signal demand. The pub-sub source delivers the 3 events still in
+ // its buffer. Events that overflowed are recovered by the DB source
+ // (merged at lower priority). The deduplicate stage prevents duplicates
+ // for the events received via both paths.
+ result.request(eventCount.toLong)
+ val received = (1 to eventCount).map(_ => result.expectNext(30.seconds))
+ received.map(_.event).toSet shouldBe (1 to eventCount).map(i => s"e-$i").toSet
+
+ result.cancel()
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]