This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 55726a1  Port akka PRs 373, 377, 382: OffsetPidSeqNr, tags in events, 
pubsub tags (#376)
55726a1 is described below

commit 55726a1275446fad1be07870e7ed35c8b37f064e
Author: PJ Fanning <[email protected]>
AuthorDate: Fri May 15 14:16:45 2026 +0100

    Port akka PRs 373, 377, 382: OffsetPidSeqNr, tags in events, pubsub tags 
(#376)
    
    * Port akka PRs 373, 377, 382: OffsetPidSeqNr, tags in events, pubsub tags
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/0dd65fbd-cc87-4ea9-a5ce-a6ab564ecbb6
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Rename event variable to eventPayload in PubSub.scala for clarity
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/0dd65fbd-cc87-4ea9-a5ce-a6ab564ecbb6
    
    Co-authored-by: pjfanning <[email protected]>
    
    * scalafmt
    
    * Update R2dbcProjectionImpl.scala
    
    * issue with tags column
    
    * scalafmt
    
    * Update MySQLQueryDao.scala
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] 
<[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../r2dbc/internal/EventsByPersistenceIdDao.scala  |  11 +-
 .../pekko/persistence/r2dbc/internal/PubSub.scala  |  18 +-
 .../persistence/r2dbc/journal/R2dbcJournal.scala   |  17 +-
 .../r2dbc/query/javadsl/R2dbcReadJournal.scala     |  26 ++-
 .../r2dbc/query/scaladsl/QueryDao.scala            |  52 ++---
 .../r2dbc/query/scaladsl/R2dbcReadJournal.scala    |  60 ++++--
 .../r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala |  35 ++--
 .../projection/r2dbc/internal/OffsetPidSeqNr.scala |  32 +++
 .../r2dbc/internal/R2dbcOffsetStore.scala          |  36 ++--
 .../r2dbc/internal/R2dbcProjectionImpl.scala       |  49 ++++-
 .../projection/r2dbc/R2dbcOffsetStoreSpec.scala    |  70 +++++--
 .../r2dbc/R2dbcTimestampOffsetStoreSpec.scala      | 214 +++++++++++++++------
 12 files changed, 435 insertions(+), 185 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala
index 4099091..3664e92 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala
@@ -36,6 +36,11 @@ private[r2dbc] object EventsByPersistenceIdDao {
   private val log = LoggerFactory.getLogger(classOf[EventsByPersistenceIdDao])
 
   private final case class ByPersistenceIdState(queryCount: Int, rowCount: 
Int, latestSeqNr: Long)
+
+  private def setFromDb[T](array: Array[T]): Set[T] = array match {
+    case null    => Set.empty[T]
+    case entries => entries.toSet
+  }
 }
 
 /**
@@ -57,7 +62,7 @@ private[r2dbc] trait EventsByPersistenceIdDao {
   protected def settings: BufferSize
 
   private lazy val selectEventsSql = sql"""
-    SELECT slice, entity_type, persistence_id, seq_nr, db_timestamp, 
$statementTimestampSql AS read_db_timestamp, event_ser_id, event_ser_manifest, 
event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest, 
meta_payload
+    SELECT slice, entity_type, persistence_id, seq_nr, db_timestamp, 
$statementTimestampSql AS read_db_timestamp, event_ser_id, event_ser_manifest, 
event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest, 
meta_payload, tags
     from $journalTable
     WHERE persistence_id = ? AND seq_nr >= ? AND seq_nr <= ?
     AND deleted = false
@@ -67,7 +72,7 @@ private[r2dbc] trait EventsByPersistenceIdDao {
   /**
    * INTERNAL API: Used by both journal replay and currentEventsByPersistenceId
    */
-  @InternalApi private[r2dbc] def internalEventsByPersistenceId(
+  @InternalApi private[r2dbc] def internalCurrentEventsByPersistenceId(
       persistenceId: String,
       fromSequenceNr: Long,
       toSequenceNr: Long): Source[SerializedJournalRow, NotUsed] = {
@@ -140,7 +145,7 @@ private[r2dbc] trait EventsByPersistenceIdDao {
           serId = row.get[Integer]("event_ser_id", classOf[Integer]),
           serManifest = row.get("event_ser_manifest", classOf[String]),
           writerUuid = row.get("writer", classOf[String]),
-          tags = Set.empty, // tags not fetched in queries (yet)
+          tags = setFromDb(row.get("tags", classOf[Array[String]])),
           metadata = readMetadata(row)))
 
     if (log.isDebugEnabled)
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala
index 6d901d2..84db63d 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala
@@ -108,26 +108,26 @@ import org.slf4j.LoggerFactory
       val slice = persistenceExt.sliceForPersistenceId(pid)
 
       val offset = TimestampOffset(timestamp, timestamp, Map(pid -> 
pr.sequenceNr))
-      val payload =
-        pr.payload match {
-          case Tagged(payload, _) =>
-            // eventsByTag not implemented (see issue #82), but events can 
still be tagged, so we unwrap this tagged event.
-            payload
 
-          case other => other
-        }
+      val (eventPayload, tags) = pr.payload match {
+        case Tagged(payload, tags) =>
+          (payload, tags)
+        case other =>
+          (other, Set.empty[String])
+      }
 
       val envelope = new EventEnvelope(
         offset,
         pid,
         pr.sequenceNr,
-        Option(payload),
+        Option(eventPayload),
         timestamp.toEpochMilli,
         pr.metadata,
         entityType,
         slice,
         filtered = false,
-        source = EnvelopeOrigin.SourcePubSub)
+        source = EnvelopeOrigin.SourcePubSub,
+        tags)
       eventTopic(entityType, slice) ! Topic.Publish(envelope)
     }
   }
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
index a95b0b5..34713ac 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
@@ -100,8 +100,6 @@ private[r2dbc] final class R2dbcJournal(config: Config, 
cfgPath: String) extends
   // them to complete before we can read the highest sequence number or we 
will miss it
   private val writesInProgress = new java.util.HashMap[String, Future[_]]()
 
-  private var eventsByTagNotImplementedLogged = false
-
   override def receivePluginInternal: Receive = { case WriteFinished(pid, f) =>
     writesInProgress.remove(pid, f)
   }
