This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch 1.2.x
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git
The following commit(s) were added to refs/heads/1.2.x by this push:
new 3e3af51 fix incorrect impl of Durable State deleteObject(id,
revision) (#384)
3e3af51 is described below
commit 3e3af5101c463912a9b4e2b3e1b170b97379bb5e
Author: PJ Fanning <[email protected]>
AuthorDate: Fri May 29 10:42:14 2026 +0100
fix incorrect impl of Durable State deleteObject(id, revision) (#384)
* Update deleteObject scaladoc and remove DurableStateExceptionSupport
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/22d6fde4-d342-41d9-8e97-bbbeed4becc5
Co-authored-by: pjfanning <[email protected]>
* Update DurableStateStoreSpec.scala
* Fix deleteObject revision check: bind revision-1 to SQL and handle
revision==0 as full delete
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/6d2196f1-d07f-46b0-82e2-e80cb16061ee
Co-authored-by: pjfanning <[email protected]>
* add tck tests
* Fix TCK upsert-after-deletion: use soft-delete and handle null payload in
getObject
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/2dd45545-cf53-4b5d-8f5d-69b28026e189
Co-authored-by: pjfanning <[email protected]>
* Update R2dbcDurableStateStore.scala
* Fix delete/upsert: restore hard-delete SQL, fallback INSERT in writeState
for deleted state
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../r2dbc/state/scaladsl/DurableStateDao.scala | 96 +++++----
.../scaladsl/DurableStateExceptionSupport.scala | 48 -----
.../state/scaladsl/R2dbcDurableStateStore.scala | 36 ++--
.../r2dbc/state/DurableStateStoreSpec.scala | 12 +-
.../r2dbc/state/DurableStateStoreTCKSpec.scala | 214 +++++++++++++++++++++
.../state/R2dbcDurableStateStoreTCKSpec.scala | 34 ++++
6 files changed, 337 insertions(+), 103 deletions(-)
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
index 2c5d454..22a9dc4 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
@@ -225,30 +225,46 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
stmt.bind(i, state.tags.toArray)
}
- val result = {
- if (state.revision == 1) {
- r2dbcExecutor
- .updateOne(s"insert [${state.persistenceId}]") { connection =>
- val stmt = connection
- .createStatement(insertStateSql)
- .bind(0, slice)
- .bind(1, entityType)
- .bind(2, state.persistenceId)
- .bind(3, state.revision)
- .bind(4, state.serId)
- .bind(5, state.serManifest)
- .bind(6, state.payload)
- bindTags(stmt, 7)
- }
- .recoverWith { case _: R2dbcDataIntegrityViolationException =>
- Future.failed(
- new IllegalStateException(
- s"Insert failed: durable state for persistence id
[${state.persistenceId}] already exists"))
+ def doInsert(): Future[Done] = {
+ r2dbcExecutor
+ .updateOne(s"insert [${state.persistenceId}]") { connection =>
+ val stmt = connection
+ .createStatement(insertStateSql)
+ .bind(0, slice)
+ .bind(1, entityType)
+ .bind(2, state.persistenceId)
+ .bind(3, state.revision)
+ .bind(4, state.serId)
+ .bind(5, state.serManifest)
+ .bind(6, state.payload)
+ bindTags(stmt, 7)
+ }
+ .recoverWith { case _: R2dbcDataIntegrityViolationException =>
+ Future.failed(
+ new IllegalStateException(
+ s"Insert failed: durable state for persistence id
[${state.persistenceId}] already exists"))
+ }
+ .map { updatedRows =>
+ if (updatedRows != 1)
+ throw new IllegalStateException(
+ s"Insert failed: durable state for persistence id
[${state.persistenceId}] could not be inserted at revision [${state.revision}]")
+ else {
+ log.debug(
+ "Inserted durable state for persistenceId [{}] at revision [{}]",
+ state.persistenceId,
+ state.revision)
+ Done
}
- } else {
- val previousRevision = state.revision - 1
+ }
+ }
- r2dbcExecutor.updateOne(s"update [${state.persistenceId}]") {
connection =>
+ if (state.revision == 1) {
+ doInsert()
+ } else {
+ val previousRevision = state.revision - 1
+
+ r2dbcExecutor
+ .updateOne(s"update [${state.persistenceId}]") { connection =>
val stmt = connection
.createStatement(updateStateSql)
.bind(0, state.revision)
@@ -277,17 +293,27 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
stmt
}
}
- }
- }
-
- result.map { updatedRows =>
- if (updatedRows != 1)
- throw new IllegalStateException(
- s"Update failed: durable state for persistence id
[${state.persistenceId}] could not be updated to revision [${state.revision}]")
- else {
- log.debug("Updated durable state for persistenceId [{}] to revision
[{}]", state.persistenceId, state.revision)
- Done
- }
+ .flatMap { updatedRows =>
+ if (updatedRows == 1) {
+ log.debug(
+ "Updated durable state for persistenceId [{}] to revision [{}]",
+ state.persistenceId,
+ state.revision)
+ Future.successful(Done)
+ } else {
+ // The UPDATE matched no row. Check whether the state was deleted
(no row exists)
+ // or whether this is a revision mismatch (a row exists with a
different revision).
+ readState(state.persistenceId).flatMap {
+ case None =>
+ // State was previously hard-deleted; insert a fresh row at
the requested revision.
+ doInsert()
+ case Some(_) =>
+ Future.failed(
+ new IllegalStateException(
+ s"Update failed: durable state for persistence id
[${state.persistenceId}] could not be updated to revision [${state.revision}]"))
+ }
+ }
+ }
}
}
@@ -307,7 +333,7 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
/**
* @param persistenceId The persistence id for the object
- * @param revision The revision to delete
+ * @param revision The next revision (current stored revision + 1) - deletes
the row where stored revision equals revision - 1
* @return The number of rows deleted
* @since 1.1.0
*/
@@ -317,7 +343,7 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
connection
.createStatement(deleteStateWithRevisionSql)
.bind(0, persistenceId)
- .bind(1, revision)
+ .bind(1, revision - 1)
}
if (log.isDebugEnabled())
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateExceptionSupport.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateExceptionSupport.scala
deleted file mode 100644
index fb4f5e9..0000000
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateExceptionSupport.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pekko.persistence.r2dbc.state.scaladsl
-
-import java.lang.invoke.{ MethodHandles, MethodType }
-
-import scala.util.Try
-
-/**
- * INTERNAL API
- *
- * Support for creating a `DeleteRevisionException`if the class is
- * available on the classpath. Pekko 1.0 does not have this class, but
- * it is added in Pekko 1.1.
- */
-private[state] object DurableStateExceptionSupport {
- val DeleteRevisionExceptionClass =
- "org.apache.pekko.persistence.state.exception.DeleteRevisionException"
-
- private def exceptionClassOpt: Option[Class[_]] =
- Try(Class.forName(DeleteRevisionExceptionClass)).toOption
-
- private val constructorOpt = exceptionClassOpt.map { clz =>
- val mt = MethodType.methodType(classOf[Unit], classOf[String])
- MethodHandles.publicLookup().findConstructor(clz, mt)
- }
-
- def createDeleteRevisionExceptionIfSupported(message: String):
Option[Exception] =
- constructorOpt.map { constructor =>
- constructor.invoke(message).asInstanceOf[Exception]
- }
-
-}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
index ab30892..a8bb81c 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
@@ -110,22 +110,30 @@ class R2dbcDurableStateStore[A](system:
ExtendedActorSystem, config: Config, cfg
override def deleteObject(persistenceId: String): Future[Done] =
stateDao.deleteState(persistenceId)
+ /**
+ * Delete the value, which will fail with `IllegalStateException` if the
existing stored `revision` + 1 isn't equal to
+ * the given `revision`. The row is removed from the database and the next
call to [[getObject]] will return
+ * `GetObjectResult(None, 0L)`. A subsequent [[upsertObject]] at `revision +
1` will succeed by inserting a new row.
+ *
+ * If the given revision is `0` it will fully delete the value and revision
from the database without any optimistic
+ * locking check. Next call to [[getObject]] will then return revision 0 and
no value.
+ */
override def deleteObject(persistenceId: String, revision: Long):
Future[Done] = {
- stateDao.deleteStateForRevision(persistenceId, revision).map { count =>
- if (count != 1) {
- // if you run this code with Pekko 1.0.x, no exception will be thrown
here
- // this matches the behavior of pekko-connectors-jdbc 1.0.x
- // if you run this code with Pekko 1.1.x, a DeleteRevisionException
will be thrown here
- val msg = if (count == 0) {
- s"Failed to delete object with persistenceId [$persistenceId] and
revision [$revision]"
- } else {
- s"Delete object succeeded for persistenceId [$persistenceId] and
revision [$revision] but more than one row was affected ($count rows)"
+ if (revision == 0) {
+ stateDao.deleteState(persistenceId)
+ } else {
+ stateDao.deleteStateForRevision(persistenceId, revision).map { count =>
+ if (count != 1) {
+ val msg = if (count == 0) {
+ s"Failed to delete object with persistenceId [$persistenceId] and
revision [$revision]"
+ } else {
+ s"Delete object succeeded for persistenceId [$persistenceId] and
revision [$revision] but more than one row was affected ($count rows)"
+ }
+ throw new IllegalStateException(msg)
}
-
DurableStateExceptionSupport.createDeleteRevisionExceptionIfSupported(msg)
- .foreach(throw _)
- }
- Done
- }(ExecutionContexts.parasitic)
+ Done
+ }(ExecutionContexts.parasitic)
+ }
}
override def sliceForPersistenceId(persistenceId: String): Int =
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreSpec.scala
index 7fb9841..74b47a8 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreSpec.scala
@@ -17,7 +17,7 @@ import org.apache.pekko
import pekko.actor.testkit.typed.scaladsl.{ LogCapturing,
ScalaTestWithActorTestKit }
import pekko.actor.typed.ActorSystem
import pekko.persistence.r2dbc.{ TestConfig, TestData, TestDbLifecycle }
-import pekko.persistence.r2dbc.state.scaladsl.{ DurableStateExceptionSupport,
R2dbcDurableStateStore }
+import pekko.persistence.r2dbc.state.scaladsl.R2dbcDurableStateStore
import pekko.persistence.state.DurableStateStoreRegistry
import pekko.persistence.state.scaladsl.GetObjectResult
import pekko.persistence.typed.PersistenceId
@@ -120,23 +120,23 @@ class DurableStateStoreSpec
val value = "Genuinely Collaborative"
store.upsertObject(persistenceId, 1L, value, unusedTag).futureValue
store.getObject(persistenceId).futureValue should
be(GetObjectResult(Some(value), 1L))
- store.deleteObject(persistenceId, 1L).futureValue
+ store.deleteObject(persistenceId, 2L).futureValue
store.getObject(persistenceId).futureValue should
be(GetObjectResult(None, 0L))
}
- "fail deleteObject call when revision is unknown" in {
+ "fail deleteObject call when revision is wrong" in {
val entityType = nextEntityType()
val persistenceId = PersistenceId(entityType,
"to-be-added-and-removed").id
val value = "Genuinely Collaborative"
store.upsertObject(persistenceId, 1L, value, unusedTag).futureValue
store.getObject(persistenceId).futureValue should
be(GetObjectResult(Some(value), 1L))
if (pekko.Version.current.startsWith("1.0")) {
- store.deleteObject(persistenceId, 2L).futureValue
+ store.deleteObject(persistenceId, 1L).futureValue
} else {
val ex = intercept[Exception] {
- Await.result(store.deleteObject(persistenceId, 2L), 20.seconds)
+ Await.result(store.deleteObject(persistenceId, 1L), 20.seconds)
}
- ex.getClass.getName shouldEqual
DurableStateExceptionSupport.DeleteRevisionExceptionClass
+ ex.getClass shouldEqual classOf[IllegalStateException]
}
store.getObject(persistenceId).futureValue should
be(GetObjectResult(Some(value), 1L))
}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreTCKSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreTCKSpec.scala
new file mode 100644
index 0000000..67c43a5
--- /dev/null
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreTCKSpec.scala
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.state
+
+import scala.annotation.nowarn
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.persistence._
+import pekko.persistence.scalatest.{ MayVerb, OptionalTests }
+import pekko.persistence.state.DurableStateStoreRegistry
+import pekko.persistence.state.scaladsl.DurableStateUpdateStore
+import pekko.testkit.TestProbe
+
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+
+object DurableStateStoreTCKSpec {
+ val config: Config = ConfigFactory.parseString(s"""
+ pekko.actor {
+ serializers {
+ durable-state-tck-test = "${classOf[TestSerializer].getName}"
+ }
+ serialization-bindings {
+ "${classOf[TestPayload].getName}" = durable-state-tck-test
+ }
+ }
+ """)
+}
+
+/**
+ * This spec aims to verify custom pekko-persistence [[DurableStateStore]]
implementations.
+ * Plugin authors are highly encouraged to include it in their plugin's test
suites.
+ *
+ * In case your durable state store plugin needs some kind of setup or
teardown, override the `beforeAll`
+ * or `afterAll` methods (don't forget to call `super` in your overridden
methods).
+ *
+ * This is a copy of the TCK spec added in Pekko 2.0.0.
+ *
https://github.com/apache/pekko/blob/4f20b4736980a5d57bc58e1356fea4dede87a7cd/persistence-tck/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreSpec.scala
+ */
+abstract class DurableStateStoreTCKSpec(config: Config)
+ extends PluginSpec(config)
+ with MayVerb
+ with OptionalTests {
+
+ implicit lazy val system: ActorSystem =
+ ActorSystem("DurableStateStoreTCKSpec",
config.withFallback(DurableStateStoreTCKSpec.config))
+
+ protected def supportsDeleteWithRevisionCheck: CapabilityFlag =
CapabilityFlag.off()
+
+ protected def supportsUpsertWithRevisionCheck: CapabilityFlag =
CapabilityFlag.off()
+
+ protected def supportsSerialization: CapabilityFlag = CapabilityFlag.on()
+
+ protected def supportsSoftDelete: CapabilityFlag = CapabilityFlag.off()
+
+ override protected def beforeEach(): Unit = {
+ super.beforeEach()
+ preparePersistenceId(pid)
+ }
+
+ /**
+ * Overridable hook that is called before each test case.
+ * `pid` is the `persistenceId` that will be used in the test.
+ * This method may be needed to clean any pre-existing state from the store,
+ * for example when running against a shared external database.
+ */
+ def preparePersistenceId(@nowarn("msg=never used") pid: String): Unit = ()
+
+ /**
+ * Returns the `DurableStateUpdateStore` under test. By default, this uses
the plugin
+ * configured under `pekko.persistence.state.plugin` in the provided config.
+ */
+ def durableStateStore(): DurableStateUpdateStore[Any] =
+
DurableStateStoreRegistry(system).durableStateStoreFor[DurableStateUpdateStore[Any]]("")
+
+ protected val timeout: FiniteDuration = 5.seconds
+
+ "A durable state store" must {
+ "not find a non-existing object" in {
+ val result = Await.result(durableStateStore().getObject(pid), timeout)
+ result.value shouldBe None
+ }
+
+ "persist a state and retrieve it" in {
+ val value = s"state-${pid}"
+ Await.result(durableStateStore().upsertObject(pid, 1L, value,
"test-tag"), timeout)
+ val result = Await.result(durableStateStore().getObject(pid), timeout)
+ result.value shouldBe Some(value)
+ result.revision shouldBe 1L
+ }
+
+ "update a state" in {
+ val store = durableStateStore()
+ val value1 = s"state-1-${pid}"
+ val value2 = s"state-2-${pid}"
+ Await.result(store.upsertObject(pid, 1L, value1, "test-tag"), timeout)
+ Await.result(store.upsertObject(pid, 2L, value2, "test-tag"), timeout)
+ val result = Await.result(store.getObject(pid), timeout)
+ result.value shouldBe Some(value2)
+ result.revision shouldBe 2L
+ }
+
+ "delete a state" in {
+ val store = durableStateStore()
+ val value = s"state-${pid}"
+ Await.result(store.upsertObject(pid, 1L, value, "test-tag"), timeout)
+ Await.result(store.deleteObject(pid, 2L), timeout)
+ val result = Await.result(store.getObject(pid), timeout)
+ result.value shouldBe None
+ }
+
+ "handle different persistence IDs independently" in {
+ val store = durableStateStore()
+ val pid2 = pid + "-2"
+ val value1 = s"state-${pid}"
+ val value2 = s"state-${pid2}"
+ Await.result(store.upsertObject(pid, 1L, value1, "test-tag"), timeout)
+ Await.result(store.upsertObject(pid2, 1L, value2, "test-tag"), timeout)
+
+ val result1 = Await.result(store.getObject(pid), timeout)
+ val result2 = Await.result(store.getObject(pid2), timeout)
+
+ result1.value shouldBe Some(value1)
+ result2.value shouldBe Some(value2)
+ }
+
+ "upsert again after a deletion" in {
+ val store = durableStateStore()
+ val original = s"state-${pid}"
+ val recreated = s"state-${pid}-v2"
+ Await.result(store.upsertObject(pid, 1L, original, "test-tag"), timeout)
+ Await.result(store.deleteObject(pid, 2L), timeout)
+ Await.result(store.upsertObject(pid, 3L, recreated, "test-tag"), timeout)
+ val result = Await.result(store.getObject(pid), timeout)
+ result.value shouldBe Some(recreated)
+ result.revision shouldBe 3L
+ }
+ }
+
+ "A durable state store optionally".may {
+ optional(flag = supportsDeleteWithRevisionCheck) {
+ "fail to delete a state when the revision does not match" in {
+ val store = durableStateStore()
+ val value = s"state-${pid}"
+ Await.result(store.upsertObject(pid, 1L, value, "test-tag"), timeout)
+ val deleteResult = store.deleteObject(pid, 99L)
+ intercept[Exception] {
+ Await.result(deleteResult, timeout)
+ }
+ // The original state should still be accessible
+ val result = Await.result(store.getObject(pid), timeout)
+ result.value shouldBe Some(value)
+ result.revision shouldBe 1L
+ }
+ }
+
+ optional(flag = supportsUpsertWithRevisionCheck) {
+ "fail to upsert a state when the revision is stale" in {
+ val store = durableStateStore()
+ val original = s"state-${pid}"
+ val stale = s"state-${pid}-stale"
+ Await.result(store.upsertObject(pid, 1L, original, "test-tag"),
timeout)
+ // Re-using revision 1 should be rejected; the next valid revision is
2.
+ val staleUpsert = store.upsertObject(pid, 1L, stale, "test-tag")
+ intercept[Exception] {
+ Await.result(staleUpsert, timeout)
+ }
+ // The original state should still be accessible
+ val result = Await.result(store.getObject(pid), timeout)
+ result.value shouldBe Some(original)
+ result.revision shouldBe 1L
+ }
+ }
+
+ optional(flag = supportsSerialization) {
+ "serialize and deserialize values via the configured serializer" in {
+ val store = durableStateStore()
+ val probe = TestProbe()
+ val value = TestPayload(probe.ref)
+ Await.result(store.upsertObject(pid, 1L, value, "test-tag"), timeout)
+ val result = Await.result(store.getObject(pid), timeout)
+ result.value shouldBe Some(value)
+ result.revision shouldBe 1L
+ }
+ }
+
+ optional(flag = supportsSoftDelete) {
+ "delete a state via the deprecated deleteObject overload" in {
+ val store = durableStateStore()
+ val value = s"state-${pid}"
+ Await.result(store.upsertObject(pid, 1L, value, "test-tag"), timeout)
+ @nowarn("cat=deprecation")
+ val deleteResult = store.deleteObject(pid)
+ Await.result(deleteResult, timeout)
+ val result = Await.result(store.getObject(pid), timeout)
+ result.value shouldBe None
+ }
+ }
+ }
+}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/R2dbcDurableStateStoreTCKSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/R2dbcDurableStateStoreTCKSpec.scala
new file mode 100644
index 0000000..4c99c6a
--- /dev/null
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/R2dbcDurableStateStoreTCKSpec.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.persistence.r2dbc.state
+
+import org.apache.pekko
+import pekko.actor.typed.ActorSystem
+import pekko.actor.typed.scaladsl.adapter._
+import pekko.persistence.CapabilityFlag
+import pekko.persistence.r2dbc.TestConfig
+import pekko.persistence.r2dbc.TestDbLifecycle
+
+class R2dbcDurableStateStoreTCKSpec
+ extends DurableStateStoreTCKSpec(TestConfig.config)
+ with TestDbLifecycle {
+
+ override def typedSystem: ActorSystem[_] = system.toTyped
+
+ override protected def supportsDeleteWithRevisionCheck: CapabilityFlag =
CapabilityFlag.on()
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]