This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-projection.git


The following commit(s) were added to refs/heads/main by this push:
     new efcf7b7  Support runtime persistence plugin configuration in sliceForPersistenceId and sliceRanges methods of EventSourcedProvider (#302)
efcf7b7 is described below

commit efcf7b7349c2d8ed61592e009bd4e4987a511192
Author: Domantas Petrauskas <[email protected]>
AuthorDate: Sun Oct 5 03:46:05 2025 +0300

    Support runtime persistence plugin configuration in sliceForPersistenceId and sliceRanges methods of EventSourcedProvider (#302)
    
    * Add config parameter to sliceForPersistenceId and sliceRanges methods of EventSourcedProvider
    
    * Address review comments
---
 .../javadsl/EventSourcedProvider.scala             |  21 ++++
 .../scaladsl/EventSourcedProvider.scala            |  20 ++++
 .../scaldsl/EventSourcedProviderSpec.scala         | 124 +++++++++++++--------
 3 files changed, 119 insertions(+), 46 deletions(-)

diff --git 
a/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala
 
b/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala
index 35b0727..e4a5eb6 100644
--- 
a/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala
+++ 
b/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala
@@ -147,6 +147,17 @@ object EventSourcedProvider {
       .getReadJournalFor(classOf[EventsBySliceQuery], readJournalPluginId)
       .sliceForPersistenceId(persistenceId)
 
+  /** @since 2.0.0 */
+  def sliceForPersistenceId(
+      system: ActorSystem[_],
+      readJournalPluginId: String,
+      readJournalConfig: Config,
+      persistenceId: String
+  ): Int =
+    PersistenceQuery(system)
+      .getReadJournalFor(classOf[EventsBySliceQuery], readJournalPluginId, 
readJournalConfig)
+      .sliceForPersistenceId(persistenceId)
+
   def sliceRanges(
       system: ActorSystem[_],
       readJournalPluginId: String,
@@ -155,6 +166,16 @@ object EventSourcedProvider {
       .getReadJournalFor(classOf[EventsBySliceQuery], readJournalPluginId)
       .sliceRanges(numberOfRanges)
 
+  /** @since 2.0.0 */
+  def sliceRanges(
+      system: ActorSystem[_],
+      readJournalPluginId: String,
+      readJournalConfig: Config,
+      numberOfRanges: Int): java.util.List[Pair[Integer, Integer]] =
+    PersistenceQuery(system)
+      .getReadJournalFor(classOf[EventsBySliceQuery], readJournalPluginId, 
readJournalConfig)
+      .sliceRanges(numberOfRanges)
+
   /**
    * INTERNAL API
    */
diff --git 
a/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/scaladsl/EventSourcedProvider.scala
 
b/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/scaladsl/EventSourcedProvider.scala
index a97afdc..42dafa9 100644
--- 
a/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/scaladsl/EventSourcedProvider.scala
+++ 
b/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/scaladsl/EventSourcedProvider.scala
@@ -111,9 +111,29 @@ object EventSourcedProvider {
       .readJournalFor[EventsBySliceQuery](readJournalPluginId)
       .sliceForPersistenceId(persistenceId)
 
+  /** @since 2.0.0 */
+  def sliceForPersistenceId(
+      system: ActorSystem[_],
+      readJournalPluginId: String,
+      readJournalConfig: Config,
+      persistenceId: String): Int =
+    PersistenceQuery(system)
+      .readJournalFor[EventsBySliceQuery](readJournalPluginId, 
readJournalConfig)
+      .sliceForPersistenceId(persistenceId)
+
   def sliceRanges(system: ActorSystem[_], readJournalPluginId: String, 
numberOfRanges: Int): immutable.Seq[Range] =
     
PersistenceQuery(system).readJournalFor[EventsBySliceQuery](readJournalPluginId).sliceRanges(numberOfRanges)
 
+  /** @since 2.0.0 */
+  def sliceRanges(
+      system: ActorSystem[_],
+      readJournalPluginId: String,
+      readJournalConfig: Config,
+      numberOfRanges: Int): immutable.Seq[Range] =
+    PersistenceQuery(system)
+      .readJournalFor[EventsBySliceQuery](readJournalPluginId, 
readJournalConfig)
+      .sliceRanges(numberOfRanges)
+
   private class EventsBySlicesSourceProvider[Event](
       eventsBySlicesQuery: EventsBySliceQuery,
       entityType: String,
diff --git 
a/eventsourced/src/test/scala/org/apache/pekko/projection/eventsourced/scaldsl/EventSourcedProviderSpec.scala
 
b/eventsourced/src/test/scala/org/apache/pekko/projection/eventsourced/scaldsl/EventSourcedProviderSpec.scala
index ce462fa..2d4e4fb 100644
--- 
a/eventsourced/src/test/scala/org/apache/pekko/projection/eventsourced/scaldsl/EventSourcedProviderSpec.scala
+++ 
b/eventsourced/src/test/scala/org/apache/pekko/projection/eventsourced/scaldsl/EventSourcedProviderSpec.scala
@@ -34,6 +34,7 @@ import pekko.persistence.typed.scaladsl.Effect
 import pekko.persistence.typed.scaladsl.EventSourcedBehavior
 import pekko.projection.eventsourced.scaladsl.EventSourcedProvider
 import pekko.stream.testkit.scaladsl.TestSink
+import org.scalatest.Inspectors.forEvery
 import org.scalatest.freespec.AnyFreeSpecLike
 
 object EventSourcedProviderSpec {
@@ -162,66 +163,97 @@ class EventSourcedProviderSpec
       .expectNoMessage()
   }
 
-  "Should provide different events" - {
-    "by tags" - {
-      "for different tags" in {
-        val persistenceId1 = "a-id-1"
-        val persistenceId2 = "a-id-2"
-        val tag1 = "a-tag-1"
-        val tag2 = "a-tag-2"
+  "Should " - {
+    "provide different events" - {
+      "by tags" - {
+        "for different tags" in {
+          val persistenceId1 = "a-id-1"
+          val persistenceId2 = "a-id-2"
+          val tag1 = "a-tag-1"
+          val tag2 = "a-tag-2"
 
-        setup(persistenceId1, Set(tag1))
-        setup(persistenceId2, Set(tag2))
+          setup(persistenceId1, Set(tag1))
+          setup(persistenceId2, Set(tag2))
 
-        assertTag(tag1, Seq(s"$persistenceId1-event-1", 
s"$persistenceId1-event-2", s"$persistenceId1-event-3"))
-        assertTag(tag2, Seq(s"$persistenceId2-event-1", 
s"$persistenceId2-event-2", s"$persistenceId2-event-3"))
-      }
+          assertTag(tag1, Seq(s"$persistenceId1-event-1", 
s"$persistenceId1-event-2", s"$persistenceId1-event-3"))
+          assertTag(tag2, Seq(s"$persistenceId2-event-1", 
s"$persistenceId2-event-2", s"$persistenceId2-event-3"))
+        }
 
-      "for different journals" in {
-        val persistenceId1 = "b-id-1"
-        val tag1 = "b-tag-1"
-        val journal1 = "b-journal-1"
-        val journal2 = "b-journal-2"
+        "for different journals" in {
+          val persistenceId1 = "b-id-1"
+          val tag1 = "b-tag-1"
+          val journal1 = "b-journal-1"
+          val journal2 = "b-journal-2"
 
-        setup(persistenceId1, Set(tag1), Some(journal1))
-        setup(persistenceId1, Set(tag1), Some(journal2))
+          setup(persistenceId1, Set(tag1), Some(journal1))
+          setup(persistenceId1, Set(tag1), Some(journal2))
 
-        val expectedEvents = Seq(s"$persistenceId1-event-1", 
s"$persistenceId1-event-2", s"$persistenceId1-event-3")
-        assertTag(tag1, expectedEvents.map(_ + s"-$journal1"), Some(journal1))
-        assertTag(tag1, expectedEvents.map(_ + s"-$journal2"), Some(journal2))
+          val expectedEvents = Seq(s"$persistenceId1-event-1", 
s"$persistenceId1-event-2", s"$persistenceId1-event-3")
+          assertTag(tag1, expectedEvents.map(_ + s"-$journal1"), 
Some(journal1))
+          assertTag(tag1, expectedEvents.map(_ + s"-$journal2"), 
Some(journal2))
+        }
       }
-    }
 
-    "by slices" - {
-      "for different slices" in {
-        val persistenceId1 = makeFullPersistenceId("c-id-1")
-        val persistenceId2 = makeFullPersistenceId("c-id-2")
+      "by slices" - {
+        "for different slices" in {
+          val persistenceId1 = makeFullPersistenceId("c-id-1")
+          val persistenceId2 = makeFullPersistenceId("c-id-2")
+
+          val slice1 = persistence.sliceForPersistenceId(persistenceId1)
+          val slice2 = persistence.sliceForPersistenceId(persistenceId2)
+          slice1 should not be slice2
+
+          setup(persistenceId1)
+          setup(persistenceId2)
 
-        val slice1 = persistence.sliceForPersistenceId(persistenceId1)
-        val slice2 = persistence.sliceForPersistenceId(persistenceId2)
-        slice1 should not be slice2
+          val expectedEvents1 = Seq(s"$persistenceId1-event-1", 
s"$persistenceId1-event-2", s"$persistenceId1-event-3")
+          val expectedEvents2 = Seq(s"$persistenceId2-event-1", 
s"$persistenceId2-event-2", s"$persistenceId2-event-3")
+          assertSlices(0, numberOfSlices - 1, expectedEvents1 ++ 
expectedEvents2)
+          assertSlices(slice1, slice1, expectedEvents1)
+          assertSlices(slice2, slice2, expectedEvents2)
+        }
 
-        setup(persistenceId1)
-        setup(persistenceId2)
+        "for different journals" in {
+          val persistenceId1 = makeFullPersistenceId("d-id-1")
+          val journal1 = "d-journal-1"
+          val journal2 = "d-journal-2"
 
-        val expectedEvents1 = Seq(s"$persistenceId1-event-1", 
s"$persistenceId1-event-2", s"$persistenceId1-event-3")
-        val expectedEvents2 = Seq(s"$persistenceId2-event-1", 
s"$persistenceId2-event-2", s"$persistenceId2-event-3")
-        assertSlices(0, numberOfSlices - 1, expectedEvents1 ++ expectedEvents2)
-        assertSlices(slice1, slice1, expectedEvents1)
-        assertSlices(slice2, slice2, expectedEvents2)
+          setup(persistenceId1, maybeJournal = Some(journal1))
+          setup(persistenceId1, maybeJournal = Some(journal2))
+
+          val expectedEvents = Seq(s"$persistenceId1-event-1", 
s"$persistenceId1-event-2", s"$persistenceId1-event-3")
+          assertSlices(0, numberOfSlices - 1, expectedEvents.map(_ + 
s"-$journal1"), maybeJournal = Some(journal1))
+          assertSlices(0, numberOfSlices - 1, expectedEvents.map(_ + 
s"-$journal2"), maybeJournal = Some(journal2))
+        }
       }
+    }
 
-      "for different journals" in {
-        val persistenceId1 = makeFullPersistenceId("d-id-1")
-        val journal1 = "d-journal-1"
-        val journal2 = "d-journal-2"
+    "for different journals" - {
+      val persistenceId1 = makeFullPersistenceId("d-id-1")
+      val journal1 = "d-journal-1"
+      val journal2 = "d-journal-2"
+      val journals = Seq(journal1, journal2)
 
-        setup(persistenceId1, maybeJournal = Some(journal1))
-        setup(persistenceId1, maybeJournal = Some(journal2))
+      "return slice for persistence id" in {
+        forEvery(journals) { journal =>
+          EventSourcedProvider.sliceForPersistenceId(
+            system,
+            s"$journal.query",
+            journalConfig(journal),
+            persistenceId1
+          ) shouldBe 594
+        }
+      }
 
-        val expectedEvents = Seq(s"$persistenceId1-event-1", 
s"$persistenceId1-event-2", s"$persistenceId1-event-3")
-        assertSlices(0, numberOfSlices - 1, expectedEvents.map(_ + 
s"-$journal1"), maybeJournal = Some(journal1))
-        assertSlices(0, numberOfSlices - 1, expectedEvents.map(_ + 
s"-$journal2"), maybeJournal = Some(journal2))
+      "return slices ranges" in {
+        forEvery(journals) { journal =>
+          EventSourcedProvider.sliceRanges(
+            system,
+            s"$journal.query",
+            journalConfig(journal),
+            1
+          ) shouldBe Seq(0 until numberOfSlices)
+        }
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to