@@ -113,8 +111,6 @@ private[r2dbc] final class R2dbcJournal(config: Config, 
cfgPath: String) extends
         atomicWrite.payload.map { pr =>
           val (event, tags) = pr.payload match {
             case Tagged(payload, tags) =>
-              // eventsBytag not implemented, issue #82, but they are stored
-              logEventsByTagsNotImplemented()
               (payload.asInstanceOf[AnyRef], tags)
             case other =>
               (other.asInstanceOf[AnyRef], Set.empty[String])
@@ -197,17 +193,6 @@ private[r2dbc] final class R2dbcJournal(config: Config, 
cfgPath: String) extends
         dbTimestamp.map(_ => Done)(ExecutionContext.parasitic)
     }
 
-  private def logEventsByTagsNotImplemented(): Unit = {
-    if (!eventsByTagNotImplementedLogged) {
-      eventsByTagNotImplementedLogged = true
-      log.info(
-        "eventsByTag query not implemented by pekko-persistence-r2dbc. We 
recommend using eventsBySlices instead. " +
-        "The given tags are stored. " +
-        "eventsByTag may be implemented in the future if there is strong 
demand for it. " +
-        "Let us know in 
https://github.com/akka/akka-persistence-r2dbc/issues/82")
-    }
-  }
-
   override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: 
Long): Future[Unit] = {
     log.debug("asyncDeleteMessagesTo persistenceId [{}], toSequenceNr [{}]", 
persistenceId, toSequenceNr)
     journalDao.deleteMessagesTo(persistenceId, toSequenceNr)
@@ -220,7 +205,7 @@ private[r2dbc] final class R2dbcJournal(config: Config, 
cfgPath: String) extends
       if (max == Long.MaxValue) toSequenceNr
       else math.min(toSequenceNr, fromSequenceNr + max - 1)
     journalDao
-      .internalEventsByPersistenceId(persistenceId, fromSequenceNr, 
effectiveToSequenceNr)
+      .internalCurrentEventsByPersistenceId(persistenceId, fromSequenceNr, 
effectiveToSequenceNr)
       .runWith(Sink.foreach { row =>
         val repr = deserializeRow(serialization, row)
         recoveryCallback(repr)
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala
index 184b689..0be7612 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala
@@ -29,10 +29,14 @@ import pekko.persistence.query.{ EventEnvelope => 
ClassicEventEnvelope }
 import pekko.persistence.query.Offset
 import pekko.persistence.query.javadsl._
 import pekko.persistence.query.typed.EventEnvelope
-import pekko.persistence.query.typed.javadsl.CurrentEventsBySliceQuery
-import pekko.persistence.query.typed.javadsl.EventTimestampQuery
-import pekko.persistence.query.typed.javadsl.EventsBySliceQuery
-import pekko.persistence.query.typed.javadsl.LoadEventQuery
+import pekko.persistence.query.typed.javadsl.{
+  CurrentEventsByPersistenceIdTypedQuery,
+  CurrentEventsBySliceQuery,
+  EventTimestampQuery,
+  EventsByPersistenceIdTypedQuery,
+  EventsBySliceQuery,
+  LoadEventQuery
+}
 import pekko.persistence.r2dbc.query.scaladsl
 import pekko.stream.javadsl.Source
 
@@ -47,7 +51,9 @@ final class R2dbcReadJournal(delegate: 
scaladsl.R2dbcReadJournal)
     with EventTimestampQuery
     with LoadEventQuery
     with CurrentEventsByPersistenceIdQuery
+    with CurrentEventsByPersistenceIdTypedQuery
     with EventsByPersistenceIdQuery
+    with EventsByPersistenceIdTypedQuery
     with CurrentPersistenceIdsQuery
     with PagedPersistenceIdsQuery {
 
@@ -82,12 +88,24 @@ final class R2dbcReadJournal(delegate: 
scaladsl.R2dbcReadJournal)
       toSequenceNr: Long): Source[ClassicEventEnvelope, NotUsed] =
     delegate.currentEventsByPersistenceId(persistenceId, fromSequenceNr, 
toSequenceNr).asJava
 
+  override def currentEventsByPersistenceIdTyped[Event](
+      persistenceId: String,
+      fromSequenceNr: Long,
+      toSequenceNr: Long): Source[EventEnvelope[Event], NotUsed] =
+    delegate.currentEventsByPersistenceIdTyped[Event](persistenceId, 
fromSequenceNr, toSequenceNr).asJava
+
   override def eventsByPersistenceId(
       persistenceId: String,
       fromSequenceNr: Long,
       toSequenceNr: Long): Source[ClassicEventEnvelope, NotUsed] =
     delegate.eventsByPersistenceId(persistenceId, fromSequenceNr, 
toSequenceNr).asJava
 
+  override def eventsByPersistenceIdTyped[Event](
+      persistenceId: String,
+      fromSequenceNr: Long,
+      toSequenceNr: Long): Source[EventEnvelope[Event], NotUsed] =
+    delegate.eventsByPersistenceIdTyped[Event](persistenceId, fromSequenceNr, 
toSequenceNr).asJava
+
   override def currentPersistenceIds(): Source[String, NotUsed] =
     delegate.currentPersistenceIds().asJava
 
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
index 49d70b6..b79f7ac 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
@@ -47,6 +47,11 @@ import org.slf4j.LoggerFactory
 object QueryDao {
   val log: Logger = LoggerFactory.getLogger(classOf[QueryDao])
 
+  private def setFromDb[T](array: Array[T]): Set[T] = array match {
+    case null    => Set.empty[T]
+    case entries => entries.toSet
+  }
+
   def fromConfig(
       settings: QuerySettings,
       config: Config
@@ -71,6 +76,7 @@ private[r2dbc] class QueryDao(val settings: QuerySettings, 
connectionFactory: Co
     with EventsByPersistenceIdDao with HighestSequenceNrDao {
   import JournalDao.readMetadata
   import QueryDao.log
+  import QueryDao.setFromDb
 
   implicit protected val dialect: Dialect = settings.dialect
   protected lazy val statementTimestampSql: String = "statement_timestamp()"
@@ -80,38 +86,36 @@ private[r2dbc] class QueryDao(val settings: QuerySettings, 
connectionFactory: Co
   private val currentDbTimestampSql =
     "SELECT transaction_timestamp() AS db_timestamp"
 
+  protected def selectColumns(backtracking: Boolean): String =
+    if (backtracking)
+      "SELECT slice, persistence_id, seq_nr, db_timestamp, 
statement_timestamp() AS read_db_timestamp, tags "
+    else
+      "SELECT slice, persistence_id, seq_nr, db_timestamp, 
statement_timestamp() AS read_db_timestamp, tags, event_ser_id, 
event_ser_manifest, event_payload, meta_ser_id, meta_ser_manifest, meta_payload 
"
+
+  protected def toDbTimestampParamCondition(toDbTimestampParam: Boolean): 
String =
+    if (toDbTimestampParam) "AND db_timestamp <= ?" else ""
+
+  protected def behindCurrentTimeIntervalCondition(behindCurrentTime: 
FiniteDuration): String =
+    if (behindCurrentTime > Duration.Zero)
+      s"AND db_timestamp < transaction_timestamp() - interval 
'${behindCurrentTime.toMillis} milliseconds'"
+    else ""
+
   protected def eventsBySlicesRangeSql(
       toDbTimestampParam: Boolean,
       behindCurrentTime: FiniteDuration,
       backtracking: Boolean,
       minSlice: Int,
-      maxSlice: Int): String = {
-
-    def toDbTimestampParamCondition =
-      if (toDbTimestampParam) "AND db_timestamp <= ?" else ""
-
-    def behindCurrentTimeIntervalCondition =
-      if (behindCurrentTime > Duration.Zero)
-        s"AND db_timestamp < transaction_timestamp() - interval 
'${behindCurrentTime.toMillis} milliseconds'"
-      else ""
-
-    val selectColumns = {
-      if (backtracking)
-        "SELECT slice, persistence_id, seq_nr, db_timestamp, 
statement_timestamp() AS read_db_timestamp "
-      else
-        "SELECT slice, persistence_id, seq_nr, db_timestamp, 
statement_timestamp() AS read_db_timestamp, event_ser_id, event_ser_manifest, 
event_payload, meta_ser_id, meta_ser_manifest, meta_payload "
-    }
-
+      maxSlice: Int): String =
     sql"""
-      $selectColumns
+      ${selectColumns(backtracking)}
       FROM $journalTable
       WHERE entity_type = ?
       AND ${sliceCondition(minSlice, maxSlice)}
-      AND db_timestamp >= ? $toDbTimestampParamCondition 
$behindCurrentTimeIntervalCondition
+      AND db_timestamp >= ? ${toDbTimestampParamCondition(toDbTimestampParam)} 
${behindCurrentTimeIntervalCondition(
+        behindCurrentTime)}
       AND deleted = false
       ORDER BY db_timestamp, seq_nr
       LIMIT ?"""
-  }
 
   private def sliceCondition(minSlice: Int, maxSlice: Int): String = {
     settings.dialect match {
@@ -138,7 +142,7 @@ private[r2dbc] class QueryDao(val settings: QuerySettings, 
connectionFactory: Co
     WHERE persistence_id = ? AND seq_nr = ? AND deleted = false"""
 
   private val selectOneEventSql = sql"""
-    SELECT slice, entity_type, db_timestamp, $statementTimestampSql AS 
read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, 
meta_ser_id, meta_ser_manifest, meta_payload
+    SELECT slice, entity_type, db_timestamp, $statementTimestampSql AS 
read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, 
meta_ser_id, meta_ser_manifest, meta_payload, tags
     FROM $journalTable
     WHERE persistence_id = ? AND seq_nr = ? AND deleted = false"""
 
@@ -210,7 +214,7 @@ private[r2dbc] class QueryDao(val settings: QuerySettings, 
connectionFactory: Co
             serId = 0,
             serManifest = "",
             writerUuid = "", // not need in this query
-            tags = Set.empty, // tags not fetched in queries (yet)
+            tags = Set.empty, // tags not fetched in backtracking
             metadata = None)
         else
           SerializedJournalRow(
@@ -224,7 +228,7 @@ private[r2dbc] class QueryDao(val settings: QuerySettings, 
connectionFactory: Co
             serId = row.get[Integer]("event_ser_id", classOf[Integer]),
             serManifest = row.get("event_ser_manifest", classOf[String]),
             writerUuid = "", // not need in this query
-            tags = Set.empty, // tags not fetched in queries (yet)
+            tags = setFromDb(row.get("tags", classOf[Array[String]])),
             metadata = readMetadata(row)))
 
     if (log.isDebugEnabled)
@@ -310,7 +314,7 @@ private[r2dbc] class QueryDao(val settings: QuerySettings, 
connectionFactory: Co
           serId = row.get[Integer]("event_ser_id", classOf[Integer]),
           serManifest = row.get("event_ser_manifest", classOf[String]),
           writerUuid = "", // not need in this query
-          tags = Set.empty, // tags not fetched in queries (yet)
+          tags = setFromDb(row.get("tags", classOf[Array[String]])),
           metadata = readMetadata(row)))
 
   def persistenceIds(entityType: String, afterId: Option[String], limit: 
Long): Source[String, NotUsed] = {
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
index 3ead172..a3e9ab3 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
@@ -32,10 +32,14 @@ import pekko.persistence.query.Offset
 import pekko.persistence.query.TimestampOffset
 import pekko.persistence.query.scaladsl._
 import pekko.persistence.query.typed.EventEnvelope
-import pekko.persistence.query.typed.scaladsl.CurrentEventsBySliceQuery
-import pekko.persistence.query.typed.scaladsl.EventTimestampQuery
-import pekko.persistence.query.typed.scaladsl.EventsBySliceQuery
-import pekko.persistence.query.typed.scaladsl.LoadEventQuery
+import pekko.persistence.query.typed.scaladsl.{
+  CurrentEventsByPersistenceIdTypedQuery,
+  CurrentEventsBySliceQuery,
+  EventTimestampQuery,
+  EventsByPersistenceIdTypedQuery,
+  EventsBySliceQuery,
+  LoadEventQuery
+}
 import pekko.persistence.query.{ EventEnvelope => ClassicEventEnvelope }
 import pekko.persistence.r2dbc.QuerySettings
 import pekko.persistence.r2dbc.internal.BySliceQuery
@@ -65,7 +69,9 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, 
config: Config, cfgPat
     with EventTimestampQuery
     with LoadEventQuery
     with CurrentEventsByPersistenceIdQuery
+    with CurrentEventsByPersistenceIdTypedQuery
     with EventsByPersistenceIdQuery
+    with EventsByPersistenceIdTypedQuery
     with CurrentPersistenceIdsQuery
     with PagedPersistenceIdsQuery {
   import R2dbcReadJournal.ByPersistenceIdState
@@ -96,7 +102,8 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, 
config: Config, cfgPat
         row.entityType,
         row.slice,
         filtered = false,
-        source)
+        source,
+        tags = row.tags)
     }
 
     val extractOffset: EventEnvelope[Any] => TimestampOffset = env => 
env.offset.asInstanceOf[TimestampOffset]
@@ -229,7 +236,25 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, 
config: Config, cfgPat
   override def currentEventsByPersistenceId(
       persistenceId: String,
       fromSequenceNr: Long,
-      toSequenceNr: Long): Source[ClassicEventEnvelope, NotUsed] = {
+      toSequenceNr: Long): Source[ClassicEventEnvelope, NotUsed] =
+    internalCurrentEventsByPersistenceId(persistenceId, fromSequenceNr, 
toSequenceNr)
+      .map(deserializeRow)
+
+  override def currentEventsByPersistenceIdTyped[Event](
+      persistenceId: String,
+      fromSequenceNr: Long,
+      toSequenceNr: Long): Source[EventEnvelope[Event], NotUsed] =
+    internalCurrentEventsByPersistenceId(persistenceId, fromSequenceNr, 
toSequenceNr)
+      .map(deserializeBySliceRow[Event])
+
+  /**
+   * INTERNAL API: Used by both journal replay and currentEventsByPersistenceId
+   */
+  @InternalApi private[r2dbc] def internalCurrentEventsByPersistenceId(
+      persistenceId: String,
+      fromSequenceNr: Long,
+      toSequenceNr: Long): Source[SerializedJournalRow, NotUsed] = {
+
     val highestSeqNrFut =
       if (toSequenceNr == Long.MaxValue) 
queryDao.readHighestSequenceNr(persistenceId, fromSequenceNr)
       else Future.successful(toSequenceNr)
@@ -237,10 +262,9 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, 
config: Config, cfgPat
     Source
       .futureSource[SerializedJournalRow, NotUsed] {
         highestSeqNrFut.map { highestSeqNr =>
-          queryDao.internalEventsByPersistenceId(persistenceId, 
fromSequenceNr, highestSeqNr)
+          queryDao.internalCurrentEventsByPersistenceId(persistenceId, 
fromSequenceNr, highestSeqNr)
         }
       }
-      .map(deserializeRow)
       .mapMaterializedValue(_ => NotUsed)
   }
 
@@ -264,7 +288,21 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, 
config: Config, cfgPat
   override def eventsByPersistenceId(
       persistenceId: String,
       fromSequenceNr: Long,
-      toSequenceNr: Long): Source[ClassicEventEnvelope, NotUsed] = {
+      toSequenceNr: Long): Source[ClassicEventEnvelope, NotUsed] =
+    internalEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr)
+      .map(deserializeRow)
+
+  override def eventsByPersistenceIdTyped[Event](
+      persistenceId: String,
+      fromSequenceNr: Long,
+      toSequenceNr: Long): Source[EventEnvelope[Event], NotUsed] =
+    internalEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr)
+      .map(deserializeBySliceRow[Event])
+
+  private def internalEventsByPersistenceId(
+      persistenceId: String,
+      fromSequenceNr: Long,
+      toSequenceNr: Long): Source[SerializedJournalRow, NotUsed] = {
 
     log.debug("Starting eventsByPersistenceId query for persistenceId [{}], 
from [{}].", persistenceId, fromSequenceNr)
 
@@ -318,7 +356,6 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, 
config: Config, cfgPat
       updateState = nextOffset,
       delayNextQuery = delayNextQuery,
       nextQuery = nextQuery)
-      .map(deserializeRow)
   }
 
   private def deserializeBySliceRow[Event](row: SerializedJournalRow): 
EventEnvelope[Event] = {
@@ -337,7 +374,8 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, 
config: Config, cfgPat
       row.entityType,
       row.slice,
       filtered = false,
-      source)
+      source,
+      tags = row.tags)
   }
 
   private def deserializeRow(row: SerializedJournalRow): ClassicEventEnvelope 
= {
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala
index dc04dda..260b5a7 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala
@@ -44,38 +44,33 @@ private[r2dbc] class MySQLQueryDao(
 
   override lazy val statementTimestampSql: String = "NOW(6)"
 
+  override protected def selectColumns(backtracking: Boolean): String =
+    if (backtracking)
+      s"SELECT slice, persistence_id, seq_nr, db_timestamp, 
$statementTimestampSql AS read_db_timestamp, tags "
+    else
+      s"SELECT slice, persistence_id, seq_nr, db_timestamp, 
$statementTimestampSql AS read_db_timestamp, tags, event_ser_id, 
event_ser_manifest, event_payload, meta_ser_id, meta_ser_manifest, meta_payload 
"
+
+  override protected def behindCurrentTimeIntervalCondition(behindCurrentTime: 
FiniteDuration): String =
+    if (behindCurrentTime > Duration.Zero)
+      s"AND db_timestamp < DATE_SUB($statementTimestampSql, INTERVAL 
'${behindCurrentTime.toMicros}' MICROSECOND)"
+    else ""
+
   override def eventsBySlicesRangeSql(
       toDbTimestampParam: Boolean,
       behindCurrentTime: FiniteDuration,
       backtracking: Boolean,
       minSlice: Int,
-      maxSlice: Int): String = {
-
-    def toDbTimestampParamCondition =
-      if (toDbTimestampParam) "AND db_timestamp <= ?" else ""
-
-    def behindCurrentTimeIntervalCondition =
-      if (behindCurrentTime > Duration.Zero)
-        s"AND db_timestamp < DATE_SUB($statementTimestampSql, INTERVAL 
'${behindCurrentTime.toMicros}' MICROSECOND)"
-      else ""
-
-    val selectColumns = {
-      if (backtracking)
-        s"SELECT slice, persistence_id, seq_nr, db_timestamp, 
$statementTimestampSql AS read_db_timestamp "
-      else
-        s"SELECT slice, persistence_id, seq_nr, db_timestamp, 
$statementTimestampSql AS read_db_timestamp, event_ser_id, event_ser_manifest, 
event_payload, meta_ser_id, meta_ser_manifest, meta_payload "
-    }
-
+      maxSlice: Int): String =
     sql"""
-      $selectColumns
+      ${selectColumns(backtracking)}
       FROM $journalTable
       WHERE entity_type = ?
       AND slice BETWEEN $minSlice AND $maxSlice
-      AND db_timestamp >= ? $toDbTimestampParamCondition 
$behindCurrentTimeIntervalCondition
+      AND db_timestamp >= ? ${toDbTimestampParamCondition(toDbTimestampParam)} 
${behindCurrentTimeIntervalCondition(
+        behindCurrentTime)}
       AND deleted = false
       ORDER BY db_timestamp, seq_nr
       LIMIT ?"""
-  }
 
   override def selectBucketsSql(minSlice: Int, maxSlice: Int): String = {
     sql"""
diff --git 
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/OffsetPidSeqNr.scala
 
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/OffsetPidSeqNr.scala
new file mode 100644
index 0000000..68edcca
--- /dev/null
+++ 
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/OffsetPidSeqNr.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 - 2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.projection.r2dbc.internal
+
+import org.apache.pekko.annotation.InternalApi
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] object OffsetPidSeqNr {
+  def apply(offset: Any, pid: String, seqNr: Long): OffsetPidSeqNr =
+    new OffsetPidSeqNr(offset, Some(pid -> seqNr))
+
+  def apply(offset: Any): OffsetPidSeqNr =
+    new OffsetPidSeqNr(offset, None)
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] final case class OffsetPidSeqNr(offset: Any, 
pidSeqNr: Option[(String, Long)])
diff --git 
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
 
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
index 01d26c8..061b9da 100644
--- 
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
+++ 
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
@@ -415,7 +415,7 @@ private[projection] class R2dbcOffsetStore(
   /**
    * Like saveOffsetInTx, but in own transaction. Used by atLeastOnce.
    */
-  def saveOffset[Offset](offset: Offset): Future[Done] = {
+  def saveOffset(offset: OffsetPidSeqNr): Future[Done] = {
     r2dbcExecutor
       .withConnection("save offset") { conn =>
         saveOffsetInTx(conn, offset)
@@ -426,18 +426,19 @@ private[projection] class R2dbcOffsetStore(
   /**
    * This method is used together with the users' handler code and run in same 
transaction.
    */
-  def saveOffsetInTx[Offset](conn: Connection, offset: Offset): Future[Done] = 
{
+  def saveOffsetInTx(conn: Connection, offset: OffsetPidSeqNr): Future[Done] = 
{
     offset match {
-      case t: TimestampOffset =>
-        // TODO possible perf improvement to optimize for the normal case of 1 
record
-        val records = t.seen.map { case (pid, seqNr) => Record(pid, seqNr, 
t.timestamp) }.toVector
-        saveTimestampOffsetInTx(conn, records)
+      case OffsetPidSeqNr(t: TimestampOffset, Some((pid, seqNr))) =>
+        val record = Record(pid, seqNr, t.timestamp)
+        saveTimestampOffsetInTx(conn, Vector(record))
+      case OffsetPidSeqNr(_: TimestampOffset, None) =>
+        throw new IllegalArgumentException("Required EventEnvelope or 
DurableStateChange for TimestampOffset.")
       case _ =>
-        savePrimitiveOffsetInTx(conn, offset)
+        savePrimitiveOffsetInTx(conn, offset.offset)
     }
   }
 
-  def saveOffsets[Offset](offsets: immutable.IndexedSeq[Offset]): Future[Done] 
= {
+  def saveOffsets(offsets: immutable.IndexedSeq[OffsetPidSeqNr]): Future[Done] 
= {
     r2dbcExecutor
       .withConnection("save offsets") { conn =>
         saveOffsetsInTx(conn, offsets)
@@ -445,17 +446,22 @@ private[projection] class R2dbcOffsetStore(
       .map(_ => Done)(ExecutionContext.parasitic)
   }
 
-  def saveOffsetsInTx[Offset](conn: Connection, offsets: 
immutable.IndexedSeq[Offset]): Future[Done] = {
-    if (offsets.exists(_.isInstanceOf[TimestampOffset])) {
-      val records = offsets.flatMap {
-        case t: TimestampOffset =>
-          t.seen.map { case (pid, seqNr) => Record(pid, seqNr, t.timestamp) }
+  def saveOffsetsInTx(conn: Connection, offsets: 
immutable.IndexedSeq[OffsetPidSeqNr]): Future[Done] = {
+    if (offsets.isEmpty)
+      FutureDone
+    else if (offsets.head.offset.isInstanceOf[TimestampOffset]) {
+      val records = offsets.map {
+        case OffsetPidSeqNr(t: TimestampOffset, Some((pid, seqNr))) =>
+          Record(pid, seqNr, t.timestamp)
+        case OffsetPidSeqNr(_: TimestampOffset, None) =>
+          throw new IllegalArgumentException("Required EventEnvelope or 
DurableStateChange for TimestampOffset.")
         case _ =>
-          Nil
+          throw new IllegalArgumentException(
+            "Mix of TimestampOffset and other offset type in same transaction 
is not supported")
       }
       saveTimestampOffsetInTx(conn, records)
     } else {
-      savePrimitiveOffsetInTx(conn, offsets.last)
+      savePrimitiveOffsetInTx(conn, offsets.last.offset)
     }
   }
 
diff --git 
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
 
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
index ce2ff55..3d7d136 100644
--- 
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
+++ 
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
@@ -28,6 +28,7 @@ import pekko.annotation.InternalApi
 import pekko.event.Logging
 import pekko.event.LoggingAdapter
 import pekko.persistence.query.DeletedDurableState
+import pekko.persistence.query.DurableStateChange
 import pekko.persistence.query.UpdatedDurableState
 import pekko.persistence.query.typed.EventEnvelope
 import pekko.persistence.query.typed.scaladsl.LoadEventQuery
@@ -158,6 +159,25 @@ private[projection] object R2dbcProjectionImpl {
     }
   }
 
+  private def extractOffsetPidSeqNr[Offset, Envelope](
+      sourceProvider: SourceProvider[Offset, Envelope],
+      envelope: Envelope): OffsetPidSeqNr =
+    extractOffsetPidSeqNr(sourceProvider.extractOffset(envelope), envelope)
+
+  private def extractOffsetPidSeqNr[Offset, Envelope](offset: Offset, 
envelope: Envelope): OffsetPidSeqNr = {
+    // we could define a new trait for the SourceProvider to implement this in case other (custom) envelope types are needed
+    envelope match {
+      case env: EventEnvelope[_]         => OffsetPidSeqNr(offset, 
env.persistenceId, env.sequenceNr)
+      case chg: UpdatedDurableState[_]   => OffsetPidSeqNr(offset, 
chg.persistenceId, chg.revision)
+      case del: DeletedDurableState[_]   => OffsetPidSeqNr(offset, 
del.persistenceId, del.revision)
+      case change: DurableStateChange[_] =>
+        // in case additional types are added
+        throw new IllegalArgumentException(
+          s"DurableStateChange [${change.getClass.getName}] not implemented yet. Please report bug at https://github.com/apache/pekko-persistence-r2dbc/issues")
+      case _ => OffsetPidSeqNr(offset)
+    }
+  }
+
   private[projection] def adaptedHandlerForExactlyOnce[Offset, Envelope](
       sourceProvider: SourceProvider[Offset, Envelope],
       handlerFactory: () => R2dbcHandler[Envelope],
@@ -169,11 +189,11 @@ private[projection] object R2dbcProjectionImpl {
           offsetStore.isAccepted(envelope).flatMap {
             case true =>
               if (isFilteredEvent(envelope)) {
-                val offset = sourceProvider.extractOffset(envelope)
+                val offset = extractOffsetPidSeqNr(sourceProvider, envelope)
                 offsetStore.saveOffset(offset)
               } else {
                 loadEnvelope(envelope, sourceProvider).flatMap { 
loadedEnvelope =>
-                  val offset = sourceProvider.extractOffset(loadedEnvelope)
+                  val offset = extractOffsetPidSeqNr(sourceProvider, 
loadedEnvelope)
                   r2dbcExecutor.withConnection("exactly-once handler") { conn 
=>
                     // run users handler
                     val session = new R2dbcSession(conn)
@@ -208,7 +228,7 @@ private[projection] object R2dbcProjectionImpl {
           } else {
             Future.sequence(acceptedEnvelopes.map(env => loadEnvelope(env, 
sourceProvider))).flatMap {
               loadedEnvelopes =>
-                val offsets = 
loadedEnvelopes.iterator.map(sourceProvider.extractOffset).toVector
+                val offsets = 
loadedEnvelopes.iterator.map(extractOffsetPidSeqNr(sourceProvider, _)).toVector
                 val filteredEnvelopes = 
loadedEnvelopes.filterNot(isFilteredEvent)
                 if (filteredEnvelopes.isEmpty) {
                   offsetStore.saveOffsets(offsets)
@@ -512,8 +532,12 @@ private[projection] class R2dbcProjectionImpl[Offset, 
Envelope](
       offsetStore.readOffset()
 
     // Called from InternalProjectionState.saveOffsetAndReport
-    override def saveOffset(projectionId: ProjectionId, offset: Offset): 
Future[Done] =
-      offsetStore.saveOffset(offset)
+    override def saveOffset(projectionId: ProjectionId, offset: Offset): 
Future[Done] = {
+      // need the envelope to be able to call offsetStore.saveOffset
+      // FIXME maybe we can cleanup this mess when moving R2dbcProjection to the Pekko Projections repository?
+      throw new IllegalStateException(
+        "Unexpected call to saveOffset. It should have called saveOffsetAndReport. Please report bug at https://github.com/apache/pekko-persistence-r2dbc/issues")
+    }
 
     override protected def saveOffsetAndReport(
         projectionId: ProjectionId,
@@ -523,7 +547,18 @@ private[projection] class R2dbcProjectionImpl[Offset, 
Envelope](
       val envelope = projectionContext.envelope
 
       if (offsetStore.isInflight(envelope) || isExactlyOnceWithSkip) {
-        super.saveOffsetAndReport(projectionId, projectionContext, batchSize)
+        val offset = 
R2dbcProjectionImpl.extractOffsetPidSeqNr(projectionContext.offset, envelope)
+        offsetStore
+          .saveOffset(offset)
+          .map { done =>
+            try {
+              statusObserver.offsetProgress(projectionId, envelope)
+            } catch {
+              case NonFatal(_) => // ignore
+            }
+            getTelemetry().onOffsetStored(batchSize)
+            done
+          }
       } else {
         FutureDone
       }
@@ -547,7 +582,7 @@ private[projection] class R2dbcProjectionImpl[Offset, 
Envelope](
       if (acceptedContexts.isEmpty) {
         FutureDone
       } else {
-        val offsets = acceptedContexts.map(_.offset)
+        val offsets = acceptedContexts.map(ctx => 
R2dbcProjectionImpl.extractOffsetPidSeqNr(ctx.offset, ctx.envelope))
         offsetStore
           .saveOffsets(offsets)
           .map { done =>
diff --git a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
index ccd63ff..9575e5d 100644
--- a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
+++ b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
@@ -28,6 +28,7 @@ import 
pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.projection.MergeableOffset
 import pekko.projection.ProjectionId
 import pekko.projection.internal.ManagementState
+import pekko.projection.r2dbc.internal.OffsetPidSeqNr
 import pekko.projection.r2dbc.internal.R2dbcOffsetStore
 import org.scalatest.wordspec.AnyWordSpecLike
 
@@ -74,12 +75,14 @@ class R2dbcOffsetStoreSpec
     "save and read offsets" in {
       val projectionId = genRandomProjectionId()
       val offsetStore = createOffsetStore(projectionId)
+      def saveOffset(offset: Any): Unit =
+        offsetStore.saveOffset(OffsetPidSeqNr(offset)).futureValue
 
-      offsetStore.saveOffset(1L).futureValue
+      saveOffset(1L)
       val offset1 = offsetStore.readOffset[Long]()
       offset1.futureValue shouldBe Some(1L)
 
-      offsetStore.saveOffset(2L).futureValue
+      saveOffset(2L)
       val offset2 = offsetStore.readOffset[Long]()
       offset2.futureValue shouldBe Some(2L) // yep, saveOffset overwrites previous
     }
@@ -87,7 +90,10 @@ class R2dbcOffsetStoreSpec
     "save and retrieve offsets of type Long" in {
       val projectionId = genRandomProjectionId()
       val offsetStore = createOffsetStore(projectionId)
-      offsetStore.saveOffset(1L).futureValue
+      def saveOffset(offset: Any): Unit =
+        offsetStore.saveOffset(OffsetPidSeqNr(offset)).futureValue
+
+      saveOffset(1L)
       val offset = offsetStore.readOffset[Long]()
       offset.futureValue shouldBe Some(1L)
     }
@@ -95,7 +101,10 @@ class R2dbcOffsetStoreSpec
     "save and retrieve offsets of type java.lang.Long" in {
       val projectionId = genRandomProjectionId()
       val offsetStore = createOffsetStore(projectionId)
-      offsetStore.saveOffset(java.lang.Long.valueOf(1L)).futureValue
+      def saveOffset(offset: Any): Unit =
+        offsetStore.saveOffset(OffsetPidSeqNr(offset)).futureValue
+
+      saveOffset(java.lang.Long.valueOf(1L))
       val offset = offsetStore.readOffset[java.lang.Long]()
       offset.futureValue shouldBe Some(1L)
     }
@@ -103,7 +112,10 @@ class R2dbcOffsetStoreSpec
     "save and retrieve offsets of type Int" in {
       val projectionId = genRandomProjectionId()
       val offsetStore = createOffsetStore(projectionId)
-      offsetStore.saveOffset(1).futureValue
+      def saveOffset(offset: Any): Unit =
+        offsetStore.saveOffset(OffsetPidSeqNr(offset)).futureValue
+
+      saveOffset(1)
       val offset = offsetStore.readOffset[Int]()
       offset.futureValue shouldBe Some(1)
     }
@@ -111,7 +123,10 @@ class R2dbcOffsetStoreSpec
     "save and retrieve offsets of type java.lang.Integer" in {
       val projectionId = genRandomProjectionId()
       val offsetStore = createOffsetStore(projectionId)
-      offsetStore.saveOffset(java.lang.Integer.valueOf(1)).futureValue
+      def saveOffset(offset: Any): Unit =
+        offsetStore.saveOffset(OffsetPidSeqNr(offset)).futureValue
+
+      saveOffset(java.lang.Integer.valueOf(1))
       val offset = offsetStore.readOffset[java.lang.Integer]()
       offset.futureValue shouldBe Some(1)
     }
@@ -119,8 +134,11 @@ class R2dbcOffsetStoreSpec
     "save and retrieve offsets of type String" in {
       val projectionId = genRandomProjectionId()
       val offsetStore = createOffsetStore(projectionId)
+      def saveOffset(offset: Any): Unit =
+        offsetStore.saveOffset(OffsetPidSeqNr(offset)).futureValue
+
       val randOffset = UUID.randomUUID().toString
-      offsetStore.saveOffset(randOffset).futureValue
+      saveOffset(randOffset)
       val offset = offsetStore.readOffset[String]()
       offset.futureValue shouldBe Some(randOffset)
     }
@@ -128,8 +146,11 @@ class R2dbcOffsetStoreSpec
     "save and retrieve offsets of type pekko.persistence.query.Sequence" in {
       val projectionId = genRandomProjectionId()
       val offsetStore = createOffsetStore(projectionId)
+      def saveOffset(offset: Any): Unit =
+        offsetStore.saveOffset(OffsetPidSeqNr(offset)).futureValue
+
       val seqOffset = Sequence(1L)
-      offsetStore.saveOffset(seqOffset).futureValue
+      saveOffset(seqOffset)
       val offset = offsetStore.readOffset[Sequence]()
       offset.futureValue shouldBe Some(seqOffset)
     }
@@ -137,10 +158,12 @@ class R2dbcOffsetStoreSpec
     "save and retrieve offsets of type pekko.persistence.query.TimeBasedUUID" 
in {
       val projectionId = genRandomProjectionId()
       val offsetStore = createOffsetStore(projectionId)
+      def saveOffset(offset: Any): Unit =
+        offsetStore.saveOffset(OffsetPidSeqNr(offset)).futureValue
 
       val timeUuidOffset =
         TimeBasedUUID(UUID.fromString("49225740-2019-11ea-a752-ffae2393b6e4")) 
// 2019-12-16T15:32:36.148Z[UTC]
-      offsetStore.saveOffset(timeUuidOffset).futureValue
+      saveOffset(timeUuidOffset)
       val offset = offsetStore.readOffset[TimeBasedUUID]()
       offset.futureValue shouldBe Some(timeUuidOffset)
     }
@@ -148,9 +171,11 @@ class R2dbcOffsetStoreSpec
     "save and retrieve offsets of unknown custom serializable type" in {
       val projectionId = genRandomProjectionId()
       val offsetStore = createOffsetStore(projectionId)
+      def saveOffset(offset: Any): Unit =
+        offsetStore.saveOffset(OffsetPidSeqNr(offset)).futureValue
 
       val customOffset = "abc"
-      offsetStore.saveOffset(customOffset).futureValue
+      saveOffset(customOffset)
       val offset = offsetStore.readOffset[TimeBasedUUID]()
       offset.futureValue shouldBe Some(customOffset)
     }
@@ -158,8 +183,11 @@ class R2dbcOffsetStoreSpec
     "save and retrieve MergeableOffset" in {
       val projectionId = genRandomProjectionId()
       val offsetStore = createOffsetStore(projectionId)
+      def saveOffset(offset: Any): Unit =
+        offsetStore.saveOffset(OffsetPidSeqNr(offset)).futureValue
+
       val origOffset = MergeableOffset(Map("abc" -> 1L, "def" -> 1L, "ghi" -> 
1L))
-      offsetStore.saveOffset(origOffset).futureValue
+      saveOffset(origOffset)
       val offset = offsetStore.readOffset[MergeableOffset[Long]]()
       offset.futureValue shouldBe Some(origOffset)
     }
@@ -167,16 +195,18 @@ class R2dbcOffsetStoreSpec
     "add new offsets to MergeableOffset" in {
       val projectionId = genRandomProjectionId()
       val offsetStore = createOffsetStore(projectionId)
+      def saveOffset(offset: Any): Unit =
+        offsetStore.saveOffset(OffsetPidSeqNr(offset)).futureValue
 
       val origOffset = MergeableOffset(Map("abc" -> 1L, "def" -> 1L))
-      offsetStore.saveOffset(origOffset).futureValue
+      saveOffset(origOffset)
 
       val offset1 = offsetStore.readOffset[MergeableOffset[Long]]()
       offset1.futureValue shouldBe Some(origOffset)
 
       // mix updates and inserts
       val updatedOffset = MergeableOffset(Map("abc" -> 2L, "def" -> 2L, "ghi" 
-> 1L))
-      offsetStore.saveOffset(updatedOffset).futureValue
+      saveOffset(updatedOffset)
 
       val offset2 = offsetStore.readOffset[MergeableOffset[Long]]()
       offset2.futureValue shouldBe Some(updatedOffset)
@@ -185,15 +215,17 @@ class R2dbcOffsetStoreSpec
     "update timestamp" in {
       val projectionId = genRandomProjectionId()
       val offsetStore = createOffsetStore(projectionId)
+      def saveOffset(offset: Any): Unit =
+        offsetStore.saveOffset(OffsetPidSeqNr(offset)).futureValue
 
       val instant0 = clock.instant()
-      offsetStore.saveOffset(15).futureValue
+      saveOffset(15)
 
       val instant1 = selectLastUpdated(projectionId)
       instant1 shouldBe instant0
 
       val instant2 = clock.tick(java.time.Duration.ofMillis(5))
-      offsetStore.saveOffset(16).futureValue
+      saveOffset(16)
 
       val instant3 = selectLastUpdated(projectionId)
       instant3 shouldBe instant2
@@ -202,8 +234,10 @@ class R2dbcOffsetStoreSpec
     "set offset" in {
       val projectionId = genRandomProjectionId()
       val offsetStore = createOffsetStore(projectionId)
+      def saveOffset(offset: Any): Unit =
+        offsetStore.saveOffset(OffsetPidSeqNr(offset)).futureValue
 
-      offsetStore.saveOffset(3L).futureValue
+      saveOffset(3L)
       offsetStore.readOffset[Long]().futureValue shouldBe Some(3L)
 
       offsetStore.managementSetOffset(2L).futureValue
@@ -213,8 +247,10 @@ class R2dbcOffsetStoreSpec
     "clear offset" in {
       val projectionId = genRandomProjectionId()
       val offsetStore = createOffsetStore(projectionId)
+      def saveOffset(offset: Any): Unit =
+        offsetStore.saveOffset(OffsetPidSeqNr(offset)).futureValue
 
-      offsetStore.saveOffset(3L).futureValue
+      saveOffset(3L)
       offsetStore.readOffset[Long]().futureValue shouldBe Some(3L)
 
       offsetStore.managementClearOffset().futureValue
diff --git a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
index 96982c9..d56e6dc 100644
--- a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
+++ b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
@@ -34,6 +34,7 @@ import pekko.persistence.r2dbc.internal.EnvelopeOrigin
 import pekko.projection.BySlicesSourceProvider
 import pekko.projection.ProjectionId
 import pekko.projection.internal.ManagementState
+import pekko.projection.r2dbc.internal.OffsetPidSeqNr
 import pekko.projection.r2dbc.internal.R2dbcOffsetStore
 import pekko.projection.r2dbc.internal.R2dbcOffsetStore.Pid
 import pekko.projection.r2dbc.internal.R2dbcOffsetStore.Record
@@ -150,33 +151,34 @@ class R2dbcTimestampOffsetStoreSpec
 
       tick()
       val offset1 = TimestampOffset(clock.instant(), Map("p1" -> 3L))
-      offsetStore.saveOffset(offset1).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
       val readOffset1 = offsetStore.readOffset[TimestampOffset]()
       readOffset1.futureValue shouldBe Some(offset1)
 
       tick()
       val offset2 = TimestampOffset(clock.instant(), Map("p1" -> 4L))
-      offsetStore.saveOffset(offset2).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(offset2, "p1", 4L)).futureValue
       val readOffset2 = offsetStore.readOffset[TimestampOffset]()
       readOffset2.futureValue shouldBe Some(offset2) // yep, saveOffset overwrites previous
     }
 
-    "save TimestampOffset with several entries" in {
+    "save TimestampOffset with several seen entries" in {
       val projectionId = genRandomProjectionId()
       val offsetStore = createOffsetStore(projectionId)
 
       tick()
       val offset1 = TimestampOffset(clock.instant(), Map("p1" -> 3L, "p2" -> 
1L, "p3" -> 5L))
-      offsetStore.saveOffset(offset1).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
       val readOffset1 = offsetStore.readOffset[TimestampOffset]()
-      readOffset1.futureValue shouldBe Some(offset1)
+      val expectedOffset1 = offset1.copy(seen = Map("p1" -> 3L))
+      readOffset1.futureValue shouldBe Some(expectedOffset1)
 
       tick()
       val offset2 = TimestampOffset(clock.instant(), Map("p1" -> 4L, "p3" -> 
6L, "p4" -> 9L))
-      offsetStore.saveOffset(offset2).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(offset2, "p3", 6L)).futureValue
       val readOffset2 = offsetStore.readOffset[TimestampOffset]()
-      // p2 is not included in read offset because it wasn't updated and has earlier timestamp
-      readOffset2.futureValue shouldBe Some(offset2)
+      val expectedOffset2 = offset2.copy(seen = Map("p3" -> 6L))
+      readOffset2.futureValue shouldBe Some(expectedOffset2)
     }
 
     "save TimestampOffset when same timestamp" in {
@@ -185,13 +187,16 @@ class R2dbcTimestampOffsetStoreSpec
 
       tick()
       val offset1 = TimestampOffset(clock.instant(), Map("p1" -> 3L, "p2" -> 
1L, "p3" -> 5L))
-      offsetStore.saveOffset(offset1).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p2", 1L)).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p3", 5L)).futureValue
       val readOffset1 = offsetStore.readOffset[TimestampOffset]()
       readOffset1.futureValue shouldBe Some(offset1)
 
       // not tick, same timestamp
       val offset2 = TimestampOffset(clock.instant(), Map("p2" -> 2L, "p4" -> 
9L))
-      offsetStore.saveOffset(offset2).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(offset2, "p2", 2L)).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(offset2, "p4", 9L)).futureValue
       val readOffset2 = offsetStore.readOffset[TimestampOffset]()
       // all should be included since same timestamp
       val expectedOffset2 = TimestampOffset(clock.instant(), Map("p1" -> 3L, 
"p2" -> 2L, "p3" -> 5L, "p4" -> 9L))
@@ -200,7 +205,7 @@ class R2dbcTimestampOffsetStoreSpec
       // saving new with later timestamp
       tick()
       val offset3 = TimestampOffset(clock.instant(), Map("p1" -> 4L))
-      offsetStore.saveOffset(offset3).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(offset3, "p1", 4L)).futureValue
       val readOffset3 = offsetStore.readOffset[TimestampOffset]()
       // then it should only contain that entry
       readOffset3.futureValue shouldBe Some(offset3)
@@ -218,11 +223,19 @@ class R2dbcTimestampOffsetStoreSpec
       val offset3 = TimestampOffset(clock.instant(), Map("p6" -> 6L))
       tick()
       val offset4 = TimestampOffset(clock.instant(), Map("p1" -> 4L, "p3" -> 
6L, "p4" -> 9L))
-      val offsetsBatch1 = Vector(offset1, offset2, offset3, offset4)
+      val offsetsBatch1 = Vector(
+        OffsetPidSeqNr(offset1, "p1", 3L),
+        OffsetPidSeqNr(offset1, "p2", 1L),
+        OffsetPidSeqNr(offset1, "p3", 5L),
+        OffsetPidSeqNr(offset2, "p5", 1L),
+        OffsetPidSeqNr(offset3, "p6", 6L),
+        OffsetPidSeqNr(offset4, "p1", 4L),
+        OffsetPidSeqNr(offset4, "p3", 6L),
+        OffsetPidSeqNr(offset4, "p4", 9L))
 
       offsetStore.saveOffsets(offsetsBatch1).futureValue
       val readOffset1 = offsetStore.readOffset[TimestampOffset]()
-      readOffset1.futureValue shouldBe Some(offsetsBatch1.last)
+      readOffset1.futureValue shouldBe Some(offsetsBatch1.last.offset)
       offsetStore.getState().byPid("p1").seqNr shouldBe 4L
       offsetStore.getState().byPid("p2").seqNr shouldBe 1L
       offsetStore.getState().byPid("p3").seqNr shouldBe 6L
@@ -232,12 +245,12 @@ class R2dbcTimestampOffsetStoreSpec
 
       tick()
       val offset5 = TimestampOffset(clock.instant(), Map("p1" -> 5L))
-      offsetStore.saveOffsets(Vector(offset5)).futureValue
+      offsetStore.saveOffsets(Vector(OffsetPidSeqNr(offset5, "p1", 
5L))).futureValue
 
       tick()
       // duplicate
       val offset6 = TimestampOffset(clock.instant(), Map("p2" -> 1L))
-      offsetStore.saveOffsets(Vector(offset6)).futureValue
+      offsetStore.saveOffsets(Vector(OffsetPidSeqNr(offset6, "p2", 
1L))).futureValue
 
       tick()
       val offset7 = TimestampOffset(clock.instant(), Map("p1" -> 6L))
@@ -245,11 +258,12 @@ class R2dbcTimestampOffsetStoreSpec
       val offset8 = TimestampOffset(clock.instant(), Map("p1" -> 7L))
       tick()
       val offset9 = TimestampOffset(clock.instant(), Map("p1" -> 8L))
-      val offsetsBatch2 = Vector(offset7, offset8, offset9)
+      val offsetsBatch2 =
+        Vector(OffsetPidSeqNr(offset7, "p1", 6L), OffsetPidSeqNr(offset8, 
"p1", 7L), OffsetPidSeqNr(offset9, "p1", 8L))
 
       offsetStore.saveOffsets(offsetsBatch2).futureValue
       val readOffset2 = offsetStore.readOffset[TimestampOffset]()
-      readOffset2.futureValue shouldBe Some(offsetsBatch2.last)
+      readOffset2.futureValue shouldBe Some(offsetsBatch2.last.offset)
       offsetStore.getState().byPid("p1").seqNr shouldBe 8L
       offsetStore.getState().byPid("p2").seqNr shouldBe 1L
       offsetStore.getState().byPid("p3").seqNr shouldBe 6L
@@ -264,13 +278,13 @@ class R2dbcTimestampOffsetStoreSpec
 
       tick()
       val offset1 = TimestampOffset(clock.instant(), Map("p1" -> 3L))
-      offsetStore.saveOffset(offset1).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
       val readOffset1 = offsetStore.readOffset[TimestampOffset]()
       readOffset1.futureValue shouldBe Some(offset1)
 
       clock.setInstant(clock.instant().minusMillis(1))
       val offset2 = TimestampOffset(clock.instant(), Map("p1" -> 2L))
-      offsetStore.saveOffset(offset2).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(offset2, "p1", 2L)).futureValue
       val readOffset2 = offsetStore.readOffset[TimestampOffset]()
       readOffset2.futureValue shouldBe Some(offset1) // keeping offset1
     }
@@ -306,16 +320,16 @@ class R2dbcTimestampOffsetStoreSpec
 
       tick()
       val offset1 = TimestampOffset(clock.instant(), Map(p1 -> 3L))
-      offsetStore0.saveOffset(offset1).futureValue
+      offsetStore0.saveOffset(OffsetPidSeqNr(offset1, p1, 3L)).futureValue
       tick()
       val offset2 = TimestampOffset(clock.instant(), Map(p2 -> 4L))
-      offsetStore0.saveOffset(offset2).futureValue
+      offsetStore0.saveOffset(OffsetPidSeqNr(offset2, p2, 4L)).futureValue
       tick()
       val offset3 = TimestampOffset(clock.instant(), Map(p3 -> 7L))
-      offsetStore0.saveOffset(offset3).futureValue
+      offsetStore0.saveOffset(OffsetPidSeqNr(offset3, p3, 7L)).futureValue
       tick()
       val offset4 = TimestampOffset(clock.instant(), Map(p4 -> 5L))
-      offsetStore0.saveOffset(offset4).futureValue
+      offsetStore0.saveOffset(OffsetPidSeqNr(offset4, p4, 5L)).futureValue
 
       val offsetStore1 =
         R2dbcOffsetStore.fromConfig(
@@ -344,13 +358,17 @@ class R2dbcTimestampOffsetStoreSpec
 
       tick()
       val offset1 = TimestampOffset(clock.instant(), Map("p1" -> 3L, "p2" -> 
1L, "p3" -> 5L))
-      offsetStore.saveOffset(offset1).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p2", 1L)).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p3", 5L)).futureValue
       tick()
       val offset2 = TimestampOffset(clock.instant(), Map("p1" -> 4L, "p3" -> 
6L, "p4" -> 9L))
-      offsetStore.saveOffset(offset2).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(offset2, "p1", 4L)).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(offset2, "p3", 6L)).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(offset2, "p4", 9L)).futureValue
       tick()
       val offset3 = TimestampOffset(clock.instant(), Map("p5" -> 10L))
-      offsetStore.saveOffset(offset3).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(offset3, "p5", 10L)).futureValue
 
       offsetStore.isDuplicate(Record("p5", 10, offset3.timestamp)) shouldBe 
true
       offsetStore.isDuplicate(Record("p1", 4, offset2.timestamp)) shouldBe true
@@ -378,7 +396,9 @@ class R2dbcTimestampOffsetStoreSpec
 
       val startTime = TestClock.nowMicros().instant()
       val offset1 = TimestampOffset(startTime, Map("p1" -> 3L, "p2" -> 1L, 
"p3" -> 5L))
-      offsetStore.saveOffset(offset1).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p2", 1L)).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p3", 5L)).futureValue
 
       // seqNr 1 is always accepted
       val env1 = createEnvelope("p4", 1L, startTime.plusMillis(1), "e4-1")
@@ -477,8 +497,12 @@ class R2dbcTimestampOffsetStoreSpec
       // it's keeping the inflight that are not in the "stored" state
       offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8, "p4" -> 
2L, "p5" -> 8)
       // and they are removed from inflight once they have been stored
-      offsetStore.saveOffset(TimestampOffset(startTime.plusMillis(2), Map("p4" 
-> 2L))).futureValue
-      offsetStore.saveOffset(TimestampOffset(startTime.plusMillis(9), Map("p5" 
-> 8L))).futureValue
+      offsetStore
+        .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(2), 
Map("p4" -> 2L)), "p4", 2L))
+        .futureValue
+      offsetStore
+        .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(9), 
Map("p5" -> 8L)), "p5", 8L))
+        .futureValue
       offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8)
     }
 
@@ -496,7 +520,9 @@ class R2dbcTimestampOffsetStoreSpec
       offsetStore.isAccepted(envelope1).futureValue shouldBe true
       offsetStore.addInflight(envelope1)
       offsetStore.getInflight() shouldBe Map("p1" -> 1L)
-      offsetStore.saveOffset(TimestampOffset(startTime.plusMillis(1), Map("p1" 
-> 1L))).futureValue
+      offsetStore
+        .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(1), 
Map("p1" -> 1L)), "p1", 1L))
+        .futureValue
       offsetStore.getInflight() shouldBe empty
 
       // seqNr 2 is accepts since it follows seqNr 1 that is stored in state
@@ -517,7 +543,9 @@ class R2dbcTimestampOffsetStoreSpec
       offsetStore.getInflight() shouldBe Map("p1" -> 3L)
 
       // and they are removed from inflight once they have been stored
-      offsetStore.saveOffset(TimestampOffset(startTime.plusMillis(2), Map("p1" 
-> 3L))).futureValue
+      offsetStore
+        .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(2), 
Map("p1" -> 3L)), "p1", 3L))
+        .futureValue
       offsetStore.getInflight() shouldBe empty
     }
 
@@ -527,7 +555,9 @@ class R2dbcTimestampOffsetStoreSpec
       val offsetStore = createOffsetStore(projectionId)
 
       val offset1 = TimestampOffset(startTime, Map("p1" -> 3L, "p2" -> 1L, 
"p3" -> 5L))
-      offsetStore.saveOffset(offset1).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p2", 1L)).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p3", 5L)).futureValue
 
       // seqNr 1 is always accepted
       val env1 = createEnvelope("p4", 1L, startTime.plusMillis(1), "e4-1")
@@ -550,7 +580,9 @@ class R2dbcTimestampOffsetStoreSpec
 
       val startTime = TestClock.nowMicros().instant()
       val offset1 = TimestampOffset(startTime, Map("p1" -> 3L, "p2" -> 1L, 
"p3" -> 5L))
-      offsetStore.saveOffset(offset1).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p2", 1L)).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p3", 5L)).futureValue
 
       // seqNr 1 is always accepted
       val env1 = createUpdatedDurableState("p4", 1L, startTime.plusMillis(1), 
"s4-1")
@@ -603,8 +635,12 @@ class R2dbcTimestampOffsetStoreSpec
       // it's keeping the inflight that are not in the "stored" state
       offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 20, "p4" -> 
2L, "p5" -> 7)
       // and they are removed from inflight once they have been stored
-      offsetStore.saveOffset(TimestampOffset(startTime.plusMillis(2), Map("p4" 
-> 2L))).futureValue
-      offsetStore.saveOffset(TimestampOffset(startTime.plusMillis(9), Map("p5" 
-> 8L))).futureValue
+      offsetStore
+        .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(2), 
Map("p4" -> 2L)), "p4", 2L))
+        .futureValue
+      offsetStore
+        .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(9), 
Map("p5" -> 8L)), "p5", 8L))
+        .futureValue
       offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 20)
     }
 
