This is an automated email from the ASF dual-hosted git repository.
nvollmar pushed a commit to branch main in repository
https://gitbox.apache.org/repos/asf/incubator-pekko-persistence-cassandra.git
The following commit(s) were added to refs/heads/main by this push:
new e497b9e Apply compat changes from latest Pekko (#46)
e497b9e is described below
commit e497b9e1ce07ef05f37aedad882b6202a70804ad
Author: Nicolas Vollmar <[email protected]>
AuthorDate: Thu May 11 12:41:36 2023 +0200
Apply compat changes from latest Pekko (#46)
---
.../cassandra/journal/CassandraJournal.scala | 4 ++--
.../journal/CassandraJournalStatements.scala | 27 ++++++++++------------
.../query/EventsByPersistenceIdStage.scala | 8 +++----
.../cassandra/query/EventsByTagStage.scala | 6 ++---
.../query/javadsl/CassandraReadJournal.scala | 5 ++--
.../snapshot/CassandraSnapshotStatements.scala | 6 ++---
.../snapshot/CassandraSnapshotStore.scala | 6 ++---
project/Dependencies.scala | 7 +++---
8 files changed, 33 insertions(+), 36 deletions(-)
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala
index 2088811..32258b6 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala
@@ -37,6 +37,7 @@ import pekko.stream.connectors.cassandra.scaladsl.{
CassandraSession, CassandraS
import pekko.stream.scaladsl.Sink
import pekko.dispatch.ExecutionContexts
import pekko.util.{ OptionVal, Timeout }
+import pekko.util.FutureConverters._
import com.datastax.oss.driver.api.core.cql._
import com.typesafe.config.Config
import com.datastax.oss.driver.api.core.uuid.Uuids
@@ -49,7 +50,6 @@ import scala.collection.immutable.Seq
import scala.concurrent._
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }
-import scala.compat.java8.FutureConverters._
import pekko.annotation.DoNotInherit
import pekko.annotation.InternalStableApi
import pekko.stream.scaladsl.Source
@@ -701,7 +701,7 @@ import pekko.stream.scaladsl.Source
var batch =
new
BatchStatementBuilder(BatchType.UNLOGGED).build().setExecutionProfileName(journalSettings.writeProfile)
batch = body(batch)
- session.underlying().flatMap(_.executeAsync(batch).toScala).map(_ => ())
+ session.underlying().flatMap(_.executeAsync(batch).asScala).map(_ => ())
}
private def selectOne[T <: Statement[T]](stmt: Statement[T]):
Future[Option[Row]] = {
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournalStatements.scala
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournalStatements.scala
index 0848da1..99a8876 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournalStatements.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournalStatements.scala
@@ -13,18 +13,16 @@
package org.apache.pekko.persistence.cassandra.journal
-import scala.compat.java8.FutureConverters._
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
+import com.datastax.oss.driver.api.core.CqlSession
import org.apache.pekko
import pekko.Done
import pekko.annotation.InternalApi
import pekko.event.LoggingAdapter
-import pekko.persistence.cassandra.PluginSettings
-import pekko.persistence.cassandra.indent
-import com.datastax.oss.driver.api.core.CqlSession
-import pekko.persistence.cassandra.FutureDone
+import pekko.persistence.cassandra.{ indent, FutureDone, PluginSettings }
+import pekko.util.FutureConverters._
+
+import scala.concurrent.{ ExecutionContext, Future }
/**
* INTERNAL API
@@ -345,15 +343,15 @@ import pekko.persistence.cassandra.FutureDone
def tagStatements: Future[Done] =
if (eventsByTagSettings.eventsByTagEnabled) {
for {
- _ <- session.executeAsync(createTagsTable).toScala
- _ <- session.executeAsync(createTagsProgressTable).toScala
- _ <- session.executeAsync(createTagScanningTable).toScala
+ _ <- session.executeAsync(createTagsTable).asScala
+ _ <- session.executeAsync(createTagsProgressTable).asScala
+ _ <- session.executeAsync(createTagScanningTable).asScala
} yield Done
} else FutureDone
def keyspace: Future[Done] =
if (journalSettings.keyspaceAutoCreate)
- session.executeAsync(createKeyspace).toScala.map(_ => Done)
+ session.executeAsync(createKeyspace).asScala.map(_ => Done)
else FutureDone
val done = if (journalSettings.tablesAutoCreate) {
@@ -361,11 +359,11 @@ import pekko.persistence.cassandra.FutureDone
session.setSchemaMetadataEnabled(false)
val result = for {
_ <- keyspace
- _ <- session.executeAsync(createTable).toScala
- _ <- session.executeAsync(createMetadataTable).toScala
+ _ <- session.executeAsync(createTable).asScala
+ _ <- session.executeAsync(createMetadataTable).asScala
_ <- {
if (settings.journalSettings.supportAllPersistenceIds)
- session.executeAsync(createAllPersistenceIdsTable).toScala
+ session.executeAsync(createAllPersistenceIdsTable).asScala
else
FutureDone
}
@@ -390,5 +388,4 @@ import pekko.persistence.cassandra.FutureDone
done
}
-
}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdStage.scala
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdStage.scala
index b5af93d..866fd86 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdStage.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdStage.scala
@@ -21,11 +21,11 @@ import pekko.annotation.InternalApi
import pekko.persistence.cassandra.PluginSettings
import pekko.stream.stage._
import pekko.stream.{ Attributes, Outlet, SourceShape }
+import pekko.util.FutureConverters._
import java.lang.{ Long => JLong }
import java.util.concurrent.ThreadLocalRandom
import scala.annotation.{ nowarn, tailrec }
-import scala.compat.java8.FutureConverters._
import scala.concurrent.duration.{ FiniteDuration, _ }
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.util.{ Failure, Success, Try }
@@ -79,7 +79,7 @@ import scala.util.{ Failure, Success, Try }
def selectSingleRow(persistenceId: String, pnr: Long)(implicit ec:
ExecutionContext): Future[Option[Row]] = {
val boundStatement = selectSingleRowQuery.bind(persistenceId, pnr:
JLong).setExecutionProfileName(profile)
- session.executeAsync(boundStatement).toScala.map(rs => Option(rs.one()))
+ session.executeAsync(boundStatement).asScala.map(rs => Option(rs.one()))
}
def highestDeletedSequenceNumber(persistenceId: String)(implicit ec:
ExecutionContext): Future[Long] =
@@ -87,7 +87,7 @@ import scala.util.{ Failure, Success, Try }
Option(r.one()).map(_.getLong("deleted_to")).getOrElse(0))
private def executeStatement(statement: Statement[_]):
Future[AsyncResultSet] =
- session.executeAsync(statement).toScala
+ session.executeAsync(statement).asScala
}
@@ -408,7 +408,7 @@ import scala.util.{ Failure, Success, Try }
} else if (rs.remaining() == 0) {
log.debug("EventsByPersistenceId [{}] Fetch more from seqNr
[{}]", persistenceId, expectedNextSeqNr)
queryState = QueryInProgress(switchPartition, fetchMore = true,
System.nanoTime())
- val rsFut = rs.fetchNextPage().toScala
+ val rsFut = rs.fetchNextPage().asScala
rsFut.onComplete(newResultSetCb.invoke)
} else {
val row = rs.one()
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStage.scala
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStage.scala
index 4117394..0f78274 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStage.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStage.scala
@@ -28,6 +28,7 @@ import pekko.persistence.cassandra.query.EventsByTagStage._
import
pekko.persistence.cassandra.query.scaladsl.CassandraReadJournal.EventByTagStatements
import pekko.stream.stage._
import pekko.stream.{ Attributes, Outlet, SourceShape }
+import pekko.util.FutureConverters._
import pekko.util.PrettyDuration._
import pekko.util.UUIDComparator
@@ -35,7 +36,6 @@ import java.lang.{ Long => JLong }
import java.util.UUID
import java.util.concurrent.ThreadLocalRandom
import scala.annotation.tailrec
-import scala.compat.java8.FutureConverters._
import scala.concurrent.duration.{ Duration, _ }
import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor, Future }
import scala.util.{ Failure, Success, Try }
@@ -106,7 +106,7 @@ import scala.util.{ Failure, Success, Try }
Retries.retry({ () =>
val bound =
statements.byTagWithUpperLimit.bind(tag, bucket.key: JLong, from,
to).setExecutionProfileName(readProfile)
- session.executeAsync(bound).toScala
+ session.executeAsync(bound).asScala
}, retries.retries, onFailure, retries.minDuration,
retries.maxDuration, retries.randomFactor)
}
}
@@ -957,7 +957,7 @@ import scala.util.{ Failure, Success, Try }
private def fetchMore(rs: AsyncResultSet): Unit = {
log.debug("[{}] No more results without paging. Requesting more.",
stageUuid)
- val moreResults = rs.fetchNextPage().toScala
+ val moreResults = rs.fetchNextPage().asScala
updateQueryState(QueryInProgress(abortForMissingSearch = false))
moreResults.onComplete(newResultSetCb.invoke)
}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/javadsl/CassandraReadJournal.scala
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/javadsl/CassandraReadJournal.scala
index bb488a9..a70945c 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/javadsl/CassandraReadJournal.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/javadsl/CassandraReadJournal.scala
@@ -24,8 +24,7 @@ import pekko.persistence.query.TimeBasedUUID
import pekko.persistence.query.javadsl._
import pekko.stream.connectors.cassandra.javadsl.CassandraSession
import pekko.stream.javadsl.Source
-
-import scala.compat.java8.FutureConverters
+import pekko.util.FutureConverters._
object CassandraReadJournal {
@@ -78,7 +77,7 @@ class CassandraReadJournal(
* using the read journal.
*/
def initialize(): CompletionStage[Done] =
- FutureConverters.toJava(scaladslReadJournal.initialize())
+ scaladslReadJournal.initialize().asJava
/**
* Use this as the UUID offset in `eventsByTag` queries when you want all
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStatements.scala
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStatements.scala
index e764a4c..4685534 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStatements.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStatements.scala
@@ -13,11 +13,11 @@
package org.apache.pekko.persistence.cassandra.snapshot
-import scala.compat.java8.FutureConverters._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import org.apache.pekko
+import pekko.util.FutureConverters._
import pekko.Done
import pekko.annotation.InternalApi
import pekko.event.LoggingAdapter
@@ -122,7 +122,7 @@ import pekko.persistence.cassandra.FutureDone
implicit ec: ExecutionContext): Future[Done] = {
def keyspace: Future[Done] =
if (snapshotSettings.keyspaceAutoCreate)
- session.executeAsync(createKeyspace).toScala.map(_ => Done)
+ session.executeAsync(createKeyspace).asScala.map(_ => Done)
else FutureDone
if (snapshotSettings.tablesAutoCreate) {
@@ -130,7 +130,7 @@ import pekko.persistence.cassandra.FutureDone
session.setSchemaMetadataEnabled(false)
val result = for {
_ <- keyspace
- _ <- session.executeAsync(createTable).toScala
+ _ <- session.executeAsync(createTable).asScala
} yield {
session.setSchemaMetadataEnabled(null)
Done
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
index d620196..5cf905e 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
@@ -31,13 +31,14 @@ import pekko.serialization.{ AsyncSerializer,
Serialization, SerializationExtens
import pekko.stream.connectors.cassandra.scaladsl.{ CassandraSession,
CassandraSessionRegistry }
import pekko.stream.scaladsl.{ Sink, Source }
import pekko.util.{ unused, OptionVal }
+import pekko.util.FutureConverters._
import java.lang.{ Long => JLong }
import java.nio.ByteBuffer
import scala.collection.immutable
import scala.concurrent.{ ExecutionContext, Future }
-import scala.util.{ Failure, Success }
import scala.util.control.NonFatal
+import scala.util.{ Failure, Success }
/**
* INTERNAL API
@@ -243,11 +244,10 @@ import scala.util.control.NonFatal
}
def executeBatch(body: BatchStatementBuilder => Unit): Future[Unit] = {
- import scala.compat.java8.FutureConverters._
val batch =
new
BatchStatementBuilder(BatchType.UNLOGGED).setExecutionProfileName(snapshotSettings.writeProfile)
body(batch)
- session.underlying().flatMap(_.executeAsync(batch.build()).toScala).map(_
=> ())
+ session.underlying().flatMap(_.executeAsync(batch.build()).asScala).map(_
=> ())
}
private def metadata(
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index e81feaa..3a7f6ec 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -17,16 +17,16 @@ object Dependencies {
val scala3Version = "3.1.2" // not yet enabled - missing
pekko-http/pekko-management Scala 3 artifacts
val scalaVersions = Seq(scala212Version, scala213Version)
- val pekkoVersion = System.getProperty("override.pekko.version",
"0.0.0+26630-2c4d0ee0-SNAPSHOT")
+ val pekkoVersion = System.getProperty("override.pekko.version",
"0.0.0+26656-898c6970-SNAPSHOT")
val pekkoVersionInDocs = "current"
val cassandraVersionInDocs = "4.0"
// Should be sync with the version of the driver in Pekko Connectors
Cassandra
val driverVersionInDocs = "4.6"
- val pekkoConnectorsVersion = "0.0.0+64-20da4165-SNAPSHOT"
+ val pekkoConnectorsVersion = "0.0.0+85-a82f3c3c-SNAPSHOT"
val pekkoConnectorsVersionInDocs = "current"
// for example
- val pekkoManagementVersion = "0.0.0+714-a034fd01-SNAPSHOT"
+ val pekkoManagementVersion = "0.0.0+724-41d3b29c-SNAPSHOT"
val logback = "ch.qos.logback" % "logback-classic" % "1.2.10"
@@ -49,6 +49,7 @@ object Dependencies {
"org.apache.pekko" %% "pekko-connectors-cassandra" %
pekkoConnectorsVersion,
"org.apache.pekko" %% "pekko-persistence" % pekkoVersion,
"org.apache.pekko" %% "pekko-persistence-query" % pekkoVersion,
+ "org.apache.pekko" %% "pekko-stream" % pekkoVersion,
"org.apache.pekko" %% "pekko-cluster-tools" % pekkoVersion,
"org.scala-lang.modules" %% "scala-collection-compat" % "2.7.0",
logback % Test,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]