This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git
The following commit(s) were added to refs/heads/main by this push:
new f45f11f Port missing tests and skipPubSubTooFarAhead from
akka-persistence-r2dbc (#380)
f45f11f is described below
commit f45f11f33838c0feb04b78ba86d797181d78ec19
Author: PJ Fanning <[email protected]>
AuthorDate: Wed May 20 08:29:38 2026 +0100
Port missing tests and skipPubSubTooFarAhead from akka-persistence-r2dbc
(#380)
* Add skipPubSubTooFarAhead to R2dbcReadJournal; add cleanup and
MultiPlugin tests from akka repo
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/b15f893b-ba97-4ab6-91d4-59a55f5017dc
Co-authored-by: pjfanning <[email protected]>
* Update R2dbcReadJournal.scala
* Fix EventSourcedPubSubSpec by reducing backtracking.behind-current-time;
add skipPubSubTooFarAhead test to EventsBySlicePubSubSpec; add querySettings to
TestDbLifecycle
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/c582f2eb-c468-46b7-a8a6-77c5ffe2decd
Co-authored-by: pjfanning <[email protected]>
* Fix flaky EventSourcedPubSubSpec: use PersistWithAck for pre-projection
events
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/ad105d29-a215-4050-8f10-12eb32e40427
Co-authored-by: pjfanning <[email protected]>
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../r2dbc/query/scaladsl/R2dbcReadJournal.scala | 48 +++
.../pekko/persistence/r2dbc/TestActors.scala | 8 +
.../pekko/persistence/r2dbc/TestDbLifecycle.scala | 2 +
.../cleanup/scaladsl/DurableStateCleanupSpec.scala | 125 ++++++++
.../cleanup/scaladsl/EventSourcedCleanupSpec.scala | 347 +++++++++++++++++++++
.../r2dbc/journal/MultiPluginSpec.scala | 120 +++++++
.../r2dbc/query/EventsBySlicePubSubSpec.scala | 42 +++
.../projection/r2dbc/EventSourcedPubSubSpec.scala | 8 +-
8 files changed, 697 insertions(+), 3 deletions(-)
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
index a3e9ab3..556e1f8 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
@@ -14,6 +14,7 @@
package org.apache.pekko.persistence.r2dbc.query.scaladsl
import java.time.Instant
+import java.time.{ Duration => JDuration }
import scala.collection.immutable
import scala.collection.mutable
@@ -188,6 +189,8 @@ final class R2dbcReadJournal(system: ExtendedActorSystem,
config: Config, cfgPat
}
dbSource
.mergePrioritized(pubSubSource, leftPriority = 1, rightPriority = 10)
+ .via(skipPubSubTooFarAhead(settings.backtrackingEnabled,
+ JDuration.ofMillis(settings.backtrackingWindow.toMillis)))
.via(deduplicate(settings.deduplicateCapacity))
} else
dbSource
@@ -233,6 +236,51 @@ final class R2dbcReadJournal(system: ExtendedActorSystem,
config: Config, cfgPat
}
}
+ /**
+ * INTERNAL API
+ */
+ @InternalApi private[pekko] def skipPubSubTooFarAhead[Event](
+ enabled: Boolean,
+ maxAheadOfBacktracking: JDuration): Flow[EventEnvelope[Event],
EventEnvelope[Event], NotUsed] = {
+ if (!enabled)
+ Flow[EventEnvelope[Event]]
+ else
+ Flow[EventEnvelope[Event]]
+ .statefulMapConcat(() => {
+ // track backtracking offset
+ var latestBacktracking = Instant.EPOCH
+ env => {
+ env.offset match {
+ case t: TimestampOffset =>
+ if (EnvelopeOrigin.fromBacktracking(env)) {
+ latestBacktracking = t.timestamp
+ env :: Nil
+ } else if (EnvelopeOrigin.fromPubSub(env) &&
latestBacktracking == Instant.EPOCH) {
+ log.trace(
+ "Dropping pubsub event for persistenceId [{}] seqNr [{}]
because no event from backtracking yet.",
+ env.persistenceId,
+ env.sequenceNr: java.lang.Long)
+ Nil
+ } else if (EnvelopeOrigin.fromPubSub(env) &&
+ JDuration
+ .between(latestBacktracking, t.timestamp)
+ .compareTo(maxAheadOfBacktracking) > 0) {
+ // drop from pubsub when too far ahead from backtracking
+ log.debug(
+ "Dropping pubsub event for persistenceId [{}] seqNr [{}]
because too far ahead of backtracking.",
+ env.persistenceId,
+ env.sequenceNr: java.lang.Long)
+ Nil
+ } else {
+ env :: Nil
+ }
+ case _ =>
+ env :: Nil
+ }
+ }
+ })
+ }
+
override def currentEventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestActors.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestActors.scala
index f555d9a..9b848b5 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestActors.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestActors.scala
@@ -37,6 +37,7 @@ object TestActors {
final case class PersistAll(payloads: List[Any]) extends Command
final case class Ping(replyTo: ActorRef[Done]) extends Command
final case class GetState(replyTo: ActorRef[String]) extends Command
+ final case class GetSeqNr(replyTo: ActorRef[Long]) extends Command
final case class Stop(replyTo: ActorRef[Done]) extends Command
def apply(pid: String): Behavior[Command] =
@@ -102,6 +103,9 @@ object TestActors {
case GetState(replyTo) =>
replyTo ! state
Effect.none
+ case GetSeqNr(replyTo) =>
+ replyTo ! EventSourcedBehavior.lastSequenceNumber(context)
+ Effect.none
case Stop(replyTo) =>
replyTo ! Done
Effect.stop()
@@ -120,6 +124,7 @@ object TestActors {
final case class DeleteWithAck(replyTo: ActorRef[Done]) extends Command
final case class Ping(replyTo: ActorRef[Done]) extends Command
final case class GetState(replyTo: ActorRef[Any]) extends Command
+ final case class GetRevision(replyTo: ActorRef[Long]) extends Command
final case class Stop(replyTo: ActorRef[Done]) extends Command
def apply(pid: String): Behavior[Command] =
@@ -153,6 +158,9 @@ object TestActors {
case GetState(replyTo) =>
replyTo ! state
Effect.none
+ case GetRevision(replyTo) =>
+ replyTo ! DurableStateBehavior.lastSequenceNumber(context)
+ Effect.none
case Ping(replyTo) =>
replyTo ! Done
Effect.none
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestDbLifecycle.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestDbLifecycle.scala
index 5adfee8..8fb2865 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestDbLifecycle.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestDbLifecycle.scala
@@ -39,6 +39,8 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite
=>
lazy val stateSettings: StateSettings = new
StateSettings(config.getConfig(testConfigPath + ".state"))
+ lazy val querySettings: QuerySettings =
QuerySettings(config.getConfig(testConfigPath + ".query"))
+
// making sure that test harness does not initialize connection factory for
the plugin that is being tested
lazy val connectionFactoryProvider: ConnectionFactory =
ConnectionFactoryProvider(typedSystem)
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanupSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanupSpec.scala
new file mode 100644
index 0000000..d10d3ad
--- /dev/null
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanupSpec.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 - 2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.cleanup.scaladsl
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import pekko.actor.typed.ActorSystem
+import pekko.persistence.r2dbc.TestActors.DurableStatePersister
+import pekko.persistence.r2dbc.TestConfig
+import pekko.persistence.r2dbc.TestData
+import pekko.persistence.r2dbc.TestDbLifecycle
+import com.typesafe.config.ConfigFactory
+import org.scalatest.wordspec.AnyWordSpecLike
+
+object DurableStateCleanupSpec {
+ val config = ConfigFactory
+ .parseString(s"""
+ pekko.loglevel = DEBUG
+ pekko.persistence.r2dbc.cleanup {
+ log-progress-every = 2
+ }
+ """)
+ .withFallback(TestConfig.config)
+}
+
+class DurableStateCleanupSpec
+ extends ScalaTestWithActorTestKit(DurableStateCleanupSpec.config)
+ with AnyWordSpecLike
+ with TestDbLifecycle
+ with TestData
+ with LogCapturing {
+
+ override def typedSystem: ActorSystem[_] = system
+
+ "DurableStateCleanup" must {
+ "delete state for one persistenceId" in {
+ val ackProbe = createTestProbe[Done]()
+ val stateProbe = createTestProbe[Any]()
+ val revisionProbe = createTestProbe[Long]()
+ val pid = nextPid()
+ val p = spawn(DurableStatePersister(pid))
+
+ p ! DurableStatePersister.PersistWithAck("a", ackProbe.ref)
+ ackProbe.expectMessage(Done)
+
+ testKit.stop(p)
+
+ val cleanup = new DurableStateCleanup(system)
+ cleanup.deleteState(pid, resetRevisionNumber = true).futureValue
+
+ val p2 = spawn(DurableStatePersister(pid))
+ p2 ! DurableStatePersister.GetState(stateProbe.ref)
+ stateProbe.expectMessage("")
+ p2 ! DurableStatePersister.GetRevision(revisionProbe.ref)
+ revisionProbe.expectMessage(0L)
+ }
+
+ "delete events for one persistenceId, but keep seqNr" in {
+ val ackProbe = createTestProbe[Done]()
+ val stateProbe = createTestProbe[Any]()
+ val revisionProbe = createTestProbe[Long]()
+ val pid = nextPid()
+ val p = spawn(DurableStatePersister(pid))
+
+ p ! DurableStatePersister.PersistWithAck("a", ackProbe.ref)
+ ackProbe.expectMessage(Done)
+ p ! DurableStatePersister.PersistWithAck("b", ackProbe.ref)
+ ackProbe.expectMessage(Done)
+
+ testKit.stop(p)
+
+ val cleanup = new DurableStateCleanup(system)
+ cleanup.deleteState(pid, resetRevisionNumber = false).futureValue
+
+ val p2 = spawn(DurableStatePersister(pid))
+ p2 ! DurableStatePersister.GetState(stateProbe.ref)
+ stateProbe.expectMessage("")
+ p2 ! DurableStatePersister.GetRevision(revisionProbe.ref)
+ revisionProbe.expectMessage(3L)
+ }
+
+ "delete all" in {
+ val ackProbe = createTestProbe[Done]()
+ val stateProbe = createTestProbe[Any]()
+ val seqNrProbe = createTestProbe[Long]()
+ val pids = Vector(nextPid(), nextPid(), nextPid())
+ val persisters = pids.map(pid => spawn(DurableStatePersister(pid)))
+
+ (1 to 3).foreach { n =>
+ persisters.foreach { p =>
+ p ! DurableStatePersister.PersistWithAck(s"$n", ackProbe.ref)
+ ackProbe.expectMessage(Done)
+ }
+ }
+
+ persisters.foreach(testKit.stop(_))
+
+ val cleanup = new DurableStateCleanup(system)
+ cleanup.deleteStates(pids, resetRevisionNumber = true).futureValue
+
+ val persisters2 = pids.map(pid => spawn(DurableStatePersister(pid)))
+ persisters2.foreach { p =>
+ p ! DurableStatePersister.GetState(stateProbe.ref)
+ stateProbe.expectMessage("")
+ p ! DurableStatePersister.GetRevision(seqNrProbe.ref)
+ seqNrProbe.expectMessage(0L)
+ }
+ }
+
+ }
+
+}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanupSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanupSpec.scala
new file mode 100644
index 0000000..91b96fa
--- /dev/null
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanupSpec.scala
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 - 2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.cleanup.scaladsl
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.LoggingTestKit
+import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import pekko.actor.typed.ActorSystem
+import pekko.actor.typed.scaladsl.Behaviors
+import pekko.persistence.r2dbc.TestActors.Persister
+import pekko.persistence.r2dbc.TestConfig
+import pekko.persistence.r2dbc.TestData
+import pekko.persistence.r2dbc.TestDbLifecycle
+import pekko.persistence.typed.PersistenceId
+import com.typesafe.config.ConfigFactory
+import org.scalatest.wordspec.AnyWordSpecLike
+import org.slf4j.event.Level
+
+object EventSourcedCleanupSpec {
+ val config = ConfigFactory
+ .parseString(s"""
+ pekko.loglevel = DEBUG
+ pekko.persistence.r2dbc.cleanup {
+ log-progress-every = 2
+ events-journal-delete-batch-size = 10
+ }
+ """)
+ .withFallback(TestConfig.config)
+}
+
+class EventSourcedCleanupSpec
+ extends ScalaTestWithActorTestKit(EventSourcedCleanupSpec.config)
+ with AnyWordSpecLike
+ with TestDbLifecycle
+ with TestData
+ with LogCapturing {
+
+ override def typedSystem: ActorSystem[_] = system
+
+ "EventSourcedCleanup" must {
+ "delete events for one persistenceId" in {
+ val ackProbe = createTestProbe[Done]()
+ val stateProbe = createTestProbe[String]()
+ val seqNrProbe = createTestProbe[Long]()
+ val pid = nextPid()
+ val p = spawn(Persister(pid))
+
+ (1 to 10).foreach { n =>
+ p ! Persister.PersistWithAck(n, ackProbe.ref)
+ ackProbe.expectMessage(Done)
+ }
+
+ testKit.stop(p)
+
+ val cleanup = new EventSourcedCleanup(system)
+ cleanup.deleteAllEvents(pid, resetSequenceNumber = true).futureValue
+
+ val p2 = spawn(Persister(pid))
+ p2 ! Persister.GetState(stateProbe.ref)
+ stateProbe.expectMessage("")
+ p2 ! Persister.GetSeqNr(seqNrProbe.ref)
+ seqNrProbe.expectMessage(0L)
+ }
+
+ "delete events for one persistenceId in batches" in {
+ val ackProbe = createTestProbe[Done]()
+ val stateProbe = createTestProbe[String]()
+ val seqNrProbe = createTestProbe[Long]()
+ val pid = nextPid()
+ val p = spawn(Persister(pid))
+
+ val maxSeqNumber = 47
+ (1 to maxSeqNumber).foreach { n =>
+ p ! Persister.PersistWithAck(n, ackProbe.ref)
+ ackProbe.expectMessage(Done)
+ }
+
+ testKit.stop(p)
+
+ val cleanup = new EventSourcedCleanup(system)
+
+ var iteration = 0
+ val batchSize = 10
+
+ LoggingTestKit
+ .info("Deleted")
+ .withLogLevel(Level.DEBUG)
+ .withOccurrences(5)
+ .withCustom { event =>
+ val from = (iteration * batchSize) + 1
+ iteration = iteration + 1
+ val to = Math.min(maxSeqNumber, from + batchSize - 1)
+ val deleted = to - from + 1
+ val expectedMsg = s"Deleted [$deleted] events for persistenceId
[$pid], from seq num [$from] to [$to]"
+ event.message == expectedMsg
+ }
+ .expect {
+ cleanup.deleteAllEvents(pid, resetSequenceNumber = true).futureValue
+ }
+
+ val p2 = spawn(Persister(pid))
+ p2 ! Persister.GetState(stateProbe.ref)
+ stateProbe.expectMessage("")
+ p2 ! Persister.GetSeqNr(seqNrProbe.ref)
+ seqNrProbe.expectMessage(0L)
+ }
+
+ "delete events for one persistenceId, but keep seqNr" in {
+ val ackProbe = createTestProbe[Done]()
+ val stateProbe = createTestProbe[String]()
+ val seqNrProbe = createTestProbe[Long]()
+ val pid = nextPid()
+ val p = spawn(Persister(pid))
+
+ (1 to 10).foreach { n =>
+ p ! Persister.PersistWithAck(n, ackProbe.ref)
+ ackProbe.expectMessage(Done)
+ }
+
+ testKit.stop(p)
+
+ val cleanup = new EventSourcedCleanup(system)
+ cleanup.deleteAllEvents(pid, resetSequenceNumber = false).futureValue
+
+ val p2 = spawn(Persister(pid))
+ p2 ! Persister.GetState(stateProbe.ref)
+ stateProbe.expectMessage("")
+ p2 ! Persister.GetSeqNr(seqNrProbe.ref)
+ seqNrProbe.expectMessage(10L)
+ }
+
+ "delete events for one persistenceId in batches, but keep seqNr" in {
+ val ackProbe = createTestProbe[Done]()
+ val stateProbe = createTestProbe[String]()
+ val seqNrProbe = createTestProbe[Long]()
+ val pid = nextPid()
+ val p = spawn(Persister(pid))
+
+ val maxSeqNumber = 47
+ (1 to maxSeqNumber).foreach { n =>
+ p ! Persister.PersistWithAck(n, ackProbe.ref)
+ ackProbe.expectMessage(Done)
+ }
+
+ testKit.stop(p)
+
+ val cleanup = new EventSourcedCleanup(system)
+
+ var iteration = 0
+ val batchSize = 10
+
+ LoggingTestKit
+ .info("Deleted")
+ .withLogLevel(Level.DEBUG)
+ .withOccurrences(5)
+ .withCustom { event =>
+ val from = (iteration * batchSize) + 1
+ iteration = iteration + 1
+ val to = Math.min(maxSeqNumber, from + batchSize - 1)
+ val deleted = to - from + 1
+ val expectedMsg = s"Deleted [$deleted] events for persistenceId
[$pid], from seq num [$from] to [$to]"
+ event.message == expectedMsg
+ }
+ .expect {
+ cleanup.deleteAllEvents(pid, resetSequenceNumber = false).futureValue
+ }
+
+ val p2 = spawn(Persister(pid))
+ p2 ! Persister.GetState(stateProbe.ref)
+ stateProbe.expectMessage("")
+ p2 ! Persister.GetSeqNr(seqNrProbe.ref)
+ seqNrProbe.expectMessage(maxSeqNumber.toLong)
+ }
+
+ "delete some for one persistenceId" in {
+ val ackProbe = createTestProbe[Done]()
+ val stateProbe = createTestProbe[String]()
+ val seqNrProbe = createTestProbe[Long]()
+ val pid = nextPid()
+ val p = spawn(Persister(pid))
+
+ (1 to 8).foreach { n =>
+ p ! Persister.PersistWithAck(n, ackProbe.ref)
+ ackProbe.expectMessage(Done)
+ }
+
+ testKit.stop(p)
+
+ val cleanup = new EventSourcedCleanup(system)
+ cleanup.deleteEventsTo(pid, 5).futureValue
+
+ val p2 = spawn(Persister(pid))
+ p2 ! Persister.GetState(stateProbe.ref)
+ stateProbe.expectMessage("6|7|8")
+ p2 ! Persister.GetSeqNr(seqNrProbe.ref)
+ seqNrProbe.expectMessage(8L)
+ }
+
+ "delete snapshots for one persistenceId" in {
+ val ackProbe = createTestProbe[Done]()
+ val stateProbe = createTestProbe[String]()
+ val pid = nextPid()
+ val p = spawn(Behaviors.setup[Persister.Command] { context =>
+ Persister
+ .eventSourcedBehavior(PersistenceId.ofUniqueId(pid), context)
+ .snapshotWhen((_, event, _) => event.toString.contains("snap"))
+ })
+
+ (1 to 10).foreach { n =>
+ p ! Persister.PersistWithAck(s"${if (n == 3) n + "-snap" else n}",
ackProbe.ref)
+ ackProbe.expectMessage(Done)
+ }
+
+ testKit.stop(p)
+
+ val cleanup = new EventSourcedCleanup(system)
+ cleanup.deleteAllEvents(pid, resetSequenceNumber = false).futureValue
+
+ val p2 = spawn(Persister(pid))
+ p2 ! Persister.GetState(stateProbe.ref)
+ stateProbe.expectMessage("1|2|3-snap")
+ testKit.stop(p2)
+
+ cleanup.deleteSnapshot(pid).futureValue
+
+ val p3 = spawn(Persister(pid))
+ p3 ! Persister.GetState(stateProbe.ref)
+ stateProbe.expectMessage("")
+ }
+
+ "cleanup before snapshot" in {
+ val ackProbe = createTestProbe[Done]()
+ val stateProbe = createTestProbe[String]()
+ val pid = nextPid()
+ val p = spawn(Behaviors.setup[Persister.Command] { context =>
+ Persister
+ .eventSourcedBehavior(PersistenceId.ofUniqueId(pid), context)
+ .snapshotWhen((_, event, _) => event.toString.contains("snap"))
+ })
+
+ (1 to 10).foreach { n =>
+ p ! Persister.PersistWithAck(s"${if (n == 3) n + "-snap" else n}",
ackProbe.ref)
+ ackProbe.expectMessage(Done)
+ }
+
+ testKit.stop(p)
+
+ val cleanup = new EventSourcedCleanup(system)
+ cleanup.cleanupBeforeSnapshot(pid).futureValue
+
+ val p2 = spawn(Persister(pid))
+ p2 ! Persister.GetState(stateProbe.ref)
+ stateProbe.expectMessage("1|2|3-snap|4|5|6|7|8|9|10")
+ testKit.stop(p2)
+
+ cleanup.deleteSnapshot(pid).futureValue
+
+ val p3 = spawn(Persister(pid))
+ p3 ! Persister.GetState(stateProbe.ref)
+ // from replaying remaining events
+ stateProbe.expectMessage("4|5|6|7|8|9|10")
+ }
+
+ "delete all" in {
+ val ackProbe = createTestProbe[Done]()
+ val stateProbe = createTestProbe[String]()
+ val seqNrProbe = createTestProbe[Long]()
+ val pids = Vector(nextPid(), nextPid(), nextPid())
+ val persisters =
+ pids.map { pid =>
+ spawn(Behaviors.setup[Persister.Command] { context =>
+ Persister
+ .eventSourcedBehavior(PersistenceId.ofUniqueId(pid), context)
+ .snapshotWhen((_, event, _) => event.toString.contains("snap"))
+ })
+ }
+
+ (1 to 10).foreach { n =>
+ persisters.foreach { p =>
+ p ! Persister.PersistWithAck(s"${if (n == 3) n + "-snap" else n}",
ackProbe.ref)
+ ackProbe.expectMessage(Done)
+ }
+ }
+
+ persisters.foreach(testKit.stop(_))
+
+ val cleanup = new EventSourcedCleanup(system)
+ cleanup.deleteAll(pids, resetSequenceNumber = true).futureValue
+
+ val persisters2 = pids.map(pid => spawn(Persister(pid)))
+ persisters2.foreach { p =>
+ p ! Persister.GetState(stateProbe.ref)
+ stateProbe.expectMessage("")
+ p ! Persister.GetSeqNr(seqNrProbe.ref)
+ seqNrProbe.expectMessage(0L)
+ }
+ }
+
+ "cleanup all before snapshot" in {
+ val ackProbe = createTestProbe[Done]()
+ val stateProbe = createTestProbe[String]()
+ val pids = Vector(nextPid(), nextPid(), nextPid())
+ val persisters =
+ pids.map { pid =>
+ spawn(Behaviors.setup[Persister.Command] { context =>
+ Persister
+ .eventSourcedBehavior(PersistenceId.ofUniqueId(pid), context)
+ .snapshotWhen((_, event, _) => event.toString.contains("snap"))
+ })
+ }
+
+ (1 to 10).foreach { n =>
+ persisters.foreach { p =>
+ p ! Persister.PersistWithAck(s"${if (n == 3) n + "-snap" else n}",
ackProbe.ref)
+ ackProbe.expectMessage(Done)
+ }
+ }
+
+ persisters.foreach(testKit.stop(_))
+
+ val cleanup = new EventSourcedCleanup(system)
+ cleanup.cleanupBeforeSnapshot(pids).futureValue
+ cleanup.deleteSnapshots(pids).futureValue
+
+ val persisters2 = pids.map(pid => spawn(Persister(pid)))
+ persisters2.foreach { p =>
+ p ! Persister.GetState(stateProbe.ref)
+ // from replaying remaining events
+ stateProbe.expectMessage("4|5|6|7|8|9|10")
+ }
+ }
+ }
+
+}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/MultiPluginSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/MultiPluginSpec.scala
new file mode 100644
index 0000000..c45abaa
--- /dev/null
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/MultiPluginSpec.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 - 2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.journal
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import pekko.actor.typed.ActorRef
+import pekko.actor.typed.ActorSystem
+import pekko.persistence.r2dbc.TestConfig
+import pekko.persistence.r2dbc.TestData
+import pekko.persistence.r2dbc.TestDbLifecycle
+import pekko.persistence.typed.PersistenceId
+import pekko.persistence.typed.scaladsl.Effect
+import pekko.persistence.typed.scaladsl.EventSourcedBehavior
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import org.scalatest.wordspec.AnyWordSpecLike
+
+object MultiPluginSpec {
+ val config: Config = ConfigFactory
+ .parseString("""
+ // #default-config
+ pekko.persistence.journal.plugin = "pekko.persistence.r2dbc.journal"
+ pekko.persistence.snapshot-store.plugin =
"pekko.persistence.r2dbc.snapshot"
+ pekko.persistence.state.plugin = "pekko.persistence.r2dbc.state"
+ // #default-config
+
+ // #second-config
+ second-r2dbc = ${pekko.persistence.r2dbc}
+ second-r2dbc {
+ connection-factory {
+ # specific connection properties here
+ }
+ journal {
+ # specific journal properties here
+ }
+ snapshot {
+ # specific snapshot properties here
+ }
+ state {
+ # specific durable state properties here
+ }
+ query {
+ # specific query properties here
+ }
+ }
+ // #second-config
+ """)
+ .withFallback(TestConfig.config)
+ .resolve()
+
+ object MyEntity {
+ sealed trait Command
+ final case class Persist(payload: String, replyTo: ActorRef[State])
extends Command
+ type Event = String
+ object State {
+ def apply(): State = ""
+ }
+ type State = String
+
+ def commandHandler(state: State, cmd: Command): Effect[Event, State] = {
+ cmd match {
+ case Persist(payload, replyTo) =>
+ Effect
+ .persist(payload)
+ .thenReply(replyTo)(newState => newState)
+ }
+ }
+
+ def eventHandler(state: State, evt: Event): State =
+ state + evt
+
+ def apply(persistenceId: PersistenceId): EventSourcedBehavior[Command,
Event, State] = {
+ // #withPlugins
+ EventSourcedBehavior(persistenceId, emptyState = State(),
commandHandler, eventHandler)
+ .withJournalPluginId("second-r2dbc.journal")
+ .withSnapshotPluginId("second-r2dbc.snapshot")
+ // #withPlugins
+ }
+ }
+}
+
+class MultiPluginSpec
+ extends ScalaTestWithActorTestKit(MultiPluginSpec.config)
+ with AnyWordSpecLike
+ with TestDbLifecycle
+ with TestData
+ with LogCapturing {
+
+ import MultiPluginSpec.MyEntity
+
+ override def typedSystem: ActorSystem[_] = system
+
+ "Addition plugin config" should {
+
+ "be supported for EventSourcedBehavior" in {
+ val probe = createTestProbe[MyEntity.State]()
+ val pid = PersistenceId.ofUniqueId(nextPid(nextEntityType()))
+ val ref = spawn(MyEntity(pid))
+ ref ! MyEntity.Persist("a", probe.ref)
+ probe.expectMessage("a")
+ ref ! MyEntity.Persist("b", probe.ref)
+ probe.expectMessage("ab")
+ }
+
+ }
+}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
index 660bdda..6f754cb 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
@@ -14,6 +14,7 @@
package org.apache.pekko.persistence.r2dbc.query
import java.time.Instant
+import java.time.{ Duration => JDuration }
import scala.concurrent.Await
import scala.concurrent.duration._
@@ -45,6 +46,7 @@ import pekko.stream.scaladsl.Sink
import pekko.stream.scaladsl.Source
import pekko.stream.testkit.TestSubscriber
import pekko.stream.testkit.scaladsl.TestSink
+import pekko.stream.testkit.scaladsl.TestSource
import pekko.stream.typed.scaladsl.ActorFlow
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
@@ -204,6 +206,46 @@ class EventsBySlicePubSubSpec
out shouldBe List(envA1, envA2, envA3, envB1, envA1, envB2) // envA1 was
evicted and therefore duplicate
}
+ "skipPubSubTooFarAhead" in {
+ val backtrackingWindow =
JDuration.ofMillis(querySettings.backtrackingWindow.toMillis)
+ val (in, out) =
+ TestSource[EventEnvelope[String]]()
+ .via(
+ query.skipPubSubTooFarAhead(
+ enabled = true,
+ maxAheadOfBacktracking = backtrackingWindow))
+ .toMat(TestSink[EventEnvelope[String]]())(Keep.both)
+ .run()
+ out.request(100)
+ in.sendNext(envA1)
+ in.sendNext(envA2)
+
+ // all pubsub events dropped before the first backtracking event
+ out.expectNoMessage()
+
+ val pidC = PersistenceId(entityType, "C")
+ in.sendNext(backtrackingEnvelope(envA1))
+ out.expectNext(backtrackingEnvelope(envA1))
+ // now the pubsub event is passed through
+ in.sendNext(envB1)
+ out.expectNext(envB1)
+
+ val time2 = envA1.offset
+ .asInstanceOf[TimestampOffset]
+ .timestamp
+ .plusMillis(backtrackingWindow.toMillis)
+ val envC1 = createEnvelope(pidC, 1L, "c1", time2.plusMillis(1))
+ val envC2 = createEnvelope(pidC, 2L, "c2", time2.plusMillis(2))
+ in.sendNext(envC1)
+ // dropped because > backtrackingWindow
+ out.expectNoMessage()
+
+ in.sendNext(backtrackingEnvelope(envB1))
+ out.expectNext(backtrackingEnvelope(envB1))
+ in.sendNext(envC2)
+ out.expectNext(envC2)
+ }
+
"dynamically enable/disable publishing based on throughput" in new Setup {
import pekko.actor.typed.scaladsl.adapter._
diff --git
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala
index 338cf42..b31ae09 100644
---
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala
+++
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala
@@ -53,7 +53,7 @@ object EventSourcedPubSubSpec {
buffer-size = 10
backtracking {
- behind-current-time = 5 seconds
+ behind-current-time = 100 millis
window = 20 seconds
}
}
@@ -175,11 +175,13 @@ class EventSourcedPubSubSpec
spawn(Persister(persistenceId), s"p$n")
}
- // write some before starting the projections
+ // write some before starting the projections, with ack to ensure they
are all in the DB
+ val ackProbe = createTestProbe[Done]()
(1 to 10).foreach { n =>
val p = n % numberOfEntities
- entities(p) ! Persister.Persist(mkEvent(n))
+ entities(p) ! Persister.PersistWithAck(mkEvent(n), ackProbe.ref)
}
+ (1 to 10).foreach { _ => ackProbe.receiveMessage(10.seconds) }
val projectionName = UUID.randomUUID().toString
val processedProbe = createTestProbe[Processed]()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]