This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-jdbc.git
The following commit(s) were added to refs/heads/main by this push:
new 08db474 take revision value into account when deleting DurableState
(#156)
08db474 is described below
commit 08db4743fe842acd611e627e595b35624557311c
Author: PJ Fanning <[email protected]>
AuthorDate: Thu Apr 25 22:02:20 2024 +0200
take revision value into account when deleting DurableState (#156)
* take revision value into account when deleting
add test
add tests
scalafmt
update test
Update DurableStateQueries.scala
scalafmt
* temporarily throw DurableStateStoreException
* Update JdbcDurableStateStore.scala
* rework exceptions
* Update configuration.md
* Update configuration.md
* Update JdbcDurableStateSpec.scala
* uptake https://github.com/apache/pekko/pull/1271
* scalafmt
* Update JdbcDurableStateSpec.scala
* try to fix test
* Update DurableStateExceptionSupport.scala
* refactor
* use parasitic context
* apply review suggestions
* review change
* Update JdbcDurableStateStore.scala
* revert doc change
* Update JdbcDurableStateStore.scala
---
.../jdbc/state/DurableStateQueries.scala | 11 +++++
.../scaladsl/DurableStateExceptionSupport.scala | 48 ++++++++++++++++++++++
.../state/scaladsl/JdbcDurableStateStore.scala | 36 +++++++++++-----
.../jdbc/state/scaladsl/JdbcDurableStateSpec.scala | 41 ++++++++++++++++++
4 files changed, 125 insertions(+), 11 deletions(-)
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/DurableStateQueries.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/DurableStateQueries.scala
index 3c39ce3..7b433fb 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/DurableStateQueries.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/DurableStateQueries.scala
@@ -17,6 +17,7 @@ package org.apache.pekko.persistence.jdbc.state
import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.persistence.jdbc.config.DurableStateTableConfiguration
+
import slick.jdbc.{ H2Profile, JdbcProfile, OracleProfile, PostgresProfile,
SQLServerProfile, SetParameter }
/**
@@ -92,6 +93,16 @@ import slick.jdbc.{ H2Profile, JdbcProfile, OracleProfile,
PostgresProfile, SQLS
durableStateTable.filter(_.persistenceId === persistenceId).delete
}
+ /**
+ * Deletes a particular revision of an object based on its persistenceId.
+ * This revision may no longer exist and if so, no delete will occur.
+ *
+ * @since 1.1.0
+ */
+ private[jdbc] def deleteBasedOnPersistenceIdAndRevision(persistenceId:
String, revision: Long) = {
+ selectFromDbByPersistenceId(persistenceId).filter(_.revision ===
revision).delete
+ }
+
def deleteAllFromDb() = {
durableStateTable.delete
}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateExceptionSupport.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateExceptionSupport.scala
new file mode 100644
index 0000000..88e1931
--- /dev/null
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateExceptionSupport.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.persistence.jdbc.state.scaladsl
+
+import java.lang.invoke.{ MethodHandles, MethodType }
+
+import scala.util.Try
+
+/**
+ * INTERNAL API
+ *
+ * Support for creating a `DeleteRevisionException`if the class is
+ * available on the classpath. Pekko 1.0 does not have this class, but
+ * it is added in Pekko 1.1.
+ */
+private[scaladsl] object DurableStateExceptionSupport {
+ val DeleteRevisionExceptionClass =
+ "org.apache.pekko.persistence.state.exception.DeleteRevisionException"
+
+ private def exceptionClassOpt: Option[Class[_]] =
+ Try(Class.forName(DeleteRevisionExceptionClass)).toOption
+
+ private val constructorOpt = exceptionClassOpt.map { clz =>
+ val mt = MethodType.methodType(classOf[Unit], classOf[String])
+ MethodHandles.publicLookup().findConstructor(clz, mt)
+ }
+
+ def createDeleteRevisionExceptionIfSupported(message: String):
Option[Exception] =
+ constructorOpt.map { constructor =>
+ constructor.invoke(message).asInstanceOf[Exception]
+ }
+
+}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala
index e0795e4..b1acbf1 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala
@@ -22,23 +22,23 @@ import slick.jdbc.{ JdbcBackend, JdbcProfile }
import org.apache.pekko
import pekko.{ Done, NotUsed }
import pekko.actor.ExtendedActorSystem
+import pekko.annotation.ApiMayChange
+import pekko.dispatch.ExecutionContexts
import pekko.pattern.ask
import pekko.persistence.jdbc.PekkoSerialization
import pekko.persistence.jdbc.state.DurableStateQueries
import pekko.persistence.jdbc.config.DurableStateTableConfiguration
import pekko.persistence.jdbc.state.{ DurableStateTables, OffsetSyntax }
-import pekko.persistence.query.{ DurableStateChange, Offset }
import pekko.persistence.jdbc.journal.dao.FlowControl
import pekko.persistence.jdbc.state.{ scaladsl => jdbcStateScalaDsl }
-import pekko.persistence.state.{ scaladsl => stateScalaDsl }
+import pekko.persistence.query.{ DurableStateChange, Offset,
UpdatedDurableState }
import pekko.persistence.query.{ scaladsl => queryScalaDsl }
+import pekko.persistence.state.{ scaladsl => stateScalaDsl }
import pekko.serialization.Serialization
import pekko.stream.scaladsl.{ Sink, Source }
import pekko.stream.{ Materializer, SystemMaterializer }
import pekko.util.Timeout
import OffsetSyntax._
-import pekko.annotation.ApiMayChange
-import pekko.persistence.query.UpdatedDurableState
object JdbcDurableStateStore {
val Identifier = "jdbc-durable-state-store"
@@ -70,7 +70,7 @@ class JdbcDurableStateStore[A](
durableStateConfig.stateSequenceConfig),
s"pekko-persistence-jdbc-durable-state-sequence-actor")
- def getObject(persistenceId: String):
Future[stateScalaDsl.GetObjectResult[A]] = {
+ override def getObject(persistenceId: String):
Future[stateScalaDsl.GetObjectResult[A]] = {
db.run(queries.selectFromDbByPersistenceId(persistenceId).result).map {
rows =>
rows.headOption match {
case Some(row) =>
@@ -84,7 +84,7 @@ class JdbcDurableStateStore[A](
}
}
- def upsertObject(persistenceId: String, revision: Long, value: A, tag:
String): Future[Done] = {
+ override def upsertObject(persistenceId: String, revision: Long, value: A,
tag: String): Future[Done] = {
require(revision > 0)
val row =
PekkoSerialization.serialize(serialization, value).map { serialized =>
@@ -113,13 +113,27 @@ class JdbcDurableStateStore[A](
}
}
- def deleteObject(persistenceId: String): Future[Done] =
+ override def deleteObject(persistenceId: String): Future[Done] =
db.run(queries.deleteFromDb(persistenceId).map(_ => Done))
- def deleteObject(persistenceId: String, revision: Long): Future[Done] =
- db.run(queries.deleteFromDb(persistenceId).map(_ => Done))
+ override def deleteObject(persistenceId: String, revision: Long):
Future[Done] =
+ db.run(queries.deleteBasedOnPersistenceIdAndRevision(persistenceId,
revision)).map { count =>
+ if (count != 1) {
+ // if you run this code with Pekko 1.0.x, no exception will be thrown
here
+ // this matches the behavior of pekko-connectors-jdbc 1.0.x
+ // if you run this code with Pekko 1.1.x, a DeleteRevisionException
will be thrown here
+ val msg = if (count == 0) {
+ s"Failed to delete object with persistenceId [$persistenceId] and
revision [$revision]"
+ } else {
+ s"Delete object succeeded for persistenceId [$persistenceId] and
revision [$revision] but more than one row was affected ($count rows)"
+ }
+
DurableStateExceptionSupport.createDeleteRevisionExceptionIfSupported(msg)
+ .foreach(throw _)
+ }
+ Done
+ }(ExecutionContexts.parasitic)
- def currentChanges(tag: String, offset: Offset):
Source[DurableStateChange[A], NotUsed] = {
+ override def currentChanges(tag: String, offset: Offset):
Source[DurableStateChange[A], NotUsed] = {
Source
.futureSource(maxStateStoreOffset().map { maxOrderingInDb =>
changesByTag(tag, offset.value, terminateAfterOffset =
Some(maxOrderingInDb))
@@ -127,7 +141,7 @@ class JdbcDurableStateStore[A](
.mapMaterializedValue(_ => NotUsed)
}
- def changes(tag: String, offset: Offset): Source[DurableStateChange[A],
NotUsed] =
+ override def changes(tag: String, offset: Offset):
Source[DurableStateChange[A], NotUsed] =
changesByTag(tag, offset.value, terminateAfterOffset = None)
private def currentChangesByTag(
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala
index 84192ce..01681f4 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala
@@ -120,6 +120,47 @@ abstract class JdbcDurableStateSpec(config: Config,
schemaType: SchemaType) exte
}
}
}
+ "fail to delete old object revision" in {
+ val f = for {
+ n <- stateStoreString.upsertObject("p987", 1, "a valid string", "t123")
+ _ = n shouldBe pekko.Done
+ g <- stateStoreString.getObject("p987")
+ _ = g.value shouldBe Some("a valid string")
+ u <- stateStoreString.upsertObject("p987", 2, "updated valid string",
"t123")
+ _ = u shouldBe pekko.Done
+ d <- stateStoreString.deleteObject("p987", 1)
+ } yield d
+ if (pekko.Version.current.startsWith("1.0")) {
+ whenReady(f) { v =>
+ v shouldBe pekko.Done
+ }
+ } else {
+ whenReady(f.failed) { e =>
+ e.getClass.getName shouldEqual
DurableStateExceptionSupport.DeleteRevisionExceptionClass
+ e.getMessage should include("Failed to delete object with
persistenceId [p987] and revision [1]")
+ }
+ }
+ }
+ "delete latest object revision but not older one" in {
+ whenReady {
+ for {
+
+ n <- stateStoreString.upsertObject("p9876", 1, "a valid string",
"t123")
+ _ = n shouldBe pekko.Done
+ g <- stateStoreString.getObject("p9876")
+ _ = g.value shouldBe Some("a valid string")
+ u <- stateStoreString.upsertObject("p9876", 2, "updated valid
string", "t123")
+ _ = u shouldBe pekko.Done
+ d <- stateStoreString.deleteObject("p9876", 2)
+ _ = d shouldBe pekko.Done
+ h <- stateStoreString.getObject("p9876")
+
+ } yield h
+ } { v =>
+ // current behavior is that deleting the latest revision means
getObject returns None (we don't preserve older revisions)
+ v.value shouldBe None
+ }
+ }
}
"A durable state store with payload that needs custom serializer" must
withActorSystem { implicit system =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]