This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 8b9f24f  feat: add publish-events-number-of-topics support (#382)
8b9f24f is described below

commit 8b9f24f57315904367d94136ee0a66aafa980b8a
Author: PJ Fanning <[email protected]>
AuthorDate: Thu May 21 08:53:15 2026 +0100

    feat: add publish-events-number-of-topics support (#382)
    
    * feat: add publish-events-number-of-topics support
    
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/46bfeb49-fb2e-48fb-955d-2557d826e7f3
    
    Co-authored-by: pjfanning <[email protected]>
    
    * test: add typed query 'include tags' test to EventsByPersistenceIdSpec
    
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/46bfeb49-fb2e-48fb-955d-2557d826e7f3
    
    Co-authored-by: pjfanning <[email protected]>
    
    * scalafmt
    
    * test: skip 'include tags' test for MySQL dialect
    
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/b52d4409-a714-4136-9970-759f542ac325
    
    Co-authored-by: pjfanning <[email protected]>
    
    * feat: MySQL tags via JSON column (TEXT -> JSON) with encode/decode support
    
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/4a1f1bd4-a41d-48ed-b4cd-308cdc8ac314
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Update MySQLJournalDao.scala
    
    * test: add MySQLTagsJsonSpec unit tests for tagsToJson/tagsFromJson with IllegalStateException validation
    
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/8d19c9bb-890a-4ccc-93a9-0278bcd0471c
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Update MySQLTagsJsonSpec.scala
    
    * Update MySQLTagsJsonSpec.scala
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] <[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 core/src/main/resources/reference.conf             |  12 ++
 .../r2dbc/internal/EventsByPersistenceIdDao.scala  |   6 +-
 .../pekko/persistence/r2dbc/internal/PubSub.scala  |  27 ++++-
 .../persistence/r2dbc/journal/JournalDao.scala     |   9 +-
 .../r2dbc/journal/mysql/MySQLJournalDao.scala      |  68 +++++++++++
 .../r2dbc/query/scaladsl/QueryDao.scala            |   5 +-
 .../r2dbc/query/scaladsl/R2dbcReadJournal.scala    |   8 +-
 .../r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala |   5 +
 .../r2dbc/state/scaladsl/DurableStateDao.scala     |  19 ++-
 .../scaladsl/mysql/MySQLDurableStateDao.scala      |   5 +
 .../r2dbc/journal/PersistTagsSpec.scala            |  24 ++--
 .../r2dbc/journal/mysql/MySQLTagsJsonSpec.scala    | 132 +++++++++++++++++++++
 .../r2dbc/query/EventsByPersistenceIdSpec.scala    |  28 +++++
 .../r2dbc/query/EventsBySlicePubSubSpec.scala      |  39 ++++++
 ddl-scripts/create_tables_mysql.sql                |   4 +-
 docs/src/main/paradox/query.md                     |   2 +
 16 files changed, 355 insertions(+), 38 deletions(-)

diff --git a/core/src/main/resources/reference.conf 
b/core/src/main/resources/reference.conf
index 60b553b..b05b9aa 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -36,6 +36,18 @@ pekko.persistence.r2dbc {
       throughput-collect-interval = 10 seconds
     }
 
+    # Group the slices for an entity type into this number of topics. Most efficient is to use
+    # the same number as number of projection instances. If configured to less than the number of
+    # projection instances the overhead is that events will be sent more than once and discarded
+    # on the destination side. If configured to more than the number of projection instances
+    # the events will only be sent once but there is a risk of exceeding the limits of number
+    # of topics that PubSub can handle (e.g. OversizedPayloadException).
+    # Must be between 1 and 1024 and a whole number divisor of 1024 (number of slices).
+    # This configuration can be changed in a rolling update, but there might be some events
+    # that are not delivered via the pub-sub path and instead delivered later by the queries.
+    # This configuration cannot be defined per journal, but is global for the ActorSystem.
+    publish-events-number-of-topics = 128
+
     # replay filter not needed for this plugin
     replay-filter.mode = off
 
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala
index 9e63f2f..495a904 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala
@@ -26,6 +26,7 @@ import 
pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.r2dbc.journal.JournalDao.SerializedJournalRow
 import pekko.persistence.r2dbc.journal.JournalDao.readMetadata
 import pekko.stream.scaladsl.Source
+import io.r2dbc.spi.Row
 import org.slf4j.LoggerFactory
 import scala.concurrent.ExecutionContext
 
@@ -65,6 +66,9 @@ private[r2dbc] trait EventsByPersistenceIdDao {
 
   implicit protected def journalPayloadCodec: PayloadCodec
 
+  protected def tagsFromRow(row: Row): Set[String] =
+    setFromDb(row.get("tags", classOf[Array[String]]))
+
   private lazy val selectEventsSql = sql"""
     SELECT slice, entity_type, persistence_id, seq_nr, db_timestamp, 
$statementTimestampSql AS read_db_timestamp, event_ser_id, event_ser_manifest, 
event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest, 
meta_payload, tags
     from $journalTable
@@ -149,7 +153,7 @@ private[r2dbc] trait EventsByPersistenceIdDao {
           serId = row.get[Integer]("event_ser_id", classOf[Integer]),
           serManifest = row.get("event_ser_manifest", classOf[String]),
           writerUuid = row.get("writer", classOf[String]),
-          tags = setFromDb(row.get("tags", classOf[Array[String]])),
+          tags = tagsFromRow(row),
           metadata = readMetadata(row)))
 
     if (log.isDebugEnabled)
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala
index 7c6bbf4..2bf6f9b 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala
@@ -59,6 +59,14 @@ import org.slf4j.LoggerFactory
 
   private val settings = new PublishEventsDynamicSettings(
     
system.settings.config.getConfig("pekko.persistence.r2dbc.journal.publish-events-dynamic"))
+
+  private val sliceRanges = {
+    val numberOfTopics =
+      
system.settings.config.getInt("pekko.persistence.r2dbc.journal.publish-events-number-of-topics")
+    persistenceExt.sliceRanges(numberOfTopics)
+  }
+  private val sliceRangeLookup = new ConcurrentHashMap[Int, Range]
+
   private val throughputCollectIntervalMillis = 
settings.throughputCollectInterval.toMillis
   private val throughputThreshold = settings.throughputThreshold.toDouble
   private val throughputSampler =
@@ -74,8 +82,23 @@ import org.slf4j.LoggerFactory
       .narrow[Topic.Command[EventEnvelope[Event]]]
   }
 
-  private def topicName(entityType: String, slice: Int): String =
-    URLEncoder.encode(s"r2dbc-$entityType-$slice", 
StandardCharsets.UTF_8.name())
+  def eventTopics[Event](
+      entityType: String,
+      minSlice: Int,
+      maxSlice: Int): Set[ActorRef[Topic.Command[EventEnvelope[Event]]]] = {
+    (minSlice to maxSlice).map(eventTopic[Event](entityType, _)).toSet
+  }
+
+  private def topicName(entityType: String, slice: Int): String = {
+    val range = sliceRangeLookup.computeIfAbsent(
+      slice,
+      _ =>
+        sliceRanges
+          .find(_.contains(slice))
+          .getOrElse(throw new IllegalArgumentException(s"Slice [$slice] not 
found in " +
+            s"slice ranges [${sliceRanges.mkString(", ")}]")))
+    URLEncoder.encode(s"r2dbc-$entityType-${range.min}-${range.max}", 
StandardCharsets.UTF_8.name())
+  }
 
   def publish(pr: PersistentRepr, timestamp: Instant): Unit = {
 
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
index 73e26f9..974d2cd 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
@@ -116,6 +116,10 @@ private[r2dbc] class JournalDao(val settings: 
JournalSettings, connectionFactory
   protected val journalTable: String = settings.journalTableWithSchema
   protected implicit val journalPayloadCodec: PayloadCodec = 
settings.journalPayloadCodec
 
+  protected def bindTagsForWrite(stmt: Statement, tags: Set[String], index: 
Int): Statement =
+    if (tags.isEmpty) stmt.bindNull(index, classOf[Array[String]])
+    else stmt.bind(index, tags.toArray)
+
   protected val (insertEventWithParameterTimestampSql: String, 
insertEventWithTransactionTimestampSql: String) = {
     val baseSql =
       s"INSERT INTO $journalTable " +
@@ -194,10 +198,7 @@ private[r2dbc] class JournalDao(val settings: 
JournalSettings, connectionFactory
         .bind(7, write.serManifest)
         .bindPayload(8, write.payload.get)
 
-      if (write.tags.isEmpty)
-        stmt.bindNull(9, classOf[Array[String]])
-      else
-        stmt.bind(9, write.tags.toArray)
+      bindTagsForWrite(stmt, write.tags, 9)
 
       // optional metadata
       write.metadata match {
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLJournalDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLJournalDao.scala
index e336add..538dd0d 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLJournalDao.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLJournalDao.scala
@@ -29,12 +29,73 @@ import pekko.persistence.r2dbc.UseAppTimestamp
 import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.r2dbc.journal.JournalDao
 import io.r2dbc.spi.ConnectionFactory
+import io.r2dbc.spi.Row
+import io.r2dbc.spi.Statement
 
 /**
  * INTERNAL API
  */
 @InternalApi
 private[r2dbc] object MySQLJournalDao {
+
+  /** Serialise a set of tags to a JSON array string for MySQL JSON column storage. */
+  def tagsToJson(tags: Set[String]): String =
+    tags.iterator
+      .map(t => "\"" + t.replace("\\", "\\\\").replace("\"", "\\\"") + "\"")
+      .mkString("[", ",", "]")
+
+  /**
+   * Deserialise a JSON array string from a MySQL JSON column back to a set of tags.
+   * Returns an empty set for `null` or an empty JSON array (with optional surrounding whitespace).
+   * Throws [[IllegalStateException]] if the input is not a valid JSON array of strings.
+   */
+  def tagsFromJson(json: String): Set[String] = {
+    if (json == null) return Set.empty
+    val trimmed = json.trim
+    if (!trimmed.startsWith("[") || !trimmed.endsWith("]"))
+      throw new IllegalStateException(s"tags JSON is not a JSON array: $json")
+    val inner = trimmed.substring(1, trimmed.length - 1).trim
+    if (inner.isEmpty) return Set.empty
+    val result = scala.collection.mutable.Set.empty[String]
+    val n = trimmed.length
+    var i = 1 // skip opening '['
+    while (i < n - 1) { // stop before closing ']'
+      val c = trimmed.charAt(i)
+      if (c == '"') {
+        val sb = new StringBuilder
+        i += 1
+        var closed = false
+        while (i < n - 1 && !closed) {
+          trimmed.charAt(i) match {
+            case '"' =>
+              closed = true
+            case '\\' if i + 1 < n - 1 =>
+              i += 1
+              trimmed.charAt(i) match {
+                case '"'  => sb.append('"')
+                case '\\' => sb.append('\\')
+                case 'n'  => sb.append('\n')
+                case 'r'  => sb.append('\r')
+                case 't'  => sb.append('\t')
+                case c2   => sb.append('\\').append(c2)
+              }
+              i += 1
+            case other =>
+              sb.append(other)
+              i += 1
+          }
+        }
+        if (!closed)
+          throw new IllegalStateException(s"tags JSON contains an unterminated 
string: $json")
+        result += sb.toString()
+      } else if (c != ',' && c != ' ' && c != '\t' && c != '\n' && c != '\r') {
+        throw new IllegalStateException(s"tags JSON contains non-string 
element: $json")
+      }
+      i += 1
+    }
+    result.toSet
+  }
+
   def settingRequirements(settings: UseAppTimestamp with 
DbTimestampMonotonicIncreasing): Unit = {
     // Application timestamps are used because MySQL does not have transaction_timestamp like Postgres. In future releases
     // they could be tried to be emulated, but the benefits are questionable - no matter where the timestamps are generated,
@@ -65,6 +126,13 @@ private[r2dbc] class MySQLJournalDao(
   override lazy val timestampSql: String = "NOW(6)"
   override lazy val statementTimestampSql: String = "NOW(6)"
 
+  override protected def bindTagsForWrite(stmt: Statement, tags: Set[String], 
index: Int): Statement =
+    if (tags.isEmpty) stmt.bindNull(index, classOf[String])
+    else stmt.bind(index, MySQLJournalDao.tagsToJson(tags))
+
+  override protected def tagsFromRow(row: Row): Set[String] =
+    MySQLJournalDao.tagsFromJson(row.get("tags", classOf[String]))
+
   override val insertEventWithParameterTimestampSql: String =
     sql"INSERT INTO $journalTable " +
     "(slice, entity_type, persistence_id, seq_nr, writer, adapter_manifest, 
event_ser_id, event_ser_manifest, " +
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
index 451225b..fa859bb 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
@@ -43,6 +43,7 @@ import pekko.persistence.typed.PersistenceId
 import pekko.stream.scaladsl.Source
 import com.typesafe.config.Config
 import io.r2dbc.spi.ConnectionFactory
+import io.r2dbc.spi.Row
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
@@ -231,7 +232,7 @@ private[r2dbc] class QueryDao(val settings: QuerySettings, 
connectionFactory: Co
             serId = row.get[Integer]("event_ser_id", classOf[Integer]),
             serManifest = row.get("event_ser_manifest", classOf[String]),
             writerUuid = "", // not need in this query
-            tags = setFromDb(row.get("tags", classOf[Array[String]])),
+            tags = tagsFromRow(row),
             metadata = readMetadata(row)))
 
     if (log.isDebugEnabled)
@@ -317,7 +318,7 @@ private[r2dbc] class QueryDao(val settings: QuerySettings, 
connectionFactory: Co
           serId = row.get[Integer]("event_ser_id", classOf[Integer]),
           serManifest = row.get("event_ser_manifest", classOf[String]),
           writerUuid = "", // not need in this query
-          tags = setFromDb(row.get("tags", classOf[Array[String]])),
+          tags = tagsFromRow(row),
           metadata = readMetadata(row)))
 
   def persistenceIds(entityType: String, afterId: Option[String], limit: 
Long): Source[String, NotUsed] = {
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
index 96c0988..beedaff 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
@@ -182,11 +182,15 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, 
config: Config, cfgPat
             // OverflowStrategy.dropNew is long deprecated and removed in Pekko 2.0.0.
             overflowStrategy = OverflowStrategy.dropHead)
           .mapMaterializedValue { ref =>
-            (minSlice to maxSlice).foreach { slice =>
+            pubSub.eventTopics[Event](entityType, minSlice, maxSlice).foreach 
{ topic =>
               import pekko.actor.typed.scaladsl.adapter._
-              pubSub.eventTopic(entityType, slice) ! 
Topic.Subscribe(ref.toTyped[EventEnvelope[Event]])
+              topic ! Topic.Subscribe(ref.toTyped[EventEnvelope[Event]])
             }
           }
+          .filter { env =>
+            val slice = sliceForPersistenceId(env.persistenceId)
+            minSlice <= slice && slice <= maxSlice
+          }
       dbSource
         .mergePrioritized(pubSubSource, leftPriority = 1, rightPriority = 10)
         .via(skipPubSubTooFarAhead(settings.backtrackingEnabled,
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala
index 260b5a7..bb49a8a 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala
@@ -30,8 +30,10 @@ import pekko.actor.typed.ActorSystem
 import pekko.annotation.InternalApi
 import pekko.persistence.r2dbc.QuerySettings
 import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
+import pekko.persistence.r2dbc.journal.mysql.MySQLJournalDao
 import pekko.persistence.r2dbc.query.scaladsl.QueryDao
 import io.r2dbc.spi.ConnectionFactory
+import io.r2dbc.spi.Row
 
 /**
  * INTERNAL API
@@ -44,6 +46,9 @@ private[r2dbc] class MySQLQueryDao(
 
   override lazy val statementTimestampSql: String = "NOW(6)"
 
+  override protected def tagsFromRow(row: Row): Set[String] =
+    MySQLJournalDao.tagsFromJson(row.get("tags", classOf[String]))
+
   override protected def selectColumns(backtracking: Boolean): String =
     if (backtracking)
       s"SELECT slice, persistence_id, seq_nr, db_timestamp, 
$statementTimestampSql AS read_db_timestamp, tags "
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
index 0e39722..b7b48d1 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
@@ -125,6 +125,10 @@ private[r2dbc] class DurableStateDao(settings: 
StateSettings, connectionFactory:
   protected val stateTable = settings.durableStateTableWithSchema
   protected implicit val statePayloadCodec: PayloadCodec = 
settings.durableStatePayloadCodec
 
+  protected def bindTagsForWrite(stmt: Statement, tags: Set[String], index: 
Int): Statement =
+    if (tags.isEmpty) stmt.bindNull(index, classOf[Array[String]])
+    else stmt.bind(index, tags.toArray)
+
   private lazy val additionalColumns: Map[String, 
immutable.IndexedSeq[AdditionalColumn[Any, Any]]] = {
     settings.durableStateAdditionalColumnClasses.map { case (entityType, 
columnClasses) =>
       val instances = columnClasses.map(fqcn => 
AdditionalColumnFactory.create(system, fqcn))
@@ -334,13 +338,6 @@ private[r2dbc] class DurableStateDao(settings: 
StateSettings, connectionFactory:
   def upsertState(state: SerializedStateRow, value: Any): Future[Done] = {
     require(state.revision > 0)
 
-    def bindTags(stmt: Statement, i: Int): Statement = {
-      if (state.tags.isEmpty)
-        stmt.bindNull(i, classOf[Array[String]])
-      else
-        stmt.bind(i, state.tags.toArray)
-    }
-
     var bindIdx = 0
     def getAndIncIndex(): Int = {
       val i = bindIdx
@@ -388,7 +385,7 @@ private[r2dbc] class DurableStateDao(settings: 
StateSettings, connectionFactory:
             .bind(getAndIncIndex(), state.serId)
             .bind(getAndIncIndex(), state.serManifest)
             .bindPayloadOption(getAndIncIndex(), state.payload)
-          bindTags(stmt, getAndIncIndex())
+          bindTagsForWrite(stmt, state.tags, getAndIncIndex())
           bindAdditionalColumns(stmt, additionalBindings)
         }
 
@@ -420,7 +417,7 @@ private[r2dbc] class DurableStateDao(settings: 
StateSettings, connectionFactory:
             .bind(getAndIncIndex(), state.serId)
             .bind(getAndIncIndex(), state.serManifest)
             .bindPayloadOption(getAndIncIndex(), state.payload)
-          bindTags(stmt, getAndIncIndex())
+          bindTagsForWrite(stmt, state.tags, getAndIncIndex())
           bindAdditionalColumns(stmt, additionalBindings)
 
           if (settings.dbTimestampMonotonicIncreasing) {
@@ -503,7 +500,7 @@ private[r2dbc] class DurableStateDao(settings: 
StateSettings, connectionFactory:
           val slice = persistenceExt.sliceForPersistenceId(persistenceId)
 
           def insertDeleteMarkerStatement(connection: Connection): Statement = 
{
-            connection
+            val stmt = connection
               .createStatement(
                 insertStateSql(entityType, Vector.empty)
               ) // FIXME should the additional columns be cleared (null)? Then 
they must allow NULL
@@ -514,7 +511,7 @@ private[r2dbc] class DurableStateDao(settings: 
StateSettings, connectionFactory:
               .bind(4, 0)
               .bind(5, "")
               .bindPayloadOption(6, None)
-              .bindNull(7, classOf[Array[String]])
+            bindTagsForWrite(stmt, Set.empty, 7)
           }
 
           def recoverDataIntegrityViolation[A](f: Future[A]): Future[A] =
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala
index bcd7056..79f4168 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala
@@ -33,6 +33,7 @@ import 
pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.r2dbc.journal.mysql.MySQLJournalDao
 import pekko.persistence.r2dbc.state.scaladsl.DurableStateDao
 import io.r2dbc.spi.ConnectionFactory
+import io.r2dbc.spi.Statement
 
 /**
  * INTERNAL API
@@ -46,6 +47,10 @@ private[r2dbc] class MySQLDurableStateDao(
 
   override lazy val transactionTimestampSql: String = "NOW(6)"
 
+  override protected def bindTagsForWrite(stmt: Statement, tags: Set[String], 
index: Int): Statement =
+    if (tags.isEmpty) stmt.bindNull(index, classOf[String])
+    else stmt.bind(index, MySQLJournalDao.tagsToJson(tags))
+
   override def selectBucketsSql(minSlice: Int, maxSlice: Int): String = {
     sql"""
      SELECT CAST(UNIX_TIMESTAMP(db_timestamp) AS SIGNED) / 10 AS bucket, 
count(*) AS count
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
index cb4e640..f4e5b17 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
@@ -24,6 +24,7 @@ import pekko.persistence.r2dbc.TestActors.Persister
 import pekko.persistence.r2dbc.TestConfig
 import pekko.persistence.r2dbc.TestData
 import pekko.persistence.r2dbc.TestDbLifecycle
+import pekko.persistence.r2dbc.journal.mysql.MySQLJournalDao
 import pekko.persistence.typed.PersistenceId
 import org.scalatest.wordspec.AnyWordSpecLike
 
@@ -41,17 +42,7 @@ class PersistTagsSpec
 
   private lazy val dialect = 
system.settings.config.getString("pekko.persistence.r2dbc.journal.dialect")
 
-  private lazy val testEnabled: Boolean = {
-    // tags are not implemented for MySQL
-    dialect != "mysql"
-  }
-
   "Persist tags" should {
-    if (!testEnabled) {
-      info(s"PersistTagsSpec not enabled for $dialect")
-      pending
-    }
-
     "be the same for events stored in same transaction" in {
       val numberOfEntities = 9
       val entityType = nextEntityType()
@@ -77,10 +68,15 @@ class PersistTagsSpec
           .select[Row]("test")(
             connection => connection.createStatement(s"select * from 
${settings.journalTableWithSchema}"),
             row => {
-              val tags = row.get("tags", classOf[Array[String]]) match {
-                case null      => Set.empty[String]
-                case tagsArray => tagsArray.toSet
-              }
+              val tags =
+                if (dialect == "mysql") {
+                  MySQLJournalDao.tagsFromJson(row.get("tags", 
classOf[String]))
+                } else {
+                  row.get("tags", classOf[Array[String]]) match {
+                    case null      => Set.empty[String]
+                    case tagsArray => tagsArray.toSet
+                  }
+                }
               Row(
                 pid = row.get("persistence_id", classOf[String]),
                 seqNr = row.get[java.lang.Long]("seq_nr", 
classOf[java.lang.Long]),
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLTagsJsonSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLTagsJsonSpec.scala
new file mode 100644
index 0000000..2263a89
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLTagsJsonSpec.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pekko.persistence.r2dbc.journal.mysql
+
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+class MySQLTagsJsonSpec extends AnyWordSpec with Matchers {
+
+  "MySQLJournalDao.tagsToJson" should {
+
+    "serialise an empty set to an empty JSON array" in {
+      MySQLJournalDao.tagsToJson(Set.empty) shouldBe "[]"
+    }
+
+    "serialise a single tag to a compact JSON array" in {
+      MySQLJournalDao.tagsToJson(Set("tag1")) shouldBe """["tag1"]"""
+    }
+
+    "produce compact JSON (no extra whitespace)" in {
+      val json = MySQLJournalDao.tagsToJson(Set("a", "b", "c"))
+      json should startWith("[")
+      json should endWith("]")
+      (json should not).include(" ")
+    }
+
+    "escape double-quotes in tag values" in {
+      val json = MySQLJournalDao.tagsToJson(Set("""say "hello""""))
+      json shouldBe """["say \"hello\""]"""
+    }
+
+    "escape backslashes in tag values" in {
+      val json = MySQLJournalDao.tagsToJson(Set("""path\to\file"""))
+      json shouldBe """["path\\to\\file"]"""
+    }
+
+    "round-trip through tagsFromJson" in {
+      val tags = Set("alpha", "beta", "gamma")
+      MySQLJournalDao.tagsFromJson(MySQLJournalDao.tagsToJson(tags)) shouldBe 
tags
+    }
+  }
+
+  "MySQLJournalDao.tagsFromJson" should {
+
+    "return an empty set for null input" in {
+      MySQLJournalDao.tagsFromJson(null) shouldBe Set.empty
+    }
+
+    "return an empty set for an empty JSON array '[]'" in {
+      MySQLJournalDao.tagsFromJson("[]") shouldBe Set.empty
+    }
+
+    "return an empty set for an empty JSON array with spaces '[ ]'" in {
+      MySQLJournalDao.tagsFromJson("[ ]") shouldBe Set.empty
+    }
+
+    "return an empty set for an empty JSON array with leading/trailing 
whitespace '  []  '" in {
+      MySQLJournalDao.tagsFromJson("  []  ") shouldBe Set.empty
+    }
+
+    "return an empty set for an empty array with internal whitespace '[\n]'" 
in {
+      MySQLJournalDao.tagsFromJson("[\n]") shouldBe Set.empty
+    }
+
+    "parse a single-element array" in {
+      MySQLJournalDao.tagsFromJson("""["tag1"]""") shouldBe Set("tag1")
+    }
+
+    "parse a multi-element array" in {
+      MySQLJournalDao.tagsFromJson("""["alpha","beta","gamma"]""") shouldBe 
Set("alpha", "beta", "gamma")
+    }
+
+    "parse a multi-element array with spaces after commas" in {
+      MySQLJournalDao.tagsFromJson("""["alpha", "beta", "gamma"]""") shouldBe 
Set("alpha", "beta", "gamma")
+    }
+
+    "parse tags that contain escaped double-quotes" in {
+      MySQLJournalDao.tagsFromJson("""["say \"hello\""]""") shouldBe 
Set("""say "hello"""")
+    }
+
+    "parse tags that contain escaped backslashes" in {
+      MySQLJournalDao.tagsFromJson("""["path\\to\\file"]""") shouldBe 
Set("""path\to\file""")
+    }
+
+    "throw IllegalStateException when input is not a JSON array (plain 
string)" in {
+      an[IllegalStateException] should be thrownBy {
+        MySQLJournalDao.tagsFromJson("notanarray")
+      }
+    }
+
+    "throw IllegalStateException when input is a JSON object" in {
+      an[IllegalStateException] should be thrownBy {
+        MySQLJournalDao.tagsFromJson("""{"tag":"value"}""")
+      }
+    }
+
+    "throw IllegalStateException when array contains a non-string element 
(number)" in {
+      an[IllegalStateException] should be thrownBy {
+        MySQLJournalDao.tagsFromJson("""["valid", 42]""")
+      }
+    }
+
+    "throw IllegalStateException when array contains a non-string element 
(boolean)" in {
+      an[IllegalStateException] should be thrownBy {
+        MySQLJournalDao.tagsFromJson("""["valid", true]""")
+      }
+    }
+
+    "throw IllegalStateException when string is unterminated" in {
+      an[IllegalStateException] should be thrownBy {
+        MySQLJournalDao.tagsFromJson("""["unterminated""")
+      }
+    }
+  }
+}
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsByPersistenceIdSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsByPersistenceIdSpec.scala
index 5291027..533296c 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsByPersistenceIdSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsByPersistenceIdSpec.scala
@@ -31,6 +31,7 @@ import pekko.persistence.r2dbc.TestDbLifecycle
 import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
 import pekko.persistence.typed.PersistenceId
 import pekko.persistence.typed.internal.ReplicatedEventMetadata
+import pekko.stream.scaladsl.Sink
 import pekko.stream.scaladsl.Source
 import pekko.stream.testkit.TestSubscriber
 import pekko.stream.testkit.scaladsl.TestSink
@@ -191,6 +192,33 @@ class EventsByPersistenceIdSpec
     }
   }
 
+  "Typed versions of query" should {
+    "include tags" in {
+      val expectedTags = Set("tag")
+      val ackProbe = testKit.createTestProbe[Done]()
+      val pid = PersistenceId(nextEntityType(), "entity-1")
+      // Persist one tagged event and wait for the ack before running any query.
+      val taggedPersister = testKit.spawn(TestActors.Persister(pid, tags = expectedTags))
+      taggedPersister ! Persister.PersistWithAck("e-1", ackProbe.ref)
+      ackProbe.expectMessage(Done)
+
+      // The current-events query should return exactly that event, tags included.
+      val currentEvents = query
+        .currentEventsByPersistenceIdTyped[String](pid.id, 0L, Long.MaxValue)
+        .runWith(Sink.seq)
+        .futureValue
+      currentEvents should have size 1
+      currentEvents.head.tags should ===(expectedTags)
+
+      // The `eventsByPersistenceIdTyped` stream should expose the same tags on its first element.
+      val firstEvent = query
+        .eventsByPersistenceIdTyped[String](pid.id, 0L, Long.MaxValue)
+        .runWith(Sink.head)
+        .futureValue
+      firstEvent.tags should ===(expectedTags)
+    }
+  }
+
   "Live query" should {
     "pick up new events" in {
       val pid = nextPid()
diff --git a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
index 70ac26c..c723cd2 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
@@ -18,6 +18,7 @@ import java.time.{ Duration => JDuration }
 
 import scala.concurrent.Await
 import scala.concurrent.duration._
+import scala.collection.immutable
 
 import org.apache.pekko
 import pekko.Done
@@ -26,6 +27,7 @@ import pekko.actor.testkit.typed.scaladsl.LoggingTestKit
 import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
 import pekko.actor.typed.ActorSystem
 import pekko.actor.typed.internal.pubsub.TopicImpl
+import pekko.persistence.Persistence
 import pekko.persistence.query.NoOffset
 import pekko.persistence.query.PersistenceQuery
 import pekko.persistence.query.TimestampOffset
@@ -305,6 +307,43 @@ class EventsBySlicePubSubSpec
 
     }
 
+    "group slices into topics" in new Setup {
+      // Twice as many query ranges as topics, so each topic is expected to see two local subscribers.
+      val numberOfTopics =
+        typedSystem.settings.config.getInt("pekko.persistence.r2dbc.journal.publish-events-number-of-topics")
+      val querySliceRanges = Persistence(typedSystem).sliceRanges(numberOfTopics * 2)
+      val queries: immutable.IndexedSeq[TestSubscriber.Probe[EventEnvelope[String]]] =
+        querySliceRanges.map { range =>
+          query
+            .eventsBySlices[String](setupEntityType, range.min, range.max, NoOffset)
+            .runWith(sinkProbe)
+            .request(100)
+        }
+
+      // Wait until the topic behind every slice reports both subscribers.
+      val topicStatsProbe = createTestProbe[TopicImpl.TopicStats]()
+      eventually {
+        (0 until Persistence(typedSystem).numberOfSlices).foreach { i =>
+          withClue(s"slice $i: ") {
+            PubSub(typedSystem).eventTopic[String](setupEntityType, i) ! TopicImpl.GetTopicStats(topicStatsProbe.ref)
+            topicStatsProbe.receiveMessage().localSubscriberCount shouldBe 2
+          }
+        }
+      }
+
+      for (i <- 1 to 10) {
+        persister ! PersistWithAck(s"e-$i", probe.ref)
+        probe.expectMessage(Done)
+      }
+
+      // `slice` is not modified while the events are checked, so find its query once, up front.
+      val queryIndex = querySliceRanges.indexWhere(_.contains(slice))
+      withClue(s"no query range contains slice $slice: ") { queryIndex should be >= 0 }
+      for (i <- 1 to 10) queries(queryIndex).expectNext().event shouldBe s"e-$i"
+
+      queries.foreach(_.cancel())
+    }
+
   }
 
 }
diff --git a/ddl-scripts/create_tables_mysql.sql b/ddl-scripts/create_tables_mysql.sql
index 67554e7..e006404 100644
--- a/ddl-scripts/create_tables_mysql.sql
+++ b/ddl-scripts/create_tables_mysql.sql
@@ -29,7 +29,7 @@ CREATE TABLE IF NOT EXISTS event_journal(
   deleted BOOLEAN DEFAULT FALSE NOT NULL,
   writer VARCHAR(255) NOT NULL,
   adapter_manifest VARCHAR(255),
-  tags TEXT, -- FIXME no array type, is this the best option?
+  tags JSON, -- stored as a JSON array of strings
 
   meta_ser_id INTEGER,
   meta_ser_manifest VARCHAR(255),
@@ -67,7 +67,7 @@ CREATE TABLE IF NOT EXISTS durable_state (
   state_ser_id INTEGER NOT NULL,
   state_ser_manifest VARCHAR(255),
   state_payload BLOB NOT NULL,
-  tags TEXT, -- FIXME no array type, is this the best option?
+  tags JSON, -- stored as a JSON array of strings
 
   PRIMARY KEY(persistence_id, revision)
 );
diff --git a/docs/src/main/paradox/query.md b/docs/src/main/paradox/query.md
index 5eb621d..3896179 100644
--- a/docs/src/main/paradox/query.md
+++ b/docs/src/main/paradox/query.md
@@ -112,6 +112,8 @@ Disable publishing of events with configuration:
 pekko.persistence.r2dbc.journal.publish-events = off
 ```
 
+If you use many queries or Projection instances, you should consider adjusting the `pekko.persistence.r2dbc.journal.publish-events-number-of-topics` configuration; see @ref:[Configuration](#configuration).
+
 ## Durable state queries
 
 @apidoc[R2dbcDurableStateStore] implements the following @extref:[Persistence Queries](pekko:durable-state/persistence-query.html):


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to