This is an automated email from the ASF dual-hosted git repository.

mdedetrich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-projection.git


The following commit(s) were added to refs/heads/main by this push:
     new a55714e  Apply compat changes from latest Pekko
a55714e is described below

commit a55714ec6b41efd30d495326166f2abe4e925bd6
Author: Matthew de Detrich <[email protected]>
AuthorDate: Thu May 11 11:06:03 2023 +0200

    Apply compat changes from latest Pekko
---
 .../cassandra/CassandraProjectionTest.java         |  6 +++---
 .../cassandra/CassandraOffsetStoreSpec.scala       |  6 +++---
 .../cassandra/CassandraProjectionSpec.scala        | 10 ++++-----
 .../cassandra/javadsl/CassandraProjection.scala    |  4 ++--
 .../pekko/projection/internal/HandlerAdapter.scala | 24 +++++++++++-----------
 .../internal/SourceProviderAdapter.scala           |  8 ++++----
 .../projection/javadsl/ProjectionManagement.scala  | 16 +++++++--------
 .../state/javadsl/DurableStateSourceProvider.scala | 10 ++++-----
 .../javadsl/EventSourcedProvider.scala             | 10 ++++-----
 .../projection/jdbc/javadsl/JdbcProjection.scala   |  7 +++----
 .../kafka/internal/KafkaSourceProviderImpl.scala   |  6 +++---
 project/Dependencies.scala                         | 16 +++++++--------
 .../testkit/internal/TestOffsetStoreAdapter.scala  | 15 +++++++-------
 .../testkit/internal/TestSourceProviderImpl.scala  |  8 ++++----
 .../testkit/javadsl/TestSourceProvider.scala       |  3 +--
 .../testkit/javadsl/ProjectionTestKitTest.java     |  4 ++--
 16 files changed, 76 insertions(+), 77 deletions(-)

diff --git a/cassandra/src/it/java/org/apache/pekko/projection/cassandra/CassandraProjectionTest.java b/cassandra/src/it/java/org/apache/pekko/projection/cassandra/CassandraProjectionTest.java
index 9075332..192cf22 100644
--- a/cassandra/src/it/java/org/apache/pekko/projection/cassandra/CassandraProjectionTest.java
+++ b/cassandra/src/it/java/org/apache/pekko/projection/cassandra/CassandraProjectionTest.java
@@ -38,9 +38,9 @@ import 
org.apache.pekko.projection.testkit.javadsl.ProjectionTestKit;
 import org.apache.pekko.stream.connectors.cassandra.javadsl.CassandraSession;
 import 
org.apache.pekko.stream.connectors.cassandra.javadsl.CassandraSessionRegistry;
 import org.apache.pekko.stream.javadsl.Source;
+import org.apache.pekko.util.FutureConverters;
 import org.junit.*;
 import org.scalatestplus.junit.JUnitSuite;
-import scala.compat.java8.FutureConverters;
 import scala.concurrent.Await;
 
 import java.time.Duration;
@@ -78,13 +78,13 @@ public class CassandraProjectionTest extends JUnitSuite {
     // we should keep trying to create the table until it succeeds
     CompletionStage<Done> createTableAttempts =
         Patterns.retry(
-            () -> FutureConverters.toJava(offsetStore.createKeyspaceAndTable()),
+            () -> FutureConverters.asJava(offsetStore.createKeyspaceAndTable()),
             20,
             Duration.ofSeconds(3),
             testKit.system().classicSystem().scheduler(),
             testKit.system().executionContext());
     Await.result(
-        FutureConverters.toScala(createTableAttempts),
+        FutureConverters.asScala(createTableAttempts),
         scala.concurrent.duration.Duration.create(60, TimeUnit.SECONDS));
   }
 
diff --git a/cassandra/src/it/scala/org/apache/pekko/projection/cassandra/CassandraOffsetStoreSpec.scala b/cassandra/src/it/scala/org/apache/pekko/projection/cassandra/CassandraOffsetStoreSpec.scala
index b30a3f0..5db8e57 100644
--- a/cassandra/src/it/scala/org/apache/pekko/projection/cassandra/CassandraOffsetStoreSpec.scala
+++ b/cassandra/src/it/scala/org/apache/pekko/projection/cassandra/CassandraOffsetStoreSpec.scala
@@ -16,7 +16,6 @@ package org.apache.pekko.projection.cassandra
 import java.time.Instant
 import java.util.UUID
 
-import scala.compat.java8.FutureConverters._
 import scala.concurrent.Await
 import scala.concurrent.ExecutionContext
 import scala.concurrent.duration._
