This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git


The following commit(s) were added to refs/heads/main by this push:
     new e1c2d4d  port durable state delete changes (#304)
e1c2d4d is described below

commit e1c2d4df5b59357777f254f8aa9dba19a37efaff
Author: PJ Fanning <[email protected]>
AuthorDate: Mon Mar 23 19:42:48 2026 +0100

    port durable state delete changes (#304)
    
    * port durable state delete changes
    
    * Update R2dbcDurableStateStore.scala
    
    * Update DurableStateDao.scala
    
    * Create delete-erffect.excludes
    
    * add tests
    
    * fix 1 test
    
    * Update DurableStateStoreSpec.scala
    
    * remove tests that no longer apply
    
    * Update DurableStateBySliceSpec.scala
    
    * Update DurableStateBySliceSpec.scala
    
    * copyright years
    
    * Fix race condition in emit DeletedDurableState test
    
    Co-authored-by: pjfanning <[email protected]>
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/71ff8fa8-e95a-4390-b012-ee6cbcb1a447
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] 
<[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../delete-erffect.excludes                        |  25 ++++
 .../r2dbc/state/scaladsl/DurableStateDao.scala     | 138 +++++++++++++++------
 .../state/scaladsl/R2dbcDurableStateStore.scala    |  48 +++++--
 .../pekko/persistence/r2dbc/TestActors.scala       |   5 +
 .../r2dbc/state/DurableStateBySliceSpec.scala      |  68 +++++++++-
 .../r2dbc/state/DurableStateStoreSpec.scala        |  78 +++++++++---
 docs/src/main/paradox/query.md                     |   6 +-
 .../r2dbc/internal/R2dbcOffsetStore.scala          |  13 +-
 .../r2dbc/internal/R2dbcProjectionImpl.scala       |  11 +-
 9 files changed, 317 insertions(+), 75 deletions(-)

diff --git 
a/core/src/main/mima-filters/2.0.x.backwards.excludes/delete-erffect.excludes 
b/core/src/main/mima-filters/2.0.x.backwards.excludes/delete-erffect.excludes
new file mode 100644
index 0000000..7c370d0
--- /dev/null
+++ 
b/core/src/main/mima-filters/2.0.x.backwards.excludes/delete-erffect.excludes
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Caused by https://github.com/apache/pekko-persistence-r2dbc/pull/304
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.persistence.r2dbc.state.scaladsl.DurableStateDao#SerializedStateRow.payload")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.persistence.r2dbc.state.scaladsl.DurableStateDao#SerializedStateRow._5")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.persistence.r2dbc.state.scaladsl.DurableStateDao#SerializedStateRow.copy$default$5")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.persistence.r2dbc.state.scaladsl.DurableStateDao#SerializedStateRow.copy")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.persistence.r2dbc.state.scaladsl.DurableStateDao#SerializedStateRow.this")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.persistence.r2dbc.state.scaladsl.DurableStateDao#SerializedStateRow.apply")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.persistence.r2dbc.state.scaladsl.DurableStateDao#SerializedStateRow.unapply")
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
index 415b7fe..96598d4 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
@@ -8,7 +8,7 @@
  */
 
 /*
- * Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2021-2022 Lightbend Inc. <https://www.lightbend.com>
  */
 
 package org.apache.pekko.persistence.r2dbc.state.scaladsl
@@ -37,6 +37,7 @@ import pekko.persistence.typed.PersistenceId
 import pekko.stream.scaladsl.Source
 import com.typesafe.config.Config
 import io.r2dbc.spi.ConnectionFactory
+import io.r2dbc.spi.Row
 import io.r2dbc.spi.R2dbcDataIntegrityViolationException
 import io.r2dbc.spi.Statement
 import org.slf4j.Logger
@@ -54,7 +55,7 @@ import org.slf4j.LoggerFactory
       revision: Long,
       dbTimestamp: Instant,
       readDbTimestamp: Instant,
-      payload: Array[Byte],
+      payload: Option[Array[Byte]],
       serId: Int,
       serManifest: String,
       tags: Set[String])
@@ -126,7 +127,7 @@ private[r2dbc] class DurableStateDao(settings: 
StateSettings, connectionFactory:
     (slice, entity_type, persistence_id, revision, state_ser_id, 
state_ser_manifest, state_payload, tags, db_timestamp)
     VALUES (?, ?, ?, ?, ?, ?, ?, ?, $transactionTimestampSql)"""
 
-  private val updateStateSql: String = {
+  private def updateStateSql(updateTags: Boolean): String = {
     val timestamp =
       if (settings.dbTimestampMonotonicIncreasing)
         s"$transactionTimestampSql"
@@ -138,14 +139,16 @@ private[r2dbc] class DurableStateDao(settings: 
StateSettings, connectionFactory:
       if (settings.durableStateAssertSingleWriter) " AND revision = ?"
       else ""
 
+    val tags = if (updateTags) "tags = ?," else ""
+
     sql"""
       UPDATE $stateTable
-      SET revision = ?, state_ser_id = ?, state_ser_manifest = ?, 
state_payload = ?, tags = ?, db_timestamp = $timestamp
+      SET revision = ?, state_ser_id = ?, state_ser_manifest = ?, 
state_payload = ?, $tags db_timestamp = $timestamp
       WHERE persistence_id = ?
       $revisionCondition"""
   }
 
-  private val deleteStateSql: String =
+  private val hardDeleteStateSql: String =
     sql"DELETE from $stateTable WHERE persistence_id = ?"
 
   private val deleteStateWithRevisionSql: String =
@@ -177,7 +180,7 @@ private[r2dbc] class DurableStateDao(settings: 
StateSettings, connectionFactory:
 
     val selectColumns =
       if (backtracking)
-        "SELECT persistence_id, revision, db_timestamp, statement_timestamp() 
AS read_db_timestamp "
+        "SELECT persistence_id, revision, db_timestamp, statement_timestamp() 
AS read_db_timestamp, state_ser_id "
       else
         "SELECT persistence_id, revision, db_timestamp, statement_timestamp() 
AS read_db_timestamp, state_ser_id, state_ser_manifest, state_payload "
 
@@ -203,18 +206,24 @@ private[r2dbc] class DurableStateDao(settings: 
StateSettings, connectionFactory:
           revision = row.get[java.lang.Long]("revision", 
classOf[java.lang.Long]),
           dbTimestamp = row.get("db_timestamp", classOf[Instant]),
           readDbTimestamp = Instant.EPOCH, // not needed here
-          payload = row.get("state_payload", classOf[Array[Byte]]),
+          getPayload(row),
           serId = row.get[Integer]("state_ser_id", classOf[Integer]),
           serManifest = row.get("state_ser_manifest", classOf[String]),
           tags = Set.empty // tags not fetched in queries (yet)
         ))
   }
 
-  def writeState(state: SerializedStateRow): Future[Done] = {
-    require(state.revision > 0)
+  private def getPayload(row: Row): Option[Array[Byte]] = {
+    val serId = row.get("state_ser_id", classOf[Integer])
+    val rowPayload = row.get("state_payload", classOf[Array[Byte]])
+    if (serId == 0 && (rowPayload == null || rowPayload.isEmpty))
+      None // delete marker
+    else
+      Option(rowPayload)
+  }
 
-    val entityType = PersistenceId.extractEntityType(state.persistenceId)
-    val slice = persistenceExt.sliceForPersistenceId(state.persistenceId)
+  def upsertState(state: SerializedStateRow): Future[Done] = {
+    require(state.revision > 0)
 
     def bindTags(stmt: Statement, i: Int): Statement = {
       if (state.tags.isEmpty)
@@ -225,6 +234,9 @@ private[r2dbc] class DurableStateDao(settings: 
StateSettings, connectionFactory:
 
     val result = {
       if (state.revision == 1) {
+        val entityType = PersistenceId.extractEntityType(state.persistenceId)
+        val slice = persistenceExt.sliceForPersistenceId(state.persistenceId)
+
         r2dbcExecutor
           .updateOne(s"insert [${state.persistenceId}]") { connection =>
             val stmt = connection
@@ -235,7 +247,7 @@ private[r2dbc] class DurableStateDao(settings: 
StateSettings, connectionFactory:
               .bind(3, state.revision)
               .bind(4, state.serId)
               .bind(5, state.serManifest)
-              .bind(6, state.payload)
+              .bind(6, state.payload.getOrElse(Array.emptyByteArray))
             bindTags(stmt, 7)
           }
           .recoverWith { case _: R2dbcDataIntegrityViolationException =>
@@ -248,11 +260,11 @@ private[r2dbc] class DurableStateDao(settings: 
StateSettings, connectionFactory:
 
         r2dbcExecutor.updateOne(s"update [${state.persistenceId}]") { 
connection =>
           val stmt = connection
-            .createStatement(updateStateSql)
+            .createStatement(updateStateSql(updateTags = true))
             .bind(0, state.revision)
             .bind(1, state.serId)
             .bind(2, state.serManifest)
-            .bind(3, state.payload)
+            .bind(3, state.payload.getOrElse(Array.emptyByteArray))
           bindTags(stmt, 4)
 
           if (settings.dbTimestampMonotonicIncreasing) {
@@ -289,18 +301,18 @@ private[r2dbc] class DurableStateDao(settings: 
StateSettings, connectionFactory:
     }
   }
 
-  def deleteState(persistenceId: String): Future[Done] = {
+  private def hardDeleteState(persistenceId: String): Future[Long] = {
     val result =
-      r2dbcExecutor.updateOne(s"delete [$persistenceId]") { connection =>
+      r2dbcExecutor.updateOne(s"hard delete [$persistenceId]") { connection =>
         connection
-          .createStatement(deleteStateSql)
+          .createStatement(hardDeleteStateSql)
           .bind(0, persistenceId)
       }
 
     if (log.isDebugEnabled())
-      result.foreach(_ => log.debug("Deleted durable state for persistenceId 
[{}]", persistenceId))
+      result.foreach(_ => log.debug("Hard deleted durable state for 
persistenceId [{}]", persistenceId))
 
-    result.map(_ => Done)(ExecutionContext.parasitic)
+    result
   }
 
   /**
@@ -310,19 +322,66 @@ private[r2dbc] class DurableStateDao(settings: 
StateSettings, connectionFactory:
    * @since 1.1.0
    */
   def deleteStateForRevision(persistenceId: String, revision: Long): 
Future[Long] = {
-    val result =
-      r2dbcExecutor.updateOne(s"delete [$persistenceId, $revision]") { 
connection =>
-        connection
-          .createStatement(deleteStateWithRevisionSql)
-          .bind(0, persistenceId)
-          .bind(1, revision)
+    if (revision == 0) {
+      hardDeleteState(persistenceId)
+    } else {
+      val result = {
+        if (revision == 1) {
+          val entityType = PersistenceId.extractEntityType(persistenceId)
+          val slice = persistenceExt.sliceForPersistenceId(persistenceId)
+
+          r2dbcExecutor
+            .updateOne(s"insert delete marker [$persistenceId]") { connection 
=>
+              connection
+                .createStatement(insertStateSql)
+                .bind(0, slice)
+                .bind(1, entityType)
+                .bind(2, persistenceId)
+                .bind(3, revision)
+                .bind(4, 0)
+                .bind(5, "")
+                .bind(6, Array.emptyByteArray)
+                .bindNull(7, classOf[Array[String]])
+            }
+            .recoverWith { case _: R2dbcDataIntegrityViolationException =>
+              Future.failed(new IllegalStateException(
+                s"Insert delete marker with revision 1 failed: durable state 
for persistence id [$persistenceId] already exists"))
+            }
+        } else {
+          val previousRevision = revision - 1
+
+          r2dbcExecutor.updateOne(s"delete [$persistenceId]") { connection =>
+            val stmt = connection
+              .createStatement(updateStateSql(updateTags = false))
+              .bind(0, revision)
+              .bind(1, 0)
+              .bind(2, "")
+              .bind(3, Array.emptyByteArray)
+
+            if (settings.dbTimestampMonotonicIncreasing) {
+              if (settings.durableStateAssertSingleWriter)
+                stmt
+                  .bind(4, persistenceId)
+                  .bind(5, previousRevision)
+              else
+                stmt
+                  .bind(4, persistenceId)
+            } else {
+              stmt
+                .bind(4, persistenceId)
+                .bind(5, previousRevision)
+                .bind(6, persistenceId)
+
+              if (settings.durableStateAssertSingleWriter)
+                stmt.bind(7, previousRevision)
+              else
+                stmt
+            }
+          }
+        }
       }
-
-    if (log.isDebugEnabled())
-      result.foreach(_ =>
-        log.debug("Deleted durable state for persistenceId [{}]; revision 
[{}]", persistenceId, revision))
-
-    result
+      result
+    }
   }
 
   override def currentDbTimestamp(): Future[Instant] = {
@@ -366,24 +425,31 @@ private[r2dbc] class DurableStateDao(settings: 
StateSettings, connectionFactory:
         stmt
       },
       row =>
-        if (backtracking)
+        if (backtracking) {
+          val serId = row.get("state_ser_id", classOf[Integer])
+          // would have been better with an explicit deleted column as in the 
journal table,
+          // but not worth the schema change
+          val isDeleted = serId == 0
+
           SerializedStateRow(
             persistenceId = row.get("persistence_id", classOf[String]),
-            revision = row.get[java.lang.Long]("revision", 
classOf[java.lang.Long]),
+            revision = row.get("revision", classOf[Long]),
             dbTimestamp = row.get("db_timestamp", classOf[Instant]),
             readDbTimestamp = row.get("read_db_timestamp", classOf[Instant]),
-            payload = null, // lazy loaded for backtracking
+            // payload = null => lazy loaded for backtracking (ugly, but not 
worth changing UpdatedDurableState in Akka)
+            // payload = None => DeletedDurableState (no lazy loading)
+            payload = if (isDeleted) None else null,
             serId = 0,
             serManifest = "",
             tags = Set.empty // tags not fetched in queries (yet)
           )
-        else
+        } else
           SerializedStateRow(
             persistenceId = row.get("persistence_id", classOf[String]),
             revision = row.get[java.lang.Long]("revision", 
classOf[java.lang.Long]),
             dbTimestamp = row.get("db_timestamp", classOf[Instant]),
             readDbTimestamp = row.get("read_db_timestamp", classOf[Instant]),
-            payload = row.get("state_payload", classOf[Array[Byte]]),
+            payload = getPayload(row),
             serId = row.get[Integer]("state_ser_id", classOf[Integer]),
             serManifest = row.get("state_ser_manifest", classOf[String]),
             tags = Set.empty // tags not fetched in queries (yet)
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
index c1115eb..d53b479 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
@@ -8,7 +8,7 @@
  */
 
 /*
- * Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2021-2022 Lightbend Inc. <https://www.lightbend.com>
  */
 
 package org.apache.pekko.persistence.r2dbc.state.scaladsl
@@ -24,6 +24,7 @@ import pekko.actor.ExtendedActorSystem
 import pekko.actor.typed.ActorSystem
 import pekko.actor.typed.scaladsl.adapter._
 import pekko.persistence.Persistence
+import pekko.persistence.query.DeletedDurableState
 import pekko.persistence.query.DurableStateChange
 import pekko.persistence.query.Offset
 import pekko.persistence.query.TimestampOffset
@@ -65,8 +66,21 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, 
config: Config, cfg
 
   private val bySlice: BySliceQuery[SerializedStateRow, DurableStateChange[A]] 
= {
     val createEnvelope: (TimestampOffset, SerializedStateRow) => 
DurableStateChange[A] = (offset, row) => {
-      val payload = serialization.deserialize(row.payload, row.serId, 
row.serManifest).get.asInstanceOf[A]
-      new UpdatedDurableState(row.persistenceId, row.revision, payload, 
offset, row.dbTimestamp.toEpochMilli)
+      row.payload match {
+        case null =>
+          // payload = null => lazy loaded for backtracking (ugly, but not 
worth changing UpdatedDurableState in Akka)
+          new UpdatedDurableState(
+            row.persistenceId,
+            row.revision,
+            null.asInstanceOf[A],
+            offset,
+            row.dbTimestamp.toEpochMilli)
+        case Some(bytes) =>
+          val payload = serialization.deserialize(bytes, row.serId, 
row.serManifest).get.asInstanceOf[A]
+          new UpdatedDurableState(row.persistenceId, row.revision, payload, 
offset, row.dbTimestamp.toEpochMilli)
+        case None =>
+          new DeletedDurableState(row.persistenceId, row.revision, offset, 
row.dbTimestamp.toEpochMilli)
+      }
     }
 
     val extractOffset: DurableStateChange[A] => TimestampOffset = env => 
env.offset.asInstanceOf[TimestampOffset]
@@ -79,14 +93,23 @@ class R2dbcDurableStateStore[A](system: 
ExtendedActorSystem, config: Config, cfg
     stateDao.readState(persistenceId).map {
       case None                => GetObjectResult(None, 0L)
       case Some(serializedRow) =>
-        val payload = serialization
-          .deserialize(serializedRow.payload, serializedRow.serId, 
serializedRow.serManifest)
-          .get
-          .asInstanceOf[A]
-        GetObjectResult(Some(payload), serializedRow.revision)
+        val payload =
+          serializedRow.payload.map { bytes =>
+            serialization
+              .deserialize(bytes, serializedRow.serId, 
serializedRow.serManifest)
+              .get
+              .asInstanceOf[A]
+          }
+        GetObjectResult(payload, serializedRow.revision)
     }
   }
 
+  /**
+   * Insert the value if `revision` is 1, which will fail with 
`IllegalStateException` if there is already a stored
+   * value for the given `persistenceId`. Otherwise update the value, which 
will fail with `IllegalStateException` if
+   * the existing stored `revision` + 1 isn't equal to the given `revision`. 
This optimistic locking check can be
+   * disabled with configuration `assert-single-writer`.
+   */
   override def upsertObject(persistenceId: String, revision: Long, value: A, 
tag: String): Future[Done] = {
     val valueAnyRef = value.asInstanceOf[AnyRef]
     val serialized = serialization.serialize(valueAnyRef).get
@@ -98,16 +121,17 @@ class R2dbcDurableStateStore[A](system: 
ExtendedActorSystem, config: Config, cfg
       revision,
       DurableStateDao.EmptyDbTimestamp,
       DurableStateDao.EmptyDbTimestamp,
-      serialized,
+      Some(serialized),
       serializer.identifier,
       manifest,
       if (tag.isEmpty) Set.empty else Set(tag))
 
-    stateDao.writeState(serializedRow)
-
+    stateDao.upsertState(serializedRow)
   }
+
   override def deleteObject(persistenceId: String): Future[Done] =
-    stateDao.deleteState(persistenceId)
+    stateDao.deleteStateForRevision(persistenceId, 0L)
+      .map(_ => Done)(ExecutionContext.parasitic)
 
   override def deleteObject(persistenceId: String, revision: Long): 
Future[Done] = {
     stateDao.deleteStateForRevision(persistenceId, revision).map { count =>
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestActors.scala 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestActors.scala
index 95fed2f..bf336a0 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestActors.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestActors.scala
@@ -117,6 +117,7 @@ object TestActors {
     sealed trait Command
     final case class Persist(payload: Any) extends Command
     final case class PersistWithAck(payload: Any, replyTo: ActorRef[Done]) 
extends Command
+    final case class DeleteWithAck(replyTo: ActorRef[Done]) extends Command
     final case class Ping(replyTo: ActorRef[Done]) extends Command
     final case class Stop(replyTo: ActorRef[Done]) extends Command
 
@@ -144,6 +145,10 @@ object TestActors {
                   pid.id: Object,
                   (DurableStateBehavior.lastSequenceNumber(context) + 1: 
java.lang.Long): Object)
                 Effect.persist(command.payload).thenRun(_ => command.replyTo ! 
Done)
+              case command: DeleteWithAck =>
+                context.log
+                  .debug("Delete pid [{}], seqNr [{}]", pid.id, 
DurableStateBehavior.lastSequenceNumber(context) + 1)
+                Effect.delete[Any]().thenRun(_ => command.replyTo ! Done)
               case Ping(replyTo) =>
                 replyTo ! Done
                 Effect.none
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateBySliceSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateBySliceSpec.scala
index 5c30435..bb6d588 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateBySliceSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateBySliceSpec.scala
@@ -8,7 +8,7 @@
  */
 
 /*
- * Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2021-2022 Lightbend Inc. <https://www.lightbend.com>
  */
 
 package org.apache.pekko.persistence.r2dbc.state
@@ -24,12 +24,14 @@ import pekko.actor.testkit.typed.scaladsl.LogCapturing
 import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
 import pekko.actor.testkit.typed.scaladsl.TestProbe
 import pekko.actor.typed.ActorSystem
+import pekko.persistence.query.DeletedDurableState
 import pekko.persistence.query.DurableStateChange
 import pekko.persistence.query.NoOffset
 import pekko.persistence.query.Offset
 import pekko.persistence.query.TimestampOffset
 import pekko.persistence.query.UpdatedDurableState
 import pekko.persistence.r2dbc.TestActors
+import pekko.persistence.r2dbc.TestActors.DurableStatePersister.DeleteWithAck
 import pekko.persistence.r2dbc.TestActors.DurableStatePersister.Persist
 import pekko.persistence.r2dbc.TestActors.DurableStatePersister.PersistWithAck
 import pekko.persistence.r2dbc.TestConfig
@@ -175,13 +177,38 @@ class DurableStateBySliceSpec
         assertFinished(updatedDurableStateProbe2, withOffsetDone)
         killSwitch.shutdown()
       }
+
+      "emit DeletedDurableState for latest deleted state" in new Setup {
+        val timeout = 10.seconds
+        for (i <- 1 to 3) {
+          persister ! PersistWithAck(s"s-$i", probe.ref)
+          probe.expectMessage(timeout, Done)
+        }
+
+        // Use store directly to ensure delete is committed before query runs.
+        // (Effect.delete() in Pekko DurableStateBehavior is fire-and-forget: 
side effects
+        // like thenRun fire before the DB write completes, creating a race 
for Current queries.)
+        query.deleteObject(persistenceId, 4L).futureValue
+
+        val deletedDurableStateProbe = 
createTestProbe[DeletedDurableState[String]]()
+
+        val done =
+          doQuery(entityType, slice, slice, NoOffset)
+            .collect { case d: DeletedDurableState[String] => d }
+            .via(killSwitch.flow)
+            .runWith(Sink.foreach(deletedDurableStateProbe.ref.tell))
+
+        deletedDurableStateProbe.receiveMessage(timeout).revision shouldBe 4
+        assertFinished(updatedDurableStateProbe, done)
+        killSwitch.shutdown()
+      }
     }
   }
 
   // tests just relevant for current query
   "Current changesBySlices" should {
     "filter states with the same timestamp based on seen sequence nrs" in new 
Setup {
-      persister ! PersistWithAck(s"s-1", probe.ref)
+      persister ! PersistWithAck("s-1", probe.ref)
       probe.expectMessage(Done)
       val singleState: UpdatedDurableState[String] =
         query
@@ -199,7 +226,7 @@ class DurableStateBySliceSpec
     }
 
     "not filter states with the same timestamp based on sequence nrs" in new 
Setup {
-      persister ! PersistWithAck(s"s-1", probe.ref)
+      persister ! PersistWithAck("s-1", probe.ref)
       probe.expectMessage(Done)
       val singleState: UpdatedDurableState[String] =
         query
@@ -246,6 +273,41 @@ class DurableStateBySliceSpec
       killSwitch.shutdown()
     }
 
+    "find delete" in new Setup {
+      for (i <- 1 to 19) {
+        persister ! PersistWithAck(s"s-$i", probe.ref)
+        probe.expectMessage(Done)
+      }
+
+      val deletedDurableStateProbe = 
createTestProbe[DeletedDurableState[String]]()
+
+      val done =
+        query
+          .changesBySlices(entityType, slice, slice, NoOffset)
+          .via(killSwitch.flow)
+          .runWith(Sink.foreach {
+            case u: UpdatedDurableState[String] => 
updatedDurableStateProbe.ref.tell(u)
+            case u: DeletedDurableState[String] => 
deletedDurableStateProbe.ref.tell(u)
+          })
+      fishForState("s-19", updatedDurableStateProbe).last.revision shouldBe 19
+
+      persister ! DeleteWithAck(probe.ref)
+      probe.expectMessage(Done)
+      deletedDurableStateProbe.receiveMessage().revision shouldBe 20
+
+      for (i <- 21 to 40) {
+        persister ! PersistWithAck(s"s-$i", probe.ref)
+        probe.expectMessage(Done)
+      }
+      fishForState("s-40", updatedDurableStateProbe).last.revision shouldBe 40
+
+      persister ! DeleteWithAck(probe.ref)
+      probe.expectMessage(Done)
+      deletedDurableStateProbe.receiveMessage().revision shouldBe 41
+
+      killSwitch.shutdown()
+    }
+
   }
 
 }
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreSpec.scala
index 7fb9841..98d63a1 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreSpec.scala
@@ -8,7 +8,7 @@
  */
 
 /*
- * Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2021-2022 Lightbend Inc. <https://www.lightbend.com>
  */
 
 package org.apache.pekko.persistence.r2dbc.state
@@ -114,31 +114,79 @@ class DurableStateStoreSpec
       store.getObject(persistenceId).futureValue should 
be(GetObjectResult(None, 0L))
     }
 
-    "support deletions with revision" in {
+    "hard delete when revision=0" in {
       val entityType = nextEntityType()
       val persistenceId = PersistenceId(entityType, 
"to-be-added-and-removed").id
       val value = "Genuinely Collaborative"
       store.upsertObject(persistenceId, 1L, value, unusedTag).futureValue
       store.getObject(persistenceId).futureValue should 
be(GetObjectResult(Some(value), 1L))
-      store.deleteObject(persistenceId, 1L).futureValue
+      store.deleteObject(persistenceId, revision = 0).futureValue
       store.getObject(persistenceId).futureValue should 
be(GetObjectResult(None, 0L))
     }
 
-    "fail deleteObject call when revision is unknown" in {
+    "delete payload but keep revision" in {
       val entityType = nextEntityType()
       val persistenceId = PersistenceId(entityType, 
"to-be-added-and-removed").id
+      val value1 = "value1"
+      store.upsertObject(persistenceId, 1L, value1, unusedTag).futureValue
+      store.getObject(persistenceId).futureValue should 
be(GetObjectResult(Some(value1), 1L))
+      store.deleteObject(persistenceId, revision = 2L).futureValue
+      store.getObject(persistenceId).futureValue should 
be(GetObjectResult(None, 2L))
+
+      val value2 = "value2"
+      store.upsertObject(persistenceId, 3L, value2, unusedTag).futureValue
+      store.getObject(persistenceId).futureValue should 
be(GetObjectResult(Some(value2), 3L))
+    }
+
+    "update revision when deleting" in {
+      val entityType = nextEntityType()
+      val persistenceId = PersistenceId(entityType, "to-be-removed").id
+
+      store.deleteObject(persistenceId, revision = 1L).futureValue
+      store.getObject(persistenceId).futureValue should 
be(GetObjectResult(None, 1L))
+      store.deleteObject(persistenceId, revision = 2L).futureValue
+      store.getObject(persistenceId).futureValue should 
be(GetObjectResult(None, 2L))
+
+      val value1 = "value1"
+      store.upsertObject(persistenceId, 3L, value1, unusedTag).futureValue
+      store.getObject(persistenceId).futureValue should 
be(GetObjectResult(Some(value1), 3L))
+
+      store.deleteObject(persistenceId, revision = 4L).futureValue
+      store.getObject(persistenceId).futureValue should 
be(GetObjectResult(None, 4L))
+      store.deleteObject(persistenceId, revision = 5L).futureValue
+      store.getObject(persistenceId).futureValue should 
be(GetObjectResult(None, 5L))
+    }
+
+    "detect and reject concurrent delete of revision 1" in {
+      val entityType = nextEntityType()
+      val persistenceId = PersistenceId(entityType, 
"id-to-be-deleted-concurrently")
       val value = "Genuinely Collaborative"
-      store.upsertObject(persistenceId, 1L, value, unusedTag).futureValue
-      store.getObject(persistenceId).futureValue should 
be(GetObjectResult(Some(value), 1L))
-      if (pekko.Version.current.startsWith("1.0")) {
-        store.deleteObject(persistenceId, 2L).futureValue
-      } else {
-        val ex = intercept[Exception] {
-          Await.result(store.deleteObject(persistenceId, 2L), 20.seconds)
-        }
-        ex.getClass.getName shouldEqual 
DurableStateExceptionSupport.DeleteRevisionExceptionClass
-      }
-      store.getObject(persistenceId).futureValue should 
be(GetObjectResult(Some(value), 1L))
+      store.upsertObject(persistenceId.id, revision = 1L, value, 
entityType).futureValue
+      store.getObject(persistenceId.id).futureValue should 
be(GetObjectResult(Some(value), 1L))
+
+      val failure =
+        store.deleteObject(persistenceId.id, revision = 1L).failed.futureValue
+      failure.getMessage should include(
+        s"Insert delete marker with revision 1 failed: durable state for 
persistence id [${persistenceId.id}] already exists")
+    }
+
+    "detect and reject concurrent deletes" in {
+      val entityType = nextEntityType()
+      val persistenceId = PersistenceId(entityType, 
"id-to-be-updated-concurrently")
+      val value = "Genuinely Collaborative"
+      store.upsertObject(persistenceId.id, revision = 1L, value, 
entityType).futureValue
+      store.getObject(persistenceId.id).futureValue should 
be(GetObjectResult(Some(value), 1L))
+
+      val updatedValue = "Open to Feedback"
+      store.upsertObject(persistenceId.id, revision = 2L, updatedValue, 
entityType).futureValue
+      store.getObject(persistenceId.id).futureValue should 
be(GetObjectResult(Some(updatedValue), 2L))
+
+      // simulate a delete by a different node that didn't see the first one:
+      val updatedValue2 = "Genuine and Sincere in all Communications"
+      val failure =
+        store.deleteObject(persistenceId.id, revision = 2L).failed.futureValue
+      failure.getMessage should include(
+        s"Failed to delete object with persistenceId [${persistenceId.id}] and 
revision [2]")
     }
 
   }
diff --git a/docs/src/main/paradox/query.md b/docs/src/main/paradox/query.md
index e4ba5c9..a560c10 100644
--- a/docs/src/main/paradox/query.md
+++ b/docs/src/main/paradox/query.md
@@ -135,13 +135,17 @@ Java
 Scala
 :  @@snip 
[create](/docs/src/test/scala/docs/home/query/QueryDocCompileOnly.scala) { 
#currentChangesBySlices }
 
-The emitted `DurableStateChange` can be a `UpdatedDurableState` or 
`DeletedDurableState`, but `DeletedDurableState` is not implemented yet.
+The emitted `DurableStateChange` can be an `UpdatedDurableState` or 
`DeletedDurableState`.
 
 It will emit an `UpdatedDurableState` when the durable state is updated. When 
the state is updated again another
 `UpdatedDurableState` is emitted. It will always emit an `UpdatedDurableState` 
for the latest revision of the state,
 but there is no guarantee that all intermediate changes are emitted if the 
state is updated several times. Note that
 `UpdatedDurableState` contains the full current state, and it is not a delta 
from previous revision of state.
 
+It will emit a `DeletedDurableState` when the durable state is deleted. When 
the state is updated again a new
+`UpdatedDurableState` is emitted. There is no guarantee that all intermediate 
changes are emitted if the state is
+updated or deleted several times.
+
 `changesBySlices` should be used via @ref:[R2dbcProjection](projection.md), 
which will automatically handle the similar difficulties
 with duplicates as described for @ref[eventsBySlices](#eventsbyslices). When 
using `R2dbcProjection` the changes
 will be delivered in revision number order without duplicates.
diff --git 
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
 
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
index 20ae6bc..c85c50f 100644
--- 
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
+++ 
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
@@ -8,7 +8,7 @@
  */
 
 /*
- * Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2021-2022 Lightbend Inc. <https://www.lightbend.com>
  */
 
 package org.apache.pekko.projection.r2dbc.internal
@@ -30,6 +30,7 @@ import pekko.actor.typed.ActorSystem
 import pekko.actor.typed.scaladsl.LoggerOps
 import pekko.annotation.InternalApi
 import pekko.persistence.Persistence
+import pekko.persistence.query.DeletedDurableState
 import pekko.persistence.query.DurableStateChange
 import pekko.persistence.query.Offset
 import pekko.persistence.query.TimestampOffset
@@ -1021,8 +1022,16 @@ private[projection] class R2dbcOffsetStore(
             timestampOffset,
             strictSeqNr = false,
             envelopeLoaded = change.value != null))
+      case change: DeletedDurableState[_] if 
change.offset.isInstanceOf[TimestampOffset] =>
+        val timestampOffset = change.offset.asInstanceOf[TimestampOffset]
+        Some(
+          RecordWithOffset(
+            Record(change.persistenceId, change.revision, 
timestampOffset.timestamp),
+            timestampOffset,
+            strictSeqNr = false,
+            envelopeLoaded = true))
       case change: DurableStateChange[_] if 
change.offset.isInstanceOf[TimestampOffset] =>
-        // FIXME case DeletedDurableState when that is added
+        // in case additional types are added
         throw new IllegalArgumentException(
           s"DurableStateChange [${change.getClass.getName}] not implemented 
yet. Please report bug at 
https://github.com/apache/pekko-persistence-r2dbc/issues";)
       case _ => None
diff --git 
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
 
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
index 4a96fd0..0ee478d 100644
--- 
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
+++ 
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
@@ -8,7 +8,7 @@
  */
 
 /*
- * Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2021-2022 Lightbend Inc. <https://www.lightbend.com>
  */
 
 package org.apache.pekko.projection.r2dbc.internal
@@ -28,6 +28,7 @@ import pekko.actor.typed.ActorSystem
 import pekko.annotation.InternalApi
 import pekko.event.Logging
 import pekko.event.LoggingAdapter
+import pekko.persistence.query.DeletedDurableState
 import pekko.persistence.query.UpdatedDurableState
 import pekko.persistence.query.typed.EventEnvelope
 import pekko.persistence.query.typed.scaladsl.LoadEventQuery
@@ -122,7 +123,6 @@ private[projection] object R2dbcProjectionImpl {
 
       case upd: UpdatedDurableState[_] if upd.value == null =>
         val pid = upd.persistenceId
-        val revision = upd.revision
         (sourceProvider match {
           case store: DurableStateStore[_] =>
             store.getObject(pid)
@@ -146,10 +146,9 @@ private[projection] object R2dbcProjectionImpl {
                 count: java.lang.Long)
             new UpdatedDurableState(pid, loadedRevision, loadedValue, 
upd.offset, upd.timestamp)
               .asInstanceOf[Envelope]
-          case GetObjectResult(None, _) =>
-            // FIXME use DeletedDurableState here when that is added
-            throw new IllegalStateException(
-              s"Durable state not found when loaded lazily, persistenceId 
[$pid], revision [$revision]")
+          case GetObjectResult(None, loadedRevision) =>
+            new DeletedDurableState(pid, loadedRevision, upd.offset, 
upd.timestamp)
+              .asInstanceOf[Envelope]
         }
 
       case _ =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to