@@ -617,36 +653,69 @@ class R2dbcTimestampOffsetStoreSpec
       val startTime = TestClock.nowMicros().instant()
       log.debug("Start time [{}]", startTime)
 
-      offsetStore.saveOffset(TimestampOffset(startTime, Map("p1" -> 
1L))).futureValue
-      
offsetStore.saveOffset(TimestampOffset(startTime.plus(JDuration.ofSeconds(1)), 
Map("p2" -> 1L))).futureValue
-      
offsetStore.saveOffset(TimestampOffset(startTime.plus(JDuration.ofSeconds(2)), 
Map("p3" -> 1L))).futureValue
-      offsetStore.saveOffset(TimestampOffset(startTime.plus(evictInterval), 
Map("p4" -> 1L))).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime, 
Map("p1" -> 1L)), "p1", 1L)).futureValue
+      offsetStore
+        
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(1)),
 Map("p2" -> 1L)), "p2", 1L))
+        .futureValue
+      offsetStore
+        
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(2)),
 Map("p3" -> 1L)), "p3", 1L))
+        .futureValue
+      offsetStore
+        
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(evictInterval), 
Map("p4" -> 1L)), "p4", 1L))
+        .futureValue
       offsetStore
-        
.saveOffset(TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(1)),
 Map("p4" -> 1L)))