@@ -33,6 +32,7 @@ import 
pekko.projection.cassandra.internal.CassandraOffsetStore
 import pekko.projection.internal.ManagementState
 import pekko.projection.testkit.internal.TestClock
 import pekko.stream.connectors.cassandra.scaladsl.CassandraSessionRegistry
+import pekko.util.FutureConverters._
 import org.scalatest.wordspec.AnyWordSpecLike
 
 class CassandraOffsetStoreSpec
@@ -61,9 +61,9 @@ class CassandraOffsetStoreSpec
         s <- session.underlying()
 
         // reason for setSchemaMetadataEnabled is that it speed up tests
-        _ <- s.setSchemaMetadataEnabled(false).toScala
+        _ <- s.setSchemaMetadataEnabled(false).asScala
         _ <- offsetStore.createKeyspaceAndTable()
-        _ <- s.setSchemaMetadataEnabled(null).toScala
+        _ <- s.setSchemaMetadataEnabled(null).asScala
       } yield Done
 
     // the container can takes time to be 'ready',
diff --git a/cassandra/src/it/scala/org/apache/pekko/projection/cassandra/CassandraProjectionSpec.scala b/cassandra/src/it/scala/org/apache/pekko/projection/cassandra/CassandraProjectionSpec.scala
index 5faa18d..f478654 100644
--- a/cassandra/src/it/scala/org/apache/pekko/projection/cassandra/CassandraProjectionSpec.scala
+++ b/cassandra/src/it/scala/org/apache/pekko/projection/cassandra/CassandraProjectionSpec.scala
@@ -19,7 +19,6 @@ import java.util.concurrent.atomic.AtomicReference
 
 import scala.annotation.tailrec
 import scala.collection.immutable
-import scala.compat.java8.FutureConverters._
 import scala.concurrent.Await
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
@@ -60,6 +59,7 @@ import pekko.stream.scaladsl.Source
 import pekko.stream.testkit.TestPublisher
 import pekko.stream.testkit.TestSubscriber
 import pekko.stream.testkit.scaladsl.TestSource
+import pekko.util.FutureConverters._
 import org.scalatest.wordspec.AnyWordSpecLike
 
 object CassandraProjectionSpec {
@@ -161,10 +161,10 @@ class CassandraProjectionSpec
       for {
         s <- session.underlying()
         // reason for setSchemaMetadataEnabled is that it speed up tests
-        _ <- s.setSchemaMetadataEnabled(false).toScala
+        _ <- s.setSchemaMetadataEnabled(false).asScala
         _ <- offsetStore.createKeyspaceAndTable()
         _ <- repository.createKeyspaceAndTable()
-        _ <- s.setSchemaMetadataEnabled(null).toScala
+        _ <- s.setSchemaMetadataEnabled(null).asScala
       } yield Done
 
     // the container can takes time to be 'ready',
@@ -177,10 +177,10 @@ class CassandraProjectionSpec
     Await.ready(for {
         s <- session.underlying()
         // reason for setSchemaMetadataEnabled is that it speed up tests
-        _ <- s.setSchemaMetadataEnabled(false).toScala
+        _ <- s.setSchemaMetadataEnabled(false).asScala
         _ <- session.executeDDL(s"DROP keyspace ${offsetStore.keyspace}")
         _ <- session.executeDDL(s"DROP keyspace ${repository.keyspace}")
-        _ <- s.setSchemaMetadataEnabled(null).toScala
+        _ <- s.setSchemaMetadataEnabled(null).asScala
       } yield Done, 30.seconds)
     super.afterAll()
   }
