This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git
The following commit(s) were added to refs/heads/main by this push:
new 4df0439 Port akka/akka-persistence-r2dbc#346: add persistenceIds by
entity type queries (#359)
4df0439 is described below
commit 4df0439e56120a56ee584af0c6dbbe7b631f738e
Author: PJ Fanning <[email protected]>
AuthorDate: Fri May 8 10:03:30 2026 +0100
Port akka/akka-persistence-r2dbc#346: add persistenceIds by entity type
queries (#359)
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/52e0d683-4795-489b-b41d-8eaf43554ec9
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../r2dbc/query/javadsl/R2dbcReadJournal.scala | 20 ++++++++++++++
.../r2dbc/query/scaladsl/QueryDao.scala | 32 ++++++++++++++++++++++
.../r2dbc/query/scaladsl/R2dbcReadJournal.scala | 20 ++++++++++++++
.../state/javadsl/R2dbcDurableStateStore.scala | 22 +++++++++++++++
.../r2dbc/state/scaladsl/DurableStateDao.scala | 31 +++++++++++++++++++++
.../state/scaladsl/R2dbcDurableStateStore.scala | 20 ++++++++++++++
.../query/CurrentPersistenceIdsQuerySpec.scala | 29 ++++++++++++++++++--
.../state/CurrentPersistenceIdsQuerySpec.scala | 28 +++++++++++++++++--
8 files changed, 197 insertions(+), 5 deletions(-)
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala
index 172f855..184b689 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala
@@ -94,6 +94,26 @@ final class R2dbcReadJournal(delegate:
scaladsl.R2dbcReadJournal)
override def currentPersistenceIds(afterId: Optional[String], limit: Long):
Source[String, NotUsed] =
delegate.currentPersistenceIds(afterId.toScala, limit).asJava
+ /**
+ * Get the current persistence ids for the given entity type.
+ *
+ * Note: to reuse the existing index, the query filters by entity type using the persistence_id column and the SQL LIKE
+ * operator. Hence the persistenceId must start with the entity type followed by the default separator ("|") from
+ * [[pekko.persistence.typed.PersistenceId]]. LIKE wildcards (`_`, `%`) in the entity type are not escaped.
+ *
+ * @param entityType
+ * The entity type name.
+ * @param afterId
+ * The ID to start returning results from, or empty to return all ids.
This should be an id returned from a previous
+ * invocation of this command. Callers should not assume that ids are
returned in sorted order.
+ * @param limit
+ * The maximum results to return. Use Long.MAX_VALUE to return all
results. Must be greater than zero.
+ * @return
+ * A source containing all the persistence ids, limited as specified.
+ */
+ def currentPersistenceIds(entityType: String, afterId: Optional[String],
limit: Long): Source[String, NotUsed] =
+ delegate.currentPersistenceIds(entityType, afterId.toScala, limit).asJava
+
override def timestampOf(persistenceId: String, sequenceNr: Long):
CompletionStage[Optional[Instant]] =
delegate.timestampOf(persistenceId,
sequenceNr).map(_.toJava)(ExecutionContext.parasitic).asJava
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
index 20aa370..11b207f 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
@@ -36,6 +36,7 @@ import
pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.journal.JournalDao
import pekko.persistence.r2dbc.journal.JournalDao.SerializedJournalRow
import pekko.persistence.r2dbc.query.scaladsl.mysql.MySQLQueryDao
+import pekko.persistence.typed.PersistenceId
import pekko.stream.scaladsl.Source
import com.typesafe.config.Config
import io.r2dbc.spi.ConnectionFactory
@@ -143,9 +144,15 @@ private[r2dbc] class QueryDao(val settings: QuerySettings,
connectionFactory: Co
private val allPersistenceIdsSql =
sql"SELECT DISTINCT(persistence_id) from $journalTable ORDER BY
persistence_id LIMIT ?"
+ private val persistenceIdsForEntityTypeSql =
+ sql"SELECT DISTINCT(persistence_id) from $journalTable WHERE
persistence_id LIKE ? ORDER BY persistence_id LIMIT ?"
+
private val allPersistenceIdsAfterSql =
sql"SELECT DISTINCT(persistence_id) from $journalTable WHERE
persistence_id > ? ORDER BY persistence_id LIMIT ?"
+ private val persistenceIdsForEntityTypeAfterSql =
+ sql"SELECT DISTINCT(persistence_id) from $journalTable WHERE
persistence_id LIKE ? AND persistence_id > ? ORDER BY persistence_id LIMIT ?"
+
protected val r2dbcExecutor =
new R2dbcExecutor(connectionFactory, log,
settings.logDbCallsExceeding)(ec, system)
@@ -305,6 +312,31 @@ private[r2dbc] class QueryDao(val settings: QuerySettings,
connectionFactory: Co
tags = Set.empty, // tags not fetched in queries (yet)
metadata = readMetadata(row)))
+ def persistenceIds(entityType: String, afterId: Option[String], limit:
Long): Source[String, NotUsed] = {
+ val likeStmtPostfix = PersistenceId.DefaultSeparator + "%"
+ val result = r2dbcExecutor.select(s"select persistenceIds by entity type")(
+ connection =>
+ afterId match {
+ case Some(after) =>
+ connection
+ .createStatement(persistenceIdsForEntityTypeAfterSql)
+ .bind(0, entityType + likeStmtPostfix)
+ .bind(1, after)
+ .bind(2, limit)
+ case None =>
+ connection
+ .createStatement(persistenceIdsForEntityTypeSql)
+ .bind(0, entityType + likeStmtPostfix)
+ .bind(1, limit)
+ },
+ row => row.get("persistence_id", classOf[String]))
+
+ if (log.isDebugEnabled)
+ result.foreach(rows => log.debug("Read [{}] persistence ids by entity
type [{}]", rows.size, entityType))
+
+ Source.futureSource(result.map(Source(_))).mapMaterializedValue(_ =>
NotUsed)
+ }
+
def persistenceIds(afterId: Option[String], limit: Long): Source[String,
NotUsed] = {
val result = r2dbcExecutor.select(s"select persistenceIds")(
connection =>
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
index d951a66..5be7856 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
@@ -349,6 +349,26 @@ final class R2dbcReadJournal(system: ExtendedActorSystem,
config: Config, cfgPat
override def currentPersistenceIds(afterId: Option[String], limit: Long):
Source[String, NotUsed] =
queryDao.persistenceIds(afterId, limit)
+ /**
+ * Get the current persistence ids for the given entity type.
+ *
+ * Note: to reuse the existing index, the query filters by entity type using the persistence_id column and the SQL LIKE
+ * operator. Hence the persistenceId must start with the entity type followed by the default separator ("|") from
+ * [[pekko.persistence.typed.PersistenceId]]. LIKE wildcards (`_`, `%`) in the entity type are not escaped.
+ *
+ * @param entityType
+ * The entity type name.
+ * @param afterId
+ * The ID to start returning results from, or [[None]] to return all ids.
This should be an id returned from a
+ * previous invocation of this command. Callers should not assume that ids
are returned in sorted order.
+ * @param limit
+ * The maximum results to return. Use Long.MaxValue to return all results.
Must be greater than zero.
+ * @return
+ * A source containing all the persistence ids, limited as specified.
+ */
+ def currentPersistenceIds(entityType: String, afterId: Option[String],
limit: Long): Source[String, NotUsed] =
+ queryDao.persistenceIds(entityType, afterId, limit)
+
override def currentPersistenceIds(): Source[String, NotUsed] = {
import settings.persistenceIdsBufferSize
def updateState(state: PersistenceIdsQueryState, pid: String):
PersistenceIdsQueryState =
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala
index 8770f85..abc1a85 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala
@@ -87,6 +87,28 @@ class R2dbcDurableStateStore[A](scalaStore:
ScalaR2dbcDurableStateStore[A])(impl
scalaStore.currentPersistenceIds(afterId.toScala, limit).asJava
}
+ /**
+ * Get the current persistence ids for the given entity type.
+ *
+ * Note: to reuse the existing index, the query filters by entity type using the persistence_id column and the SQL LIKE
+ * operator. Hence the persistenceId must start with the entity type followed by the default separator ("|") from
+ * [[pekko.persistence.typed.PersistenceId]]. LIKE wildcards (`_`, `%`) in the entity type are not escaped.
+ *
+ * @param entityType
+ * The entity type name.
+ * @param afterId
+ * The ID to start returning results from, or empty to return all ids.
This should be an id returned from a previous
+ * invocation of this command. Callers should not assume that ids are
returned in sorted order.
+ * @param limit
+ * The maximum results to return. Use Long.MAX_VALUE to return all
results. Must be greater than zero.
+ * @return
+ * A source containing all the persistence ids, limited as specified.
+ */
+ def currentPersistenceIds(entityType: String, afterId: Optional[String],
limit: Long): Source[String, NotUsed] = {
+ import scala.jdk.OptionConverters._
+ scalaStore.currentPersistenceIds(entityType, afterId.toScala, limit).asJava
+ }
+
def currentPersistenceIds(): Source[String, NotUsed] =
scalaStore.currentPersistenceIds().asJava
}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
index 96598d4..2eda936 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
@@ -160,9 +160,15 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
private val allPersistenceIdsSql =
sql"SELECT persistence_id from $stateTable ORDER BY persistence_id LIMIT ?"
+ private val persistenceIdsForEntityTypeSql =
+ sql"SELECT persistence_id from $stateTable WHERE persistence_id LIKE ?
ORDER BY persistence_id LIMIT ?"
+
private val allPersistenceIdsAfterSql =
sql"SELECT persistence_id from $stateTable WHERE persistence_id > ? ORDER
BY persistence_id LIMIT ?"
+ private val persistenceIdsForEntityTypeAfterSql =
+ sql"SELECT persistence_id from $stateTable WHERE persistence_id LIKE ? AND
persistence_id > ? ORDER BY persistence_id LIMIT ?"
+
protected def stateBySlicesRangeSql(
maxDbTimestampParam: Boolean,
behindCurrentTime: FiniteDuration,
@@ -464,6 +470,31 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
Source.futureSource(result.map(Source(_))).mapMaterializedValue(_ =>
NotUsed)
}
+ def persistenceIds(entityType: String, afterId: Option[String], limit:
Long): Source[String, NotUsed] = {
+ val likeStmtPostfix = PersistenceId.DefaultSeparator + "%"
+ val result = r2dbcExecutor.select(s"select persistenceIds by entity type")(
+ connection =>
+ afterId match {
+ case Some(after) =>
+ connection
+ .createStatement(persistenceIdsForEntityTypeAfterSql)
+ .bind(0, entityType + likeStmtPostfix)
+ .bind(1, after)
+ .bind(2, limit)
+ case None =>
+ connection
+ .createStatement(persistenceIdsForEntityTypeSql)
+ .bind(0, entityType + likeStmtPostfix)
+ .bind(1, limit)
+ },
+ row => row.get("persistence_id", classOf[String]))
+
+ if (log.isDebugEnabled)
+ result.foreach(rows => log.debug("Read [{}] persistence ids by entity
type [{}]", rows.size, entityType))
+
+ Source.futureSource(result.map(Source(_))).mapMaterializedValue(_ =>
NotUsed)
+ }
+
def persistenceIds(afterId: Option[String], limit: Long): Source[String,
NotUsed] = {
val result = r2dbcExecutor.select(s"select persistenceIds")(
connection =>
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
index d53b479..5e9ceb1 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
@@ -174,6 +174,26 @@ class R2dbcDurableStateStore[A](system:
ExtendedActorSystem, config: Config, cfg
override def currentPersistenceIds(afterId: Option[String], limit: Long):
Source[String, NotUsed] =
stateDao.persistenceIds(afterId, limit)
+ /**
+ * Get the current persistence ids for the given entity type.
+ *
+ * Note: to reuse the existing index, the query filters by entity type using the persistence_id column and the SQL LIKE
+ * operator. Hence the persistenceId must start with the entity type followed by the default separator ("|") from
+ * [[pekko.persistence.typed.PersistenceId]]. LIKE wildcards (`_`, `%`) in the entity type are not escaped.
+ *
+ * @param entityType
+ * The entity type name.
+ * @param afterId
+ * The ID to start returning results from, or [[None]] to return all ids.
This should be an id returned from a
+ * previous invocation of this command. Callers should not assume that ids
are returned in sorted order.
+ * @param limit
+ * The maximum results to return. Use Long.MaxValue to return all results.
Must be greater than zero.
+ * @return
+ * A source containing all the persistence ids, limited as specified.
+ */
+ def currentPersistenceIds(entityType: String, afterId: Option[String],
limit: Long): Source[String, NotUsed] =
+ stateDao.persistenceIds(entityType, afterId, limit)
+
def currentPersistenceIds(): Source[String, NotUsed] = {
import settings.persistenceIdsBufferSize
def updateState(state: PersistenceIdsQueryState, pid: String):
PersistenceIdsQueryState =
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/CurrentPersistenceIdsQuerySpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/CurrentPersistenceIdsQuerySpec.scala
index 0a59495..6b533c9 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/CurrentPersistenceIdsQuerySpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/CurrentPersistenceIdsQuerySpec.scala
@@ -49,9 +49,14 @@ class CurrentPersistenceIdsQuerySpec
private val query =
PersistenceQuery(testKit.system).readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)
private val zeros = "0000"
- private val entityType = nextEntityType()
+ private val numberOfEntityTypes = 5
+ private val entityTypes = (1 to numberOfEntityTypes).map(_ =>
nextEntityType())
private val numberOfPids = 100
- private val pids = (1 to numberOfPids).map(n => PersistenceId(entityType,
"p" + zeros.drop(n.toString.length) + n))
+ private val pids = {
+ (1 to numberOfEntityTypes).flatMap(entityTypeId =>
+ (1 to numberOfPids / numberOfEntityTypes).map(n =>
+ PersistenceId(entityTypes(entityTypeId - 1), "p" +
zeros.drop(n.toString.length) + n)))
+ }
override protected def beforeAll(): Unit = {
super.beforeAll()
@@ -73,11 +78,29 @@ class CurrentPersistenceIdsQuerySpec
result shouldBe pids.map(_.id)
}
- "retrieve ids afterId" in {
+ "retrieve ids after id" in {
val result = query.currentPersistenceIds(afterId = Some(pids(9).id),
limit = 7).runWith(Sink.seq).futureValue
result shouldBe pids.slice(10, 17).map(_.id)
}
+ "retrieve ids for entity type" in {
+ val entityType = entityTypes(1)
+ val result =
+ query.currentPersistenceIds(entityType = entityType, afterId = None,
limit = 30).runWith(Sink.seq).futureValue
+ result shouldBe pids.filter(_.entityTypeHint == entityType).map(_.id)
+ }
+
+ "retrieve ids for entity type after id" in {
+ val entityType = entityTypes(0)
+ val result =
+ query
+ .currentPersistenceIds(entityType = entityType, afterId =
Some(pids(9).id), limit = 7)
+ .runWith(Sink.seq)
+ .futureValue
+
+ result shouldBe pids.filter(_.entityTypeHint == entityType).slice(10,
17).map(_.id)
+ }
+
}
}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala
index e4de282..acd9b65 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala
@@ -50,9 +50,15 @@ class CurrentPersistenceIdsQuerySpec
.durableStateStoreFor[R2dbcDurableStateStore[String]](R2dbcDurableStateStore.Identifier)
private val zeros = "0000"
- private val entityType = nextEntityType()
+ private val numberOfEntityTypes = 5
+ private val entityTypes = (1 to numberOfEntityTypes).map(_ =>
nextEntityType())
private val numberOfPids = 100
- private val pids = (1 to numberOfPids).map(n => PersistenceId(entityType,
"p" + zeros.drop(n.toString.length) + n))
+
+ private val pids = {
+ (1 to numberOfEntityTypes).flatMap(entityTypeId =>
+ (1 to numberOfPids / numberOfEntityTypes).map(n =>
+ PersistenceId(entityTypes(entityTypeId - 1), "p" +
zeros.drop(n.toString.length) + n)))
+ }
override protected def beforeAll(): Unit = {
super.beforeAll()
@@ -78,6 +84,24 @@ class CurrentPersistenceIdsQuerySpec
result shouldBe pids.slice(10, 17).map(_.id)
}
+ "retrieve ids for entity type" in {
+ val entityType = entityTypes(1)
+ val result =
+ store.currentPersistenceIds(entityType = entityType, afterId = None,
limit = 30).runWith(Sink.seq).futureValue
+ result shouldBe pids.filter(_.entityTypeHint == entityType).map(_.id)
+ }
+
+ "retrieve ids for entity type after id" in {
+ val entityType = entityTypes(0)
+ val result =
+ store
+ .currentPersistenceIds(entityType = entityType, afterId =
Some(pids(9).id), limit = 7)
+ .runWith(Sink.seq)
+ .futureValue
+
+ result shouldBe pids.filter(_.entityTypeHint == entityType).slice(10,
17).map(_.id)
+ }
+
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]