This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 662c54f  feat: cleanup - delete events in batches (port of 
akka/akka-persistence-r2dbc#329) (#360)
662c54f is described below

commit 662c54fe7a40832b1c129f8a6e6965f2ea2df833
Author: PJ Fanning <[email protected]>
AuthorDate: Fri May 15 09:57:18 2026 +0100

    feat: cleanup - delete events in batches (port of 
akka/akka-persistence-r2dbc#329) (#360)
    
    * port akka PR #329: add cleanup tool with batch event deletion support
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/0f997cad-95ad-4696-ab57-81a9520e74c9
    
    Co-authored-by: pjfanning <[email protected]>
    
    * fix log message string interpolation in cleanup tools
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/0f997cad-95ad-4696-ab57-81a9520e74c9
    
    Co-authored-by: pjfanning <[email protected]>
    
    * scalafmt
    
    * Fix ExecutionContext import in JournalDao.scala
    
    * Update ExecutionContexts to ExecutionContext in JournalDao
    
    * since 2.0.0
    
    * scalafmt
    
    * port akka PR #326: add cleanup documentation and doc examples
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/3c2b8b51-5df4-4d6b-bb19-0d45ace0e6bd
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Update 'Cleanup tool' to 'Cleanup Tool' in index.md
    
    * Fix capitalization in cleanup tool headings
    
    * Update CleanupDocExample.scala
    
    * port akka PR #329: add cleanup tool with batch event deletion support
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/0f997cad-95ad-4696-ab57-81a9520e74c9
    
    Co-authored-by: pjfanning <[email protected]>
    
    * fix log message string interpolation in cleanup tools
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/0f997cad-95ad-4696-ab57-81a9520e74c9
    
    Co-authored-by: pjfanning <[email protected]>
    
    * scalafmt
    
    * Fix ExecutionContext import in JournalDao.scala
    
    * Update ExecutionContexts to ExecutionContext in JournalDao
    
    * since 2.0.0
    
    * scalafmt
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] 
<[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 core/src/main/resources/reference.conf             |  12 ++
 .../pekko/persistence/r2dbc/R2dbcSettings.scala    |  18 ++
 .../cleanup/javadsl/DurableStateCleanup.scala      |  65 +++++++
 .../cleanup/javadsl/EventSourcedCleanup.scala      | 117 ++++++++++++
 .../cleanup/scaladsl/DurableStateCleanup.scala     | 135 +++++++++++++
 .../cleanup/scaladsl/EventSourcedCleanup.scala     | 208 +++++++++++++++++++++
 .../persistence/r2dbc/journal/JournalDao.scala     | 125 +++++++++++++
 docs/src/main/paradox/cleanup.md                   |  55 ++++++
 docs/src/main/paradox/index.md                     |   1 +
 .../java/jdocs/home/cleanup/CleanupDocExample.java |  50 +++++
 .../docs/home/cleanup/CleanupDocExample.scala      |  46 +++++
 11 files changed, 832 insertions(+)

diff --git a/core/src/main/resources/reference.conf 
b/core/src/main/resources/reference.conf
index f5a5075..dac6e01 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -291,3 +291,15 @@ pekko.persistence.r2dbc {
   }
 }
 // #connection-settings
+
+pekko.persistence.r2dbc {
+  # Configuration of the Cleanup tool (EventSourcedCleanup and 
DurableStateCleanup).
+  cleanup {
+    # Log a progress message each time this many persistence ids have been processed. Must be at
+    # least 1; set it to 1 to log progress for each persistence id.
+    log-progress-every = 100
+    # For large journals, deleting all events of a persistence id in a single transaction might not be efficient.
+    # Events are deleted in batches of this many sequence numbers (at least 1) to limit table lock holding and contention.
+    events-journal-delete-batch-size = 1000
+  }
+}
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
index b3978f2..ff1a234 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
@@ -282,6 +282,34 @@ object ConnectionFactorySettings {
     new ConnectionFactorySettings(config)
 }
 