diff --git a/cassandra/src/main/scala/org/apache/pekko/projection/cassandra/javadsl/CassandraProjection.scala b/cassandra/src/main/scala/org/apache/pekko/projection/cassandra/javadsl/CassandraProjection.scala
index bf386cc..2d5f211 100644
--- a/cassandra/src/main/scala/org/apache/pekko/projection/cassandra/javadsl/CassandraProjection.scala
+++ b/cassandra/src/main/scala/org/apache/pekko/projection/cassandra/javadsl/CassandraProjection.scala
@@ -155,9 +155,9 @@ object CassandraProjection {
    * before the system is started.
    */
   def createTablesIfNotExists(system: ActorSystem[_]): CompletionStage[Done] = 
{
-    import scala.compat.java8.FutureConverters._
+    import pekko.util.FutureConverters._
     val offsetStore = new CassandraOffsetStore(system)
-    offsetStore.createKeyspaceAndTable().toJava
+    offsetStore.createKeyspaceAndTable().asJava
   }
 
   @deprecated("Renamed to createTablesIfNotExists", "1.2.0")
diff --git a/core/src/main/scala/org/apache/pekko/projection/internal/HandlerAdapter.scala b/core/src/main/scala/org/apache/pekko/projection/internal/HandlerAdapter.scala
index ae0a3bc..5957f44 100644
--- a/core/src/main/scala/org/apache/pekko/projection/internal/HandlerAdapter.scala
+++ b/core/src/main/scala/org/apache/pekko/projection/internal/HandlerAdapter.scala
@@ -14,7 +14,6 @@
 package org.apache.pekko.projection.internal
 
 import scala.collection.immutable
-import scala.compat.java8.FutureConverters._
 import scala.concurrent.Future
 
 import org.apache.pekko
@@ -23,6 +22,7 @@ import pekko.annotation.InternalApi
 import pekko.projection.javadsl
 import pekko.projection.scaladsl
 import pekko.util.ccompat.JavaConverters._
+import pekko.util.FutureConverters._
 
 /**
  * INTERNAL API
@@ -43,14 +43,14 @@ import pekko.util.ccompat.JavaConverters._
     extends scaladsl.Handler[Envelope] {
 
   override def process(envelope: Envelope): Future[Done] = {
-    delegate.process(envelope).toScala
+    delegate.process(envelope).asScala
   }
 
   override def start(): Future[Done] =
-    delegate.start().toScala
+    delegate.start().asScala
 
   override def stop(): Future[Done] =
-    delegate.stop().toScala
+    delegate.stop().asScala
 
 }
 
@@ -62,14 +62,14 @@ import pekko.util.ccompat.JavaConverters._
     extends scaladsl.Handler[immutable.Seq[Envelope]] {
 
   override def process(envelopes: immutable.Seq[Envelope]): Future[Done] = {
-    delegate.process(envelopes.asJava).toScala
+    delegate.process(envelopes.asJava).asScala
   }
 
   override def start(): Future[Done] =
-    delegate.start().toScala
+    delegate.start().asScala
 
   override def stop(): Future[Done] =
-    delegate.stop().toScala
+    delegate.stop().asScala
 
 }
 
@@ -86,14 +86,14 @@ private[projection] class HandlerLifecycleAdapter(delegate: 
javadsl.HandlerLifec
    * is restarted after a failure.
    */
   override def start(): Future[Done] =
-    delegate.start().toScala
+    delegate.start().asScala
 
   /**
    * Invoked when the projection has been stopped. Can be overridden to 
implement resource
    * cleanup. It is also called when the `Projection` is restarted after a 
failure.
    */
   override def stop(): Future[Done] =
-    delegate.stop().toScala
+    delegate.stop().asScala
 }
 
 /**
@@ -106,12 +106,12 @@ private[projection] class 
HandlerLifecycleAdapter(delegate: javadsl.HandlerLifec
   override private[projection] def behavior = delegate.behavior
 
   override final def process(envelope: Envelope): Future[Done] =
-    delegate.process(getActor(), envelope).toScala
+    delegate.process(getActor(), envelope).asScala
 
   override def start(): Future[Done] =
-    delegate.start().toScala
+    delegate.start().asScala
 
   override def stop(): Future[Done] =
-    delegate.stop().toScala
+    delegate.stop().asScala
 
 }
diff --git a/core/src/main/scala/org/apache/pekko/projection/internal/SourceProviderAdapter.scala b/core/src/main/scala/org/apache/pekko/projection/internal/SourceProviderAdapter.scala
index 3d830d2..8f3c033 100644
--- a/core/src/main/scala/org/apache/pekko/projection/internal/SourceProviderAdapter.scala
+++ b/core/src/main/scala/org/apache/pekko/projection/internal/SourceProviderAdapter.scala
@@ -17,8 +17,6 @@ import java.util.Optional
 import java.util.concurrent.CompletionStage
 import java.util.function.Supplier
 
-import scala.compat.java8.FutureConverters._
-import scala.compat.java8.OptionConverters._
 import scala.concurrent.Future
 
 import org.apache.pekko
@@ -27,6 +25,8 @@ import pekko.annotation.InternalApi
 import pekko.projection.javadsl
 import pekko.projection.scaladsl
 import pekko.stream.scaladsl.Source
+import pekko.util.FutureConverters._
+import pekko.util.OptionConverters._
 
 /**
  * INTERNAL API: Adapter from javadsl.SourceProvider to scaladsl.SourceProvider
@@ -40,9 +40,9 @@ import pekko.stream.scaladsl.Source
     // it _should_ not be used for the blocking operation of getting offsets 
themselves
     val ec = pekko.dispatch.ExecutionContexts.parasitic
     val offsetAdapter = new Supplier[CompletionStage[Optional[Offset]]] {
-      override def get(): CompletionStage[Optional[Offset]] = offset().map(_.asJava)(ec).toJava
+      override def get(): CompletionStage[Optional[Offset]] = offset().map(_.toJava)(ec).asJava
     }
-    delegate.source(offsetAdapter).toScala.map(_.asScala)(ec)
+    delegate.source(offsetAdapter).asScala.map(_.asScala)(ec)
   }
 
   def extractOffset(envelope: Envelope): Offset = 
delegate.extractOffset(envelope)
diff --git a/core/src/main/scala/org/apache/pekko/projection/javadsl/ProjectionManagement.scala b/core/src/main/scala/org/apache/pekko/projection/javadsl/ProjectionManagement.scala
index 92d5919..717b1fb 100644
--- a/core/src/main/scala/org/apache/pekko/projection/javadsl/ProjectionManagement.scala
+++ b/core/src/main/scala/org/apache/pekko/projection/javadsl/ProjectionManagement.scala
@@ -17,8 +17,6 @@ import java.util.Optional
 import java.util.concurrent.CompletionStage
 
 import scala.concurrent.ExecutionContext
-import scala.compat.java8.FutureConverters._
-import scala.compat.java8.OptionConverters._
 
 import org.apache.pekko
 import pekko.Done
@@ -26,6 +24,8 @@ import pekko.actor.typed.ActorSystem
 import pekko.annotation.ApiMayChange
 import pekko.projection.ProjectionId
 import pekko.projection.scaladsl
+import pekko.util.FutureConverters._
+import pekko.util.OptionConverters._
 
 @ApiMayChange object ProjectionManagement {
   def get(system: ActorSystem[_]): ProjectionManagement = new 
ProjectionManagement(system)
@@ -39,7 +39,7 @@ import pekko.projection.scaladsl
    * Get the latest stored offset for the `projectionId`.
    */
   def getOffset[Offset](projectionId: ProjectionId): 