+        .saveOffset(
+          OffsetPidSeqNr(
+            
TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(1)), 
Map("p4" -> 1L)),
+            "p4",
+            1L))
         .futureValue
       offsetStore
-        
.saveOffset(TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(2)),
 Map("p5" -> 1L)))
+        .saveOffset(
+          OffsetPidSeqNr(
+            
TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(2)), 
Map("p5" -> 1L)),
+            "p5",
+            1L))
         .futureValue
       offsetStore
-        
.saveOffset(TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(3)),
 Map("p6" -> 1L)))
+        .saveOffset(
+          OffsetPidSeqNr(
+            
TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(3)), 
Map("p6" -> 1L)),
+            "p6",
+            1L))
         .futureValue
       offsetStore.getState().size shouldBe 6
 
-      
offsetStore.saveOffset(TimestampOffset(startTime.plus(timeWindow.minusSeconds(10)),
 Map("p7" -> 1L))).futureValue
+      offsetStore
+        
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.minusSeconds(10)),
 Map("p7" -> 1L)), "p7",
+          1L))
+        .futureValue
       offsetStore.getState().size shouldBe 7 // nothing evicted yet
 
       offsetStore
-        
.saveOffset(TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).minusSeconds(3)),
 Map("p8" -> 1L)))
+        .saveOffset(
+          OffsetPidSeqNr(
+            
TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).minusSeconds(3)), 
Map("p8" -> 1L)),
+            "p8",
+            1L))
         .futureValue
       offsetStore.getState().size shouldBe 8 // still nothing evicted yet
 
       offsetStore
