This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-projection.git
The following commit(s) were added to refs/heads/main by this push:
new efcf7b7 Support runtime persistence plugin configuration in
sliceForPersistenceId and sliceRanges methods of EventSourcedProvider (#302)
efcf7b7 is described below
commit efcf7b7349c2d8ed61592e009bd4e4987a511192
Author: Domantas Petrauskas <[email protected]>
AuthorDate: Sun Oct 5 03:46:05 2025 +0300
Support runtime persistence plugin configuration in sliceForPersistenceId
and sliceRanges methods of EventSourcedProvider (#302)
* Add config parameter to sliceForPersistenceId and sliceRanges methods of
EventSourcedProvider
* Address review comments
---
.../javadsl/EventSourcedProvider.scala | 21 ++++
.../scaladsl/EventSourcedProvider.scala | 20 ++++
.../scaldsl/EventSourcedProviderSpec.scala | 124 +++++++++++++--------
3 files changed, 119 insertions(+), 46 deletions(-)
diff --git
a/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala
b/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala
index 35b0727..e4a5eb6 100644
---
a/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala
+++
b/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala
@@ -147,6 +147,17 @@ object EventSourcedProvider {
.getReadJournalFor(classOf[EventsBySliceQuery], readJournalPluginId)
.sliceForPersistenceId(persistenceId)
+ /** @since 2.0.0 */
+ def sliceForPersistenceId(
+ system: ActorSystem[_],
+ readJournalPluginId: String,
+ readJournalConfig: Config,
+ persistenceId: String
+ ): Int =
+ PersistenceQuery(system)
+ .getReadJournalFor(classOf[EventsBySliceQuery], readJournalPluginId,
readJournalConfig)
+ .sliceForPersistenceId(persistenceId)
+
def sliceRanges(
system: ActorSystem[_],
readJournalPluginId: String,
@@ -155,6 +166,16 @@ object EventSourcedProvider {
.getReadJournalFor(classOf[EventsBySliceQuery], readJournalPluginId)
.sliceRanges(numberOfRanges)
+ /** @since 2.0.0 */
+ def sliceRanges(
+ system: ActorSystem[_],
+ readJournalPluginId: String,
+ readJournalConfig: Config,
+ numberOfRanges: Int): java.util.List[Pair[Integer, Integer]] =
+ PersistenceQuery(system)
+ .getReadJournalFor(classOf[EventsBySliceQuery], readJournalPluginId,
readJournalConfig)
+ .sliceRanges(numberOfRanges)
+
/**
* INTERNAL API
*/
diff --git
a/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/scaladsl/EventSourcedProvider.scala
b/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/scaladsl/EventSourcedProvider.scala
index a97afdc..42dafa9 100644
---
a/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/scaladsl/EventSourcedProvider.scala
+++
b/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/scaladsl/EventSourcedProvider.scala
@@ -111,9 +111,29 @@ object EventSourcedProvider {
.readJournalFor[EventsBySliceQuery](readJournalPluginId)
.sliceForPersistenceId(persistenceId)
+ /** @since 2.0.0 */
+ def sliceForPersistenceId(
+ system: ActorSystem[_],
+ readJournalPluginId: String,
+ readJournalConfig: Config,
+ persistenceId: String): Int =
+ PersistenceQuery(system)
+ .readJournalFor[EventsBySliceQuery](readJournalPluginId,
readJournalConfig)
+ .sliceForPersistenceId(persistenceId)
+
def sliceRanges(system: ActorSystem[_], readJournalPluginId: String,
numberOfRanges: Int): immutable.Seq[Range] =
PersistenceQuery(system).readJournalFor[EventsBySliceQuery](readJournalPluginId).sliceRanges(numberOfRanges)
+ /** @since 2.0.0 */
+ def sliceRanges(
+ system: ActorSystem[_],
+ readJournalPluginId: String,
+ readJournalConfig: Config,
+ numberOfRanges: Int): immutable.Seq[Range] =
+ PersistenceQuery(system)
+ .readJournalFor[EventsBySliceQuery](readJournalPluginId,
readJournalConfig)
+ .sliceRanges(numberOfRanges)
+
private class EventsBySlicesSourceProvider[Event](
eventsBySlicesQuery: EventsBySliceQuery,
entityType: String,
diff --git
a/eventsourced/src/test/scala/org/apache/pekko/projection/eventsourced/scaldsl/EventSourcedProviderSpec.scala
b/eventsourced/src/test/scala/org/apache/pekko/projection/eventsourced/scaldsl/EventSourcedProviderSpec.scala
index ce462fa..2d4e4fb 100644
---
a/eventsourced/src/test/scala/org/apache/pekko/projection/eventsourced/scaldsl/EventSourcedProviderSpec.scala
+++
b/eventsourced/src/test/scala/org/apache/pekko/projection/eventsourced/scaldsl/EventSourcedProviderSpec.scala
@@ -34,6 +34,7 @@ import pekko.persistence.typed.scaladsl.Effect
import pekko.persistence.typed.scaladsl.EventSourcedBehavior
import pekko.projection.eventsourced.scaladsl.EventSourcedProvider
import pekko.stream.testkit.scaladsl.TestSink
+import org.scalatest.Inspectors.forEvery
import org.scalatest.freespec.AnyFreeSpecLike
object EventSourcedProviderSpec {
@@ -162,66 +163,97 @@ class EventSourcedProviderSpec
.expectNoMessage()
}
- "Should provide different events" - {
- "by tags" - {
- "for different tags" in {
- val persistenceId1 = "a-id-1"
- val persistenceId2 = "a-id-2"
- val tag1 = "a-tag-1"
- val tag2 = "a-tag-2"
+ "Should " - {
+ "provide different events" - {
+ "by tags" - {
+ "for different tags" in {
+ val persistenceId1 = "a-id-1"
+ val persistenceId2 = "a-id-2"
+ val tag1 = "a-tag-1"
+ val tag2 = "a-tag-2"
- setup(persistenceId1, Set(tag1))
- setup(persistenceId2, Set(tag2))
+ setup(persistenceId1, Set(tag1))
+ setup(persistenceId2, Set(tag2))
- assertTag(tag1, Seq(s"$persistenceId1-event-1",
s"$persistenceId1-event-2", s"$persistenceId1-event-3"))
- assertTag(tag2, Seq(s"$persistenceId2-event-1",
s"$persistenceId2-event-2", s"$persistenceId2-event-3"))
- }
+ assertTag(tag1, Seq(s"$persistenceId1-event-1",
s"$persistenceId1-event-2", s"$persistenceId1-event-3"))
+ assertTag(tag2, Seq(s"$persistenceId2-event-1",
s"$persistenceId2-event-2", s"$persistenceId2-event-3"))
+ }
- "for different journals" in {
- val persistenceId1 = "b-id-1"
- val tag1 = "b-tag-1"
- val journal1 = "b-journal-1"
- val journal2 = "b-journal-2"
+ "for different journals" in {
+ val persistenceId1 = "b-id-1"
+ val tag1 = "b-tag-1"
+ val journal1 = "b-journal-1"
+ val journal2 = "b-journal-2"
- setup(persistenceId1, Set(tag1), Some(journal1))
- setup(persistenceId1, Set(tag1), Some(journal2))
+ setup(persistenceId1, Set(tag1), Some(journal1))
+ setup(persistenceId1, Set(tag1), Some(journal2))
- val expectedEvents = Seq(s"$persistenceId1-event-1",
s"$persistenceId1-event-2", s"$persistenceId1-event-3")
- assertTag(tag1, expectedEvents.map(_ + s"-$journal1"), Some(journal1))
- assertTag(tag1, expectedEvents.map(_ + s"-$journal2"), Some(journal2))
+ val expectedEvents = Seq(s"$persistenceId1-event-1",
s"$persistenceId1-event-2", s"$persistenceId1-event-3")
+ assertTag(tag1, expectedEvents.map(_ + s"-$journal1"),
Some(journal1))
+ assertTag(tag1, expectedEvents.map(_ + s"-$journal2"),
Some(journal2))
+ }
}
- }
- "by slices" - {
- "for different slices" in {
- val persistenceId1 = makeFullPersistenceId("c-id-1")
- val persistenceId2 = makeFullPersistenceId("c-id-2")
+ "by slices" - {
+ "for different slices" in {
+ val persistenceId1 = makeFullPersistenceId("c-id-1")
+ val persistenceId2 = makeFullPersistenceId("c-id-2")
+
+ val slice1 = persistence.sliceForPersistenceId(persistenceId1)
+ val slice2 = persistence.sliceForPersistenceId(persistenceId2)
+ slice1 should not be slice2
+
+ setup(persistenceId1)
+ setup(persistenceId2)
- val slice1 = persistence.sliceForPersistenceId(persistenceId1)
- val slice2 = persistence.sliceForPersistenceId(persistenceId2)
- slice1 should not be slice2
+ val expectedEvents1 = Seq(s"$persistenceId1-event-1",
s"$persistenceId1-event-2", s"$persistenceId1-event-3")
+ val expectedEvents2 = Seq(s"$persistenceId2-event-1",
s"$persistenceId2-event-2", s"$persistenceId2-event-3")
+ assertSlices(0, numberOfSlices - 1, expectedEvents1 ++
expectedEvents2)
+ assertSlices(slice1, slice1, expectedEvents1)
+ assertSlices(slice2, slice2, expectedEvents2)
+ }
- setup(persistenceId1)
- setup(persistenceId2)
+ "for different journals" in {
+ val persistenceId1 = makeFullPersistenceId("d-id-1")
+ val journal1 = "d-journal-1"
+ val journal2 = "d-journal-2"
- val expectedEvents1 = Seq(s"$persistenceId1-event-1",
s"$persistenceId1-event-2", s"$persistenceId1-event-3")
- val expectedEvents2 = Seq(s"$persistenceId2-event-1",
s"$persistenceId2-event-2", s"$persistenceId2-event-3")
- assertSlices(0, numberOfSlices - 1, expectedEvents1 ++ expectedEvents2)
- assertSlices(slice1, slice1, expectedEvents1)
- assertSlices(slice2, slice2, expectedEvents2)
+ setup(persistenceId1, maybeJournal = Some(journal1))
+ setup(persistenceId1, maybeJournal = Some(journal2))
+
+ val expectedEvents = Seq(s"$persistenceId1-event-1",
s"$persistenceId1-event-2", s"$persistenceId1-event-3")
+ assertSlices(0, numberOfSlices - 1, expectedEvents.map(_ +
s"-$journal1"), maybeJournal = Some(journal1))
+ assertSlices(0, numberOfSlices - 1, expectedEvents.map(_ +
s"-$journal2"), maybeJournal = Some(journal2))
+ }
}
+ }
- "for different journals" in {
- val persistenceId1 = makeFullPersistenceId("d-id-1")
- val journal1 = "d-journal-1"
- val journal2 = "d-journal-2"
+ "for different journals" - {
+ val persistenceId1 = makeFullPersistenceId("d-id-1")
+ val journal1 = "d-journal-1"
+ val journal2 = "d-journal-2"
+ val journals = Seq(journal1, journal2)
- setup(persistenceId1, maybeJournal = Some(journal1))
- setup(persistenceId1, maybeJournal = Some(journal2))
+ "return slice for persistence id" in {
+ forEvery(journals) { journal =>
+ EventSourcedProvider.sliceForPersistenceId(
+ system,
+ s"$journal.query",
+ journalConfig(journal),
+ persistenceId1
+ ) shouldBe 594
+ }
+ }
- val expectedEvents = Seq(s"$persistenceId1-event-1",
s"$persistenceId1-event-2", s"$persistenceId1-event-3")
- assertSlices(0, numberOfSlices - 1, expectedEvents.map(_ +
s"-$journal1"), maybeJournal = Some(journal1))
- assertSlices(0, numberOfSlices - 1, expectedEvents.map(_ +
s"-$journal2"), maybeJournal = Some(journal2))
+ "return slices ranges" in {
+ forEvery(journals) { journal =>
+ EventSourcedProvider.sliceRanges(
+ system,
+ s"$journal.query",
+ journalConfig(journal),
+ 1
+ ) shouldBe Seq(0 until numberOfSlices)
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]