CompletionStage[Optional[Offset]] =
-    delegate.getOffset[Offset](projectionId).map(_.asJava).toJava
+    delegate.getOffset[Offset](projectionId).map(_.toJava).asJava
 
   /**
    * Update the stored offset for the `projectionId` and restart the 
`Projection`.
@@ -48,7 +48,7 @@ import pekko.projection.scaladsl
    * the next offset that is greater than the stored offset.
    */
   def updateOffset[Offset](projectionId: ProjectionId, offset: Offset): 
CompletionStage[Done] =
-    delegate.updateOffset[Offset](projectionId, offset).toJava
+    delegate.updateOffset[Offset](projectionId, offset).asJava
 
   /**
    * Clear the stored offset for the `projectionId` and restart the 
`Projection`.
@@ -56,13 +56,13 @@ import pekko.projection.scaladsl
    * offset.
    */
   def clearOffset(projectionId: ProjectionId): CompletionStage[Done] =
-    delegate.clearOffset(projectionId).toJava
+    delegate.clearOffset(projectionId).asJava
 
   /**
    * Is the given Projection paused or not?
    */
   def isPaused(projectionId: ProjectionId): CompletionStage[java.lang.Boolean] 
=
-    delegate.isPaused(projectionId).map(java.lang.Boolean.valueOf).toJava
+    delegate.isPaused(projectionId).map(java.lang.Boolean.valueOf).asJava
 
   /**
    * Pause the given Projection. Processing will be stopped.
@@ -74,7 +74,7 @@ import pekko.projection.scaladsl
    * in case of rebalance or system restart.
    */
   def pause(projectionId: ProjectionId): CompletionStage[Done] =
-    delegate.pause(projectionId).toJava
+    delegate.pause(projectionId).asJava
 
   /**
    * Resume a paused Projection. Processing will be start from previously 
stored offset.
@@ -83,6 +83,6 @@ import pekko.projection.scaladsl
    * in case of rebalance or system restart.
    */
   def resume(projectionId: ProjectionId): CompletionStage[Done] =
-    delegate.resume(projectionId).toJava
+    delegate.resume(projectionId).asJava
 
 }
diff --git a/durable-state/src/main/scala/org/apache/pekko/projection/state/javadsl/DurableStateSourceProvider.scala b/durable-state/src/main/scala/org/apache/pekko/projection/state/javadsl/DurableStateSourceProvider.scala
index fcf8a22..114e65f 100644
--- a/durable-state/src/main/scala/org/apache/pekko/projection/state/javadsl/DurableStateSourceProvider.scala
+++ b/durable-state/src/main/scala/org/apache/pekko/projection/state/javadsl/DurableStateSourceProvider.scala
@@ -17,7 +17,6 @@ import java.util.Optional
 import java.util.concurrent.CompletionStage
 import java.util.function.Supplier
 
