This is an automated email from the ASF dual-hosted git repository.
mdedetrich pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/incubator-pekko-persistence-jdbc.git
The following commit(s) were added to refs/heads/main by this push:
new 86923ec Preparation for Scala 3
86923ec is described below
commit 86923ecff2543ea3e31cd8b794fb184bb3957bef
Author: Matthew de Detrich <[email protected]>
AuthorDate: Thu May 25 09:21:43 2023 +0200
Preparation for Scala 3
---
.../slick.backwards.excludes | 7 +++++
.../jdbc/journal/dao/JournalTables.scala | 8 ++++++
.../jdbc/journal/dao/legacy/package.scala | 4 +++
.../jdbc/query/JdbcReadJournalProvider.scala | 5 ++--
.../query/dao/legacy/ByteArrayReadJournalDao.scala | 3 ++-
.../jdbc/snapshot/dao/SnapshotTables.scala | 4 +++
.../jdbc/snapshot/dao/legacy/SnapshotTables.scala | 5 ++++
.../jdbc/state/DurableStateTables.scala | 4 +++
.../jdbc/state/JdbcDurableStateStoreProvider.scala | 4 +--
.../state/scaladsl/DurableStateSequenceActor.scala | 10 ++++----
.../state/scaladsl/JdbcDurableStateStore.scala | 22 ++++++++--------
.../pekko/persistence/jdbc/CrossEventually.scala | 30 ++++++++++++++++++++++
.../pekko/persistence/jdbc/CrossEventually.scala | 29 +++++++++++++++++++++
.../jdbc/SharedActorSystemTestSpec.scala | 2 +-
.../apache/pekko/persistence/jdbc/SimpleSpec.scala | 4 +--
.../jdbc/journal/JdbcJournalPerfSpec.scala | 3 ++-
.../persistence/jdbc/journal/JdbcJournalSpec.scala | 3 ++-
.../query/JournalDaoStreamMessagesMemoryTest.scala | 4 +--
.../jdbc/query/JournalSequenceActorTest.scala | 12 ++++-----
.../StoreOnlySerializableMessagesTest.scala | 2 +-
.../jdbc/snapshot/JdbcSnapshotStoreSpec.scala | 4 ++-
.../scaladsl/DurableStateSequenceActorTest.scala | 7 +++--
.../jdbc/state/scaladsl/JdbcDurableStateSpec.scala | 6 +++--
.../jdbc/state/scaladsl/StateSpecBase.scala | 4 ++-
.../jdbc/migrator/JournalMigrator.scala | 7 ++---
.../jdbc/migrator/SnapshotMigrator.scala | 4 +--
26 files changed, 150 insertions(+), 47 deletions(-)
diff --git
a/core/src/main/mima-filters/1.1.x.backwards.excludes/slick.backwards.excludes
b/core/src/main/mima-filters/1.1.x.backwards.excludes/slick.backwards.excludes
new file mode 100644
index 0000000..6920ffb
--- /dev/null
+++
b/core/src/main/mima-filters/1.1.x.backwards.excludes/slick.backwards.excludes
@@ -0,0 +1,7 @@
+ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.persistence.jdbc.journal.dao.JournalTables$JournalPekkoSerializationRow$")
+ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.persistence.jdbc.journal.dao.JournalTables$TagRow$")
+ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.persistence.jdbc.journal.dao.legacy.package$JournalRow$")
+ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.persistence.jdbc.snapshot.dao.SnapshotTables$SnapshotRow$")
+ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.persistence.jdbc.snapshot.dao.legacy.SnapshotTables$SnapshotRow$")
+ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.persistence.jdbc.state.DurableStateTables$DurableStateRow$")
+ProblemFilters.exclude[FinalMethodProblem]("org.apache.pekko.persistence.jdbc.state.scaladsl.JdbcDurableStateStore.queries")
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/JournalTables.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/JournalTables.scala
index a8f6a7f..382cb4e 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/JournalTables.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/JournalTables.scala
@@ -39,7 +39,15 @@ object JournalTables {
metaSerId: Option[Int],
metaSerManifest: Option[String])
+ object JournalPekkoSerializationRow {
+ def tupled = (apply _).tupled
+ }
+
case class TagRow(eventId: Long, tag: String)
+
+ object TagRow {
+ def tupled = (apply _).tupled
+ }
}
/**
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/legacy/package.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/legacy/package.scala
index 52d699f..f3eaf6e 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/legacy/package.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/legacy/package.scala
@@ -25,6 +25,10 @@ package object legacy {
message: Array[Byte],
tags: Option[String] = None)
+ object JournalRow {
+ def tupled = (apply _).tupled
+ }
+
def encodeTags(tags: Set[String], separator: String): Option[String] =
if (tags.isEmpty) None else Option(tags.mkString(separator))
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/JdbcReadJournalProvider.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/JdbcReadJournalProvider.scala
index 96cf9ec..bb6520c 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/JdbcReadJournalProvider.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/JdbcReadJournalProvider.scala
@@ -21,7 +21,8 @@ import com.typesafe.config.Config
class JdbcReadJournalProvider(system: ExtendedActorSystem, config: Config,
configPath: String)
extends ReadJournalProvider {
- override val scaladslReadJournal = new scaladsl.JdbcReadJournal(config,
configPath)(system)
+ override def scaladslReadJournal(): scaladsl.JdbcReadJournal =
+ new scaladsl.JdbcReadJournal(config, configPath)(system)
- override val javadslReadJournal = new
javadsl.JdbcReadJournal(scaladslReadJournal)
+ override def javadslReadJournal(): javadsl.JdbcReadJournal = new
javadsl.JdbcReadJournal(scaladslReadJournal())
}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/legacy/ByteArrayReadJournalDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/legacy/ByteArrayReadJournalDao.scala
index 030c187..42a00d0 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/legacy/ByteArrayReadJournalDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/legacy/ByteArrayReadJournalDao.scala
@@ -122,7 +122,8 @@ trait OracleReadJournalDao extends ReadJournalDao {
}
}
- implicit val getJournalRow = GetResult(r => JournalRow(r.<<, r.<<, r.<<,
r.<<, r.nextBytes(), r.<<))
+ implicit val getJournalRow: GetResult[JournalRow] =
+ GetResult(r => JournalRow(r.<<, r.<<, r.<<, r.<<, r.nextBytes(), r.<<))
abstract override def eventsByTag(
tag: String,
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/snapshot/dao/SnapshotTables.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/snapshot/dao/SnapshotTables.scala
index a7ea40c..ee9987e 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/snapshot/dao/SnapshotTables.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/snapshot/dao/SnapshotTables.scala
@@ -31,6 +31,10 @@ object SnapshotTables {
metaSerId: Option[Int],
metaSerManifest: Option[String],
metaPayload: Option[Array[Byte]])
+
+ object SnapshotRow {
+ def tupled = (apply _).tupled
+ }
}
trait SnapshotTables {
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/snapshot/dao/legacy/SnapshotTables.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/snapshot/dao/legacy/SnapshotTables.scala
index b6f17ea..847baf9 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/snapshot/dao/legacy/SnapshotTables.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/snapshot/dao/legacy/SnapshotTables.scala
@@ -22,6 +22,11 @@ import slick.jdbc.JdbcProfile
object SnapshotTables {
case class SnapshotRow(persistenceId: String, sequenceNumber: Long, created:
Long, snapshot: Array[Byte])
+
+ object SnapshotRow {
+ def tupled = (apply _).tupled
+ }
+
def isOracleDriver(profile: JdbcProfile): Boolean =
profile match {
case _: slick.jdbc.OracleProfile => true
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/DurableStateTables.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/DurableStateTables.scala
index 1b49dbb..32320d3 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/DurableStateTables.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/DurableStateTables.scala
@@ -31,6 +31,10 @@ import
pekko.persistence.jdbc.config.DurableStateTableConfiguration
stateSerId: Int,
stateSerManifest: Option[String],
stateTimestamp: Long)
+
+ object DurableStateRow {
+ def tupled = (apply _).tupled
+ }
}
/**
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/JdbcDurableStateStoreProvider.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/JdbcDurableStateStoreProvider.scala
index 756b8de..352ff37 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/JdbcDurableStateStoreProvider.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/JdbcDurableStateStoreProvider.scala
@@ -43,10 +43,10 @@ class JdbcDurableStateStoreProvider[A](system:
ExtendedActorSystem) extends Dura
lazy val serialization = SerializationExtension(system)
val profile: JdbcProfile = slickDb.profile
- override val scaladslDurableStateStore: DurableStateStore[Any] =
+ override def scaladslDurableStateStore(): DurableStateStore[Any] =
new scaladsl.JdbcDurableStateStore[Any](db, profile, durableStateConfig,
serialization)(system)
- override val javadslDurableStateStore: JDurableStateStore[AnyRef] =
+ override def javadslDurableStateStore(): JDurableStateStore[AnyRef] =
new javadsl.JdbcDurableStateStore[AnyRef](
profile,
durableStateConfig,
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateSequenceActor.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateSequenceActor.scala
index e11a595..1876145 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateSequenceActor.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateSequenceActor.scala
@@ -19,6 +19,7 @@ import pekko.actor.{ Actor, ActorLogging, Props, Status,
Timers }
import pekko.pattern.pipe
import pekko.persistence.jdbc.config.DurableStateSequenceRetrievalConfig
import pekko.persistence.jdbc.MissingElements
+import pekko.persistence.jdbc.state.scaladsl
import pekko.stream.Materializer
import pekko.stream.scaladsl.Sink
import scala.concurrent.duration.FiniteDuration
@@ -29,8 +30,8 @@ import pekko.annotation.InternalApi
* INTERNAL API
*/
@InternalApi private[pekko] object DurableStateSequenceActor {
- def props[A](stateStore: JdbcDurableStateStore[A], config:
DurableStateSequenceRetrievalConfig)(
- implicit materializer: Materializer): Props = Props(new
DurableStateSequenceActor(stateStore, config))
+ def props[A](stateStore: scaladsl.JdbcDurableStateStore[A], config:
DurableStateSequenceRetrievalConfig)(
+ implicit materializer: Materializer): Props = Props(new
scaladsl.DurableStateSequenceActor(stateStore, config))
case class VisitedElement(pid: PersistenceId, offset: GlobalOffset,
revision: Revision) {
override def toString = s"($pid, $offset, $revision)"
@@ -86,9 +87,8 @@ import pekko.annotation.InternalApi
*
* INTERNAL API
*/
-@InternalApi
-private[pekko] class DurableStateSequenceActor[A](
- stateStore: JdbcDurableStateStore[A],
+@InternalApi private[pekko] class DurableStateSequenceActor[A](
+ stateStore: scaladsl.JdbcDurableStateStore[A],
config: DurableStateSequenceRetrievalConfig)(implicit materializer:
Materializer)
extends Actor
with ActorLogging
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala
index b06d4ab..e0795e4 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala
@@ -23,19 +23,19 @@ import org.apache.pekko
import pekko.{ Done, NotUsed }
import pekko.actor.ExtendedActorSystem
import pekko.pattern.ask
-import pekko.persistence.state.scaladsl.{ DurableStateUpdateStore,
GetObjectResult }
import pekko.persistence.jdbc.PekkoSerialization
import pekko.persistence.jdbc.state.DurableStateQueries
import pekko.persistence.jdbc.config.DurableStateTableConfiguration
import pekko.persistence.jdbc.state.{ DurableStateTables, OffsetSyntax }
import pekko.persistence.query.{ DurableStateChange, Offset }
-import pekko.persistence.query.scaladsl.DurableStateStoreQuery
import pekko.persistence.jdbc.journal.dao.FlowControl
+import pekko.persistence.jdbc.state.{ scaladsl => jdbcStateScalaDsl }
+import pekko.persistence.state.{ scaladsl => stateScalaDsl }
+import pekko.persistence.query.{ scaladsl => queryScalaDsl }
import pekko.serialization.Serialization
import pekko.stream.scaladsl.{ Sink, Source }
import pekko.stream.{ Materializer, SystemMaterializer }
import pekko.util.Timeout
-import DurableStateSequenceActor._
import OffsetSyntax._
import pekko.annotation.ApiMayChange
import pekko.persistence.query.UpdatedDurableState
@@ -53,31 +53,33 @@ class JdbcDurableStateStore[A](
val profile: JdbcProfile,
durableStateConfig: DurableStateTableConfiguration,
serialization: Serialization)(implicit val system: ExtendedActorSystem)
- extends DurableStateUpdateStore[A]
- with DurableStateStoreQuery[A] {
+ extends stateScalaDsl.DurableStateUpdateStore[A]
+ with queryScalaDsl.DurableStateStoreQuery[A] {
+ import jdbcStateScalaDsl.DurableStateSequenceActor._
import FlowControl._
import profile.api._
implicit val ec: ExecutionContext = system.dispatcher
implicit val mat: Materializer = SystemMaterializer(system).materializer
- lazy val queries = new DurableStateQueries(profile, durableStateConfig)
+ final lazy val queries = new DurableStateQueries(profile, durableStateConfig)
// Started lazily to prevent the actor from querying the db if no
changesByTag queries are used
private[jdbc] lazy val stateSequenceActor = system.systemActorOf(
- DurableStateSequenceActor.props(this,
durableStateConfig.stateSequenceConfig),
+ jdbcStateScalaDsl.DurableStateSequenceActor.props(this,
+ durableStateConfig.stateSequenceConfig),
s"pekko-persistence-jdbc-durable-state-sequence-actor")
- def getObject(persistenceId: String): Future[GetObjectResult[A]] = {
+ def getObject(persistenceId: String):
Future[stateScalaDsl.GetObjectResult[A]] = {
db.run(queries.selectFromDbByPersistenceId(persistenceId).result).map {
rows =>
rows.headOption match {
case Some(row) =>
- GetObjectResult(
+ stateScalaDsl.GetObjectResult(
PekkoSerialization.fromDurableStateRow(serialization)(row).toOption.asInstanceOf[Option[A]],
row.revision)
case None =>
- GetObjectResult(None, 0)
+ stateScalaDsl.GetObjectResult(None, 0)
}
}
}
diff --git
a/core/src/test/scala-2/org/apache/pekko/persistence/jdbc/CrossEventually.scala
b/core/src/test/scala-2/org/apache/pekko/persistence/jdbc/CrossEventually.scala
new file mode 100644
index 0000000..08d756f
--- /dev/null
+++
b/core/src/test/scala-2/org/apache/pekko/persistence/jdbc/CrossEventually.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.persistence.jdbc
+
+import org.scalactic.source
+import org.scalatest.concurrent.Eventually
+import org.scalatest.enablers.Retrying
+
+trait CrossEventually extends Eventually {
+ def eventually[T](config: PatienceConfig)(fun: => T)(implicit retrying:
Retrying[T], pos: source.Position): T =
+ eventually[T](fun)(config, retrying, pos)
+
+}
+
+object CrossEventually extends Eventually
diff --git
a/core/src/test/scala-3/org/apache/pekko/persistence/jdbc/CrossEventually.scala
b/core/src/test/scala-3/org/apache/pekko/persistence/jdbc/CrossEventually.scala
new file mode 100644
index 0000000..e0702c3
--- /dev/null
+++
b/core/src/test/scala-3/org/apache/pekko/persistence/jdbc/CrossEventually.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.persistence.jdbc
+
+import org.scalatest.concurrent.Eventually
+import org.scalatest.enablers.Retrying
+
+trait CrossEventually extends Eventually {
+ def eventually[T](config: PatienceConfig)(fun: => T)(implicit retrying:
Retrying[T]): T =
+ eventually[T](fun)(config, retrying)
+
+}
+
+object CrossEventually extends Eventually
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/SharedActorSystemTestSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/SharedActorSystemTestSpec.scala
index 0f71feb..b22bb69 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/SharedActorSystemTestSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/SharedActorSystemTestSpec.scala
@@ -38,7 +38,7 @@ abstract class SharedActorSystemTestSpec(val config: Config)
extends SimpleSpec
implicit lazy val ec: ExecutionContext = system.dispatcher
implicit val pc: PatienceConfig = PatienceConfig(timeout = 1.minute)
- implicit val timeout = Timeout(1.minute)
+ implicit val timeout: Timeout = Timeout(1.minute)
lazy val serialization = SerializationExtension(system)
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/SimpleSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/SimpleSpec.scala
index 6f7f57a..91a881b 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/jdbc/SimpleSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/jdbc/SimpleSpec.scala
@@ -19,7 +19,7 @@ import pekko.actor.{ ActorRef, ActorSystem }
import pekko.persistence.jdbc.util.ClasspathResources
import pekko.testkit.TestProbe
import org.scalatest._
-import org.scalatest.concurrent.{ Eventually, ScalaFutures }
+import org.scalatest.concurrent.ScalaFutures
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
@@ -29,7 +29,7 @@ trait SimpleSpec
with ScalaFutures
with TryValues
with OptionValues
- with Eventually
+ with CrossEventually
with ClasspathResources
with BeforeAndAfterAll
with BeforeAndAfterEach
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/JdbcJournalPerfSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/JdbcJournalPerfSpec.scala
index c385ac7..96d01a6 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/JdbcJournalPerfSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/JdbcJournalPerfSpec.scala
@@ -28,6 +28,7 @@ import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import org.scalatest.concurrent.ScalaFutures
+import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
abstract class JdbcJournalPerfSpec(config: Config, schemaType: SchemaType)
@@ -39,7 +40,7 @@ abstract class JdbcJournalPerfSpec(config: Config,
schemaType: SchemaType)
with DropCreate {
override protected def supportsRejectingNonSerializableObjects:
CapabilityFlag = true
- implicit lazy val ec = system.dispatcher
+ implicit lazy val ec: ExecutionContext = system.dispatcher
implicit def pc: PatienceConfig = PatienceConfig(timeout = 10.minutes)
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/JdbcJournalSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/JdbcJournalSpec.scala
index e25e43c..de8e7d8 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/JdbcJournalSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/JdbcJournalSpec.scala
@@ -25,6 +25,7 @@ import pekko.persistence.jdbc.util.{ ClasspathResources,
DropCreate }
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import org.scalatest.concurrent.ScalaFutures
+import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
abstract class JdbcJournalSpec(config: Config, schemaType: SchemaType)
@@ -38,7 +39,7 @@ abstract class JdbcJournalSpec(config: Config, schemaType:
SchemaType)
implicit val pc: PatienceConfig = PatienceConfig(timeout = 10.seconds)
- implicit lazy val ec = system.dispatcher
+ implicit lazy val ec: ExecutionContext = system.dispatcher
lazy val cfg = system.settings.config.getConfig("jdbc-journal")
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala
index fb31410..02625e7 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala
@@ -29,7 +29,7 @@ import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.slf4j.LoggerFactory
import scala.collection.immutable
-import scala.concurrent.ExecutionContextExecutor
+import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.{ Failure, Success }
import pekko.stream.testkit.scaladsl.TestSink
@@ -66,7 +66,7 @@ abstract class JournalDaoStreamMessagesMemoryTest(configFile:
String)
pending
withActorSystem { implicit system: ActorSystem =>
withDatabase { db =>
- implicit val ec: ExecutionContextExecutor = system.dispatcher
+ implicit val ec: ExecutionContext = system.dispatcher
val persistenceId = UUID.randomUUID().toString
val dao = new ByteArrayJournalDao(db, profile, journalConfig,
SerializationExtension(system))
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/JournalSequenceActorTest.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/JournalSequenceActorTest.scala
index be9a70a..ad8a169 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/JournalSequenceActorTest.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/JournalSequenceActorTest.scala
@@ -86,10 +86,10 @@ abstract class JournalSequenceActorTest(configFile: String,
isOracle: Boolean)
val startTime = System.currentTimeMillis()
withJournalSequenceActor(db, maxTries = 100) { actor =>
val patienceConfig = PatienceConfig(10.seconds, Span(200,
org.scalatest.time.Millis))
- eventually {
+ eventually(patienceConfig) {
val currentMax =
actor.ask(GetMaxOrderingId).mapTo[MaxOrderingId].futureValue.maxOrdering
currentMax shouldBe elements
- }(patienceConfig, implicitly, implicitly)
+ }
}
val timeTaken = System.currentTimeMillis() - startTime
log.info(s"Recovered all events in $timeTaken ms")
@@ -120,10 +120,10 @@ abstract class JournalSequenceActorTest(configFile:
String, isOracle: Boolean)
withJournalSequenceActor(db, maxTries = 2) { actor =>
// Should normally recover after `maxTries` seconds
val patienceConfig = PatienceConfig(10.seconds, Span(200,
org.scalatest.time.Millis))
- eventually {
+ eventually(patienceConfig) {
val currentMax =
actor.ask(GetMaxOrderingId).mapTo[MaxOrderingId].futureValue.maxOrdering
currentMax shouldBe lastElement
- }(patienceConfig, implicitly, implicitly)
+ }
}
}
}
@@ -156,10 +156,10 @@ abstract class JournalSequenceActorTest(configFile:
String, isOracle: Boolean)
withJournalSequenceActor(db, maxTries = 2) { actor =>
// The actor should assume the max after 2 seconds
val patienceConfig = PatienceConfig(3.seconds)
- eventually {
+ eventually(patienceConfig) {
val currentMax =
actor.ask(GetMaxOrderingId).mapTo[MaxOrderingId].futureValue.maxOrdering
currentMax shouldBe highestValue
- }(patienceConfig, implicitly, implicitly)
+ }
}
}
}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/serialization/StoreOnlySerializableMessagesTest.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/serialization/StoreOnlySerializableMessagesTest.scala
index 9c7784c..e03050e 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/serialization/StoreOnlySerializableMessagesTest.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/serialization/StoreOnlySerializableMessagesTest.scala
@@ -44,7 +44,7 @@ abstract class StoreOnlySerializableMessagesTest(config:
String, schemaType: Sch
override val receiveCommand: Receive = LoggingReceive { case msg =>
persist(msg) { _ =>
- sender ! pekko.actor.Status.Success("")
+ sender() ! pekko.actor.Status.Success("")
}
}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/snapshot/JdbcSnapshotStoreSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/snapshot/JdbcSnapshotStoreSpec.scala
index ca4f969..b6e3e67 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/snapshot/JdbcSnapshotStoreSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/snapshot/JdbcSnapshotStoreSpec.scala
@@ -28,6 +28,8 @@ import scala.concurrent.duration._
import pekko.persistence.jdbc.testkit.internal.H2
import pekko.persistence.jdbc.testkit.internal.SchemaType
+import scala.concurrent.ExecutionContext
+
abstract class JdbcSnapshotStoreSpec(config: Config, schemaType: SchemaType)
extends SnapshotStoreSpec(config)
with BeforeAndAfterAll
@@ -36,7 +38,7 @@ abstract class JdbcSnapshotStoreSpec(config: Config,
schemaType: SchemaType)
with DropCreate {
implicit val pc: PatienceConfig = PatienceConfig(timeout = 10.seconds)
- implicit lazy val ec = system.dispatcher
+ implicit lazy val ec: ExecutionContext = system.dispatcher
lazy val cfg = system.settings.config.getConfig("jdbc-journal")
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateSequenceActorTest.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateSequenceActorTest.scala
index a5d3468..7249fff 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateSequenceActorTest.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateSequenceActorTest.scala
@@ -21,18 +21,17 @@ import com.typesafe.config.{ Config, ConfigFactory }
import org.apache.pekko
import pekko.actor.{ ActorRef, ActorSystem, ExtendedActorSystem }
import pekko.pattern.ask
-import pekko.persistence.jdbc.SharedActorSystemTestSpec
+import pekko.persistence.jdbc.{ CrossEventually, SharedActorSystemTestSpec }
import
pekko.persistence.jdbc.state.scaladsl.DurableStateSequenceActor.VisitedElement
import pekko.persistence.jdbc.state.scaladsl.DurableStateSequenceActor.{
GetMaxGlobalOffset, MaxGlobalOffset }
import pekko.persistence.jdbc.testkit.internal.{ H2, SchemaType }
import pekko.testkit.TestProbe
import pekko.util.Timeout
-import org.scalatest.concurrent.Eventually
abstract class DurableStateSequenceActorTest(config: Config, schemaType:
SchemaType)
extends StateSpecBase(config, schemaType)
- with DataGenerationHelper
- with Eventually {
+ with CrossEventually
+ with DataGenerationHelper {
val durableStateSequenceActorConfig = durableStateConfig.stateSequenceConfig
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala
index 0296939..e8d82d5 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala
@@ -27,6 +27,8 @@ import org.scalatest.time.Millis
import org.scalatest.time.Seconds
import org.scalatest.time.Span
+import scala.concurrent.Future
+
abstract class JdbcDurableStateSpec(config: Config, schemaType: SchemaType)
extends StateSpecBase(config, schemaType) {
override implicit val defaultPatience =
@@ -294,7 +296,7 @@ abstract class JdbcDurableStateSpec(config: Config,
schemaType: SchemaType) exte
whenReady {
currentChanges("t1", NoOffset)
.collect { case u: UpdatedDurableState[String] => u }
- .runWith(Sink.seq[UpdatedDurableState[String]])
+ .runWith(Sink.seq[UpdatedDurableState[String]]):
Future[Seq[UpdatedDurableState[String]]]
} { chgs =>
chgs.map(_.offset.value) shouldBe sorted
chgs.map(_.offset.value).max shouldBe 3000
@@ -303,7 +305,7 @@ abstract class JdbcDurableStateSpec(config: Config,
schemaType: SchemaType) exte
whenReady {
currentChanges("t1", Sequence(2000))
.collect { case u: UpdatedDurableState[String] => u }
- .runWith(Sink.seq[UpdatedDurableState[String]])
+ .runWith(Sink.seq[UpdatedDurableState[String]]):
Future[Seq[UpdatedDurableState[String]]]
} { chgs =>
chgs.map(_.offset.value) shouldBe sorted
chgs.map(_.offset.value).max shouldBe 3000
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/StateSpecBase.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/StateSpecBase.scala
index b4e64a7..82bd479 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/StateSpecBase.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/StateSpecBase.scala
@@ -32,6 +32,8 @@ import pekko.persistence.jdbc.util.DropCreate
import pekko.serialization.SerializationExtension
import pekko.util.Timeout
+import scala.concurrent.ExecutionContext
+
abstract class StateSpecBase(val config: Config, schemaType: SchemaType)
extends AnyWordSpecLike
with BeforeAndAfterAll
@@ -42,7 +44,7 @@ abstract class StateSpecBase(val config: Config, schemaType:
SchemaType)
with DataGenerationHelper {
implicit def system: ActorSystem
- implicit lazy val e = system.dispatcher
+ implicit lazy val e: ExecutionContext = system.dispatcher
private[jdbc] def schemaTypeToProfile(s: SchemaType) = s match {
case H2 => slick.jdbc.H2Profile
diff --git
a/migrator/src/main/scala/org/apache/pekko/persistence/jdbc/migrator/JournalMigrator.scala
b/migrator/src/main/scala/org/apache/pekko/persistence/jdbc/migrator/JournalMigrator.scala
index 430e2a9..8e336b8 100644
---
a/migrator/src/main/scala/org/apache/pekko/persistence/jdbc/migrator/JournalMigrator.scala
+++
b/migrator/src/main/scala/org/apache/pekko/persistence/jdbc/migrator/JournalMigrator.scala
@@ -48,13 +48,14 @@ final case class JournalMigrator(profile:
JdbcProfile)(implicit system: ActorSys
val log: Logger = LoggerFactory.getLogger(getClass)
// get the various configurations
- private val journalConfig: JournalConfig = new
JournalConfig(system.settings.config.getConfig(JournalConfig))
+ private val journalConfig: JournalConfig =
+ new
JournalConfig(system.settings.config.getConfig(JournalMigrator.JournalConfig))
private val readJournalConfig: ReadJournalConfig = new ReadJournalConfig(
- system.settings.config.getConfig(ReadJournalConfig))
+ system.settings.config.getConfig(JournalMigrator.ReadJournalConfig))
// the journal database
private val journalDB: JdbcBackend.Database =
-
SlickExtension(system).database(system.settings.config.getConfig(ReadJournalConfig)).database
+
SlickExtension(system).database(system.settings.config.getConfig(JournalMigrator.ReadJournalConfig)).database
// get an instance of the new journal queries
private val newJournalQueries: JournalQueries =
diff --git
a/migrator/src/main/scala/org/apache/pekko/persistence/jdbc/migrator/SnapshotMigrator.scala
b/migrator/src/main/scala/org/apache/pekko/persistence/jdbc/migrator/SnapshotMigrator.scala
index 6397c02..10bbeee 100644
---
a/migrator/src/main/scala/org/apache/pekko/persistence/jdbc/migrator/SnapshotMigrator.scala
+++
b/migrator/src/main/scala/org/apache/pekko/persistence/jdbc/migrator/SnapshotMigrator.scala
@@ -48,13 +48,13 @@ case class SnapshotMigrator(profile: JdbcProfile)(implicit
system: ActorSystem)
private val snapshotConfig: SnapshotConfig = new
SnapshotConfig(system.settings.config.getConfig(SnapshotStoreConfig))
private val readJournalConfig: ReadJournalConfig = new ReadJournalConfig(
- system.settings.config.getConfig(ReadJournalConfig))
+ system.settings.config.getConfig(JournalMigrator.ReadJournalConfig))
private val snapshotDB: jdbc.JdbcBackend.Database =
SlickExtension(system).database(system.settings.config.getConfig(SnapshotStoreConfig)).database
private val journalDB: JdbcBackend.Database =
-
SlickExtension(system).database(system.settings.config.getConfig(ReadJournalConfig)).database
+
SlickExtension(system).database(system.settings.config.getConfig(JournalMigrator.ReadJournalConfig)).database
private val serialization: Serialization = SerializationExtension(system)
private val queries: SnapshotQueries = new SnapshotQueries(profile,
snapshotConfig.legacySnapshotTableConfiguration)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]