This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git
The following commit(s) were added to refs/heads/main by this push:
new 662c54f feat: cleanup - delete events in batches (port of
akka/akka-persistence-r2dbc#329) (#360)
662c54f is described below
commit 662c54fe7a40832b1c129f8a6e6965f2ea2df833
Author: PJ Fanning <[email protected]>
AuthorDate: Fri May 15 09:57:18 2026 +0100
feat: cleanup - delete events in batches (port of
akka/akka-persistence-r2dbc#329) (#360)
* port akka PR #329: add cleanup tool with batch event deletion support
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/0f997cad-95ad-4696-ab57-81a9520e74c9
Co-authored-by: pjfanning <[email protected]>
* fix log message string interpolation in cleanup tools
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/0f997cad-95ad-4696-ab57-81a9520e74c9
Co-authored-by: pjfanning <[email protected]>
* scalafmt
* Fix ExecutionContext import in JournalDao.scala
* Update ExecutionContexts to ExecutionContext in JournalDao
* since 2.0.0
* scalafmt
* port akka PR #326: add cleanup documentation and doc examples
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/3c2b8b51-5df4-4d6b-bb19-0d45ace0e6bd
Co-authored-by: pjfanning <[email protected]>
* Update 'Cleanup tool' to 'Cleanup Tool' in index.md
* Fix capitalization in cleanup tool headings
* Update CleanupDocExample.scala
* port akka PR #329: add cleanup tool with batch event deletion support
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/0f997cad-95ad-4696-ab57-81a9520e74c9
Co-authored-by: pjfanning <[email protected]>
* fix log message string interpolation in cleanup tools
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/0f997cad-95ad-4696-ab57-81a9520e74c9
Co-authored-by: pjfanning <[email protected]>
* scalafmt
* Fix ExecutionContext import in JournalDao.scala
* Update ExecutionContexts to ExecutionContext in JournalDao
* since 2.0.0
* scalafmt
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
core/src/main/resources/reference.conf | 12 ++
.../pekko/persistence/r2dbc/R2dbcSettings.scala | 18 ++
.../cleanup/javadsl/DurableStateCleanup.scala | 65 +++++++
.../cleanup/javadsl/EventSourcedCleanup.scala | 117 ++++++++++++
.../cleanup/scaladsl/DurableStateCleanup.scala | 135 +++++++++++++
.../cleanup/scaladsl/EventSourcedCleanup.scala | 208 +++++++++++++++++++++
.../persistence/r2dbc/journal/JournalDao.scala | 125 +++++++++++++
docs/src/main/paradox/cleanup.md | 55 ++++++
docs/src/main/paradox/index.md | 1 +
.../java/jdocs/home/cleanup/CleanupDocExample.java | 50 +++++
.../docs/home/cleanup/CleanupDocExample.scala | 46 +++++
11 files changed, 832 insertions(+)
diff --git a/core/src/main/resources/reference.conf
b/core/src/main/resources/reference.conf
index f5a5075..dac6e01 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -291,3 +291,15 @@ pekko.persistence.r2dbc {
}
}
// #connection-settings
+
+pekko.persistence.r2dbc {
+ # Configuration of the Cleanup tool (EventSourcedCleanup and DurableStateCleanup).
+ cleanup {
+ # Log progress each time this number of persistence ids has been processed by a
+ # cleanup operation. Set to 1 to log the progress of each persistence id.
+ # Must be greater than 0.
+ log-progress-every = 100
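+ # Events are deleted in batches spanning this many sequence numbers, one database
+ # operation per batch. For large journals, deleting all events in a single transaction
+ # can hold table locks for a long time; smaller batches reduce lock holding and
+ # contention. Must be greater than 0.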
+ events-journal-delete-batch-size = 1000
+ }
+}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
index b3978f2..ff1a234 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
@@ -282,6 +282,24 @@ object ConnectionFactorySettings {
new ConnectionFactorySettings(config)
}
+/**
+ * INTERNAL API
+ *
+ * Settings of the cleanup tools (`EventSourcedCleanup` and `DurableStateCleanup`), read from the cleanup
+ * configuration section (by default `pekko.persistence.r2dbc.cleanup`).
+ *
+ * Both values must be positive and are validated eagerly:
+ *   - `log-progress-every` is used as a modulus when deciding whether to log progress, so `0` would fail the
+ *     cleanup with an `ArithmeticException` (`/ by zero`) after the first persistence id.
+ *   - `events-journal-delete-batch-size` is the step of the batched journal delete, so a non-positive value
+ *     would make the batch loop never terminate.
+ */
+@InternalStableApi
+final class CleanupSettings(val config: Config) {
+  val logProgressEvery: Int = config.getInt("log-progress-every")
+  require(logProgressEvery > 0, s"log-progress-every must be > 0, was [$logProgressEvery]")
+
+  val eventsJournalDeleteBatchSize: Int = config.getInt("events-journal-delete-batch-size")
+  require(
+    eventsJournalDeleteBatchSize > 0,
+    s"events-journal-delete-batch-size must be > 0, was [$eventsJournalDeleteBatchSize]")
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+object CleanupSettings {
+  def apply(config: Config): CleanupSettings =
+    new CleanupSettings(config)
+}
+
/**
* INTERNAL API
*/
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/cleanup/javadsl/DurableStateCleanup.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/cleanup/javadsl/DurableStateCleanup.scala
new file mode 100644
index 0000000..87a81c1
--- /dev/null
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/cleanup/javadsl/DurableStateCleanup.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.cleanup.javadsl
+
+import java.util.concurrent.CompletionStage
+import java.util.{ List => JList }
+
+import scala.jdk.CollectionConverters._
+import scala.jdk.FutureConverters._
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.ClassicActorSystemProvider
+import pekko.annotation.ApiMayChange
+import pekko.persistence.r2dbc.cleanup.{ scaladsl => s }
+
+/**
+ * Java API: Tool for deleting durable state for a given list of `persistenceIds` without using
+ * `DurableStateBehavior` actors. The actors with the corresponding `persistenceId` must not be running while the
+ * tool is used.
+ *
+ * When `resetRevisionNumber` is `true` an entity that is later created with the same `persistenceId` starts from
+ * revision 0. Otherwise it continues from the highest revision number used so far.
+ *
+ * WARNING: reusing a `persistenceId` after its revision number has been reset should be avoided, since reusing the
+ * same revision numbers for new state changes can be confusing.
+ *
+ * A list of `persistenceIds` is processed sequentially, in list order. Deletes can be parallelized by running
+ * several cleanup operations concurrently, each operating on a different set of `persistenceIds`.
+ *
+ * All operations delegate to [[org.apache.pekko.persistence.r2dbc.cleanup.scaladsl.DurableStateCleanup]].
+ *
+ * @since 2.0.0
+ */
+@ApiMayChange
+final class DurableStateCleanup private (underlying: s.DurableStateCleanup) {
+
+  /**
+   * @param systemProvider
+   *   the actor system to run the cleanup with
+   * @param configPath
+   *   path of the cleanup configuration section, e.g. `pekko.persistence.r2dbc.cleanup`
+   */
+  def this(systemProvider: ClassicActorSystemProvider, configPath: String) =
+    this(new s.DurableStateCleanup(systemProvider, configPath))
+
+  /**
+   * Uses the default `pekko.persistence.r2dbc.cleanup` configuration section.
+   */
+  def this(systemProvider: ClassicActorSystemProvider) =
+    this(systemProvider, "pekko.persistence.r2dbc.cleanup")
+
+  /**
+   * Delete the state of a single `persistenceId`.
+   */
+  def deleteState(persistenceId: String, resetRevisionNumber: Boolean): CompletionStage[Done] = {
+    val deleted = underlying.deleteState(persistenceId, resetRevisionNumber)
+    deleted.asJava
+  }
+
+  /**
+   * Delete the state of every `persistenceId` in the given list, one after the other.
+   */
+  def deleteStates(persistenceIds: JList[String], resetRevisionNumber: Boolean): CompletionStage[Done] = {
+    val deleted = underlying.deleteStates(scalaPids(persistenceIds), resetRevisionNumber)
+    deleted.asJava
+  }
+
+  // Copies the Java list into an immutable Vector for the Scala API.
+  private def scalaPids(persistenceIds: JList[String]): Vector[String] =
+    persistenceIds.asScala.toVector
+}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/cleanup/javadsl/EventSourcedCleanup.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/cleanup/javadsl/EventSourcedCleanup.scala
new file mode 100644
index 0000000..dd99714
--- /dev/null
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/cleanup/javadsl/EventSourcedCleanup.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.cleanup.javadsl
+
+import java.util.concurrent.CompletionStage
+import java.util.{ List => JList }
+
+import scala.jdk.CollectionConverters._
+import scala.jdk.FutureConverters._
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.ClassicActorSystemProvider
+import pekko.annotation.ApiMayChange
+import pekko.persistence.r2dbc.cleanup.{ scaladsl => s }
+
+/**
+ * Java API: Tool for deleting events and/or snapshots for a given list of `persistenceIds` without using persistent
+ * actors.
+ *
+ * Operations that delete all events for a persistence id must not be run while the actor with that persistence id
+ * is running: if the actor is restarted it would be recovered to the wrong state, since its stored events are gone.
+ * Deleting events before a snapshot can still be used while the actor is running.
+ *
+ * When `resetSequenceNumber` is `true` an entity that is later created with the same `persistenceId` starts its
+ * sequence numbers over. Otherwise it continues from the highest sequence number used so far.
+ *
+ * WARNING: reusing a `persistenceId` after its sequence number has been reset should be avoided, since reusing the
+ * same sequence numbers for new events can be confusing.
+ *
+ * A list of `persistenceIds` is processed sequentially, in list order. Deletes can be parallelized by running
+ * several cleanup operations concurrently, each operating on a different set of `persistenceIds`.
+ *
+ * All operations delegate to [[org.apache.pekko.persistence.r2dbc.cleanup.scaladsl.EventSourcedCleanup]].
+ *
+ * @since 2.0.0
+ */
+@ApiMayChange
+final class EventSourcedCleanup private (underlying: s.EventSourcedCleanup) {
+
+  /**
+   * @param systemProvider
+   *   the actor system to run the cleanup with
+   * @param configPath
+   *   path of the cleanup configuration section, e.g. `pekko.persistence.r2dbc.cleanup`
+   */
+  def this(systemProvider: ClassicActorSystemProvider, configPath: String) =
+    this(new s.EventSourcedCleanup(systemProvider, configPath))
+
+  /**
+   * Uses the default `pekko.persistence.r2dbc.cleanup` configuration section.
+   */
+  def this(systemProvider: ClassicActorSystemProvider) =
+    this(systemProvider, "pekko.persistence.r2dbc.cleanup")
+
+  /**
+   * Delete all events up to and including `toSequenceNr` for the given persistence id. Snapshots are not deleted.
+   *
+   * @param persistenceId
+   *   the persistence id to delete for
+   * @param toSequenceNr
+   *   sequence nr (inclusive) to delete up to
+   */
+  def deleteEventsTo(persistenceId: String, toSequenceNr: Long): CompletionStage[Done] = {
+    val done = underlying.deleteEventsTo(persistenceId, toSequenceNr)
+    done.asJava
+  }
+
+  /**
+   * Delete every event of a single `persistenceId`. Snapshots are kept.
+   */
+  def deleteAllEvents(persistenceId: String, resetSequenceNumber: Boolean): CompletionStage[Done] = {
+    val done = underlying.deleteAllEvents(persistenceId, resetSequenceNumber)
+    done.asJava
+  }
+
+  /**
+   * Delete every event of each `persistenceId` in the list, one after the other. Snapshots are kept.
+   */
+  def deleteAllEvents(persistenceIds: JList[String], resetSequenceNumber: Boolean): CompletionStage[Done] = {
+    val done = underlying.deleteAllEvents(scalaPids(persistenceIds), resetSequenceNumber)
+    done.asJava
+  }
+
+  /**
+   * Delete the snapshots of a single `persistenceId`. Events are kept.
+   */
+  def deleteSnapshot(persistenceId: String): CompletionStage[Done] = {
+    val done = underlying.deleteSnapshot(persistenceId)
+    done.asJava
+  }
+
+  /**
+   * Delete the snapshots of each `persistenceId` in the list, one after the other. Events are kept.
+   */
+  def deleteSnapshots(persistenceIds: JList[String]): CompletionStage[Done] = {
+    val done = underlying.deleteSnapshots(scalaPids(persistenceIds))
+    done.asJava
+  }
+
+  /**
+   * Delete all events of the given persistence id up to and including the sequence number of its latest snapshot.
+   * The snapshot itself is kept.
+   */
+  def cleanupBeforeSnapshot(persistenceId: String): CompletionStage[Done] = {
+    val done = underlying.cleanupBeforeSnapshot(persistenceId)
+    done.asJava
+  }
+
+  /**
+   * Applies the single-`persistenceId` overload to each persistence id in the list, one after the other.
+   */
+  def cleanupBeforeSnapshot(persistenceIds: JList[String]): CompletionStage[Done] = {
+    val done = underlying.cleanupBeforeSnapshot(scalaPids(persistenceIds))
+    done.asJava
+  }
+
+  /**
+   * Delete all events and all snapshots of a single `persistenceId`.
+   */
+  def deleteAll(persistenceId: String, resetSequenceNumber: Boolean): CompletionStage[Done] = {
+    val done = underlying.deleteAll(persistenceId, resetSequenceNumber)
+    done.asJava
+  }
+
+  /**
+   * Delete all events and all snapshots of each `persistenceId` in the list, one after the other.
+   */
+  def deleteAll(persistenceIds: JList[String], resetSequenceNumber: Boolean): CompletionStage[Done] = {
+    val done = underlying.deleteAll(scalaPids(persistenceIds), resetSequenceNumber)
+    done.asJava
+  }
+
+  // Copies the Java list into an immutable Vector for the Scala API.
+  private def scalaPids(persistenceIds: JList[String]): Vector[String] =
+    persistenceIds.asScala.toVector
+}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala
new file mode 100644
index 0000000..251496b
--- /dev/null
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.cleanup.scaladsl
+
+import scala.collection.immutable
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.util.Failure
+import scala.util.Success
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.ClassicActorSystemProvider
+import pekko.actor.typed.ActorSystem
+import pekko.annotation.ApiMayChange
+import pekko.annotation.InternalApi
+import pekko.persistence.r2dbc.CleanupSettings
+import pekko.persistence.r2dbc.StateSettings
+import pekko.persistence.r2dbc.state.scaladsl.DurableStateDao
+import com.typesafe.config.Config
+import org.slf4j.LoggerFactory
+
+/**
+ * Scala API: Tool for deleting durable state for a given list of `persistenceIds` without using
+ * `DurableStateBehavior` actors. The actors with the corresponding `persistenceId` must not be running while the
+ * tool is used.
+ *
+ * If `resetRevisionNumber` is `true` an entity that is created again with the same `persistenceId` starts from
+ * revision 0. Otherwise it continues from the highest revision number used so far.
+ *
+ * WARNING: reusing the same `persistenceId` after resetting the revision number should be avoided, since it might be
+ * confusing to reuse the same revision numbers for new state changes.
+ *
+ * When a list of `persistenceIds` is given they are processed sequentially in the order of the list. Deletes can be
+ * parallelized by running several cleanup operations at the same time, operating on different sets of
+ * `persistenceIds`.
+ *
+ * @param systemProvider
+ *   the actor system whose configuration and dispatcher are used
+ * @param configPath
+ *   path of the cleanup configuration section (default `pekko.persistence.r2dbc.cleanup`); the `state` section is
+ *   looked up next to it (a trailing `.cleanup` is stripped from the path)
+ * @since 2.0.0
+ */
+@ApiMayChange
+final class DurableStateCleanup(systemProvider: ClassicActorSystemProvider, configPath: String) {
+
+  def this(systemProvider: ClassicActorSystemProvider) =
+    this(systemProvider, "pekko.persistence.r2dbc.cleanup")
+
+  /**
+   * INTERNAL API
+   */
+  @InternalApi private[pekko] implicit val system: ActorSystem[_] = {
+    import pekko.actor.typed.scaladsl.adapter._
+    systemProvider.classicSystem.toTyped
+  }
+
+  import system.executionContext
+
+  private val log = LoggerFactory.getLogger(classOf[DurableStateCleanup])
+
+  // Stripping the `.cleanup` suffix gives the plugin root, under which the `state` section is read.
+  private val sharedConfigPath = configPath.replaceAll("""\.cleanup$""", "")
+  private val systemConfig: Config = system.settings.config
+
+  private val cleanupSettings = new CleanupSettings(systemConfig.getConfig(configPath))
+
+  private val stateConfig = systemConfig.getConfig(sharedConfigPath + ".state")
+  private val stateSettings = StateSettings(stateConfig)
+  private val stateDao = DurableStateDao.fromConfig(stateSettings, stateConfig)
+
+  /**
+   * Delete the state related to one single `persistenceId`.
+   *
+   * With `resetRevisionNumber = false` the current state is read first and the delete is issued for the next
+   * revision. Nothing is done when no state is stored.
+   */
+  def deleteState(persistenceId: String, resetRevisionNumber: Boolean): Future[Done] = {
+    if (resetRevisionNumber) {
+      stateDao
+        .deleteStateForRevision(persistenceId, revision = 0L)
+        .map(_ => Done)(ExecutionContext.parasitic)
+    } else {
+      stateDao.readState(persistenceId).flatMap {
+        case None =>
+          Future.successful(Done) // already deleted
+        case Some(s) =>
+          stateDao
+            .deleteStateForRevision(persistenceId, s.revision + 1)
+            .map(_ => Done)(ExecutionContext.parasitic)
+      }
+    }
+  }
+
+  /**
+   * Delete all states related to the given list of `persistenceIds`.
+   */
+  def deleteStates(persistenceIds: immutable.Seq[String], resetRevisionNumber: Boolean): Future[Done] = {
+    foreach(persistenceIds, "deleteStates", pid => deleteState(pid, resetRevisionNumber))
+  }
+
+  /**
+   * Run `pidOperation` for each of the `persistenceIds`, one after the other, stopping at the first failure. Logs
+   * when the run starts and when it completes (or fails), and logs progress every `log-progress-every` persistence ids.
+   */
+  private def foreach(
+      persistenceIds: immutable.Seq[String],
+      operationName: String,
+      pidOperation: String => Future[Done]): Future[Done] = {
+    val size = persistenceIds.size
+    log.info("Cleanup started {} of [{}] persistenceId.", operationName, size: java.lang.Integer)
+
+    def loop(remaining: List[String], n: Int): Future[Done] = {
+      remaining match {
+        case Nil => Future.successful(Done)
+        case pid :: tail =>
+          pidOperation(pid).flatMap { _ =>
+            if (shouldLogProgress(n))
+              log.info("Cleanup {} [{}] of [{}].", operationName, n: java.lang.Integer, size: java.lang.Integer)
+            loop(tail, n + 1)
+          }
+      }
+    }
+
+    val result = loop(persistenceIds.toList, n = 1)
+
+    result.onComplete {
+      case Success(_) =>
+        log.info("Cleanup completed {} of [{}] persistenceId.", operationName, size: java.lang.Integer)
+      case Failure(e) =>
+        log.error(s"Cleanup $operationName failed.", e)
+    }
+
+    result
+  }
+
+  // `n % 0` would throw ArithmeticException (`/ by zero`) inside the loop and abort the cleanup after the
+  // first persistence id, so a non-positive `log-progress-every` disables progress logging instead.
+  private def shouldLogProgress(n: Int): Boolean = {
+    val every = cleanupSettings.logProgressEvery
+    every > 0 && n % every == 0
+  }
+}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanup.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanup.scala
new file mode 100644
index 0000000..1793024
--- /dev/null
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanup.scala
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.cleanup.scaladsl
+
+import scala.collection.immutable
+import scala.concurrent.Future
+import scala.util.Failure
+import scala.util.Success
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.ClassicActorSystemProvider
+import pekko.actor.typed.ActorSystem
+import pekko.annotation.ApiMayChange
+import pekko.annotation.InternalApi
+import pekko.persistence.SnapshotSelectionCriteria
+import pekko.persistence.r2dbc.CleanupSettings
+import pekko.persistence.r2dbc.JournalSettings
+import pekko.persistence.r2dbc.SnapshotSettings
+import pekko.persistence.r2dbc.journal.JournalDao
+import pekko.persistence.r2dbc.snapshot.SnapshotDao
+import com.typesafe.config.Config
+import org.slf4j.LoggerFactory
+
+/**
+ * Scala API: Tool for deleting events and/or snapshots for a given list of `persistenceIds` without using persistent
+ * actors.
+ *
+ * When running an operation with `EventSourcedCleanup` that deletes all events for a persistence id, the actor with
+ * that persistence id must not be running! If the actor is restarted it would in that case be recovered to the wrong
+ * state since the stored events have been deleted. Deleting events before a snapshot (`cleanupBeforeSnapshot`) can
+ * still be used while the actor is running.
+ *
+ * If `resetSequenceNumber` is `true` the sequence numbers of an entity that is created again with the same
+ * `persistenceId` start over. Otherwise it continues from the highest sequence number used so far.
+ *
+ * WARNING: reusing the same `persistenceId` after resetting the sequence number should be avoided, since it might be
+ * confusing to reuse the same sequence number for new events.
+ *
+ * When a list of `persistenceIds` is given they are processed sequentially in the order of the list. Deletes can be
+ * parallelized by running several cleanup operations at the same time, operating on different sets of
+ * `persistenceIds`.
+ *
+ * @param systemProvider
+ *   the actor system whose configuration and dispatcher are used
+ * @param configPath
+ *   path of the cleanup configuration section (default `pekko.persistence.r2dbc.cleanup`); the `journal` and
+ *   `snapshot` sections are looked up next to it (a trailing `.cleanup` is stripped from the path)
+ * @since 2.0.0
+ */
+@ApiMayChange
+final class EventSourcedCleanup(systemProvider: ClassicActorSystemProvider, configPath: String) {
+
+  def this(systemProvider: ClassicActorSystemProvider) =
+    this(systemProvider, "pekko.persistence.r2dbc.cleanup")
+
+  /**
+   * INTERNAL API
+   */
+  @InternalApi private[pekko] implicit val system: ActorSystem[_] = {
+    import pekko.actor.typed.scaladsl.adapter._
+    systemProvider.classicSystem.toTyped
+  }
+
+  import system.executionContext
+
+  private val log = LoggerFactory.getLogger(classOf[EventSourcedCleanup])
+
+  // Stripping the `.cleanup` suffix gives the plugin root, under which `journal` and `snapshot` are read.
+  private val sharedConfigPath = configPath.replaceAll("""\.cleanup$""", "")
+  private val systemConfig: Config = system.settings.config
+
+  private val cleanupSettings = new CleanupSettings(systemConfig.getConfig(configPath))
+
+  private val journalConfig = systemConfig.getConfig(sharedConfigPath + ".journal")
+  private val journalSettings = JournalSettings(journalConfig)
+  private val journalDao = JournalDao.fromConfig(journalSettings, journalConfig)
+
+  private val snapshotConfig = systemConfig.getConfig(sharedConfigPath + ".snapshot")
+  private val snapshotSettings = SnapshotSettings(snapshotConfig)
+  private val snapshotDao = SnapshotDao.fromConfig(snapshotSettings, snapshotConfig)
+
+  /**
+   * Delete all events up to and including `toSequenceNr` for the given persistence id. Snapshots are not deleted.
+   * The sequence number is not reset.
+   *
+   * @param persistenceId
+   *   the persistence id to delete for
+   * @param toSequenceNr
+   *   sequence nr (inclusive) to delete up to
+   */
+  def deleteEventsTo(persistenceId: String, toSequenceNr: Long): Future[Done] = {
+    log.debug("deleteEventsTo persistenceId [{}], toSequenceNr [{}]", persistenceId, toSequenceNr: java.lang.Long)
+    journalDao
+      .deleteEventsTo(
+        persistenceId,
+        toSequenceNr,
+        resetSequenceNumber = false,
+        cleanupSettings.eventsJournalDeleteBatchSize)
+      .map(_ => Done)
+  }
+
+  /**
+   * Delete all events related to one single `persistenceId`. Snapshots are not deleted.
+   */
+  def deleteAllEvents(persistenceId: String, resetSequenceNumber: Boolean): Future[Done] = {
+    journalDao
+      .deleteEventsTo(
+        persistenceId,
+        toSequenceNr = Long.MaxValue,
+        resetSequenceNumber,
+        cleanupSettings.eventsJournalDeleteBatchSize)
+      .map(_ => Done)
+  }
+
+  /**
+   * Delete all events related to the given list of `persistenceIds`. Snapshots are not deleted.
+   */
+  def deleteAllEvents(persistenceIds: immutable.Seq[String], resetSequenceNumber: Boolean): Future[Done] = {
+    foreach(persistenceIds, "deleteAllEvents", pid => deleteAllEvents(pid, resetSequenceNumber))
+  }
+
+  /**
+   * Delete snapshots related to one single `persistenceId`. Events are not deleted.
+   */
+  def deleteSnapshot(persistenceId: String): Future[Done] = {
+    snapshotDao
+      .delete(persistenceId, SnapshotSelectionCriteria(maxSequenceNr = Long.MaxValue))
+      .map(_ => Done)
+  }
+
+  /**
+   * Delete all snapshots related to the given list of `persistenceIds`. Events are not deleted.
+   */
+  def deleteSnapshots(persistenceIds: immutable.Seq[String]): Future[Done] = {
+    foreach(persistenceIds, "deleteSnapshots", pid => deleteSnapshot(pid))
+  }
+
+  /**
+   * Deletes all events for the given persistence id up to and including the sequence number of the latest
+   * snapshot. The snapshot itself is not deleted. Nothing is done when there is no snapshot.
+   */
+  def cleanupBeforeSnapshot(persistenceId: String): Future[Done] = {
+    snapshotDao.load(persistenceId, SnapshotSelectionCriteria.Latest).flatMap {
+      case None           => Future.successful(Done)
+      case Some(snapshot) => deleteEventsTo(persistenceId, snapshot.seqNr)
+    }
+  }
+
+  /**
+   * See single persistenceId overload for what is done for each persistence id.
+   */
+  def cleanupBeforeSnapshot(persistenceIds: immutable.Seq[String]): Future[Done] = {
+    foreach(persistenceIds, "cleanupBeforeSnapshot", pid => cleanupBeforeSnapshot(pid))
+  }
+
+  /**
+   * Delete everything related to one single `persistenceId`. All events are deleted first, then all snapshots.
+   */
+  def deleteAll(persistenceId: String, resetSequenceNumber: Boolean): Future[Done] = {
+    for {
+      _ <- deleteAllEvents(persistenceId, resetSequenceNumber)
+      _ <- deleteSnapshot(persistenceId)
+    } yield Done
+  }
+
+  /**
+   * Delete everything related to the given list of `persistenceIds`. All events and snapshots are deleted.
+   */
+  def deleteAll(persistenceIds: immutable.Seq[String], resetSequenceNumber: Boolean): Future[Done] = {
+    foreach(persistenceIds, "deleteAll", pid => deleteAll(pid, resetSequenceNumber))
+  }
+
+  /**
+   * Run `pidOperation` for each of the `persistenceIds`, one after the other, stopping at the first failure. Logs
+   * when the run starts and when it completes (or fails), and logs progress every `log-progress-every` persistence ids.
+   */
+  private def foreach(
+      persistenceIds: immutable.Seq[String],
+      operationName: String,
+      pidOperation: String => Future[Done]): Future[Done] = {
+    val size = persistenceIds.size
+    log.info("Cleanup started {} of [{}] persistenceId.", operationName, size: java.lang.Integer)
+
+    def loop(remaining: List[String], n: Int): Future[Done] = {
+      remaining match {
+        case Nil => Future.successful(Done)
+        case pid :: tail =>
+          pidOperation(pid).flatMap { _ =>
+            if (shouldLogProgress(n))
+              log.info("Cleanup {} [{}] of [{}].", operationName, n: java.lang.Integer, size: java.lang.Integer)
+            loop(tail, n + 1)
+          }
+      }
+    }
+
+    val result = loop(persistenceIds.toList, n = 1)
+
+    result.onComplete {
+      case Success(_) =>
+        log.info("Cleanup completed {} of [{}] persistenceId.", operationName, size: java.lang.Integer)
+      case Failure(e) =>
+        log.error(s"Cleanup $operationName failed.", e)
+    }
+
+    result
+  }
+
+  // `n % 0` would throw ArithmeticException (`/ by zero`) inside the loop and abort the cleanup after the
+  // first persistence id, so a non-positive `log-progress-every` disables progress logging instead.
+  private def shouldLogProgress(n: Int): Boolean = {
+    val every = cleanupSettings.logProgressEvery
+    every > 0 && n % every == 0
+  }
+}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
index 801471d..c61d154 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
@@ -32,6 +32,7 @@ import
pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.journal.mysql.MySQLJournalDao
import pekko.persistence.typed.PersistenceId
import com.typesafe.config.Config
+import io.r2dbc.spi.Connection
import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.Row
import io.r2dbc.spi.Statement
@@ -145,6 +146,14 @@ private[r2dbc] class JournalDao(val settings:
JournalSettings, connectionFactory
private val deleteEventsSql = sql"""
DELETE FROM $journalTable
WHERE persistence_id = ? AND seq_nr <= ?"""
+
+ // One batch of the batched delete in deleteEventsTo: removes the rows of a persistence id whose
+ // seq_nr lies in the inclusive range given by the second and third bind parameters.
+ private val deleteEventsFromToSql = sql"""
+ DELETE FROM $journalTable
+ WHERE persistence_id = ? AND seq_nr >= ? AND seq_nr <= ?"""
+
+ // Lowest stored seq_nr for a persistence id. The `deleted` column is not filtered, so delete markers count.
+ // MIN over no rows yields SQL NULL, which readLowestSequenceNr maps to 0.
+ private val selectLowestSequenceNrSql = sql"""
+ SELECT MIN(seq_nr) from $journalTable WHERE persistence_id = ?"""
+
private val insertDeleteMarkerSql = sql"""
INSERT INTO $journalTable
(slice, entity_type, persistence_id, seq_nr, db_timestamp, writer,
adapter_manifest, event_ser_id, event_ser_manifest, event_payload, deleted)
@@ -302,4 +311,120 @@ private[r2dbc] class JournalDao(val settings:
JournalSettings, connectionFactory
}
}
+  /**
+   * Read the lowest sequence number stored for `persistenceId`. Delete markers are included, since the query does
+   * not filter on the `deleted` column.
+   *
+   * @return
+   *   the lowest stored sequence number, or 0 when no rows exist for the persistence id
+   */
+  private[r2dbc] def readLowestSequenceNr(persistenceId: String): Future[Long] = {
+    val result = r2dbcExecutor
+      .select(s"select lowest seqNr [$persistenceId]")(
+        connection =>
+          connection
+            .createStatement(selectLowestSequenceNrSql)
+            .bind(0, persistenceId),
+        // MIN(seq_nr) over zero rows is SQL NULL
+        row => Option(row.get(0, classOf[java.lang.Long])).map(_.longValue).getOrElse(0L))
+      .map(_.headOption.getOrElse(0L))(ExecutionContext.parasitic)
+
+    if (log.isDebugEnabled)
+      result.foreach(seqNr =>
+        log.debug("Lowest sequence nr for persistenceId [{}]: [{}]", persistenceId, seqNr: java.lang.Long))
+
+    result
+  }
+
+  // `Long.MaxValue` means "everything": resolve it to the highest stored sequence number, so that the final
+  // batch (and the delete marker it may write) ends at the real end of this persistence id's journal.
+  private def highestSeqNrForDelete(persistenceId: String, toSequenceNr: Long): Future[Long] =
+    toSequenceNr match {
+      case Long.MaxValue => readHighestSequenceNr(persistenceId, 0L)
+      case explicitTo    => Future.successful(explicitTo)
+    }
+
+  // If the whole range fits in one batch, skip the lookup and start at 1. Otherwise start at the lowest
+  // stored sequence number, so ranges below it (e.g. removed by an earlier cleanup) are not visited batch by batch.
+  private def lowestSequenceNrForDelete(persistenceId: String, toSeqNr: Long, batchSize: Int): Future[Long] =
+    if (toSeqNr > batchSize) readLowestSequenceNr(persistenceId)
+    else Future.successful(1L)
+
+  /**
+   * Delete events up to and including `toSequenceNr` for `persistenceId` in batches, one database operation per batch.
+   *
+   * If `resetSequenceNumber` is `false` a delete marker is left at the highest deleted sequence number, so that the
+   * actor can continue from the next sequence number. This is the typical use case for cleanup of older events.
+   *
+   * If `resetSequenceNumber` is `true` the sequence number will be reset to 1 when the actor is started again with
+   * the same `persistenceId`. WARNING: reusing the same `persistenceId` after resetting the sequence number should
+   * be avoided, since it might be confusing to reuse the same sequence number for new events.
+   *
+   * Nothing is deleted, and no delete marker is written, when the resolved upper bound is below 1. An example is
+   * `toSequenceNr = Long.MaxValue` for a persistence id without any stored rows.
+   *
+   * @param toSequenceNr
+   *   highest sequence number (inclusive) to delete; `Long.MaxValue` means all stored events
+   * @param batchSize
+   *   number of sequence numbers covered by each delete batch (use `CleanupSettings.eventsJournalDeleteBatchSize`),
+   *   must be positive
+   * @return
+   *   completes when all batches are deleted; fails with `IllegalArgumentException` if `batchSize` is not positive
+   */
+  def deleteEventsTo(
+      persistenceId: String,
+      toSequenceNr: Long,
+      resetSequenceNumber: Boolean,
+      batchSize: Int): Future[Unit] = {
+
+    // NOTE(review): bind positions 4-9 presumably map to writer, adapter_manifest, event_ser_id,
+    // event_ser_manifest, event_payload and deleted of insertDeleteMarkerSql. Keep the two in sync.
+    def insertDeleteMarkerStmt(deleteMarkerSeqNr: Long, connection: Connection): Statement = {
+      val entityType = PersistenceId.extractEntityType(persistenceId)
+      val slice = persistenceExt.sliceForPersistenceId(persistenceId)
+      connection
+        .createStatement(insertDeleteMarkerSql)
+        .bind(0, slice)
+        .bind(1, entityType)
+        .bind(2, persistenceId)
+        .bind(3, deleteMarkerSeqNr)
+        .bind(4, "")
+        .bind(5, "")
+        .bind(6, 0)
+        .bind(7, "")
+        .bind(8, Array.emptyByteArray)
+        .bind(9, true)
+    }
+
+    // DELETE of the inclusive seq_nr range [from, to] for this persistence id
+    def deleteRangeStmt(from: Long, to: Long, connection: Connection): Statement =
+      connection
+        .createStatement(deleteEventsFromToSql)
+        .bind(0, persistenceId)
+        .bind(1, from)
+        .bind(2, to)
+
+    def deleteBatch(from: Long, to: Long, lastBatch: Boolean): Future[Unit] = {
+      val deletedRows =
+        if (lastBatch && !resetSequenceNumber) {
+          // the final batch also writes the delete marker at `to`, in the same update call as the delete
+          r2dbcExecutor
+            .update(s"delete [$persistenceId] and insert marker") { connection =>
+              Vector(deleteRangeStmt(from, to, connection), insertDeleteMarkerStmt(to, connection))
+            }
+            .map(_.head)
+        } else {
+          r2dbcExecutor.updateOne(s"delete [$persistenceId]") { connection =>
+            deleteRangeStmt(from, to, connection)
+          }
+        }
+
+      deletedRows.map { count =>
+        if (log.isDebugEnabled)
+          log.debug(
+            "Deleted [{}] events for persistenceId [{}], from seq num [{}] to [{}]",
+            count: java.lang.Long,
+            persistenceId,
+            from: java.lang.Long,
+            to: java.lang.Long)
+      }(ExecutionContext.parasitic)
+    }
+
+    // Deletes [from, maxTo] in consecutive batches of `batchSize` sequence numbers. Only the batch reaching
+    // `maxTo` is the last one. `maxTo - from < batchSize` is the overflow-free form of `from + batchSize > maxTo`.
+    def deleteInBatches(from: Long, maxTo: Long): Future[Unit] =
+      if (maxTo - from < batchSize) {
+        deleteBatch(from, maxTo, lastBatch = true)
+      } else {
+        val to = from + batchSize - 1
+        deleteBatch(from, to, lastBatch = false).flatMap(_ => deleteInBatches(to + 1, maxTo))
+      }
+
+    if (batchSize <= 0)
+      // a non-positive step would never reach maxTo
+      Future.failed(new IllegalArgumentException(s"batchSize must be > 0, was [$batchSize]"))
+    else
+      highestSeqNrForDelete(persistenceId, toSequenceNr).flatMap { toSeqNr =>
+        if (toSeqNr < 1L)
+          // nothing stored / nothing to delete: don't write a delete marker at seq_nr <= 0
+          Future.unit
+        else
+          lowestSequenceNrForDelete(persistenceId, toSeqNr, batchSize).flatMap { fromSeqNr =>
+            deleteInBatches(fromSeqNr, toSeqNr)
+          }
+      }
+  }
+
}
diff --git a/docs/src/main/paradox/cleanup.md b/docs/src/main/paradox/cleanup.md
new file mode 100644
index 0000000..9a966d4
--- /dev/null
+++ b/docs/src/main/paradox/cleanup.md
@@ -0,0 +1,55 @@
+# Database Cleanup
+
+## Event Sourced Cleanup Tool
+
+@@@ warning
+
+When running any operation with `EventSourcedCleanup` for a persistence id,
the actor with that persistence id must
+not be running!
+
+@@@
+
+If possible, it is best to keep all events in an event sourced system. That
way new @ref[Projections](projection.md)
+can be built, and existing ones re-built, from the complete event history.
+
+In some cases keeping all events is not possible, or events must be removed for
regulatory reasons, such as compliance with
+GDPR. `EventSourcedBehavior`s can automatically snapshot state and delete
events as described in the
+@extref:[Pekko docs](pekko:typed/persistence-snapshot.html#snapshot-deletion).
Snapshotting is useful even if events
+aren't deleted, as it speeds up recovery.
+
+Deleting all events immediately when an entity has reached its terminal
deleted state would mean that
+Projections might not yet have consumed all previous events and would not be
notified of the deleted event. Instead, it's
+recommended to emit a final deleted event and store the fact that the entity
is deleted separately via a Projection.
+Then a background task can clean up the events and snapshots for the deleted
entities by using the
+@apidoc[EventSourcedCleanup] tool. The entity itself knows about the terminal
state from the deleted event, so it should
+not emit further events after that, and it would typically stop itself if it receives
more commands.
+
+@apidoc[EventSourcedCleanup] operations include:
+
+* Delete all events and snapshots for one or many persistence ids
+* Delete all events for one or many persistence ids
+* Delete all snapshots for one or many persistence ids
+* Delete events before snapshot for one or many persistence ids
+
+The cleanup tool can be combined with the @ref[query plugin](./query.md), which
has a query to get all persistence ids.
+
+Java
+: @@snip
[cleanup](/docs/src/test/java/jdocs/home/cleanup/CleanupDocExample.java) {
#cleanup }
+
+Scala
+: @@snip
[cleanup](/docs/src/test/scala/docs/home/cleanup/CleanupDocExample.scala) {
#cleanup }
+
+## Durable State Cleanup Tool
+
+@@@ warning
+
+When running any operation with `DurableStateCleanup` for a persistence id,
the actor with that persistence id must
+not be running!
+
+@@@
+
+@apidoc[DurableStateCleanup] operations include:
+
+* Delete state for one or many persistence ids
+
+The cleanup tool can be combined with the @ref[query plugin](./query.md), which
has a query to get all persistence ids.
diff --git a/docs/src/main/paradox/index.md b/docs/src/main/paradox/index.md
index 599b986..2d94f93 100644
--- a/docs/src/main/paradox/index.md
+++ b/docs/src/main/paradox/index.md
@@ -15,6 +15,7 @@ The Pekko Persistence R2DBC plugin allows for using SQL
database with R2DBC as a
* [Durable State Plugin](durable-state-store.md)
* [Query Plugin](query.md)
* [Projection](projection.md)
+* [Cleanup Tool](cleanup.md)
* [Migration Tool](migration.md)
* [Migration Guides](migration-guides.md)
* [Release Notes](release-notes/index.md)
diff --git a/docs/src/test/java/jdocs/home/cleanup/CleanupDocExample.java
b/docs/src/test/java/jdocs/home/cleanup/CleanupDocExample.java
new file mode 100644
index 0000000..f758941
--- /dev/null
+++ b/docs/src/test/java/jdocs/home/cleanup/CleanupDocExample.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package jdocs.home.cleanup;
+
+import org.apache.pekko.actor.typed.ActorSystem;
+import org.apache.pekko.actor.typed.javadsl.Behaviors;
+
+// #cleanup
+import org.apache.pekko.persistence.query.PersistenceQuery;
+import org.apache.pekko.persistence.query.javadsl.CurrentPersistenceIdsQuery;
+import org.apache.pekko.persistence.r2dbc.cleanup.javadsl.EventSourcedCleanup;
+import org.apache.pekko.persistence.r2dbc.query.javadsl.R2dbcReadJournal;
+
+// #cleanup
+
+public class CleanupDocExample { // Source for the cleanup.md snippet: only the lines between the "#cleanup" markers are rendered in the docs.
+
+ public static void example() {
+
+ ActorSystem<?> system = ActorSystem.create(Behaviors.empty(), "Docs"); // NOTE(review): this system is never terminated in this method
+
+ // #cleanup
+ CurrentPersistenceIdsQuery queries =
+ PersistenceQuery.get(system)
+ .getReadJournalFor(CurrentPersistenceIdsQuery.class,
R2dbcReadJournal.Identifier());
+ EventSourcedCleanup cleanup = new EventSourcedCleanup(system);
+
+ // how many persistence ids to operate on in parallel
+ int persistenceIdParallelism = 10;
+
+ // for all persistence ids, delete all events before the snapshot
+ queries
+ .currentPersistenceIds()
+ .mapAsync(persistenceIdParallelism, pid ->
cleanup.cleanupBeforeSnapshot(pid))
+ .run(system);
+
+ // #cleanup
+ }
+}
diff --git a/docs/src/test/scala/docs/home/cleanup/CleanupDocExample.scala
b/docs/src/test/scala/docs/home/cleanup/CleanupDocExample.scala
new file mode 100644
index 0000000..5ecafbc
--- /dev/null
+++ b/docs/src/test/scala/docs/home/cleanup/CleanupDocExample.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.home.cleanup
+
+import org.apache.pekko
+import pekko.actor.typed.ActorSystem
+
+//#cleanup
+import pekko.persistence.query.PersistenceQuery
+import pekko.persistence.query.scaladsl.CurrentPersistenceIdsQuery
+import pekko.persistence.r2dbc.cleanup.scaladsl.EventSourcedCleanup
+import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
+
+//#cleanup
+
+object CleanupDocExample { // Source for the cleanup.md snippet: only the lines between the "#cleanup" markers are rendered in the docs.
+
+ implicit val system: ActorSystem[_] = ??? // placeholder: `???` throws NotImplementedError, so initializing this object would fail -- presumably it is only meant to compile
+
+ // #cleanup
+ val queries =
PersistenceQuery(system).readJournalFor[CurrentPersistenceIdsQuery](R2dbcReadJournal.Identifier)
+ val cleanup = new EventSourcedCleanup(system)
+
+ // how many persistence ids to operate on in parallel
+ val persistenceIdParallelism = 10
+
+ // for all persistence ids, delete all events before the snapshot
+ queries
+ .currentPersistenceIds()
+ .mapAsync(persistenceIdParallelism)(pid =>
cleanup.cleanupBeforeSnapshot(pid))
+ .run()
+
+ // #cleanup
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]