This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-projection.git
The following commit(s) were added to refs/heads/main by this push:
new 0afbf60 support scala3 with slick (#167)
0afbf60 is described below
commit 0afbf604b00fdf14d63b0ea611b2dd149bdcdac0
Author: PJ Fanning <[email protected]>
AuthorDate: Sun Jun 16 23:56:38 2024 +0100
support scala3 with slick (#167)
* try to support scala3 with slick
* further build changes
* Update Dependencies.scala
* Scala 3 better enforces package private checks and there is a test in a
non-pekko package
* Update SlickOffsetStore.scala
* Update SlickOffsetStore.scala
* make access to db instance lazy
---
build.sbt | 9 --------
.../projection/jdbc/internal/JdbcOffsetStore.scala | 2 +-
.../projection/jdbc/internal/JdbcSettings.scala | 2 +-
.../integration/KafkaToSlickIntegrationSpec.scala | 2 +-
project/Common.scala | 2 +-
project/Dependencies.scala | 7 +++---
project/PekkoCoreDependency.scala | 2 +-
.../pekko/projection/slick/SlickProjection.scala | 2 +-
.../slick/internal/SlickOffsetStore.scala | 27 +++++++++++++---------
.../slick/internal/SlickProjectionImpl.scala | 2 +-
.../projection/slick/SlickOffsetStoreSpec.scala | 2 +-
.../projection/slick/SlickProjectionSpec.scala | 2 +-
12 files changed, 28 insertions(+), 33 deletions(-)
diff --git a/build.sbt b/build.sbt
index a807aac..84a94f2 100644
--- a/build.sbt
+++ b/build.sbt
@@ -20,7 +20,6 @@ lazy val core =
Project(id = "core", base = file("core"))
.configs(IntegrationTest)
.enablePlugins(ReproducibleBuildsPlugin)
- .settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(headerSettings(IntegrationTest))
.settings(Defaults.itSettings)
.settings(Dependencies.core)
@@ -31,7 +30,6 @@ lazy val core =
lazy val coreTest =
Project(id = "core-test", base = file("core-test"))
.configs(IntegrationTest)
- .settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(headerSettings(IntegrationTest))
.disablePlugins(MimaPlugin)
.settings(Defaults.itSettings)
@@ -46,7 +44,6 @@ lazy val testkit =
Project(id = "testkit", base = file("testkit"))
.configs(IntegrationTest)
.enablePlugins(ReproducibleBuildsPlugin)
- .settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(headerSettings(IntegrationTest))
.settings(Defaults.itSettings)
.settings(Dependencies.testKit)
@@ -59,7 +56,6 @@ lazy val jdbc =
Project(id = "jdbc", base = file("jdbc"))
.configs(IntegrationTest.extend(Test))
.enablePlugins(ReproducibleBuildsPlugin)
- .settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(headerSettings(IntegrationTest))
.settings(Defaults.itSettings)
.settings(Dependencies.jdbc)
@@ -74,7 +70,6 @@ lazy val slick =
Project(id = "slick", base = file("slick"))
.configs(IntegrationTest.extend(Test))
.enablePlugins(ReproducibleBuildsPlugin)
- .settings(crossScalaVersions := Dependencies.Scala2Versions)
.settings(headerSettings(IntegrationTest))
.settings(Defaults.itSettings)
.settings(Dependencies.slick)
@@ -92,7 +87,6 @@ lazy val cassandra =
Project(id = "cassandra", base = file("cassandra"))
.configs(IntegrationTest)
.enablePlugins(ReproducibleBuildsPlugin)
- .settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(headerSettings(IntegrationTest))
.settings(Defaults.itSettings)
.settings(Dependencies.cassandra)
@@ -109,7 +103,6 @@ lazy val cassandra =
lazy val eventsourced =
Project(id = "eventsourced", base = file("eventsourced"))
.enablePlugins(ReproducibleBuildsPlugin)
- .settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(Dependencies.eventsourced)
.settings(AutomaticModuleName.settings("pekko.projection.eventsourced"))
.settings(name := "pekko-projection-eventsourced")
@@ -120,7 +113,6 @@ lazy val eventsourced =
lazy val kafka =
Project(id = "kafka", base = file("kafka"))
.enablePlugins(ReproducibleBuildsPlugin)
- .settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(Dependencies.kafka)
.settings(AutomaticModuleName.settings("pekko.projection.kafka"))
.settings(name := "pekko-projection-kafka")
@@ -146,7 +138,6 @@ lazy val `durable-state` =
Project(id = "durable-state", base = file("durable-state"))
.configs(IntegrationTest)
.enablePlugins(ReproducibleBuildsPlugin)
- .settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(Dependencies.state)
.settings(AutomaticModuleName.settings("pekko.projection.durable-state"))
.settings(name := "pekko-projection-durable-state")
diff --git
a/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcOffsetStore.scala
b/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcOffsetStore.scala
index 5ede283..7bb448b 100644
---
a/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcOffsetStore.scala
+++
b/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcOffsetStore.scala
@@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory
* INTERNAL API
*/
@InternalApi
-private[projection] class JdbcOffsetStore[S <: JdbcSession](
+class JdbcOffsetStore[S <: JdbcSession](
system: ActorSystem[_],
settings: JdbcSettings,
jdbcSessionFactory: () => S,
diff --git
a/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcSettings.scala
b/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcSettings.scala
index 35912e0..31db11f 100644
---
a/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcSettings.scala
+++
b/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcSettings.scala
@@ -27,7 +27,7 @@ import com.typesafe.config.ConfigValueType
* INTERNAL API
*/
@InternalApi
-private[projection] case class JdbcSettings(config: Config, executionContext:
ExecutionContext) {
+case class JdbcSettings(config: Config, executionContext: ExecutionContext) {
val schema: Option[String] =
Option(config.getString("offset-store.schema")).filterNot(_.trim.isEmpty)
diff --git
a/kafka-test/src/it/scala/org/apache/pekko/projection/kafka/integration/KafkaToSlickIntegrationSpec.scala
b/kafka-test/src/it/scala/org/apache/pekko/projection/kafka/integration/KafkaToSlickIntegrationSpec.scala
index b4bb21b..da5e046 100644
---
a/kafka-test/src/it/scala/org/apache/pekko/projection/kafka/integration/KafkaToSlickIntegrationSpec.scala
+++
b/kafka-test/src/it/scala/org/apache/pekko/projection/kafka/integration/KafkaToSlickIntegrationSpec.scala
@@ -131,7 +131,7 @@ class KafkaToSlickIntegrationSpec extends
KafkaSpecBase(ConfigFactory.load().wit
PatienceConfig(timeout = Span(30, Seconds), interval = Span(500,
Milliseconds))
val dbConfig: DatabaseConfig[H2Profile] =
DatabaseConfig.forConfig(SlickSettings.configPath, config)
- val offsetStore = new SlickOffsetStore(system.toTyped, dbConfig.db,
dbConfig.profile, SlickSettings(system.toTyped))
+ val offsetStore = new SlickOffsetStore(system.toTyped, dbConfig,
SlickSettings(system.toTyped))
val repository = new EventTypeCountRepository(dbConfig)
override protected def beforeAll(): Unit = {
diff --git a/project/Common.scala b/project/Common.scala
index e1f7932..0d0962b 100644
--- a/project/Common.scala
+++ b/project/Common.scala
@@ -43,7 +43,7 @@ object Common extends AutoPlugin {
override lazy val projectSettings = Seq(
projectInfoVersion := (if (isSnapshot.value) "snapshot" else
version.value),
crossVersion := CrossVersion.binary,
- crossScalaVersions := Dependencies.Scala2Versions,
+ crossScalaVersions := Dependencies.ScalaVersions,
scalaVersion := Dependencies.Scala213,
javacOptions ++= List("-Xlint:unchecked", "-Xlint:deprecation"),
Compile / doc / scalacOptions := scalacOptions.value ++ Seq(
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 02dd0ac..163af5e 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -17,16 +17,15 @@ object Dependencies {
val Scala213 = "2.13.14"
val Scala212 = "2.12.19"
val Scala3 = "3.3.3"
- val Scala2Versions = Seq(Scala213, Scala212)
- val Scala2And3Versions = Scala2Versions.+:(Scala3)
+ val ScalaVersions = Seq(Scala213, Scala212, Scala3)
- val PekkoVersionInDocs = "1.0"
+ val PekkoVersionInDocs = "1.1"
val ConnectorsVersionInDocs = "1.0"
val ConnectorsKafkaVersionInDocs = "1.0"
object Versions {
val pekko = PekkoCoreDependency.version
- val pekkoPersistenceJdbc = "1.0.0"
+ val pekkoPersistenceJdbc = "1.1.0-M1"
val pekkoPersistenceCassandra = "1.0.0"
val connectors = PekkoConnectorsDependency.version
val connectorsKafka = PekkoConnectorsKafkaDependency.version
diff --git a/project/PekkoCoreDependency.scala
b/project/PekkoCoreDependency.scala
index 5fd5678..155a8b4 100644
--- a/project/PekkoCoreDependency.scala
+++ b/project/PekkoCoreDependency.scala
@@ -21,5 +21,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
object PekkoCoreDependency extends PekkoDependency {
override val checkProject: String = "pekko-cluster-sharding-typed"
override val module: Option[String] = None
- override val currentVersion: String = "1.0.2"
+ override val currentVersion: String = "1.1.0-M1"
}
diff --git
a/slick/src/main/scala/org/apache/pekko/projection/slick/SlickProjection.scala
b/slick/src/main/scala/org/apache/pekko/projection/slick/SlickProjection.scala
index 999e9e7..10e88e1 100644
---
a/slick/src/main/scala/org/apache/pekko/projection/slick/SlickProjection.scala
+++
b/slick/src/main/scala/org/apache/pekko/projection/slick/SlickProjection.scala
@@ -395,7 +395,7 @@ object SlickProjection {
private def createOffsetStore[P <: JdbcProfile: ClassTag](databaseConfig:
DatabaseConfig[P])(
implicit system: ActorSystem[_]) =
- new SlickOffsetStore(system, databaseConfig.db, databaseConfig.profile,
SlickSettings(system))
+ new SlickOffsetStore(system, databaseConfig, SlickSettings(system))
}
@ApiMayChange
diff --git
a/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickOffsetStore.scala
b/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickOffsetStore.scala
index 36a98a7..b61d1be 100644
---
a/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickOffsetStore.scala
+++
b/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickOffsetStore.scala
@@ -35,6 +35,7 @@ import pekko.projection.jdbc.internal.MySQLDialect
import pekko.projection.jdbc.internal.OracleDialect
import pekko.projection.jdbc.internal.PostgresDialect
import pekko.util.Helpers.toRootLowerCase
+import slick.basic.DatabaseConfig
import slick.jdbc.JdbcProfile
/**
@@ -42,16 +43,18 @@ import slick.jdbc.JdbcProfile
*/
@InternalApi private[projection] class SlickOffsetStore[P <: JdbcProfile](
system: ActorSystem[_],
- val db: P#Backend#Database,
- val profile: P,
+ databaseConfig: DatabaseConfig[P],
slickSettings: SlickSettings,
clock: Clock) {
+
+ def this(system: ActorSystem[_], databaseConfig: DatabaseConfig[P],
slickSettings: SlickSettings) =
+ this(system, databaseConfig, slickSettings, Clock.systemUTC())
+
+ private[projection] val profile: P = databaseConfig.profile
+
+ import profile.api._
import OffsetSerialization.MultipleOffsets
import OffsetSerialization.SingleOffset
- import profile.api._
-
- def this(system: ActorSystem[_], db: P#Backend#Database, profile: P,
slickSettings: SlickSettings) =
- this(system, db, profile, slickSettings, Clock.systemUTC())
val (dialect, useLowerCase): (Dialect, Boolean) = {
@@ -88,7 +91,7 @@ import slick.jdbc.JdbcProfile
SingleOffset(ProjectionId(projectionId.name, row.projectionKey),
row.manifest, row.offsetStr, row.mergeable))
}
- val results = db.run(action)
+ val results = databaseConfig.db.run(action)
results.map {
case Nil => None
@@ -177,7 +180,8 @@ import slick.jdbc.JdbcProfile
stmt.execute(sql)
})
}
- db.run(DBIO.seq(prepareSchemaDBIO, prepareManagementSchemaDBIO)).map(_ =>
Done)(ExecutionContexts.parasitic)
+ databaseConfig.db.run(DBIO.seq(prepareSchemaDBIO,
prepareManagementSchemaDBIO))
+ .map(_ => Done)(ExecutionContexts.parasitic)
}
def dropIfExists(): Future[Done] = {
@@ -193,7 +197,8 @@ import slick.jdbc.JdbcProfile
stmt.execute(dialect.dropManagementTableStatement)
}
}
- db.run(DBIO.seq(prepareSchemaDBIO, prepareManagementSchemaDBIO)).map(_ =>
Done)(ExecutionContexts.parasitic)
+ databaseConfig.db.run(DBIO.seq(prepareSchemaDBIO,
prepareManagementSchemaDBIO))
+ .map(_ => Done)(ExecutionContexts.parasitic)
}
def readManagementState(projectionId: ProjectionId)(
@@ -206,7 +211,7 @@ import slick.jdbc.JdbcProfile
maybeRow.map(row => ManagementState(row.paused))
}
- db.run(action)
+ databaseConfig.db.run(action)
}
def savePaused(projectionId: ProjectionId, paused: Boolean): Future[Done] = {
@@ -214,6 +219,6 @@ import slick.jdbc.JdbcProfile
val action =
managementTable.insertOrUpdate(ManagementStateRow(projectionId.name,
projectionId.key, paused, millisSinceEpoch))
- db.run(action).map(_ => Done)(ExecutionContexts.parasitic)
+ databaseConfig.db.run(action).map(_ => Done)(ExecutionContexts.parasitic)
}
}
diff --git
a/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickProjectionImpl.scala
b/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickProjectionImpl.scala
index 311add3..27c4671 100644
---
a/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickProjectionImpl.scala
+++
b/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickProjectionImpl.scala
@@ -186,7 +186,7 @@ private[projection] class SlickProjectionImpl[Offset,
Envelope, P <: JdbcProfile
settings) {
implicit val executionContext: ExecutionContext = system.executionContext
- override val logger: LoggingAdapter = Logging(system.classicSystem,
this.getClass)
+ override val logger: LoggingAdapter = Logging(system.classicSystem,
classOf[SlickInternalProjectionState])
override def readPaused(): Future[Boolean] =
offsetStore.readManagementState(projectionId).map(_.exists(_.paused))
diff --git
a/slick/src/test/scala/org/apache/pekko/projection/slick/SlickOffsetStoreSpec.scala
b/slick/src/test/scala/org/apache/pekko/projection/slick/SlickOffsetStoreSpec.scala
index 1f3035a..ecb7e49 100644
---
a/slick/src/test/scala/org/apache/pekko/projection/slick/SlickOffsetStoreSpec.scala
+++
b/slick/src/test/scala/org/apache/pekko/projection/slick/SlickOffsetStoreSpec.scala
@@ -107,7 +107,7 @@ abstract class SlickOffsetStoreSpec(specConfig:
SlickSpecConfig)
private val clock = new TestClock
private val offsetStore =
- new SlickOffsetStore(system, dbConfig.db, dbConfig.profile,
SlickSettings(slickConfig), clock)
+ new SlickOffsetStore(system, dbConfig, SlickSettings(slickConfig), clock)
override protected def beforeAll(): Unit = {
// create offset table
diff --git
a/slick/src/test/scala/org/apache/pekko/projection/slick/SlickProjectionSpec.scala
b/slick/src/test/scala/org/apache/pekko/projection/slick/SlickProjectionSpec.scala
index cadde1e..2c84bf7 100644
---
a/slick/src/test/scala/org/apache/pekko/projection/slick/SlickProjectionSpec.scala
+++
b/slick/src/test/scala/org/apache/pekko/projection/slick/SlickProjectionSpec.scala
@@ -172,7 +172,7 @@ class SlickProjectionSpec
val dbConfig: DatabaseConfig[H2Profile] =
DatabaseConfig.forConfig(SlickSettings.configPath, config)
- val offsetStore = new SlickOffsetStore(system, dbConfig.db,
dbConfig.profile, SlickSettings(system))
+ val offsetStore = new SlickOffsetStore(system, dbConfig,
SlickSettings(system))
val projectionTestKit = ProjectionTestKit(system)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]