This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 880ba59  Replace deprecated OverflowStrategy.dropNew in 
R2dbcReadJournal (#365)
880ba59 is described below

commit 880ba59378945e157c88a3c80bd7b451e1af72f1
Author: PJ Fanning <[email protected]>
AuthorDate: Thu May 7 20:25:14 2026 +0100

    Replace deprecated OverflowStrategy.dropNew in R2dbcReadJournal (#365)
    
    * fix: replace deprecated OverflowStrategy.dropNew with dropHead in 
R2dbcReadJournal
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/e193ad7b-6d57-4550-92f2-f6ad7fa72e89
    
    Co-authored-by: pjfanning <[email protected]>
    
    * fix: add comment explaining why dropHead is acceptable for pub/sub 
overflow
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/e193ad7b-6d57-4550-92f2-f6ad7fa72e89
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Update comment on OverflowStrategy deprecation
    
    Clarify deprecation notice for OverflowStrategy.dropNew.
    
    * test: add EventsBySlicePubSubOverflowSpec to verify no events lost on 
pub-sub buffer overflow
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/781d3606-377d-4bff-8497-4f8f6643cb8c
    
    Co-authored-by: pjfanning <[email protected]>
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] 
<[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../r2dbc/query/scaladsl/R2dbcReadJournal.scala    |   6 +-
 .../query/EventsBySlicePubSubOverflowSpec.scala    | 123 +++++++++++++++++++++
 2 files changed, 128 insertions(+), 1 deletion(-)

diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
index 2f8de36..d951a66 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
@@ -164,7 +164,11 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, 
config: Config, cfgPat
             completionMatcher = PartialFunction.empty,
             failureMatcher = PartialFunction.empty,
             bufferSize = settings.bufferSize,
-            overflowStrategy = OverflowStrategy.dropNew)
+            // dropHead drops the oldest buffered event when full; any dropped 
pub/sub events are
+            // recovered from the database source which is merged below, and 
deduplication handles
+            // any events received via both paths.
+            // OverflowStrategy.dropNew is long deprecated and removed in 
Pekko 2.0.0.
+            overflowStrategy = OverflowStrategy.dropHead)
           .mapMaterializedValue { ref =>
             (minSlice to maxSlice).foreach { slice =>
               import pekko.actor.typed.scaladsl.adapter._
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubOverflowSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubOverflowSpec.scala
new file mode 100644
index 0000000..2d27c30
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubOverflowSpec.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.query
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import pekko.actor.typed.ActorSystem
+import pekko.actor.typed.internal.pubsub.TopicImpl
+import pekko.persistence.query.NoOffset
+import pekko.persistence.query.PersistenceQuery
+import pekko.persistence.query.typed.EventEnvelope
+import pekko.persistence.r2dbc.TestActors
+import pekko.persistence.r2dbc.TestActors.Persister.PersistWithAck
+import pekko.persistence.r2dbc.TestConfig
+import pekko.persistence.r2dbc.TestData
+import pekko.persistence.r2dbc.TestDbLifecycle
+import pekko.persistence.r2dbc.internal.PubSub
+import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
+import pekko.stream.testkit.TestSubscriber
+import pekko.stream.testkit.scaladsl.TestSink
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import org.scalatest.wordspec.AnyWordSpecLike
+
+import scala.concurrent.duration._
+
+object EventsBySlicePubSubOverflowSpec {
+  def config: Config =
+    ConfigFactory.load(
+      ConfigFactory
+        .parseString("""
+    pekko.persistence.r2dbc {
+      journal.publish-events = on
+      query {
+        # Small buffer so that the pub-sub overflow strategy is forced to kick 
in
+        # when the consumer is not pulling (no downstream demand).
+        buffer-size = 3
+        # Enable deduplication so that events delivered via both pub-sub and
+        # the DB recovery source are not emitted twice.
+        deduplicate-capacity = 100
+      }
+    }
+    pekko.actor.testkit.typed.filter-leeway = 20.seconds
+    """)
+        
.withFallback(TestConfig.backtrackingDisabledConfig.withFallback(TestConfig.unresolvedConfig))
+    )
+}
+
+class EventsBySlicePubSubOverflowSpec
+    extends ScalaTestWithActorTestKit(EventsBySlicePubSubOverflowSpec.config)
+    with AnyWordSpecLike
+    with TestDbLifecycle
+    with TestData
+    with LogCapturing {
+
+  override def typedSystem: ActorSystem[_] = system
+
+  private val query =
+    
PersistenceQuery(testKit.system).readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)
+
+  "EventsBySlices pub-sub" should {
+
+    "not lose events when the pub-sub buffer overflows and the DB source 
recovers them" in {
+      val entityType = nextEntityType()
+      val persistenceId = nextPid(entityType)
+      val slice = query.sliceForPersistenceId(persistenceId)
+      val persister = spawn(TestActors.Persister(persistenceId))
+      val probe = createTestProbe[Done]()
+      val sinkProbe = TestSink[EventEnvelope[String]]()(system.classicSystem)
+
+      // Start the query but do NOT request any elements (no downstream 
demand).
+      // The pub-sub actorRef source pushes events into its buffer regardless 
of
+      // downstream demand. With buffer-size=3 and 20 incoming events the 
buffer
+      // overflows: OverflowStrategy.dropHead evicts the oldest buffered event 
on
+      // each overflow, so only the 3 most-recent events survive in the buffer.
+      val result: TestSubscriber.Probe[EventEnvelope[String]] =
+        query
+          .eventsBySlices[String](entityType, slice, slice, NoOffset)
+          .runWith(sinkProbe)
+
+      // Wait for the pub-sub subscription to be established before persisting.
+      val topicStatsProbe = createTestProbe[TopicImpl.TopicStats]()
+      eventually {
+        PubSub(typedSystem).eventTopic[String](entityType, slice) ! 
TopicImpl.GetTopicStats(topicStatsProbe.ref)
+        topicStatsProbe.receiveMessage().localSubscriberCount shouldBe 1
+      }
+
+      // Persist many more events than the buffer-size (3).
+      // All events are written to the DB and published to pub-sub.
+      // After the 3rd event the pub-sub buffer is full: each subsequent push
+      // evicts the oldest buffered event (dropHead), so the buffer will hold
+      // only the 3 most-recently published events when we start consuming.
+      val eventCount = 20
+      for (i <- 1 to eventCount) {
+        persister ! PersistWithAck(s"e-$i", probe.ref)
+        probe.expectMessage(Done)
+      }
+
+      // Now signal demand. The pub-sub source delivers the 3 events still in
+      // its buffer. Events that overflowed are recovered by the DB source
+      // (merged at lower priority). The deduplicate stage prevents duplicates
+      // for the events received via both paths.
+      result.request(eventCount.toLong)
+      val received = (1 to eventCount).map(_ => result.expectNext(30.seconds))
+      received.map(_.event).toSet shouldBe (1 to eventCount).map(i => 
s"e-$i").toSet
+
+      result.cancel()
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to