This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 4df0439  Port akka/akka-persistence-r2dbc#346: add persistenceIds by 
entity type queries (#359)
4df0439 is described below

commit 4df0439e56120a56ee584af0c6dbbe7b631f738e
Author: PJ Fanning <[email protected]>
AuthorDate: Fri May 8 10:03:30 2026 +0100

    Port akka/akka-persistence-r2dbc#346: add persistenceIds by entity type 
queries (#359)
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/52e0d683-4795-489b-b41d-8eaf43554ec9
    
    Co-authored-by: copilot-swe-agent[bot] 
<[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../r2dbc/query/javadsl/R2dbcReadJournal.scala     | 20 ++++++++++++++
 .../r2dbc/query/scaladsl/QueryDao.scala            | 32 ++++++++++++++++++++++
 .../r2dbc/query/scaladsl/R2dbcReadJournal.scala    | 20 ++++++++++++++
 .../state/javadsl/R2dbcDurableStateStore.scala     | 22 +++++++++++++++
 .../r2dbc/state/scaladsl/DurableStateDao.scala     | 31 +++++++++++++++++++++
 .../state/scaladsl/R2dbcDurableStateStore.scala    | 20 ++++++++++++++
 .../query/CurrentPersistenceIdsQuerySpec.scala     | 29 ++++++++++++++++++--
 .../state/CurrentPersistenceIdsQuerySpec.scala     | 28 +++++++++++++++++--
 8 files changed, 197 insertions(+), 5 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala
index 172f855..184b689 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala
@@ -94,6 +94,26 @@ final class R2dbcReadJournal(delegate: 
scaladsl.R2dbcReadJournal)
   override def currentPersistenceIds(afterId: Optional[String], limit: Long): 
Source[String, NotUsed] =
     delegate.currentPersistenceIds(afterId.toScala, limit).asJava
 
+  /**
+   * Get the current persistence ids.
+   *
+   * Note: to reuse existing index, the actual query filters entity types 
based on persistence_id column and sql LIKE
+   * operator. Hence the persistenceId must start with an entity type followed 
by default separator ("|") from
+   * [[pekko.persistence.typed.PersistenceId]].
+   *
+   * @param entityType
+   *   The entity type name.
+   * @param afterId
+   *   The ID to start returning results from, or empty to return all ids. 
This should be an id returned from a previous
+   *   invocation of this command. Callers should not assume that ids are 
returned in sorted order.
+   * @param limit
+   *   The maximum results to return. Use Long.MAX_VALUE to return all 
results. Must be greater than zero.
+   * @return
+   *   A source containing all the persistence ids, limited as specified.
+   */
+  def currentPersistenceIds(entityType: String, afterId: Optional[String], 
limit: Long): Source[String, NotUsed] =
+    delegate.currentPersistenceIds(entityType, afterId.toScala, limit).asJava
+
   override def timestampOf(persistenceId: String, sequenceNr: Long): 
CompletionStage[Optional[Instant]] =
     delegate.timestampOf(persistenceId, 
sequenceNr).map(_.toJava)(ExecutionContext.parasitic).asJava
 
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
index 20aa370..11b207f 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
@@ -36,6 +36,7 @@ import 
pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.r2dbc.journal.JournalDao
 import pekko.persistence.r2dbc.journal.JournalDao.SerializedJournalRow
 import pekko.persistence.r2dbc.query.scaladsl.mysql.MySQLQueryDao
+import pekko.persistence.typed.PersistenceId
 import pekko.stream.scaladsl.Source
 import com.typesafe.config.Config
 import io.r2dbc.spi.ConnectionFactory
@@ -143,9 +144,15 @@ private[r2dbc] class QueryDao(val settings: QuerySettings, 
connectionFactory: Co
   private val allPersistenceIdsSql =
     sql"SELECT DISTINCT(persistence_id) from $journalTable ORDER BY 
persistence_id LIMIT ?"
 
+  private val persistenceIdsForEntityTypeSql =
+    sql"SELECT DISTINCT(persistence_id) from $journalTable WHERE 
persistence_id LIKE ? ORDER BY persistence_id LIMIT ?"
+
   private val allPersistenceIdsAfterSql =
     sql"SELECT DISTINCT(persistence_id) from $journalTable WHERE 
persistence_id > ? ORDER BY persistence_id LIMIT ?"
 
+  private val persistenceIdsForEntityTypeAfterSql =
+    sql"SELECT DISTINCT(persistence_id) from $journalTable WHERE 
persistence_id LIKE ? AND persistence_id > ? ORDER BY persistence_id LIMIT ?"
+
   protected val r2dbcExecutor =
     new R2dbcExecutor(connectionFactory, log, 
settings.logDbCallsExceeding)(ec, system)
 
@@ -305,6 +312,31 @@ private[r2dbc] class QueryDao(val settings: QuerySettings, 
connectionFactory: Co
           tags = Set.empty, // tags not fetched in queries (yet)
           metadata = readMetadata(row)))
 
+  def persistenceIds(entityType: String, afterId: Option[String], limit: 
Long): Source[String, NotUsed] = {
+    val likeStmtPostfix = PersistenceId.DefaultSeparator + "%"
+    val result = r2dbcExecutor.select(s"select persistenceIds by entity type")(
+      connection =>
+        afterId match {
+          case Some(after) =>
+            connection
+              .createStatement(persistenceIdsForEntityTypeAfterSql)
+              .bind(0, entityType + likeStmtPostfix)
+              .bind(1, after)
+              .bind(2, limit)
+          case None =>
+            connection
+              .createStatement(persistenceIdsForEntityTypeSql)
+              .bind(0, entityType + likeStmtPostfix)
+              .bind(1, limit)
+        },
+      row => row.get("persistence_id", classOf[String]))
+
+    if (log.isDebugEnabled)
+      result.foreach(rows => log.debug("Read [{}] persistence ids by entity 
type [{}]", rows.size, entityType))
+
+    Source.futureSource(result.map(Source(_))).mapMaterializedValue(_ => 
NotUsed)
+  }
+
   def persistenceIds(afterId: Option[String], limit: Long): Source[String, 
NotUsed] = {
     val result = r2dbcExecutor.select(s"select persistenceIds")(
       connection =>
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
index d951a66..5be7856 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
@@ -349,6 +349,26 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, 
config: Config, cfgPat
   override def currentPersistenceIds(afterId: Option[String], limit: Long): 
Source[String, NotUsed] =
     queryDao.persistenceIds(afterId, limit)
 
+  /**
+   * Get the current persistence ids.
+   *
+   * Note: to reuse existing index, the actual query filters entity types 
based on persistence_id column and sql LIKE
+   * operator. Hence the persistenceId must start with an entity type followed 
by default separator ("|") from
+   * [[pekko.persistence.typed.PersistenceId]].
+   *
+   * @param entityType
+   *   The entity type name.
+   * @param afterId
+   *   The ID to start returning results from, or [[None]] to return all ids. 
This should be an id returned from a
+   *   previous invocation of this command. Callers should not assume that ids 
are returned in sorted order.
+   * @param limit
+   *   The maximum results to return. Use Long.MaxValue to return all results. 
Must be greater than zero.
+   * @return
+   *   A source containing all the persistence ids, limited as specified.
+   */
+  def currentPersistenceIds(entityType: String, afterId: Option[String], 
limit: Long): Source[String, NotUsed] =
+    queryDao.persistenceIds(entityType, afterId, limit)
+
   override def currentPersistenceIds(): Source[String, NotUsed] = {
     import settings.persistenceIdsBufferSize
     def updateState(state: PersistenceIdsQueryState, pid: String): 
PersistenceIdsQueryState =
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala
index 8770f85..abc1a85 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala
@@ -87,6 +87,28 @@ class R2dbcDurableStateStore[A](scalaStore: 
ScalaR2dbcDurableStateStore[A])(impl
     scalaStore.currentPersistenceIds(afterId.toScala, limit).asJava
   }
 
+  /**
+   * Get the current persistence ids.
+   *
+   * Note: to reuse existing index, the actual query filters entity types 
based on persistence_id column and sql LIKE
+   * operator. Hence the persistenceId must start with an entity type followed 
by default separator ("|") from
+   * [[pekko.persistence.typed.PersistenceId]].
+   *
+   * @param entityType
+   *   The entity type name.
+   * @param afterId
+   *   The ID to start returning results from, or empty to return all ids. 
This should be an id returned from a previous
+   *   invocation of this command. Callers should not assume that ids are 
returned in sorted order.
+   * @param limit
+   *   The maximum results to return. Use Long.MAX_VALUE to return all 
results. Must be greater than zero.
+   * @return
+   *   A source containing all the persistence ids, limited as specified.
+   */
+  def currentPersistenceIds(entityType: String, afterId: Optional[String], 
limit: Long): Source[String, NotUsed] = {
+    import scala.jdk.OptionConverters._
+    scalaStore.currentPersistenceIds(entityType, afterId.toScala, limit).asJava
+  }
+
   def currentPersistenceIds(): Source[String, NotUsed] =
     scalaStore.currentPersistenceIds().asJava
 }
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
index 96598d4..2eda936 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
@@ -160,9 +160,15 @@ private[r2dbc] class DurableStateDao(settings: 
StateSettings, connectionFactory:
   private val allPersistenceIdsSql =
     sql"SELECT persistence_id from $stateTable ORDER BY persistence_id LIMIT ?"
 
+  private val persistenceIdsForEntityTypeSql =
+    sql"SELECT persistence_id from $stateTable WHERE persistence_id LIKE ? 
ORDER BY persistence_id LIMIT ?"
+
   private val allPersistenceIdsAfterSql =
     sql"SELECT persistence_id from $stateTable WHERE persistence_id > ? ORDER 
BY persistence_id LIMIT ?"
 
+  private val persistenceIdsForEntityTypeAfterSql =
+    sql"SELECT persistence_id from $stateTable WHERE persistence_id LIKE ? AND 
persistence_id > ? ORDER BY persistence_id LIMIT ?"
+
   protected def stateBySlicesRangeSql(
       maxDbTimestampParam: Boolean,
       behindCurrentTime: FiniteDuration,
@@ -464,6 +470,31 @@ private[r2dbc] class DurableStateDao(settings: 
StateSettings, connectionFactory:
     Source.futureSource(result.map(Source(_))).mapMaterializedValue(_ => 
NotUsed)
   }
 
+  def persistenceIds(entityType: String, afterId: Option[String], limit: 
Long): Source[String, NotUsed] = {
+    val likeStmtPostfix = PersistenceId.DefaultSeparator + "%"
+    val result = r2dbcExecutor.select(s"select persistenceIds by entity type")(
+      connection =>
+        afterId match {
+          case Some(after) =>
+            connection
+              .createStatement(persistenceIdsForEntityTypeAfterSql)
+              .bind(0, entityType + likeStmtPostfix)
+              .bind(1, after)
+              .bind(2, limit)
+          case None =>
+            connection
+              .createStatement(persistenceIdsForEntityTypeSql)
+              .bind(0, entityType + likeStmtPostfix)
+              .bind(1, limit)
+        },
+      row => row.get("persistence_id", classOf[String]))
+
+    if (log.isDebugEnabled)
+      result.foreach(rows => log.debug("Read [{}] persistence ids by entity 
type [{}]", rows.size, entityType))
+
+    Source.futureSource(result.map(Source(_))).mapMaterializedValue(_ => 
NotUsed)
+  }
+
   def persistenceIds(afterId: Option[String], limit: Long): Source[String, 
NotUsed] = {
     val result = r2dbcExecutor.select(s"select persistenceIds")(
       connection =>
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
index d53b479..5e9ceb1 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
@@ -174,6 +174,26 @@ class R2dbcDurableStateStore[A](system: 
ExtendedActorSystem, config: Config, cfg
   override def currentPersistenceIds(afterId: Option[String], limit: Long): 
Source[String, NotUsed] =
     stateDao.persistenceIds(afterId, limit)
 
+  /**
+   * Get the current persistence ids.
+   *
+   * Note: to reuse existing index, the actual query filters entity types 
based on persistence_id column and sql LIKE
+   * operator. Hence the persistenceId must start with an entity type followed 
by default separator ("|") from
+   * [[pekko.persistence.typed.PersistenceId]].
+   *
+   * @param entityType
+   *   The entity type name.
+   * @param afterId
+   *   The ID to start returning results from, or [[None]] to return all ids. 
This should be an id returned from a
+   *   previous invocation of this command. Callers should not assume that ids 
are returned in sorted order.
+   * @param limit
+   *   The maximum results to return. Use Long.MaxValue to return all results. 
Must be greater than zero.
+   * @return
+   *   A source containing all the persistence ids, limited as specified.
+   */
+  def currentPersistenceIds(entityType: String, afterId: Option[String], 
limit: Long): Source[String, NotUsed] =
+    stateDao.persistenceIds(entityType, afterId, limit)
+
   def currentPersistenceIds(): Source[String, NotUsed] = {
     import settings.persistenceIdsBufferSize
     def updateState(state: PersistenceIdsQueryState, pid: String): 
PersistenceIdsQueryState =
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/CurrentPersistenceIdsQuerySpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/CurrentPersistenceIdsQuerySpec.scala
index 0a59495..6b533c9 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/CurrentPersistenceIdsQuerySpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/CurrentPersistenceIdsQuerySpec.scala
@@ -49,9 +49,14 @@ class CurrentPersistenceIdsQuerySpec
   private val query = 
PersistenceQuery(testKit.system).readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)
 
   private val zeros = "0000"
-  private val entityType = nextEntityType()
+  private val numberOfEntityTypes = 5
+  private val entityTypes = (1 to numberOfEntityTypes).map(_ => 
nextEntityType())
   private val numberOfPids = 100
-  private val pids = (1 to numberOfPids).map(n => PersistenceId(entityType, 
"p" + zeros.drop(n.toString.length) + n))
+  private val pids = {
+    (1 to numberOfEntityTypes).flatMap(entityTypeId =>
+      (1 to numberOfPids / numberOfEntityTypes).map(n =>
+        PersistenceId(entityTypes(entityTypeId - 1), "p" + 
zeros.drop(n.toString.length) + n)))
+  }
 
   override protected def beforeAll(): Unit = {
     super.beforeAll()
@@ -73,11 +78,29 @@ class CurrentPersistenceIdsQuerySpec
       result shouldBe pids.map(_.id)
     }
 
-    "retrieve ids afterId" in {
+    "retrieve ids after id" in {
       val result = query.currentPersistenceIds(afterId = Some(pids(9).id), 
limit = 7).runWith(Sink.seq).futureValue
       result shouldBe pids.slice(10, 17).map(_.id)
     }
 
+    "retrieve ids for entity type" in {
+      val entityType = entityTypes(1)
+      val result =
+        query.currentPersistenceIds(entityType = entityType, afterId = None, 
limit = 30).runWith(Sink.seq).futureValue
+      result shouldBe pids.filter(_.entityTypeHint == entityType).map(_.id)
+    }
+
+    "retrieve ids for entity type after id" in {
+      val entityType = entityTypes(0)
+      val result =
+        query
+          .currentPersistenceIds(entityType = entityType, afterId = 
Some(pids(9).id), limit = 7)
+          .runWith(Sink.seq)
+          .futureValue
+
+      result shouldBe pids.filter(_.entityTypeHint == entityType).slice(10, 
17).map(_.id)
+    }
+
   }
 
 }
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala
index e4de282..acd9b65 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala
@@ -50,9 +50,15 @@ class CurrentPersistenceIdsQuerySpec
     
.durableStateStoreFor[R2dbcDurableStateStore[String]](R2dbcDurableStateStore.Identifier)
 
   private val zeros = "0000"
-  private val entityType = nextEntityType()
+  private val numberOfEntityTypes = 5
+  private val entityTypes = (1 to numberOfEntityTypes).map(_ => 
nextEntityType())
   private val numberOfPids = 100
-  private val pids = (1 to numberOfPids).map(n => PersistenceId(entityType, 
"p" + zeros.drop(n.toString.length) + n))
+
+  private val pids = {
+    (1 to numberOfEntityTypes).flatMap(entityTypeId =>
+      (1 to numberOfPids / numberOfEntityTypes).map(n =>
+        PersistenceId(entityTypes(entityTypeId - 1), "p" + 
zeros.drop(n.toString.length) + n)))
+  }
 
   override protected def beforeAll(): Unit = {
     super.beforeAll()
@@ -78,6 +84,24 @@ class CurrentPersistenceIdsQuerySpec
       result shouldBe pids.slice(10, 17).map(_.id)
     }
 
+    "retrieve ids for entity type" in {
+      val entityType = entityTypes(1)
+      val result =
+        store.currentPersistenceIds(entityType = entityType, afterId = None, 
limit = 30).runWith(Sink.seq).futureValue
+      result shouldBe pids.filter(_.entityTypeHint == entityType).map(_.id)
+    }
+
+    "retrieve ids for entity type after id" in {
+      val entityType = entityTypes(0)
+      val result =
+        store
+          .currentPersistenceIds(entityType = entityType, afterId = 
Some(pids(9).id), limit = 7)
+          .runWith(Sink.seq)
+          .futureValue
+
+      result shouldBe pids.filter(_.entityTypeHint == entityType).slice(10, 
17).map(_.id)
+    }
+
   }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to