This is an automated email from the ASF dual-hosted git repository. engelen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git
The following commit(s) were added to refs/heads/main by this push: new ae5bdad drop scala 2.12 (#242) ae5bdad is described below commit ae5bdad7bea916aded92f492c1befaf4c3f15c92 Author: PJ Fanning <pjfann...@users.noreply.github.com> AuthorDate: Sat Sep 27 12:40:31 2025 +0100 drop scala 2.12 (#242) --- .github/workflows/build-test.yml | 11 ++++------- .../r2dbc/ConnectionFactoryProvider.scala | 3 ++- .../pekko/persistence/r2dbc/R2dbcSettings.scala | 21 +++++++++++---------- .../r2dbc/internal/ContinuousQuery.scala | 9 +++------ .../r2dbc/internal/HighestSequenceNrDao.scala | 6 ++---- .../persistence/r2dbc/internal/R2dbcExecutor.scala | 9 ++++----- .../persistence/r2dbc/journal/JournalDao.scala | 13 ++++++------- .../persistence/r2dbc/journal/R2dbcJournal.scala | 15 ++++++--------- .../r2dbc/query/javadsl/R2dbcReadJournal.scala | 11 ++++++----- .../persistence/r2dbc/snapshot/SnapshotDao.scala | 11 +++++------ .../state/javadsl/R2dbcDurableStateStore.scala | 6 +++--- .../r2dbc/state/scaladsl/DurableStateDao.scala | 10 ++++------ .../state/scaladsl/R2dbcDurableStateStore.scala | 7 +++---- .../persistence/r2dbc/migration/MigrationTool.scala | 10 ++++------ .../r2dbc/migration/MigrationToolDao.scala | 8 +++----- project/CommonSettings.scala | 2 +- project/Dependencies.scala | 1 - .../projection/r2dbc/R2dbcProjectionSettings.scala | 5 +++-- .../internal/BySliceSourceProviderAdapter.scala | 21 +++++++++------------ .../r2dbc/internal/R2dbcHandlerAdapter.scala | 4 ++-- .../r2dbc/internal/R2dbcOffsetStore.scala | 21 ++++++++++----------- .../r2dbc/internal/R2dbcProjectionImpl.scala | 4 ++-- .../projection/r2dbc/javadsl/R2dbcProjection.scala | 3 ++- .../projection/r2dbc/javadsl/R2dbcSession.scala | 11 +++++------ .../r2dbc/TestSourceProviderWithInput.scala | 2 +- 25 files changed, 101 insertions(+), 123 deletions(-) diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index a809c01..49fadd6 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -13,7 +13,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - SCALA_VERSION: [ 2.12, 2.13, 3.3 ] + SCALA_VERSION: [ 2.13, 3.3 ] JAVA_VERSION: [ 8, 11, 17, 21 ] steps: - name: Checkout @@ -45,7 +45,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - SCALA_VERSION: [ 2.12, 2.13, 3.3 ] + SCALA_VERSION: [ 2.13, 3.3 ] JAVA_VERSION: [ 8, 11 ] if: github.repository == 'apache/pekko-persistence-r2dbc' steps: @@ -95,7 +95,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - SCALA_VERSION: [ 2.12, 2.13, 3.3 ] + SCALA_VERSION: [ 2.13, 3.3 ] JAVA_VERSION: [ 8, 11 ] if: github.repository == 'apache/pekko-persistence-r2dbc' steps: @@ -145,13 +145,10 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - SCALA_VERSION: [ 2.12, 2.13, 3.3 ] + SCALA_VERSION: [ 2.13, 3.3 ] JAVA_VERSION: [ 11, 17, 21 ] # only compiling on JDK 8, because certain tests depend on the higher timestamp precision added in JDK 9 include: - - JAVA_VERSION: 8 - SCALA_VERSION: 2.12 - COMPILE_ONLY: true - JAVA_VERSION: 8 SCALA_VERSION: 2.13 COMPILE_ONLY: true diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala index b6a5e5d..ce3ea63 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala @@ -18,8 +18,10 @@ import java.util.concurrent.ConcurrentHashMap import scala.concurrent.Future import scala.concurrent.duration.Duration +import scala.jdk.CollectionConverters._ import scala.util.Failure import scala.util.Success + import org.apache.pekko import pekko.Done import pekko.actor.CoordinatedShutdown @@ -29,7 +31,6 @@ import pekko.actor.typed.ExtensionId import pekko.persistence.r2dbc.ConnectionFactoryProvider.ConnectionFactoryOptionsCustomizer import pekko.persistence.r2dbc.ConnectionFactoryProvider.NoopCustomizer import pekko.persistence.r2dbc.internal.R2dbcExecutor.PublisherOps -import pekko.util.ccompat.JavaConverters._ import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import io.r2dbc.pool.ConnectionPool diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala index 9a67a80..00f43b3 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala @@ -16,11 +16,12 @@ package org.apache.pekko.persistence.r2dbc import java.util.Locale import scala.concurrent.duration._ +import scala.jdk.DurationConverters._ + import org.apache.pekko import pekko.annotation.InternalApi import pekko.annotation.InternalStableApi import pekko.util.Helpers.toRootLowerCase -import pekko.util.JavaDurationConverters._ import com.typesafe.config.Config /** @@ -151,7 +152,7 @@ trait ConnectionSettings { val logDbCallsExceeding: FiniteDuration = config.getString("log-db-calls-exceeding").toLowerCase(Locale.ROOT) match { case "off" => -1.millis - case _ => config.getDuration("log-db-calls-exceeding").asScala + case _ => config.getDuration("log-db-calls-exceeding").toScala } } @@ -205,7 +206,7 @@ trait BufferSize { trait RefreshInterval { def config: Config - val refreshInterval: FiniteDuration = config.getDuration("refresh-interval").asScala + val refreshInterval: FiniteDuration = config.getDuration("refresh-interval").toScala } /** @@ -215,10 +216,10 @@ trait RefreshInterval { trait BySliceQuerySettings { def config: Config - val behindCurrentTime: FiniteDuration = config.getDuration("behind-current-time").asScala + val behindCurrentTime: FiniteDuration = config.getDuration("behind-current-time").toScala val backtrackingEnabled: Boolean = config.getBoolean("backtracking.enabled") - val backtrackingWindow: FiniteDuration = config.getDuration("backtracking.window").asScala - val backtrackingBehindCurrentTime: FiniteDuration = config.getDuration("backtracking.behind-current-time").asScala + val backtrackingWindow: FiniteDuration = config.getDuration("backtracking.window").toScala + val backtrackingBehindCurrentTime: FiniteDuration = config.getDuration("backtracking.behind-current-time").toScala } /** @@ -254,11 +255,11 @@ final class ConnectionFactorySettings(config: Config) { val initialSize: Int = config.getInt("initial-size") val maxSize: Int = config.getInt("max-size") - val maxIdleTime: FiniteDuration = config.getDuration("max-idle-time").asScala - val maxLifeTime: FiniteDuration = config.getDuration("max-life-time").asScala + val maxIdleTime: FiniteDuration = config.getDuration("max-idle-time").toScala + val maxLifeTime: FiniteDuration = config.getDuration("max-life-time").toScala - val connectTimeout: FiniteDuration = config.getDuration("connect-timeout").asScala - val acquireTimeout: FiniteDuration = config.getDuration("acquire-timeout").asScala + val connectTimeout: FiniteDuration = config.getDuration("connect-timeout").toScala + val acquireTimeout: FiniteDuration = config.getDuration("acquire-timeout").toScala val acquireRetry: Int = config.getInt("acquire-retry") val validationQuery: String = config.getString("validation-query") diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/ContinuousQuery.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/ContinuousQuery.scala index 422a1e7..b51055c 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/ContinuousQuery.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/ContinuousQuery.scala @@ -13,16 +13,13 @@ package org.apache.pekko.persistence.r2dbc.internal -import scala.concurrent.Future +import scala.concurrent.{ ExecutionContext, Future } import scala.concurrent.duration._ -import scala.util.Failure -import scala.util.Success -import scala.util.Try +import scala.util.{ Failure, Success, Try } import org.apache.pekko import pekko.NotUsed import pekko.annotation.InternalApi -import pekko.dispatch.ExecutionContexts import pekko.stream.Attributes import pekko.stream.Outlet import pekko.stream.SourceShape @@ -127,7 +124,7 @@ final private[r2dbc] class ContinuousQuery[S, T]( beforeQuery(state) match { case None => runNextQuery() case Some(beforeQueryFuture) => - beforeQueryFuture.onComplete(beforeQueryCallback.invoke)(ExecutionContexts.parasitic) + beforeQueryFuture.onComplete(beforeQueryCallback.invoke)(ExecutionContext.parasitic) } } } diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/HighestSequenceNrDao.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/HighestSequenceNrDao.scala index b1aa52f..6146e05 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/HighestSequenceNrDao.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/HighestSequenceNrDao.scala @@ -15,13 +15,11 @@ package org.apache.pekko.persistence.r2dbc.internal import org.apache.pekko import pekko.annotation.InternalApi -import pekko.dispatch.ExecutionContexts import pekko.persistence.r2dbc.Dialect import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation import org.slf4j.LoggerFactory -import scala.concurrent.ExecutionContext -import scala.concurrent.Future +import scala.concurrent.{ ExecutionContext, Future } /** * INTERNAL API @@ -63,7 +61,7 @@ trait HighestSequenceNrDao { val seqNr = row.get[java.lang.Long](0, classOf[java.lang.Long]) if (seqNr eq null) 0L else seqNr.longValue }) - .map(r => if (r.isEmpty) 0L else r.head)(ExecutionContexts.parasitic) + .map(r => if (r.isEmpty) 0L else r.head)(ExecutionContext.parasitic) if (log.isDebugEnabled) result.foreach(seqNr => log.debug("Highest sequence nr for persistenceId [{}]: [{}]", persistenceId, seqNr)) diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/R2dbcExecutor.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/R2dbcExecutor.scala index 138584b..55dd88a 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/R2dbcExecutor.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/R2dbcExecutor.scala @@ -26,7 +26,6 @@ import org.apache.pekko import pekko.Done import pekko.actor.typed.ActorSystem import pekko.annotation.InternalStableApi -import pekko.dispatch.ExecutionContexts import io.r2dbc.spi.Connection import io.r2dbc.spi.ConnectionFactory import io.r2dbc.spi.Result @@ -58,7 +57,7 @@ import reactor.core.publisher.Mono def updateOneInTx(stmt: Statement)(implicit ec: ExecutionContext): Future[Long] = stmt.execute().asFuture().flatMap { result => - result.getRowsUpdated.asFuture().map(_.longValue())(ExecutionContexts.parasitic) + result.getRowsUpdated.asFuture().map(_.longValue())(ExecutionContext.parasitic) } def updateBatchInTx(stmt: Statement)(implicit ec: ExecutionContext): Future[Long] = { @@ -76,7 +75,7 @@ import reactor.core.publisher.Mono statements.foldLeft(Future.successful(immutable.IndexedSeq.empty[Long])) { (acc, stmt) => acc.flatMap { seq => stmt.execute().asFuture().flatMap { res => - res.getRowsUpdated.asFuture().map(seq :+ _.longValue())(ExecutionContexts.parasitic) + res.getRowsUpdated.asFuture().map(seq :+ _.longValue())(ExecutionContext.parasitic) } } } @@ -134,7 +133,7 @@ class R2dbcExecutor(val connectionFactory: ConnectionFactory, log: Logger, logDb if (durationMicros >= logDbCallsExceedingMicros) log.info("{} - getConnection took [{}] µs", logPrefix, durationMicros) connection - }(ExecutionContexts.parasitic) + }(ExecutionContext.parasitic) } /** @@ -217,7 +216,7 @@ class R2dbcExecutor(val connectionFactory: ConnectionFactory, log: Logger, logDb def updateInBatchReturning[A](logPrefix: String)( statementFactory: Connection => Statement, mapRow: Row => A): Future[immutable.IndexedSeq[A]] = { - import pekko.util.ccompat.JavaConverters._ + import scala.jdk.CollectionConverters._ withConnection(logPrefix) { connection => val stmt = statementFactory(connection) Flux diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala index c06516d..1e5fcfb 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala @@ -15,12 +15,11 @@ package org.apache.pekko.persistence.r2dbc.journal import java.time.Instant -import scala.concurrent.ExecutionContext -import scala.concurrent.Future +import scala.concurrent.{ ExecutionContext, Future } + import org.apache.pekko import pekko.actor.typed.ActorSystem import pekko.annotation.InternalApi -import pekko.dispatch.ExecutionContexts import pekko.persistence.Persistence import pekko.persistence.r2dbc.ConnectionFactoryProvider import pekko.persistence.r2dbc.Dialect @@ -237,7 +236,7 @@ private[r2dbc] class JournalDao(val settings: JournalSettings, connectionFactory if (useTimestampFromDb) { result } else { - result.map(_ => events.head.dbTimestamp)(ExecutionContexts.parasitic) + result.map(_ => events.head.dbTimestamp)(ExecutionContext.parasitic) } } else { val result = r2dbcExecutor.updateInBatchReturning(s"batch insert [$persistenceId], [$totalEvents] events")( @@ -254,9 +253,9 @@ private[r2dbc] class JournalDao(val settings: JournalSettings, connectionFactory log.debug("Wrote [{}] events for persistenceId [{}]", 1, events.head.persistenceId) } if (useTimestampFromDb) { - result.map(_.head)(ExecutionContexts.parasitic) + result.map(_.head)(ExecutionContext.parasitic) } else { - result.map(_ => events.head.dbTimestamp)(ExecutionContexts.parasitic) + result.map(_ => events.head.dbTimestamp)(ExecutionContext.parasitic) } } } @@ -299,7 +298,7 @@ private[r2dbc] class JournalDao(val settings: JournalSettings, connectionFactory result.foreach(updatedRows => log.debug("Deleted [{}] events for persistenceId [{}]", updatedRows.head, persistenceId)) - result.map(_ => ())(ExecutionContexts.parasitic) + result.map(_ => ())(ExecutionContext.parasitic) } } diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala index 200d43a..c2fdd35 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala @@ -16,11 +16,9 @@ package org.apache.pekko.persistence.r2dbc.journal import java.time.Instant import scala.collection.immutable -import scala.concurrent.ExecutionContext -import scala.concurrent.Future -import scala.util.Failure -import scala.util.Success -import scala.util.Try +import scala.concurrent.{ ExecutionContext, Future } +import scala.util.{ Failure, Success, Try } + import com.typesafe.config.Config import org.apache.pekko import pekko.Done @@ -28,7 +26,6 @@ import pekko.actor.ActorRef import pekko.actor.typed.ActorSystem import pekko.actor.typed.scaladsl.adapter._ import pekko.annotation.InternalApi -import pekko.dispatch.ExecutionContexts import pekko.event.Logging import pekko.persistence.AtomicWrite import pekko.persistence.Persistence @@ -181,7 +178,7 @@ private[r2dbc] final class R2dbcJournal(config: Config, cfgPath: String) extends writeAndPublishResult.onComplete { _ => self ! WriteFinished(persistenceId, writeAndPublishResult) } - writeAndPublishResult.map(_ => Nil)(ExecutionContexts.parasitic) + writeAndPublishResult.map(_ => Nil)(ExecutionContext.parasitic) } private def publish(messages: immutable.Seq[AtomicWrite], dbTimestamp: Future[Instant]): Future[Done] = @@ -196,7 +193,7 @@ private[r2dbc] final class R2dbcJournal(config: Config, cfgPath: String) extends } case None => - dbTimestamp.map(_ => Done)(ExecutionContexts.parasitic) + dbTimestamp.map(_ => Done)(ExecutionContext.parasitic) } private def logEventsByTagsNotImplemented(): Unit = { @@ -236,7 +233,7 @@ private[r2dbc] final class R2dbcJournal(config: Config, cfgPath: String) extends case Some(f) => log.debug("Write in progress for [{}], deferring highest seq nr until write completed", persistenceId) // we only want to make write - replay sequential, not fail if previous write failed - f.recover { case _ => Done }(ExecutionContexts.parasitic) + f.recover { case _ => Done }(ExecutionContext.parasitic) case None => Future.successful(Done) } pendingWrite.flatMap(_ => journalDao.readHighestSequenceNr(persistenceId, fromSequenceNr)) diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala index f45f91b..172f855 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala @@ -18,9 +18,12 @@ import java.util import java.util.Optional import java.util.concurrent.CompletionStage +import scala.concurrent.ExecutionContext +import scala.jdk.FutureConverters._ +import scala.jdk.OptionConverters._ + import org.apache.pekko import pekko.NotUsed -import pekko.dispatch.ExecutionContexts import pekko.japi.Pair import pekko.persistence.query.{ EventEnvelope => ClassicEventEnvelope } import pekko.persistence.query.Offset @@ -32,8 +35,6 @@ import pekko.persistence.query.typed.javadsl.EventsBySliceQuery import pekko.persistence.query.typed.javadsl.LoadEventQuery import pekko.persistence.r2dbc.query.scaladsl import pekko.stream.javadsl.Source -import pekko.util.OptionConverters._ -import pekko.util.FutureConverters._ object R2dbcReadJournal { val Identifier: String = scaladsl.R2dbcReadJournal.Identifier @@ -68,7 +69,7 @@ final class R2dbcReadJournal(delegate: scaladsl.R2dbcReadJournal) delegate.eventsBySlices(entityType, minSlice, maxSlice, offset).asJava override def sliceRanges(numberOfRanges: Int): util.List[Pair[Integer, Integer]] = { - import pekko.util.ccompat.JavaConverters._ + import scala.jdk.CollectionConverters._ delegate .sliceRanges(numberOfRanges) .map(range => Pair(Integer.valueOf(range.min), Integer.valueOf(range.max))) @@ -94,7 +95,7 @@ final class R2dbcReadJournal(delegate: scaladsl.R2dbcReadJournal) delegate.currentPersistenceIds(afterId.toScala, limit).asJava override def timestampOf(persistenceId: String, sequenceNr: Long): CompletionStage[Optional[Instant]] = - delegate.timestampOf(persistenceId, sequenceNr).map(_.toJava)(ExecutionContexts.parasitic).asJava + delegate.timestampOf(persistenceId, sequenceNr).map(_.toJava)(ExecutionContext.parasitic).asJava override def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): CompletionStage[EventEnvelope[Event]] = delegate.loadEnvelope[Event](persistenceId, sequenceNr).asJava diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala index bc3bb94..7cf77dc 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala @@ -13,12 +13,11 @@ package org.apache.pekko.persistence.r2dbc.snapshot -import scala.concurrent.ExecutionContext -import scala.concurrent.Future +import scala.concurrent.{ ExecutionContext, Future } + import org.apache.pekko import pekko.actor.typed.ActorSystem import pekko.annotation.InternalApi -import pekko.dispatch.ExecutionContexts import pekko.persistence.Persistence import pekko.persistence.SnapshotSelectionCriteria import pekko.persistence.r2dbc.ConnectionFactoryProvider @@ -193,7 +192,7 @@ private[r2dbc] class SnapshotDao(settings: SnapshotSettings, connectionFactory: statement }, collectSerializedSnapshot) - .map(_.headOption)(ExecutionContexts.parasitic) + .map(_.headOption)(ExecutionContext.parasitic) } @@ -231,7 +230,7 @@ private[r2dbc] class SnapshotDao(settings: SnapshotSettings, connectionFactory: statement } - .map(_ => ())(ExecutionContexts.parasitic) + .map(_ => ())(ExecutionContext.parasitic) } def delete(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = { @@ -262,6 +261,6 @@ private[r2dbc] class SnapshotDao(settings: SnapshotSettings, connectionFactory: } statement } - }.map(_ => ())(ExecutionContexts.parasitic) + }.map(_ => ())(ExecutionContext.parasitic) } diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala index 709efdb..8770f85 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala @@ -18,6 +18,7 @@ import java.util.Optional import java.util.concurrent.CompletionStage import scala.concurrent.ExecutionContext +import scala.jdk.FutureConverters._ import org.apache.pekko import pekko.Done @@ -31,7 +32,6 @@ import pekko.persistence.r2dbc.state.scaladsl.{ R2dbcDurableStateStore => ScalaR import pekko.persistence.state.javadsl.DurableStateUpdateStore import pekko.persistence.state.javadsl.GetObjectResult import pekko.stream.javadsl.Source -import pekko.util.FutureConverters._ object R2dbcDurableStateStore { val Identifier: String = ScalaR2dbcDurableStateStore.Identifier @@ -75,7 +75,7 @@ class R2dbcDurableStateStore[A](scalaStore: ScalaR2dbcDurableStateStore[A])(impl scalaStore.sliceForPersistenceId(persistenceId) override def sliceRanges(numberOfRanges: Int): util.List[Pair[Integer, Integer]] = { - import pekko.util.ccompat.JavaConverters._ + import scala.jdk.CollectionConverters._ scalaStore .sliceRanges(numberOfRanges) .map(range => Pair(Integer.valueOf(range.min), Integer.valueOf(range.max))) @@ -83,7 +83,7 @@ class R2dbcDurableStateStore[A](scalaStore: ScalaR2dbcDurableStateStore[A])(impl } override def currentPersistenceIds(afterId: Optional[String], limit: Long): Source[String, NotUsed] = { - import pekko.util.OptionConverters._ + import scala.jdk.OptionConverters._ scalaStore.currentPersistenceIds(afterId.toScala, limit).asJava } diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala index 2c5d454..415b7fe 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala @@ -15,16 +15,14 @@ package org.apache.pekko.persistence.r2dbc.state.scaladsl import java.time.Instant -import scala.concurrent.ExecutionContext -import scala.concurrent.Future -import scala.concurrent.duration.Duration -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ ExecutionContext, Future } +import scala.concurrent.duration.{ Duration, FiniteDuration } + import org.apache.pekko import pekko.Done import pekko.NotUsed import pekko.actor.typed.ActorSystem import pekko.annotation.InternalApi -import pekko.dispatch.ExecutionContexts import pekko.persistence.Persistence import pekko.persistence.r2dbc.ConnectionFactoryProvider import pekko.persistence.r2dbc.Dialect @@ -302,7 +300,7 @@ private[r2dbc] class DurableStateDao(settings: StateSettings, connectionFactory: if (log.isDebugEnabled()) result.foreach(_ => log.debug("Deleted durable state for persistenceId [{}]", persistenceId)) - result.map(_ => Done)(ExecutionContexts.parasitic) + result.map(_ => Done)(ExecutionContext.parasitic) } /** diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala index ab30892..d6c43aa 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala @@ -14,8 +14,8 @@ package org.apache.pekko.persistence.r2dbc.state.scaladsl import scala.collection.immutable -import scala.concurrent.ExecutionContext -import scala.concurrent.Future +import scala.concurrent.{ ExecutionContext, Future } + import com.typesafe.config.Config import org.apache.pekko import pekko.Done @@ -23,7 +23,6 @@ import pekko.NotUsed import pekko.actor.ExtendedActorSystem import pekko.actor.typed.ActorSystem import pekko.actor.typed.scaladsl.adapter._ -import pekko.dispatch.ExecutionContexts import pekko.persistence.Persistence import pekko.persistence.query.DurableStateChange import pekko.persistence.query.Offset @@ -125,7 +124,7 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg .foreach(throw _) } Done - }(ExecutionContexts.parasitic) + }(ExecutionContext.parasitic) } override def sliceForPersistenceId(persistenceId: String): Int = diff --git a/migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationTool.scala b/migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationTool.scala index 055138c..dddbfd3 100644 --- a/migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationTool.scala +++ b/migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationTool.scala @@ -15,17 +15,15 @@ package org.apache.pekko.persistence.r2dbc.migration import java.time.Instant -import scala.concurrent.Future +import scala.concurrent.{ ExecutionContext, Future } import scala.concurrent.duration._ -import scala.util.Failure -import scala.util.Success -import scala.util.Try +import scala.util.{ Failure, Success, Try } + import org.apache.pekko import pekko.Done import pekko.actor.typed.ActorSystem import pekko.actor.typed.Behavior import pekko.actor.typed.scaladsl.Behaviors -import pekko.dispatch.ExecutionContexts import pekko.pattern.ask import pekko.persistence.Persistence import pekko.persistence.SelectedSnapshot @@ -296,7 +294,7 @@ class MigrationTool(system: ActorSystem[_]) { val serializedRow = serializedSnapotRow(selectedSnapshot) targetSnapshotDao .store(serializedRow) - .map(_ => snapshotMetadata.sequenceNr)(ExecutionContexts.parasitic) + .map(_ => snapshotMetadata.sequenceNr)(ExecutionContext.parasitic) } _ <- migrationDao.updateSnapshotProgress(persistenceId, seqNr) } yield 1 diff --git a/migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationToolDao.scala b/migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationToolDao.scala index aed6d03..9b6c500 100644 --- a/migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationToolDao.scala +++ b/migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationToolDao.scala @@ -13,15 +13,13 @@ package org.apache.pekko.persistence.r2dbc.migration -import scala.concurrent.ExecutionContext -import scala.concurrent.Future +import scala.concurrent.{ ExecutionContext, Future } import scala.concurrent.duration.FiniteDuration import org.apache.pekko import pekko.Done import pekko.actor.typed.ActorSystem import pekko.annotation.InternalApi -import pekko.dispatch.ExecutionContexts import pekko.persistence.r2dbc.internal.Sql.Interpolation import pekko.persistence.r2dbc.internal.R2dbcExecutor import pekko.persistence.r2dbc.journal.JournalDao.log @@ -70,7 +68,7 @@ import io.r2dbc.spi.ConnectionFactory .bind(0, persistenceId) .bind(1, seqNr) } - .map(_ => Done)(ExecutionContexts.parasitic) + .map(_ => Done)(ExecutionContext.parasitic) } def updateSnapshotProgress(persistenceId: String, seqNr: Long): Future[Done] = { @@ -87,7 +85,7 @@ import io.r2dbc.spi.ConnectionFactory .bind(0, persistenceId) .bind(1, seqNr) } - .map(_ => Done)(ExecutionContexts.parasitic) + .map(_ => Done)(ExecutionContext.parasitic) } def currentProgress(persistenceId: String): Future[Option[CurrentProgress]] = { diff --git a/project/CommonSettings.scala b/project/CommonSettings.scala index 2408741..6195661 100644 --- a/project/CommonSettings.scala +++ b/project/CommonSettings.scala @@ -21,7 +21,7 @@ object CommonSettings extends AutoPlugin { override def requires = JvmPlugin && ApacheSonatypePlugin && DynVerPlugin override lazy val projectSettings = Seq( - crossScalaVersions := Seq(Dependencies.Scala212, Dependencies.Scala213, Dependencies.Scala3), + crossScalaVersions := Seq(Dependencies.Scala213, Dependencies.Scala3), scalaVersion := Dependencies.Scala213, crossVersion := CrossVersion.binary, // Setting javac options in common allows IntelliJ IDEA to import them automatically diff --git a/project/Dependencies.scala b/project/Dependencies.scala index af6c233..42f5863 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -14,7 +14,6 @@ import sbt._ object Dependencies { - val Scala212 = "2.12.20" val Scala213 = "2.13.16" val Scala3 = "3.3.6" val PekkoVersion = PekkoCoreDependency.version diff --git a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSettings.scala b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSettings.scala index 70bc11b..15eb323 100644 --- a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSettings.scala +++ b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSettings.scala @@ -17,12 +17,13 @@ import java.time.{ Duration => JDuration } import java.util.Locale import scala.concurrent.duration._ +import scala.jdk.DurationConverters._ import scala.util.hashing.MurmurHash3 + import com.typesafe.config.Config import org.apache.pekko import pekko.actor.typed.ActorSystem import pekko.persistence.r2dbc.Dialect -import pekko.util.JavaDurationConverters._ object R2dbcProjectionSettings { @@ -32,7 +33,7 @@ object R2dbcProjectionSettings { val logDbCallsExceeding: FiniteDuration = config.getString("log-db-calls-exceeding").toLowerCase(Locale.ROOT) match { case "off" => -1.millis - case _ => config.getDuration("log-db-calls-exceeding").asScala + case _ => config.getDuration("log-db-calls-exceeding").toScala } new R2dbcProjectionSettings( diff --git a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/BySliceSourceProviderAdapter.scala b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/BySliceSourceProviderAdapter.scala index 1c4b190..d0b9383 100644 --- a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/BySliceSourceProviderAdapter.scala +++ b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/BySliceSourceProviderAdapter.scala @@ -18,24 +18,21 @@ import java.util.Optional import java.util.concurrent.CompletionStage import java.util.function.Supplier -import scala.concurrent.Future +import scala.concurrent.{ ExecutionContext, Future } +import scala.jdk.CollectionConverters._ +import scala.jdk.FutureConverters._ +import scala.jdk.OptionConverters._ import org.apache.pekko import pekko.NotUsed import pekko.annotation.InternalApi -import pekko.projection.javadsl -import pekko.projection.scaladsl -import pekko.dispatch.ExecutionContexts -import pekko.stream.scaladsl.Source -import pekko.util.ccompat.JavaConverters._ -import pekko.util.FutureConverters._ -import pekko.util.OptionConverters._ -import scala.concurrent.ExecutionContext - import pekko.persistence.query.typed.EventEnvelope import pekko.persistence.query.typed.scaladsl.EventTimestampQuery import pekko.persistence.query.typed.scaladsl.LoadEventQuery +import pekko.projection.javadsl +import pekko.projection.scaladsl import pekko.projection.BySlicesSourceProvider +import pekko.stream.scaladsl.Source /** * INTERNAL API: Adapter from javadsl.SourceProvider to scaladsl.SourceProvider @@ -50,7 +47,7 @@ import pekko.projection.BySlicesSourceProvider def source(offset: () => Future[Option[Offset]]): Future[Source[Envelope, NotUsed]] = { // the parasitic context is used to convert the Optional to Option and a java streams Source to a scala Source, // it _should_ not be used for the blocking operation of getting offsets themselves - val ec = pekko.dispatch.ExecutionContexts.parasitic + val ec = ExecutionContext.parasitic val offsetAdapter = new Supplier[CompletionStage[Optional[Offset]]] { override def get(): CompletionStage[Optional[Offset]] = offset().map(_.toJava)(ec).asJava } @@ -70,7 +67,7 @@ import pekko.projection.BySlicesSourceProvider override def timestampOf(persistenceId: String, sequenceNr: Long): Future[Option[Instant]] = delegate match { case timestampQuery: pekko.persistence.query.typed.javadsl.EventTimestampQuery => - timestampQuery.timestampOf(persistenceId, sequenceNr).asScala.map(_.toScala)(ExecutionContexts.parasitic) + timestampQuery.timestampOf(persistenceId, sequenceNr).asScala.map(_.toScala)(ExecutionContext.parasitic) case _ => Future.failed( new IllegalArgumentException( diff --git a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcHandlerAdapter.scala b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcHandlerAdapter.scala index de71c82..c64a1ed 100644 --- a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcHandlerAdapter.scala +++ b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcHandlerAdapter.scala @@ -15,6 +15,8 @@ package org.apache.pekko.projection.r2dbc.internal import scala.collection.immutable import scala.concurrent.Future +import scala.jdk.CollectionConverters._ +import scala.jdk.FutureConverters._ import org.apache.pekko import pekko.Done @@ -22,8 +24,6 @@ import pekko.annotation.InternalApi import pekko.projection.r2dbc.javadsl import pekko.projection.r2dbc.javadsl.R2dbcSession import pekko.projection.r2dbc.scaladsl -import pekko.util.ccompat.JavaConverters._ -import pekko.util.FutureConverters._ /** * INTERNAL API: Adapter from javadsl.R2dbcHandler to scaladsl.R2dbcHandler diff --git a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala index 3c612ee..e2515b7 100644 --- a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala +++ b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala @@ -22,14 +22,13 @@ import java.util.concurrent.atomic.AtomicReference import scala.annotation.tailrec import scala.collection.immutable -import scala.concurrent.ExecutionContext -import scala.concurrent.Future +import scala.concurrent.{ ExecutionContext, Future } + import org.apache.pekko import pekko.Done import pekko.actor.typed.ActorSystem import pekko.actor.typed.scaladsl.LoggerOps import pekko.annotation.InternalApi -import pekko.dispatch.ExecutionContexts import pekko.persistence.Persistence import pekko.persistence.query.DurableStateChange import pekko.persistence.query.Offset @@ -291,8 +290,8 @@ private[projection] class R2dbcOffsetStore( case timestampQuery: EventTimestampQuery => timestampQuery.timestampOf(persistenceId, sequenceNr) case timestampQuery: pekko.persistence.query.typed.javadsl.EventTimestampQuery => - import pekko.util.FutureConverters._ - import pekko.util.OptionConverters._ + import scala.jdk.FutureConverters._ + import scala.jdk.OptionConverters._ timestampQuery.timestampOf(persistenceId, sequenceNr).asScala.map(_.toScala) case _ => throw new IllegalArgumentException( @@ -418,7 +417,7 @@ private[projection] class R2dbcOffsetStore( .withConnection("save offset") { conn => saveOffsetInTx(conn, offset) } - .map(_ => Done)(ExecutionContexts.parasitic) + .map(_ => Done)(ExecutionContext.parasitic) } /** @@ -440,7 +439,7 @@ private[projection] class R2dbcOffsetStore( .withConnection("save offsets") { conn => saveOffsetsInTx(conn, offsets) } - .map(_ => Done)(ExecutionContexts.parasitic) + .map(_ => Done)(ExecutionContext.parasitic) } def saveOffsetsInTx[Offset](conn: Connection, offsets: immutable.IndexedSeq[Offset]): Future[Done] = { @@ -604,7 +603,7 @@ private[projection] class R2dbcOffsetStore( case MultipleOffsets(many) => many.map(upsertStmt).toVector } - R2dbcExecutor.updateInTx(statements).map(_ => Done)(ExecutionContexts.parasitic) + R2dbcExecutor.updateInTx(statements).map(_ => Done)(ExecutionContext.parasitic) } def isDuplicate(record: Record): Boolean = @@ -884,14 +883,14 @@ private[projection] class R2dbcOffsetStore( insertTimestampOffsetInTx(conn, records) } } - .map(_ => Done)(ExecutionContexts.parasitic) + .map(_ => Done)(ExecutionContext.parasitic) case _ => r2dbcExecutor .withConnection("set offset") { conn => savePrimitiveOffsetInTx(conn, offset) } - .map(_ => Done)(ExecutionContexts.parasitic) + .map(_ => Done)(ExecutionContext.parasitic) } } @@ -1001,7 +1000,7 @@ private[projection] class R2dbcOffsetStore( .bind(2, paused) .bind(3, Instant.now(clock).toEpochMilli) } - .map(_ => Done)(ExecutionContexts.parasitic) + .map(_ => Done)(ExecutionContext.parasitic) } private def createRecordWithOffset[Envelope](envelope: Envelope): Option[RecordWithOffset] = { diff --git a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala index 5452a92..4a96fd0 100644 --- a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala +++ b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala @@ -102,7 +102,7 @@ private[projection] object R2dbcProjectionImpl { case loadEventQuery: LoadEventQuery => loadEventQuery.loadEnvelope[Any](pid, seqNr) case loadEventQuery: pekko.persistence.query.typed.javadsl.LoadEventQuery => - import pekko.util.FutureConverters._ + import scala.jdk.FutureConverters._ loadEventQuery.loadEnvelope[Any](pid, seqNr).asScala case _ => throw new IllegalArgumentException( @@ -127,7 +127,7 @@ private[projection] object R2dbcProjectionImpl { case store: DurableStateStore[_] => store.getObject(pid) case store: pekko.persistence.state.javadsl.DurableStateStore[_] => - import pekko.util.FutureConverters._ + import scala.jdk.FutureConverters._ store.getObject(pid).asScala.map(_.toScala) }).map { case GetObjectResult(Some(loadedValue), loadedRevision) => diff --git a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/javadsl/R2dbcProjection.scala b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/javadsl/R2dbcProjection.scala index 007c54a..b70441c 100644 --- a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/javadsl/R2dbcProjection.scala +++ b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/javadsl/R2dbcProjection.scala @@ -16,6 +16,8 @@ package org.apache.pekko.projection.r2dbc.javadsl import java.util.Optional import java.util.function.Supplier +import scala.jdk.OptionConverters._ + import org.apache.pekko import pekko.Done import pekko.actor.typed.ActorSystem @@ -38,7 +40,6 @@ import pekko.projection.r2dbc.internal.R2dbcGroupedHandlerAdapter import pekko.projection.r2dbc.internal.R2dbcHandlerAdapter import pekko.projection.r2dbc.scaladsl import pekko.stream.javadsl.FlowWithContext -import pekko.util.OptionConverters._ @ApiMayChange object R2dbcProjection { diff --git a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/javadsl/R2dbcSession.scala b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/javadsl/R2dbcSession.scala index 079cb95..ee20292 100644 --- a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/javadsl/R2dbcSession.scala +++ b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/javadsl/R2dbcSession.scala @@ -17,15 +17,14 @@ import java.util.Optional import java.util.concurrent.CompletionStage import scala.concurrent.ExecutionContext +import scala.jdk.CollectionConverters._ +import scala.jdk.FutureConverters._ +import scala.jdk.OptionConverters._ import org.apache.pekko import pekko.actor.typed.ActorSystem import pekko.annotation.ApiMayChange -import pekko.dispatch.ExecutionContexts import pekko.persistence.r2dbc.internal.R2dbcExecutor -import pekko.util.ccompat.JavaConverters._ -import pekko.util.FutureConverters._ -import pekko.util.OptionConverters._ import io.r2dbc.spi.Connection import io.r2dbc.spi.Row import io.r2dbc.spi.Statement @@ -37,14 +36,14 @@ final class R2dbcSession(connection: Connection)(implicit ec: ExecutionContext, connection.createStatement(sql) def updateOne(statement: Statement): CompletionStage[java.lang.Long] = - R2dbcExecutor.updateOneInTx(statement).map(java.lang.Long.valueOf)(ExecutionContexts.parasitic).asJava + R2dbcExecutor.updateOneInTx(statement).map(java.lang.Long.valueOf)(ExecutionContext.parasitic).asJava def update(statements: java.util.List[Statement]): CompletionStage[java.util.List[java.lang.Long]] = R2dbcExecutor.updateInTx(statements.asScala.toVector).map(results => results.map(java.lang.Long.valueOf).asJava).asJava def selectOne[A](statement: Statement)(mapRow: Row => A): CompletionStage[Optional[A]] = - R2dbcExecutor.selectOneInTx(statement, mapRow).map(_.toJava)(ExecutionContexts.parasitic).asJava + R2dbcExecutor.selectOneInTx(statement, mapRow).map(_.toJava)(ExecutionContext.parasitic).asJava def select[A](statement: Statement)(mapRow: Row => A): CompletionStage[java.util.List[A]] = R2dbcExecutor.selectInTx(statement, mapRow).map(_.asJava).asJava diff --git a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestSourceProviderWithInput.scala b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestSourceProviderWithInput.scala index 8e6cf78..2817b76 100644 --- a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestSourceProviderWithInput.scala +++ b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestSourceProviderWithInput.scala @@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicReference import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.Promise +import scala.jdk.CollectionConverters._ import org.apache.pekko import pekko.NotUsed @@ -35,7 +36,6 @@ import pekko.projection.BySlicesSourceProvider import pekko.projection.scaladsl.SourceProvider import pekko.stream.OverflowStrategy import pekko.stream.scaladsl.Source -import pekko.util.ccompat.JavaConverters._ class TestSourceProviderWithInput()(implicit val system: ActorSystem[_]) extends SourceProvider[TimestampOffset, EventEnvelope[String]] --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pekko.apache.org For additional commands, e-mail: commits-h...@pekko.apache.org