This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-projection.git


The following commit(s) were added to refs/heads/main by this push:
     new 32d8af8  test: increase test coverage for EventSourcedProvider and DurableStateSourceProvider (#465)
32d8af8 is described below

commit 32d8af8c7aa55c3c9664f10e7c42262dcf77a064
Author: PJ Fanning <[email protected]>
AuthorDate: Thu May 7 20:48:45 2026 +0100

    test: increase test coverage for EventSourcedProvider and DurableStateSourceProvider (#465)
    
    * test: increase test coverage for EventSourcedProvider and DurableStateSourceProvider
    
    - EventSourcedProviderSpec: add tests for eventsByTag/eventsBySlices with
      direct query instance overloads; add sliceForPersistenceId/sliceRanges
      without custom Config (using default plugin ID)
    - DurableStateSourceProviderSpec: add tests for changesByTag with direct
      query instance; add changesBySlices (pluginId and direct query variants);
      add sliceForPersistenceId and sliceRanges tests
    
    Addresses apache/pekko-projection#458
    
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-projection/sessions/912ca590-0b37-4697-acfa-97c41236e831
    
    Co-authored-by: pjfanning <[email protected]>
    
    * test: add negative tests, non-default config tests, and robust direct-query tests
    
    EventSourcedProviderSpec:
    - Update "using direct query instance" tests to write to a custom journal
      and create the query from that journal's config; if the implementation
      ignores the passed instance and reads the default journal instead, the
      events are not found and the test fails
    - Add negative tests: timestampOf and loadEnvelope fail with
      IllegalStateException when the underlying query does not implement those
      interfaces
    
    DurableStateSourceProviderSpec:
    - Add custom-durable-state-store plugin config (clearly non-default)
    - Add changesByTag and changesBySlices tests using the non-default plugin
    - Add negative test: getObject returns None for non-existent persistenceId
    - Add negative test: changesByTag with unmatched tag returns empty source
    - Consolidate implicit ExecutionContext to class level (removes repeated
      unused-local warnings that were present in the original code too)
    
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-projection/sessions/4699cea0-ed57-49f8-9ce7-f0e57d4e5d9f
    
    Co-authored-by: pjfanning <[email protected]>
    
    * scalafmt
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] <[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../scaladsl/DurableStateSourceProviderSpec.scala  | 229 ++++++++++++++++++++-
 .../scaldsl/EventSourcedProviderSpec.scala         | 127 ++++++++++++
 2 files changed, 355 insertions(+), 1 deletion(-)

diff --git 
a/durable-state/src/test/scala/org/apache/pekko/projection/state/scaladsl/DurableStateSourceProviderSpec.scala
 
b/durable-state/src/test/scala/org/apache/pekko/projection/state/scaladsl/DurableStateSourceProviderSpec.scala
index c77e3a5..628cabc 100644
--- 
a/durable-state/src/test/scala/org/apache/pekko/projection/state/scaladsl/DurableStateSourceProviderSpec.scala
+++ 
b/durable-state/src/test/scala/org/apache/pekko/projection/state/scaladsl/DurableStateSourceProviderSpec.scala
@@ -14,14 +14,18 @@
 package org.apache.pekko.projection.state.scaladsl
 
 import scala.concurrent.Future
+import scala.concurrent.ExecutionContext
 
 import com.typesafe.config.Config
 import com.typesafe.config.ConfigFactory
 import org.scalatest.wordspec.AnyWordSpecLike
 import org.apache.pekko
 import pekko.actor.testkit.typed.scaladsl._
+import pekko.persistence.Persistence
 import pekko.persistence.query.Offset
 import pekko.persistence.query.UpdatedDurableState
+import pekko.persistence.query.scaladsl.DurableStateStoreQuery
+import pekko.persistence.query.typed.scaladsl.DurableStateStoreBySliceQuery
 import pekko.persistence.state.DurableStateStoreRegistry
 import pekko.persistence.state.scaladsl._
 import 
pekko.persistence.testkit.state.scaladsl.PersistenceTestKitDurableStateStore
@@ -29,8 +33,13 @@ import pekko.stream.testkit.scaladsl.TestSink
 import pekko.persistence.testkit.PersistenceTestKitDurableStateStorePlugin
 
 object DurableStateSourceProviderSpec {
+  val customPluginId = "custom-durable-state-store"
+
   def conf: Config = 
PersistenceTestKitDurableStateStorePlugin.config.withFallback(ConfigFactory.parseString(s"""
     pekko.loglevel = INFO
+    $customPluginId {
+      class = 
"org.apache.pekko.persistence.testkit.state.PersistenceTestKitDurableStateStoreProvider"
+    }
     """))
   final case class Record(id: Int, name: String)
 }
@@ -38,6 +47,11 @@ object DurableStateSourceProviderSpec {
 class DurableStateSourceProviderSpec
     extends ScalaTestWithActorTestKit(DurableStateSourceProviderSpec.conf)
     with AnyWordSpecLike {
+
+  private lazy val persistence = Persistence(system)
+  private lazy val numberOfSlices = persistence.numberOfSlices
+  implicit val ec: ExecutionContext = system.executionContext
+
   "A DurableStateSourceProvider" must {
     import DurableStateSourceProviderSpec._
 
@@ -49,7 +63,6 @@ class DurableStateSourceProviderSpec
       val durableStateStore: DurableStateUpdateStore[Record] =
         DurableStateStoreRegistry(system)
           
.durableStateStoreFor[DurableStateUpdateStore[Record]](PersistenceTestKitDurableStateStore.Identifier)
-      implicit val ec = system.classicSystem.dispatcher
       val fut = Future.sequence(
         Vector(
           durableStateStore.upsertObject("persistent-id-1", 0L, record, tag),
@@ -72,6 +85,220 @@ class DurableStateSourceProviderSpec
         }
       }
     }
+
+    "provide changes by tag using a direct query instance" in {
+      val record = Record(0, "Name-1")
+      val tag = "tag-direct"
+      val recordChange = Record(0, "Name-2")
+
+      val durableStateStore: DurableStateUpdateStore[Record] =
+        DurableStateStoreRegistry(system)
+          
.durableStateStoreFor[DurableStateUpdateStore[Record]](PersistenceTestKitDurableStateStore.Identifier)
+      val fut = Future.sequence(
+        Vector(
+          durableStateStore.upsertObject("persistent-id-direct-1", 0L, record, 
tag),
+          durableStateStore.upsertObject("persistent-id-direct-2", 0L, record, 
"tag-other"),
+          durableStateStore.upsertObject("persistent-id-direct-1", 1L, 
recordChange, tag)))
+      whenReady(fut) { _ =>
+        val queryInstance =
+          DurableStateStoreRegistry(system)
+            
.durableStateStoreFor[DurableStateStoreQuery[Record]](PersistenceTestKitDurableStateStore.Identifier)
+        val sourceProvider = 
DurableStateSourceProvider.changesByTag[Record](system, queryInstance, tag)
+
+        whenReady(sourceProvider.source(() => 
Future.successful[Option[Offset]](None))) { source =>
+          val stateChange = source
+            .collect { case u: UpdatedDurableState[Record] => u }
+            .runWith(TestSink[UpdatedDurableState[Record]]())
+            .request(1)
+            .expectNext()
+
+          stateChange.value should be(recordChange)
+          stateChange.revision should be(1L)
+        }
+      }
+    }
+
+    "provide changes by slices" in {
+      val entityType = "SliceEntity"
+      val persistenceId = s"$entityType|slice-id-1"
+      val record = Record(1, "slice-record-1")
+      val recordChange = Record(1, "slice-record-2")
+      val slice = persistence.sliceForPersistenceId(persistenceId)
+
+      val durableStateStore: DurableStateUpdateStore[Record] =
+        DurableStateStoreRegistry(system)
+          
.durableStateStoreFor[DurableStateUpdateStore[Record]](PersistenceTestKitDurableStateStore.Identifier)
+      val fut = Future.sequence(
+        Vector(
+          durableStateStore.upsertObject(persistenceId, 0L, record, ""),
+          durableStateStore.upsertObject(persistenceId, 1L, recordChange, "")))
+      whenReady(fut) { _ =>
+        val sourceProvider = 
DurableStateSourceProvider.changesBySlices[Record](
+          system,
+          PersistenceTestKitDurableStateStore.Identifier,
+          entityType,
+          slice,
+          slice)
+
+        whenReady(sourceProvider.source(() => 
Future.successful[Option[Offset]](None))) { source =>
+          val stateChange = source
+            .collect { case u: UpdatedDurableState[Record] => u }
+            .runWith(TestSink[UpdatedDurableState[Record]]())
+            .request(1)
+            .expectNext()
+
+          stateChange.value should be(recordChange)
+          stateChange.revision should be(1L)
+        }
+      }
+    }
+
+    "provide changes by slices using a direct query instance" in {
+      val entityType = "SliceEntityDirect"
+      val persistenceId = s"$entityType|slice-direct-id-1"
+      val record = Record(2, "slice-direct-record-1")
+      val recordChange = Record(2, "slice-direct-record-2")
+      val slice = persistence.sliceForPersistenceId(persistenceId)
+
+      val durableStateStore: DurableStateUpdateStore[Record] =
+        DurableStateStoreRegistry(system)
+          
.durableStateStoreFor[DurableStateUpdateStore[Record]](PersistenceTestKitDurableStateStore.Identifier)
+      val fut = Future.sequence(
+        Vector(
+          durableStateStore.upsertObject(persistenceId, 0L, record, ""),
+          durableStateStore.upsertObject(persistenceId, 1L, recordChange, "")))
+      whenReady(fut) { _ =>
+        val queryInstance =
+          DurableStateStoreRegistry(system)
+            
.durableStateStoreFor[DurableStateStoreBySliceQuery[Record]](PersistenceTestKitDurableStateStore.Identifier)
+        val sourceProvider =
+          DurableStateSourceProvider.changesBySlices[Record](system, 
queryInstance, entityType, slice, slice)
+
+        whenReady(sourceProvider.source(() => 
Future.successful[Option[Offset]](None))) { source =>
+          val stateChange = source
+            .collect { case u: UpdatedDurableState[Record] => u }
+            .runWith(TestSink[UpdatedDurableState[Record]]())
+            .request(1)
+            .expectNext()
+
+          stateChange.value should be(recordChange)
+          stateChange.revision should be(1L)
+        }
+      }
+    }
+
+    "return slice for persistence id" in {
+      val entityType = "SliceQueryEntity"
+      val persistenceId = s"$entityType|query-id-1"
+      val slice = DurableStateSourceProvider.sliceForPersistenceId(
+        system,
+        PersistenceTestKitDurableStateStore.Identifier,
+        persistenceId)
+      slice shouldBe persistence.sliceForPersistenceId(persistenceId)
+    }
+
+    "return slice ranges" in {
+      val sliceRanges = DurableStateSourceProvider.sliceRanges(
+        system,
+        PersistenceTestKitDurableStateStore.Identifier,
+        1)
+      sliceRanges shouldBe Seq(0 until numberOfSlices)
+    }
+
+    // Tests using a non-default durable state store plugin configuration
+    "provide changes by tag using a non-default plugin config" in {
+      val record = Record(0, "Name-custom-1")
+      val tag = "tag-custom-plugin"
+      val recordChange = Record(0, "Name-custom-2")
+
+      val customStore: DurableStateUpdateStore[Record] =
+        DurableStateStoreRegistry(system)
+          
.durableStateStoreFor[DurableStateUpdateStore[Record]](customPluginId)
+      val fut = Future.sequence(
+        Vector(
+          customStore.upsertObject("custom-persistent-id-1", 0L, record, tag),
+          customStore.upsertObject("custom-persistent-id-2", 0L, record, 
"tag-other"),
+          customStore.upsertObject("custom-persistent-id-1", 1L, recordChange, 
tag)))
+      whenReady(fut) { _ =>
+        val sourceProvider = 
DurableStateSourceProvider.changesByTag[Record](system, customPluginId, tag)
+
+        whenReady(sourceProvider.source(() => 
Future.successful[Option[Offset]](None))) { source =>
+          val stateChange = source
+            .collect { case u: UpdatedDurableState[Record] => u }
+            .runWith(TestSink[UpdatedDurableState[Record]]())
+            .request(1)
+            .expectNext()
+
+          stateChange.value should be(recordChange)
+          stateChange.revision should be(1L)
+        }
+      }
+    }
+
+    "provide changes by slices using a non-default plugin config" in {
+      val entityType = "CustomPluginEntity"
+      val persistenceId = s"$entityType|custom-plugin-slice-id-1"
+      val record = Record(3, "custom-slice-record-1")
+      val recordChange = Record(3, "custom-slice-record-2")
+      val slice = persistence.sliceForPersistenceId(persistenceId)
+
+      val customStore: DurableStateUpdateStore[Record] =
+        DurableStateStoreRegistry(system)
+          
.durableStateStoreFor[DurableStateUpdateStore[Record]](customPluginId)
+      val fut = Future.sequence(
+        Vector(
+          customStore.upsertObject(persistenceId, 0L, record, ""),
+          customStore.upsertObject(persistenceId, 1L, recordChange, "")))
+      whenReady(fut) { _ =>
+        val sourceProvider = 
DurableStateSourceProvider.changesBySlices[Record](
+          system,
+          customPluginId,
+          entityType,
+          slice,
+          slice)
+
+        whenReady(sourceProvider.source(() => 
Future.successful[Option[Offset]](None))) { source =>
+          val stateChange = source
+            .collect { case u: UpdatedDurableState[Record] => u }
+            .runWith(TestSink[UpdatedDurableState[Record]]())
+            .request(1)
+            .expectNext()
+
+          stateChange.value should be(recordChange)
+          stateChange.revision should be(1L)
+        }
+      }
+    }
+
+    // Negative tests
+    "return None from getObject for a non-existent persistenceId" in {
+      val entityType = "NonExistentEntity"
+      val persistenceId = s"$entityType|does-not-exist"
+      val slice = persistence.sliceForPersistenceId(persistenceId)
+
+      val sourceProvider = DurableStateSourceProvider
+        .changesBySlices[Record](system, 
PersistenceTestKitDurableStateStore.Identifier, entityType, slice, slice)
+      val result = 
sourceProvider.asInstanceOf[DurableStateStore[Record]].getObject(persistenceId)
+      whenReady(result) { objectResult =>
+        objectResult.value shouldBe None
+        objectResult.revision shouldBe 0L
+      }
+    }
+
+    "return an empty source for changesByTag when no records match the tag" in 
{
+      val emptyTag = "tag-with-no-records"
+
+      val sourceProvider =
+        DurableStateSourceProvider.changesByTag[Record](system, 
PersistenceTestKitDurableStateStore.Identifier,
+          emptyTag)
+
+      whenReady(sourceProvider.source(() => 
Future.successful[Option[Offset]](None))) { source =>
+        source
+          
.runWith(TestSink[pekko.persistence.query.DurableStateChange[Record]]())
+          .request(1)
+          .expectNoMessage()
+      }
+    }
   }
 
 }
diff --git 
a/eventsourced/src/test/scala/org/apache/pekko/projection/eventsourced/scaldsl/EventSourcedProviderSpec.scala
 
b/eventsourced/src/test/scala/org/apache/pekko/projection/eventsourced/scaldsl/EventSourcedProviderSpec.scala
index 6ee4111..c5f2e16 100644
--- 
a/eventsourced/src/test/scala/org/apache/pekko/projection/eventsourced/scaldsl/EventSourcedProviderSpec.scala
+++ 
b/eventsourced/src/test/scala/org/apache/pekko/projection/eventsourced/scaldsl/EventSourcedProviderSpec.scala
@@ -22,17 +22,25 @@ import scala.concurrent.Future
 import com.typesafe.config.ConfigFactory
 import org.apache.pekko
 import pekko.Done
+import pekko.NotUsed
 import pekko.actor.testkit.typed.scaladsl.LogCapturing
 import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
 import pekko.actor.typed.ActorRef
 import pekko.persistence.Persistence
 import pekko.persistence.query.NoOffset
+import pekko.persistence.query.Offset
+import pekko.persistence.query.PersistenceQuery
+import pekko.persistence.query.scaladsl.EventsByTagQuery
+import pekko.persistence.query.typed.scaladsl.EventTimestampQuery
+import pekko.persistence.query.typed.scaladsl.EventsBySliceQuery
+import pekko.persistence.query.typed.scaladsl.LoadEventQuery
 import pekko.persistence.testkit.PersistenceTestKitPlugin
 import pekko.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
 import pekko.persistence.typed.PersistenceId
 import pekko.persistence.typed.scaladsl.Effect
 import pekko.persistence.typed.scaladsl.EventSourcedBehavior
 import pekko.projection.eventsourced.scaladsl.EventSourcedProvider
+import pekko.stream.scaladsl.Source
 import pekko.stream.testkit.scaladsl.TestSink
 import org.scalatest.Inspectors.forEvery
 import org.scalatest.freespec.AnyFreeSpecLike
@@ -192,6 +200,32 @@ class EventSourcedProviderSpec
           assertTag(tag1, expectedEvents.map(_ + s"-$journal1"), 
Some(journal1))
           assertTag(tag1, expectedEvents.map(_ + s"-$journal2"), 
Some(journal2))
         }
+
+        "using direct query instance" in {
+          val persistenceId = "e-id-1"
+          val tag = "e-tag-1"
+          val journal = "e-journal-1"
+
+          // Write events to a custom journal — if the implementation ignores 
the
+          // passed query instance and reads from the default journal instead,
+          // these events will not be found and the test will fail
+          setup(persistenceId, Set(tag), Some(journal))
+
+          val eventsByTagQuery =
+            
PersistenceQuery(system).readJournalFor[EventsByTagQuery](s"$journal.query", 
journalConfig(journal))
+          EventSourcedProvider
+            .eventsByTag[String](system, eventsByTagQuery, tag)
+            .source(() => Future.successful(Some(NoOffset)))
+            .futureValue
+            .map(_.event)
+            .runWith(TestSink())
+            .request(3)
+            .expectNextN(
+              Seq(s"$persistenceId-event-1-$journal", 
s"$persistenceId-event-2-$journal",
+                s"$persistenceId-event-3-$journal"))
+            .request(1)
+            .expectNoMessage()
+        }
       }
 
       "by slices" - {
@@ -225,6 +259,32 @@ class EventSourcedProviderSpec
           assertSlices(0, numberOfSlices - 1, expectedEvents.map(_ + 
s"-$journal1"), maybeJournal = Some(journal1))
           assertSlices(0, numberOfSlices - 1, expectedEvents.map(_ + 
s"-$journal2"), maybeJournal = Some(journal2))
         }
