This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 7830a1cb10 fix: emitting DeletedDurableState for deleted objects
(#2397)
7830a1cb10 is described below
commit 7830a1cb10f64ae2e4530b51b9648c654606e490
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Oct 26 22:52:58 2025 +0800
fix: emitting DeletedDurableState for deleted objects (#2397)
---
.../PersistenceTestKitDurableStateStore.scala | 10 ++++++---
.../PersistenceTestKitDurableStateStoreSpec.scala | 25 ++++++++++++++++++----
2 files changed, 28 insertions(+), 7 deletions(-)
diff --git
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala
index 0ced17de1e..414a9f1962 100644
---
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala
+++
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala
@@ -83,9 +83,13 @@ class PersistenceTestKitDurableStateStore[A](val system:
ExtendedActorSystem)
override def deleteObject(persistenceId: String): Future[Done] =
Future.successful(Done)
override def deleteObject(persistenceId: String, revision: Long):
Future[Done] = this.synchronized {
- store = store.get(persistenceId) match {
- case Some(record) => store + (persistenceId -> record.copy(value = None,
revision = revision))
- case None => store
+ store.get(persistenceId) match {
+ case Some(record) =>
+ val globalOffset = lastGlobalOffset.incrementAndGet()
+ val updatedRecord = Record[A](globalOffset, persistenceId, revision,
None, record.tag)
+ store = store + (persistenceId -> updatedRecord)
+ publisher ! updatedRecord
+ case None => // ignore
}
Future.successful(Done)
diff --git
a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreSpec.scala
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreSpec.scala
index bc3b46a252..17eff3fe89 100644
---
a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreSpec.scala
+++
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreSpec.scala
@@ -17,15 +17,12 @@ import org.apache.pekko
import pekko.actor.ExtendedActorSystem
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
-import pekko.persistence.query.NoOffset
-import pekko.persistence.query.Sequence
-import pekko.persistence.query.UpdatedDurableState
+import pekko.persistence.query.{ DeletedDurableState, DurableStateChange,
NoOffset, Sequence, UpdatedDurableState }
import pekko.persistence.testkit.PersistenceTestKitDurableStateStorePlugin
import pekko.stream.scaladsl.Sink
import pekko.stream.testkit.scaladsl.TestSink
import org.scalatest.wordspec.AnyWordSpecLike
-
import com.typesafe.config.ConfigFactory
object PersistenceTestKitDurableStateStoreSpec {
@@ -81,6 +78,26 @@ class PersistenceTestKitDurableStateStoreSpec
.value should be >=
(firstStateChange.offset.asInstanceOf[Sequence].value)
}
+ "find tagged state changes for deleted object" in {
+ val stateStore = new
PersistenceTestKitDurableStateStore[Record](classic.asInstanceOf[ExtendedActorSystem])
+ val record = Record(1, "name-1")
+ val tag = "tag-1"
+ val persistenceId = "record-1"
+ val testSink = stateStore.changes(tag,
NoOffset).runWith(TestSink[DurableStateChange[Record]]())
+
+ stateStore.upsertObject(persistenceId, 1L, record, tag)
+ val updatedDurableState =
testSink.request(1).expectNext().asInstanceOf[UpdatedDurableState[Record]]
+ updatedDurableState.persistenceId should be(persistenceId)
+ updatedDurableState.value should be(record)
+ updatedDurableState.revision should be(1L)
+
+ stateStore.deleteObject(persistenceId, 2L)
+ val deletedDurableState =
testSink.request(1).expectNext().asInstanceOf[DeletedDurableState[Record]]
+ deletedDurableState.persistenceId should be(persistenceId)
+ deletedDurableState.revision should be(2L)
+ deletedDurableState.timestamp should be >= updatedDurableState.timestamp
+ }
+
"find tagged current state changes ordered by upsert" in {
val stateStore = new
PersistenceTestKitDurableStateStore[Record](classic.asInstanceOf[ExtendedActorSystem])
val record = Record(1, "name-1")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]