+/**
+ * INTERNAL API
+ *
+ * Settings of the cleanup tools (`EventSourcedCleanup` and `DurableStateCleanup`), read from their cleanup config
+ * section (by default `pekko.persistence.r2dbc.cleanup`). Both values must be positive.
+ */
+@InternalStableApi
+final class CleanupSettings(val config: Config) {
+  // Number of processed persistence ids between progress log lines; it is used as a modulus for progress logging.
+  val logProgressEvery: Int = config.getInt("log-progress-every")
+  // Number of sequence numbers deleted per batch; it must be positive for the batched delete to make progress.
+  val eventsJournalDeleteBatchSize: Int = config.getInt("events-journal-delete-batch-size")
+
+  require(logProgressEvery > 0, s"log-progress-every must be > 0, was [$logProgressEvery]")
+  require(
+    eventsJournalDeleteBatchSize > 0,
+    s"events-journal-delete-batch-size must be > 0, was [$eventsJournalDeleteBatchSize]")
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+object CleanupSettings {
+  def apply(config: Config): CleanupSettings =
+    new CleanupSettings(config)
+}
+
 /**
  * INTERNAL API
  */
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/cleanup/javadsl/DurableStateCleanup.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/cleanup/javadsl/DurableStateCleanup.scala
new file mode 100644
index 0000000..87a81c1
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/cleanup/javadsl/DurableStateCleanup.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.cleanup.javadsl
+
+import java.util.concurrent.CompletionStage
+import java.util.{ List => JList }
+
+import scala.jdk.CollectionConverters._
+import scala.jdk.FutureConverters._
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.ClassicActorSystemProvider
+import pekko.annotation.ApiMayChange
+import pekko.persistence.r2dbc.cleanup.{ scaladsl => s }
+
+/**
+ * Java API: Tool for deleting durable state for a given list of `persistenceIds` without using `DurableStateBehavior`
+ * actors. It's important that the actors with corresponding persistenceId are not running at the same time as using the
+ * tool. All operations delegate to the Scala API `org.apache.pekko.persistence.r2dbc.cleanup.scaladsl.DurableStateCleanup`.
+ *
+ * If `resetRevisionNumber` is `true` then the creating entity with the same `persistenceId` will start from 0.
+ * Otherwise it will continue from the latest highest used revision number.
+ *
+ * WARNING: reusing the same `persistenceId` after resetting the revision number should be avoided, since it might be
+ * confusing to reuse the same revision numbers for new state changes.
+ *
+ * When a list of `persistenceIds` is given they are deleted sequentially in the order of the list. It's possible to
+ * parallelize the deletes by running several cleanup operations at the same time operating on different sets of
+ * `persistenceIds`.
+ *
+ * @since 2.0.0
+ */
+@ApiMayChange
+final class DurableStateCleanup private (delegate: s.DurableStateCleanup) {
+
+  def this(systemProvider: ClassicActorSystemProvider, configPath: String) = // path of the cleanup config section
+    this(new s.DurableStateCleanup(systemProvider, configPath))
+
+  def this(systemProvider: ClassicActorSystemProvider) = // uses the default cleanup config section
+    this(systemProvider, "pekko.persistence.r2dbc.cleanup")
+
+  /**
+   * Delete the state related to one single `persistenceId`. The returned stage completes with `Done` when finished.
+   */
+  def deleteState(persistenceId: String, resetRevisionNumber: Boolean): CompletionStage[Done] =
+    delegate.deleteState(persistenceId, resetRevisionNumber).asJava
+
+  /**
+   * Delete all states related to the given list of `persistenceIds`, one at a time; stops at the first failure.
+   */
+  def deleteStates(persistenceIds: JList[String], resetRevisionNumber: Boolean): CompletionStage[Done] =
+    delegate.deleteStates(persistenceIds.asScala.toVector, resetRevisionNumber).asJava
+}
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/cleanup/javadsl/EventSourcedCleanup.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/cleanup/javadsl/EventSourcedCleanup.scala
new file mode 100644
index 0000000..dd99714
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/cleanup/javadsl/EventSourcedCleanup.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.cleanup.javadsl
+
+import java.util.concurrent.CompletionStage
+import java.util.{ List => JList }
+
+import scala.jdk.CollectionConverters._
+import scala.jdk.FutureConverters._
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.ClassicActorSystemProvider
+import pekko.annotation.ApiMayChange
+import pekko.persistence.r2dbc.cleanup.{ scaladsl => s }
+
+/**
+ * Java API: Tool for deleting events and/or snapshots for a given list of `persistenceIds` without using persistent
+ * actors. All operations delegate to the Scala API `org.apache.pekko.persistence.r2dbc.cleanup.scaladsl.EventSourcedCleanup`.
+ *
+ * When running an operation with `EventSourcedCleanup` that deletes all events for a persistence id, the actor with
+ * that persistence id must not be running! If the actor is restarted it would in that case be recovered to the wrong
+ * state since the stored events have been deleted. Delete events before snapshot can still be used while the actor is
+ * running.
+ *
+ * If `resetSequenceNumber` is `true` then a new entity with the same `persistenceId` starts again from sequence number 1.
+ * Otherwise it will continue from the latest highest used sequence number.
+ *
+ * WARNING: reusing the same `persistenceId` after resetting the sequence number should be avoided, since it might be
+ * confusing to reuse the same sequence number for new events.
+ *
+ * When a list of `persistenceIds` is given they are deleted sequentially in the order of the list. It's possible to
+ * parallelize the deletes by running several cleanup operations at the same time operating on different sets of
+ * `persistenceIds`.
+ *
+ * @since 2.0.0
+ */
+@ApiMayChange
+final class EventSourcedCleanup private (delegate: s.EventSourcedCleanup) {
+
+  def this(systemProvider: ClassicActorSystemProvider, configPath: String) = // path of the cleanup config section
+    this(new s.EventSourcedCleanup(systemProvider, configPath))
+
+  def this(systemProvider: ClassicActorSystemProvider) = // uses the default cleanup config section
+    this(systemProvider, "pekko.persistence.r2dbc.cleanup")
+
+  /**
+   * Delete all events up to and including `toSequenceNr` for the given persistence id. Snapshots are not deleted.
+   *
+   * @param persistenceId
+   *   the persistence id to delete for
+   * @param toSequenceNr
+   *   sequence nr (inclusive) to delete up to; a delete marker is kept there, so the sequence number is not reset
+   */
+  def deleteEventsTo(persistenceId: String, toSequenceNr: Long): CompletionStage[Done] =
+    delegate.deleteEventsTo(persistenceId, toSequenceNr).asJava
+
+  /**
+   * Delete all events related to one single `persistenceId`. Snapshots are not deleted.
+   */
+  def deleteAllEvents(persistenceId: String, resetSequenceNumber: Boolean): CompletionStage[Done] =
+    delegate.deleteAllEvents(persistenceId, resetSequenceNumber).asJava
+
+  /**
+   * Delete all events related to the given list of `persistenceIds`, sequentially. Snapshots are not deleted.
+   */
+  def deleteAllEvents(persistenceIds: JList[String], resetSequenceNumber: Boolean): CompletionStage[Done] =
+    delegate.deleteAllEvents(persistenceIds.asScala.toVector, resetSequenceNumber).asJava
+
+  /**
+   * Delete snapshots related to one single `persistenceId`. Events are not deleted.
+   */
+  def deleteSnapshot(persistenceId: String): CompletionStage[Done] =
+    delegate.deleteSnapshot(persistenceId).asJava
+
+  /**
+   * Delete all snapshots related to the given list of `persistenceIds`, sequentially. Events are not deleted.
+   */
+  def deleteSnapshots(persistenceIds: JList[String]): CompletionStage[Done] =
+    delegate.deleteSnapshots(persistenceIds.asScala.toVector).asJava
+
+  /**
+   * Deletes all events for the given persistence id from before the snapshot. The snapshot is not deleted. The event
+   * with the same sequence number as the remaining snapshot is deleted. Nothing is deleted if there is no snapshot.
+   */
+  def cleanupBeforeSnapshot(persistenceId: String): CompletionStage[Done] =
+    delegate.cleanupBeforeSnapshot(persistenceId).asJava
+
+  /**
+   * Runs the single-`persistenceId` `cleanupBeforeSnapshot` for each of the given `persistenceIds`, in list order.
+   */
+  def cleanupBeforeSnapshot(persistenceIds: JList[String]): CompletionStage[Done] =
+    delegate.cleanupBeforeSnapshot(persistenceIds.asScala.toVector).asJava
+
+  /**
+   * Delete everything related to one single `persistenceId`: first all events, then all snapshots.
+   */
+  def deleteAll(persistenceId: String, resetSequenceNumber: Boolean): CompletionStage[Done] =
+    delegate.deleteAll(persistenceId, resetSequenceNumber).asJava
+
+  /**
+   * Delete everything related to the given list of `persistenceIds`. All events and snapshots are deleted.
+   */
+  def deleteAll(persistenceIds: JList[String], resetSequenceNumber: Boolean): CompletionStage[Done] =
+    delegate.deleteAll(persistenceIds.asScala.toVector, resetSequenceNumber).asJava
+}
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala
new file mode 100644
index 0000000..251496b
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.cleanup.scaladsl
+
+import scala.collection.immutable
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.util.Failure
+import scala.util.Success
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.ClassicActorSystemProvider
+import pekko.actor.typed.ActorSystem
+import pekko.annotation.ApiMayChange
+import pekko.annotation.InternalApi
+import pekko.persistence.r2dbc.CleanupSettings
+import pekko.persistence.r2dbc.StateSettings
+import pekko.persistence.r2dbc.state.scaladsl.DurableStateDao
+import com.typesafe.config.Config
+import org.slf4j.LoggerFactory
+
+/**
+ * Scala API: Tool for deleting durable state for a given list of `persistenceIds` without using `DurableStateBehavior`
+ * actors. It's important that the actors with corresponding persistenceId are not running at the same time as using the
+ * tool.
+ *
+ * If `resetRevisionNumber` is `true` then the creating entity with the same `persistenceId` will start from 0.
+ * Otherwise it will continue from the latest highest used revision number.
+ *
+ * WARNING: reusing the same `persistenceId` after resetting the revision number should be avoided, since it might be
+ * confusing to reuse the same revision numbers for new state changes.
+ *
+ * When a list of `persistenceIds` is given they are deleted sequentially in the order of the list, stopping at the
+ * first failure. It's possible to parallelize the deletes by running several cleanup operations at the same time
+ * operating on different sets of `persistenceIds`.
+ *
+ * @since 2.0.0
+ */
+@ApiMayChange
+final class DurableStateCleanup(systemProvider: ClassicActorSystemProvider, configPath: String) {
+
+  def this(systemProvider: ClassicActorSystemProvider) =
+    this(systemProvider, "pekko.persistence.r2dbc.cleanup")
+
+  /**
+   * INTERNAL API: the typed view of the classic actor system, which also supplies the ExecutionContext used here.
+   */
+  @InternalApi private[pekko] implicit val system: ActorSystem[_] = {
+    import pekko.actor.typed.scaladsl.adapter._
+    systemProvider.classicSystem.toTyped
+  }
+
+  import system.executionContext
+
+  private val log = LoggerFactory.getLogger(classOf[DurableStateCleanup])
+
+  private val sharedConfigPath = configPath.replaceAll("""\.cleanup$""", "") // parent of the cleanup section
+  private val systemConfig: Config = system.settings.config
+
+  private val cleanupSettings = new CleanupSettings(systemConfig.getConfig(configPath))
+
+  private val stateConfig = systemConfig.getConfig(sharedConfigPath + ".state")
+  private val stateSettings = StateSettings(stateConfig)
+  private val stateDao = DurableStateDao.fromConfig(stateSettings, stateConfig)
+
+  /**
+   * Delete the state related to one single `persistenceId`. Without `resetRevisionNumber` a missing state is a no-op.
+   */
+  def deleteState(persistenceId: String, resetRevisionNumber: Boolean): Future[Done] = {
+    if (resetRevisionNumber) {
+      stateDao
+        .deleteStateForRevision(persistenceId, revision = 0L)
+        .map(_ => Done)(ExecutionContext.parasitic)
+    } else {
+      stateDao.readState(persistenceId).flatMap {
+        case None =>
+          Future.successful(Done) // already deleted
+        case Some(s) =>
+          stateDao
+            .deleteStateForRevision(persistenceId, s.revision + 1)
+            .map(_ => Done)(ExecutionContext.parasitic)
+      }
+    }
+  }
+
+  /**
+   * Delete all states related to the given list of `persistenceIds`, one at a time in list order.
+   */
+  def deleteStates(persistenceIds: immutable.Seq[String], resetRevisionNumber: Boolean): Future[Done] = {
+    foreach(persistenceIds, "deleteStates", pid => deleteState(pid, resetRevisionNumber))
+  }
+
+  private def foreach(
+      persistenceIds: immutable.Seq[String],
+      operationName: String,
+      pidOperation: String => Future[Done]): Future[Done] = {
+    val size = persistenceIds.size
+    log.info("Cleanup started {} of [{}] persistenceId.", operationName, size: java.lang.Integer)
+    // One persistence id at a time: the next starts only after the previous succeeded; the first failure ends it.
+    def loop(remaining: List[String], n: Int): Future[Done] = {
+      remaining match {
+        case Nil         => Future.successful(Done)
+        case pid :: tail =>
+          pidOperation(pid).flatMap { _ =>
+            if (cleanupSettings.logProgressEvery > 0 && n % cleanupSettings.logProgressEvery == 0) // no "/ by zero"
+              log.info("Cleanup {} [{}] of [{}].", operationName, n: java.lang.Integer, size: java.lang.Integer)
+            loop(tail, n + 1)
+          }
+      }
+    }
+
+    val result = loop(persistenceIds.toList, n = 1)
+
+    result.onComplete {
+      case Success(_) =>
+        log.info("Cleanup completed {} of [{}] persistenceId.", operationName, size: java.lang.Integer)
+      case Failure(e) =>
+        log.error(s"Cleanup $operationName failed.", e)
+    }
+
+    result
+  }
+}
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanup.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanup.scala
new file mode 100644
index 0000000..1793024
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanup.scala
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.cleanup.scaladsl
+
+import scala.collection.immutable
+import scala.concurrent.Future
+import scala.util.Failure
+import scala.util.Success
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.ClassicActorSystemProvider
+import pekko.actor.typed.ActorSystem
+import pekko.annotation.ApiMayChange
+import pekko.annotation.InternalApi
+import pekko.persistence.SnapshotSelectionCriteria
+import pekko.persistence.r2dbc.CleanupSettings
+import pekko.persistence.r2dbc.JournalSettings
+import pekko.persistence.r2dbc.SnapshotSettings
+import pekko.persistence.r2dbc.journal.JournalDao
+import pekko.persistence.r2dbc.snapshot.SnapshotDao
+import com.typesafe.config.Config
+import org.slf4j.LoggerFactory
+
+/**
+ * Scala API: Tool for deleting events and/or snapshots for a given list of `persistenceIds` without using persistent
+ * actors.
+ *
+ * When running an operation with `EventSourcedCleanup` that deletes all events for a persistence id, the actor with
+ * that persistence id must not be running! If the actor is restarted it would in that case be recovered to the wrong
+ * state since the stored events have been deleted. Delete events before snapshot can still be used while the actor is
+ * running.
+ *
+ * If `resetSequenceNumber` is `true` then a new entity with the same `persistenceId` starts again from sequence number 1.
+ * Otherwise it will continue from the latest highest used sequence number.
+ *
+ * WARNING: reusing the same `persistenceId` after resetting the sequence number should be avoided, since it might be
+ * confusing to reuse the same sequence number for new events.
+ *
+ * When a list of `persistenceIds` is given they are deleted sequentially in the order of the list, stopping at the
+ * first failure. It's possible to parallelize the deletes by running several cleanup operations at the same time
+ * operating on different sets of `persistenceIds`.
+ *
+ * @since 2.0.0
+ */
+@ApiMayChange
+final class EventSourcedCleanup(systemProvider: ClassicActorSystemProvider, configPath: String) {
+
+  def this(systemProvider: ClassicActorSystemProvider) =
+    this(systemProvider, "pekko.persistence.r2dbc.cleanup")
+
+  /**
+   * INTERNAL API: the typed view of the classic actor system, which also supplies the ExecutionContext used here.
+   */
+  @InternalApi private[pekko] implicit val system: ActorSystem[_] = {
+    import pekko.actor.typed.scaladsl.adapter._
+    systemProvider.classicSystem.toTyped
+  }
+
+  import system.executionContext
+
+  private val log = LoggerFactory.getLogger(classOf[EventSourcedCleanup])
+
+  private val sharedConfigPath = configPath.replaceAll("""\.cleanup$""", "") // parent of the cleanup section
+  private val systemConfig: Config = system.settings.config
+
+  private val cleanupSettings = new CleanupSettings(systemConfig.getConfig(configPath))
+
+  private val journalConfig = systemConfig.getConfig(sharedConfigPath + ".journal")
+  private val journalSettings = JournalSettings(journalConfig)
+  private val journalDao = JournalDao.fromConfig(journalSettings, journalConfig)
+
+  private val snapshotConfig = systemConfig.getConfig(sharedConfigPath + ".snapshot")
+  private val snapshotSettings = SnapshotSettings(snapshotConfig)
+  private val snapshotDao = SnapshotDao.fromConfig(snapshotSettings, snapshotConfig)
+
+  /**
+   * Delete all events up to and including `toSequenceNr` for the given persistence id. Snapshots are not deleted.
+   *
+   * @param persistenceId
+   *   the persistence id to delete for
+   * @param toSequenceNr
+   *   sequence nr (inclusive) to delete up to; a delete marker is kept there, so the sequence number is not reset
+   */
+  def deleteEventsTo(persistenceId: String, toSequenceNr: Long): Future[Done] = {
+    log.debug("deleteEventsTo persistenceId [{}], toSequenceNr [{}]", persistenceId, toSequenceNr: java.lang.Long)
+    journalDao
+      .deleteEventsTo(
+        persistenceId,
+        toSequenceNr,
+        resetSequenceNumber = false,
+        cleanupSettings.eventsJournalDeleteBatchSize)
+      .map(_ => Done)
+  }
+
+  /**
+   * Delete all events related to one single `persistenceId` (in delete batches). Snapshots are not deleted.
+   */
+  def deleteAllEvents(persistenceId: String, resetSequenceNumber: Boolean): Future[Done] = {
+    journalDao
+      .deleteEventsTo(
+        persistenceId,
+        toSequenceNr = Long.MaxValue,
+        resetSequenceNumber,
+        cleanupSettings.eventsJournalDeleteBatchSize)
+      .map(_ => Done)
+  }
+
+  /**
+   * Delete all events related to the given list of `persistenceIds`. Snapshots are not deleted.
+   */
+  def deleteAllEvents(persistenceIds: immutable.Seq[String], resetSequenceNumber: Boolean): Future[Done] = {
+    foreach(persistenceIds, "deleteAllEvents", pid => deleteAllEvents(pid, resetSequenceNumber))
+  }
+
+  /**
+   * Delete snapshots related to one single `persistenceId`. Events are not deleted.
+   */
+  def deleteSnapshot(persistenceId: String): Future[Done] = {
+    snapshotDao
+      .delete(persistenceId, SnapshotSelectionCriteria(maxSequenceNr = Long.MaxValue))
+      .map(_ => Done)
+  }
+
+  /**
+   * Delete all snapshots related to the given list of `persistenceIds`. Events are not deleted.
+   */
+  def deleteSnapshots(persistenceIds: immutable.Seq[String]): Future[Done] = {
+    foreach(persistenceIds, "deleteSnapshots", pid => deleteSnapshot(pid))
+  }
+
+  /**
+   * Deletes all events for the given persistence id from before the snapshot. The snapshot is not deleted. The event
+   * with the same sequence number as the remaining snapshot is deleted. Nothing is deleted if there is no snapshot.
+   */
+  def cleanupBeforeSnapshot(persistenceId: String): Future[Done] = {
+    snapshotDao.load(persistenceId, SnapshotSelectionCriteria.Latest).flatMap {
+      case None           => Future.successful(Done)
+      case Some(snapshot) => deleteEventsTo(persistenceId, snapshot.seqNr)
+    }
+  }
+
+  /**
+   * Runs the single-`persistenceId` `cleanupBeforeSnapshot` for each of the given `persistenceIds`, in list order.
+   */
+  def cleanupBeforeSnapshot(persistenceIds: immutable.Seq[String]): Future[Done] = {
+    foreach(persistenceIds, "cleanupBeforeSnapshot", pid => cleanupBeforeSnapshot(pid))
+  }
+
+  /**
+   * Delete everything related to one single `persistenceId`: first all events, then all snapshots.
+   */
+  def deleteAll(persistenceId: String, resetSequenceNumber: Boolean): Future[Done] = {
+    for {
+      _ <- deleteAllEvents(persistenceId, resetSequenceNumber)
+      _ <- deleteSnapshot(persistenceId)
+    } yield Done
+  }
+
+  /**
+   * Delete everything related to the given list of `persistenceIds`. All events and snapshots are deleted.
+   */
+  def deleteAll(persistenceIds: immutable.Seq[String], resetSequenceNumber: Boolean): Future[Done] = {
+    foreach(persistenceIds, "deleteAll", pid => deleteAll(pid, resetSequenceNumber))
+  }
+
+  private def foreach(
+      persistenceIds: immutable.Seq[String],
+      operationName: String,
+      pidOperation: String => Future[Done]): Future[Done] = {
+    val size = persistenceIds.size
+    log.info("Cleanup started {} of [{}] persistenceId.", operationName, size: java.lang.Integer)
+    // One persistence id at a time: the next starts only after the previous succeeded; the first failure ends it.
+    def loop(remaining: List[String], n: Int): Future[Done] = {
+      remaining match {
+        case Nil         => Future.successful(Done)
+        case pid :: tail =>
+          pidOperation(pid).flatMap { _ =>
+            if (cleanupSettings.logProgressEvery > 0 && n % cleanupSettings.logProgressEvery == 0) // no "/ by zero"
+              log.info("Cleanup {} [{}] of [{}].", operationName, n: java.lang.Integer, size: java.lang.Integer)
+            loop(tail, n + 1)
+          }
+      }
+    }
+
+    val result = loop(persistenceIds.toList, n = 1)
+
+    result.onComplete {
+      case Success(_) =>
+        log.info("Cleanup completed {} of [{}] persistenceId.", operationName, size: java.lang.Integer)
+      case Failure(e) =>
+        log.error(s"Cleanup $operationName failed.", e)
+    }
+
+    result
+  }
+}
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
index 801471d..c61d154 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
@@ -32,6 +32,7 @@ import 
pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.r2dbc.journal.mysql.MySQLJournalDao
 import pekko.persistence.typed.PersistenceId
 import com.typesafe.config.Config
+import io.r2dbc.spi.Connection
 import io.r2dbc.spi.ConnectionFactory
 import io.r2dbc.spi.Row
 import io.r2dbc.spi.Statement
@@ -145,6 +146,14 @@ private[r2dbc] class JournalDao(val settings: 
JournalSettings, connectionFactory
   private val deleteEventsSql = sql"""
     DELETE FROM $journalTable
     WHERE persistence_id = ? AND seq_nr <= ?"""
+
+  private val selectLowestSequenceNrSql = sql"""
+    SELECT MIN(seq_nr) from $journalTable WHERE persistence_id = ?""" // lowest seq_nr, delete markers included
+
+  private val deleteEventsFromToSql = sql"""
+    DELETE FROM $journalTable
+    WHERE persistence_id = ? AND seq_nr >= ? AND seq_nr <= ?""" // one inclusive [from, to] batch of a batched delete
+
   private val insertDeleteMarkerSql = sql"""
     INSERT INTO $journalTable
     (slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, 
adapter_manifest, event_ser_id, event_ser_manifest, event_payload, deleted)
@@ -302,4 +311,152 @@ private[r2dbc] class JournalDao(val settings: JournalSettings, connectionFactory
     }
   }
 
+  /**
+   * Read the lowest stored `seq_nr` for `persistenceId`, delete markers included, or 0 if there are no rows.
+   */
+  private[r2dbc] def readLowestSequenceNr(persistenceId: String): Future[Long] = {
+    val result = r2dbcExecutor
+      .select(s"select lowest seqNr [$persistenceId]")(
+        connection =>
+          connection
+            .createStatement(selectLowestSequenceNrSql)
+            .bind(0, persistenceId),
+        row => {
+          // MIN(seq_nr) is NULL when the persistenceId has no rows
+          val seqNr = row.get(0, classOf[java.lang.Long])
+          if (seqNr eq null) 0L else seqNr.longValue
+        })
+      .map(_.headOption.getOrElse(0L))(ExecutionContext.parasitic)
+
+    if (log.isDebugEnabled)
+      result.foreach(seqNr =>
+        log.debug("Lowest sequence nr for persistenceId [{}]: [{}]", persistenceId, seqNr: java.lang.Long))
+
+    result
+  }
+
+  /**
+   * Upper bound (inclusive) of a delete. `Long.MaxValue` means "all events" and is resolved to the highest stored
+   * sequence number; any other value is used as given.
+   */
+  private def highestSeqNrForDelete(persistenceId: String, toSequenceNr: Long): Future[Long] = {
+    if (toSequenceNr == Long.MaxValue) readHighestSequenceNr(persistenceId, 0L)
+    else Future.successful(toSequenceNr)
+  }
+
+  /**
+   * Lower bound (inclusive) of the first delete batch. When the whole range fits in one batch it simply starts at 1,
+   * which avoids the query for the lowest stored sequence number.
+   */
+  private def lowestSequenceNrForDelete(persistenceId: String, toSeqNr: Long, batchSize: Int): Future[Long] = {
+    if (toSeqNr <= batchSize) {
+      Future.successful(1L)
+    } else {
+      readLowestSequenceNr(persistenceId)
+    }
+  }
+
+  /**
+   * Delete events up to and including `toSequenceNr` for `persistenceId` in batches.
+   *
+   * If `resetSequenceNumber` is `false` a delete marker is left at the highest deleted sequence number, so that the
+   * actor can continue from the next sequence number. This is the typical use case for cleanup of older events.
+   *
+   * If `resetSequenceNumber` is `true` the sequence number will be reset to 1 when the actor is started again with
+   * the same `persistenceId`. WARNING: reusing the same `persistenceId` after resetting the sequence number should
+   * be avoided, since it might be confusing to reuse the same sequence number for new events.
+   *
+   * If the resolved upper bound is below 1 (for example `Long.MaxValue` for a `persistenceId` without stored rows)
+   * nothing is deleted and no delete marker is written. Otherwise a marker at `seq_nr` 0 would be inserted again by
+   * every later call for the same `persistenceId`, because the delete range starts at 1.
+   *
+   * @param toSequenceNr
+   *   highest sequence number (inclusive) to delete, or `Long.MaxValue` for all events
+   * @param batchSize
+   *   number of sequence numbers to delete per batch (use `CleanupSettings.eventsJournalDeleteBatchSize`); must be
+   *   positive, otherwise the returned future fails with `IllegalArgumentException`
+   */
+  def deleteEventsTo(
+      persistenceId: String,
+      toSequenceNr: Long,
+      resetSequenceNumber: Boolean,
+      batchSize: Int): Future[Unit] = {
+    def insertDeleteMarkerStmt(deleteMarkerSeqNr: Long, connection: Connection): Statement = {
+      val entityType = PersistenceId.extractEntityType(persistenceId)
+      val slice = persistenceExt.sliceForPersistenceId(persistenceId)
+      connection
+        .createStatement(insertDeleteMarkerSql)
+        .bind(0, slice)
+        .bind(1, entityType)
+        .bind(2, persistenceId)
+        .bind(3, deleteMarkerSeqNr)
+        .bind(4, "")
+        .bind(5, "")
+        .bind(6, 0)
+        .bind(7, "")
+        .bind(8, Array.emptyByteArray)
+        .bind(9, true)
+    }
+
+    // Deletes the inclusive range [from, to]; shared by the plain batches and the marker-writing last batch.
+    def deleteRangeStmt(from: Long, to: Long, connection: Connection): Statement =
+      connection
+        .createStatement(deleteEventsFromToSql)
+        .bind(0, persistenceId)
+        .bind(1, from)
+        .bind(2, to)
+
+    // The last batch also writes the delete marker at `to` (unless the sequence number is reset), in the same
+    // `update` call as its delete statement.
+    def deleteBatch(from: Long, to: Long, lastBatch: Boolean): Future[Unit] = {
+      val deletedRows =
+        if (lastBatch && !resetSequenceNumber) {
+          r2dbcExecutor
+            .update(s"delete [$persistenceId] and insert marker") { connection =>
+              Vector(deleteRangeStmt(from, to, connection), insertDeleteMarkerStmt(to, connection))
+            }
+            .map(_.head) // first result: row count of the delete statement
+        } else {
+          r2dbcExecutor.updateOne(s"delete [$persistenceId]") { connection =>
+            deleteRangeStmt(from, to, connection)
+          }
+        }
+
+      deletedRows.map { rows =>
+        if (log.isDebugEnabled)
+          log.debug(
+            "Deleted [{}] events for persistenceId [{}], from seq num [{}] to [{}]",
+            rows: java.lang.Long,
+            persistenceId,
+            from: java.lang.Long,
+            to: java.lang.Long)
+      }(ExecutionContext.parasitic)
+    }
+
+    // Walks [from, maxTo] in consecutive batches of `batchSize`, one batch after the other. The bound is checked as
+    // `from > maxTo - batchSize` instead of `from + batchSize > maxTo` so the sum cannot overflow near Long.MaxValue.
+    def deleteInBatches(from: Long, maxTo: Long): Future[Unit] = {
+      if (from > maxTo - batchSize) {
+        deleteBatch(from, maxTo, lastBatch = true)
+      } else {
+        val to = from + batchSize - 1
+        deleteBatch(from, to, lastBatch = false).flatMap(_ => deleteInBatches(to + 1, maxTo))
+      }
+    }
+
+    if (batchSize <= 0) {
+      Future.failed(new IllegalArgumentException(s"batchSize must be > 0, was [$batchSize]"))
+    } else {
+      highestSeqNrForDelete(persistenceId, toSequenceNr).flatMap { toSeqNr =>
+        if (toSeqNr < 1L) {
+          // Nothing to delete; in particular don't write a delete marker at seq_nr 0 (or below).
+          Future.unit
+        } else {
+          lowestSequenceNrForDelete(persistenceId, toSeqNr, batchSize)
+            .flatMap(fromSeqNr => deleteInBatches(fromSeqNr, toSeqNr))
+        }
+      }
+    }
+  }
+
 }
diff --git a/docs/src/main/paradox/cleanup.md b/docs/src/main/paradox/cleanup.md
new file mode 100644
index 0000000..9a966d4
--- /dev/null
+++ b/docs/src/main/paradox/cleanup.md
@@ -0,0 +1,55 @@
+# Database Cleanup
+
+## Event Sourced Cleanup Tool
+
+@@@ warning
+
+When running any operation with `EventSourcedCleanup` for a persistence id, 
the actor with that persistence id must
+not be running!
+
+@@@
+
+If possible, it is best to keep all events in an event sourced system. That way new @ref[Projections](projection.md)
+can be built (and existing ones rebuilt) from the complete event history.
+
+In some cases keeping all events is not possible, or events must be removed for regulatory reasons, such as compliance with
+GDPR. `EventSourcedBehavior`s can automatically snapshot state and delete 
events as described in the
+@extref:[Pekko docs](pekko:typed/persistence-snapshot.html#snapshot-deletion). 
Snapshotting is useful even if events
+aren't deleted as it speeds up recovery.
+
+Deleting all events immediately when an entity has reached its terminal deleted state would mean that
+Projections might not yet have consumed all previous events and would not be notified of the deleted event. Instead, it's
+recommended to emit a final deleted event and store the fact that the entity 
is deleted separately via a Projection.
+Then a background task can clean up the events and snapshots for the deleted 
entities by using the
+@apidoc[EventSourcedCleanup] tool. The entity itself knows about the terminal state from the deleted event; it should
+not emit further events after that, and it typically stops itself if it receives more commands.
+
+@apidoc[EventSourcedCleanup] operations include:
+
+* Delete all events and snapshots for one or many persistence ids
+* Delete all events for one or many persistence ids
+* Delete all snapshots for one or many persistence ids
+* Delete events before snapshot for one or many persistence ids
+
+The cleanup tool can be combined with the @ref[query plugin](./query.md) which 
has a query to get all persistence ids.
+
+Java
+: @@snip 
[cleanup](/docs/src/test/java/jdocs/home/cleanup/CleanupDocExample.java) { 
#cleanup }
+
+Scala
+: @@snip 
[cleanup](/docs/src/test/scala/docs/home/cleanup/CleanupDocExample.scala) { 
#cleanup }
+
+## Durable State Cleanup Tool
+
+@@@ warning
+
+When running any operation with `DurableStateCleanup` for a persistence id, 
the actor with that persistence id must
+not be running!
+
+@@@
+
+@apidoc[DurableStateCleanup] operations include:
+
+* Delete state for one or many persistence ids
+
+The cleanup tool can be combined with the @ref[query plugin](./query.md) which 
has a query to get all persistence ids.
diff --git a/docs/src/main/paradox/index.md b/docs/src/main/paradox/index.md
index 599b986..2d94f93 100644
--- a/docs/src/main/paradox/index.md
+++ b/docs/src/main/paradox/index.md
@@ -15,6 +15,7 @@ The Pekko Persistence R2DBC plugin allows for using SQL 
database with R2DBC as a
 * [Durable State Plugin](durable-state-store.md)
 * [Query Plugin](query.md)
 * [Projection](projection.md)
+* [Cleanup Tool](cleanup.md)
 * [Migration Tool](migration.md)
 * [Migration Guides](migration-guides.md)
 * [Release Notes](release-notes/index.md)
diff --git a/docs/src/test/java/jdocs/home/cleanup/CleanupDocExample.java 
b/docs/src/test/java/jdocs/home/cleanup/CleanupDocExample.java
new file mode 100644
index 0000000..f758941
--- /dev/null
+++ b/docs/src/test/java/jdocs/home/cleanup/CleanupDocExample.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package jdocs.home.cleanup;
+
+import org.apache.pekko.actor.typed.ActorSystem;
+import org.apache.pekko.actor.typed.javadsl.Behaviors;
+
+// #cleanup
+import org.apache.pekko.persistence.query.PersistenceQuery;
+import org.apache.pekko.persistence.query.javadsl.CurrentPersistenceIdsQuery;
+import org.apache.pekko.persistence.r2dbc.cleanup.javadsl.EventSourcedCleanup;
+import org.apache.pekko.persistence.r2dbc.query.javadsl.R2dbcReadJournal;
+
+// #cleanup
+
+/** Documentation snippet for the event sourced cleanup tool, included by cleanup.md via @@snip. */
+public class CleanupDocExample {
+
+  public static void example() {
+
+    ActorSystem<?> system = ActorSystem.create(Behaviors.empty(), "Docs");
+
+    // #cleanup
+    CurrentPersistenceIdsQuery queries =
+        PersistenceQuery.get(system)
+            .getReadJournalFor(CurrentPersistenceIdsQuery.class, R2dbcReadJournal.Identifier());
+    EventSourcedCleanup cleanup = new EventSourcedCleanup(system);
+
+    // how many persistence ids to operate on in parallel
+    final int persistenceIdParallelism = 10;
+
+    // for all persistence ids, delete all events before the snapshot
+    queries
+        .currentPersistenceIds()
+        .mapAsync(persistenceIdParallelism, cleanup::cleanupBeforeSnapshot)
+        .run(system);
+
+    // #cleanup
+  }
+}
diff --git a/docs/src/test/scala/docs/home/cleanup/CleanupDocExample.scala 
b/docs/src/test/scala/docs/home/cleanup/CleanupDocExample.scala
new file mode 100644
index 0000000..5ecafbc
--- /dev/null
+++ b/docs/src/test/scala/docs/home/cleanup/CleanupDocExample.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.home.cleanup
+
+import org.apache.pekko
+import pekko.actor.typed.ActorSystem
+
+//#cleanup
+import pekko.persistence.query.PersistenceQuery
+import pekko.persistence.query.scaladsl.CurrentPersistenceIdsQuery
+import pekko.persistence.r2dbc.cleanup.scaladsl.EventSourcedCleanup
+import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
+
+//#cleanup
+
+/**
+ * Documentation snippet for the event sourced cleanup tool, included by cleanup.md via @@snip.
+ * `system` is left as `???`, so initializing this object would throw; it is meant to be compiled
+ * as part of the docs rather than executed.
+ */
+object CleanupDocExample {
+
+  implicit val system: ActorSystem[_] = ???
+
+  // #cleanup
+  val queries: CurrentPersistenceIdsQuery =
+    PersistenceQuery(system).readJournalFor[CurrentPersistenceIdsQuery](R2dbcReadJournal.Identifier)
+  val cleanup: EventSourcedCleanup = new EventSourcedCleanup(system)
+
+  // how many persistence ids to operate on in parallel
+  val persistenceIdParallelism: Int = 10
+
+  // for all persistence ids, delete all events before the snapshot
+  queries
+    .currentPersistenceIds()
+    .mapAsync(persistenceIdParallelism)(cleanup.cleanupBeforeSnapshot(_))
+    .run()
+
+  // #cleanup
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to