This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-jdbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 08db474  take revision value into account when deleting DurableState 
(#156)
08db474 is described below

commit 08db4743fe842acd611e627e595b35624557311c
Author: PJ Fanning <[email protected]>
AuthorDate: Thu Apr 25 22:02:20 2024 +0200

    take revision value into account when deleting DurableState (#156)
    
    * take revision value into account when deleting
    
    add test
    
    add tests
    
    scalafmt
    
    update test
    
    Update DurableStateQueries.scala
    
    scalafmt
    
    * temporarily throw DurableStateStoreException
    
    * Update JdbcDurableStateStore.scala
    
    * rework exceptions
    
    * Update configuration.md
    
    * Update configuration.md
    
    * Update JdbcDurableStateSpec.scala
    
    * uptake https://github.com/apache/pekko/pull/1271
    
    * scalafmt
    
    * Update JdbcDurableStateSpec.scala
    
    * try to fix test
    
    * Update DurableStateExceptionSupport.scala
    
    * refactor
    
    * use parasitic context
    
    * apply review suggestions
    
    * review change
    
    * Update JdbcDurableStateStore.scala
    
    * revert doc change
    
    * Update JdbcDurableStateStore.scala
---
 .../jdbc/state/DurableStateQueries.scala           | 11 +++++
 .../scaladsl/DurableStateExceptionSupport.scala    | 48 ++++++++++++++++++++++
 .../state/scaladsl/JdbcDurableStateStore.scala     | 36 +++++++++++-----
 .../jdbc/state/scaladsl/JdbcDurableStateSpec.scala | 41 ++++++++++++++++++
 4 files changed, 125 insertions(+), 11 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/DurableStateQueries.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/DurableStateQueries.scala
index 3c39ce3..7b433fb 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/DurableStateQueries.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/DurableStateQueries.scala
@@ -17,6 +17,7 @@ package org.apache.pekko.persistence.jdbc.state
 import org.apache.pekko
 import pekko.annotation.InternalApi
 import pekko.persistence.jdbc.config.DurableStateTableConfiguration
+
 import slick.jdbc.{ H2Profile, JdbcProfile, OracleProfile, PostgresProfile, 
SQLServerProfile, SetParameter }
 
 /**
@@ -92,6 +93,16 @@ import slick.jdbc.{ H2Profile, JdbcProfile, OracleProfile, 
PostgresProfile, SQLS
     durableStateTable.filter(_.persistenceId === persistenceId).delete
   }
 
+  /**
+   * Deletes a particular revision of an object based on its persistenceId.
+   * This revision may no longer exist and if so, no delete will occur.
+   *
+   * @since 1.1.0
+   */
+  private[jdbc] def deleteBasedOnPersistenceIdAndRevision(persistenceId: 
String, revision: Long) = {
+    selectFromDbByPersistenceId(persistenceId).filter(_.revision === 
revision).delete
+  }
+
   def deleteAllFromDb() = {
     durableStateTable.delete
   }
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateExceptionSupport.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateExceptionSupport.scala
new file mode 100644
index 0000000..88e1931
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateExceptionSupport.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.persistence.jdbc.state.scaladsl
+
+import java.lang.invoke.{ MethodHandles, MethodType }
+
+import scala.util.Try
+
+/**
+ * INTERNAL API
+ *
+ * Support for creating a `DeleteRevisionException` if the class is
+ * available on the classpath. Pekko 1.0 does not have this class, but
+ * it is added in Pekko 1.1.
+ */
+private[scaladsl] object DurableStateExceptionSupport {
+  val DeleteRevisionExceptionClass =
+    "org.apache.pekko.persistence.state.exception.DeleteRevisionException"
+
+  private def exceptionClassOpt: Option[Class[_]] =
+    Try(Class.forName(DeleteRevisionExceptionClass)).toOption
+
+  private val constructorOpt = exceptionClassOpt.map { clz =>
+    val mt = MethodType.methodType(classOf[Unit], classOf[String])
+    MethodHandles.publicLookup().findConstructor(clz, mt)
+  }
+
+  def createDeleteRevisionExceptionIfSupported(message: String): 
Option[Exception] =
+    constructorOpt.map { constructor =>
+      constructor.invoke(message).asInstanceOf[Exception]
+    }
+
+}
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala
index e0795e4..b1acbf1 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala
@@ -22,23 +22,23 @@ import slick.jdbc.{ JdbcBackend, JdbcProfile }
 import org.apache.pekko
 import pekko.{ Done, NotUsed }
 import pekko.actor.ExtendedActorSystem
+import pekko.annotation.ApiMayChange
+import pekko.dispatch.ExecutionContexts
 import pekko.pattern.ask
 import pekko.persistence.jdbc.PekkoSerialization
 import pekko.persistence.jdbc.state.DurableStateQueries
 import pekko.persistence.jdbc.config.DurableStateTableConfiguration
 import pekko.persistence.jdbc.state.{ DurableStateTables, OffsetSyntax }
-import pekko.persistence.query.{ DurableStateChange, Offset }
 import pekko.persistence.jdbc.journal.dao.FlowControl
 import pekko.persistence.jdbc.state.{ scaladsl => jdbcStateScalaDsl }
-import pekko.persistence.state.{ scaladsl => stateScalaDsl }
+import pekko.persistence.query.{ DurableStateChange, Offset, 
UpdatedDurableState }
 import pekko.persistence.query.{ scaladsl => queryScalaDsl }
+import pekko.persistence.state.{ scaladsl => stateScalaDsl }
 import pekko.serialization.Serialization
 import pekko.stream.scaladsl.{ Sink, Source }
 import pekko.stream.{ Materializer, SystemMaterializer }
 import pekko.util.Timeout
 import OffsetSyntax._
-import pekko.annotation.ApiMayChange
-import pekko.persistence.query.UpdatedDurableState
 
 object JdbcDurableStateStore {
   val Identifier = "jdbc-durable-state-store"
@@ -70,7 +70,7 @@ class JdbcDurableStateStore[A](
       durableStateConfig.stateSequenceConfig),
     s"pekko-persistence-jdbc-durable-state-sequence-actor")
 
-  def getObject(persistenceId: String): 
Future[stateScalaDsl.GetObjectResult[A]] = {
+  override def getObject(persistenceId: String): 
Future[stateScalaDsl.GetObjectResult[A]] = {
     db.run(queries.selectFromDbByPersistenceId(persistenceId).result).map { 
rows =>
       rows.headOption match {
         case Some(row) =>
@@ -84,7 +84,7 @@ class JdbcDurableStateStore[A](
     }
   }
 
-  def upsertObject(persistenceId: String, revision: Long, value: A, tag: 
String): Future[Done] = {
+  override def upsertObject(persistenceId: String, revision: Long, value: A, 
tag: String): Future[Done] = {
     require(revision > 0)
     val row =
       PekkoSerialization.serialize(serialization, value).map { serialized =>
@@ -113,13 +113,27 @@ class JdbcDurableStateStore[A](
       }
   }
 
-  def deleteObject(persistenceId: String): Future[Done] =
+  override def deleteObject(persistenceId: String): Future[Done] =
     db.run(queries.deleteFromDb(persistenceId).map(_ => Done))
 
-  def deleteObject(persistenceId: String, revision: Long): Future[Done] =
-    db.run(queries.deleteFromDb(persistenceId).map(_ => Done))
+  override def deleteObject(persistenceId: String, revision: Long): 
Future[Done] =
+    db.run(queries.deleteBasedOnPersistenceIdAndRevision(persistenceId, 
revision)).map { count =>
+      if (count != 1) {
+        // if you run this code with Pekko 1.0.x, no exception will be thrown 
here
+        // this matches the behavior of pekko-persistence-jdbc 1.0.x
+        // if you run this code with Pekko 1.1.x, a DeleteRevisionException 
will be thrown here
+        val msg = if (count == 0) {
+          s"Failed to delete object with persistenceId [$persistenceId] and 
revision [$revision]"
+        } else {
+          s"Delete object succeeded for persistenceId [$persistenceId] and 
revision [$revision] but more than one row was affected ($count rows)"
+        }
+        
DurableStateExceptionSupport.createDeleteRevisionExceptionIfSupported(msg)
+          .foreach(throw _)
+      }
+      Done
+    }(ExecutionContexts.parasitic)
 
-  def currentChanges(tag: String, offset: Offset): 
Source[DurableStateChange[A], NotUsed] = {
+  override def currentChanges(tag: String, offset: Offset): 
Source[DurableStateChange[A], NotUsed] = {
     Source
       .futureSource(maxStateStoreOffset().map { maxOrderingInDb =>
         changesByTag(tag, offset.value, terminateAfterOffset = 
Some(maxOrderingInDb))
@@ -127,7 +141,7 @@ class JdbcDurableStateStore[A](
       .mapMaterializedValue(_ => NotUsed)
   }
 
-  def changes(tag: String, offset: Offset): Source[DurableStateChange[A], 
NotUsed] =
+  override def changes(tag: String, offset: Offset): 
Source[DurableStateChange[A], NotUsed] =
     changesByTag(tag, offset.value, terminateAfterOffset = None)
 
   private def currentChangesByTag(
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala
index 84192ce..01681f4 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala
@@ -120,6 +120,47 @@ abstract class JdbcDurableStateSpec(config: Config, 
schemaType: SchemaType) exte
         }
       }
     }
+    "fail to delete old object revision" in {
+      val f = for {
+        n <- stateStoreString.upsertObject("p987", 1, "a valid string", "t123")
+        _ = n shouldBe pekko.Done
+        g <- stateStoreString.getObject("p987")
+        _ = g.value shouldBe Some("a valid string")
+        u <- stateStoreString.upsertObject("p987", 2, "updated valid string", 
"t123")
+        _ = u shouldBe pekko.Done
+        d <- stateStoreString.deleteObject("p987", 1)
+      } yield d
+      if (pekko.Version.current.startsWith("1.0")) {
+        whenReady(f) { v =>
+          v shouldBe pekko.Done
+        }
+      } else {
+        whenReady(f.failed) { e =>
+          e.getClass.getName shouldEqual 
DurableStateExceptionSupport.DeleteRevisionExceptionClass
+          e.getMessage should include("Failed to delete object with 
persistenceId [p987] and revision [1]")
+        }
+      }
+    }
+    "delete latest object revision but not older one" in {
+      whenReady {
+        for {
+
+          n <- stateStoreString.upsertObject("p9876", 1, "a valid string", 
"t123")
+          _ = n shouldBe pekko.Done
+          g <- stateStoreString.getObject("p9876")
+          _ = g.value shouldBe Some("a valid string")
+          u <- stateStoreString.upsertObject("p9876", 2, "updated valid 
string", "t123")
+          _ = u shouldBe pekko.Done
+          d <- stateStoreString.deleteObject("p9876", 2)
+          _ = d shouldBe pekko.Done
+          h <- stateStoreString.getObject("p9876")
+
+        } yield h
+      } { v =>
+        // current behavior is that deleting the latest revision means 
getObject returns None (we don't preserve older revisions)
+        v.value shouldBe None
+      }
+    }
   }
 
   "A durable state store with payload that needs custom serializer" must 
withActorSystem { implicit system =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to