This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git


The following commit(s) were added to refs/heads/main by this push:
     new f45f11f  Port missing tests and skipPubSubTooFarAhead from 
akka-persistence-r2dbc (#380)
f45f11f is described below

commit f45f11f33838c0feb04b78ba86d797181d78ec19
Author: PJ Fanning <[email protected]>
AuthorDate: Wed May 20 08:29:38 2026 +0100

    Port missing tests and skipPubSubTooFarAhead from akka-persistence-r2dbc 
(#380)
    
    * Add skipPubSubTooFarAhead to R2dbcReadJournal; add cleanup and 
MultiPlugin tests from akka repo
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/b15f893b-ba97-4ab6-91d4-59a55f5017dc
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Update R2dbcReadJournal.scala
    
    * Fix EventSourcedPubSubSpec by reducing backtracking.behind-current-time; 
add skipPubSubTooFarAhead test to EventsBySlicePubSubSpec; add querySettings to 
TestDbLifecycle
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/c582f2eb-c468-46b7-a8a6-77c5ffe2decd
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Fix flaky EventSourcedPubSubSpec: use PersistWithAck for pre-projection 
events
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/ad105d29-a215-4050-8f10-12eb32e40427
    
    Co-authored-by: pjfanning <[email protected]>
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] 
<[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../r2dbc/query/scaladsl/R2dbcReadJournal.scala    |  48 +++
 .../pekko/persistence/r2dbc/TestActors.scala       |   8 +
 .../pekko/persistence/r2dbc/TestDbLifecycle.scala  |   2 +
 .../cleanup/scaladsl/DurableStateCleanupSpec.scala | 125 ++++++++
 .../cleanup/scaladsl/EventSourcedCleanupSpec.scala | 347 +++++++++++++++++++++
 .../r2dbc/journal/MultiPluginSpec.scala            | 120 +++++++
 .../r2dbc/query/EventsBySlicePubSubSpec.scala      |  42 +++
 .../projection/r2dbc/EventSourcedPubSubSpec.scala  |   8 +-
 8 files changed, 697 insertions(+), 3 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
index a3e9ab3..556e1f8 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
@@ -14,6 +14,7 @@
 package org.apache.pekko.persistence.r2dbc.query.scaladsl
 
 import java.time.Instant
+import java.time.{ Duration => JDuration }
 
 import scala.collection.immutable
 import scala.collection.mutable
@@ -188,6 +189,8 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, 
config: Config, cfgPat
           }
       dbSource
         .mergePrioritized(pubSubSource, leftPriority = 1, rightPriority = 10)
+        .via(skipPubSubTooFarAhead(settings.backtrackingEnabled,
+          JDuration.ofMillis(settings.backtrackingWindow.toMillis)))
         .via(deduplicate(settings.deduplicateCapacity))
     } else
       dbSource
@@ -233,6 +236,51 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, 
config: Config, cfgPat
     }
   }
 
