This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/pekko-persistence-cassandra.git
The following commit(s) were added to refs/heads/main by this push:
new 3e21d33 build with pekko 2 (#407)
3e21d33 is described below
commit 3e21d335a30791af16fb60e93978db3632f1fb1e
Author: PJ Fanning <[email protected]>
AuthorDate: Mon May 25 18:00:23 2026 +0100
build with pekko 2 (#407)
* build with pekko 2
* Update build.sbt
* compile issues
* Update EventProcessorStream.scala
* Update EventProcessorStream.scala
* Update CassandraSnapshotStore.scala
* Update build.sbt
* update docs
---
build.sbt | 7 +-
.../pekko/persistence/cassandra/Retries.scala | 4 +-
.../cassandra/query/AllPersistenceIdsStage.scala | 4 +-
.../query/EventsByPersistenceIdStage.scala | 5 +-
.../snapshot/CassandraSnapshotStore.scala | 4 +-
docs/src/main/paradox/journal.md | 88 +++++++++++++++++++++-
docs/src/main/paradox/snapshots.md | 33 +++++++-
.../cassandra/example/EventProcessorStream.scala | 11 +--
project/PekkoConnectorsDependency.scala | 2 +-
project/PekkoCoreDependency.scala | 2 +-
project/PekkoManagementDependency.scala | 2 +-
11 files changed, 139 insertions(+), 23 deletions(-)
diff --git a/build.sbt b/build.sbt
index bfeb188..6897128 100644
--- a/build.sbt
+++ b/build.sbt
@@ -18,6 +18,7 @@ sourceDistIncubating := false
val mimaCompareVersion = "1.0.0"
ThisBuild / reproducibleBuildsCheckResolver := Resolver.ApacheMavenStagingRepo
+ThisBuild / evictionErrorLevel := Level.Info
lazy val root = project
.in(file("."))
@@ -51,7 +52,11 @@ lazy val core = project
"Automatic-Module-Name" -> "pekko.persistence.cassandra"),
mimaReportSignatureProblems := true,
mimaPreviousArtifacts := Set(
- organization.value %% name.value % mimaCompareVersion))
+ organization.value %% name.value % mimaCompareVersion),
+ // following is needed by Agrona lib
+ // https://github.com/aeron-io/agrona/wiki/Change-Log#200-2024-12-17
+ Test / fork := true,
+ Test / javaOptions += "--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED")
.configs(MultiJvm)
// Used for testing events by tag in various environments
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/Retries.scala
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/Retries.scala
index 87364b4..650c46a 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/cassandra/Retries.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/cassandra/Retries.scala
@@ -16,7 +16,7 @@ package org.apache.pekko.persistence.cassandra
import org.apache.pekko
import pekko.actor.Scheduler
import pekko.annotation.InternalApi
-import pekko.pattern.{ after, BackoffSupervisor }
+import pekko.pattern.{ after, RetrySupport }
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration.FiniteDuration
@@ -58,7 +58,7 @@ private[cassandra] object Retries {
if (maxAttempts == -1 || maxAttempts - attempted != 1) {
tryAttempt().recoverWith {
case NonFatal(exc) =>
- val nextDelay = BackoffSupervisor.calculateDelay(attempted, minBackoff, maxBackoff, randomFactor)
+ val nextDelay = RetrySupport.calculateDelay(attempted, minBackoff, maxBackoff, randomFactor)
onFailure(attempted + 1, exc, nextDelay)
after(nextDelay, scheduler) {
retry(attempt, maxAttempts, onFailure, minBackoff, maxBackoff,
randomFactor, attempted + 1)
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/AllPersistenceIdsStage.scala
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/AllPersistenceIdsStage.scala
index 40f4ae8..2f7eab3 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/AllPersistenceIdsStage.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/AllPersistenceIdsStage.scala
@@ -22,7 +22,6 @@ import pekko.stream.{ Attributes, Outlet, SourceShape }
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.cql.AsyncResultSet
import com.datastax.oss.driver.api.core.cql.PreparedStatement
-import scala.annotation.nowarn
import scala.collection.immutable.Queue
import scala.concurrent.duration._
@@ -108,7 +107,6 @@ import scala.concurrent.duration._
}
}
- @nowarn("msg=deprecated") // keep compatible with akka 2.5
override def preStart(): Unit = {
query()
refreshInterval.foreach { interval =>
@@ -117,7 +115,7 @@ import scala.concurrent.duration._
(interval / 2) +
ThreadLocalRandom.current().nextLong(interval.toMillis / 2).millis
else interval
- schedulePeriodicallyWithInitialDelay(Continue, initial, interval)
+ scheduleWithFixedDelay(Continue, initial, interval)
}
}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdStage.scala
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdStage.scala
index 48c842d..421ce42 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdStage.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdStage.scala
@@ -24,7 +24,7 @@ import pekko.stream.{ Attributes, Outlet, SourceShape }
import java.lang.{ Long => JLong }
import java.util.concurrent.ThreadLocalRandom
-import scala.annotation.{ nowarn, tailrec }
+import scala.annotation.tailrec
import scala.concurrent.duration.{ FiniteDuration, _ }
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.jdk.FutureConverters._
@@ -265,9 +265,8 @@ import scala.util.{ Failure, Success, Try }
}
}
- @nowarn("msg=deprecated")
private def scheduleContinue(initial: FiniteDuration, interval:
FiniteDuration): Unit = {
- schedulePeriodicallyWithInitialDelay(Continue, initial, interval)
+ scheduleWithFixedDelay(Continue, initial, interval)
}
override def postStop(): Unit = {
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
index 7026488..3d8cc98 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
@@ -29,7 +29,7 @@ import pekko.persistence.snapshot.SnapshotStore
import pekko.serialization.{ AsyncSerializer, Serialization,
SerializationExtension, Serializers }
import pekko.stream.connectors.cassandra.scaladsl.{ CassandraSession,
CassandraSessionRegistry }
import pekko.stream.scaladsl.{ Sink, Source }
-import pekko.util.{ unused, OptionVal }
+import pekko.util.OptionVal
import java.lang.{ Long => JLong }
import java.nio.ByteBuffer
@@ -42,7 +42,7 @@ import scala.util.{ Failure, Success }
/**
* INTERNAL API
*/
-@InternalApi private[pekko] class CassandraSnapshotStore(@unused cfg: Config, cfgPath: String)
+@InternalApi private[pekko] class CassandraSnapshotStore(cfg: Config, cfgPath: String)
extends SnapshotStore
with ActorLogging {
diff --git a/docs/src/main/paradox/journal.md b/docs/src/main/paradox/journal.md
index 7467051..65ae619 100644
--- a/docs/src/main/paradox/journal.md
+++ b/docs/src/main/paradox/journal.md
@@ -30,12 +30,96 @@ CREATE KEYSPACE IF NOT EXISTS pekko WITH replication =
{'class': 'NetworkTopolog
For local testing, and the default if you enable
`pekko.persistence.cassandra.journal.keyspace-autocreate` you can use the
following:
-@@snip [journal-schema](/target/journal-keyspace.txt) { #journal-keyspace }
+```
+CREATE KEYSPACE IF NOT EXISTS pekko
+WITH REPLICATION = { 'class' : 'SimpleStrategy','replication_factor':1 };
+```
There are multiple tables required. These need to be created before starting
your application.
For local testing you can enable
`pekko.persistence.cassandra.journal.tables-autocreate`. The default table
definitions look like this:
-@@snip [journal-tables](/target/journal-tables.txt) { #journal-tables }
+```
+CREATE TABLE IF NOT EXISTS pekko.messages (
+ persistence_id text,
+ partition_nr bigint,
+ sequence_nr bigint,
+ timestamp timeuuid,
+ timebucket text,
+ writer_uuid text,
+ ser_id int,
+ ser_manifest text,
+ event_manifest text,
+ event blob,
+ meta_ser_id int,
+ meta_ser_manifest text,
+ meta blob,
+ tags set<text>,
+ PRIMARY KEY ((persistence_id, partition_nr), sequence_nr, timestamp))
+ WITH gc_grace_seconds = 864000
+ AND compaction = {
+ 'class' : 'SizeTieredCompactionStrategy',
+ 'enabled' : true,
+ 'tombstone_compaction_interval' : 86400,
+ 'tombstone_threshold' : 0.2,
+ 'unchecked_tombstone_compaction' : false,
+ 'bucket_high' : 1.5,
+ 'bucket_low' : 0.5,
+ 'max_threshold' : 32,
+ 'min_threshold' : 4,
+ 'min_sstable_size' : 50
+ };
+
+CREATE TABLE IF NOT EXISTS pekko.tag_views (
+ tag_name text,
+ persistence_id text,
+ sequence_nr bigint,
+ timebucket bigint,
+ timestamp timeuuid,
+ tag_pid_sequence_nr bigint,
+ writer_uuid text,
+ ser_id int,
+ ser_manifest text,
+ event_manifest text,
+ event blob,
+ meta_ser_id int,
+ meta_ser_manifest text,
+ meta blob,
+ PRIMARY KEY ((tag_name, timebucket), timestamp, persistence_id, tag_pid_sequence_nr))
+ WITH gc_grace_seconds = 864000
+ AND compaction = {
+ 'class' : 'SizeTieredCompactionStrategy',
+ 'enabled' : true,
+ 'tombstone_compaction_interval' : 86400,
+ 'tombstone_threshold' : 0.2,
+ 'unchecked_tombstone_compaction' : false,
+ 'bucket_high' : 1.5,
+ 'bucket_low' : 0.5,
+ 'max_threshold' : 32,
+ 'min_threshold' : 4,
+ 'min_sstable_size' : 50
+ };
+
+CREATE TABLE IF NOT EXISTS pekko.tag_write_progress(
+ persistence_id text,
+ tag text,
+ sequence_nr bigint,
+ tag_pid_sequence_nr bigint,
+ offset timeuuid,
+ PRIMARY KEY (persistence_id, tag));
+
+CREATE TABLE IF NOT EXISTS pekko.tag_scanning(
+ persistence_id text,
+ sequence_nr bigint,
+ PRIMARY KEY (persistence_id));
+
+CREATE TABLE IF NOT EXISTS pekko.metadata(
+ persistence_id text PRIMARY KEY,
+ deleted_to bigint,
+ properties map<text,text>);
+
+CREATE TABLE IF NOT EXISTS pekko.all_persistence_ids(
+ persistence_id text PRIMARY KEY);
+```
### Messages table
diff --git a/docs/src/main/paradox/snapshots.md
b/docs/src/main/paradox/snapshots.md
index 55d4b8c..682a4db 100644
--- a/docs/src/main/paradox/snapshots.md
+++ b/docs/src/main/paradox/snapshots.md
@@ -26,13 +26,42 @@ CREATE KEYSPACE IF NOT EXISTS pekko_snapshot WITH
replication = {'class': 'Netwo
For local testing, and the default if you enable
`pekko.persistence.cassandra.snapshot.keyspace-autocreate` you can use the
following:
-@@snip [snapshot-keyspace](/target/snapshot-keyspace.txt) { #snapshot-keyspace }
+```
+CREATE KEYSPACE IF NOT EXISTS pekko_snapshot
+ WITH REPLICATION = { 'class' : 'SimpleStrategy','replication_factor':1 };
+```
A single table is required. This needs to be created before starting your
application.
For local testing you can enable
`pekko.persistence.cassandra.snapshot.tables-autocreate`.
The default table definitions look like this:
-@@snip [snapshot-tables](/target/snapshot-tables.txt) { #snapshot-tables}
+```
+CREATE TABLE IF NOT EXISTS pekko_snapshot.snapshots (
+ persistence_id text,
+ sequence_nr bigint,
+ timestamp bigint,
+ ser_id int,
+ ser_manifest text,
+ snapshot_data blob,
+ snapshot blob,
+ meta_ser_id int,
+ meta_ser_manifest text,
+ meta blob,
+ PRIMARY KEY (persistence_id, sequence_nr))
+ WITH CLUSTERING ORDER BY (sequence_nr DESC) AND gc_grace_seconds = 864000
+ AND compaction = {
+ 'class' : 'SizeTieredCompactionStrategy',
+ 'enabled' : true,
+ 'tombstone_compaction_interval' : 86400,
+ 'tombstone_threshold' : 0.2,
+ 'unchecked_tombstone_compaction' : false,
+ 'bucket_high' : 1.5,
+ 'bucket_low' : 0.5,
+ 'max_threshold' : 32,
+ 'min_threshold' : 4,
+ 'min_sstable_size' : 50
+ };
+```
### ScyllaDB
diff --git
a/example/src/main/scala/org/apache/pekko/persistence/cassandra/example/EventProcessorStream.scala
b/example/src/main/scala/org/apache/pekko/persistence/cassandra/example/EventProcessorStream.scala
index 874b469..02ce548 100644
---
a/example/src/main/scala/org/apache/pekko/persistence/cassandra/example/EventProcessorStream.scala
+++
b/example/src/main/scala/org/apache/pekko/persistence/cassandra/example/EventProcessorStream.scala
@@ -12,15 +12,15 @@ package org.apache.pekko.persistence.cassandra.example
import org.apache.pekko
import pekko.{ Done, NotUsed }
import pekko.actor.typed.ActorSystem
+import pekko.actor.typed.scaladsl.LoggerOps
import pekko.persistence.cassandra.query.scaladsl.CassandraReadJournal
import pekko.persistence.query.{ Offset, PersistenceQuery, TimeBasedUUID }
import pekko.persistence.typed.PersistenceId
-import pekko.stream.SharedKillSwitch
+import pekko.stream.{ RestartSettings, SharedKillSwitch }
import pekko.stream.connectors.cassandra.scaladsl.CassandraSessionRegistry
import pekko.stream.scaladsl.{ RestartSource, Sink, Source }
import com.datastax.oss.driver.api.core.cql.{ PreparedStatement, Row }
import org.slf4j.{ Logger, LoggerFactory }
-import pekko.actor.typed.scaladsl.LoggerOps
import org.HdrHistogram.Histogram
import scala.concurrent.{ ExecutionContext, Future }
@@ -34,16 +34,17 @@ class EventProcessorStream[Event: ClassTag](
tag: String) {
protected val log: Logger = LoggerFactory.getLogger(getClass)
- implicit val sys: ActorSystem[_] = system
- implicit val ec: ExecutionContext = executionContext
+ private implicit val sys: ActorSystem[_] = system
+ private implicit val ec: ExecutionContext = executionContext
private val session =
CassandraSessionRegistry(system).sessionFor("pekko.persistence.cassandra")
private val query =
PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
def runQueryStream(killSwitch: SharedKillSwitch, histogram: Histogram): Unit
= {
+ val restartSettings = RestartSettings(minBackoff = 500.millis, maxBackoff = 20.seconds, randomFactor = 0.1)
RestartSource
- .withBackoff(minBackoff = 500.millis, maxBackoff = 20.seconds, randomFactor = 0.1) { () =>
+ .withBackoff(restartSettings) { () =>
Source.futureSource {
readOffset().map { offset =>
log.infoN("Starting stream for tag [{}] from offset [{}]", tag,
offset)
diff --git a/project/PekkoConnectorsDependency.scala
b/project/PekkoConnectorsDependency.scala
index 7fe187e..7974cf5 100644
--- a/project/PekkoConnectorsDependency.scala
+++ b/project/PekkoConnectorsDependency.scala
@@ -20,5 +20,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
object PekkoConnectorsDependency extends PekkoDependency {
override val checkProject: String = "pekko-connectors-cassandra"
override val module: Option[String] = Some("connectors")
- override val currentVersion: String = "1.3.0"
+ override val currentVersion: String = "2.0.0-M1"
}
diff --git a/project/PekkoCoreDependency.scala
b/project/PekkoCoreDependency.scala
index ed3ab68..37903c6 100644
--- a/project/PekkoCoreDependency.scala
+++ b/project/PekkoCoreDependency.scala
@@ -20,5 +20,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
object PekkoCoreDependency extends PekkoDependency {
override val checkProject: String = "pekko-cluster-sharding-typed"
override val module: Option[String] = None
- override val currentVersion: String = "1.5.0"
+ override val currentVersion: String = "2.0.0-M2"
}
diff --git a/project/PekkoManagementDependency.scala
b/project/PekkoManagementDependency.scala
index e1a4c81..1e1a76d 100644
--- a/project/PekkoManagementDependency.scala
+++ b/project/PekkoManagementDependency.scala
@@ -20,5 +20,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
object PekkoManagementDependency extends PekkoDependency {
override val checkProject: String = "pekko-discovery-aws-api-async"
override val module: Option[String] = Some("management")
- override val currentVersion: String = "1.2.1"
+ override val currentVersion: String = "2.0.0-M1"
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]