This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/incubator-pekko-persistence-cassandra.git
The following commit(s) were added to refs/heads/main by this push:
new 8a10b77 scala3 support (#85)
8a10b77 is described below
commit 8a10b77dacd9670703a174682456d7d02c84abb8
Author: PJ Fanning <[email protected]>
AuthorDate: Sat Aug 19 12:23:29 2023 +0100
scala3 support (#85)
* scala3 support
* Update unit-tests.yml
---
.github/workflows/unit-tests.yml | 2 ++
.../pekko/persistence/cassandra/EventsByTagMigration.scala | 2 +-
.../apache/pekko/persistence/cassandra/cleanup/Cleanup.scala | 2 +-
.../cassandra/healthcheck/CassandraHealthCheck.scala | 2 +-
.../pekko/persistence/cassandra/journal/CassandraJournal.scala | 8 ++++----
.../persistence/cassandra/journal/CassandraTagRecovery.scala | 2 +-
.../pekko/persistence/cassandra/journal/TagWriters.scala | 10 +++++-----
.../cassandra/query/CassandraReadJournalProvider.scala | 10 +++++++---
.../pekko/persistence/cassandra/query/EventsByTagStage.scala | 6 +++---
.../cassandra/query/TagViewSequenceNumberScanner.scala | 2 +-
.../cassandra/query/scaladsl/CassandraReadJournal.scala | 5 +++--
.../cassandra/snapshot/CassandraSnapshotStore.scala | 4 ++--
.../persistence/cassandra/CassandraEventsByTagLoadSpec.scala | 2 +-
.../cassandra/journal/CassandraEventUpdateSpec.scala | 2 +-
.../persistence/cassandra/journal/CassandraLoadSpec.scala | 4 ++--
.../apache/pekko/persistence/cassandra/query/TestActor.scala | 2 +-
project/Dependencies.scala | 6 +++---
17 files changed, 39 insertions(+), 32 deletions(-)
diff --git a/.github/workflows/unit-tests.yml b/.github/workflows/unit-tests.yml
index 0f27e3e..b4fe285 100644
--- a/.github/workflows/unit-tests.yml
+++ b/.github/workflows/unit-tests.yml
@@ -21,8 +21,10 @@ jobs:
matrix:
include:
- { javaVersion: '8', container: "cassandra-latest", scalaVersion:
"++2.13.11", test: "test" }
+ - { javaVersion: '8', container: "cassandra-latest", scalaVersion:
"++3.3.0", test: "test" }
- { javaVersion: '11', container: "cassandra-latest", scalaVersion:
"++2.12.18", test: "test" }
- { javaVersion: '11', container: "cassandra-latest", scalaVersion:
"++2.13.11", test: "test" }
+ - { javaVersion: '11', container: "cassandra-latest", scalaVersion:
"++3.3.0", test: "test" }
- { javaVersion: '11', container: "cassandra2", scalaVersion:
"++2.13.11", test: "'testOnly -- -l RequiresCassandraThree'"}
- { javaVersion: '11', container: "cassandra3", scalaVersion:
"++2.13.11", test: "test" }
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/EventsByTagMigration.scala
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/EventsByTagMigration.scala
index 44ff295..148a373 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/EventsByTagMigration.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/EventsByTagMigration.scala
@@ -84,7 +84,7 @@ class EventsByTagMigration(
systemProvider: ClassicActorSystemProvider,
pluginConfigPath: String = "pekko.persistence.cassandra") {
private val system = systemProvider.classicSystem
- private[pekko] val log = Logging.getLogger(system, getClass)
+ private[pekko] val log = Logging.getLogger(system,
classOf[EventsByTagMigration])
private lazy val queries =
PersistenceQuery(system).readJournalFor[CassandraReadJournal](pluginConfigPath
+ ".query")
private implicit val sys: ActorSystem = system
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/cleanup/Cleanup.scala
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/cleanup/Cleanup.scala
index 8525aa3..158bd5d 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/cleanup/Cleanup.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/cleanup/Cleanup.scala
@@ -62,7 +62,7 @@ final class Cleanup(systemProvider:
ClassicActorSystemProvider, settings: Cleanu
import settings._
import system.dispatcher
- private val log = Logging(system, getClass)
+ private val log = Logging(system, classOf[Cleanup])
// operations on journal, snapshotStore and tagViews should be only be done
when dry-run = false
private val journal: ActorRef =
Persistence(system).journalFor(pluginLocation + ".journal")
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/healthcheck/CassandraHealthCheck.scala
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/healthcheck/CassandraHealthCheck.scala
index 13e0740..6e7be41 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/healthcheck/CassandraHealthCheck.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/healthcheck/CassandraHealthCheck.scala
@@ -27,7 +27,7 @@ import scala.util.control.NonFatal
final class CassandraHealthCheck(system: ActorSystem) extends (() =>
Future[Boolean]) {
- private val log = Logging.getLogger(system, getClass)
+ private val log = Logging.getLogger(system, classOf[CassandraHealthCheck])
private val settings = new PluginSettings(system,
system.settings.config.getConfig("pekko.persistence.cassandra"))
private val healthCheckSettings = settings.healthCheckSettings
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala
index e29ef34..0fa4028 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala
@@ -75,7 +75,7 @@ import scala.util.{ Failure, Success, Try }
private val statements: CassandraStatements = new
CassandraStatements(settings)
private val healthCheckCql = settings.healthCheckSettings.healthCheckCql
private val serialization = SerializationExtension(context.system)
- private val log: LoggingAdapter = Logging(context.system, getClass)
+ private val log: LoggingAdapter = Logging(context.system,
classOf[CassandraJournal])
private implicit val ec: ExecutionContext = context.dispatcher
@@ -266,7 +266,7 @@ import scala.util.{ Failure, Success, Try }
writeInProgress.put(pid, writeInProgressForPersistentId.future)
val toReturn: Future[Nil.type] = Future.sequence(writesWithUuids.map(w =>
serialize(w))).flatMap {
- serialized: Seq[SerializedAtomicWrite] =>
+ (serialized: Seq[SerializedAtomicWrite]) =>
val result: Future[Any] =
if (messages.map(_.payload.size).sum <=
journalSettings.maxMessageBatchSize) {
// optimize for the common case
@@ -397,7 +397,7 @@ import scala.util.{ Failure, Success, Try }
maxPnr - minPnr <= 1,
"Do not support AtomicWrites that span 3 partitions. Keep AtomicWrites
<= max partition size.")
- val writes: Seq[Future[BoundStatement]] = all.map { m: Serialized =>
+ val writes: Seq[Future[BoundStatement]] = all.map { (m: Serialized) =>
// using two separate statements with or without the meta data columns
because
// then users doesn't have to alter table and add the new columns if
they don't use
// the meta data feature
@@ -868,7 +868,7 @@ import scala.util.{ Failure, Success, Try }
class EventDeserializer(system: ActorSystem) {
- private val log = Logging(system, this.getClass)
+ private val log = Logging(system, classOf[EventDeserializer])
private val serialization = SerializationExtension(system)
val columnDefinitionCache = new ColumnDefinitionCache
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraTagRecovery.scala
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraTagRecovery.scala
index 559d3c4..5252bbd 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraTagRecovery.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraTagRecovery.scala
@@ -57,7 +57,7 @@ import
pekko.stream.connectors.cassandra.scaladsl.CassandraSession
private val serialization = SerializationExtension(system)
// used for local asks
- private implicit val timeout = Timeout(10.second)
+ private implicit val timeout: Timeout = Timeout(10.second)
import statements._
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/TagWriters.scala
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/TagWriters.scala
index 89eda49..71534a1 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/TagWriters.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/TagWriters.scala
@@ -14,11 +14,10 @@
package org.apache.pekko.persistence.cassandra.journal
import scala.collection.immutable
+import scala.concurrent.Promise
import java.lang.{ Integer => JInt, Long => JLong }
import java.net.URLEncoder
import java.util.UUID
-
-import scala.concurrent.Promise
import org.apache.pekko
import pekko.Done
import pekko.actor.SupervisorStrategy.Escalate
@@ -37,7 +36,6 @@ import pekko.dispatch.ExecutionContexts
import pekko.event.LoggingAdapter
import pekko.persistence.cassandra.journal.CassandraJournal._
import pekko.persistence.cassandra.journal.TagWriter._
-import pekko.persistence.cassandra.journal.TagWriters._
import pekko.stream.connectors.cassandra.scaladsl.CassandraSession
import pekko.util.ByteString
import pekko.util.Timeout
@@ -180,11 +178,13 @@ import scala.util.Try
* INTERNAL API
* Manages all the tag writers.
*/
-@InternalApi private[pekko] class TagWriters(settings: TagWriterSettings,
tagWriterSession: TagWritersSession)
+@InternalApi private[pekko] class TagWriters(settings: TagWriterSettings,
+ tagWriterSession: TagWriters.TagWritersSession)
extends Actor
with Timers
with ActorLogging {
+ import TagWriters._
import context.dispatcher
// eager init and val because used from Future callbacks
@@ -210,7 +210,7 @@ import scala.util.Try
scheduleWriteTagScanningTick()
def receive: Receive = {
- case FlushAllTagWriters(t) =>
+ case FlushAllTagWriters(t: Timeout) =>
implicit val timeout: Timeout = t
if (log.isDebugEnabled)
log.debug("Flushing all tag writers [{}]",
tagActors.keySet.mkString(", "))
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/CassandraReadJournalProvider.scala
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/CassandraReadJournalProvider.scala
index 646f922..33f09c8 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/CassandraReadJournalProvider.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/CassandraReadJournalProvider.scala
@@ -21,10 +21,14 @@ import com.typesafe.config.Config
class CassandraReadJournalProvider(system: ExtendedActorSystem, config:
Config, configPath: String)
extends ReadJournalProvider {
- override val scaladslReadJournal: scaladsl.CassandraReadJournal =
+ private val readJournalScala: scaladsl.CassandraReadJournal =
new scaladsl.CassandraReadJournal(system, config, configPath)
- override val javadslReadJournal: javadsl.CassandraReadJournal =
- new javadsl.CassandraReadJournal(scaladslReadJournal)
+ private val readJournalJava: javadsl.CassandraReadJournal =
+ new javadsl.CassandraReadJournal(readJournalScala)
+
+ override def scaladslReadJournal(): scaladsl.CassandraReadJournal =
readJournalScala
+
+ override def javadslReadJournal(): javadsl.CassandraReadJournal =
readJournalJava
}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStage.scala
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStage.scala
index 1ef6a12..d682cbd 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStage.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStage.scala
@@ -24,7 +24,6 @@ import
pekko.persistence.cassandra.EventsByTagSettings.RetrySettings
import pekko.persistence.cassandra._
import pekko.persistence.cassandra.journal.CassandraJournal._
import pekko.persistence.cassandra.journal.TimeBucket
-import pekko.persistence.cassandra.query.EventsByTagStage._
import
pekko.persistence.cassandra.query.scaladsl.CassandraReadJournal.EventByTagStatements
import pekko.stream.stage._
import pekko.stream.{ Attributes, Outlet, SourceShape }
@@ -212,7 +211,7 @@ import scala.util.{ Failure, Success, Try }
/** INTERNAL API */
@InternalApi private[pekko] class EventsByTagStage(
- session: TagStageSession,
+ session: EventsByTagStage.TagStageSession,
initialQueryOffset: UUID,
toOffset: Option[UUID],
settings: PluginSettings,
@@ -221,8 +220,9 @@ import scala.util.{ Failure, Success, Try }
usingOffset: Boolean,
initialTagPidSequenceNrs: Map[PersistenceId, (TagPidSequenceNr, UUID)],
scanner: TagViewSequenceNumberScanner)
- extends GraphStage[SourceShape[UUIDRow]] {
+ extends GraphStage[SourceShape[EventsByTagStage.UUIDRow]] {
+ import EventsByTagStage._
import settings.{ eventsByTagSettings, querySettings }
private val out: Outlet[UUIDRow] = Outlet("event.out")
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/TagViewSequenceNumberScanner.scala
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/TagViewSequenceNumberScanner.scala
index 4506681..f5f1e8e 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/TagViewSequenceNumberScanner.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/TagViewSequenceNumberScanner.scala
@@ -62,7 +62,7 @@ import pekko.stream.scaladsl.Sink
@InternalApi private[pekko] class TagViewSequenceNumberScanner(session:
Session, pluginDispatcher: String)(
implicit materializer: Materializer,
@nowarn("msg=never used") ec: ExecutionContext) {
- private val log = Logging(materializer.system, getClass)
+ private val log = Logging(materializer.system,
classOf[TagViewSequenceNumberScanner])
/**
* This could be its own stage and return half way through a query to better
meet the deadline
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala
index 9fc41bc..97c25fe 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala
@@ -20,6 +20,7 @@ import com.typesafe.config.Config
import org.apache.pekko
import pekko.actor.{ ActorSystem, ExtendedActorSystem }
import pekko.annotation.InternalApi
+import pekko.dispatch.MessageDispatcher
import pekko.event.Logging
import pekko.persistence.cassandra.Extractors.Extractor
import pekko.persistence.cassandra.{ CassandraStatements, Extractors,
PluginSettings }
@@ -113,7 +114,7 @@ class CassandraReadJournal protected (
import CassandraReadJournal.CombinedEventsByPersistenceIdStmts
- private val log = Logging.getLogger(system, getClass)
+ private val log = Logging.getLogger(system, classOf[CassandraReadJournal])
private val settings = new PluginSettings(system, sharedConfig)
private val statements = new CassandraStatements(settings)
@@ -139,7 +140,7 @@ class CassandraReadJournal protected (
new CassandraJournal.EventDeserializer(system)
private val serialization = SerializationExtension(system)
- implicit private val ec =
+ implicit private val ec: MessageDispatcher =
system.dispatchers.lookup(querySettings.pluginDispatcher)
implicit private val sys: ActorSystem = system
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
index 484c4cb..4835ae1 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
@@ -215,7 +215,7 @@ import scala.util.{ Failure, Success }
// this meta query gets slower than slower if snapshots are deleted
without a criteria.minSequenceNr as
// all previous tombstones are scanned in the meta data query
metadata(snapshotMetaPs, persistenceId, criteria, limit =
None).flatMap {
- mds: immutable.Seq[SnapshotMetadata] =>
+ (mds: immutable.Seq[SnapshotMetadata]) =>
val boundStatementBatches = mds
.map(md =>
preparedDeleteSnapshot.map(_.bind(md.persistenceId,
md.sequenceNr: JLong)
@@ -306,7 +306,7 @@ import scala.util.{ Failure, Success }
@InternalApi
private[pekko] class SnapshotSerialization(system: ActorSystem)(implicit val
ec: ExecutionContext) {
- private val log = Logging(system, this.getClass)
+ private val log = Logging(system, classOf[SnapshotSerialization])
private val serialization = SerializationExtension(system)
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/CassandraEventsByTagLoadSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/CassandraEventsByTagLoadSpec.scala
index 3e20413..3174ac1 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/CassandraEventsByTagLoadSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/CassandraEventsByTagLoadSpec.scala
@@ -87,7 +87,7 @@ class CassandraEventsByTagLoadSpec extends
CassandraSpec(CassandraEventsByTagLoa
var allReceived: Map[String, List[Long]] =
Map.empty.withDefaultValue(List.empty)
probe.request(messagesPerPersistenceId * nrPersistenceIds)
- (1L to (messagesPerPersistenceId * nrPersistenceIds)).foreach { i: Long =>
+ (1L to (messagesPerPersistenceId * nrPersistenceIds)).foreach { (i: Long)
=>
val event =
try {
probe.expectNext(veryLongWait)
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/CassandraEventUpdateSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/CassandraEventUpdateSpec.scala
index 698290b..7b4d5a3 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/CassandraEventUpdateSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/CassandraEventUpdateSpec.scala
@@ -38,7 +38,7 @@ object CassandraEventUpdateSpec {
class CassandraEventUpdateSpec extends
CassandraSpec(CassandraEventUpdateSpec.config) { s =>
- private[pekko] val log = Logging(system, getClass)
+ private[pekko] val log = Logging(system, classOf[CassandraEventUpdateSpec])
private val serialization = SerializationExtension(system)
val updater = new CassandraEventUpdate {
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/CassandraLoadSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/CassandraLoadSpec.scala
index b948d62..19987a5 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/CassandraLoadSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/CassandraLoadSpec.scala
@@ -64,8 +64,8 @@ object CassandraLoadSpec {
override def receiveRecover: Receive = onEvent
override def receiveCommand: Receive = {
- case c @ "start" => onStart(c)
- case c @ "stop" => onStop(c)
+ case "start" => onStart("start")
+ case "stop" => onStop("stop")
case payload: String => onCommand(payload)
}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/TestActor.scala
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/TestActor.scala
index 7efb011..f0a1c97 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/TestActor.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/TestActor.scala
@@ -52,7 +52,7 @@ class TestActor(override val persistenceId: String, override
val journalPluginId
val size = events.size
val handler = {
var count = 0
- evt: String => {
+ (_: String) => {
count += 1
if (count == size)
sender() ! "PersistAll-done"
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 6f37d47..6680a33 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -13,8 +13,8 @@ object Dependencies {
// keep in sync with .github/workflows/unit-tests.yml
val scala212Version = "2.12.18"
val scala213Version = "2.13.11"
- val scala3Version = "3.1.2" // not yet enabled - missing
pekko-http/pekko-management Scala 3 artifacts
- val scalaVersions = Seq(scala212Version, scala213Version)
+ val scala3Version = "3.3.0"
+ val scalaVersions = Seq(scala212Version, scala213Version, scala3Version)
val pekkoVersion = System.getProperty("override.pekko.version", "1.0.1")
val pekkoVersionInDocs = "current"
@@ -24,7 +24,7 @@ object Dependencies {
val driverVersion = "4.15.0"
val driverVersionInDocs = "4.14"
- val pekkoConnectorsVersion = "0.0.0+144-703e9cca-SNAPSHOT"
+ val pekkoConnectorsVersion = "0.0.0+172-784827a8-SNAPSHOT"
val pekkoConnectorsVersionInDocs = "current"
// for example
val pekkoManagementVersion = "1.0.0"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]