This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 7830a1cb10 fix: emitting DeletedDurableState for deleted objects (#2397)
7830a1cb10 is described below

commit 7830a1cb10f64ae2e4530b51b9648c654606e490
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Oct 26 22:52:58 2025 +0800

    fix: emitting DeletedDurableState for deleted objects (#2397)
---
 .../PersistenceTestKitDurableStateStore.scala      | 10 ++++++---
 .../PersistenceTestKitDurableStateStoreSpec.scala  | 25 ++++++++++++++++++----
 2 files changed, 28 insertions(+), 7 deletions(-)

diff --git a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala
index 0ced17de1e..414a9f1962 100644
--- a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala
+++ b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala
@@ -83,9 +83,13 @@ class PersistenceTestKitDurableStateStore[A](val system: ExtendedActorSystem)
   override def deleteObject(persistenceId: String): Future[Done] = Future.successful(Done)
 
   override def deleteObject(persistenceId: String, revision: Long): Future[Done] = this.synchronized {
-    store = store.get(persistenceId) match {
-      case Some(record) => store + (persistenceId -> record.copy(value = None, revision = revision))
-      case None         => store
+    store.get(persistenceId) match {
+      case Some(record) =>
+        val globalOffset = lastGlobalOffset.incrementAndGet()
+        val updatedRecord = Record[A](globalOffset, persistenceId, revision, None, record.tag)
+        store = store + (persistenceId -> updatedRecord)
+        publisher ! updatedRecord
+      case None => // ignore
     }
 
     Future.successful(Done)
diff --git a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreSpec.scala b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreSpec.scala
index bc3b46a252..17eff3fe89 100644
--- a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreSpec.scala
+++ b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreSpec.scala
@@ -17,15 +17,12 @@ import org.apache.pekko
 import pekko.actor.ExtendedActorSystem
 import pekko.actor.testkit.typed.scaladsl.LogCapturing
 import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
-import pekko.persistence.query.NoOffset
-import pekko.persistence.query.Sequence
-import pekko.persistence.query.UpdatedDurableState
+import pekko.persistence.query.{ DeletedDurableState, DurableStateChange, NoOffset, Sequence, UpdatedDurableState }
 import pekko.persistence.testkit.PersistenceTestKitDurableStateStorePlugin
 import pekko.stream.scaladsl.Sink
 import pekko.stream.testkit.scaladsl.TestSink
 
 import org.scalatest.wordspec.AnyWordSpecLike
-
 import com.typesafe.config.ConfigFactory
 
 object PersistenceTestKitDurableStateStoreSpec {
@@ -81,6 +78,26 @@ class PersistenceTestKitDurableStateStoreSpec
         .value should be >= (firstStateChange.offset.asInstanceOf[Sequence].value)
     }
 
+    "find tagged state changes for deleted object" in {
+      val stateStore = new PersistenceTestKitDurableStateStore[Record](classic.asInstanceOf[ExtendedActorSystem])
+      val record = Record(1, "name-1")
+      val tag = "tag-1"
+      val persistenceId = "record-1"
+      val testSink = stateStore.changes(tag, NoOffset).runWith(TestSink[DurableStateChange[Record]]())
+
+      stateStore.upsertObject(persistenceId, 1L, record, tag)
+      val updatedDurableState = testSink.request(1).expectNext().asInstanceOf[UpdatedDurableState[Record]]
+      updatedDurableState.persistenceId should be(persistenceId)
+      updatedDurableState.value should be(record)
+      updatedDurableState.revision should be(1L)
+
+      stateStore.deleteObject(persistenceId, 2L)
+      val deletedDurableState = testSink.request(1).expectNext().asInstanceOf[DeletedDurableState[Record]]
+      deletedDurableState.persistenceId should be(persistenceId)
+      deletedDurableState.revision should be(2L)
+      deletedDurableState.timestamp should be >= updatedDurableState.timestamp
+    }
+
     "find tagged current state changes ordered by upsert" in {
       val stateStore = new PersistenceTestKitDurableStateStore[Record](classic.asInstanceOf[ExtendedActorSystem])
       val record = Record(1, "name-1")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to