This is an automated email from the ASF dual-hosted git repository.

nvollmar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-persistence-cassandra.git


The following commit(s) were added to refs/heads/main by this push:
     new e497b9e  Apply compat changes from latest Pekko (#46)
e497b9e is described below

commit e497b9e1ce07ef05f37aedad882b6202a70804ad
Author: Nicolas Vollmar <[email protected]>
AuthorDate: Thu May 11 12:41:36 2023 +0200

    Apply compat changes from latest Pekko (#46)
---
 .../cassandra/journal/CassandraJournal.scala       |  4 ++--
 .../journal/CassandraJournalStatements.scala       | 27 ++++++++++------------
 .../query/EventsByPersistenceIdStage.scala         |  8 +++----
 .../cassandra/query/EventsByTagStage.scala         |  6 ++---
 .../query/javadsl/CassandraReadJournal.scala       |  5 ++--
 .../snapshot/CassandraSnapshotStatements.scala     |  6 ++---
 .../snapshot/CassandraSnapshotStore.scala          |  6 ++---
 project/Dependencies.scala                         |  7 +++---
 8 files changed, 33 insertions(+), 36 deletions(-)

diff --git a/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala b/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala
index 2088811..32258b6 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala
@@ -37,6 +37,7 @@ import pekko.stream.connectors.cassandra.scaladsl.{ 
CassandraSession, CassandraS
 import pekko.stream.scaladsl.Sink
 import pekko.dispatch.ExecutionContexts
 import pekko.util.{ OptionVal, Timeout }
+import pekko.util.FutureConverters._
 import com.datastax.oss.driver.api.core.cql._
 import com.typesafe.config.Config
 import com.datastax.oss.driver.api.core.uuid.Uuids
@@ -49,7 +50,6 @@ import scala.collection.immutable.Seq
 import scala.concurrent._
 import scala.util.control.NonFatal
 import scala.util.{ Failure, Success, Try }
-import scala.compat.java8.FutureConverters._
 import pekko.annotation.DoNotInherit
 import pekko.annotation.InternalStableApi
 import pekko.stream.scaladsl.Source
@@ -701,7 +701,7 @@ import pekko.stream.scaladsl.Source
     var batch =
       new 
BatchStatementBuilder(BatchType.UNLOGGED).build().setExecutionProfileName(journalSettings.writeProfile)
     batch = body(batch)
-    session.underlying().flatMap(_.executeAsync(batch).toScala).map(_ => ())
+    session.underlying().flatMap(_.executeAsync(batch).asScala).map(_ => ())
   }
 
   private def selectOne[T <: Statement[T]](stmt: Statement[T]): 
Future[Option[Row]] = {
diff --git a/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournalStatements.scala b/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournalStatements.scala
index 0848da1..99a8876 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournalStatements.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournalStatements.scala
@@ -13,18 +13,16 @@
 
 package org.apache.pekko.persistence.cassandra.journal
 
-import scala.compat.java8.FutureConverters._
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
+import com.datastax.oss.driver.api.core.CqlSession
 
 import org.apache.pekko
 import pekko.Done
 import pekko.annotation.InternalApi
 import pekko.event.LoggingAdapter
-import pekko.persistence.cassandra.PluginSettings
-import pekko.persistence.cassandra.indent
-import com.datastax.oss.driver.api.core.CqlSession
-import pekko.persistence.cassandra.FutureDone
+import pekko.persistence.cassandra.{ indent, FutureDone, PluginSettings }
+import pekko.util.FutureConverters._
+
+import scala.concurrent.{ ExecutionContext, Future }
 
 /**
  * INTERNAL API
@@ -345,15 +343,15 @@ import pekko.persistence.cassandra.FutureDone
     def tagStatements: Future[Done] =
       if (eventsByTagSettings.eventsByTagEnabled) {
         for {
-          _ <- session.executeAsync(createTagsTable).toScala
-          _ <- session.executeAsync(createTagsProgressTable).toScala
-          _ <- session.executeAsync(createTagScanningTable).toScala
+          _ <- session.executeAsync(createTagsTable).asScala
+          _ <- session.executeAsync(createTagsProgressTable).asScala
+          _ <- session.executeAsync(createTagScanningTable).asScala
         } yield Done
       } else FutureDone
 
     def keyspace: Future[Done] =
       if (journalSettings.keyspaceAutoCreate)
-        session.executeAsync(createKeyspace).toScala.map(_ => Done)
+        session.executeAsync(createKeyspace).asScala.map(_ => Done)
       else FutureDone
 
     val done = if (journalSettings.tablesAutoCreate) {
@@ -361,11 +359,11 @@ import pekko.persistence.cassandra.FutureDone
       session.setSchemaMetadataEnabled(false)
       val result = for {
         _ <- keyspace
-        _ <- session.executeAsync(createTable).toScala
-        _ <- session.executeAsync(createMetadataTable).toScala
+        _ <- session.executeAsync(createTable).asScala
+        _ <- session.executeAsync(createMetadataTable).asScala
         _ <- {
           if (settings.journalSettings.supportAllPersistenceIds)
-            session.executeAsync(createAllPersistenceIdsTable).toScala
+            session.executeAsync(createAllPersistenceIdsTable).asScala
           else
             FutureDone
         }
@@ -390,5 +388,4 @@ import pekko.persistence.cassandra.FutureDone
 
     done
   }
-
 }
diff --git a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdStage.scala b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdStage.scala
index b5af93d..866fd86 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdStage.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdStage.scala
@@ -21,11 +21,11 @@ import pekko.annotation.InternalApi
 import pekko.persistence.cassandra.PluginSettings
 import pekko.stream.stage._
 import pekko.stream.{ Attributes, Outlet, SourceShape }
+import pekko.util.FutureConverters._
 
 import java.lang.{ Long => JLong }
 import java.util.concurrent.ThreadLocalRandom
 import scala.annotation.{ nowarn, tailrec }
-import scala.compat.java8.FutureConverters._
 import scala.concurrent.duration.{ FiniteDuration, _ }
 import scala.concurrent.{ ExecutionContext, Future, Promise }
 import scala.util.{ Failure, Success, Try }
@@ -79,7 +79,7 @@ import scala.util.{ Failure, Success, Try }
 
     def selectSingleRow(persistenceId: String, pnr: Long)(implicit ec: 
ExecutionContext): Future[Option[Row]] = {
       val boundStatement = selectSingleRowQuery.bind(persistenceId, pnr: 
JLong).setExecutionProfileName(profile)
-      session.executeAsync(boundStatement).toScala.map(rs => Option(rs.one()))
+      session.executeAsync(boundStatement).asScala.map(rs => Option(rs.one()))
     }
 
     def highestDeletedSequenceNumber(persistenceId: String)(implicit ec: 
ExecutionContext): Future[Long] =
@@ -87,7 +87,7 @@ import scala.util.{ Failure, Success, Try }
         Option(r.one()).map(_.getLong("deleted_to")).getOrElse(0))
 
     private def executeStatement(statement: Statement[_]): 
Future[AsyncResultSet] =
-      session.executeAsync(statement).toScala
+      session.executeAsync(statement).asScala
 
   }
 
@@ -408,7 +408,7 @@ import scala.util.{ Failure, Success, Try }
             } else if (rs.remaining() == 0) {
               log.debug("EventsByPersistenceId [{}] Fetch more from seqNr 
[{}]", persistenceId, expectedNextSeqNr)
               queryState = QueryInProgress(switchPartition, fetchMore = true, 
System.nanoTime())
-              val rsFut = rs.fetchNextPage().toScala
+              val rsFut = rs.fetchNextPage().asScala
               rsFut.onComplete(newResultSetCb.invoke)
             } else {
               val row = rs.one()
diff --git a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStage.scala b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStage.scala
index 4117394..0f78274 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStage.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStage.scala
@@ -28,6 +28,7 @@ import pekko.persistence.cassandra.query.EventsByTagStage._
 import 
pekko.persistence.cassandra.query.scaladsl.CassandraReadJournal.EventByTagStatements
 import pekko.stream.stage._
 import pekko.stream.{ Attributes, Outlet, SourceShape }
+import pekko.util.FutureConverters._
 import pekko.util.PrettyDuration._
 import pekko.util.UUIDComparator
 
@@ -35,7 +36,6 @@ import java.lang.{ Long => JLong }
 import java.util.UUID
 import java.util.concurrent.ThreadLocalRandom
 import scala.annotation.tailrec
-import scala.compat.java8.FutureConverters._
 import scala.concurrent.duration.{ Duration, _ }
 import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor, Future }
 import scala.util.{ Failure, Success, Try }
@@ -106,7 +106,7 @@ import scala.util.{ Failure, Success, Try }
       Retries.retry({ () =>
           val bound =
             statements.byTagWithUpperLimit.bind(tag, bucket.key: JLong, from, 
to).setExecutionProfileName(readProfile)
-          session.executeAsync(bound).toScala
+          session.executeAsync(bound).asScala
         }, retries.retries, onFailure, retries.minDuration, 
retries.maxDuration, retries.randomFactor)
     }
   }
@@ -957,7 +957,7 @@ import scala.util.{ Failure, Success, Try }
 
       private def fetchMore(rs: AsyncResultSet): Unit = {
         log.debug("[{}] No more results without paging. Requesting more.", 
stageUuid)
-        val moreResults = rs.fetchNextPage().toScala
+        val moreResults = rs.fetchNextPage().asScala
         updateQueryState(QueryInProgress(abortForMissingSearch = false))
         moreResults.onComplete(newResultSetCb.invoke)
       }
diff --git a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/javadsl/CassandraReadJournal.scala b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/javadsl/CassandraReadJournal.scala
index bb488a9..a70945c 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/javadsl/CassandraReadJournal.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/javadsl/CassandraReadJournal.scala
@@ -24,8 +24,7 @@ import pekko.persistence.query.TimeBasedUUID
 import pekko.persistence.query.javadsl._
 import pekko.stream.connectors.cassandra.javadsl.CassandraSession
 import pekko.stream.javadsl.Source
-
-import scala.compat.java8.FutureConverters
+import pekko.util.FutureConverters._
 
 object CassandraReadJournal {
 
@@ -78,7 +77,7 @@ class CassandraReadJournal(
    * using the read journal.
    */
   def initialize(): CompletionStage[Done] =
-    FutureConverters.toJava(scaladslReadJournal.initialize())
+    scaladslReadJournal.initialize().asJava
 
   /**
    * Use this as the UUID offset in `eventsByTag` queries when you want all
diff --git a/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStatements.scala b/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStatements.scala
index e764a4c..4685534 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStatements.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStatements.scala
@@ -13,11 +13,11 @@
 
 package org.apache.pekko.persistence.cassandra.snapshot
 
-import scala.compat.java8.FutureConverters._
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 
 import org.apache.pekko
+import pekko.util.FutureConverters._
 import pekko.Done
 import pekko.annotation.InternalApi
 import pekko.event.LoggingAdapter
@@ -122,7 +122,7 @@ import pekko.persistence.cassandra.FutureDone
       implicit ec: ExecutionContext): Future[Done] = {
     def keyspace: Future[Done] =
       if (snapshotSettings.keyspaceAutoCreate)
-        session.executeAsync(createKeyspace).toScala.map(_ => Done)
+        session.executeAsync(createKeyspace).asScala.map(_ => Done)
       else FutureDone
 
     if (snapshotSettings.tablesAutoCreate) {
@@ -130,7 +130,7 @@ import pekko.persistence.cassandra.FutureDone
       session.setSchemaMetadataEnabled(false)
       val result = for {
         _ <- keyspace
-        _ <- session.executeAsync(createTable).toScala
+        _ <- session.executeAsync(createTable).asScala
       } yield {
         session.setSchemaMetadataEnabled(null)
         Done
diff --git a/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStore.scala b/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
index d620196..5cf905e 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
@@ -31,13 +31,14 @@ import pekko.serialization.{ AsyncSerializer, 
Serialization, SerializationExtens
 import pekko.stream.connectors.cassandra.scaladsl.{ CassandraSession, 
CassandraSessionRegistry }
 import pekko.stream.scaladsl.{ Sink, Source }
 import pekko.util.{ unused, OptionVal }
+import pekko.util.FutureConverters._
 
 import java.lang.{ Long => JLong }
 import java.nio.ByteBuffer
 import scala.collection.immutable
 import scala.concurrent.{ ExecutionContext, Future }
-import scala.util.{ Failure, Success }
 import scala.util.control.NonFatal
+import scala.util.{ Failure, Success }
 
 /**
  * INTERNAL API
@@ -243,11 +244,10 @@ import scala.util.control.NonFatal
   }
 
   def executeBatch(body: BatchStatementBuilder => Unit): Future[Unit] = {
-    import scala.compat.java8.FutureConverters._
     val batch =
       new 
BatchStatementBuilder(BatchType.UNLOGGED).setExecutionProfileName(snapshotSettings.writeProfile)
     body(batch)
-    session.underlying().flatMap(_.executeAsync(batch.build()).toScala).map(_ 
=> ())
+    session.underlying().flatMap(_.executeAsync(batch.build()).asScala).map(_ 
=> ())
   }
 
   private def metadata(
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index e81feaa..3a7f6ec 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -17,16 +17,16 @@ object Dependencies {
   val scala3Version = "3.1.2" // not yet enabled - missing 
pekko-http/pekko-management Scala 3 artifacts
   val scalaVersions = Seq(scala212Version, scala213Version)
 
-  val pekkoVersion = System.getProperty("override.pekko.version", 
"0.0.0+26630-2c4d0ee0-SNAPSHOT")
+  val pekkoVersion = System.getProperty("override.pekko.version", 
"0.0.0+26656-898c6970-SNAPSHOT")
   val pekkoVersionInDocs = "current"
   val cassandraVersionInDocs = "4.0"
   // Should be sync with the version of the driver in Pekko Connectors 
Cassandra
   val driverVersionInDocs = "4.6"
 
-  val pekkoConnectorsVersion = "0.0.0+64-20da4165-SNAPSHOT"
+  val pekkoConnectorsVersion = "0.0.0+85-a82f3c3c-SNAPSHOT"
   val pekkoConnectorsVersionInDocs = "current"
   // for example
-  val pekkoManagementVersion = "0.0.0+714-a034fd01-SNAPSHOT"
+  val pekkoManagementVersion = "0.0.0+724-41d3b29c-SNAPSHOT"
 
   val logback = "ch.qos.logback" % "logback-classic" % "1.2.10"
 
@@ -49,6 +49,7 @@ object Dependencies {
     "org.apache.pekko" %% "pekko-connectors-cassandra" % 
pekkoConnectorsVersion,
     "org.apache.pekko" %% "pekko-persistence" % pekkoVersion,
     "org.apache.pekko" %% "pekko-persistence-query" % pekkoVersion,
+    "org.apache.pekko" %% "pekko-stream" % pekkoVersion,
     "org.apache.pekko" %% "pekko-cluster-tools" % pekkoVersion,
     "org.scala-lang.modules" %% "scala-collection-compat" % "2.7.0",
     logback % Test,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to