-        
.saveOffset(TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).plusSeconds(1)),
 Map("p8" -> 2L)))
+        .saveOffset(
+          OffsetPidSeqNr(
+            
TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).plusSeconds(1)), 
Map("p8" -> 2L)),
+            "p8",
+            2L))
         .futureValue
       offsetStore.getState().byPid.keySet shouldBe Set("p5", "p6", "p7", "p8")
 
       offsetStore
-        
.saveOffset(TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).plusSeconds(20)),
 Map("p8" -> 3L)))
+        .saveOffset(
+          OffsetPidSeqNr(
+            
TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).plusSeconds(20)), 
Map("p8" -> 3L)),
+            "p8",
+            3L))
         .futureValue
       offsetStore.getState().byPid.keySet shouldBe Set("p7", "p8")
     }
@@ -660,23 +729,41 @@ class R2dbcTimestampOffsetStoreSpec
       val startTime = TestClock.nowMicros().instant()
       log.debug("Start time [{}]", startTime)
 
-      offsetStore.saveOffset(TimestampOffset(startTime, Map("p1" -> 
1L))).futureValue
-      
offsetStore.saveOffset(TimestampOffset(startTime.plus(JDuration.ofSeconds(1)), 
Map("p2" -> 1L))).futureValue
-      
offsetStore.saveOffset(TimestampOffset(startTime.plus(JDuration.ofSeconds(2)), 
Map("p3" -> 1L))).futureValue
-      
offsetStore.saveOffset(TimestampOffset(startTime.plus(JDuration.ofSeconds(3)), 
Map("p4" -> 1L))).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime, 
Map("p1" -> 1L)), "p1", 1L)).futureValue
+      offsetStore
+        
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(1)),
 Map("p2" -> 1L)), "p2", 1L))