-import scala.compat.java8.FutureConverters._
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 
@@ -40,6 +39,7 @@ import pekko.projection.BySlicesSourceProvider
 import pekko.projection.javadsl
 import pekko.projection.javadsl.SourceProvider
 import pekko.stream.javadsl.Source
+import pekko.util.FutureConverters._
 
 /**
  * API may change
@@ -67,11 +67,11 @@ object DurableStateSourceProvider {
 
     override def source(offsetAsync: 
Supplier[CompletionStage[Optional[Offset]]])
         : CompletionStage[Source[DurableStateChange[A], NotUsed]] = {
-      val source: Future[Source[DurableStateChange[A], NotUsed]] = offsetAsync.get().toScala.map { offsetOpt =>
+      val source: Future[Source[DurableStateChange[A], NotUsed]] = offsetAsync.get().asScala.map { offsetOpt =>
         durableStateStoreQuery
           .changes(tag, offsetOpt.orElse(NoOffset))
       }
-      source.toJava
+      source.asJava
     }
 
     override def extractOffset(stateChange: DurableStateChange[A]): Offset = 
stateChange.offset
@@ -126,11 +126,11 @@ object DurableStateSourceProvider {
 
     override def source(offsetAsync: 
Supplier[CompletionStage[Optional[Offset]]])
         : CompletionStage[Source[DurableStateChange[A], NotUsed]] = {
-      val source: Future[Source[DurableStateChange[A], NotUsed]] = offsetAsync.get().toScala.map { offsetOpt =>
+      val source: Future[Source[DurableStateChange[A], NotUsed]] = offsetAsync.get().asScala.map { offsetOpt =>
         durableStateStoreQuery
           .changesBySlices(entityType, minSlice, maxSlice, 
offsetOpt.orElse(NoOffset))
       }
-      source.toJava
+      source.asJava
     }
 
     override def extractOffset(stateChange: DurableStateChange[A]): Offset = 
stateChange.offset
diff --git a/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala b/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala
index 9bf7664..0fa5c53 100644
--- a/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala
+++ b/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala
@@ -19,7 +19,6 @@ import java.util.concurrent.CompletableFuture
 import java.util.concurrent.CompletionStage
 import java.util.function.Supplier
 
-import scala.compat.java8.FutureConverters._
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 
@@ -41,6 +40,7 @@ import pekko.projection.eventsourced.EventEnvelope
 import pekko.projection.javadsl
 import pekko.projection.javadsl.SourceProvider
 import pekko.stream.javadsl.Source
+import pekko.util.FutureConverters._
 
 @ApiMayChange
 object EventSourcedProvider {
@@ -69,12 +69,12 @@ object EventSourcedProvider {
 
     override def source(offsetAsync: 
Supplier[CompletionStage[Optional[Offset]]])
         : CompletionStage[Source[EventEnvelope[Event], NotUsed]] = {
-      val source: Future[Source[EventEnvelope[Event], NotUsed]] = offsetAsync.get().toScala.map { offsetOpt =>
+      val source: Future[Source[EventEnvelope[Event], NotUsed]] = offsetAsync.get().asScala.map { offsetOpt =>
         eventsByTagQuery
           .eventsByTag(tag, offsetOpt.orElse(NoOffset))
           .map(env => EventEnvelope(env))
       }
-      source.toJava
+      source.asJava
     }
 
     override def extractOffset(envelope: EventEnvelope[Event]): Offset = 
envelope.offset
@@ -137,11 +137,11 @@ object EventSourcedProvider {
     override def source(offsetAsync: 
Supplier[CompletionStage[Optional[Offset]]])
         : 
CompletionStage[Source[pekko.persistence.query.typed.EventEnvelope[Event], 
NotUsed]] = {
       val source: 
Future[Source[pekko.persistence.query.typed.EventEnvelope[Event], NotUsed]] =
-        offsetAsync.get().toScala.map { offsetOpt =>
+        offsetAsync.get().asScala.map { offsetOpt =>
           eventsBySlicesQuery
             .eventsBySlices(entityType, minSlice, maxSlice, 
offsetOpt.orElse(NoOffset))
         }
-      source.toJava
+      source.asJava
     }
 
     override def extractOffset(envelope: 
pekko.persistence.query.typed.EventEnvelope[Event]): Offset = envelope.offset
diff --git a/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/javadsl/JdbcProjection.scala b/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/javadsl/JdbcProjection.scala
index e42524d..f67a1fc 100644
--- a/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/javadsl/JdbcProjection.scala
+++ b/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/javadsl/JdbcProjection.scala
@@ -16,8 +16,6 @@ package org.apache.pekko.projection.jdbc.javadsl
 import java.util.concurrent.CompletionStage
 import java.util.function.Supplier
 
-import scala.compat.java8.FutureConverters._
-
 import org.apache.pekko
 import pekko.Done
 import pekko.actor.typed.ActorSystem
@@ -44,6 +42,7 @@ import 
pekko.projection.jdbc.internal.GroupedJdbcHandlerAdapter
 import pekko.projection.jdbc.internal.JdbcHandlerAdapter
 import pekko.projection.jdbc.internal.JdbcProjectionImpl
 import pekko.stream.javadsl.FlowWithContext
+import pekko.util.FutureConverters._
 
 @ApiMayChange
 object JdbcProjection {
@@ -293,7 +292,7 @@ object JdbcProjection {
   def createTablesIfNotExists[S <: JdbcSession](
       sessionFactory: Supplier[S],
       system: ActorSystem[_]): CompletionStage[Done] =
-    JdbcProjectionImpl.createOffsetStore(() => sessionFactory.get())(system).createIfNotExists().toJava
+    JdbcProjectionImpl.createOffsetStore(() => sessionFactory.get())(system).createIfNotExists().asJava
 
   @deprecated("Renamed to createTablesIfNotExists", "1.2.0")
   def createOffsetTableIfNotExists[S <: JdbcSession](
@@ -305,7 +304,7 @@ object JdbcProjection {
    * For testing purposes the projection offset and management tables can be 
dropped programmatically.
    */
   def dropTablesIfExists[S <: JdbcSession](sessionFactory: Supplier[S], 
system: ActorSystem[_]): CompletionStage[Done] =
-    JdbcProjectionImpl.createOffsetStore(() => sessionFactory.get())(system).dropIfExists().toJava
+    JdbcProjectionImpl.createOffsetStore(() => sessionFactory.get())(system).dropIfExists().asJava
 
   @deprecated("Renamed to dropTablesIfExists", "1.2.0")
   def dropOffsetTableIfExists[S <: JdbcSession](
diff --git a/kafka/src/main/scala/org/apache/pekko/projection/kafka/internal/KafkaSourceProviderImpl.scala b/kafka/src/main/scala/org/apache/pekko/projection/kafka/internal/KafkaSourceProviderImpl.scala
index e22edb9..b40cdac 100644
--- a/kafka/src/main/scala/org/apache/pekko/projection/kafka/internal/KafkaSourceProviderImpl.scala
+++ b/kafka/src/main/scala/org/apache/pekko/projection/kafka/internal/KafkaSourceProviderImpl.scala
@@ -18,8 +18,6 @@ import java.util.Optional
 import java.util.concurrent.CompletionStage
 import java.util.function.Supplier
 
-import scala.compat.java8.FutureConverters._
-import scala.compat.java8.OptionConverters._
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 
@@ -42,6 +40,8 @@ import pekko.projection.kafka.KafkaOffsets.keyToPartition
 import pekko.projection.scaladsl
 import pekko.stream.scaladsl.Keep
 import pekko.stream.scaladsl.Source
+import pekko.util.FutureConverters._
+import pekko.util.OptionConverters._
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.TimestampType
@@ -115,7 +115,7 @@ import org.apache.kafka.common.record.TimestampType
 
   override def source(readOffsets: 
Supplier[CompletionStage[Optional[MergeableOffset[JLong]]]])
       : CompletionStage[pekko.stream.javadsl.Source[ConsumerRecord[K, V], 
NotUsed]] = {
-    source(() => readOffsets.get().toScala.map(_.asScala)).map(_.asJava).toJava
+    source(() => readOffsets.get().asScala.map(_.toScala)).map(_.asJava).asJava
   }
 
   override def extractOffset(record: ConsumerRecord[K, V]): 
MergeableOffset[JLong] =
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 9a6375c..b81996b 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -18,15 +18,15 @@ object Dependencies {
   val Scala212 = "2.12.17"
   val ScalaVersions = Seq(Scala213, Scala212)
 
-  val PekkoVersionInDocs = "0.0.0+26623-85c2a469-SNAPSHOT"
-  val ConnectorsVersionInDocs = "0.0.0+60-c7dd80e0-SNAPSHOT"
-  val ConnectorsKafkaVersionInDocs = "0.0.0+1717-267012de-SNAPSHOT"
+  val PekkoVersionInDocs = "0.0.0+26656-898c6970-SNAPSHOT"
+  val ConnectorsVersionInDocs = "0.0.0+85-a82f3c3c-SNAPSHOT"
+  val ConnectorsKafkaVersionInDocs = "0.0.0+1728-e2c660ef-SNAPSHOT"
 
   object Versions {
-    val pekko = sys.props.getOrElse("build.pekko.version", "0.0.0+26630-2c4d0ee0-SNAPSHOT")
-    val pekkoPersistenceJdbc = "0.0.0+958-db5733e6-SNAPSHOT"
-    val connectors = "0.0.0+64-20da4165-SNAPSHOT"
-    val connectorsKafka = sys.props.getOrElse("build.connectors.kafka.version", "0.0.0+1719-2dce1c20-SNAPSHOT")
+    val pekko = sys.props.getOrElse("build.pekko.version", "0.0.0+26656-898c6970-SNAPSHOT")
+    val pekkoPersistenceJdbc = "0.0.0+966-e6f717eb-SNAPSHOT"
+    val connectors = "0.0.0+85-a82f3c3c-SNAPSHOT"
+    val connectorsKafka = sys.props.getOrElse("build.connectors.kafka.version", "0.0.0+1728-e2c660ef-SNAPSHOT")
     val slick = "3.3.3"
     val scalaTest = "3.1.1"
     val testContainers = "1.15.3"
@@ -100,7 +100,7 @@ object Dependencies {
 
     val pekkoPersistenceTyped = "org.apache.pekko" %% 
"pekko-persistence-typed" % Versions.pekko
     val pekkoClusterShardingTyped = "org.apache.pekko" %% 
"pekko-cluster-sharding-typed" % Versions.pekko
-    val pekkoPersistenceCassandra = "org.apache.pekko" %% "pekko-persistence-cassandra" % "0.0.0-1072-7c02b521-SNAPSHOT"
+    val pekkoPersistenceCassandra = "org.apache.pekko" %% "pekko-persistence-cassandra" % "0.0.0-1079-c30f82ae-SNAPSHOT"
     val pekkoPersistenceJdbc = "org.apache.pekko" %% "pekko-persistence-jdbc" 
% Versions.pekkoPersistenceJdbc
     val pekkoSerializationJackson = "org.apache.pekko" %% 
"pekko-serialization-jackson" % Versions.pekko
   }
diff --git a/testkit/src/main/scala/org/apache/pekko/projection/testkit/internal/TestOffsetStoreAdapter.scala b/testkit/src/main/scala/org/apache/pekko/projection/testkit/internal/TestOffsetStoreAdapter.scala
index 643db4c..1ff6483 100644
--- a/testkit/src/main/scala/org/apache/pekko/projection/testkit/internal/TestOffsetStoreAdapter.scala
+++ b/testkit/src/main/scala/org/apache/pekko/projection/testkit/internal/TestOffsetStoreAdapter.scala
@@ -14,8 +14,6 @@
 package org.apache.pekko.projection.testkit.internal
 
 import scala.jdk.CollectionConverters._
-import scala.compat.java8.OptionConverters._
-import scala.compat.java8.FutureConverters._
 import scala.concurrent.Future
 
 import org.apache.pekko
@@ -24,27 +22,30 @@ import pekko.annotation.InternalApi
 import pekko.projection.ProjectionId
 import pekko.projection.internal.ManagementState
 import pekko.projection.testkit.scaladsl.TestOffsetStore
+import pekko.util.OptionConverters._
+import pekko.util.FutureConverters._
+
 @InternalApi private[projection] class TestOffsetStoreAdapter[Offset](
     delegate: pekko.projection.testkit.javadsl.TestOffsetStore[Offset])
     extends TestOffsetStore[Offset] {
 
-  override def lastOffset(): Option[Offset] = delegate.lastOffset().asScala
+  override def lastOffset(): Option[Offset] = delegate.lastOffset().toScala
 
   override def allOffsets(): List[(ProjectionId, Offset)] = 
delegate.allOffsets().asScala.map(_.toScala).toList
 
   override def readOffsets(): Future[Option[Offset]] = {
     implicit val ec = pekko.dispatch.ExecutionContexts.parasitic
-    delegate.readOffsets().toScala.map(_.asScala)
+    delegate.readOffsets().asScala.map(_.toScala)
   }
 
   override def saveOffset(projectionId: ProjectionId, offset: Offset): 
Future[Done] =
-    delegate.saveOffset(projectionId, offset).toScala
+    delegate.saveOffset(projectionId, offset).asScala
 
   override def readManagementState(projectionId: ProjectionId): 
Future[Option[ManagementState]] = {
     implicit val ec = pekko.dispatch.ExecutionContexts.parasitic
-    delegate.readManagementState(projectionId).toScala.map(_.asScala)
+    delegate.readManagementState(projectionId).asScala.map(_.toScala)
   }
 
   override def savePaused(projectionId: ProjectionId, paused: Boolean): 
Future[Done] =
-    delegate.savePaused(projectionId, paused).toScala
+    delegate.savePaused(projectionId, paused).asScala
 }
diff --git a/testkit/src/main/scala/org/apache/pekko/projection/testkit/internal/TestSourceProviderImpl.scala b/testkit/src/main/scala/org/apache/pekko/projection/testkit/internal/TestSourceProviderImpl.scala
index ff3093e..fbe1c86 100644
--- a/testkit/src/main/scala/org/apache/pekko/projection/testkit/internal/TestSourceProviderImpl.scala
+++ b/testkit/src/main/scala/org/apache/pekko/projection/testkit/internal/TestSourceProviderImpl.scala
@@ -17,9 +17,6 @@ import java.util.Optional
 import java.util.concurrent.CompletionStage
 import java.util.function.Supplier
 
-import scala.compat.java8.FunctionConverters._
-import scala.compat.java8.FutureConverters._
-import scala.compat.java8.OptionConverters._
 import scala.concurrent.Future
 
 import org.apache.pekko
@@ -28,6 +25,9 @@ import pekko.annotation.InternalApi
 import pekko.projection.OffsetVerification
 import pekko.projection.testkit.scaladsl.TestSourceProvider
 import pekko.stream.scaladsl.Source
+import pekko.util.FunctionConverters._
+import pekko.util.FutureConverters._
+import pekko.util.OptionConverters._
 
 /**
  * INTERNAL API
@@ -100,7 +100,7 @@ private[projection] class TestSourceProviderImpl[Offset, 
Envelope] private[proje
   override def source(offset: Supplier[CompletionStage[Optional[Offset]]])
       : CompletionStage[pekko.stream.javadsl.Source[Envelope, NotUsed]] = {
     implicit val ec = pekko.dispatch.ExecutionContexts.parasitic
-    source(() => offset.get().toScala.map(_.asScala)).map(_.asJava).toJava
+    source(() => offset.get().asScala.map(_.toScala)).map(_.asJava).asJava
   }
 
   override def extractOffset(envelope: Envelope): Offset = 
extractOffsetFn(envelope)
diff --git a/testkit/src/main/scala/org/apache/pekko/projection/testkit/javadsl/TestSourceProvider.scala b/testkit/src/main/scala/org/apache/pekko/projection/testkit/javadsl/TestSourceProvider.scala
index b89a1db..b215d96 100644
--- a/testkit/src/main/scala/org/apache/pekko/projection/testkit/javadsl/TestSourceProvider.scala
+++ b/testkit/src/main/scala/org/apache/pekko/projection/testkit/javadsl/TestSourceProvider.scala
@@ -13,14 +13,13 @@
 
 package org.apache.pekko.projection.testkit.javadsl
 
-import scala.compat.java8.FunctionConverters._
-
 import org.apache.pekko
 import pekko.NotUsed
 import pekko.annotation.ApiMayChange
 import pekko.projection.OffsetVerification
 import pekko.projection.javadsl.VerifiableSourceProvider
 import pekko.projection.testkit.internal.TestSourceProviderImpl
+import pekko.util.FunctionConverters._
 
 @ApiMayChange
 object TestSourceProvider {
diff --git a/testkit/src/test/java/org/apache/pekko/projection/testkit/javadsl/ProjectionTestKitTest.java b/testkit/src/test/java/org/apache/pekko/projection/testkit/javadsl/ProjectionTestKitTest.java
index 0173a0d..05f816d 100644
--- a/testkit/src/test/java/org/apache/pekko/projection/testkit/javadsl/ProjectionTestKitTest.java
+++ b/testkit/src/test/java/org/apache/pekko/projection/testkit/javadsl/ProjectionTestKitTest.java
@@ -31,13 +31,13 @@ import org.apache.pekko.stream.SharedKillSwitch;
 import org.apache.pekko.stream.javadsl.DelayStrategy;
 import org.apache.pekko.stream.javadsl.Sink;
 import org.apache.pekko.stream.javadsl.Source;
+import org.apache.pekko.util.FutureConverters;
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.ComparisonFailure;
 import org.junit.Test;
 import org.scalatestplus.junit.JUnitSuite;
 import scala.Option;
-import scala.compat.java8.FutureConverters;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -285,7 +285,7 @@ public class ProjectionTestKitTest extends JUnitSuite {
           ActorSystem<?> system) {
         this.killSwitch = killSwitch;
         CompletionStage<Done> done = source.asJava().runWith(Sink.ignore(), 
system);
-        this.futureDone = FutureConverters.toScala(done);
+        this.futureDone = FutureConverters.asScala(done);
       }
 
       @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to