+
+        "using direct query instance" in {
+          val persistenceId = makeFullPersistenceId("f-id-1")
+          val journal = "f-journal-1"
+          val slice = persistence.sliceForPersistenceId(persistenceId)
+
+          // Write events to a custom journal — if the implementation ignores 
the
+          // passed query instance and reads from the default journal instead,
+          // these events will not be found and the test will fail
+          setup(persistenceId, maybeJournal = Some(journal))
+
+          val eventsBySlicesQuery =
+            
PersistenceQuery(system).readJournalFor[EventsBySliceQuery](s"$journal.query", 
journalConfig(journal))
+          EventSourcedProvider
+            .eventsBySlices[String](system, eventsBySlicesQuery, entityType, 
slice, slice)
+            .source(() => Future.successful(Some(NoOffset)))
+            .futureValue
+            .map(_.event)
+            .runWith(TestSink())
+            .request(3)
+            .expectNextN(
+              Seq(s"$persistenceId-event-1-$journal", 
s"$persistenceId-event-2-$journal",
+                s"$persistenceId-event-3-$journal"))
+            .request(1)
+            .expectNoMessage()
+        }
       }
     }
 
@@ -256,5 +316,72 @@ class EventSourcedProviderSpec
         }
       }
     }
+
+    "using the default plugin" - {
+      "return slice for persistence id" in {
+        EventSourcedProvider.sliceForPersistenceId(
+          system,
+          PersistenceTestKitReadJournal.Identifier,
+          makeFullPersistenceId("d-id-1")
+        ) shouldBe 594
+      }
+
+      "return slice ranges" in {
+        EventSourcedProvider.sliceRanges(
+          system,
+          PersistenceTestKitReadJournal.Identifier,
+          1
+        ) shouldBe Seq(0 until numberOfSlices)
+      }
+    }
+
+    "negative tests" - {
+      // A minimal EventsBySliceQuery that does NOT implement 
EventTimestampQuery or
+      // LoadEventQuery, used to trigger the failure paths in 
EventsBySlicesSourceProvider
+      def minimalEventsBySliceQuery(): EventsBySliceQuery = {
+        val delegate =
+          
PersistenceQuery(system).readJournalFor[EventsBySliceQuery](PersistenceTestKitReadJournal.Identifier)
+        new EventsBySliceQuery {
+          override def eventsBySlices[Event](
+              entityType: String,
+              minSlice: Int,
+              maxSlice: Int,
+              offset: Offset): 
Source[pekko.persistence.query.typed.EventEnvelope[Event], NotUsed] =
+            delegate.eventsBySlices(entityType, minSlice, maxSlice, offset)
+          override def sliceForPersistenceId(persistenceId: String): Int =
+            delegate.sliceForPersistenceId(persistenceId)
+          override def sliceRanges(numberOfRanges: Int): Seq[Range] =
+            delegate.sliceRanges(numberOfRanges)
+        }
+      }
+
+      "fail with IllegalStateException on timestampOf when query does not 
implement EventTimestampQuery" in {
+        val provider = EventSourcedProvider.eventsBySlices[String](
+          system,
+          minimalEventsBySliceQuery(),
+          entityType,
+          0,
+          numberOfSlices - 1)
+        provider
+          .asInstanceOf[EventTimestampQuery]
+          .timestampOf("pid-neg-1", 1L)
+          .failed
+          .futureValue shouldBe an[IllegalStateException]
+      }
+
+      "fail with IllegalStateException on loadEnvelope when query does not 
implement LoadEventQuery" in {
+        val provider = EventSourcedProvider.eventsBySlices[String](
+          system,
+          minimalEventsBySliceQuery(),
+          entityType,
+          0,
+          numberOfSlices - 1)
+        provider
+          .asInstanceOf[LoadEventQuery]
+          .loadEnvelope[String]("pid-neg-2", 1L)
+          .failed
+          .futureValue shouldBe an[IllegalStateException]
+      }
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to