This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git
The following commit(s) were added to refs/heads/main by this push:
new 8b9f24f feat: add publish-events-number-of-topics support (#382)
8b9f24f is described below
commit 8b9f24f57315904367d94136ee0a66aafa980b8a
Author: PJ Fanning <[email protected]>
AuthorDate: Thu May 21 08:53:15 2026 +0100
feat: add publish-events-number-of-topics support (#382)
* feat: add publish-events-number-of-topics support
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/46bfeb49-fb2e-48fb-955d-2557d826e7f3
Co-authored-by: pjfanning <[email protected]>
* test: add typed query 'include tags' test to EventsByPersistenceIdSpec
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/46bfeb49-fb2e-48fb-955d-2557d826e7f3
Co-authored-by: pjfanning <[email protected]>
* scalafmt
* test: skip 'include tags' test for MySQL dialect
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/b52d4409-a714-4136-9970-759f542ac325
Co-authored-by: pjfanning <[email protected]>
* feat: MySQL tags via JSON column (TEXT -> JSON) with encode/decode support
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/4a1f1bd4-a41d-48ed-b4cd-308cdc8ac314
Co-authored-by: pjfanning <[email protected]>
* Update MySQLJournalDao.scala
* test: add MySQLTagsJsonSpec unit tests for tagsToJson/tagsFromJson with
IllegalStateException validation
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/8d19c9bb-890a-4ccc-93a9-0278bcd0471c
Co-authored-by: pjfanning <[email protected]>
* Update MySQLTagsJsonSpec.scala
* Update MySQLTagsJsonSpec.scala
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
core/src/main/resources/reference.conf | 12 ++
.../r2dbc/internal/EventsByPersistenceIdDao.scala | 6 +-
.../pekko/persistence/r2dbc/internal/PubSub.scala | 27 ++++-
.../persistence/r2dbc/journal/JournalDao.scala | 9 +-
.../r2dbc/journal/mysql/MySQLJournalDao.scala | 68 +++++++++++
.../r2dbc/query/scaladsl/QueryDao.scala | 5 +-
.../r2dbc/query/scaladsl/R2dbcReadJournal.scala | 8 +-
.../r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala | 5 +
.../r2dbc/state/scaladsl/DurableStateDao.scala | 19 ++-
.../scaladsl/mysql/MySQLDurableStateDao.scala | 5 +
.../r2dbc/journal/PersistTagsSpec.scala | 24 ++--
.../r2dbc/journal/mysql/MySQLTagsJsonSpec.scala | 132 +++++++++++++++++++++
.../r2dbc/query/EventsByPersistenceIdSpec.scala | 28 +++++
.../r2dbc/query/EventsBySlicePubSubSpec.scala | 39 ++++++
ddl-scripts/create_tables_mysql.sql | 4 +-
docs/src/main/paradox/query.md | 2 +
16 files changed, 355 insertions(+), 38 deletions(-)
diff --git a/core/src/main/resources/reference.conf
b/core/src/main/resources/reference.conf
index 60b553b..b05b9aa 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -36,6 +36,18 @@ pekko.persistence.r2dbc {
throughput-collect-interval = 10 seconds
}
+ # Group the slices for an entity type into this number of topics. It is most
+ # efficient to use the same number as the number of projection instances. If
+ # configured to fewer than the number of projection instances, the overhead is
+ # that events will be sent more than once and discarded on the destination side.
+ # If configured to more than the number of projection instances, the events will
+ # only be sent once, but there is a risk of exceeding the limits on the number of
+ # topics that PubSub can handle (e.g. OversizedPayloadException).
+ # Must be between 1 and 1024 and a whole-number divisor of 1024 (the number of
+ # slices).
+ # This configuration can be changed in a rolling update, but some events might
+ # not be delivered via the pub-sub path and will instead be delivered later by
+ # the queries.
+ # This configuration cannot be defined per journal; it is global for the
+ # ActorSystem.
+ publish-events-number-of-topics = 128
+
# replay filter not needed for this plugin
replay-filter.mode = off
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala
index 9e63f2f..495a904 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala
@@ -26,6 +26,7 @@ import
pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.journal.JournalDao.SerializedJournalRow
import pekko.persistence.r2dbc.journal.JournalDao.readMetadata
import pekko.stream.scaladsl.Source
+import io.r2dbc.spi.Row
import org.slf4j.LoggerFactory
import scala.concurrent.ExecutionContext
@@ -65,6 +66,9 @@ private[r2dbc] trait EventsByPersistenceIdDao {
implicit protected def journalPayloadCodec: PayloadCodec
+ /**
+ * Read the `tags` column of a journal row into a set of tags. The default reads the column
+ * as a `String` array and converts it with `setFromDb`; dialects that store tags in a
+ * different column type (e.g. the MySQL DAOs, which store a JSON string) override this.
+ */
+ protected def tagsFromRow(row: Row): Set[String] =
+ setFromDb(row.get("tags", classOf[Array[String]]))
+
private lazy val selectEventsSql = sql"""
SELECT slice, entity_type, persistence_id, seq_nr, db_timestamp,
$statementTimestampSql AS read_db_timestamp, event_ser_id, event_ser_manifest,
event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest,
meta_payload, tags
from $journalTable
@@ -149,7 +153,7 @@ private[r2dbc] trait EventsByPersistenceIdDao {
serId = row.get[Integer]("event_ser_id", classOf[Integer]),
serManifest = row.get("event_ser_manifest", classOf[String]),
writerUuid = row.get("writer", classOf[String]),
- tags = setFromDb(row.get("tags", classOf[Array[String]])),
+ tags = tagsFromRow(row),
metadata = readMetadata(row)))
if (log.isDebugEnabled)
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala
index 7c6bbf4..2bf6f9b 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala
@@ -59,6 +59,14 @@ import org.slf4j.LoggerFactory
private val settings = new PublishEventsDynamicSettings(
system.settings.config.getConfig("pekko.persistence.r2dbc.journal.publish-events-dynamic"))
+
+ // One slice range per pub-sub topic. The number of ranges comes from the ActorSystem-wide
+ // `publish-events-number-of-topics` setting, not from per-journal configuration.
+ private val sliceRanges = {
+ val numberOfTopics =
+
system.settings.config.getInt("pekko.persistence.r2dbc.journal.publish-events-number-of-topics")
+ persistenceExt.sliceRanges(numberOfTopics)
+ }
+ // Memoizes slice -> containing range so topicName does not scan sliceRanges on every call.
+ private val sliceRangeLookup = new ConcurrentHashMap[Int, Range]
+
private val throughputCollectIntervalMillis =
settings.throughputCollectInterval.toMillis
private val throughputThreshold = settings.throughputThreshold.toDouble
private val throughputSampler =
@@ -74,8 +82,23 @@ import org.slf4j.LoggerFactory
.narrow[Topic.Command[EventEnvelope[Event]]]
}
- private def topicName(entityType: String, slice: Int): String =
- URLEncoder.encode(s"r2dbc-$entityType-$slice",
StandardCharsets.UTF_8.name())
+ /**
+ * Topics to subscribe to in order to receive events for slices `minSlice` to `maxSlice`
+ * (inclusive). Presumably several slices resolve to the same topic actor (their topic name is
+ * derived from the slice range, see topicName), so the result is de-duplicated with `toSet`.
+ */
+ def eventTopics[Event](
+ entityType: String,
+ minSlice: Int,
+ maxSlice: Int): Set[ActorRef[Topic.Command[EventEnvelope[Event]]]] = {
+ (minSlice to maxSlice).map(eventTopic[Event](entityType, _)).toSet
+ }
+
+ // URL-encoded topic name for the slice range that contains `slice`; every slice in the same
+ // range gets the same name. The range lookup is cached per slice in sliceRangeLookup, and an
+ // IllegalArgumentException is thrown when no configured range contains the slice.
+ private def topicName(entityType: String, slice: Int): String = {
+ val range = sliceRangeLookup.computeIfAbsent(
+ slice,
+ _ =>
+ sliceRanges
+ .find(_.contains(slice))
+ .getOrElse(throw new IllegalArgumentException(s"Slice [$slice] not
found in " +
+ s"slice ranges [${sliceRanges.mkString(", ")}]")))
+ URLEncoder.encode(s"r2dbc-$entityType-${range.min}-${range.max}",
StandardCharsets.UTF_8.name())
+ }
def publish(pr: PersistentRepr, timestamp: Instant): Unit = {
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
index 73e26f9..974d2cd 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
@@ -116,6 +116,10 @@ private[r2dbc] class JournalDao(val settings:
JournalSettings, connectionFactory
protected val journalTable: String = settings.journalTableWithSchema
protected implicit val journalPayloadCodec: PayloadCodec =
settings.journalPayloadCodec
+ // Bind the `tags` column of an event insert: SQL NULL when there are no tags, otherwise a
+ // String array. Dialects with a different tags column type (e.g. MySQL JSON) override this.
+ protected def bindTagsForWrite(stmt: Statement, tags: Set[String], index:
Int): Statement =
+ if (tags.isEmpty) stmt.bindNull(index, classOf[Array[String]])
+ else stmt.bind(index, tags.toArray)
+ else stmt.bind(index, tags.toArray)
+
protected val (insertEventWithParameterTimestampSql: String,
insertEventWithTransactionTimestampSql: String) = {
val baseSql =
s"INSERT INTO $journalTable " +
@@ -194,10 +198,7 @@ private[r2dbc] class JournalDao(val settings:
JournalSettings, connectionFactory
.bind(7, write.serManifest)
.bindPayload(8, write.payload.get)
- if (write.tags.isEmpty)
- stmt.bindNull(9, classOf[Array[String]])
- else
- stmt.bind(9, write.tags.toArray)
+ bindTagsForWrite(stmt, write.tags, 9)
// optional metadata
write.metadata match {
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLJournalDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLJournalDao.scala
index e336add..538dd0d 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLJournalDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLJournalDao.scala
@@ -29,12 +29,73 @@ import pekko.persistence.r2dbc.UseAppTimestamp
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.journal.JournalDao
import io.r2dbc.spi.ConnectionFactory
+import io.r2dbc.spi.Row
+import io.r2dbc.spi.Statement
/**
* INTERNAL API
*/
@InternalApi
private[r2dbc] object MySQLJournalDao {
+
+  /**
+   * Serialise a set of tags to a compact JSON array string for MySQL JSON column storage.
+   *
+   * Double quotes, backslashes and all control characters (U+0000 to U+001F) are escaped as
+   * required by RFC 8259. MySQL rejects JSON text containing unescaped control characters, so
+   * without this escaping a tag containing e.g. a newline could not be persisted.
+   */
+  def tagsToJson(tags: Set[String]): String =
+    tags.iterator.map(quoteJsonString).mkString("[", ",", "]")
+
+  /** Render `s` as a double-quoted JSON string literal. */
+  private def quoteJsonString(s: String): String = {
+    val sb = new java.lang.StringBuilder(s.length + 2)
+    sb.append('"')
+    s.foreach {
+      case '"'          => sb.append("\\\"")
+      case '\\'         => sb.append("\\\\")
+      case '\n'         => sb.append("\\n")
+      case '\r'         => sb.append("\\r")
+      case '\t'         => sb.append("\\t")
+      case '\b'         => sb.append("\\b")
+      case '\f'         => sb.append("\\f")
+      // remaining control characters use the six character form: backslash, 'u', 4 hex digits
+      case c if c < ' ' => sb.append('\\').append('u').append("%04x".format(c.toInt))
+      case c            => sb.append(c)
+    }
+    sb.append('"').toString
+  }
+
+  /**
+   * Deserialise a JSON array string from a MySQL JSON column back to a set of tags.
+   * Returns an empty set for `null` or an empty JSON array (with optional surrounding
+   * whitespace). All JSON string escapes are decoded, including the backslash-u form with four
+   * hex digits, which a JSON column may use when rendering control characters.
+   * Throws [[IllegalStateException]] if the input is not a valid JSON array of strings.
+   */
+  def tagsFromJson(json: String): Set[String] =
+    if (json == null) Set.empty
+    else {
+      val trimmed = json.trim
+      if (!trimmed.startsWith("[") || !trimmed.endsWith("]"))
+        throw new IllegalStateException(s"tags JSON is not a JSON array: $json")
+      if (trimmed.substring(1, trimmed.length - 1).trim.isEmpty) Set.empty
+      else parseStringElements(trimmed, json)
+    }
+
+  /** Parse the string elements between the brackets of `trimmed` (which starts with '[' and ends with ']'). */
+  private def parseStringElements(trimmed: String, json: String): Set[String] = {
+    val end = trimmed.length - 1 // index of the closing ']'
+    val result = Set.newBuilder[String]
+    var i = 1 // skip the opening '['
+    while (i < end) {
+      trimmed.charAt(i) match {
+        case '"' =>
+          val (tag, next) = decodeJsonString(trimmed, i + 1, end, json)
+          result += tag
+          i = next
+        case ',' | ' ' | '\t' | '\n' | '\r' =>
+          i += 1
+        case _ =>
+          malformed("non-string element", json)
+      }
+    }
+    result.result()
+  }
+
+  /**
+   * Decode a JSON string literal whose opening quote is just before `start`, scanning no further
+   * than the exclusive bound `end`. Returns the decoded value and the index just after its
+   * closing quote.
+   */
+  private def decodeJsonString(text: String, start: Int, end: Int, json: String): (String, Int) = {
+    val sb = new java.lang.StringBuilder
+    var i = start
+    while (i < end && text.charAt(i) != '"') {
+      if (text.charAt(i) == '\\') {
+        if (i + 1 >= end) malformed("an unterminated string", json)
+        text.charAt(i + 1) match {
+          case '"'  => sb.append('"')
+          case '\\' => sb.append('\\')
+          case '/'  => sb.append('/')
+          case 'b'  => sb.append('\b')
+          case 'f'  => sb.append('\f')
+          case 'n'  => sb.append('\n')
+          case 'r'  => sb.append('\r')
+          case 't'  => sb.append('\t')
+          case 'u' =>
+            if (i + 5 >= end) malformed("an invalid unicode escape", json)
+            // Character.digit (unlike Integer.parseInt) rejects sign characters
+            val code = (i + 2 to i + 5).foldLeft(0) { (acc, j) =>
+              val digit = Character.digit(text.charAt(j), 16)
+              if (digit < 0) malformed("an invalid unicode escape", json)
+              acc * 16 + digit
+            }
+            sb.append(code.toChar)
+            i += 4 // the four hex digits; the backslash and 'u' are skipped below
+          case other =>
+            malformed(s"an invalid escape sequence [\\$other]", json)
+        }
+        i += 2
+      } else {
+        sb.append(text.charAt(i))
+        i += 1
+      }
+    }
+    if (i >= end) malformed("an unterminated string", json)
+    (sb.toString, i + 1)
+  }
+
+  private def malformed(problem: String, json: String): Nothing =
+    throw new IllegalStateException(s"tags JSON contains $problem: $json")
+
def settingRequirements(settings: UseAppTimestamp with
DbTimestampMonotonicIncreasing): Unit = {
// Application timestamps are used because MySQL does not have
transaction_timestamp like Postgres. In future releases
// they could be tried to be emulated, but the benefits are questionable -
no matter where the timestamps are generated,
@@ -65,6 +126,13 @@ private[r2dbc] class MySQLJournalDao(
override lazy val timestampSql: String = "NOW(6)"
override lazy val statementTimestampSql: String = "NOW(6)"
+ // MySQL stores tags in a JSON column: an empty tag set is written as SQL NULL, otherwise the
+ // tags are bound as the JSON array string produced by tagsToJson.
+ override protected def bindTagsForWrite(stmt: Statement, tags: Set[String],
index: Int): Statement =
+ if (tags.isEmpty) stmt.bindNull(index, classOf[String])
+ else stmt.bind(index, MySQLJournalDao.tagsToJson(tags))
+
+ // Reads the JSON tags column as a String and decodes it; a null value yields an empty set.
+ override protected def tagsFromRow(row: Row): Set[String] =
+ MySQLJournalDao.tagsFromJson(row.get("tags", classOf[String]))
+
override val insertEventWithParameterTimestampSql: String =
sql"INSERT INTO $journalTable " +
"(slice, entity_type, persistence_id, seq_nr, writer, adapter_manifest,
event_ser_id, event_ser_manifest, " +
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
index 451225b..fa859bb 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
@@ -43,6 +43,7 @@ import pekko.persistence.typed.PersistenceId
import pekko.stream.scaladsl.Source
import com.typesafe.config.Config
import io.r2dbc.spi.ConnectionFactory
+import io.r2dbc.spi.Row
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@@ -231,7 +232,7 @@ private[r2dbc] class QueryDao(val settings: QuerySettings,
connectionFactory: Co
serId = row.get[Integer]("event_ser_id", classOf[Integer]),
serManifest = row.get("event_ser_manifest", classOf[String]),
writerUuid = "", // not need in this query
- tags = setFromDb(row.get("tags", classOf[Array[String]])),
+ tags = tagsFromRow(row),
metadata = readMetadata(row)))
if (log.isDebugEnabled)
@@ -317,7 +318,7 @@ private[r2dbc] class QueryDao(val settings: QuerySettings,
connectionFactory: Co
serId = row.get[Integer]("event_ser_id", classOf[Integer]),
serManifest = row.get("event_ser_manifest", classOf[String]),
writerUuid = "", // not need in this query
- tags = setFromDb(row.get("tags", classOf[Array[String]])),
+ tags = tagsFromRow(row),
metadata = readMetadata(row)))
def persistenceIds(entityType: String, afterId: Option[String], limit:
Long): Source[String, NotUsed] = {
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
index 96c0988..beedaff 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
@@ -182,11 +182,15 @@ final class R2dbcReadJournal(system: ExtendedActorSystem,
config: Config, cfgPat
// OverflowStrategy.dropNew is long deprecated and removed in
Pekko 2.0.0.
overflowStrategy = OverflowStrategy.dropHead)
.mapMaterializedValue { ref =>
- (minSlice to maxSlice).foreach { slice =>
+ pubSub.eventTopics[Event](entityType, minSlice, maxSlice).foreach
{ topic =>
import pekko.actor.typed.scaladsl.adapter._
- pubSub.eventTopic(entityType, slice) !
Topic.Subscribe(ref.toTyped[EventEnvelope[Event]])
+ topic ! Topic.Subscribe(ref.toTyped[EventEnvelope[Event]])
}
}
+ .filter { env =>
+ val slice = sliceForPersistenceId(env.persistenceId)
+ minSlice <= slice && slice <= maxSlice
+ }
dbSource
.mergePrioritized(pubSubSource, leftPriority = 1, rightPriority = 10)
.via(skipPubSubTooFarAhead(settings.backtrackingEnabled,
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala
index 260b5a7..bb49a8a 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala
@@ -30,8 +30,10 @@ import pekko.actor.typed.ActorSystem
import pekko.annotation.InternalApi
import pekko.persistence.r2dbc.QuerySettings
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
+import pekko.persistence.r2dbc.journal.mysql.MySQLJournalDao
import pekko.persistence.r2dbc.query.scaladsl.QueryDao
import io.r2dbc.spi.ConnectionFactory
+import io.r2dbc.spi.Row
/**
* INTERNAL API
@@ -44,6 +46,9 @@ private[r2dbc] class MySQLQueryDao(
override lazy val statementTimestampSql: String = "NOW(6)"
+ // MySQL keeps tags as a JSON array string; decode it with the same codec the journal writes with.
+ override protected def tagsFromRow(row: Row): Set[String] =
+ MySQLJournalDao.tagsFromJson(row.get("tags", classOf[String]))
+
override protected def selectColumns(backtracking: Boolean): String =
if (backtracking)
s"SELECT slice, persistence_id, seq_nr, db_timestamp,
$statementTimestampSql AS read_db_timestamp, tags "
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
index 0e39722..b7b48d1 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
@@ -125,6 +125,10 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
protected val stateTable = settings.durableStateTableWithSchema
protected implicit val statePayloadCodec: PayloadCodec =
settings.durableStatePayloadCodec
+ // Bind the `tags` column of a durable state write: SQL NULL when there are no tags, otherwise a
+ // String array. Dialects with a different tags column type (e.g. MySQL JSON) override this.
+ protected def bindTagsForWrite(stmt: Statement, tags: Set[String], index:
Int): Statement =
+ if (tags.isEmpty) stmt.bindNull(index, classOf[Array[String]])
+ else stmt.bind(index, tags.toArray)
+
private lazy val additionalColumns: Map[String,
immutable.IndexedSeq[AdditionalColumn[Any, Any]]] = {
settings.durableStateAdditionalColumnClasses.map { case (entityType,
columnClasses) =>
val instances = columnClasses.map(fqcn =>
AdditionalColumnFactory.create(system, fqcn))
@@ -334,13 +338,6 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
def upsertState(state: SerializedStateRow, value: Any): Future[Done] = {
require(state.revision > 0)
- def bindTags(stmt: Statement, i: Int): Statement = {
- if (state.tags.isEmpty)
- stmt.bindNull(i, classOf[Array[String]])
- else
- stmt.bind(i, state.tags.toArray)
- }
-
var bindIdx = 0
def getAndIncIndex(): Int = {
val i = bindIdx
@@ -388,7 +385,7 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
.bind(getAndIncIndex(), state.serId)
.bind(getAndIncIndex(), state.serManifest)
.bindPayloadOption(getAndIncIndex(), state.payload)
- bindTags(stmt, getAndIncIndex())
+ bindTagsForWrite(stmt, state.tags, getAndIncIndex())
bindAdditionalColumns(stmt, additionalBindings)
}
@@ -420,7 +417,7 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
.bind(getAndIncIndex(), state.serId)
.bind(getAndIncIndex(), state.serManifest)
.bindPayloadOption(getAndIncIndex(), state.payload)
- bindTags(stmt, getAndIncIndex())
+ bindTagsForWrite(stmt, state.tags, getAndIncIndex())
bindAdditionalColumns(stmt, additionalBindings)
if (settings.dbTimestampMonotonicIncreasing) {
@@ -503,7 +500,7 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
val slice = persistenceExt.sliceForPersistenceId(persistenceId)
def insertDeleteMarkerStatement(connection: Connection): Statement =
{
- connection
+ val stmt = connection
.createStatement(
insertStateSql(entityType, Vector.empty)
) // FIXME should the additional columns be cleared (null)? Then
they must allow NULL
@@ -514,7 +511,7 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
.bind(4, 0)
.bind(5, "")
.bindPayloadOption(6, None)
- .bindNull(7, classOf[Array[String]])
+ bindTagsForWrite(stmt, Set.empty, 7)
}
def recoverDataIntegrityViolation[A](f: Future[A]): Future[A] =
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala
index bcd7056..79f4168 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala
@@ -33,6 +33,7 @@ import
pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.journal.mysql.MySQLJournalDao
import pekko.persistence.r2dbc.state.scaladsl.DurableStateDao
import io.r2dbc.spi.ConnectionFactory
+import io.r2dbc.spi.Statement
/**
* INTERNAL API
@@ -46,6 +47,10 @@ private[r2dbc] class MySQLDurableStateDao(
override lazy val transactionTimestampSql: String = "NOW(6)"
+ // Durable state tags use the same JSON encoding as the MySQL journal: SQL NULL for no tags,
+ // otherwise the JSON array string produced by MySQLJournalDao.tagsToJson.
+ override protected def bindTagsForWrite(stmt: Statement, tags: Set[String],
index: Int): Statement =
+ if (tags.isEmpty) stmt.bindNull(index, classOf[String])
+ else stmt.bind(index, MySQLJournalDao.tagsToJson(tags))
+
override def selectBucketsSql(minSlice: Int, maxSlice: Int): String = {
sql"""
SELECT CAST(UNIX_TIMESTAMP(db_timestamp) AS SIGNED) / 10 AS bucket,
count(*) AS count
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
index cb4e640..f4e5b17 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
@@ -24,6 +24,7 @@ import pekko.persistence.r2dbc.TestActors.Persister
import pekko.persistence.r2dbc.TestConfig
import pekko.persistence.r2dbc.TestData
import pekko.persistence.r2dbc.TestDbLifecycle
+import pekko.persistence.r2dbc.journal.mysql.MySQLJournalDao
import pekko.persistence.typed.PersistenceId
import org.scalatest.wordspec.AnyWordSpecLike
@@ -41,17 +42,7 @@ class PersistTagsSpec
private lazy val dialect =
system.settings.config.getString("pekko.persistence.r2dbc.journal.dialect")
- private lazy val testEnabled: Boolean = {
- // tags are not implemented for MySQL
- dialect != "mysql"
- }
-
"Persist tags" should {
- if (!testEnabled) {
- info(s"PersistTagsSpec not enabled for $dialect")
- pending
- }
-
"be the same for events stored in same transaction" in {
val numberOfEntities = 9
val entityType = nextEntityType()
@@ -77,10 +68,15 @@ class PersistTagsSpec
.select[Row]("test")(
connection => connection.createStatement(s"select * from
${settings.journalTableWithSchema}"),
row => {
- val tags = row.get("tags", classOf[Array[String]]) match {
- case null => Set.empty[String]
- case tagsArray => tagsArray.toSet
- }
+ val tags =
+ if (dialect == "mysql") {
+ MySQLJournalDao.tagsFromJson(row.get("tags",
classOf[String]))
+ } else {
+ row.get("tags", classOf[Array[String]]) match {
+ case null => Set.empty[String]
+ case tagsArray => tagsArray.toSet
+ }
+ }
Row(
pid = row.get("persistence_id", classOf[String]),
seqNr = row.get[java.lang.Long]("seq_nr",
classOf[java.lang.Long]),
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLTagsJsonSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLTagsJsonSpec.scala
new file mode 100644
index 0000000..2263a89
--- /dev/null
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLTagsJsonSpec.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pekko.persistence.r2dbc.journal.mysql
+
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+/**
+ * Unit tests for the MySQL tags JSON encoding, `MySQLJournalDao.tagsToJson` and
+ * `MySQLJournalDao.tagsFromJson`. These are pure function tests and need no database.
+ */
+class MySQLTagsJsonSpec extends AnyWordSpec with Matchers {
+
+ "MySQLJournalDao.tagsToJson" should {
+
+ "serialise an empty set to an empty JSON array" in {
+ MySQLJournalDao.tagsToJson(Set.empty) shouldBe "[]"
+ }
+
+ "serialise a single tag to a compact JSON array" in {
+ MySQLJournalDao.tagsToJson(Set("tag1")) shouldBe """["tag1"]"""
+ }
+
+ "produce compact JSON (no extra whitespace)" in {
+ val json = MySQLJournalDao.tagsToJson(Set("a", "b", "c"))
+ json should startWith("[")
+ json should endWith("]")
+ (json should not).include(" ")
+ }
+
+ "escape double-quotes in tag values" in {
+ val json = MySQLJournalDao.tagsToJson(Set("""say "hello""""))
+ json shouldBe """["say \"hello\""]"""
+ }
+
+ "escape backslashes in tag values" in {
+ val json = MySQLJournalDao.tagsToJson(Set("""path\to\file"""))
+ json shouldBe """["path\\to\\file"]"""
+ }
+
+ "round-trip through tagsFromJson" in {
+ val tags = Set("alpha", "beta", "gamma")
+ MySQLJournalDao.tagsFromJson(MySQLJournalDao.tagsToJson(tags)) shouldBe
tags
+ }
+ }
+
+ // tagsFromJson must accept both the compact form written by tagsToJson and whitespace variants.
+ "MySQLJournalDao.tagsFromJson" should {
+
+ "return an empty set for null input" in {
+ MySQLJournalDao.tagsFromJson(null) shouldBe Set.empty
+ }
+
+ "return an empty set for an empty JSON array '[]'" in {
+ MySQLJournalDao.tagsFromJson("[]") shouldBe Set.empty
+ }
+
+ "return an empty set for an empty JSON array with spaces '[ ]'" in {
+ MySQLJournalDao.tagsFromJson("[ ]") shouldBe Set.empty
+ }
+
+ "return an empty set for an empty JSON array with leading/trailing
whitespace ' [] '" in {
+ MySQLJournalDao.tagsFromJson(" [] ") shouldBe Set.empty
+ }
+
+ "return an empty set for an empty array with internal whitespace '[\n]'"
in {
+ MySQLJournalDao.tagsFromJson("[\n]") shouldBe Set.empty
+ }
+
+ "parse a single-element array" in {
+ MySQLJournalDao.tagsFromJson("""["tag1"]""") shouldBe Set("tag1")
+ }
+
+ "parse a multi-element array" in {
+ MySQLJournalDao.tagsFromJson("""["alpha","beta","gamma"]""") shouldBe
Set("alpha", "beta", "gamma")
+ }
+
+ "parse a multi-element array with spaces after commas" in {
+ MySQLJournalDao.tagsFromJson("""["alpha", "beta", "gamma"]""") shouldBe
Set("alpha", "beta", "gamma")
+ }
+
+ "parse tags that contain escaped double-quotes" in {
+ MySQLJournalDao.tagsFromJson("""["say \"hello\""]""") shouldBe
Set("""say "hello"""")
+ }
+
+ "parse tags that contain escaped backslashes" in {
+ MySQLJournalDao.tagsFromJson("""["path\\to\\file"]""") shouldBe
Set("""path\to\file""")
+ }
+
+ "throw IllegalStateException when input is not a JSON array (plain
string)" in {
+ an[IllegalStateException] should be thrownBy {
+ MySQLJournalDao.tagsFromJson("notanarray")
+ }
+ }
+
+ "throw IllegalStateException when input is a JSON object" in {
+ an[IllegalStateException] should be thrownBy {
+ MySQLJournalDao.tagsFromJson("""{"tag":"value"}""")
+ }
+ }
+
+ "throw IllegalStateException when array contains a non-string element
(number)" in {
+ an[IllegalStateException] should be thrownBy {
+ MySQLJournalDao.tagsFromJson("""["valid", 42]""")
+ }
+ }
+
+ "throw IllegalStateException when array contains a non-string element
(boolean)" in {
+ an[IllegalStateException] should be thrownBy {
+ MySQLJournalDao.tagsFromJson("""["valid", true]""")
+ }
+ }
+
+ // NOTE(review): this input has no closing ']', so it is rejected by the "not a JSON array"
+ // check rather than by the unterminated-string check; consider also testing e.g. ["abc].
+ "throw IllegalStateException when string is unterminated" in {
+ an[IllegalStateException] should be thrownBy {
+ MySQLJournalDao.tagsFromJson("""["unterminated""")
+ }
+ }
+ }
+}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsByPersistenceIdSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsByPersistenceIdSpec.scala
index 5291027..533296c 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsByPersistenceIdSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsByPersistenceIdSpec.scala
@@ -31,6 +31,7 @@ import pekko.persistence.r2dbc.TestDbLifecycle
import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import pekko.persistence.typed.PersistenceId
import pekko.persistence.typed.internal.ReplicatedEventMetadata
+import pekko.stream.scaladsl.Sink
import pekko.stream.scaladsl.Source
import pekko.stream.testkit.TestSubscriber
import pekko.stream.testkit.scaladsl.TestSink
@@ -191,6 +192,33 @@ class EventsByPersistenceIdSpec
}
}
+ "Typed versions of query" should {
+ "include tags" in {
+ val probe = testKit.createTestProbe[Done]()
+ val entityType = nextEntityType()
+ val entityId = "entity-1"
+ val pid = PersistenceId(entityType, entityId)
+
+ val persister = testKit.spawn(TestActors.Persister(pid, tags =
Set("tag")))
+ persister ! Persister.PersistWithAck("e-1", probe.ref)
+ probe.expectMessage(Done)
+
+ val events = query
+ .currentEventsByPersistenceIdTyped[String](pid.id, 0L, Long.MaxValue)
+ .runWith(Sink.seq)
+ .futureValue
+
+ events should have size 1
+ events.head.tags should ===(Set("tag"))
+
+ val event = query
+ .eventsByPersistenceIdTyped[String](pid.id, 0L, Long.MaxValue)
+ .runWith(Sink.head)
+ .futureValue
+ event.tags should ===(Set("tag"))
+ }
+ }
+
"Live query" should {
"pick up new events" in {
val pid = nextPid()
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
index 70ac26c..c723cd2 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
@@ -18,6 +18,7 @@ import java.time.{ Duration => JDuration }
import scala.concurrent.Await
import scala.concurrent.duration._
+import scala.collection.immutable
import org.apache.pekko
import pekko.Done
@@ -26,6 +27,7 @@ import pekko.actor.testkit.typed.scaladsl.LoggingTestKit
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.internal.pubsub.TopicImpl
+import pekko.persistence.Persistence
import pekko.persistence.query.NoOffset
import pekko.persistence.query.PersistenceQuery
import pekko.persistence.query.TimestampOffset
@@ -305,6 +307,43 @@ class EventsBySlicePubSubSpec
}
+ "group slices into topics" in new Setup {
+
+ val numberOfTopics =
+
typedSystem.settings.config.getInt("pekko.persistence.r2dbc.journal.publish-events-number-of-topics")
+ val querySliceRanges =
Persistence(typedSystem).sliceRanges(numberOfTopics * 2)
+ val queries:
immutable.IndexedSeq[TestSubscriber.Probe[EventEnvelope[String]]] = {
+ querySliceRanges.map { range =>
+ query
+ .eventsBySlices[String](setupEntityType, range.min, range.max,
NoOffset)
+ .runWith(sinkProbe)
+ .request(100)
+ }
+ }
+
+ val topicStatsProbe = createTestProbe[TopicImpl.TopicStats]()
+ eventually {
+ (0 until 1024).foreach { i =>
+ withClue(s"slice $i: ") {
+ PubSub(typedSystem).eventTopic[String](setupEntityType, i) !
TopicImpl.GetTopicStats(topicStatsProbe.ref)
+ topicStatsProbe.receiveMessage().localSubscriberCount shouldBe 2
+ }
+ }
+ }
+
+ for (i <- 1 to 10) {
+ persister ! PersistWithAck(s"e-$i", probe.ref)
+ probe.expectMessage(Done)
+ }
+
+ for (i <- 1 to 10) {
+ val queryIndex =
querySliceRanges.indexOf(querySliceRanges.find(_.contains(slice)).get)
+ queries(queryIndex).expectNext().event shouldBe s"e-$i"
+ }
+
+ queries.foreach(_.cancel())
+ }
+
}
}
diff --git a/ddl-scripts/create_tables_mysql.sql
b/ddl-scripts/create_tables_mysql.sql
index 67554e7..e006404 100644
--- a/ddl-scripts/create_tables_mysql.sql
+++ b/ddl-scripts/create_tables_mysql.sql
@@ -29,7 +29,7 @@ CREATE TABLE IF NOT EXISTS event_journal(
deleted BOOLEAN DEFAULT FALSE NOT NULL,
writer VARCHAR(255) NOT NULL,
adapter_manifest VARCHAR(255),
- tags TEXT, -- FIXME no array type, is this the best option?
+ tags JSON, -- stored as a JSON array of strings
meta_ser_id INTEGER,
meta_ser_manifest VARCHAR(255),
@@ -67,7 +67,7 @@ CREATE TABLE IF NOT EXISTS durable_state (
state_ser_id INTEGER NOT NULL,
state_ser_manifest VARCHAR(255),
state_payload BLOB NOT NULL,
- tags TEXT, -- FIXME no array type, is this the best option?
+ tags JSON, -- stored as a JSON array of strings
PRIMARY KEY(persistence_id, revision)
);
diff --git a/docs/src/main/paradox/query.md b/docs/src/main/paradox/query.md
index 5eb621d..3896179 100644
--- a/docs/src/main/paradox/query.md
+++ b/docs/src/main/paradox/query.md
@@ -112,6 +112,8 @@ Disable publishing of events with configuration:
pekko.persistence.r2dbc.journal.publish-events = off
```
+If you use many queries or Projection instances, you should consider adjusting the `pekko.persistence.r2dbc.journal.publish-events-number-of-topics` configuration; see @ref:[Configuration](#configuration).
+
## Durable state queries
@apidoc[R2dbcDurableStateStore] implements the following @extref:[Persistence
Queries](pekko:durable-state/persistence-query.html):
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]