+        .futureValue
+      offsetStore
+        
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(2)),
 Map("p3" -> 1L)), "p3", 1L))
+        .futureValue
+      offsetStore
+        
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(3)),
 Map("p4" -> 1L)), "p4", 1L))
+        .futureValue
       offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0
       offsetStore.readOffset().futureValue // this will load from database
       offsetStore.getState().size shouldBe 4
 
-      
offsetStore.saveOffset(TimestampOffset(startTime.plus(timeWindow.minusSeconds(2)),
 Map("p5" -> 1L))).futureValue
-      
offsetStore.saveOffset(TimestampOffset(startTime.plus(timeWindow.minusSeconds(1)),
 Map("p6" -> 1L))).futureValue
+      offsetStore
+        
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.minusSeconds(2)),
 Map("p5" -> 1L)), "p5",
+          1L))
+        .futureValue
+      offsetStore
+        
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.minusSeconds(1)),
 Map("p6" -> 1L)), "p6",
+          1L))
+        .futureValue
       // nothing deleted yet
       offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0
       offsetStore.readOffset().futureValue // this will load from database
       offsetStore.getState().size shouldBe 6
 
-      
offsetStore.saveOffset(TimestampOffset(startTime.plus(timeWindow.plusSeconds(1)),
 Map("p7" -> 1L))).futureValue
