This is an automated email from the ASF dual-hosted git repository.
mdedetrich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-projection.git
The following commit(s) were added to refs/heads/main by this push:
new a55714e Apply compat changes from latest Pekko
a55714e is described below
commit a55714ec6b41efd30d495326166f2abe4e925bd6
Author: Matthew de Detrich <[email protected]>
AuthorDate: Thu May 11 11:06:03 2023 +0200
Apply compat changes from latest Pekko
---
.../cassandra/CassandraProjectionTest.java | 6 +++---
.../cassandra/CassandraOffsetStoreSpec.scala | 6 +++---
.../cassandra/CassandraProjectionSpec.scala | 10 ++++-----
.../cassandra/javadsl/CassandraProjection.scala | 4 ++--
.../pekko/projection/internal/HandlerAdapter.scala | 24 +++++++++++-----------
.../internal/SourceProviderAdapter.scala | 8 ++++----
.../projection/javadsl/ProjectionManagement.scala | 16 +++++++--------
.../state/javadsl/DurableStateSourceProvider.scala | 10 ++++-----
.../javadsl/EventSourcedProvider.scala | 10 ++++-----
.../projection/jdbc/javadsl/JdbcProjection.scala | 7 +++----
.../kafka/internal/KafkaSourceProviderImpl.scala | 6 +++---
project/Dependencies.scala | 16 +++++++--------
.../testkit/internal/TestOffsetStoreAdapter.scala | 15 +++++++-------
.../testkit/internal/TestSourceProviderImpl.scala | 8 ++++----
.../testkit/javadsl/TestSourceProvider.scala | 3 +--
.../testkit/javadsl/ProjectionTestKitTest.java | 4 ++--
16 files changed, 76 insertions(+), 77 deletions(-)
diff --git
a/cassandra/src/it/java/org/apache/pekko/projection/cassandra/CassandraProjectionTest.java
b/cassandra/src/it/java/org/apache/pekko/projection/cassandra/CassandraProjectionTest.java
index 9075332..192cf22 100644
---
a/cassandra/src/it/java/org/apache/pekko/projection/cassandra/CassandraProjectionTest.java
+++
b/cassandra/src/it/java/org/apache/pekko/projection/cassandra/CassandraProjectionTest.java
@@ -38,9 +38,9 @@ import
org.apache.pekko.projection.testkit.javadsl.ProjectionTestKit;
import org.apache.pekko.stream.connectors.cassandra.javadsl.CassandraSession;
import
org.apache.pekko.stream.connectors.cassandra.javadsl.CassandraSessionRegistry;
import org.apache.pekko.stream.javadsl.Source;
+import org.apache.pekko.util.FutureConverters;
import org.junit.*;
import org.scalatestplus.junit.JUnitSuite;
-import scala.compat.java8.FutureConverters;
import scala.concurrent.Await;
import java.time.Duration;
@@ -78,13 +78,13 @@ public class CassandraProjectionTest extends JUnitSuite {
// we should keep trying to create the table until it succeeds
CompletionStage<Done> createTableAttempts =
Patterns.retry(
- () ->
FutureConverters.toJava(offsetStore.createKeyspaceAndTable()),
+ () ->
FutureConverters.asJava(offsetStore.createKeyspaceAndTable()),
20,
Duration.ofSeconds(3),
testKit.system().classicSystem().scheduler(),
testKit.system().executionContext());
Await.result(
- FutureConverters.toScala(createTableAttempts),
+ FutureConverters.asScala(createTableAttempts),
scala.concurrent.duration.Duration.create(60, TimeUnit.SECONDS));
}
diff --git
a/cassandra/src/it/scala/org/apache/pekko/projection/cassandra/CassandraOffsetStoreSpec.scala
b/cassandra/src/it/scala/org/apache/pekko/projection/cassandra/CassandraOffsetStoreSpec.scala
index b30a3f0..5db8e57 100644
---
a/cassandra/src/it/scala/org/apache/pekko/projection/cassandra/CassandraOffsetStoreSpec.scala
+++
b/cassandra/src/it/scala/org/apache/pekko/projection/cassandra/CassandraOffsetStoreSpec.scala
@@ -16,7 +16,6 @@ package org.apache.pekko.projection.cassandra
import java.time.Instant
import java.util.UUID
-import scala.compat.java8.FutureConverters._
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
@@ -33,6 +32,7 @@ import
pekko.projection.cassandra.internal.CassandraOffsetStore
import pekko.projection.internal.ManagementState
import pekko.projection.testkit.internal.TestClock
import pekko.stream.connectors.cassandra.scaladsl.CassandraSessionRegistry
+import pekko.util.FutureConverters._
import org.scalatest.wordspec.AnyWordSpecLike
class CassandraOffsetStoreSpec
@@ -61,9 +61,9 @@ class CassandraOffsetStoreSpec
s <- session.underlying()
// reason for setSchemaMetadataEnabled is that it speed up tests
- _ <- s.setSchemaMetadataEnabled(false).toScala
+ _ <- s.setSchemaMetadataEnabled(false).asScala
_ <- offsetStore.createKeyspaceAndTable()
- _ <- s.setSchemaMetadataEnabled(null).toScala
+ _ <- s.setSchemaMetadataEnabled(null).asScala
} yield Done
// the container can takes time to be 'ready',
diff --git
a/cassandra/src/it/scala/org/apache/pekko/projection/cassandra/CassandraProjectionSpec.scala
b/cassandra/src/it/scala/org/apache/pekko/projection/cassandra/CassandraProjectionSpec.scala
index 5faa18d..f478654 100644
---
a/cassandra/src/it/scala/org/apache/pekko/projection/cassandra/CassandraProjectionSpec.scala
+++
b/cassandra/src/it/scala/org/apache/pekko/projection/cassandra/CassandraProjectionSpec.scala
@@ -19,7 +19,6 @@ import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.collection.immutable
-import scala.compat.java8.FutureConverters._
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
@@ -60,6 +59,7 @@ import pekko.stream.scaladsl.Source
import pekko.stream.testkit.TestPublisher
import pekko.stream.testkit.TestSubscriber
import pekko.stream.testkit.scaladsl.TestSource
+import pekko.util.FutureConverters._
import org.scalatest.wordspec.AnyWordSpecLike
object CassandraProjectionSpec {
@@ -161,10 +161,10 @@ class CassandraProjectionSpec
for {
s <- session.underlying()
// reason for setSchemaMetadataEnabled is that it speed up tests
- _ <- s.setSchemaMetadataEnabled(false).toScala
+ _ <- s.setSchemaMetadataEnabled(false).asScala
_ <- offsetStore.createKeyspaceAndTable()
_ <- repository.createKeyspaceAndTable()
- _ <- s.setSchemaMetadataEnabled(null).toScala
+ _ <- s.setSchemaMetadataEnabled(null).asScala
} yield Done
// the container can takes time to be 'ready',
@@ -177,10 +177,10 @@ class CassandraProjectionSpec
Await.ready(for {
s <- session.underlying()
// reason for setSchemaMetadataEnabled is that it speed up tests
- _ <- s.setSchemaMetadataEnabled(false).toScala
+ _ <- s.setSchemaMetadataEnabled(false).asScala
_ <- session.executeDDL(s"DROP keyspace ${offsetStore.keyspace}")
_ <- session.executeDDL(s"DROP keyspace ${repository.keyspace}")
- _ <- s.setSchemaMetadataEnabled(null).toScala
+ _ <- s.setSchemaMetadataEnabled(null).asScala
} yield Done, 30.seconds)
super.afterAll()
}
diff --git
a/cassandra/src/main/scala/org/apache/pekko/projection/cassandra/javadsl/CassandraProjection.scala
b/cassandra/src/main/scala/org/apache/pekko/projection/cassandra/javadsl/CassandraProjection.scala
index bf386cc..2d5f211 100644
---
a/cassandra/src/main/scala/org/apache/pekko/projection/cassandra/javadsl/CassandraProjection.scala
+++
b/cassandra/src/main/scala/org/apache/pekko/projection/cassandra/javadsl/CassandraProjection.scala
@@ -155,9 +155,9 @@ object CassandraProjection {
* before the system is started.
*/
def createTablesIfNotExists(system: ActorSystem[_]): CompletionStage[Done] =
{
- import scala.compat.java8.FutureConverters._
+ import pekko.util.FutureConverters._
val offsetStore = new CassandraOffsetStore(system)
- offsetStore.createKeyspaceAndTable().toJava
+ offsetStore.createKeyspaceAndTable().asJava
}
@deprecated("Renamed to createTablesIfNotExists", "1.2.0")
diff --git
a/core/src/main/scala/org/apache/pekko/projection/internal/HandlerAdapter.scala
b/core/src/main/scala/org/apache/pekko/projection/internal/HandlerAdapter.scala
index ae0a3bc..5957f44 100644
---
a/core/src/main/scala/org/apache/pekko/projection/internal/HandlerAdapter.scala
+++
b/core/src/main/scala/org/apache/pekko/projection/internal/HandlerAdapter.scala
@@ -14,7 +14,6 @@
package org.apache.pekko.projection.internal
import scala.collection.immutable
-import scala.compat.java8.FutureConverters._
import scala.concurrent.Future
import org.apache.pekko
@@ -23,6 +22,7 @@ import pekko.annotation.InternalApi
import pekko.projection.javadsl
import pekko.projection.scaladsl
import pekko.util.ccompat.JavaConverters._
+import pekko.util.FutureConverters._
/**
* INTERNAL API
@@ -43,14 +43,14 @@ import pekko.util.ccompat.JavaConverters._
extends scaladsl.Handler[Envelope] {
override def process(envelope: Envelope): Future[Done] = {
- delegate.process(envelope).toScala
+ delegate.process(envelope).asScala
}
override def start(): Future[Done] =
- delegate.start().toScala
+ delegate.start().asScala
override def stop(): Future[Done] =
- delegate.stop().toScala
+ delegate.stop().asScala
}
@@ -62,14 +62,14 @@ import pekko.util.ccompat.JavaConverters._
extends scaladsl.Handler[immutable.Seq[Envelope]] {
override def process(envelopes: immutable.Seq[Envelope]): Future[Done] = {
- delegate.process(envelopes.asJava).toScala
+ delegate.process(envelopes.asJava).asScala
}
override def start(): Future[Done] =
- delegate.start().toScala
+ delegate.start().asScala
override def stop(): Future[Done] =
- delegate.stop().toScala
+ delegate.stop().asScala
}
@@ -86,14 +86,14 @@ private[projection] class HandlerLifecycleAdapter(delegate:
javadsl.HandlerLifec
* is restarted after a failure.
*/
override def start(): Future[Done] =
- delegate.start().toScala
+ delegate.start().asScala
/**
* Invoked when the projection has been stopped. Can be overridden to
implement resource
* cleanup. It is also called when the `Projection` is restarted after a
failure.
*/
override def stop(): Future[Done] =
- delegate.stop().toScala
+ delegate.stop().asScala
}
/**
@@ -106,12 +106,12 @@ private[projection] class
HandlerLifecycleAdapter(delegate: javadsl.HandlerLifec
override private[projection] def behavior = delegate.behavior
override final def process(envelope: Envelope): Future[Done] =
- delegate.process(getActor(), envelope).toScala
+ delegate.process(getActor(), envelope).asScala
override def start(): Future[Done] =
- delegate.start().toScala
+ delegate.start().asScala
override def stop(): Future[Done] =
- delegate.stop().toScala
+ delegate.stop().asScala
}
diff --git
a/core/src/main/scala/org/apache/pekko/projection/internal/SourceProviderAdapter.scala
b/core/src/main/scala/org/apache/pekko/projection/internal/SourceProviderAdapter.scala
index 3d830d2..8f3c033 100644
---
a/core/src/main/scala/org/apache/pekko/projection/internal/SourceProviderAdapter.scala
+++
b/core/src/main/scala/org/apache/pekko/projection/internal/SourceProviderAdapter.scala
@@ -17,8 +17,6 @@ import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.function.Supplier
-import scala.compat.java8.FutureConverters._
-import scala.compat.java8.OptionConverters._
import scala.concurrent.Future
import org.apache.pekko
@@ -27,6 +25,8 @@ import pekko.annotation.InternalApi
import pekko.projection.javadsl
import pekko.projection.scaladsl
import pekko.stream.scaladsl.Source
+import pekko.util.FutureConverters._
+import pekko.util.OptionConverters._
/**
* INTERNAL API: Adapter from javadsl.SourceProvider to scaladsl.SourceProvider
@@ -40,9 +40,9 @@ import pekko.stream.scaladsl.Source
// it _should_ not be used for the blocking operation of getting offsets
themselves
val ec = pekko.dispatch.ExecutionContexts.parasitic
val offsetAdapter = new Supplier[CompletionStage[Optional[Offset]]] {
- override def get(): CompletionStage[Optional[Offset]] =
offset().map(_.asJava)(ec).toJava
+ override def get(): CompletionStage[Optional[Offset]] =
offset().map(_.toJava)(ec).asJava
}
- delegate.source(offsetAdapter).toScala.map(_.asScala)(ec)
+ delegate.source(offsetAdapter).asScala.map(_.asScala)(ec)
}
def extractOffset(envelope: Envelope): Offset =
delegate.extractOffset(envelope)
diff --git
a/core/src/main/scala/org/apache/pekko/projection/javadsl/ProjectionManagement.scala
b/core/src/main/scala/org/apache/pekko/projection/javadsl/ProjectionManagement.scala
index 92d5919..717b1fb 100644
---
a/core/src/main/scala/org/apache/pekko/projection/javadsl/ProjectionManagement.scala
+++
b/core/src/main/scala/org/apache/pekko/projection/javadsl/ProjectionManagement.scala
@@ -17,8 +17,6 @@ import java.util.Optional
import java.util.concurrent.CompletionStage
import scala.concurrent.ExecutionContext
-import scala.compat.java8.FutureConverters._
-import scala.compat.java8.OptionConverters._
import org.apache.pekko
import pekko.Done
@@ -26,6 +24,8 @@ import pekko.actor.typed.ActorSystem
import pekko.annotation.ApiMayChange
import pekko.projection.ProjectionId
import pekko.projection.scaladsl
+import pekko.util.FutureConverters._
+import pekko.util.OptionConverters._
@ApiMayChange object ProjectionManagement {
def get(system: ActorSystem[_]): ProjectionManagement = new
ProjectionManagement(system)
@@ -39,7 +39,7 @@ import pekko.projection.scaladsl
* Get the latest stored offset for the `projectionId`.
*/
def getOffset[Offset](projectionId: ProjectionId):
CompletionStage[Optional[Offset]] =
- delegate.getOffset[Offset](projectionId).map(_.asJava).toJava
+ delegate.getOffset[Offset](projectionId).map(_.toJava).asJava
/**
* Update the stored offset for the `projectionId` and restart the
`Projection`.
@@ -48,7 +48,7 @@ import pekko.projection.scaladsl
* the next offset that is greater than the stored offset.
*/
def updateOffset[Offset](projectionId: ProjectionId, offset: Offset):
CompletionStage[Done] =
- delegate.updateOffset[Offset](projectionId, offset).toJava
+ delegate.updateOffset[Offset](projectionId, offset).asJava
/**
* Clear the stored offset for the `projectionId` and restart the
`Projection`.
@@ -56,13 +56,13 @@ import pekko.projection.scaladsl
* offset.
*/
def clearOffset(projectionId: ProjectionId): CompletionStage[Done] =
- delegate.clearOffset(projectionId).toJava
+ delegate.clearOffset(projectionId).asJava
/**
* Is the given Projection paused or not?
*/
def isPaused(projectionId: ProjectionId): CompletionStage[java.lang.Boolean]
=
- delegate.isPaused(projectionId).map(java.lang.Boolean.valueOf).toJava
+ delegate.isPaused(projectionId).map(java.lang.Boolean.valueOf).asJava
/**
* Pause the given Projection. Processing will be stopped.
@@ -74,7 +74,7 @@ import pekko.projection.scaladsl
* in case of rebalance or system restart.
*/
def pause(projectionId: ProjectionId): CompletionStage[Done] =
- delegate.pause(projectionId).toJava
+ delegate.pause(projectionId).asJava
/**
* Resume a paused Projection. Processing will be start from previously
stored offset.
@@ -83,6 +83,6 @@ import pekko.projection.scaladsl
* in case of rebalance or system restart.
*/
def resume(projectionId: ProjectionId): CompletionStage[Done] =
- delegate.resume(projectionId).toJava
+ delegate.resume(projectionId).asJava
}
diff --git
a/durable-state/src/main/scala/org/apache/pekko/projection/state/javadsl/DurableStateSourceProvider.scala
b/durable-state/src/main/scala/org/apache/pekko/projection/state/javadsl/DurableStateSourceProvider.scala
index fcf8a22..114e65f 100644
---
a/durable-state/src/main/scala/org/apache/pekko/projection/state/javadsl/DurableStateSourceProvider.scala
+++
b/durable-state/src/main/scala/org/apache/pekko/projection/state/javadsl/DurableStateSourceProvider.scala
@@ -17,7 +17,6 @@ import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.function.Supplier
-import scala.compat.java8.FutureConverters._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
@@ -40,6 +39,7 @@ import pekko.projection.BySlicesSourceProvider
import pekko.projection.javadsl
import pekko.projection.javadsl.SourceProvider
import pekko.stream.javadsl.Source
+import pekko.util.FutureConverters._
/**
* API may change
@@ -67,11 +67,11 @@ object DurableStateSourceProvider {
override def source(offsetAsync:
Supplier[CompletionStage[Optional[Offset]]])
: CompletionStage[Source[DurableStateChange[A], NotUsed]] = {
- val source: Future[Source[DurableStateChange[A], NotUsed]] =
offsetAsync.get().toScala.map { offsetOpt =>
+ val source: Future[Source[DurableStateChange[A], NotUsed]] =
offsetAsync.get().asScala.map { offsetOpt =>
durableStateStoreQuery
.changes(tag, offsetOpt.orElse(NoOffset))
}
- source.toJava
+ source.asJava
}
override def extractOffset(stateChange: DurableStateChange[A]): Offset =
stateChange.offset
@@ -126,11 +126,11 @@ object DurableStateSourceProvider {
override def source(offsetAsync:
Supplier[CompletionStage[Optional[Offset]]])
: CompletionStage[Source[DurableStateChange[A], NotUsed]] = {
- val source: Future[Source[DurableStateChange[A], NotUsed]] =
offsetAsync.get().toScala.map { offsetOpt =>
+ val source: Future[Source[DurableStateChange[A], NotUsed]] =
offsetAsync.get().asScala.map { offsetOpt =>
durableStateStoreQuery
.changesBySlices(entityType, minSlice, maxSlice,
offsetOpt.orElse(NoOffset))
}
- source.toJava
+ source.asJava
}
override def extractOffset(stateChange: DurableStateChange[A]): Offset =
stateChange.offset
diff --git
a/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala
b/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala
index 9bf7664..0fa5c53 100644
---
a/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala
+++
b/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala
@@ -19,7 +19,6 @@ import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.function.Supplier
-import scala.compat.java8.FutureConverters._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
@@ -41,6 +40,7 @@ import pekko.projection.eventsourced.EventEnvelope
import pekko.projection.javadsl
import pekko.projection.javadsl.SourceProvider
import pekko.stream.javadsl.Source
+import pekko.util.FutureConverters._
@ApiMayChange
object EventSourcedProvider {
@@ -69,12 +69,12 @@ object EventSourcedProvider {
override def source(offsetAsync:
Supplier[CompletionStage[Optional[Offset]]])
: CompletionStage[Source[EventEnvelope[Event], NotUsed]] = {
- val source: Future[Source[EventEnvelope[Event], NotUsed]] =
offsetAsync.get().toScala.map { offsetOpt =>
+ val source: Future[Source[EventEnvelope[Event], NotUsed]] =
offsetAsync.get().asScala.map { offsetOpt =>
eventsByTagQuery
.eventsByTag(tag, offsetOpt.orElse(NoOffset))
.map(env => EventEnvelope(env))
}
- source.toJava
+ source.asJava
}
override def extractOffset(envelope: EventEnvelope[Event]): Offset =
envelope.offset
@@ -137,11 +137,11 @@ object EventSourcedProvider {
override def source(offsetAsync:
Supplier[CompletionStage[Optional[Offset]]])
:
CompletionStage[Source[pekko.persistence.query.typed.EventEnvelope[Event],
NotUsed]] = {
val source:
Future[Source[pekko.persistence.query.typed.EventEnvelope[Event], NotUsed]] =
- offsetAsync.get().toScala.map { offsetOpt =>
+ offsetAsync.get().asScala.map { offsetOpt =>
eventsBySlicesQuery
.eventsBySlices(entityType, minSlice, maxSlice,
offsetOpt.orElse(NoOffset))
}
- source.toJava
+ source.asJava
}
override def extractOffset(envelope:
pekko.persistence.query.typed.EventEnvelope[Event]): Offset = envelope.offset
diff --git
a/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/javadsl/JdbcProjection.scala
b/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/javadsl/JdbcProjection.scala
index e42524d..f67a1fc 100644
---
a/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/javadsl/JdbcProjection.scala
+++
b/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/javadsl/JdbcProjection.scala
@@ -16,8 +16,6 @@ package org.apache.pekko.projection.jdbc.javadsl
import java.util.concurrent.CompletionStage
import java.util.function.Supplier
-import scala.compat.java8.FutureConverters._
-
import org.apache.pekko
import pekko.Done
import pekko.actor.typed.ActorSystem
@@ -44,6 +42,7 @@ import
pekko.projection.jdbc.internal.GroupedJdbcHandlerAdapter
import pekko.projection.jdbc.internal.JdbcHandlerAdapter
import pekko.projection.jdbc.internal.JdbcProjectionImpl
import pekko.stream.javadsl.FlowWithContext
+import pekko.util.FutureConverters._
@ApiMayChange
object JdbcProjection {
@@ -293,7 +292,7 @@ object JdbcProjection {
def createTablesIfNotExists[S <: JdbcSession](
sessionFactory: Supplier[S],
system: ActorSystem[_]): CompletionStage[Done] =
- JdbcProjectionImpl.createOffsetStore(() =>
sessionFactory.get())(system).createIfNotExists().toJava
+ JdbcProjectionImpl.createOffsetStore(() =>
sessionFactory.get())(system).createIfNotExists().asJava
@deprecated("Renamed to createTablesIfNotExists", "1.2.0")
def createOffsetTableIfNotExists[S <: JdbcSession](
@@ -305,7 +304,7 @@ object JdbcProjection {
* For testing purposes the projection offset and management tables can be
dropped programmatically.
*/
def dropTablesIfExists[S <: JdbcSession](sessionFactory: Supplier[S],
system: ActorSystem[_]): CompletionStage[Done] =
- JdbcProjectionImpl.createOffsetStore(() =>
sessionFactory.get())(system).dropIfExists().toJava
+ JdbcProjectionImpl.createOffsetStore(() =>
sessionFactory.get())(system).dropIfExists().asJava
@deprecated("Renamed to dropTablesIfExists", "1.2.0")
def dropOffsetTableIfExists[S <: JdbcSession](
diff --git
a/kafka/src/main/scala/org/apache/pekko/projection/kafka/internal/KafkaSourceProviderImpl.scala
b/kafka/src/main/scala/org/apache/pekko/projection/kafka/internal/KafkaSourceProviderImpl.scala
index e22edb9..b40cdac 100644
---
a/kafka/src/main/scala/org/apache/pekko/projection/kafka/internal/KafkaSourceProviderImpl.scala
+++
b/kafka/src/main/scala/org/apache/pekko/projection/kafka/internal/KafkaSourceProviderImpl.scala
@@ -18,8 +18,6 @@ import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.function.Supplier
-import scala.compat.java8.FutureConverters._
-import scala.compat.java8.OptionConverters._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
@@ -42,6 +40,8 @@ import pekko.projection.kafka.KafkaOffsets.keyToPartition
import pekko.projection.scaladsl
import pekko.stream.scaladsl.Keep
import pekko.stream.scaladsl.Source
+import pekko.util.FutureConverters._
+import pekko.util.OptionConverters._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.TimestampType
@@ -115,7 +115,7 @@ import org.apache.kafka.common.record.TimestampType
override def source(readOffsets:
Supplier[CompletionStage[Optional[MergeableOffset[JLong]]]])
: CompletionStage[pekko.stream.javadsl.Source[ConsumerRecord[K, V],
NotUsed]] = {
- source(() => readOffsets.get().toScala.map(_.asScala)).map(_.asJava).toJava
+ source(() => readOffsets.get().asScala.map(_.toScala)).map(_.asJava).asJava
}
override def extractOffset(record: ConsumerRecord[K, V]):
MergeableOffset[JLong] =
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 9a6375c..b81996b 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -18,15 +18,15 @@ object Dependencies {
val Scala212 = "2.12.17"
val ScalaVersions = Seq(Scala213, Scala212)
- val PekkoVersionInDocs = "0.0.0+26623-85c2a469-SNAPSHOT"
- val ConnectorsVersionInDocs = "0.0.0+60-c7dd80e0-SNAPSHOT"
- val ConnectorsKafkaVersionInDocs = "0.0.0+1717-267012de-SNAPSHOT"
+ val PekkoVersionInDocs = "0.0.0+26656-898c6970-SNAPSHOT"
+ val ConnectorsVersionInDocs = "0.0.0+85-a82f3c3c-SNAPSHOT"
+ val ConnectorsKafkaVersionInDocs = "0.0.0+1728-e2c660ef-SNAPSHOT"
object Versions {
- val pekko = sys.props.getOrElse("build.pekko.version",
"0.0.0+26630-2c4d0ee0-SNAPSHOT")
- val pekkoPersistenceJdbc = "0.0.0+958-db5733e6-SNAPSHOT"
- val connectors = "0.0.0+64-20da4165-SNAPSHOT"
- val connectorsKafka =
sys.props.getOrElse("build.connectors.kafka.version",
"0.0.0+1719-2dce1c20-SNAPSHOT")
+ val pekko = sys.props.getOrElse("build.pekko.version",
"0.0.0+26656-898c6970-SNAPSHOT")
+ val pekkoPersistenceJdbc = "0.0.0+966-e6f717eb-SNAPSHOT"
+ val connectors = "0.0.0+85-a82f3c3c-SNAPSHOT"
+ val connectorsKafka =
sys.props.getOrElse("build.connectors.kafka.version",
"0.0.0+1728-e2c660ef-SNAPSHOT")
val slick = "3.3.3"
val scalaTest = "3.1.1"
val testContainers = "1.15.3"
@@ -100,7 +100,7 @@ object Dependencies {
val pekkoPersistenceTyped = "org.apache.pekko" %%
"pekko-persistence-typed" % Versions.pekko
val pekkoClusterShardingTyped = "org.apache.pekko" %%
"pekko-cluster-sharding-typed" % Versions.pekko
- val pekkoPersistenceCassandra = "org.apache.pekko" %%
"pekko-persistence-cassandra" % "0.0.0-1072-7c02b521-SNAPSHOT"
+ val pekkoPersistenceCassandra = "org.apache.pekko" %%
"pekko-persistence-cassandra" % "0.0.0-1079-c30f82ae-SNAPSHOT"
val pekkoPersistenceJdbc = "org.apache.pekko" %% "pekko-persistence-jdbc"
% Versions.pekkoPersistenceJdbc
val pekkoSerializationJackson = "org.apache.pekko" %%
"pekko-serialization-jackson" % Versions.pekko
}
diff --git
a/testkit/src/main/scala/org/apache/pekko/projection/testkit/internal/TestOffsetStoreAdapter.scala
b/testkit/src/main/scala/org/apache/pekko/projection/testkit/internal/TestOffsetStoreAdapter.scala
index 643db4c..1ff6483 100644
---
a/testkit/src/main/scala/org/apache/pekko/projection/testkit/internal/TestOffsetStoreAdapter.scala
+++
b/testkit/src/main/scala/org/apache/pekko/projection/testkit/internal/TestOffsetStoreAdapter.scala
@@ -14,8 +14,6 @@
package org.apache.pekko.projection.testkit.internal
import scala.jdk.CollectionConverters._
-import scala.compat.java8.OptionConverters._
-import scala.compat.java8.FutureConverters._
import scala.concurrent.Future
import org.apache.pekko
@@ -24,27 +22,30 @@ import pekko.annotation.InternalApi
import pekko.projection.ProjectionId
import pekko.projection.internal.ManagementState
import pekko.projection.testkit.scaladsl.TestOffsetStore
+import pekko.util.OptionConverters._
+import pekko.util.FutureConverters._
+
@InternalApi private[projection] class TestOffsetStoreAdapter[Offset](
delegate: pekko.projection.testkit.javadsl.TestOffsetStore[Offset])
extends TestOffsetStore[Offset] {
- override def lastOffset(): Option[Offset] = delegate.lastOffset().asScala
+ override def lastOffset(): Option[Offset] = delegate.lastOffset().toScala
override def allOffsets(): List[(ProjectionId, Offset)] =
delegate.allOffsets().asScala.map(_.toScala).toList
override def readOffsets(): Future[Option[Offset]] = {
implicit val ec = pekko.dispatch.ExecutionContexts.parasitic
- delegate.readOffsets().toScala.map(_.asScala)
+ delegate.readOffsets().asScala.map(_.toScala)
}
override def saveOffset(projectionId: ProjectionId, offset: Offset):
Future[Done] =
- delegate.saveOffset(projectionId, offset).toScala
+ delegate.saveOffset(projectionId, offset).asScala
override def readManagementState(projectionId: ProjectionId):
Future[Option[ManagementState]] = {
implicit val ec = pekko.dispatch.ExecutionContexts.parasitic
- delegate.readManagementState(projectionId).toScala.map(_.asScala)
+ delegate.readManagementState(projectionId).asScala.map(_.toScala)
}
override def savePaused(projectionId: ProjectionId, paused: Boolean):
Future[Done] =
- delegate.savePaused(projectionId, paused).toScala
+ delegate.savePaused(projectionId, paused).asScala
}
diff --git
a/testkit/src/main/scala/org/apache/pekko/projection/testkit/internal/TestSourceProviderImpl.scala
b/testkit/src/main/scala/org/apache/pekko/projection/testkit/internal/TestSourceProviderImpl.scala
index ff3093e..fbe1c86 100644
---
a/testkit/src/main/scala/org/apache/pekko/projection/testkit/internal/TestSourceProviderImpl.scala
+++
b/testkit/src/main/scala/org/apache/pekko/projection/testkit/internal/TestSourceProviderImpl.scala
@@ -17,9 +17,6 @@ import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.function.Supplier
-import scala.compat.java8.FunctionConverters._
-import scala.compat.java8.FutureConverters._
-import scala.compat.java8.OptionConverters._
import scala.concurrent.Future
import org.apache.pekko
@@ -28,6 +25,9 @@ import pekko.annotation.InternalApi
import pekko.projection.OffsetVerification
import pekko.projection.testkit.scaladsl.TestSourceProvider
import pekko.stream.scaladsl.Source
+import pekko.util.FunctionConverters._
+import pekko.util.FutureConverters._
+import pekko.util.OptionConverters._
/**
* INTERNAL API
@@ -100,7 +100,7 @@ private[projection] class TestSourceProviderImpl[Offset,
Envelope] private[proje
override def source(offset: Supplier[CompletionStage[Optional[Offset]]])
: CompletionStage[pekko.stream.javadsl.Source[Envelope, NotUsed]] = {
implicit val ec = pekko.dispatch.ExecutionContexts.parasitic
- source(() => offset.get().toScala.map(_.asScala)).map(_.asJava).toJava
+ source(() => offset.get().asScala.map(_.toScala)).map(_.asJava).asJava
}
override def extractOffset(envelope: Envelope): Offset =
extractOffsetFn(envelope)
diff --git
a/testkit/src/main/scala/org/apache/pekko/projection/testkit/javadsl/TestSourceProvider.scala
b/testkit/src/main/scala/org/apache/pekko/projection/testkit/javadsl/TestSourceProvider.scala
index b89a1db..b215d96 100644
---
a/testkit/src/main/scala/org/apache/pekko/projection/testkit/javadsl/TestSourceProvider.scala
+++
b/testkit/src/main/scala/org/apache/pekko/projection/testkit/javadsl/TestSourceProvider.scala
@@ -13,14 +13,13 @@
package org.apache.pekko.projection.testkit.javadsl
-import scala.compat.java8.FunctionConverters._
-
import org.apache.pekko
import pekko.NotUsed
import pekko.annotation.ApiMayChange
import pekko.projection.OffsetVerification
import pekko.projection.javadsl.VerifiableSourceProvider
import pekko.projection.testkit.internal.TestSourceProviderImpl
+import pekko.util.FunctionConverters._
@ApiMayChange
object TestSourceProvider {
diff --git
a/testkit/src/test/java/org/apache/pekko/projection/testkit/javadsl/ProjectionTestKitTest.java
b/testkit/src/test/java/org/apache/pekko/projection/testkit/javadsl/ProjectionTestKitTest.java
index 0173a0d..05f816d 100644
---
a/testkit/src/test/java/org/apache/pekko/projection/testkit/javadsl/ProjectionTestKitTest.java
+++
b/testkit/src/test/java/org/apache/pekko/projection/testkit/javadsl/ProjectionTestKitTest.java
@@ -31,13 +31,13 @@ import org.apache.pekko.stream.SharedKillSwitch;
import org.apache.pekko.stream.javadsl.DelayStrategy;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
+import org.apache.pekko.util.FutureConverters;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.ComparisonFailure;
import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite;
import scala.Option;
-import scala.compat.java8.FutureConverters;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
@@ -285,7 +285,7 @@ public class ProjectionTestKitTest extends JUnitSuite {
ActorSystem<?> system) {
this.killSwitch = killSwitch;
CompletionStage<Done> done = source.asJava().runWith(Sink.ignore(),
system);
- this.futureDone = FutureConverters.toScala(done);
+ this.futureDone = FutureConverters.asScala(done);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]