+  /**
+   * INTERNAL API
+   */
+  @InternalApi private[pekko] def skipPubSubTooFarAhead[Event](
+      enabled: Boolean,
+      maxAheadOfBacktracking: JDuration): Flow[EventEnvelope[Event], 
EventEnvelope[Event], NotUsed] = {
+    if (!enabled)
+      Flow[EventEnvelope[Event]]
+    else
+      Flow[EventEnvelope[Event]]
+        .statefulMapConcat(() => {
+          // track backtracking offset
+          var latestBacktracking = Instant.EPOCH
+          env => {
+            env.offset match {
+              case t: TimestampOffset =>
+                if (EnvelopeOrigin.fromBacktracking(env)) {
+                  latestBacktracking = t.timestamp
+                  env :: Nil
+                } else if (EnvelopeOrigin.fromPubSub(env) && 
latestBacktracking == Instant.EPOCH) {
+                  log.trace(
+                    "Dropping pubsub event for persistenceId [{}] seqNr [{}] 
because no event from backtracking yet.",
+                    env.persistenceId,
+                    env.sequenceNr: java.lang.Long)
+                  Nil
+                } else if (EnvelopeOrigin.fromPubSub(env) &&
+                  JDuration
+                    .between(latestBacktracking, t.timestamp)
+                    .compareTo(maxAheadOfBacktracking) > 0) {
+                  // drop from pubsub when too far ahead from backtracking
+                  log.debug(
+                    "Dropping pubsub event for persistenceId [{}] seqNr [{}] 
because too far ahead of backtracking.",
+                    env.persistenceId,
+                    env.sequenceNr: java.lang.Long)
+                  Nil
+                } else {
+                  env :: Nil
+                }
+              case _ =>
+                env :: Nil
+            }
+          }
+        })
+  }
+
   override def currentEventsByPersistenceId(
       persistenceId: String,
       fromSequenceNr: Long,
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestActors.scala 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestActors.scala
index f555d9a..9b848b5 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestActors.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestActors.scala
@@ -37,6 +37,7 @@ object TestActors {
     final case class PersistAll(payloads: List[Any]) extends Command
     final case class Ping(replyTo: ActorRef[Done]) extends Command
     final case class GetState(replyTo: ActorRef[String]) extends Command
+    final case class GetSeqNr(replyTo: ActorRef[Long]) extends Command
     final case class Stop(replyTo: ActorRef[Done]) extends Command
 
     def apply(pid: String): Behavior[Command] =
@@ -102,6 +103,9 @@ object TestActors {
             case GetState(replyTo) =>
               replyTo ! state
               Effect.none
+            case GetSeqNr(replyTo) =>
+              replyTo ! EventSourcedBehavior.lastSequenceNumber(context)
+              Effect.none
             case Stop(replyTo) =>
               replyTo ! Done
               Effect.stop()
@@ -120,6 +124,7 @@ object TestActors {
     final case class DeleteWithAck(replyTo: ActorRef[Done]) extends Command
     final case class Ping(replyTo: ActorRef[Done]) extends Command
     final case class GetState(replyTo: ActorRef[Any]) extends Command
+    final case class GetRevision(replyTo: ActorRef[Long]) extends Command
     final case class Stop(replyTo: ActorRef[Done]) extends Command
 
     def apply(pid: String): Behavior[Command] =
@@ -153,6 +158,9 @@ object TestActors {
               case GetState(replyTo) =>
                 replyTo ! state
                 Effect.none
+              case GetRevision(replyTo) =>
+                replyTo ! DurableStateBehavior.lastSequenceNumber(context)
+                Effect.none
               case Ping(replyTo) =>
                 replyTo ! Done
                 Effect.none
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestDbLifecycle.scala 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestDbLifecycle.scala
index 5adfee8..8fb2865 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestDbLifecycle.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestDbLifecycle.scala
@@ -39,6 +39,8 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite 
=>
 
   lazy val stateSettings: StateSettings = new 
StateSettings(config.getConfig(testConfigPath + ".state"))
 
+  lazy val querySettings: QuerySettings = 
QuerySettings(config.getConfig(testConfigPath + ".query"))
+
   // making sure that test harness does not initialize connection factory for 
the plugin that is being tested
   lazy val connectionFactoryProvider: ConnectionFactory =
     ConnectionFactoryProvider(typedSystem)
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanupSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanupSpec.scala
new file mode 100644
index 0000000..d10d3ad
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanupSpec.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 - 2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.cleanup.scaladsl
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import pekko.actor.typed.ActorSystem
+import pekko.persistence.r2dbc.TestActors.DurableStatePersister
+import pekko.persistence.r2dbc.TestConfig
+import pekko.persistence.r2dbc.TestData
+import pekko.persistence.r2dbc.TestDbLifecycle
+import com.typesafe.config.ConfigFactory
+import org.scalatest.wordspec.AnyWordSpecLike
+
+object DurableStateCleanupSpec {
+  val config = ConfigFactory
+    .parseString(s"""
+    pekko.loglevel = DEBUG
+    pekko.persistence.r2dbc.cleanup {
+      log-progress-every = 2
+    }
+  """)
+    .withFallback(TestConfig.config)
+}
+
+class DurableStateCleanupSpec
+    extends ScalaTestWithActorTestKit(DurableStateCleanupSpec.config)
+    with AnyWordSpecLike
+    with TestDbLifecycle
+    with TestData
+    with LogCapturing {
+
+  override def typedSystem: ActorSystem[_] = system
+
+  "DurableStateCleanup" must {
+    "delete state for one persistenceId" in {
+      val ackProbe = createTestProbe[Done]()
+      val stateProbe = createTestProbe[Any]()
+      val revisionProbe = createTestProbe[Long]()
+      val pid = nextPid()
+      val p = spawn(DurableStatePersister(pid))
+
+      p ! DurableStatePersister.PersistWithAck("a", ackProbe.ref)
+      ackProbe.expectMessage(Done)
+
+      testKit.stop(p)
+
+      val cleanup = new DurableStateCleanup(system)
+      cleanup.deleteState(pid, resetRevisionNumber = true).futureValue
+
+      val p2 = spawn(DurableStatePersister(pid))
+      p2 ! DurableStatePersister.GetState(stateProbe.ref)
+      stateProbe.expectMessage("")
+      p2 ! DurableStatePersister.GetRevision(revisionProbe.ref)
+      revisionProbe.expectMessage(0L)
+    }
+
+    "delete events for one persistenceId, but keep seqNr" in {
+      val ackProbe = createTestProbe[Done]()
+      val stateProbe = createTestProbe[Any]()
+      val revisionProbe = createTestProbe[Long]()
+      val pid = nextPid()
+      val p = spawn(DurableStatePersister(pid))
+
+      p ! DurableStatePersister.PersistWithAck("a", ackProbe.ref)
+      ackProbe.expectMessage(Done)
+      p ! DurableStatePersister.PersistWithAck("b", ackProbe.ref)
+      ackProbe.expectMessage(Done)
+
+      testKit.stop(p)
+
+      val cleanup = new DurableStateCleanup(system)
+      cleanup.deleteState(pid, resetRevisionNumber = false).futureValue
+
+      val p2 = spawn(DurableStatePersister(pid))
+      p2 ! DurableStatePersister.GetState(stateProbe.ref)
+      stateProbe.expectMessage("")
+      p2 ! DurableStatePersister.GetRevision(revisionProbe.ref)
+      revisionProbe.expectMessage(3L)
+    }
+
+    "delete all" in {
+      val ackProbe = createTestProbe[Done]()
+      val stateProbe = createTestProbe[Any]()
+      val seqNrProbe = createTestProbe[Long]()
+      val pids = Vector(nextPid(), nextPid(), nextPid())
+      val persisters = pids.map(pid => spawn(DurableStatePersister(pid)))
+
+      (1 to 3).foreach { n =>
+        persisters.foreach { p =>
+          p ! DurableStatePersister.PersistWithAck(s"$n", ackProbe.ref)
+          ackProbe.expectMessage(Done)
+        }
+      }
+
+      persisters.foreach(testKit.stop(_))
+
+      val cleanup = new DurableStateCleanup(system)
+      cleanup.deleteStates(pids, resetRevisionNumber = true).futureValue
+
+      val persisters2 = pids.map(pid => spawn(DurableStatePersister(pid)))
+      persisters2.foreach { p =>
+        p ! DurableStatePersister.GetState(stateProbe.ref)
+        stateProbe.expectMessage("")
+        p ! DurableStatePersister.GetRevision(seqNrProbe.ref)
+        seqNrProbe.expectMessage(0L)
+      }
+    }
+
+  }
+
+}
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanupSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanupSpec.scala
new file mode 100644
index 0000000..91b96fa
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanupSpec.scala
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 - 2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.cleanup.scaladsl
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.LoggingTestKit
+import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import pekko.actor.typed.ActorSystem
+import pekko.actor.typed.scaladsl.Behaviors
+import pekko.persistence.r2dbc.TestActors.Persister
+import pekko.persistence.r2dbc.TestConfig
+import pekko.persistence.r2dbc.TestData
+import pekko.persistence.r2dbc.TestDbLifecycle
+import pekko.persistence.typed.PersistenceId
+import com.typesafe.config.ConfigFactory
+import org.scalatest.wordspec.AnyWordSpecLike
+import org.slf4j.event.Level
+
+object EventSourcedCleanupSpec {
+  val config = ConfigFactory
+    .parseString(s"""
+    pekko.loglevel = DEBUG
+    pekko.persistence.r2dbc.cleanup {
+      log-progress-every = 2
+      events-journal-delete-batch-size = 10
+    }
+  """)
+    .withFallback(TestConfig.config)
+}
+
+class EventSourcedCleanupSpec
+    extends ScalaTestWithActorTestKit(EventSourcedCleanupSpec.config)
+    with AnyWordSpecLike
+    with TestDbLifecycle
+    with TestData
+    with LogCapturing {
+
+  override def typedSystem: ActorSystem[_] = system
+
+  "EventSourcedCleanup" must {
+    "delete events for one persistenceId" in {
+      val ackProbe = createTestProbe[Done]()
+      val stateProbe = createTestProbe[String]()
+      val seqNrProbe = createTestProbe[Long]()
+      val pid = nextPid()
+      val p = spawn(Persister(pid))
+
+      (1 to 10).foreach { n =>
+        p ! Persister.PersistWithAck(n, ackProbe.ref)
+        ackProbe.expectMessage(Done)
+      }
+
+      testKit.stop(p)
+
+      val cleanup = new EventSourcedCleanup(system)
+      cleanup.deleteAllEvents(pid, resetSequenceNumber = true).futureValue
+
+      val p2 = spawn(Persister(pid))
+      p2 ! Persister.GetState(stateProbe.ref)
+      stateProbe.expectMessage("")
+      p2 ! Persister.GetSeqNr(seqNrProbe.ref)
+      seqNrProbe.expectMessage(0L)
+    }
+
+    "delete events for one persistenceId in batches" in {
+      val ackProbe = createTestProbe[Done]()
+      val stateProbe = createTestProbe[String]()
+      val seqNrProbe = createTestProbe[Long]()
+      val pid = nextPid()
+      val p = spawn(Persister(pid))
+
+      val maxSeqNumber = 47
+      (1 to maxSeqNumber).foreach { n =>
+        p ! Persister.PersistWithAck(n, ackProbe.ref)
+        ackProbe.expectMessage(Done)
+      }
+
+      testKit.stop(p)
+
+      val cleanup = new EventSourcedCleanup(system)
+
+      var iteration = 0
+      val batchSize = 10
+
+      LoggingTestKit
+        .info("Deleted")
+        .withLogLevel(Level.DEBUG)
+        .withOccurrences(5)
+        .withCustom { event =>
+          val from = (iteration * batchSize) + 1
+          iteration = iteration + 1
+          val to = Math.min(maxSeqNumber, from + batchSize - 1)
+          val deleted = to - from + 1
+          val expectedMsg = s"Deleted [$deleted] events for persistenceId 
[$pid], from seq num [$from] to [$to]"
+          event.message == expectedMsg
+        }
+        .expect {
+          cleanup.deleteAllEvents(pid, resetSequenceNumber = true).futureValue
+        }
+
+      val p2 = spawn(Persister(pid))
+      p2 ! Persister.GetState(stateProbe.ref)
+      stateProbe.expectMessage("")
+      p2 ! Persister.GetSeqNr(seqNrProbe.ref)
+      seqNrProbe.expectMessage(0L)
+    }
+
+    "delete events for one persistenceId, but keep seqNr" in {
+      val ackProbe = createTestProbe[Done]()
+      val stateProbe = createTestProbe[String]()
+      val seqNrProbe = createTestProbe[Long]()
+      val pid = nextPid()
+      val p = spawn(Persister(pid))
+
+      (1 to 10).foreach { n =>
+        p ! Persister.PersistWithAck(n, ackProbe.ref)
+        ackProbe.expectMessage(Done)
+      }
+
+      testKit.stop(p)
+
+      val cleanup = new EventSourcedCleanup(system)
+      cleanup.deleteAllEvents(pid, resetSequenceNumber = false).futureValue
+
+      val p2 = spawn(Persister(pid))
+      p2 ! Persister.GetState(stateProbe.ref)
+      stateProbe.expectMessage("")
+      p2 ! Persister.GetSeqNr(seqNrProbe.ref)
+      seqNrProbe.expectMessage(10L)
+    }
+
+    "delete events for one persistenceId in batches, but keep seqNr" in {
+      val ackProbe = createTestProbe[Done]()
+      val stateProbe = createTestProbe[String]()
+      val seqNrProbe = createTestProbe[Long]()
+      val pid = nextPid()
+      val p = spawn(Persister(pid))
+
+      val maxSeqNumber = 47
+      (1 to maxSeqNumber).foreach { n =>
+        p ! Persister.PersistWithAck(n, ackProbe.ref)
+        ackProbe.expectMessage(Done)
+      }
+
+      testKit.stop(p)
+
+      val cleanup = new EventSourcedCleanup(system)
+
+      var iteration = 0
+      val batchSize = 10
+
+      LoggingTestKit
+        .info("Deleted")
+        .withLogLevel(Level.DEBUG)
+        .withOccurrences(5)
+        .withCustom { event =>
+          val from = (iteration * batchSize) + 1
+          iteration = iteration + 1
+          val to = Math.min(maxSeqNumber, from + batchSize - 1)
+          val deleted = to - from + 1
+          val expectedMsg = s"Deleted [$deleted] events for persistenceId 
[$pid], from seq num [$from] to [$to]"
+          event.message == expectedMsg
+        }
+        .expect {
+          cleanup.deleteAllEvents(pid, resetSequenceNumber = false).futureValue
+        }
+
+      val p2 = spawn(Persister(pid))
+      p2 ! Persister.GetState(stateProbe.ref)
+      stateProbe.expectMessage("")
+      p2 ! Persister.GetSeqNr(seqNrProbe.ref)
+      seqNrProbe.expectMessage(maxSeqNumber.toLong)
+    }
+
+    "delete some for one persistenceId" in {
+      val ackProbe = createTestProbe[Done]()
+      val stateProbe = createTestProbe[String]()
+      val seqNrProbe = createTestProbe[Long]()
+      val pid = nextPid()
+      val p = spawn(Persister(pid))
+
+      (1 to 8).foreach { n =>
+        p ! Persister.PersistWithAck(n, ackProbe.ref)
+        ackProbe.expectMessage(Done)
+      }
+
+      testKit.stop(p)
+
+      val cleanup = new EventSourcedCleanup(system)
+      cleanup.deleteEventsTo(pid, 5).futureValue
+
+      val p2 = spawn(Persister(pid))
+      p2 ! Persister.GetState(stateProbe.ref)
+      stateProbe.expectMessage("6|7|8")
+      p2 ! Persister.GetSeqNr(seqNrProbe.ref)
+      seqNrProbe.expectMessage(8L)
+    }
+
+    "delete snapshots for one persistenceId" in {
+      val ackProbe = createTestProbe[Done]()
+      val stateProbe = createTestProbe[String]()
+      val pid = nextPid()
+      val p = spawn(Behaviors.setup[Persister.Command] { context =>
+        Persister
+          .eventSourcedBehavior(PersistenceId.ofUniqueId(pid), context)
+          .snapshotWhen((_, event, _) => event.toString.contains("snap"))
+      })
+
+      (1 to 10).foreach { n =>
+        p ! Persister.PersistWithAck(s"${if (n == 3) n + "-snap" else n}", 
ackProbe.ref)
+        ackProbe.expectMessage(Done)
+      }
+
+      testKit.stop(p)
+
+      val cleanup = new EventSourcedCleanup(system)
+      cleanup.deleteAllEvents(pid, resetSequenceNumber = false).futureValue
+
+      val p2 = spawn(Persister(pid))
+      p2 ! Persister.GetState(stateProbe.ref)
+      stateProbe.expectMessage("1|2|3-snap")
+      testKit.stop(p2)
+
+      cleanup.deleteSnapshot(pid).futureValue
+
+      val p3 = spawn(Persister(pid))
+      p3 ! Persister.GetState(stateProbe.ref)
+      stateProbe.expectMessage("")
+    }
+
+    "cleanup before snapshot" in {
+      val ackProbe = createTestProbe[Done]()
+      val stateProbe = createTestProbe[String]()
+      val pid = nextPid()
+      val p = spawn(Behaviors.setup[Persister.Command] { context =>
+        Persister
+          .eventSourcedBehavior(PersistenceId.ofUniqueId(pid), context)
+          .snapshotWhen((_, event, _) => event.toString.contains("snap"))
+      })
+
+      (1 to 10).foreach { n =>
+        p ! Persister.PersistWithAck(s"${if (n == 3) n + "-snap" else n}", 
ackProbe.ref)
+        ackProbe.expectMessage(Done)
+      }
+
+      testKit.stop(p)
+
+      val cleanup = new EventSourcedCleanup(system)
+      cleanup.cleanupBeforeSnapshot(pid).futureValue
+
+      val p2 = spawn(Persister(pid))
+      p2 ! Persister.GetState(stateProbe.ref)
+      stateProbe.expectMessage("1|2|3-snap|4|5|6|7|8|9|10")
+      testKit.stop(p2)
+
+      cleanup.deleteSnapshot(pid).futureValue
+
+      val p3 = spawn(Persister(pid))
+      p3 ! Persister.GetState(stateProbe.ref)
+      // from replaying remaining events
+      stateProbe.expectMessage("4|5|6|7|8|9|10")
+    }
+
+    "delete all" in {
+      val ackProbe = createTestProbe[Done]()
+      val stateProbe = createTestProbe[String]()
+      val seqNrProbe = createTestProbe[Long]()
+      val pids = Vector(nextPid(), nextPid(), nextPid())
+      val persisters =
+        pids.map { pid =>
+          spawn(Behaviors.setup[Persister.Command] { context =>
+            Persister
+              .eventSourcedBehavior(PersistenceId.ofUniqueId(pid), context)
+              .snapshotWhen((_, event, _) => event.toString.contains("snap"))
+          })
+        }
+
+      (1 to 10).foreach { n =>
+        persisters.foreach { p =>
+          p ! Persister.PersistWithAck(s"${if (n == 3) n + "-snap" else n}", 
ackProbe.ref)
+          ackProbe.expectMessage(Done)
+        }
+      }
+
+      persisters.foreach(testKit.stop(_))
+
+      val cleanup = new EventSourcedCleanup(system)
+      cleanup.deleteAll(pids, resetSequenceNumber = true).futureValue
+
+      val persisters2 = pids.map(pid => spawn(Persister(pid)))
+      persisters2.foreach { p =>
+        p ! Persister.GetState(stateProbe.ref)
+        stateProbe.expectMessage("")
+        p ! Persister.GetSeqNr(seqNrProbe.ref)
+        seqNrProbe.expectMessage(0L)
+      }
+    }
+
+    "cleanup all before snapshot" in {
+      val ackProbe = createTestProbe[Done]()
+      val stateProbe = createTestProbe[String]()
+      val pids = Vector(nextPid(), nextPid(), nextPid())
+      val persisters =
+        pids.map { pid =>
+          spawn(Behaviors.setup[Persister.Command] { context =>
+            Persister
+              .eventSourcedBehavior(PersistenceId.ofUniqueId(pid), context)
+              .snapshotWhen((_, event, _) => event.toString.contains("snap"))
+          })
+        }
+
+      (1 to 10).foreach { n =>
+        persisters.foreach { p =>
+          p ! Persister.PersistWithAck(s"${if (n == 3) n + "-snap" else n}", 
ackProbe.ref)
+          ackProbe.expectMessage(Done)
+        }
+      }
+
+      persisters.foreach(testKit.stop(_))
+
+      val cleanup = new EventSourcedCleanup(system)
+      cleanup.cleanupBeforeSnapshot(pids).futureValue
+      cleanup.deleteSnapshots(pids).futureValue
+
+      val persisters2 = pids.map(pid => spawn(Persister(pid)))
+      persisters2.foreach { p =>
+        p ! Persister.GetState(stateProbe.ref)
+        // from replaying remaining events
+        stateProbe.expectMessage("4|5|6|7|8|9|10")
+      }
+    }
+  }
+
+}
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/MultiPluginSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/MultiPluginSpec.scala
new file mode 100644
index 0000000..c45abaa
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/MultiPluginSpec.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 - 2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.journal
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import pekko.actor.typed.ActorRef
+import pekko.actor.typed.ActorSystem
+import pekko.persistence.r2dbc.TestConfig
+import pekko.persistence.r2dbc.TestData
+import pekko.persistence.r2dbc.TestDbLifecycle
+import pekko.persistence.typed.PersistenceId
+import pekko.persistence.typed.scaladsl.Effect
+import pekko.persistence.typed.scaladsl.EventSourcedBehavior
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import org.scalatest.wordspec.AnyWordSpecLike
+
+object MultiPluginSpec {
+  val config: Config = ConfigFactory
+    .parseString("""
+    // #default-config
+    pekko.persistence.journal.plugin = "pekko.persistence.r2dbc.journal"
+    pekko.persistence.snapshot-store.plugin = 
"pekko.persistence.r2dbc.snapshot"
+    pekko.persistence.state.plugin = "pekko.persistence.r2dbc.state"
+    // #default-config
+
+    // #second-config
+    second-r2dbc = ${pekko.persistence.r2dbc}
+    second-r2dbc {
+      connection-factory {
+        # specific connection properties here
+      }
+      journal {
+        # specific journal properties here
+      }
+      snapshot {
+        # specific snapshot properties here
+      }
+      state {
+        # specific durable state properties here
+      }
+      query {
+        # specific query properties here
+      }
+    }
+    // #second-config
+    """)
+    .withFallback(TestConfig.config)
+    .resolve()
+
+  object MyEntity {
+    sealed trait Command
+    final case class Persist(payload: String, replyTo: ActorRef[State]) 
extends Command
+    type Event = String
+    object State {
+      def apply(): State = ""
+    }
+    type State = String
+
+    def commandHandler(state: State, cmd: Command): Effect[Event, State] = {
+      cmd match {
+        case Persist(payload, replyTo) =>
+          Effect
+            .persist(payload)
+            .thenReply(replyTo)(newState => newState)
+      }
+    }
+
+    def eventHandler(state: State, evt: Event): State =
+      state + evt
+
+    def apply(persistenceId: PersistenceId): EventSourcedBehavior[Command, 
Event, State] = {
+      // #withPlugins
+      EventSourcedBehavior(persistenceId, emptyState = State(), 
commandHandler, eventHandler)
+        .withJournalPluginId("second-r2dbc.journal")
+        .withSnapshotPluginId("second-r2dbc.snapshot")
+      // #withPlugins
+    }
+  }
+}
+
+class MultiPluginSpec
+    extends ScalaTestWithActorTestKit(MultiPluginSpec.config)
+    with AnyWordSpecLike
+    with TestDbLifecycle
+    with TestData
+    with LogCapturing {
+
+  import MultiPluginSpec.MyEntity
+
+  override def typedSystem: ActorSystem[_] = system
+
+  "Addition plugin config" should {
+
+    "be supported for EventSourcedBehavior" in {
+      val probe = createTestProbe[MyEntity.State]()
+      val pid = PersistenceId.ofUniqueId(nextPid(nextEntityType()))
+      val ref = spawn(MyEntity(pid))
+      ref ! MyEntity.Persist("a", probe.ref)
+      probe.expectMessage("a")
+      ref ! MyEntity.Persist("b", probe.ref)
+      probe.expectMessage("ab")
+    }
+
+  }
+}
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
index 660bdda..6f754cb 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
@@ -14,6 +14,7 @@
 package org.apache.pekko.persistence.r2dbc.query
 
 import java.time.Instant
+import java.time.{ Duration => JDuration }
 
 import scala.concurrent.Await
 import scala.concurrent.duration._
@@ -45,6 +46,7 @@ import pekko.stream.scaladsl.Sink
 import pekko.stream.scaladsl.Source
 import pekko.stream.testkit.TestSubscriber
 import pekko.stream.testkit.scaladsl.TestSink
+import pekko.stream.testkit.scaladsl.TestSource
 import pekko.stream.typed.scaladsl.ActorFlow
 import com.typesafe.config.Config
 import com.typesafe.config.ConfigFactory
@@ -204,6 +206,46 @@ class EventsBySlicePubSubSpec
       out shouldBe List(envA1, envA2, envA3, envB1, envA1, envB2) // envA1 was 
evicted and therefore duplicate
     }
 
+    "skipPubSubTooFarAhead" in {
+      val backtrackingWindow = 
JDuration.ofMillis(querySettings.backtrackingWindow.toMillis)
+      val (in, out) =
+        TestSource[EventEnvelope[String]]()
+          .via(
+            query.skipPubSubTooFarAhead(
+              enabled = true,
+              maxAheadOfBacktracking = backtrackingWindow))
+          .toMat(TestSink[EventEnvelope[String]]())(Keep.both)
+          .run()
+      out.request(100)
+      in.sendNext(envA1)
+      in.sendNext(envA2)
+
+      // all pubsub events dropped before the first backtracking event
+      out.expectNoMessage()
+
+      val pidC = PersistenceId(entityType, "C")
+      in.sendNext(backtrackingEnvelope(envA1))
+      out.expectNext(backtrackingEnvelope(envA1))
+      // now the pubsub event is passed through
+      in.sendNext(envB1)
+      out.expectNext(envB1)
+
+      val time2 = envA1.offset
+        .asInstanceOf[TimestampOffset]
+        .timestamp
+        .plusMillis(backtrackingWindow.toMillis)
+      val envC1 = createEnvelope(pidC, 1L, "c1", time2.plusMillis(1))
+      val envC2 = createEnvelope(pidC, 2L, "c2", time2.plusMillis(2))
+      in.sendNext(envC1)
+      // dropped because > backtrackingWindow
+      out.expectNoMessage()
+
+      in.sendNext(backtrackingEnvelope(envB1))
+      out.expectNext(backtrackingEnvelope(envB1))
+      in.sendNext(envC2)
+      out.expectNext(envC2)
+    }
+
     "dynamically enable/disable publishing based on throughput" in new Setup {
       import pekko.actor.typed.scaladsl.adapter._
 
diff --git 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala
 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala
index 338cf42..b31ae09 100644
--- 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala
+++ 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala
@@ -53,7 +53,7 @@ object EventSourcedPubSubSpec {
           buffer-size = 10
 
           backtracking {
-            behind-current-time = 5 seconds
+            behind-current-time = 100 millis
             window = 20 seconds
           }
       }
@@ -175,11 +175,13 @@ class EventSourcedPubSubSpec
         spawn(Persister(persistenceId), s"p$n")
       }
 
-      // write some before starting the projections
+      // write some before starting the projections, with ack to ensure they 
are all in the DB
+      val ackProbe = createTestProbe[Done]()
       (1 to 10).foreach { n =>
         val p = n % numberOfEntities
-        entities(p) ! Persister.Persist(mkEvent(n))
+        entities(p) ! Persister.PersistWithAck(mkEvent(n), ackProbe.ref)
       }
+      (1 to 10).foreach { _ => ackProbe.receiveMessage(10.seconds) }
 
       val projectionName = UUID.randomUUID().toString
       val processedProbe = createTestProbe[Processed]()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to