-      
offsetStore.saveOffset(TimestampOffset(startTime.plus(timeWindow.plusSeconds(2)),
 Map("p8" -> 1L))).futureValue
+      offsetStore
+        
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(1)),
 Map("p7" -> 1L)), "p7",
+          1L))
+        .futureValue
+      offsetStore
+        
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(2)),
 Map("p8" -> 1L)), "p8",
+          1L))
+        .futureValue
       offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 2
       offsetStore.readOffset().futureValue // this will load from database
       offsetStore.getState().byPid.keySet shouldBe Set("p3", "p4", "p5", "p6", 
"p7", "p8")
@@ -691,15 +778,22 @@ class R2dbcTimestampOffsetStoreSpec
       val startTime = TestClock.nowMicros().instant()
       log.debug("Start time [{}]", startTime)
 
-      offsetStore.saveOffset(TimestampOffset(startTime, Map("p1" -> 
1L))).futureValue
-      
offsetStore.saveOffset(TimestampOffset(startTime.plus(timeWindow.plusSeconds(1)),
 Map("p2" -> 1L))).futureValue
+      offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime, 
Map("p1" -> 1L)), "p1", 1L)).futureValue
+      offsetStore
+        
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(1)),
 Map("p2" -> 1L)), "p2",
+          1L))
+        .futureValue
       eventually {
         offsetStore.readOffset().futureValue // this will load from database
         offsetStore.getState().byPid.keySet shouldBe Set("p2")
       }
 
       offsetStore
