This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git
The following commit(s) were added to refs/heads/main by this push:
new 55726a1 Port akka PRs 373, 377, 382: OffsetPidSeqNr, tags in events,
pubsub tags (#376)
55726a1 is described below
commit 55726a1275446fad1be07870e7ed35c8b37f064e
Author: PJ Fanning <[email protected]>
AuthorDate: Fri May 15 14:16:45 2026 +0100
Port akka PRs 373, 377, 382: OffsetPidSeqNr, tags in events, pubsub tags
(#376)
* Port akka PRs 373, 377, 382: OffsetPidSeqNr, tags in events, pubsub tags
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/0dd65fbd-cc87-4ea9-a5ce-a6ab564ecbb6
Co-authored-by: pjfanning <[email protected]>
* Rename event variable to eventPayload in PubSub.scala for clarity
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/0dd65fbd-cc87-4ea9-a5ce-a6ab564ecbb6
Co-authored-by: pjfanning <[email protected]>
* scalafmt
* Update R2dbcProjectionImpl.scala
* issue with tags column
* scalafmt
* Update MySQLQueryDao.scala
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../r2dbc/internal/EventsByPersistenceIdDao.scala | 11 +-
.../pekko/persistence/r2dbc/internal/PubSub.scala | 18 +-
.../persistence/r2dbc/journal/R2dbcJournal.scala | 17 +-
.../r2dbc/query/javadsl/R2dbcReadJournal.scala | 26 ++-
.../r2dbc/query/scaladsl/QueryDao.scala | 52 ++---
.../r2dbc/query/scaladsl/R2dbcReadJournal.scala | 60 ++++--
.../r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala | 35 ++--
.../projection/r2dbc/internal/OffsetPidSeqNr.scala | 32 +++
.../r2dbc/internal/R2dbcOffsetStore.scala | 36 ++--
.../r2dbc/internal/R2dbcProjectionImpl.scala | 49 ++++-
.../projection/r2dbc/R2dbcOffsetStoreSpec.scala | 70 +++++--
.../r2dbc/R2dbcTimestampOffsetStoreSpec.scala | 214 +++++++++++++++------
12 files changed, 435 insertions(+), 185 deletions(-)
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala
index 4099091..3664e92 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala
@@ -36,6 +36,11 @@ private[r2dbc] object EventsByPersistenceIdDao {
private val log = LoggerFactory.getLogger(classOf[EventsByPersistenceIdDao])
private final case class ByPersistenceIdState(queryCount: Int, rowCount:
Int, latestSeqNr: Long)
+
+ private def setFromDb[T](array: Array[T]): Set[T] = array match {
+ case null => Set.empty[T]
+ case entries => entries.toSet
+ }
}
/**
@@ -57,7 +62,7 @@ private[r2dbc] trait EventsByPersistenceIdDao {
protected def settings: BufferSize
private lazy val selectEventsSql = sql"""
- SELECT slice, entity_type, persistence_id, seq_nr, db_timestamp,
$statementTimestampSql AS read_db_timestamp, event_ser_id, event_ser_manifest,
event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest,
meta_payload
+ SELECT slice, entity_type, persistence_id, seq_nr, db_timestamp,
$statementTimestampSql AS read_db_timestamp, event_ser_id, event_ser_manifest,
event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest,
meta_payload, tags
from $journalTable
WHERE persistence_id = ? AND seq_nr >= ? AND seq_nr <= ?
AND deleted = false
@@ -67,7 +72,7 @@ private[r2dbc] trait EventsByPersistenceIdDao {
/**
* INTERNAL API: Used by both journal replay and currentEventsByPersistenceId
*/
- @InternalApi private[r2dbc] def internalEventsByPersistenceId(
+ @InternalApi private[r2dbc] def internalCurrentEventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long): Source[SerializedJournalRow, NotUsed] = {
@@ -140,7 +145,7 @@ private[r2dbc] trait EventsByPersistenceIdDao {
serId = row.get[Integer]("event_ser_id", classOf[Integer]),
serManifest = row.get("event_ser_manifest", classOf[String]),
writerUuid = row.get("writer", classOf[String]),
- tags = Set.empty, // tags not fetched in queries (yet)
+ tags = setFromDb(row.get("tags", classOf[Array[String]])),
metadata = readMetadata(row)))
if (log.isDebugEnabled)
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala
index 6d901d2..84db63d 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala
@@ -108,26 +108,26 @@ import org.slf4j.LoggerFactory
val slice = persistenceExt.sliceForPersistenceId(pid)
val offset = TimestampOffset(timestamp, timestamp, Map(pid ->
pr.sequenceNr))
- val payload =
- pr.payload match {
- case Tagged(payload, _) =>
- // eventsByTag not implemented (see issue #82), but events can
still be tagged, so we unwrap this tagged event.
- payload
- case other => other
- }
+ val (eventPayload, tags) = pr.payload match {
+ case Tagged(payload, tags) =>
+ (payload, tags)
+ case other =>
+ (other, Set.empty[String])
+ }
val envelope = new EventEnvelope(
offset,
pid,
pr.sequenceNr,
- Option(payload),
+ Option(eventPayload),
timestamp.toEpochMilli,
pr.metadata,
entityType,
slice,
filtered = false,
- source = EnvelopeOrigin.SourcePubSub)
+ source = EnvelopeOrigin.SourcePubSub,
+ tags)
eventTopic(entityType, slice) ! Topic.Publish(envelope)
}
}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
index a95b0b5..34713ac 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
@@ -100,8 +100,6 @@ private[r2dbc] final class R2dbcJournal(config: Config,
cfgPath: String) extends
// them to complete before we can read the highest sequence number or we
will miss it
private val writesInProgress = new java.util.HashMap[String, Future[_]]()
- private var eventsByTagNotImplementedLogged = false
-
override def receivePluginInternal: Receive = { case WriteFinished(pid, f) =>
writesInProgress.remove(pid, f)
}
@@ -113,8 +111,6 @@ private[r2dbc] final class R2dbcJournal(config: Config,
cfgPath: String) extends
atomicWrite.payload.map { pr =>
val (event, tags) = pr.payload match {
case Tagged(payload, tags) =>
- // eventsBytag not implemented, issue #82, but they are stored
- logEventsByTagsNotImplemented()
(payload.asInstanceOf[AnyRef], tags)
case other =>
(other.asInstanceOf[AnyRef], Set.empty[String])
@@ -197,17 +193,6 @@ private[r2dbc] final class R2dbcJournal(config: Config,
cfgPath: String) extends
dbTimestamp.map(_ => Done)(ExecutionContext.parasitic)
}
- private def logEventsByTagsNotImplemented(): Unit = {
- if (!eventsByTagNotImplementedLogged) {
- eventsByTagNotImplementedLogged = true
- log.info(
- "eventsByTag query not implemented by pekko-persistence-r2dbc. We
recommend using eventsBySlices instead. " +
- "The given tags are stored. " +
- "eventsByTag may be implemented in the future if there is strong
demand for it. " +
- "Let us know in
https://github.com/akka/akka-persistence-r2dbc/issues/82")
- }
- }
-
override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr:
Long): Future[Unit] = {
log.debug("asyncDeleteMessagesTo persistenceId [{}], toSequenceNr [{}]",
persistenceId, toSequenceNr)
journalDao.deleteMessagesTo(persistenceId, toSequenceNr)
@@ -220,7 +205,7 @@ private[r2dbc] final class R2dbcJournal(config: Config,
cfgPath: String) extends
if (max == Long.MaxValue) toSequenceNr
else math.min(toSequenceNr, fromSequenceNr + max - 1)
journalDao
- .internalEventsByPersistenceId(persistenceId, fromSequenceNr,
effectiveToSequenceNr)
+ .internalCurrentEventsByPersistenceId(persistenceId, fromSequenceNr,
effectiveToSequenceNr)
.runWith(Sink.foreach { row =>
val repr = deserializeRow(serialization, row)
recoveryCallback(repr)
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala
index 184b689..0be7612 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala
@@ -29,10 +29,14 @@ import pekko.persistence.query.{ EventEnvelope =>
ClassicEventEnvelope }
import pekko.persistence.query.Offset
import pekko.persistence.query.javadsl._
import pekko.persistence.query.typed.EventEnvelope
-import pekko.persistence.query.typed.javadsl.CurrentEventsBySliceQuery
-import pekko.persistence.query.typed.javadsl.EventTimestampQuery
-import pekko.persistence.query.typed.javadsl.EventsBySliceQuery
-import pekko.persistence.query.typed.javadsl.LoadEventQuery
+import pekko.persistence.query.typed.javadsl.{
+ CurrentEventsByPersistenceIdTypedQuery,
+ CurrentEventsBySliceQuery,
+ EventTimestampQuery,
+ EventsByPersistenceIdTypedQuery,
+ EventsBySliceQuery,
+ LoadEventQuery
+}
import pekko.persistence.r2dbc.query.scaladsl
import pekko.stream.javadsl.Source
@@ -47,7 +51,9 @@ final class R2dbcReadJournal(delegate:
scaladsl.R2dbcReadJournal)
with EventTimestampQuery
with LoadEventQuery
with CurrentEventsByPersistenceIdQuery
+ with CurrentEventsByPersistenceIdTypedQuery
with EventsByPersistenceIdQuery
+ with EventsByPersistenceIdTypedQuery
with CurrentPersistenceIdsQuery
with PagedPersistenceIdsQuery {
@@ -82,12 +88,24 @@ final class R2dbcReadJournal(delegate:
scaladsl.R2dbcReadJournal)
toSequenceNr: Long): Source[ClassicEventEnvelope, NotUsed] =
delegate.currentEventsByPersistenceId(persistenceId, fromSequenceNr,
toSequenceNr).asJava
+ override def currentEventsByPersistenceIdTyped[Event](
+ persistenceId: String,
+ fromSequenceNr: Long,
+ toSequenceNr: Long): Source[EventEnvelope[Event], NotUsed] =
+ delegate.currentEventsByPersistenceIdTyped[Event](persistenceId,
fromSequenceNr, toSequenceNr).asJava
+
override def eventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long): Source[ClassicEventEnvelope, NotUsed] =
delegate.eventsByPersistenceId(persistenceId, fromSequenceNr,
toSequenceNr).asJava
+ override def eventsByPersistenceIdTyped[Event](
+ persistenceId: String,
+ fromSequenceNr: Long,
+ toSequenceNr: Long): Source[EventEnvelope[Event], NotUsed] =
+ delegate.eventsByPersistenceIdTyped[Event](persistenceId, fromSequenceNr,
toSequenceNr).asJava
+
override def currentPersistenceIds(): Source[String, NotUsed] =
delegate.currentPersistenceIds().asJava
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
index 49d70b6..b79f7ac 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
@@ -47,6 +47,11 @@ import org.slf4j.LoggerFactory
object QueryDao {
val log: Logger = LoggerFactory.getLogger(classOf[QueryDao])
+ private def setFromDb[T](array: Array[T]): Set[T] = array match {
+ case null => Set.empty[T]
+ case entries => entries.toSet
+ }
+
def fromConfig(
settings: QuerySettings,
config: Config
@@ -71,6 +76,7 @@ private[r2dbc] class QueryDao(val settings: QuerySettings,
connectionFactory: Co
with EventsByPersistenceIdDao with HighestSequenceNrDao {
import JournalDao.readMetadata
import QueryDao.log
+ import QueryDao.setFromDb
implicit protected val dialect: Dialect = settings.dialect
protected lazy val statementTimestampSql: String = "statement_timestamp()"
@@ -80,38 +86,36 @@ private[r2dbc] class QueryDao(val settings: QuerySettings,
connectionFactory: Co
private val currentDbTimestampSql =
"SELECT transaction_timestamp() AS db_timestamp"
+ protected def selectColumns(backtracking: Boolean): String =
+ if (backtracking)
+ "SELECT slice, persistence_id, seq_nr, db_timestamp,
statement_timestamp() AS read_db_timestamp, tags "
+ else
+ "SELECT slice, persistence_id, seq_nr, db_timestamp,
statement_timestamp() AS read_db_timestamp, tags, event_ser_id,
event_ser_manifest, event_payload, meta_ser_id, meta_ser_manifest, meta_payload
"
+
+ protected def toDbTimestampParamCondition(toDbTimestampParam: Boolean):
String =
+ if (toDbTimestampParam) "AND db_timestamp <= ?" else ""
+
+ protected def behindCurrentTimeIntervalCondition(behindCurrentTime:
FiniteDuration): String =
+ if (behindCurrentTime > Duration.Zero)
+ s"AND db_timestamp < transaction_timestamp() - interval
'${behindCurrentTime.toMillis} milliseconds'"
+ else ""
+
protected def eventsBySlicesRangeSql(
toDbTimestampParam: Boolean,
behindCurrentTime: FiniteDuration,
backtracking: Boolean,
minSlice: Int,
- maxSlice: Int): String = {
-
- def toDbTimestampParamCondition =
- if (toDbTimestampParam) "AND db_timestamp <= ?" else ""
-
- def behindCurrentTimeIntervalCondition =
- if (behindCurrentTime > Duration.Zero)
- s"AND db_timestamp < transaction_timestamp() - interval
'${behindCurrentTime.toMillis} milliseconds'"
- else ""
-
- val selectColumns = {
- if (backtracking)
- "SELECT slice, persistence_id, seq_nr, db_timestamp,
statement_timestamp() AS read_db_timestamp "
- else
- "SELECT slice, persistence_id, seq_nr, db_timestamp,
statement_timestamp() AS read_db_timestamp, event_ser_id, event_ser_manifest,
event_payload, meta_ser_id, meta_ser_manifest, meta_payload "
- }
-
+ maxSlice: Int): String =
sql"""
- $selectColumns
+ ${selectColumns(backtracking)}
FROM $journalTable
WHERE entity_type = ?
AND ${sliceCondition(minSlice, maxSlice)}
- AND db_timestamp >= ? $toDbTimestampParamCondition
$behindCurrentTimeIntervalCondition
+ AND db_timestamp >= ? ${toDbTimestampParamCondition(toDbTimestampParam)}
${behindCurrentTimeIntervalCondition(
+ behindCurrentTime)}
AND deleted = false
ORDER BY db_timestamp, seq_nr
LIMIT ?"""
- }
private def sliceCondition(minSlice: Int, maxSlice: Int): String = {
settings.dialect match {
@@ -138,7 +142,7 @@ private[r2dbc] class QueryDao(val settings: QuerySettings,
connectionFactory: Co
WHERE persistence_id = ? AND seq_nr = ? AND deleted = false"""
private val selectOneEventSql = sql"""
- SELECT slice, entity_type, db_timestamp, $statementTimestampSql AS
read_db_timestamp, event_ser_id, event_ser_manifest, event_payload,
meta_ser_id, meta_ser_manifest, meta_payload
+ SELECT slice, entity_type, db_timestamp, $statementTimestampSql AS
read_db_timestamp, event_ser_id, event_ser_manifest, event_payload,
meta_ser_id, meta_ser_manifest, meta_payload, tags
FROM $journalTable
WHERE persistence_id = ? AND seq_nr = ? AND deleted = false"""
@@ -210,7 +214,7 @@ private[r2dbc] class QueryDao(val settings: QuerySettings,
connectionFactory: Co
serId = 0,
serManifest = "",
writerUuid = "", // not need in this query
- tags = Set.empty, // tags not fetched in queries (yet)
+ tags = Set.empty, // tags not fetched in backtracking
metadata = None)
else
SerializedJournalRow(
@@ -224,7 +228,7 @@ private[r2dbc] class QueryDao(val settings: QuerySettings,
connectionFactory: Co
serId = row.get[Integer]("event_ser_id", classOf[Integer]),
serManifest = row.get("event_ser_manifest", classOf[String]),
writerUuid = "", // not need in this query
- tags = Set.empty, // tags not fetched in queries (yet)
+ tags = setFromDb(row.get("tags", classOf[Array[String]])),
metadata = readMetadata(row)))
if (log.isDebugEnabled)
@@ -310,7 +314,7 @@ private[r2dbc] class QueryDao(val settings: QuerySettings,
connectionFactory: Co
serId = row.get[Integer]("event_ser_id", classOf[Integer]),
serManifest = row.get("event_ser_manifest", classOf[String]),
writerUuid = "", // not need in this query
- tags = Set.empty, // tags not fetched in queries (yet)
+ tags = setFromDb(row.get("tags", classOf[Array[String]])),
metadata = readMetadata(row)))
def persistenceIds(entityType: String, afterId: Option[String], limit:
Long): Source[String, NotUsed] = {
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
index 3ead172..a3e9ab3 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
@@ -32,10 +32,14 @@ import pekko.persistence.query.Offset
import pekko.persistence.query.TimestampOffset
import pekko.persistence.query.scaladsl._
import pekko.persistence.query.typed.EventEnvelope
-import pekko.persistence.query.typed.scaladsl.CurrentEventsBySliceQuery
-import pekko.persistence.query.typed.scaladsl.EventTimestampQuery
-import pekko.persistence.query.typed.scaladsl.EventsBySliceQuery
-import pekko.persistence.query.typed.scaladsl.LoadEventQuery
+import pekko.persistence.query.typed.scaladsl.{
+ CurrentEventsByPersistenceIdTypedQuery,
+ CurrentEventsBySliceQuery,
+ EventTimestampQuery,
+ EventsByPersistenceIdTypedQuery,
+ EventsBySliceQuery,
+ LoadEventQuery
+}
import pekko.persistence.query.{ EventEnvelope => ClassicEventEnvelope }
import pekko.persistence.r2dbc.QuerySettings
import pekko.persistence.r2dbc.internal.BySliceQuery
@@ -65,7 +69,9 @@ final class R2dbcReadJournal(system: ExtendedActorSystem,
config: Config, cfgPat
with EventTimestampQuery
with LoadEventQuery
with CurrentEventsByPersistenceIdQuery
+ with CurrentEventsByPersistenceIdTypedQuery
with EventsByPersistenceIdQuery
+ with EventsByPersistenceIdTypedQuery
with CurrentPersistenceIdsQuery
with PagedPersistenceIdsQuery {
import R2dbcReadJournal.ByPersistenceIdState
@@ -96,7 +102,8 @@ final class R2dbcReadJournal(system: ExtendedActorSystem,
config: Config, cfgPat
row.entityType,
row.slice,
filtered = false,
- source)
+ source,
+ tags = row.tags)
}
val extractOffset: EventEnvelope[Any] => TimestampOffset = env =>
env.offset.asInstanceOf[TimestampOffset]
@@ -229,7 +236,25 @@ final class R2dbcReadJournal(system: ExtendedActorSystem,
config: Config, cfgPat
override def currentEventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
- toSequenceNr: Long): Source[ClassicEventEnvelope, NotUsed] = {
+ toSequenceNr: Long): Source[ClassicEventEnvelope, NotUsed] =
+ internalCurrentEventsByPersistenceId(persistenceId, fromSequenceNr,
toSequenceNr)
+ .map(deserializeRow)
+
+ override def currentEventsByPersistenceIdTyped[Event](
+ persistenceId: String,
+ fromSequenceNr: Long,
+ toSequenceNr: Long): Source[EventEnvelope[Event], NotUsed] =
+ internalCurrentEventsByPersistenceId(persistenceId, fromSequenceNr,
toSequenceNr)
+ .map(deserializeBySliceRow[Event])
+
+ /**
+ * INTERNAL API: Used by both journal replay and currentEventsByPersistenceId
+ */
+ @InternalApi private[r2dbc] def internalCurrentEventsByPersistenceId(
+ persistenceId: String,
+ fromSequenceNr: Long,
+ toSequenceNr: Long): Source[SerializedJournalRow, NotUsed] = {
+
val highestSeqNrFut =
if (toSequenceNr == Long.MaxValue)
queryDao.readHighestSequenceNr(persistenceId, fromSequenceNr)
else Future.successful(toSequenceNr)
@@ -237,10 +262,9 @@ final class R2dbcReadJournal(system: ExtendedActorSystem,
config: Config, cfgPat
Source
.futureSource[SerializedJournalRow, NotUsed] {
highestSeqNrFut.map { highestSeqNr =>
- queryDao.internalEventsByPersistenceId(persistenceId,
fromSequenceNr, highestSeqNr)
+ queryDao.internalCurrentEventsByPersistenceId(persistenceId,
fromSequenceNr, highestSeqNr)
}
}
- .map(deserializeRow)
.mapMaterializedValue(_ => NotUsed)
}
@@ -264,7 +288,21 @@ final class R2dbcReadJournal(system: ExtendedActorSystem,
config: Config, cfgPat
override def eventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
- toSequenceNr: Long): Source[ClassicEventEnvelope, NotUsed] = {
+ toSequenceNr: Long): Source[ClassicEventEnvelope, NotUsed] =
+ internalEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr)
+ .map(deserializeRow)
+
+ override def eventsByPersistenceIdTyped[Event](
+ persistenceId: String,
+ fromSequenceNr: Long,
+ toSequenceNr: Long): Source[EventEnvelope[Event], NotUsed] =
+ internalEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr)
+ .map(deserializeBySliceRow[Event])
+
+ private def internalEventsByPersistenceId(
+ persistenceId: String,
+ fromSequenceNr: Long,
+ toSequenceNr: Long): Source[SerializedJournalRow, NotUsed] = {
log.debug("Starting eventsByPersistenceId query for persistenceId [{}],
from [{}].", persistenceId, fromSequenceNr)
@@ -318,7 +356,6 @@ final class R2dbcReadJournal(system: ExtendedActorSystem,
config: Config, cfgPat
updateState = nextOffset,
delayNextQuery = delayNextQuery,
nextQuery = nextQuery)
- .map(deserializeRow)
}
private def deserializeBySliceRow[Event](row: SerializedJournalRow):
EventEnvelope[Event] = {
@@ -337,7 +374,8 @@ final class R2dbcReadJournal(system: ExtendedActorSystem,
config: Config, cfgPat
row.entityType,
row.slice,
filtered = false,
- source)
+ source,
+ tags = row.tags)
}
private def deserializeRow(row: SerializedJournalRow): ClassicEventEnvelope
= {
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala
index dc04dda..260b5a7 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala
@@ -44,38 +44,33 @@ private[r2dbc] class MySQLQueryDao(
override lazy val statementTimestampSql: String = "NOW(6)"
+ override protected def selectColumns(backtracking: Boolean): String =
+ if (backtracking)
+ s"SELECT slice, persistence_id, seq_nr, db_timestamp,
$statementTimestampSql AS read_db_timestamp, tags "
+ else
+ s"SELECT slice, persistence_id, seq_nr, db_timestamp,
$statementTimestampSql AS read_db_timestamp, tags, event_ser_id,
event_ser_manifest, event_payload, meta_ser_id, meta_ser_manifest, meta_payload
"
+
+ override protected def behindCurrentTimeIntervalCondition(behindCurrentTime:
FiniteDuration): String =
+ if (behindCurrentTime > Duration.Zero)
+ s"AND db_timestamp < DATE_SUB($statementTimestampSql, INTERVAL
'${behindCurrentTime.toMicros}' MICROSECOND)"
+ else ""
+
override def eventsBySlicesRangeSql(
toDbTimestampParam: Boolean,
behindCurrentTime: FiniteDuration,
backtracking: Boolean,
minSlice: Int,
- maxSlice: Int): String = {
-
- def toDbTimestampParamCondition =
- if (toDbTimestampParam) "AND db_timestamp <= ?" else ""
-
- def behindCurrentTimeIntervalCondition =
- if (behindCurrentTime > Duration.Zero)
- s"AND db_timestamp < DATE_SUB($statementTimestampSql, INTERVAL
'${behindCurrentTime.toMicros}' MICROSECOND)"
- else ""
-
- val selectColumns = {
- if (backtracking)
- s"SELECT slice, persistence_id, seq_nr, db_timestamp,
$statementTimestampSql AS read_db_timestamp "
- else
- s"SELECT slice, persistence_id, seq_nr, db_timestamp,
$statementTimestampSql AS read_db_timestamp, event_ser_id, event_ser_manifest,
event_payload, meta_ser_id, meta_ser_manifest, meta_payload "
- }
-
+ maxSlice: Int): String =
sql"""
- $selectColumns
+ ${selectColumns(backtracking)}
FROM $journalTable
WHERE entity_type = ?
AND slice BETWEEN $minSlice AND $maxSlice
- AND db_timestamp >= ? $toDbTimestampParamCondition
$behindCurrentTimeIntervalCondition
+ AND db_timestamp >= ? ${toDbTimestampParamCondition(toDbTimestampParam)}
${behindCurrentTimeIntervalCondition(
+ behindCurrentTime)}
AND deleted = false
ORDER BY db_timestamp, seq_nr
LIMIT ?"""
- }
override def selectBucketsSql(minSlice: Int, maxSlice: Int): String = {
sql"""
diff --git
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/OffsetPidSeqNr.scala
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/OffsetPidSeqNr.scala
new file mode 100644
index 0000000..68edcca
--- /dev/null
+++
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/OffsetPidSeqNr.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 - 2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.projection.r2dbc.internal
+
+import org.apache.pekko.annotation.InternalApi
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] object OffsetPidSeqNr {
+ def apply(offset: Any, pid: String, seqNr: Long): OffsetPidSeqNr =
+ new OffsetPidSeqNr(offset, Some(pid -> seqNr))
+
+ def apply(offset: Any): OffsetPidSeqNr =
+ new OffsetPidSeqNr(offset, None)
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] final case class OffsetPidSeqNr(offset: Any,
pidSeqNr: Option[(String, Long)])
diff --git
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
index 01d26c8..061b9da 100644
---
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
+++
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
@@ -415,7 +415,7 @@ private[projection] class R2dbcOffsetStore(
/**
* Like saveOffsetInTx, but in own transaction. Used by atLeastOnce.
*/
- def saveOffset[Offset](offset: Offset): Future[Done] = {
+ def saveOffset(offset: OffsetPidSeqNr): Future[Done] = {
r2dbcExecutor
.withConnection("save offset") { conn =>
saveOffsetInTx(conn, offset)
@@ -426,18 +426,19 @@ private[projection] class R2dbcOffsetStore(
/**
* This method is used together with the users' handler code and run in same
transaction.
*/
- def saveOffsetInTx[Offset](conn: Connection, offset: Offset): Future[Done] =
{
+ def saveOffsetInTx(conn: Connection, offset: OffsetPidSeqNr): Future[Done] =
{
offset match {
- case t: TimestampOffset =>
- // TODO possible perf improvement to optimize for the normal case of 1
record
- val records = t.seen.map { case (pid, seqNr) => Record(pid, seqNr,
t.timestamp) }.toVector
- saveTimestampOffsetInTx(conn, records)
+ case OffsetPidSeqNr(t: TimestampOffset, Some((pid, seqNr))) =>
+ val record = Record(pid, seqNr, t.timestamp)
+ saveTimestampOffsetInTx(conn, Vector(record))
+ case OffsetPidSeqNr(_: TimestampOffset, None) =>
+ throw new IllegalArgumentException("Required EventEnvelope or
DurableStateChange for TimestampOffset.")
case _ =>
- savePrimitiveOffsetInTx(conn, offset)
+ savePrimitiveOffsetInTx(conn, offset.offset)
}
}
- def saveOffsets[Offset](offsets: immutable.IndexedSeq[Offset]): Future[Done]
= {
+ def saveOffsets(offsets: immutable.IndexedSeq[OffsetPidSeqNr]): Future[Done]
= {
r2dbcExecutor
.withConnection("save offsets") { conn =>
saveOffsetsInTx(conn, offsets)
@@ -445,17 +446,22 @@ private[projection] class R2dbcOffsetStore(
.map(_ => Done)(ExecutionContext.parasitic)
}
- def saveOffsetsInTx[Offset](conn: Connection, offsets:
immutable.IndexedSeq[Offset]): Future[Done] = {
- if (offsets.exists(_.isInstanceOf[TimestampOffset])) {
- val records = offsets.flatMap {
- case t: TimestampOffset =>
- t.seen.map { case (pid, seqNr) => Record(pid, seqNr, t.timestamp) }
+ def saveOffsetsInTx(conn: Connection, offsets:
immutable.IndexedSeq[OffsetPidSeqNr]): Future[Done] = {
+ if (offsets.isEmpty)
+ FutureDone
+ else if (offsets.head.offset.isInstanceOf[TimestampOffset]) {
+ val records = offsets.map {
+ case OffsetPidSeqNr(t: TimestampOffset, Some((pid, seqNr))) =>
+ Record(pid, seqNr, t.timestamp)
+ case OffsetPidSeqNr(_: TimestampOffset, None) =>
+ throw new IllegalArgumentException("Required EventEnvelope or
DurableStateChange for TimestampOffset.")
case _ =>
- Nil
+ throw new IllegalArgumentException(
+ "Mix of TimestampOffset and other offset type in same transaction
is not supported")
}
saveTimestampOffsetInTx(conn, records)
} else {
- savePrimitiveOffsetInTx(conn, offsets.last)
+ savePrimitiveOffsetInTx(conn, offsets.last.offset)
}
}
diff --git
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
index ce2ff55..3d7d136 100644
---
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
+++
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
@@ -28,6 +28,7 @@ import pekko.annotation.InternalApi
import pekko.event.Logging
import pekko.event.LoggingAdapter
import pekko.persistence.query.DeletedDurableState
+import pekko.persistence.query.DurableStateChange
import pekko.persistence.query.UpdatedDurableState
import pekko.persistence.query.typed.EventEnvelope
import pekko.persistence.query.typed.scaladsl.LoadEventQuery
@@ -158,6 +159,25 @@ private[projection] object R2dbcProjectionImpl {
}
}
+ private def extractOffsetPidSeqNr[Offset, Envelope](
+ sourceProvider: SourceProvider[Offset, Envelope],
+ envelope: Envelope): OffsetPidSeqNr =
+ extractOffsetPidSeqNr(sourceProvider.extractOffset(envelope), envelope)
+
+ private def extractOffsetPidSeqNr[Offset, Envelope](offset: Offset,
envelope: Envelope): OffsetPidSeqNr = {
+ // we could define a new trait for the SourceProvider to implement this in
case other (custom) envelope types are needed
+ envelope match {
+ case env: EventEnvelope[_] => OffsetPidSeqNr(offset,
env.persistenceId, env.sequenceNr)
+ case chg: UpdatedDurableState[_] => OffsetPidSeqNr(offset,
chg.persistenceId, chg.revision)
+ case del: DeletedDurableState[_] => OffsetPidSeqNr(offset,
del.persistenceId, del.revision)
+ case change: DurableStateChange[_] =>
+ // in case additional types are added
+ throw new IllegalArgumentException(
+ s"DurableStateChange [${change.getClass.getName}] not implemented
yet. Please report bug at
https://github.com/apache/pekko-persistence-r2dbc/issues")
+ case _ => OffsetPidSeqNr(offset)
+ }
+ }
+
private[projection] def adaptedHandlerForExactlyOnce[Offset, Envelope](
sourceProvider: SourceProvider[Offset, Envelope],
handlerFactory: () => R2dbcHandler[Envelope],
@@ -169,11 +189,11 @@ private[projection] object R2dbcProjectionImpl {
offsetStore.isAccepted(envelope).flatMap {
case true =>
if (isFilteredEvent(envelope)) {
- val offset = sourceProvider.extractOffset(envelope)
+ val offset = extractOffsetPidSeqNr(sourceProvider, envelope)
offsetStore.saveOffset(offset)
} else {
loadEnvelope(envelope, sourceProvider).flatMap {
loadedEnvelope =>
- val offset = sourceProvider.extractOffset(loadedEnvelope)
+ val offset = extractOffsetPidSeqNr(sourceProvider,
loadedEnvelope)
r2dbcExecutor.withConnection("exactly-once handler") { conn
=>
// run users handler
val session = new R2dbcSession(conn)
@@ -208,7 +228,7 @@ private[projection] object R2dbcProjectionImpl {
} else {
Future.sequence(acceptedEnvelopes.map(env => loadEnvelope(env,
sourceProvider))).flatMap {
loadedEnvelopes =>
- val offsets =
loadedEnvelopes.iterator.map(sourceProvider.extractOffset).toVector
+ val offsets =
loadedEnvelopes.iterator.map(extractOffsetPidSeqNr(sourceProvider, _)).toVector
val filteredEnvelopes =
loadedEnvelopes.filterNot(isFilteredEvent)
if (filteredEnvelopes.isEmpty) {
offsetStore.saveOffsets(offsets)
@@ -512,8 +532,12 @@ private[projection] class R2dbcProjectionImpl[Offset,
Envelope](
offsetStore.readOffset()
// Called from InternalProjectionState.saveOffsetAndReport
- override def saveOffset(projectionId: ProjectionId, offset: Offset):
Future[Done] =
- offsetStore.saveOffset(offset)
+ override def saveOffset(projectionId: ProjectionId, offset: Offset):
Future[Done] = {
+ // need the envelope to be able to call offsetStore.saveOffset
+ // FIXME maybe we can cleanup this mess when moving R2dbcProjection to
the Pekko Projections repository?
+ throw new IllegalStateException(
+ "Unexpected call to saveOffset. It should have called
saveOffsetAndReport. Please report bug at
https://github.com/apache/pekko-persistence-r2dbc/issues")
+ }
override protected def saveOffsetAndReport(
projectionId: ProjectionId,
@@ -523,7 +547,18 @@ private[projection] class R2dbcProjectionImpl[Offset,
Envelope](
val envelope = projectionContext.envelope
if (offsetStore.isInflight(envelope) || isExactlyOnceWithSkip) {
- super.saveOffsetAndReport(projectionId, projectionContext, batchSize)
+ val offset =
R2dbcProjectionImpl.extractOffsetPidSeqNr(projectionContext.offset, envelope)
+ offsetStore
+ .saveOffset(offset)
+ .map { done =>
+ try {
+ statusObserver.offsetProgress(projectionId, envelope)
+ } catch {
+ case NonFatal(_) => // ignore
+ }
+ getTelemetry().onOffsetStored(batchSize)
+ done
+ }
} else {
FutureDone
}
@@ -547,7 +582,7 @@ private[projection] class R2dbcProjectionImpl[Offset,
Envelope](
if (acceptedContexts.isEmpty) {
FutureDone
} else {
- val offsets = acceptedContexts.map(_.offset)
+ val offsets = acceptedContexts.map(ctx =>
R2dbcProjectionImpl.extractOffsetPidSeqNr(ctx.offset, ctx.envelope))
offsetStore
.saveOffsets(offsets)
.map { done =>
diff --git
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
index ccd63ff..9575e5d 100644
---
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
+++
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
@@ -28,6 +28,7 @@ import
pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.projection.MergeableOffset
import pekko.projection.ProjectionId
import pekko.projection.internal.ManagementState
+import pekko.projection.r2dbc.internal.OffsetPidSeqNr
import pekko.projection.r2dbc.internal.R2dbcOffsetStore
import org.scalatest.wordspec.AnyWordSpecLike
@@ -74,12 +75,14 @@ class R2dbcOffsetStoreSpec
"save and read offsets" in {
val projectionId = genRandomProjectionId()
val offsetStore = createOffsetStore(projectionId)
+ def saveOffset(offset: Any): Unit =
+ offsetStore.saveOffset(OffsetPidSeqNr(offset)).futureValue
- offsetStore.saveOffset(1L).futureValue
+ saveOffset(1L)
val offset1 = offsetStore.readOffset[Long]()
offset1.futureValue shouldBe Some(1L)
- offsetStore.saveOffset(2L).futureValue
+ saveOffset(2L)
val offset2 = offsetStore.readOffset[Long]()
offset2.futureValue shouldBe Some(2L) // yep, saveOffset overwrites
previous
}
@@ -87,7 +90,10 @@ class R2dbcOffsetStoreSpec
"save and retrieve offsets of type Long" in {
val projectionId = genRandomProjectionId()
val offsetStore = createOffsetStore(projectionId)
- offsetStore.saveOffset(1L).futureValue
+ def saveOffset(offset: Any): Unit =
+ offsetStore.saveOffset(OffsetPidSeqNr(offset)).futureValue
+
+ saveOffset(1L)
val offset = offsetStore.readOffset[Long]()
offset.futureValue shouldBe Some(1L)
}
@@ -95,7 +101,10 @@ class R2dbcOffsetStoreSpec
"save and retrieve offsets of type java.lang.Long" in {
val projectionId = genRandomProjectionId()
val offsetStore = createOffsetStore(projectionId)
- offsetStore.saveOffset(java.lang.Long.valueOf(1L)).futureValue
+ def saveOffset(offset: Any): Unit =
+ offsetStore.saveOffset(OffsetPidSeqNr(offset)).futureValue
+
+ saveOffset(java.lang.Long.valueOf(1L))
val offset = offsetStore.readOffset[java.lang.Long]()
offset.futureValue shouldBe Some(1L)
}
@@ -103,7 +112,10 @@ class R2dbcOffsetStoreSpec
"save and retrieve offsets of type Int" in {
val projectionId = genRandomProjectionId()
val offsetStore = createOffsetStore(projectionId)
- offsetStore.saveOffset(1).futureValue
+ def saveOffset(offset: Any): Unit =
+ offsetStore.saveOffset(OffsetPidSeqNr(offset)).futureValue
+
+ saveOffset(1)
val offset = offsetStore.readOffset[Int]()
offset.futureValue shouldBe Some(1)
}
@@ -111,7 +123,10 @@ class R2dbcOffsetStoreSpec
"save and retrieve offsets of type java.lang.Integer" in {
val projectionId = genRandomProjectionId()
val offsetStore = createOffsetStore(projectionId)
- offsetStore.saveOffset(java.lang.Integer.valueOf(1)).futureValue
+ def saveOffset(offset: Any): Unit =
+ offsetStore.saveOffset(OffsetPidSeqNr(offset)).futureValue
+
+ saveOffset(java.lang.Integer.valueOf(1))
val offset = offsetStore.readOffset[java.lang.Integer]()
offset.futureValue shouldBe Some(1)
}
@@ -119,8 +134,11 @@ class R2dbcOffsetStoreSpec
"save and retrieve offsets of type String" in {
val projectionId = genRandomProjectionId()
val offsetStore = createOffsetStore(projectionId)
+ def saveOffset(offset: Any): Unit =
+ offsetStore.saveOffset(OffsetPidSeqNr(offset)).futureValue
+
val randOffset = UUID.randomUUID().toString
- offsetStore.saveOffset(randOffset).futureValue
+ saveOffset(randOffset)
val offset = offsetStore.readOffset[String]()
offset.futureValue shouldBe Some(randOffset)
}
@@ -128,8 +146,11 @@ class R2dbcOffsetStoreSpec
"save and retrieve offsets of type pekko.persistence.query.Sequence" in {
val projectionId = genRandomProjectionId()
val offsetStore = createOffsetStore(projectionId)
+ def saveOffset(offset: Any): Unit =
+ offsetStore.saveOffset(OffsetPidSeqNr(offset)).futureValue
+
val seqOffset = Sequence(1L)
- offsetStore.saveOffset(seqOffset).futureValue
+ saveOffset(seqOffset)
val offset = offsetStore.readOffset[Sequence]()
offset.futureValue shouldBe Some(seqOffset)
}
@@ -137,10 +158,12 @@ class R2dbcOffsetStoreSpec
"save and retrieve offsets of type pekko.persistence.query.TimeBasedUUID"
in {
val projectionId = genRandomProjectionId()
val offsetStore = createOffsetStore(projectionId)
+ def saveOffset(offset: Any): Unit =
+ offsetStore.saveOffset(OffsetPidSeqNr(offset)).futureValue
val timeUuidOffset =
TimeBasedUUID(UUID.fromString("49225740-2019-11ea-a752-ffae2393b6e4"))
// 2019-12-16T15:32:36.148Z[UTC]
- offsetStore.saveOffset(timeUuidOffset).futureValue
+ saveOffset(timeUuidOffset)
val offset = offsetStore.readOffset[TimeBasedUUID]()
offset.futureValue shouldBe Some(timeUuidOffset)
}
@@ -148,9 +171,11 @@ class R2dbcOffsetStoreSpec
"save and retrieve offsets of unknown custom serializable type" in {
val projectionId = genRandomProjectionId()
val offsetStore = createOffsetStore(projectionId)
+ def saveOffset(offset: Any): Unit =
+ offsetStore.saveOffset(OffsetPidSeqNr(offset)).futureValue
val customOffset = "abc"
- offsetStore.saveOffset(customOffset).futureValue
+ saveOffset(customOffset)
val offset = offsetStore.readOffset[TimeBasedUUID]()
offset.futureValue shouldBe Some(customOffset)
}
@@ -158,8 +183,11 @@ class R2dbcOffsetStoreSpec
"save and retrieve MergeableOffset" in {
val projectionId = genRandomProjectionId()
val offsetStore = createOffsetStore(projectionId)
+ def saveOffset(offset: Any): Unit =
+ offsetStore.saveOffset(OffsetPidSeqNr(offset)).futureValue
+
val origOffset = MergeableOffset(Map("abc" -> 1L, "def" -> 1L, "ghi" ->
1L))
- offsetStore.saveOffset(origOffset).futureValue
+ saveOffset(origOffset)
val offset = offsetStore.readOffset[MergeableOffset[Long]]()
offset.futureValue shouldBe Some(origOffset)
}
@@ -167,16 +195,18 @@ class R2dbcOffsetStoreSpec
"add new offsets to MergeableOffset" in {
val projectionId = genRandomProjectionId()
val offsetStore = createOffsetStore(projectionId)
+ def saveOffset(offset: Any): Unit =
+ offsetStore.saveOffset(OffsetPidSeqNr(offset)).futureValue
val origOffset = MergeableOffset(Map("abc" -> 1L, "def" -> 1L))
- offsetStore.saveOffset(origOffset).futureValue
+ saveOffset(origOffset)
val offset1 = offsetStore.readOffset[MergeableOffset[Long]]()
offset1.futureValue shouldBe Some(origOffset)
// mix updates and inserts
val updatedOffset = MergeableOffset(Map("abc" -> 2L, "def" -> 2L, "ghi"
-> 1L))
- offsetStore.saveOffset(updatedOffset).futureValue
+ saveOffset(updatedOffset)
val offset2 = offsetStore.readOffset[MergeableOffset[Long]]()
offset2.futureValue shouldBe Some(updatedOffset)
@@ -185,15 +215,17 @@ class R2dbcOffsetStoreSpec
"update timestamp" in {
val projectionId = genRandomProjectionId()
val offsetStore = createOffsetStore(projectionId)
+ def saveOffset(offset: Any): Unit =
+ offsetStore.saveOffset(OffsetPidSeqNr(offset)).futureValue
val instant0 = clock.instant()
- offsetStore.saveOffset(15).futureValue
+ saveOffset(15)
val instant1 = selectLastUpdated(projectionId)
instant1 shouldBe instant0
val instant2 = clock.tick(java.time.Duration.ofMillis(5))
- offsetStore.saveOffset(16).futureValue
+ saveOffset(16)
val instant3 = selectLastUpdated(projectionId)
instant3 shouldBe instant2
@@ -202,8 +234,10 @@ class R2dbcOffsetStoreSpec
"set offset" in {
val projectionId = genRandomProjectionId()
val offsetStore = createOffsetStore(projectionId)
+ def saveOffset(offset: Any): Unit =
+ offsetStore.saveOffset(OffsetPidSeqNr(offset)).futureValue
- offsetStore.saveOffset(3L).futureValue
+ saveOffset(3L)
offsetStore.readOffset[Long]().futureValue shouldBe Some(3L)
offsetStore.managementSetOffset(2L).futureValue
@@ -213,8 +247,10 @@ class R2dbcOffsetStoreSpec
"clear offset" in {
val projectionId = genRandomProjectionId()
val offsetStore = createOffsetStore(projectionId)
+ def saveOffset(offset: Any): Unit =
+ offsetStore.saveOffset(OffsetPidSeqNr(offset)).futureValue
- offsetStore.saveOffset(3L).futureValue
+ saveOffset(3L)
offsetStore.readOffset[Long]().futureValue shouldBe Some(3L)
offsetStore.managementClearOffset().futureValue
diff --git
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
index 96982c9..d56e6dc 100644
---
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
+++
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
@@ -34,6 +34,7 @@ import pekko.persistence.r2dbc.internal.EnvelopeOrigin
import pekko.projection.BySlicesSourceProvider
import pekko.projection.ProjectionId
import pekko.projection.internal.ManagementState
+import pekko.projection.r2dbc.internal.OffsetPidSeqNr
import pekko.projection.r2dbc.internal.R2dbcOffsetStore
import pekko.projection.r2dbc.internal.R2dbcOffsetStore.Pid
import pekko.projection.r2dbc.internal.R2dbcOffsetStore.Record
@@ -150,33 +151,34 @@ class R2dbcTimestampOffsetStoreSpec
tick()
val offset1 = TimestampOffset(clock.instant(), Map("p1" -> 3L))
- offsetStore.saveOffset(offset1).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
val readOffset1 = offsetStore.readOffset[TimestampOffset]()
readOffset1.futureValue shouldBe Some(offset1)
tick()
val offset2 = TimestampOffset(clock.instant(), Map("p1" -> 4L))
- offsetStore.saveOffset(offset2).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(offset2, "p1", 4L)).futureValue
val readOffset2 = offsetStore.readOffset[TimestampOffset]()
readOffset2.futureValue shouldBe Some(offset2) // yep, saveOffset
overwrites previous
}
- "save TimestampOffset with several entries" in {
+ "save TimestampOffset with several seen entries" in {
val projectionId = genRandomProjectionId()
val offsetStore = createOffsetStore(projectionId)
tick()
val offset1 = TimestampOffset(clock.instant(), Map("p1" -> 3L, "p2" ->
1L, "p3" -> 5L))
- offsetStore.saveOffset(offset1).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
val readOffset1 = offsetStore.readOffset[TimestampOffset]()
- readOffset1.futureValue shouldBe Some(offset1)
+ val expectedOffset1 = offset1.copy(seen = Map("p1" -> 3L))
+ readOffset1.futureValue shouldBe Some(expectedOffset1)
tick()
val offset2 = TimestampOffset(clock.instant(), Map("p1" -> 4L, "p3" ->
6L, "p4" -> 9L))
- offsetStore.saveOffset(offset2).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(offset2, "p3", 6L)).futureValue
val readOffset2 = offsetStore.readOffset[TimestampOffset]()
- // p2 is not included in read offset because it wasn't updated and has
earlier timestamp
- readOffset2.futureValue shouldBe Some(offset2)
+ val expectedOffset2 = offset2.copy(seen = Map("p3" -> 6L))
+ readOffset2.futureValue shouldBe Some(expectedOffset2)
}
"save TimestampOffset when same timestamp" in {
@@ -185,13 +187,16 @@ class R2dbcTimestampOffsetStoreSpec
tick()
val offset1 = TimestampOffset(clock.instant(), Map("p1" -> 3L, "p2" ->
1L, "p3" -> 5L))
- offsetStore.saveOffset(offset1).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p2", 1L)).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p3", 5L)).futureValue
val readOffset1 = offsetStore.readOffset[TimestampOffset]()
readOffset1.futureValue shouldBe Some(offset1)
// not tick, same timestamp
val offset2 = TimestampOffset(clock.instant(), Map("p2" -> 2L, "p4" ->
9L))
- offsetStore.saveOffset(offset2).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(offset2, "p2", 2L)).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(offset2, "p4", 9L)).futureValue
val readOffset2 = offsetStore.readOffset[TimestampOffset]()
// all should be included since same timestamp
val expectedOffset2 = TimestampOffset(clock.instant(), Map("p1" -> 3L,
"p2" -> 2L, "p3" -> 5L, "p4" -> 9L))
@@ -200,7 +205,7 @@ class R2dbcTimestampOffsetStoreSpec
// saving new with later timestamp
tick()
val offset3 = TimestampOffset(clock.instant(), Map("p1" -> 4L))
- offsetStore.saveOffset(offset3).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(offset3, "p1", 4L)).futureValue
val readOffset3 = offsetStore.readOffset[TimestampOffset]()
// then it should only contain that entry
readOffset3.futureValue shouldBe Some(offset3)
@@ -218,11 +223,19 @@ class R2dbcTimestampOffsetStoreSpec
val offset3 = TimestampOffset(clock.instant(), Map("p6" -> 6L))
tick()
val offset4 = TimestampOffset(clock.instant(), Map("p1" -> 4L, "p3" ->
6L, "p4" -> 9L))
- val offsetsBatch1 = Vector(offset1, offset2, offset3, offset4)
+ val offsetsBatch1 = Vector(
+ OffsetPidSeqNr(offset1, "p1", 3L),
+ OffsetPidSeqNr(offset1, "p2", 1L),
+ OffsetPidSeqNr(offset1, "p3", 5L),
+ OffsetPidSeqNr(offset2, "p5", 1L),
+ OffsetPidSeqNr(offset3, "p6", 6L),
+ OffsetPidSeqNr(offset4, "p1", 4L),
+ OffsetPidSeqNr(offset4, "p3", 6L),
+ OffsetPidSeqNr(offset4, "p4", 9L))
offsetStore.saveOffsets(offsetsBatch1).futureValue
val readOffset1 = offsetStore.readOffset[TimestampOffset]()
- readOffset1.futureValue shouldBe Some(offsetsBatch1.last)
+ readOffset1.futureValue shouldBe Some(offsetsBatch1.last.offset)
offsetStore.getState().byPid("p1").seqNr shouldBe 4L
offsetStore.getState().byPid("p2").seqNr shouldBe 1L
offsetStore.getState().byPid("p3").seqNr shouldBe 6L
@@ -232,12 +245,12 @@ class R2dbcTimestampOffsetStoreSpec
tick()
val offset5 = TimestampOffset(clock.instant(), Map("p1" -> 5L))
- offsetStore.saveOffsets(Vector(offset5)).futureValue
+ offsetStore.saveOffsets(Vector(OffsetPidSeqNr(offset5, "p1",
5L))).futureValue
tick()
// duplicate
val offset6 = TimestampOffset(clock.instant(), Map("p2" -> 1L))
- offsetStore.saveOffsets(Vector(offset6)).futureValue
+ offsetStore.saveOffsets(Vector(OffsetPidSeqNr(offset6, "p2",
1L))).futureValue
tick()
val offset7 = TimestampOffset(clock.instant(), Map("p1" -> 6L))
@@ -245,11 +258,12 @@ class R2dbcTimestampOffsetStoreSpec
val offset8 = TimestampOffset(clock.instant(), Map("p1" -> 7L))
tick()
val offset9 = TimestampOffset(clock.instant(), Map("p1" -> 8L))
- val offsetsBatch2 = Vector(offset7, offset8, offset9)
+ val offsetsBatch2 =
+ Vector(OffsetPidSeqNr(offset7, "p1", 6L), OffsetPidSeqNr(offset8,
"p1", 7L), OffsetPidSeqNr(offset9, "p1", 8L))
offsetStore.saveOffsets(offsetsBatch2).futureValue
val readOffset2 = offsetStore.readOffset[TimestampOffset]()
- readOffset2.futureValue shouldBe Some(offsetsBatch2.last)
+ readOffset2.futureValue shouldBe Some(offsetsBatch2.last.offset)
offsetStore.getState().byPid("p1").seqNr shouldBe 8L
offsetStore.getState().byPid("p2").seqNr shouldBe 1L
offsetStore.getState().byPid("p3").seqNr shouldBe 6L
@@ -264,13 +278,13 @@ class R2dbcTimestampOffsetStoreSpec
tick()
val offset1 = TimestampOffset(clock.instant(), Map("p1" -> 3L))
- offsetStore.saveOffset(offset1).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
val readOffset1 = offsetStore.readOffset[TimestampOffset]()
readOffset1.futureValue shouldBe Some(offset1)
clock.setInstant(clock.instant().minusMillis(1))
val offset2 = TimestampOffset(clock.instant(), Map("p1" -> 2L))
- offsetStore.saveOffset(offset2).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(offset2, "p1", 2L)).futureValue
val readOffset2 = offsetStore.readOffset[TimestampOffset]()
readOffset2.futureValue shouldBe Some(offset1) // keeping offset1
}
@@ -306,16 +320,16 @@ class R2dbcTimestampOffsetStoreSpec
tick()
val offset1 = TimestampOffset(clock.instant(), Map(p1 -> 3L))
- offsetStore0.saveOffset(offset1).futureValue
+ offsetStore0.saveOffset(OffsetPidSeqNr(offset1, p1, 3L)).futureValue
tick()
val offset2 = TimestampOffset(clock.instant(), Map(p2 -> 4L))
- offsetStore0.saveOffset(offset2).futureValue
+ offsetStore0.saveOffset(OffsetPidSeqNr(offset2, p2, 4L)).futureValue
tick()
val offset3 = TimestampOffset(clock.instant(), Map(p3 -> 7L))
- offsetStore0.saveOffset(offset3).futureValue
+ offsetStore0.saveOffset(OffsetPidSeqNr(offset3, p3, 7L)).futureValue
tick()
val offset4 = TimestampOffset(clock.instant(), Map(p4 -> 5L))
- offsetStore0.saveOffset(offset4).futureValue
+ offsetStore0.saveOffset(OffsetPidSeqNr(offset4, p4, 5L)).futureValue
val offsetStore1 =
R2dbcOffsetStore.fromConfig(
@@ -344,13 +358,17 @@ class R2dbcTimestampOffsetStoreSpec
tick()
val offset1 = TimestampOffset(clock.instant(), Map("p1" -> 3L, "p2" ->
1L, "p3" -> 5L))
- offsetStore.saveOffset(offset1).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p2", 1L)).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p3", 5L)).futureValue
tick()
val offset2 = TimestampOffset(clock.instant(), Map("p1" -> 4L, "p3" ->
6L, "p4" -> 9L))
- offsetStore.saveOffset(offset2).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(offset2, "p1", 4L)).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(offset2, "p3", 6L)).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(offset2, "p4", 9L)).futureValue
tick()
val offset3 = TimestampOffset(clock.instant(), Map("p5" -> 10L))
- offsetStore.saveOffset(offset3).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(offset3, "p5", 10L)).futureValue
offsetStore.isDuplicate(Record("p5", 10, offset3.timestamp)) shouldBe
true
offsetStore.isDuplicate(Record("p1", 4, offset2.timestamp)) shouldBe true
@@ -378,7 +396,9 @@ class R2dbcTimestampOffsetStoreSpec
val startTime = TestClock.nowMicros().instant()
val offset1 = TimestampOffset(startTime, Map("p1" -> 3L, "p2" -> 1L,
"p3" -> 5L))
- offsetStore.saveOffset(offset1).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p2", 1L)).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p3", 5L)).futureValue
// seqNr 1 is always accepted
val env1 = createEnvelope("p4", 1L, startTime.plusMillis(1), "e4-1")
@@ -477,8 +497,12 @@ class R2dbcTimestampOffsetStoreSpec
// it's keeping the inflight that are not in the "stored" state
offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8, "p4" ->
2L, "p5" -> 8)
// and they are removed from inflight once they have been stored
- offsetStore.saveOffset(TimestampOffset(startTime.plusMillis(2), Map("p4"
-> 2L))).futureValue
- offsetStore.saveOffset(TimestampOffset(startTime.plusMillis(9), Map("p5"
-> 8L))).futureValue
+ offsetStore
+ .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(2),
Map("p4" -> 2L)), "p4", 2L))
+ .futureValue
+ offsetStore
+ .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(9),
Map("p5" -> 8L)), "p5", 8L))
+ .futureValue
offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8)
}
@@ -496,7 +520,9 @@ class R2dbcTimestampOffsetStoreSpec
offsetStore.isAccepted(envelope1).futureValue shouldBe true
offsetStore.addInflight(envelope1)
offsetStore.getInflight() shouldBe Map("p1" -> 1L)
- offsetStore.saveOffset(TimestampOffset(startTime.plusMillis(1), Map("p1"
-> 1L))).futureValue
+ offsetStore
+ .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(1),
Map("p1" -> 1L)), "p1", 1L))
+ .futureValue
offsetStore.getInflight() shouldBe empty
// seqNr 2 is accepts since it follows seqNr 1 that is stored in state
@@ -517,7 +543,9 @@ class R2dbcTimestampOffsetStoreSpec
offsetStore.getInflight() shouldBe Map("p1" -> 3L)
// and they are removed from inflight once they have been stored
- offsetStore.saveOffset(TimestampOffset(startTime.plusMillis(2), Map("p1"
-> 3L))).futureValue
+ offsetStore
+ .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(2),
Map("p1" -> 3L)), "p1", 3L))
+ .futureValue
offsetStore.getInflight() shouldBe empty
}
@@ -527,7 +555,9 @@ class R2dbcTimestampOffsetStoreSpec
val offsetStore = createOffsetStore(projectionId)
val offset1 = TimestampOffset(startTime, Map("p1" -> 3L, "p2" -> 1L,
"p3" -> 5L))
- offsetStore.saveOffset(offset1).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p2", 1L)).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p3", 5L)).futureValue
// seqNr 1 is always accepted
val env1 = createEnvelope("p4", 1L, startTime.plusMillis(1), "e4-1")
@@ -550,7 +580,9 @@ class R2dbcTimestampOffsetStoreSpec
val startTime = TestClock.nowMicros().instant()
val offset1 = TimestampOffset(startTime, Map("p1" -> 3L, "p2" -> 1L,
"p3" -> 5L))
- offsetStore.saveOffset(offset1).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p2", 1L)).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p3", 5L)).futureValue
// seqNr 1 is always accepted
val env1 = createUpdatedDurableState("p4", 1L, startTime.plusMillis(1),
"s4-1")
@@ -603,8 +635,12 @@ class R2dbcTimestampOffsetStoreSpec
// it's keeping the inflight that are not in the "stored" state
offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 20, "p4" ->
2L, "p5" -> 7)
// and they are removed from inflight once they have been stored
- offsetStore.saveOffset(TimestampOffset(startTime.plusMillis(2), Map("p4"
-> 2L))).futureValue
- offsetStore.saveOffset(TimestampOffset(startTime.plusMillis(9), Map("p5"
-> 8L))).futureValue
+ offsetStore
+ .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(2),
Map("p4" -> 2L)), "p4", 2L))
+ .futureValue
+ offsetStore
+ .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(9),
Map("p5" -> 8L)), "p5", 8L))
+ .futureValue
offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 20)
}
@@ -617,36 +653,69 @@ class R2dbcTimestampOffsetStoreSpec
val startTime = TestClock.nowMicros().instant()
log.debug("Start time [{}]", startTime)
- offsetStore.saveOffset(TimestampOffset(startTime, Map("p1" ->
1L))).futureValue
-
offsetStore.saveOffset(TimestampOffset(startTime.plus(JDuration.ofSeconds(1)),
Map("p2" -> 1L))).futureValue
-
offsetStore.saveOffset(TimestampOffset(startTime.plus(JDuration.ofSeconds(2)),
Map("p3" -> 1L))).futureValue
- offsetStore.saveOffset(TimestampOffset(startTime.plus(evictInterval),
Map("p4" -> 1L))).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime,
Map("p1" -> 1L)), "p1", 1L)).futureValue
+ offsetStore
+
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(1)),
Map("p2" -> 1L)), "p2", 1L))
+ .futureValue
+ offsetStore
+
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(2)),
Map("p3" -> 1L)), "p3", 1L))
+ .futureValue
+ offsetStore
+
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(evictInterval),
Map("p4" -> 1L)), "p4", 1L))
+ .futureValue
offsetStore
-
.saveOffset(TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(1)),
Map("p4" -> 1L)))
+ .saveOffset(
+ OffsetPidSeqNr(
+
TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(1)),
Map("p4" -> 1L)),
+ "p4",
+ 1L))
.futureValue
offsetStore
-
.saveOffset(TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(2)),
Map("p5" -> 1L)))
+ .saveOffset(
+ OffsetPidSeqNr(
+
TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(2)),
Map("p5" -> 1L)),
+ "p5",
+ 1L))
.futureValue
offsetStore
-
.saveOffset(TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(3)),
Map("p6" -> 1L)))
+ .saveOffset(
+ OffsetPidSeqNr(
+
TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(3)),
Map("p6" -> 1L)),
+ "p6",
+ 1L))
.futureValue
offsetStore.getState().size shouldBe 6
-
offsetStore.saveOffset(TimestampOffset(startTime.plus(timeWindow.minusSeconds(10)),
Map("p7" -> 1L))).futureValue
+ offsetStore
+
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.minusSeconds(10)),
Map("p7" -> 1L)), "p7",
+ 1L))
+ .futureValue
offsetStore.getState().size shouldBe 7 // nothing evicted yet
offsetStore
-
.saveOffset(TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).minusSeconds(3)),
Map("p8" -> 1L)))
+ .saveOffset(
+ OffsetPidSeqNr(
+
TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).minusSeconds(3)),
Map("p8" -> 1L)),
+ "p8",
+ 1L))
.futureValue
offsetStore.getState().size shouldBe 8 // still nothing evicted yet
offsetStore
-
.saveOffset(TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).plusSeconds(1)),
Map("p8" -> 2L)))
+ .saveOffset(
+ OffsetPidSeqNr(
+
TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).plusSeconds(1)),
Map("p8" -> 2L)),
+ "p8",
+ 2L))
.futureValue
offsetStore.getState().byPid.keySet shouldBe Set("p5", "p6", "p7", "p8")
offsetStore
-
.saveOffset(TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).plusSeconds(20)),
Map("p8" -> 3L)))
+ .saveOffset(
+ OffsetPidSeqNr(
+
TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).plusSeconds(20)),
Map("p8" -> 3L)),
+ "p8",
+ 3L))
.futureValue
offsetStore.getState().byPid.keySet shouldBe Set("p7", "p8")
}
@@ -660,23 +729,41 @@ class R2dbcTimestampOffsetStoreSpec
val startTime = TestClock.nowMicros().instant()
log.debug("Start time [{}]", startTime)
- offsetStore.saveOffset(TimestampOffset(startTime, Map("p1" ->
1L))).futureValue
-
offsetStore.saveOffset(TimestampOffset(startTime.plus(JDuration.ofSeconds(1)),
Map("p2" -> 1L))).futureValue
-
offsetStore.saveOffset(TimestampOffset(startTime.plus(JDuration.ofSeconds(2)),
Map("p3" -> 1L))).futureValue
-
offsetStore.saveOffset(TimestampOffset(startTime.plus(JDuration.ofSeconds(3)),
Map("p4" -> 1L))).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime,
Map("p1" -> 1L)), "p1", 1L)).futureValue
+ offsetStore
+
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(1)),
Map("p2" -> 1L)), "p2", 1L))
+ .futureValue
+ offsetStore
+
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(2)),
Map("p3" -> 1L)), "p3", 1L))
+ .futureValue
+ offsetStore
+
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(3)),
Map("p4" -> 1L)), "p4", 1L))
+ .futureValue
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0
offsetStore.readOffset().futureValue // this will load from database
offsetStore.getState().size shouldBe 4
-
offsetStore.saveOffset(TimestampOffset(startTime.plus(timeWindow.minusSeconds(2)),
Map("p5" -> 1L))).futureValue
-
offsetStore.saveOffset(TimestampOffset(startTime.plus(timeWindow.minusSeconds(1)),
Map("p6" -> 1L))).futureValue
+ offsetStore
+
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.minusSeconds(2)),
Map("p5" -> 1L)), "p5",
+ 1L))
+ .futureValue
+ offsetStore
+
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.minusSeconds(1)),
Map("p6" -> 1L)), "p6",
+ 1L))
+ .futureValue
// nothing deleted yet
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0
offsetStore.readOffset().futureValue // this will load from database
offsetStore.getState().size shouldBe 6
-
offsetStore.saveOffset(TimestampOffset(startTime.plus(timeWindow.plusSeconds(1)),
Map("p7" -> 1L))).futureValue
-
offsetStore.saveOffset(TimestampOffset(startTime.plus(timeWindow.plusSeconds(2)),
Map("p8" -> 1L))).futureValue
+ offsetStore
+
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(1)),
Map("p7" -> 1L)), "p7",
+ 1L))
+ .futureValue
+ offsetStore
+
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(2)),
Map("p8" -> 1L)), "p8",
+ 1L))
+ .futureValue
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 2
offsetStore.readOffset().futureValue // this will load from database
offsetStore.getState().byPid.keySet shouldBe Set("p3", "p4", "p5", "p6",
"p7", "p8")
@@ -691,15 +778,22 @@ class R2dbcTimestampOffsetStoreSpec
val startTime = TestClock.nowMicros().instant()
log.debug("Start time [{}]", startTime)
- offsetStore.saveOffset(TimestampOffset(startTime, Map("p1" ->
1L))).futureValue
-
offsetStore.saveOffset(TimestampOffset(startTime.plus(timeWindow.plusSeconds(1)),
Map("p2" -> 1L))).futureValue
+ offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime,
Map("p1" -> 1L)), "p1", 1L)).futureValue
+ offsetStore
+
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(1)),
Map("p2" -> 1L)), "p2",
+ 1L))
+ .futureValue
eventually {
offsetStore.readOffset().futureValue // this will load from database
offsetStore.getState().byPid.keySet shouldBe Set("p2")
}
offsetStore
-
.saveOffset(TimestampOffset(startTime.plus(timeWindow.multipliedBy(2).plusSeconds(2)),
Map("p3" -> 1L)))
+ .saveOffset(
+ OffsetPidSeqNr(
+
TimestampOffset(startTime.plus(timeWindow.multipliedBy(2).plusSeconds(2)),
Map("p3" -> 1L)),
+ "p3",
+ 1L))
.futureValue
eventually {
offsetStore.readOffset().futureValue // this will load from database
@@ -720,7 +814,9 @@ class R2dbcTimestampOffsetStoreSpec
tick()
val offset4 = TimestampOffset(clock.instant(), Map("p4" -> 40L))
- offsetStore.saveOffsets(Vector(offset1, offset4)).futureValue
+ offsetStore
+ .saveOffsets(Vector(OffsetPidSeqNr(offset1, "p1", 10L),
OffsetPidSeqNr(offset4, "p4", 40L)))
+ .futureValue
// offset without any seen pid/seqNr
offsetStore.managementSetOffset(TimestampOffset(t2, seen =
Map.empty)).futureValue
@@ -745,7 +841,7 @@ class R2dbcTimestampOffsetStoreSpec
tick()
val offset2 = TimestampOffset(clock.instant(), Map("p2" -> 4L))
- offsetStore.saveOffsets(Vector(offset1, offset2)).futureValue
+ offsetStore.saveOffsets(Vector(OffsetPidSeqNr(offset1, "p1", 3L),
OffsetPidSeqNr(offset2, "p2", 4L))).futureValue
offsetStore.managementClearOffset().futureValue
offsetStore.readOffset[TimestampOffset]().futureValue shouldBe None
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]