This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-projection.git
The following commit(s) were added to refs/heads/main by this push:
new 32d8af8 test: increase test coverage for EventSourcedProvider and
DurableStateSourceProvider (#465)
32d8af8 is described below
commit 32d8af8c7aa55c3c9664f10e7c42262dcf77a064
Author: PJ Fanning <[email protected]>
AuthorDate: Thu May 7 20:48:45 2026 +0100
test: increase test coverage for EventSourcedProvider and
DurableStateSourceProvider (#465)
* test: increase test coverage for EventSourcedProvider and
DurableStateSourceProvider
- EventSourcedProviderSpec: add tests for eventsByTag/eventsBySlices with
direct query instance overloads; add sliceForPersistenceId/sliceRanges
without custom Config (using default plugin ID)
- DurableStateSourceProviderSpec: add tests for changesByTag with direct
query instance; add changesBySlices (pluginId and direct query variants);
add sliceForPersistenceId and sliceRanges tests
Addresses apache/pekko-projection#458
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-projection/sessions/912ca590-0b37-4697-acfa-97c41236e831
Co-authored-by: pjfanning <[email protected]>
* test: add negative tests, non-default config tests, and robust
direct-query tests
EventSourcedProviderSpec:
- Update "using direct query instance" tests to write to a custom journal
and create the query from that journal's config; if the implementation ignores
the passed instance and reads the default journal, the events are not found
and the test fails
- Add negative tests: timestampOf and loadEnvelope fail with
IllegalStateException when the underlying query does not implement
EventTimestampQuery or LoadEventQuery, respectively
DurableStateSourceProviderSpec:
- Add custom-durable-state-store plugin config (clearly non-default)
- Add changesByTag and changesBySlices tests using the non-default plugin
- Add negative test: getObject returns a result with value None and
revision 0 for a non-existent persistenceId
- Add negative test: changesByTag with unmatched tag returns empty source
- Consolidate implicit ExecutionContext to class level (removes repeated
unused-local warnings that were present in the original code too)
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-projection/sessions/4699cea0-ed57-49f8-9ce7-f0e57d4e5d9f
Co-authored-by: pjfanning <[email protected]>
* scalafmt
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../scaladsl/DurableStateSourceProviderSpec.scala | 229 ++++++++++++++++++++-
.../scaldsl/EventSourcedProviderSpec.scala | 127 ++++++++++++
2 files changed, 355 insertions(+), 1 deletion(-)
diff --git
a/durable-state/src/test/scala/org/apache/pekko/projection/state/scaladsl/DurableStateSourceProviderSpec.scala
b/durable-state/src/test/scala/org/apache/pekko/projection/state/scaladsl/DurableStateSourceProviderSpec.scala
index c77e3a5..628cabc 100644
---
a/durable-state/src/test/scala/org/apache/pekko/projection/state/scaladsl/DurableStateSourceProviderSpec.scala
+++
b/durable-state/src/test/scala/org/apache/pekko/projection/state/scaladsl/DurableStateSourceProviderSpec.scala
@@ -14,14 +14,18 @@
package org.apache.pekko.projection.state.scaladsl
import scala.concurrent.Future
+import scala.concurrent.ExecutionContext
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import org.apache.pekko
import pekko.actor.testkit.typed.scaladsl._
+import pekko.persistence.Persistence
import pekko.persistence.query.Offset
import pekko.persistence.query.UpdatedDurableState
+import pekko.persistence.query.scaladsl.DurableStateStoreQuery
+import pekko.persistence.query.typed.scaladsl.DurableStateStoreBySliceQuery
import pekko.persistence.state.DurableStateStoreRegistry
import pekko.persistence.state.scaladsl._
import
pekko.persistence.testkit.state.scaladsl.PersistenceTestKitDurableStateStore
@@ -29,8 +33,13 @@ import pekko.stream.testkit.scaladsl.TestSink
import pekko.persistence.testkit.PersistenceTestKitDurableStateStorePlugin
object DurableStateSourceProviderSpec {
+ val customPluginId = "custom-durable-state-store"
+
def conf: Config =
PersistenceTestKitDurableStateStorePlugin.config.withFallback(ConfigFactory.parseString(s"""
pekko.loglevel = INFO
+ $customPluginId {
+ class =
"org.apache.pekko.persistence.testkit.state.PersistenceTestKitDurableStateStoreProvider"
+ }
"""))
final case class Record(id: Int, name: String)
}
@@ -38,6 +47,11 @@ object DurableStateSourceProviderSpec {
class DurableStateSourceProviderSpec
extends ScalaTestWithActorTestKit(DurableStateSourceProviderSpec.conf)
with AnyWordSpecLike {
+
+ private lazy val persistence = Persistence(system)
+ private lazy val numberOfSlices = persistence.numberOfSlices
+ implicit val ec: ExecutionContext = system.executionContext
+
"A DurableStateSourceProvider" must {
import DurableStateSourceProviderSpec._
@@ -49,7 +63,6 @@ class DurableStateSourceProviderSpec
val durableStateStore: DurableStateUpdateStore[Record] =
DurableStateStoreRegistry(system)
.durableStateStoreFor[DurableStateUpdateStore[Record]](PersistenceTestKitDurableStateStore.Identifier)
- implicit val ec = system.classicSystem.dispatcher
val fut = Future.sequence(
Vector(
durableStateStore.upsertObject("persistent-id-1", 0L, record, tag),
@@ -72,6 +85,220 @@ class DurableStateSourceProviderSpec
}
}
}
+
+ "provide changes by tag using a direct query instance" in {
+ val record = Record(0, "Name-1")
+ val tag = "tag-direct"
+ val recordChange = Record(0, "Name-2")
+
+ val durableStateStore: DurableStateUpdateStore[Record] =
+ DurableStateStoreRegistry(system)
+
.durableStateStoreFor[DurableStateUpdateStore[Record]](PersistenceTestKitDurableStateStore.Identifier)
+ val fut = Future.sequence(
+ Vector(
+ durableStateStore.upsertObject("persistent-id-direct-1", 0L, record,
tag),
+ durableStateStore.upsertObject("persistent-id-direct-2", 0L, record,
"tag-other"),
+ durableStateStore.upsertObject("persistent-id-direct-1", 1L,
recordChange, tag)))
+ whenReady(fut) { _ =>
+ val queryInstance =
+ DurableStateStoreRegistry(system)
+
.durableStateStoreFor[DurableStateStoreQuery[Record]](PersistenceTestKitDurableStateStore.Identifier)
+ val sourceProvider =
DurableStateSourceProvider.changesByTag[Record](system, queryInstance, tag)
+
+ whenReady(sourceProvider.source(() =>
Future.successful[Option[Offset]](None))) { source =>
+ val stateChange = source
+ .collect { case u: UpdatedDurableState[Record] => u }
+ .runWith(TestSink[UpdatedDurableState[Record]]())
+ .request(1)
+ .expectNext()
+
+ stateChange.value should be(recordChange)
+ stateChange.revision should be(1L)
+ }
+ }
+ }
+
+ "provide changes by slices" in {
+ val entityType = "SliceEntity"
+ val persistenceId = s"$entityType|slice-id-1"
+ val record = Record(1, "slice-record-1")
+ val recordChange = Record(1, "slice-record-2")
+ val slice = persistence.sliceForPersistenceId(persistenceId)
+
+ val durableStateStore: DurableStateUpdateStore[Record] =
+ DurableStateStoreRegistry(system)
+
.durableStateStoreFor[DurableStateUpdateStore[Record]](PersistenceTestKitDurableStateStore.Identifier)
+ val fut = Future.sequence(
+ Vector(
+ durableStateStore.upsertObject(persistenceId, 0L, record, ""),
+ durableStateStore.upsertObject(persistenceId, 1L, recordChange, "")))
+ whenReady(fut) { _ =>
+ val sourceProvider =
DurableStateSourceProvider.changesBySlices[Record](
+ system,
+ PersistenceTestKitDurableStateStore.Identifier,
+ entityType,
+ slice,
+ slice)
+
+ whenReady(sourceProvider.source(() =>
Future.successful[Option[Offset]](None))) { source =>
+ val stateChange = source
+ .collect { case u: UpdatedDurableState[Record] => u }
+ .runWith(TestSink[UpdatedDurableState[Record]]())
+ .request(1)
+ .expectNext()
+
+ stateChange.value should be(recordChange)
+ stateChange.revision should be(1L)
+ }
+ }
+ }
+
+ "provide changes by slices using a direct query instance" in {
+ val entityType = "SliceEntityDirect"
+ val persistenceId = s"$entityType|slice-direct-id-1"
+ val record = Record(2, "slice-direct-record-1")
+ val recordChange = Record(2, "slice-direct-record-2")
+ val slice = persistence.sliceForPersistenceId(persistenceId)
+
+ val durableStateStore: DurableStateUpdateStore[Record] =
+ DurableStateStoreRegistry(system)
+
.durableStateStoreFor[DurableStateUpdateStore[Record]](PersistenceTestKitDurableStateStore.Identifier)
+ val fut = Future.sequence(
+ Vector(
+ durableStateStore.upsertObject(persistenceId, 0L, record, ""),
+ durableStateStore.upsertObject(persistenceId, 1L, recordChange, "")))
+ whenReady(fut) { _ =>
+ val queryInstance =
+ DurableStateStoreRegistry(system)
+
.durableStateStoreFor[DurableStateStoreBySliceQuery[Record]](PersistenceTestKitDurableStateStore.Identifier)
+ val sourceProvider =
+ DurableStateSourceProvider.changesBySlices[Record](system,
queryInstance, entityType, slice, slice)
+
+ whenReady(sourceProvider.source(() =>
Future.successful[Option[Offset]](None))) { source =>
+ val stateChange = source
+ .collect { case u: UpdatedDurableState[Record] => u }
+ .runWith(TestSink[UpdatedDurableState[Record]]())
+ .request(1)
+ .expectNext()
+
+ stateChange.value should be(recordChange)
+ stateChange.revision should be(1L)
+ }
+ }
+ }
+
+ "return slice for persistence id" in {
+ val entityType = "SliceQueryEntity"
+ val persistenceId = s"$entityType|query-id-1"
+ val slice = DurableStateSourceProvider.sliceForPersistenceId(
+ system,
+ PersistenceTestKitDurableStateStore.Identifier,
+ persistenceId)
+ slice shouldBe persistence.sliceForPersistenceId(persistenceId)
+ }
+
+ "return slice ranges" in {
+ val sliceRanges = DurableStateSourceProvider.sliceRanges(
+ system,
+ PersistenceTestKitDurableStateStore.Identifier,
+ 1)
+ sliceRanges shouldBe Seq(0 until numberOfSlices)
+ }
+
+ // Tests using a non-default durable state store plugin configuration
+ "provide changes by tag using a non-default plugin config" in {
+ val record = Record(0, "Name-custom-1")
+ val tag = "tag-custom-plugin"
+ val recordChange = Record(0, "Name-custom-2")
+
+ val customStore: DurableStateUpdateStore[Record] =
+ DurableStateStoreRegistry(system)
+
.durableStateStoreFor[DurableStateUpdateStore[Record]](customPluginId)
+ val fut = Future.sequence(
+ Vector(
+ customStore.upsertObject("custom-persistent-id-1", 0L, record, tag),
+ customStore.upsertObject("custom-persistent-id-2", 0L, record,
"tag-other"),
+ customStore.upsertObject("custom-persistent-id-1", 1L, recordChange,
tag)))
+ whenReady(fut) { _ =>
+ val sourceProvider =
DurableStateSourceProvider.changesByTag[Record](system, customPluginId, tag)
+
+ whenReady(sourceProvider.source(() =>
Future.successful[Option[Offset]](None))) { source =>
+ val stateChange = source
+ .collect { case u: UpdatedDurableState[Record] => u }
+ .runWith(TestSink[UpdatedDurableState[Record]]())
+ .request(1)
+ .expectNext()
+
+ stateChange.value should be(recordChange)
+ stateChange.revision should be(1L)
+ }
+ }
+ }
+
+ "provide changes by slices using a non-default plugin config" in {
+ val entityType = "CustomPluginEntity"
+ val persistenceId = s"$entityType|custom-plugin-slice-id-1"
+ val record = Record(3, "custom-slice-record-1")
+ val recordChange = Record(3, "custom-slice-record-2")
+ val slice = persistence.sliceForPersistenceId(persistenceId)
+
+ val customStore: DurableStateUpdateStore[Record] =
+ DurableStateStoreRegistry(system)
+
.durableStateStoreFor[DurableStateUpdateStore[Record]](customPluginId)
+ val fut = Future.sequence(
+ Vector(
+ customStore.upsertObject(persistenceId, 0L, record, ""),
+ customStore.upsertObject(persistenceId, 1L, recordChange, "")))
+ whenReady(fut) { _ =>
+ val sourceProvider =
DurableStateSourceProvider.changesBySlices[Record](
+ system,
+ customPluginId,
+ entityType,
+ slice,
+ slice)
+
+ whenReady(sourceProvider.source(() =>
Future.successful[Option[Offset]](None))) { source =>
+ val stateChange = source
+ .collect { case u: UpdatedDurableState[Record] => u }
+ .runWith(TestSink[UpdatedDurableState[Record]]())
+ .request(1)
+ .expectNext()
+
+ stateChange.value should be(recordChange)
+ stateChange.revision should be(1L)
+ }
+ }
+ }
+
+ // Negative tests
+ "return None from getObject for a non-existent persistenceId" in {
+ val entityType = "NonExistentEntity"
+ val persistenceId = s"$entityType|does-not-exist"
+ val slice = persistence.sliceForPersistenceId(persistenceId)
+
+ val sourceProvider = DurableStateSourceProvider
+ .changesBySlices[Record](system,
PersistenceTestKitDurableStateStore.Identifier, entityType, slice, slice)
+ val result =
sourceProvider.asInstanceOf[DurableStateStore[Record]].getObject(persistenceId)
+ whenReady(result) { objectResult =>
+ objectResult.value shouldBe None
+ objectResult.revision shouldBe 0L
+ }
+ }
+
+ "return an empty source for changesByTag when no records match the tag" in
{
+ val emptyTag = "tag-with-no-records"
+
+ val sourceProvider =
+ DurableStateSourceProvider.changesByTag[Record](system,
PersistenceTestKitDurableStateStore.Identifier,
+ emptyTag)
+
+ whenReady(sourceProvider.source(() =>
Future.successful[Option[Offset]](None))) { source =>
+ source
+
.runWith(TestSink[pekko.persistence.query.DurableStateChange[Record]]())
+ .request(1)
+ .expectNoMessage()
+ }
+ }
}
}
diff --git
a/eventsourced/src/test/scala/org/apache/pekko/projection/eventsourced/scaldsl/EventSourcedProviderSpec.scala
b/eventsourced/src/test/scala/org/apache/pekko/projection/eventsourced/scaldsl/EventSourcedProviderSpec.scala
index 6ee4111..c5f2e16 100644
---
a/eventsourced/src/test/scala/org/apache/pekko/projection/eventsourced/scaldsl/EventSourcedProviderSpec.scala
+++
b/eventsourced/src/test/scala/org/apache/pekko/projection/eventsourced/scaldsl/EventSourcedProviderSpec.scala
@@ -22,17 +22,25 @@ import scala.concurrent.Future
import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.Done
+import pekko.NotUsed
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.typed.ActorRef
import pekko.persistence.Persistence
import pekko.persistence.query.NoOffset
+import pekko.persistence.query.Offset
+import pekko.persistence.query.PersistenceQuery
+import pekko.persistence.query.scaladsl.EventsByTagQuery
+import pekko.persistence.query.typed.scaladsl.EventTimestampQuery
+import pekko.persistence.query.typed.scaladsl.EventsBySliceQuery
+import pekko.persistence.query.typed.scaladsl.LoadEventQuery
import pekko.persistence.testkit.PersistenceTestKitPlugin
import pekko.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import pekko.persistence.typed.PersistenceId
import pekko.persistence.typed.scaladsl.Effect
import pekko.persistence.typed.scaladsl.EventSourcedBehavior
import pekko.projection.eventsourced.scaladsl.EventSourcedProvider
+import pekko.stream.scaladsl.Source
import pekko.stream.testkit.scaladsl.TestSink
import org.scalatest.Inspectors.forEvery
import org.scalatest.freespec.AnyFreeSpecLike
@@ -192,6 +200,32 @@ class EventSourcedProviderSpec
assertTag(tag1, expectedEvents.map(_ + s"-$journal1"),
Some(journal1))
assertTag(tag1, expectedEvents.map(_ + s"-$journal2"),
Some(journal2))
}
+
+ "using direct query instance" in {
+ val persistenceId = "e-id-1"
+ val tag = "e-tag-1"
+ val journal = "e-journal-1"
+
+ // Write events to a custom journal — if the implementation ignores the
+ // passed query instance and reads from the default journal instead,
+ // these events will not be found and the test will fail
+ setup(persistenceId, Set(tag), Some(journal))
+
+ val eventsByTagQuery =
+
PersistenceQuery(system).readJournalFor[EventsByTagQuery](s"$journal.query",
journalConfig(journal))
+ EventSourcedProvider
+ .eventsByTag[String](system, eventsByTagQuery, tag)
+ .source(() => Future.successful(Some(NoOffset)))
+ .futureValue
+ .map(_.event)
+ .runWith(TestSink())
+ .request(3)
+ .expectNextN(
+ Seq(s"$persistenceId-event-1-$journal",
s"$persistenceId-event-2-$journal",
+ s"$persistenceId-event-3-$journal"))
+ .request(1)
+ .expectNoMessage()
+ }
}
"by slices" - {
@@ -225,6 +259,32 @@ class EventSourcedProviderSpec
assertSlices(0, numberOfSlices - 1, expectedEvents.map(_ +
s"-$journal1"), maybeJournal = Some(journal1))
assertSlices(0, numberOfSlices - 1, expectedEvents.map(_ +
s"-$journal2"), maybeJournal = Some(journal2))
}
+
+ "using direct query instance" in {
+ val persistenceId = makeFullPersistenceId("f-id-1")
+ val journal = "f-journal-1"
+ val slice = persistence.sliceForPersistenceId(persistenceId)
+
+ // Write events to a custom journal — if the implementation ignores the
+ // passed query instance and reads from the default journal instead,
+ // these events will not be found and the test will fail
+ setup(persistenceId, maybeJournal = Some(journal))
+
+ val eventsBySlicesQuery =
+
PersistenceQuery(system).readJournalFor[EventsBySliceQuery](s"$journal.query",
journalConfig(journal))
+ EventSourcedProvider
+ .eventsBySlices[String](system, eventsBySlicesQuery, entityType,
slice, slice)
+ .source(() => Future.successful(Some(NoOffset)))
+ .futureValue
+ .map(_.event)
+ .runWith(TestSink())
+ .request(3)
+ .expectNextN(
+ Seq(s"$persistenceId-event-1-$journal",
s"$persistenceId-event-2-$journal",
+ s"$persistenceId-event-3-$journal"))
+ .request(1)
+ .expectNoMessage()
+ }
}
}
@@ -256,5 +316,72 @@ class EventSourcedProviderSpec
}
}
}
+
+ "using the default plugin" - {
+ "return slice for persistence id" in {
+ EventSourcedProvider.sliceForPersistenceId(
+ system,
+ PersistenceTestKitReadJournal.Identifier,
+ makeFullPersistenceId("d-id-1")
+ ) shouldBe 594
+ }
+
+ "return slice ranges" in {
+ EventSourcedProvider.sliceRanges(
+ system,
+ PersistenceTestKitReadJournal.Identifier,
+ 1
+ ) shouldBe Seq(0 until numberOfSlices)
+ }
+ }
+
+ "negative tests" - {
+ // A minimal EventsBySliceQuery that does NOT implement EventTimestampQuery or
+ // LoadEventQuery, used to trigger the failure paths in EventsBySlicesSourceProvider
+ def minimalEventsBySliceQuery(): EventsBySliceQuery = {
+ val delegate =
+
PersistenceQuery(system).readJournalFor[EventsBySliceQuery](PersistenceTestKitReadJournal.Identifier)
+ new EventsBySliceQuery {
+ override def eventsBySlices[Event](
+ entityType: String,
+ minSlice: Int,
+ maxSlice: Int,
+ offset: Offset):
Source[pekko.persistence.query.typed.EventEnvelope[Event], NotUsed] =
+ delegate.eventsBySlices(entityType, minSlice, maxSlice, offset)
+ override def sliceForPersistenceId(persistenceId: String): Int =
+ delegate.sliceForPersistenceId(persistenceId)
+ override def sliceRanges(numberOfRanges: Int): Seq[Range] =
+ delegate.sliceRanges(numberOfRanges)
+ }
+ }
+
+ "fail with IllegalStateException on timestampOf when query does not
implement EventTimestampQuery" in {
+ val provider = EventSourcedProvider.eventsBySlices[String](
+ system,
+ minimalEventsBySliceQuery(),
+ entityType,
+ 0,
+ numberOfSlices - 1)
+ provider
+ .asInstanceOf[EventTimestampQuery]
+ .timestampOf("pid-neg-1", 1L)
+ .failed
+ .futureValue shouldBe an[IllegalStateException]
+ }
+
+ "fail with IllegalStateException on loadEnvelope when query does not
implement LoadEventQuery" in {
+ val provider = EventSourcedProvider.eventsBySlices[String](
+ system,
+ minimalEventsBySliceQuery(),
+ entityType,
+ 0,
+ numberOfSlices - 1)
+ provider
+ .asInstanceOf[LoadEventQuery]
+ .loadEnvelope[String]("pid-neg-2", 1L)
+ .failed
+ .futureValue shouldBe an[IllegalStateException]
+ }
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]