-        
.saveOffset(TimestampOffset(startTime.plus(timeWindow.multipliedBy(2).plusSeconds(2)),
 Map("p3" -> 1L)))
+        .saveOffset(
+          OffsetPidSeqNr(
+            
TimestampOffset(startTime.plus(timeWindow.multipliedBy(2).plusSeconds(2)), 
Map("p3" -> 1L)),
+            "p3",
+            1L))
         .futureValue
       eventually {
         offsetStore.readOffset().futureValue // this will load from database
@@ -720,7 +814,9 @@ class R2dbcTimestampOffsetStoreSpec
       tick()
       val offset4 = TimestampOffset(clock.instant(), Map("p4" -> 40L))
 
-      offsetStore.saveOffsets(Vector(offset1, offset4)).futureValue
+      offsetStore
+        .saveOffsets(Vector(OffsetPidSeqNr(offset1, "p1", 10L), 
OffsetPidSeqNr(offset4, "p4", 40L)))
+        .futureValue
 
       // offset without any seen pid/seqNr
       offsetStore.managementSetOffset(TimestampOffset(t2, seen = 
Map.empty)).futureValue
@@ -745,7 +841,7 @@ class R2dbcTimestampOffsetStoreSpec
       tick()
       val offset2 = TimestampOffset(clock.instant(), Map("p2" -> 4L))
 
-      offsetStore.saveOffsets(Vector(offset1, offset2)).futureValue
+      offsetStore.saveOffsets(Vector(OffsetPidSeqNr(offset1, "p1", 3L), 
OffsetPidSeqNr(offset2, "p2", 4L))).futureValue
 
       offsetStore.managementClearOffset().futureValue
       offsetStore.readOffset[TimestampOffset]().futureValue shouldBe None


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to