This is an automated email from the ASF dual-hosted git repository.
mdedetrich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-projection.git
commit af0dc3cc8635a1f5a7760bbdcc6259559d70c77a
Author: Auto Format <nobody>
AuthorDate: Fri Mar 24 09:19:29 2023 +0100
format source with scalafmt, #12
---
build.sbt | 54 ++++----
.../cassandra/CassandraProjectionDocExample.scala | 80 ++++++------
.../scala/docs/cassandra/WordCountDocExample.scala | 36 +++---
.../docs/cassandra/WordCountDocExampleSpec.scala | 20 +--
.../scala/docs/classic/ClassicDocExample.scala | 4 +-
.../docs/eventsourced/EventSourcedDocExample.scala | 8 +-
.../scala/docs/eventsourced/ShoppingCart.scala | 10 +-
.../test/scala/docs/guide/EventGeneratorApp.scala | 102 +++++++--------
.../test/scala/docs/guide/ShoppingCartApp.scala | 12 +-
.../scala/docs/jdbc/JdbcProjectionDocExample.scala | 28 ++--
.../test/scala/docs/kafka/KafkaDocExample.scala | 36 +++---
.../docs/slick/SlickProjectionDocExample.scala | 32 ++---
.../docs/state/DurableStateStoreDocExample.scala | 8 +-
.../scala/docs/testkit/TestKitDocExample.scala | 18 +--
project/AkkaDisciplinePlugin.scala | 34 ++---
project/Common.scala | 40 +++---
project/CopyrightHeader.scala | 4 +-
project/Dependencies.scala | 142 ++++++++++-----------
project/Protobuf.scala | 18 +--
.../cassandra/CassandraOffsetStoreSpec.scala | 2 +-
.../cassandra/CassandraProjectionSpec.scala | 15 +--
.../akka/projection/ProjectionBehaviorSpec.scala | 9 +-
.../internal/OffsetSerializationSpec.scala | 2 +-
.../internal/TelemetryProviderSpec.scala | 1 -
.../internal/metrics/ServiceTimeMetricSpec.scala | 6 +-
.../internal/metrics/tools/InMemTelemetry.scala | 8 +-
.../internal/metrics/tools/TestHandlers.scala | 9 +-
.../main/scala/akka/projection/Projection.scala | 7 +-
.../scala/akka/projection/ProjectionContext.scala | 1 -
.../internal/SourceProviderAdapter.scala | 4 +-
.../scala/akka/projection/internal/Telemetry.scala | 3 +-
.../projection/scaladsl/ProjectionManagement.scala | 9 +-
.../akka/projection/jdbc/JdbcProjectionSpec.scala | 2 -
.../scala/akka/projection/jdbc/JdbcSession.scala | 1 -
.../projection/jdbc/internal/JdbcOffsetStore.scala | 5 -
.../jdbc/internal/JdbcProjectionImpl.scala | 4 +-
.../projection/jdbc/javadsl/JdbcProjection.scala | 1 -
.../akka/projection/jdbc/JdbcOffsetStoreSpec.scala | 2 -
.../integration/KafkaToSlickIntegrationSpec.scala | 13 +-
.../kafka/internal/KafkaSourceProviderImpl.scala | 9 +-
.../slick/SlickContainerOffsetStoreSpec.scala | 2 +-
.../slick/internal/SlickProjectionImpl.scala | 2 +-
.../projection/slick/SlickOffsetStoreSpec.scala | 2 +-
.../projection/slick/SlickProjectionSpec.scala | 3 -
44 files changed, 398 insertions(+), 410 deletions(-)
diff --git a/build.sbt b/build.sbt
index ac06ec6..d6d3f75 100644
--- a/build.sbt
+++ b/build.sbt
@@ -10,7 +10,7 @@ lazy val core =
.settings(
name := "pekko-projection-core",
Compile / packageBin / packageOptions += Package.ManifestAttributes(
- "Automatic-Module-Name" -> "pekko.projection.core"))
+ "Automatic-Module-Name" -> "pekko.projection.core"))
.settings(Protobuf.settings)
lazy val coreTest =
@@ -135,32 +135,32 @@ lazy val docs = project
Preprocess / sourceDirectory := (LocalRootProject / ScalaUnidoc / unidoc /
target).value,
Paradox / siteSubdirName :=
s"docs/akka-projection/${projectInfoVersion.value}",
Compile / paradoxProperties ++= Map(
- "project.url" -> "https://doc.akka.io/docs/akka-projection/current/",
- "canonical.base_url" ->
"https://doc.akka.io/docs/akka-projection/current",
- "github.base_url" ->
"https://github.com/apache/incubator-pekko-projection",
- "akka.version" -> Dependencies.Versions.akka,
- // Akka
- "extref.akka.base_url" ->
s"https://doc.akka.io/docs/akka/${Dependencies.AkkaVersionInDocs}/%s",
- "scaladoc.akka.base_url" ->
s"https://doc.akka.io/api/akka/${Dependencies.AkkaVersionInDocs}/",
- "javadoc.akka.base_url" ->
s"https://doc.akka.io/japi/akka/${Dependencies.AkkaVersionInDocs}/",
- "javadoc.akka.link_style" -> "direct",
- // Alpakka
- "extref.alpakka.base_url" ->
s"https://doc.akka.io/docs/alpakka/${Dependencies.AlpakkaVersionInDocs}/%s",
- "scaladoc.akka.stream.alpakka.base_url" ->
s"https://doc.akka.io/api/alpakka/${Dependencies.AlpakkaVersionInDocs}/",
- "javadoc.akka.stream.alpakka.base_url" -> "",
- // Alpakka Kafka
- "extref.alpakka-kafka.base_url" ->
s"https://doc.akka.io/docs/alpakka-kafka/${Dependencies.AlpakkaKafkaVersionInDocs}/%s",
- "scaladoc.akka.kafka.base_url" ->
s"https://doc.akka.io/api/alpakka-kafka/${Dependencies.AlpakkaKafkaVersionInDocs}/",
- "javadoc.akka.kafka.base_url" -> "",
- // Java
- "javadoc.base_url" -> "https://docs.oracle.com/javase/8/docs/api/",
- // Scala
- "scaladoc.scala.base_url" ->
s"https://www.scala-lang.org/api/${scalaBinaryVersion.value}.x/",
- "scaladoc.akka.projection.base_url" -> s"/${(Preprocess /
siteSubdirName).value}/",
- "javadoc.akka.projection.base_url" -> "", // no Javadoc is published
- // Misc
- "extref.samples.base_url" ->
"https://developer.lightbend.com/start/?group=akka&project=%s",
- "extref.platform-guide.base_url" ->
"https://developer.lightbend.com/docs/akka-platform-guide/%s"),
+ "project.url" -> "https://doc.akka.io/docs/akka-projection/current/",
+ "canonical.base_url" ->
"https://doc.akka.io/docs/akka-projection/current",
+ "github.base_url" ->
"https://github.com/apache/incubator-pekko-projection",
+ "akka.version" -> Dependencies.Versions.akka,
+ // Akka
+ "extref.akka.base_url" ->
s"https://doc.akka.io/docs/akka/${Dependencies.AkkaVersionInDocs}/%s",
+ "scaladoc.akka.base_url" ->
s"https://doc.akka.io/api/akka/${Dependencies.AkkaVersionInDocs}/",
+ "javadoc.akka.base_url" ->
s"https://doc.akka.io/japi/akka/${Dependencies.AkkaVersionInDocs}/",
+ "javadoc.akka.link_style" -> "direct",
+ // Alpakka
+ "extref.alpakka.base_url" ->
s"https://doc.akka.io/docs/alpakka/${Dependencies.AlpakkaVersionInDocs}/%s",
+ "scaladoc.akka.stream.alpakka.base_url" ->
s"https://doc.akka.io/api/alpakka/${Dependencies.AlpakkaVersionInDocs}/",
+ "javadoc.akka.stream.alpakka.base_url" -> "",
+ // Alpakka Kafka
+ "extref.alpakka-kafka.base_url" ->
s"https://doc.akka.io/docs/alpakka-kafka/${Dependencies.AlpakkaKafkaVersionInDocs}/%s",
+ "scaladoc.akka.kafka.base_url" ->
s"https://doc.akka.io/api/alpakka-kafka/${Dependencies.AlpakkaKafkaVersionInDocs}/",
+ "javadoc.akka.kafka.base_url" -> "",
+ // Java
+ "javadoc.base_url" -> "https://docs.oracle.com/javase/8/docs/api/",
+ // Scala
+ "scaladoc.scala.base_url" ->
s"https://www.scala-lang.org/api/${scalaBinaryVersion.value}.x/",
+ "scaladoc.akka.projection.base_url" -> s"/${(Preprocess /
siteSubdirName).value}/",
+ "javadoc.akka.projection.base_url" -> "", // no Javadoc is published
+ // Misc
+ "extref.samples.base_url" ->
"https://developer.lightbend.com/start/?group=akka&project=%s",
+ "extref.platform-guide.base_url" ->
"https://developer.lightbend.com/docs/akka-platform-guide/%s"),
paradoxGroups := Map("Language" -> Seq("Java", "Scala")),
paradoxRoots := List("index.html",
"getting-started/event-generator-app.html"),
ApidocPlugin.autoImport.apidocRootPackage := "akka",
diff --git
a/examples/src/it/scala/docs/cassandra/CassandraProjectionDocExample.scala
b/examples/src/it/scala/docs/cassandra/CassandraProjectionDocExample.scala
index ff8a207..6346863 100644
--- a/examples/src/it/scala/docs/cassandra/CassandraProjectionDocExample.scala
+++ b/examples/src/it/scala/docs/cassandra/CassandraProjectionDocExample.scala
@@ -43,7 +43,7 @@ object CassandraProjectionDocExample {
private val system = ActorSystem[Nothing](Behaviors.empty, "Example")
- //#handler
+ // #handler
class ShoppingCartHandler extends Handler[EventEnvelope[ShoppingCart.Event]]
{
private val logger = LoggerFactory.getLogger(getClass)
@@ -59,9 +59,9 @@ object CassandraProjectionDocExample {
}
}
}
- //#handler
+ // #handler
- //#grouped-handler
+ // #grouped-handler
import scala.collection.immutable
class GroupedShoppingCartHandler extends
Handler[immutable.Seq[EventEnvelope[ShoppingCart.Event]]] {
@@ -78,16 +78,16 @@ object CassandraProjectionDocExample {
Future.successful(Done)
}
}
- //#grouped-handler
+ // #grouped-handler
- //#sourceProvider
+ // #sourceProvider
val sourceProvider =
EventSourcedProvider
.eventsByTag[ShoppingCart.Event](system, readJournalPluginId =
CassandraReadJournal.Identifier, tag = "carts-1")
- //#sourceProvider
+ // #sourceProvider
object IllustrateAtLeastOnce {
- //#atLeastOnce
+ // #atLeastOnce
val projection =
CassandraProjection
.atLeastOnce(
@@ -95,21 +95,21 @@ object CassandraProjectionDocExample {
sourceProvider,
handler = () => new ShoppingCartHandler)
.withSaveOffset(afterEnvelopes = 100, afterDuration = 500.millis)
- //#atLeastOnce
+ // #atLeastOnce
}
object IllustrateAtMostOnce {
- //#atMostOnce
+ // #atMostOnce
val projection =
CassandraProjection.atMostOnce(
projectionId = ProjectionId("shopping-carts", "carts-1"),
sourceProvider,
handler = () => new ShoppingCartHandler)
- //#atMostOnce
+ // #atMostOnce
}
object IllustrateGrouped {
- //#grouped
+ // #grouped
val projection =
CassandraProjection
.groupedWithin(
@@ -117,11 +117,11 @@ object CassandraProjectionDocExample {
sourceProvider,
handler = () => new GroupedShoppingCartHandler)
.withGroup(groupAfterEnvelopes = 20, groupAfterDuration = 500.millis)
- //#grouped
+ // #grouped
}
object IllustrateAtLeastOnceFlow {
- //#atLeastOnceFlow
+ // #atLeastOnceFlow
val logger = LoggerFactory.getLogger(getClass)
val flow = FlowWithContext[EventEnvelope[ShoppingCart.Event],
ProjectionContext]
@@ -140,11 +140,11 @@ object CassandraProjectionDocExample {
CassandraProjection
.atLeastOnceFlow(projectionId = ProjectionId("shopping-carts",
"carts-1"), sourceProvider, handler = flow)
.withSaveOffset(afterEnvelopes = 100, afterDuration = 500.millis)
- //#atLeastOnceFlow
+ // #atLeastOnceFlow
}
object IllustrateRecoveryStrategy {
- //#withRecoveryStrategy
+ // #withRecoveryStrategy
import akka.projection.HandlerRecoveryStrategy
val projection =
@@ -154,11 +154,11 @@ object CassandraProjectionDocExample {
sourceProvider,
handler = () => new ShoppingCartHandler)
.withRecoveryStrategy(HandlerRecoveryStrategy.retryAndFail(retries =
10, delay = 1.second))
- //#withRecoveryStrategy
+ // #withRecoveryStrategy
}
object IllustrateRestart {
- //#withRestartBackoff
+ // #withRestartBackoff
val projection =
CassandraProjection
.atLeastOnce(
@@ -166,21 +166,21 @@ object CassandraProjectionDocExample {
sourceProvider,
handler = () => new ShoppingCartHandler)
.withRestartBackoff(minBackoff = 200.millis, maxBackoff = 5.seconds,
randomFactor = 0.1)
- //#withRestartBackoff
+ // #withRestartBackoff
}
object IllustrateRunningWithShardedDaemon {
- //#running-source-provider
+ // #running-source-provider
def sourceProvider(tag: String) =
EventSourcedProvider
.eventsByTag[ShoppingCart.Event](
system = system,
readJournalPluginId = CassandraReadJournal.Identifier,
tag = tag)
- //#running-source-provider
+ // #running-source-provider
- //#running-projection
+ // #running-projection
def projection(tag: String) =
CassandraProjection
.atLeastOnce(
@@ -188,21 +188,21 @@ object CassandraProjectionDocExample {
sourceProvider(tag),
handler = () => new ShoppingCartHandler)
.withSaveOffset(100, 500.millis)
- //#running-projection
+ // #running-projection
- //#running-with-daemon-process
+ // #running-with-daemon-process
ShardedDaemonProcess(system).init[ProjectionBehavior.Command](
name = "shopping-carts",
numberOfInstances = ShoppingCart.tags.size,
behaviorFactory = (i: Int) =>
ProjectionBehavior(projection(ShoppingCart.tags(i))),
stopMessage = ProjectionBehavior.Stop)
- //#running-with-daemon-process
+ // #running-with-daemon-process
}
object IllustrateRunningWithActor {
Behaviors.setup[String] { context =>
- //#running-with-actor
+ // #running-with-actor
def sourceProvider(tag: String) =
EventSourcedProvider
.eventsByTag[ShoppingCart.Event](
@@ -220,7 +220,7 @@ object CassandraProjectionDocExample {
val projection1 = projection("carts-1")
context.spawn(ProjectionBehavior(projection1),
projection1.projectionId.id)
- //#running-with-actor
+ // #running-with-actor
Behaviors.empty
}
@@ -228,7 +228,7 @@ object CassandraProjectionDocExample {
object IllustrateRunningWithSingleton {
- //#running-with-singleton
+ // #running-with-singleton
import akka.cluster.typed.ClusterSingleton
import akka.cluster.typed.SingletonActor
@@ -251,13 +251,13 @@ object CassandraProjectionDocExample {
ClusterSingleton(system).init(
SingletonActor(ProjectionBehavior(projection1),
projection1.projectionId.id)
.withStopMessage(ProjectionBehavior.Stop))
- //#running-with-singleton
+ // #running-with-singleton
}
object IllustrateProjectionSettings {
- //#projection-settings
+ // #projection-settings
val projection =
CassandraProjection
.atLeastOnce(
@@ -266,33 +266,33 @@ object CassandraProjectionDocExample {
handler = () => new ShoppingCartHandler)
.withRestartBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds,
randomFactor = 0.5)
.withSaveOffset(100, 500.millis)
- //#projection-settings
+ // #projection-settings
}
object IllustrateGetOffset {
- //#get-offset
+ // #get-offset
import akka.projection.scaladsl.ProjectionManagement
import akka.persistence.query.Offset
import akka.projection.ProjectionId
val projectionId = ProjectionId("shopping-carts", "carts-1")
val currentOffset: Future[Option[Offset]] =
ProjectionManagement(system).getOffset[Offset](projectionId)
- //#get-offset
+ // #get-offset
}
object IllustrateClearOffset {
import akka.projection.scaladsl.ProjectionManagement
- //#clear-offset
+ // #clear-offset
val projectionId = ProjectionId("shopping-carts", "carts-1")
val done: Future[Done] =
ProjectionManagement(system).clearOffset(projectionId)
- //#clear-offset
+ // #clear-offset
}
object IllustrateUpdateOffset {
import akka.projection.scaladsl.ProjectionManagement
import system.executionContext
- //#update-offset
+ // #update-offset
import akka.persistence.query.Sequence
val projectionId = ProjectionId("shopping-carts", "carts-1")
val currentOffset: Future[Option[Sequence]] =
ProjectionManagement(system).getOffset[Sequence](projectionId)
@@ -300,13 +300,13 @@ object CassandraProjectionDocExample {
case Some(s) =>
ProjectionManagement(system).updateOffset[Sequence](projectionId,
Sequence(s.value + 1))
case None => // already removed
}
- //#update-offset
+ // #update-offset
}
object IllustratePauseResume {
import system.executionContext
def someDataMigration() = Future.successful(Done)
- //#pause-resume
+ // #pause-resume
import akka.projection.scaladsl.ProjectionManagement
import akka.projection.ProjectionId
@@ -317,16 +317,16 @@ object CassandraProjectionDocExample {
_ <- someDataMigration()
_ <- mgmt.resume(projectionId)
} yield Done
- //#pause-resume
+ // #pause-resume
}
object IllustrateIsPaused {
import akka.projection.scaladsl.ProjectionManagement
import akka.projection.ProjectionId
- //#is-paused
+ // #is-paused
val projectionId = ProjectionId("shopping-carts", "carts-1")
val paused: Future[Boolean] =
ProjectionManagement(system).isPaused(projectionId)
- //#is-paused
+ // #is-paused
}
}
diff --git a/examples/src/it/scala/docs/cassandra/WordCountDocExample.scala
b/examples/src/it/scala/docs/cassandra/WordCountDocExample.scala
index ec676b4..20fd4db 100644
--- a/examples/src/it/scala/docs/cassandra/WordCountDocExample.scala
+++ b/examples/src/it/scala/docs/cassandra/WordCountDocExample.scala
@@ -24,21 +24,21 @@ import org.slf4j.LoggerFactory
object WordCountDocExample {
- //#envelope
+ // #envelope
type Word = String
type Count = Int
final case class WordEnvelope(offset: Long, word: Word)
- //#envelope
+ // #envelope
- //#repository
+ // #repository
trait WordCountRepository {
def load(id: String, word: Word): Future[Count]
def loadAll(id: String): Future[Map[Word, Count]]
def save(id: String, word: Word, count: Count): Future[Done]
}
- //#repository
+ // #repository
class CassandraWordCountRepository(session: CassandraSession)(implicit val
ec: ExecutionContext)
extends WordCountRepository {
@@ -80,7 +80,7 @@ object WordCountDocExample {
}
}
- //#sourceProvider
+ // #sourceProvider
class WordSource(implicit ec: ExecutionContext) extends SourceProvider[Long,
WordEnvelope] {
private val src = Source(
@@ -97,10 +97,10 @@ object WordCountDocExample {
override def extractCreationTime(env: WordEnvelope): Long = 0L
}
- //#sourceProvider
+ // #sourceProvider
object IllustrateVariables {
- //#mutableState
+ // #mutableState
class WordCountHandler extends Handler[WordEnvelope] {
private val logger = LoggerFactory.getLogger(getClass)
private var state: Map[Word, Count] = Map.empty
@@ -113,12 +113,12 @@ object WordCountDocExample {
Future.successful(Done)
}
}
- //#mutableState
+ // #mutableState
}
object IllustrateStatefulHandlerLoadingInitialState {
- //#loadingInitialState
+ // #loadingInitialState
import akka.projection.scaladsl.StatefulHandler
class WordCountHandler(projectionId: ProjectionId, repository:
WordCountRepository)(implicit ec: ExecutionContext)
@@ -136,12 +136,12 @@ object WordCountDocExample {
newState
}
}
- //#loadingInitialState
+ // #loadingInitialState
}
object IllustrateStatefulHandlerLoadingStateOnDemand {
- //#loadingOnDemand
+ // #loadingOnDemand
import akka.projection.scaladsl.StatefulHandler
class WordCountHandler(projectionId: ProjectionId, repository:
WordCountRepository)(implicit ec: ExecutionContext)
@@ -171,14 +171,14 @@ object WordCountDocExample {
}
}
- //#loadingOnDemand
+ // #loadingOnDemand
}
object IllstrateActorLoadingInitialState {
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
- //#actorHandler
+ // #actorHandler
import akka.projection.scaladsl.ActorHandler
class WordCountActorHandler(behavior:
Behavior[WordCountProcessor.Command])(implicit system: ActorSystem[_])
@@ -195,9 +195,9 @@ object WordCountDocExample {
}
}
}
- //#actorHandler
+ // #actorHandler
- //#behaviorLoadingInitialState
+ // #behaviorLoadingInitialState
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.SupervisorStrategy
@@ -270,7 +270,7 @@ object WordCountDocExample {
}
}
}
- //#behaviorLoadingInitialState
+ // #behaviorLoadingInitialState
}
object IllstrateActorLoadingStateOnDemand {
@@ -296,7 +296,7 @@ object WordCountDocExample {
}
}
- //#behaviorLoadingOnDemand
+ // #behaviorLoadingOnDemand
object WordCountProcessor {
trait Command
final case class Handle(envelope: WordEnvelope, replyTo:
ActorRef[Try[Done]]) extends Command
@@ -375,7 +375,7 @@ object WordCountDocExample {
}
}
}
- //#behaviorLoadingOnDemand
+ // #behaviorLoadingOnDemand
}
}
diff --git a/examples/src/it/scala/docs/cassandra/WordCountDocExampleSpec.scala
b/examples/src/it/scala/docs/cassandra/WordCountDocExampleSpec.scala
index 6bbd306..70cc2d1 100644
--- a/examples/src/it/scala/docs/cassandra/WordCountDocExampleSpec.scala
+++ b/examples/src/it/scala/docs/cassandra/WordCountDocExampleSpec.scala
@@ -39,16 +39,16 @@ class WordCountDocExampleSpec
Await.result(ContainerSessionProvider.started, 30.seconds)
Await.result(for {
- _ <- CassandraProjection.createTablesIfNotExists()
- _ <- repository.createKeyspaceAndTable()
- } yield Done, 30.seconds)
+ _ <- CassandraProjection.createTablesIfNotExists()
+ _ <- repository.createKeyspaceAndTable()
+ } yield Done, 30.seconds)
}
override protected def afterAll(): Unit = {
Await.ready(for {
- _ <- session.executeDDL(s"DROP keyspace akka_projection.offset_store")
- _ <- session.executeDDL(s"DROP keyspace ${repository.keyspace}")
- } yield Done, 30.seconds)
+ _ <- session.executeDDL(s"DROP keyspace akka_projection.offset_store")
+ _ <- session.executeDDL(s"DROP keyspace ${repository.keyspace}")
+ } yield Done, 30.seconds)
super.afterAll()
}
@@ -74,14 +74,14 @@ class WordCountDocExampleSpec
val projectionId = genRandomProjectionId()
- //#projection
+ // #projection
val projection =
CassandraProjection
.atLeastOnce[Long, WordEnvelope](
projectionId,
sourceProvider = new WordSource,
handler = () => new WordCountHandler(projectionId, repository))
- //#projection
+ // #projection
runAndAssert(projection)
}
@@ -106,14 +106,14 @@ class WordCountDocExampleSpec
val projectionId = genRandomProjectionId()
- //#actorHandlerProjection
+ // #actorHandlerProjection
val projection =
CassandraProjection
.atLeastOnce[Long, WordEnvelope](
projectionId,
sourceProvider = new WordSource,
handler = () => new
WordCountActorHandler(WordCountProcessor(projectionId, repository)))
- //#actorHandlerProjection
+ // #actorHandlerProjection
runAndAssert(projection)
}
diff --git a/examples/src/test/scala/docs/classic/ClassicDocExample.scala
b/examples/src/test/scala/docs/classic/ClassicDocExample.scala
index 29ad347..49b354f 100644
--- a/examples/src/test/scala/docs/classic/ClassicDocExample.scala
+++ b/examples/src/test/scala/docs/classic/ClassicDocExample.scala
@@ -24,10 +24,10 @@ object ClassicDocExample {
private val system = akka.actor.ActorSystem("Example")
private val projection: Projection[Any] = null
- //#spawn
+ // #spawn
import akka.actor.typed.scaladsl.adapter._
system.spawn(ProjectionBehavior(projection), "theProjection")
- //#spawn
+ // #spawn
}
}
diff --git
a/examples/src/test/scala/docs/eventsourced/EventSourcedDocExample.scala
b/examples/src/test/scala/docs/eventsourced/EventSourcedDocExample.scala
index 58e2d21..eff420b 100644
--- a/examples/src/test/scala/docs/eventsourced/EventSourcedDocExample.scala
+++ b/examples/src/test/scala/docs/eventsourced/EventSourcedDocExample.scala
@@ -12,7 +12,7 @@ object EventSourcedDocExample {
private val system = ActorSystem[Nothing](Behaviors.empty, "Example")
object IllustrateEventsByTagSourceProvider {
- //#eventsByTagSourceProvider
+ // #eventsByTagSourceProvider
import akka.projection.eventsourced.EventEnvelope
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.Offset
@@ -22,7 +22,7 @@ object EventSourcedDocExample {
val sourceProvider: SourceProvider[Offset,
EventEnvelope[ShoppingCart.Event]] =
EventSourcedProvider
.eventsByTag[ShoppingCart.Event](system, readJournalPluginId =
CassandraReadJournal.Identifier, tag = "carts-1")
- //#eventsByTagSourceProvider
+ // #eventsByTagSourceProvider
}
object IllustrateEventsBySlicesSourceProvider {
@@ -30,7 +30,7 @@ object EventSourcedDocExample {
val Identifier = "akka.persistence.r2dbc.query"
}
- //#eventsBySlicesSourceProvider
+ // #eventsBySlicesSourceProvider
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.query.Offset
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
@@ -53,6 +53,6 @@ object EventSourcedDocExample {
entityType,
minSlice,
maxSlice)
- //#eventsBySlicesSourceProvider
+ // #eventsBySlicesSourceProvider
}
}
diff --git a/examples/src/test/scala/docs/eventsourced/ShoppingCart.scala
b/examples/src/test/scala/docs/eventsourced/ShoppingCart.scala
index 38a3972..f575833 100644
--- a/examples/src/test/scala/docs/eventsourced/ShoppingCart.scala
+++ b/examples/src/test/scala/docs/eventsourced/ShoppingCart.scala
@@ -134,11 +134,11 @@ object ShoppingCart {
final case class CheckedOut(cartId: String, eventTime: Instant) extends Event
- //#slicingTags
+ // #slicingTags
val tags = Vector.tabulate(5)(i => s"carts-$i")
- //#slicingTags
+ // #slicingTags
- //#tagging
+ // #tagging
val EntityKey: EntityTypeKey[Command] =
EntityTypeKey[Command]("ShoppingCart")
def init(system: ActorSystem[_]): Unit = {
@@ -155,7 +155,7 @@ object ShoppingCart {
PersistenceId(EntityKey.name, cartId),
State.empty,
(state, command) =>
- //The shopping cart behavior changes if it's checked out or not.
+ // The shopping cart behavior changes if it's checked out or not.
// The commands are handled differently for each case.
if (state.isCheckedOut) checkedOutShoppingCart(cartId, state,
command)
else openShoppingCart(cartId, state, command),
@@ -164,7 +164,7 @@ object ShoppingCart {
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100,
keepNSnapshots = 3))
.onPersistFailure(SupervisorStrategy.restartWithBackoff(200.millis,
5.seconds, 0.1))
}
- //#tagging
+ // #tagging
private def openShoppingCart(cartId: String, state: State, command:
Command): ReplyEffect[Event, State] =
command match {
diff --git a/examples/src/test/scala/docs/guide/EventGeneratorApp.scala
b/examples/src/test/scala/docs/guide/EventGeneratorApp.scala
index 42fd556..3b01195 100644
--- a/examples/src/test/scala/docs/guide/EventGeneratorApp.scala
+++ b/examples/src/test/scala/docs/guide/EventGeneratorApp.scala
@@ -45,57 +45,57 @@ object EventGeneratorApp extends App {
.withFallback(ConfigFactory.load("guide-shopping-cart-app.conf"))
ActorSystem(Behaviors.setup[String] {
- ctx =>
- implicit val system = ctx.system
- val cluster = Cluster(system)
- cluster.manager ! Join(cluster.selfMember.address)
- val sharding = ClusterSharding(system)
- val _ = sharding.init(Entity(EntityKey) { entityCtx =>
- cartBehavior(entityCtx.entityId, tagFactory(entityCtx.entityId))
- })
-
- Source
- .tick(1.second, 1.second, "checkout")
- .mapConcat {
- case "checkout" =>
- val cartId = java.util.UUID.randomUUID().toString.take(5)
- val items = randomItems()
- val itemEvents = (0 to items).flatMap {
- _ =>
- val itemId = Products(Random.nextInt(Products.size))
-
- // add the item
- val quantity = randomQuantity()
- val itemAdded = ItemAdded(cartId, itemId, quantity)
-
- // make up to `MaxItemAdjusted` adjustments to quantity of item
- val adjustments = Random.nextInt(MaxItemsAdjusted)
- val itemQuantityAdjusted = (0 to
adjustments).foldLeft(Seq[ItemQuantityAdjusted]()) {
- case (events, _) =>
- val newQuantity = randomQuantity()
- val oldQuantity =
- if (events.isEmpty) itemAdded.quantity
- else events.last.newQuantity
- events :+ ItemQuantityAdjusted(cartId, itemId,
newQuantity, oldQuantity)
- }
-
- // flip a coin to decide whether or not to remove the item
- val itemRemoved =
- if (Random.nextBoolean())
- List(ItemRemoved(cartId, itemId,
itemQuantityAdjusted.last.newQuantity))
- else Nil
-
- List(itemAdded) ++ itemQuantityAdjusted ++ itemRemoved
- }
-
- // checkout the cart and all its preceding item events
- itemEvents :+ CheckedOut(cartId, Instant.now())
- }
- // send each event to the sharded entity represented by the event's
cartId
- .runWith(Sink.foreach(event => sharding.entityRefFor(EntityKey,
event.cartId).ref.tell(event)))
-
- Behaviors.empty
- }, "EventGeneratorApp", config)
+ ctx =>
+ implicit val system = ctx.system
+ val cluster = Cluster(system)
+ cluster.manager ! Join(cluster.selfMember.address)
+ val sharding = ClusterSharding(system)
+ val _ = sharding.init(Entity(EntityKey) { entityCtx =>
+ cartBehavior(entityCtx.entityId, tagFactory(entityCtx.entityId))
+ })
+
+ Source
+ .tick(1.second, 1.second, "checkout")
+ .mapConcat {
+ case "checkout" =>
+ val cartId = java.util.UUID.randomUUID().toString.take(5)
+ val items = randomItems()
+ val itemEvents = (0 to items).flatMap {
+ _ =>
+ val itemId = Products(Random.nextInt(Products.size))
+
+ // add the item
+ val quantity = randomQuantity()
+ val itemAdded = ItemAdded(cartId, itemId, quantity)
+
+ // make up to `MaxItemAdjusted` adjustments to quantity of
item
+ val adjustments = Random.nextInt(MaxItemsAdjusted)
+ val itemQuantityAdjusted = (0 to
adjustments).foldLeft(Seq[ItemQuantityAdjusted]()) {
+ case (events, _) =>
+ val newQuantity = randomQuantity()
+ val oldQuantity =
+ if (events.isEmpty) itemAdded.quantity
+ else events.last.newQuantity
+ events :+ ItemQuantityAdjusted(cartId, itemId,
newQuantity, oldQuantity)
+ }
+
+ // flip a coin to decide whether or not to remove the item
+ val itemRemoved =
+ if (Random.nextBoolean())
+ List(ItemRemoved(cartId, itemId,
itemQuantityAdjusted.last.newQuantity))
+ else Nil
+
+ List(itemAdded) ++ itemQuantityAdjusted ++ itemRemoved
+ }
+
+ // checkout the cart and all its preceding item events
+ itemEvents :+ CheckedOut(cartId, Instant.now())
+ }
+ // send each event to the sharded entity represented by the event's
cartId
+ .runWith(Sink.foreach(event => sharding.entityRefFor(EntityKey,
event.cartId).ref.tell(event)))
+
+ Behaviors.empty
+ }, "EventGeneratorApp", config)
/**
* Random non-zero based quantity for `ItemAdded` and `ItemQuantityAdjusted`
events
diff --git a/examples/src/test/scala/docs/guide/ShoppingCartApp.scala
b/examples/src/test/scala/docs/guide/ShoppingCartApp.scala
index af69822..d506864 100644
--- a/examples/src/test/scala/docs/guide/ShoppingCartApp.scala
+++ b/examples/src/test/scala/docs/guide/ShoppingCartApp.scala
@@ -36,17 +36,17 @@ object ShoppingCartApp extends App {
// ...
- //#guideSetup
- //#guideSourceProviderSetup
+ // #guideSetup
+ // #guideSourceProviderSetup
val sourceProvider: SourceProvider[Offset,
EventEnvelope[ShoppingCartEvents.Event]] =
EventSourcedProvider
.eventsByTag[ShoppingCartEvents.Event](
system,
readJournalPluginId = CassandraReadJournal.Identifier,
tag = ShoppingCartTags.Single)
- //#guideSourceProviderSetup
+ // #guideSourceProviderSetup
- //#guideProjectionSetup
+ // #guideProjectionSetup
implicit val ec = system.executionContext
val session =
CassandraSessionRegistry(system).sessionFor("akka.projection.cassandra.session-config")
val repo = new ItemPopularityProjectionRepositoryImpl(session)
@@ -56,9 +56,9 @@ object ShoppingCartApp extends App {
handler = () => new
ItemPopularityProjectionHandler(ShoppingCartTags.Single, system, repo))
context.spawn(ProjectionBehavior(projection), projection.projectionId.id)
- //#guideProjectionSetup
+ // #guideProjectionSetup
- //#guideSetup
+ // #guideSetup
Behaviors.empty
},
"ShoppingCartApp",
diff --git a/examples/src/test/scala/docs/jdbc/JdbcProjectionDocExample.scala
b/examples/src/test/scala/docs/jdbc/JdbcProjectionDocExample.scala
index 16cb1b2..d63430b 100644
--- a/examples/src/test/scala/docs/jdbc/JdbcProjectionDocExample.scala
+++ b/examples/src/test/scala/docs/jdbc/JdbcProjectionDocExample.scala
@@ -38,12 +38,12 @@ import org.slf4j.LoggerFactory
object JdbcProjectionDocExample {
- //#repository
+ // #repository
case class Order(id: String, time: Instant)
trait OrderRepository {
def save(connection: Connection, order: Order): Unit
}
- //#repository
+ // #repository
class OrderRepositoryImpl extends OrderRepository {
override def save(connection: Connection, order: Order): Unit = ???
@@ -67,7 +67,7 @@ object JdbcProjectionDocExample {
}
// #jdbc-session
- //#handler
+ // #handler
class ShoppingCartHandler(repository: OrderRepository)
extends JdbcHandler[EventEnvelope[ShoppingCart.Event], PlainJdbcSession]
{
private val logger = LoggerFactory.getLogger(getClass)
@@ -85,9 +85,9 @@ object JdbcProjectionDocExample {
}
}
}
- //#handler
+ // #handler
- //#grouped-handler
+ // #grouped-handler
import scala.collection.immutable
class GroupedShoppingCartHandler(repository: OrderRepository)
@@ -111,18 +111,18 @@ object JdbcProjectionDocExample {
}
}
}
- //#grouped-handler
+ // #grouped-handler
implicit val system = ActorSystem[Nothing](Behaviors.empty, "Example")
- //#sourceProvider
+ // #sourceProvider
val sourceProvider =
EventSourcedProvider
.eventsByTag[ShoppingCart.Event](system, readJournalPluginId =
JdbcReadJournal.Identifier, tag = "carts-1")
- //#sourceProvider
+ // #sourceProvider
object IllustrateExactlyOnce {
- //#exactlyOnce
+ // #exactlyOnce
val projection =
JdbcProjection
.exactlyOnce(
@@ -130,11 +130,11 @@ object JdbcProjectionDocExample {
sourceProvider,
() => new PlainJdbcSession, // JdbcSession Factory
handler = () => new ShoppingCartHandler(orderRepository))
- //#exactlyOnce
+ // #exactlyOnce
}
object IllustrateAtLeastOnce {
- //#atLeastOnce
+ // #atLeastOnce
val projection =
JdbcProjection
.atLeastOnce(
@@ -143,11 +143,11 @@ object JdbcProjectionDocExample {
() => new PlainJdbcSession, // JdbcSession Factory
handler = () => new ShoppingCartHandler(orderRepository))
.withSaveOffset(afterEnvelopes = 100, afterDuration = 500.millis)
- //#atLeastOnce
+ // #atLeastOnce
}
object IllustrateGrouped {
- //#grouped
+ // #grouped
val projection =
JdbcProjection
.groupedWithin(
@@ -156,7 +156,7 @@ object JdbcProjectionDocExample {
() => new PlainJdbcSession, // JdbcSession Factory
handler = () => new GroupedShoppingCartHandler(orderRepository))
.withGroup(groupAfterEnvelopes = 20, groupAfterDuration = 500.millis)
- //#grouped
+ // #grouped
}
}
diff --git a/examples/src/test/scala/docs/kafka/KafkaDocExample.scala
b/examples/src/test/scala/docs/kafka/KafkaDocExample.scala
index 13fb08f..e62b432 100644
--- a/examples/src/test/scala/docs/kafka/KafkaDocExample.scala
+++ b/examples/src/test/scala/docs/kafka/KafkaDocExample.scala
@@ -60,10 +60,10 @@ import akka.projection.ProjectionContext
object KafkaDocExample {
- //#wordSource
+ // #wordSource
type Word = String
type Count = Int
- //#wordSource
+ // #wordSource
class WordCountHandler(projectionId: ProjectionId)
extends JdbcHandler[ConsumerRecord[String, String],
HibernateJdbcSession] {
@@ -84,7 +84,7 @@ object KafkaDocExample {
}
}
- //#wordSource
+ // #wordSource
final case class WordEnvelope(offset: Long, word: Word)
class WordSource(implicit ec: ExecutionContext) extends SourceProvider[Long,
WordEnvelope] {
@@ -105,9 +105,9 @@ object KafkaDocExample {
override def extractCreationTime(env: WordEnvelope): Long = 0L
}
- //#wordSource
+ // #wordSource
- //#wordPublisher
+ // #wordPublisher
class WordPublisher(topic: String, sendProducer: SendProducer[String,
String])(implicit ec: ExecutionContext)
extends Handler[WordEnvelope] {
private val logger = LoggerFactory.getLogger(getClass)
@@ -125,7 +125,7 @@ object KafkaDocExample {
result
}
}
- //#wordPublisher
+ // #wordPublisher
val config: Config = ConfigFactory.parseString("""
akka.projection.jdbc {
@@ -152,7 +152,7 @@ object KafkaDocExample {
object IllustrateSourceProvider {
- //#sourceProvider
+ // #sourceProvider
val bootstrapServers = "localhost:9092"
val groupId = "group-wordcount"
val topicName = "words"
@@ -164,14 +164,14 @@ object KafkaDocExample {
val sourceProvider: SourceProvider[MergeableOffset[JLong],
ConsumerRecord[String, String]] =
KafkaSourceProvider(system, consumerSettings, Set(topicName))
- //#sourceProvider
+ // #sourceProvider
}
object IllustrateExactlyOnce {
import IllustrateSourceProvider._
val wordRepository: WordRepository = null
- //#exactlyOnce
+ // #exactlyOnce
val sessionProvider = new HibernateSessionFactory
val projectionId = ProjectionId("WordCount", "wordcount-1")
@@ -181,7 +181,7 @@ object KafkaDocExample {
sourceProvider,
() => sessionProvider.newInstance(),
handler = () => new WordCountJdbcHandler(wordRepository))
- //#exactlyOnce
+ // #exactlyOnce
// #exactly-once-jdbc-handler
class WordCountJdbcHandler(val wordRepository: WordRepository)
@@ -208,7 +208,7 @@ object KafkaDocExample {
implicit val ec = system.executionContext
- //#sendProducer
+ // #sendProducer
val bootstrapServers = "localhost:9092"
val topicName = "words"
private val producerSettings =
@@ -216,9 +216,9 @@ object KafkaDocExample {
.withBootstrapServers(bootstrapServers)
import akka.actor.typed.scaladsl.adapter._ // FIXME might not be needed in
later Alpakka Kafka version?
private val sendProducer = SendProducer(producerSettings)(system.toClassic)
- //#sendProducer
+ // #sendProducer
- //#sendToKafkaProjection
+ // #sendToKafkaProjection
val sourceProvider = new WordSource
val sessionProvider = new HibernateSessionFactory
@@ -231,7 +231,7 @@ object KafkaDocExample {
() => sessionProvider.newInstance(),
handler = () => new WordPublisher(topicName, sendProducer))
- //#sendToKafkaProjection
+ // #sendToKafkaProjection
}
@@ -239,7 +239,7 @@ object KafkaDocExample {
implicit val ec = system.executionContext
- //#producerFlow
+ // #producerFlow
val bootstrapServers = "localhost:9092"
val topicName = "words"
@@ -252,9 +252,9 @@ object KafkaDocExample {
.map(wordEnv => ProducerMessage.single(new ProducerRecord(topicName,
wordEnv.word, wordEnv.word)))
.via(Producer.flowWithContext(producerSettings))
.map(_ => Done)
- //#producerFlow
+ // #producerFlow
- //#sendToKafkaProjectionFlow
+ // #sendToKafkaProjectionFlow
val sourceProvider = new WordSource
val sessionProvider = new HibernateSessionFactory
@@ -262,7 +262,7 @@ object KafkaDocExample {
val projection =
JdbcProjection
.atLeastOnceFlow(projectionId, sourceProvider, () =>
sessionProvider.newInstance(), producerFlow)
- //#sendToKafkaProjectionFlow
+ // #sendToKafkaProjectionFlow
}
diff --git a/examples/src/test/scala/docs/slick/SlickProjectionDocExample.scala
b/examples/src/test/scala/docs/slick/SlickProjectionDocExample.scala
index 1bfe557..bb52da8 100644
--- a/examples/src/test/scala/docs/slick/SlickProjectionDocExample.scala
+++ b/examples/src/test/scala/docs/slick/SlickProjectionDocExample.scala
@@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory
class SlickProjectionDocExample {
- //#repository
+ // #repository
case class Order(id: String, time: Instant)
class OrderRepository(val dbConfig: DatabaseConfig[H2Profile]) {
@@ -60,9 +60,9 @@ class SlickProjectionDocExample {
def createTable(): Future[Unit] =
dbConfig.db.run(ordersTable.schema.createIfNotExists)
}
- //#repository
+ // #repository
- //#handler
+ // #handler
class ShoppingCartHandler(repository: OrderRepository)(implicit ec:
ExecutionContext)
extends SlickHandler[EventEnvelope[ShoppingCart.Event]] {
private val logger = LoggerFactory.getLogger(getClass)
@@ -79,9 +79,9 @@ class SlickProjectionDocExample {
}
}
}
- //#handler
+ // #handler
- //#grouped-handler
+ // #grouped-handler
import scala.collection.immutable
class GroupedShoppingCartHandler(repository: OrderRepository)(implicit ec:
ExecutionContext)
@@ -101,24 +101,24 @@ class SlickProjectionDocExample {
DBIO.sequence(dbios).map(_ => Done)
}
}
- //#grouped-handler
+ // #grouped-handler
implicit val system = ActorSystem[Nothing](Behaviors.empty, "Example")
- //#db-config
+ // #db-config
val dbConfig: DatabaseConfig[H2Profile] =
DatabaseConfig.forConfig("akka.projection.slick", system.settings.config)
val repository = new OrderRepository(dbConfig)
- //#db-config
+ // #db-config
- //#sourceProvider
+ // #sourceProvider
val sourceProvider =
EventSourcedProvider
.eventsByTag[ShoppingCart.Event](system, readJournalPluginId =
JdbcReadJournal.Identifier, tag = "carts-1")
- //#sourceProvider
+ // #sourceProvider
object IllustrateExactlyOnce {
- //#exactlyOnce
+ // #exactlyOnce
implicit val ec = system.executionContext
val projection =
@@ -127,11 +127,11 @@ class SlickProjectionDocExample {
sourceProvider,
dbConfig,
handler = () => new ShoppingCartHandler(repository))
- //#exactlyOnce
+ // #exactlyOnce
}
object IllustrateAtLeastOnce {
- //#atLeastOnce
+ // #atLeastOnce
implicit val ec = system.executionContext
val projection =
@@ -142,11 +142,11 @@ class SlickProjectionDocExample {
dbConfig,
handler = () => new ShoppingCartHandler(repository))
.withSaveOffset(afterEnvelopes = 100, afterDuration = 500.millis)
- //#atLeastOnce
+ // #atLeastOnce
}
object IllustrateGrouped {
- //#grouped
+ // #grouped
implicit val ec = system.executionContext
val projection =
@@ -157,7 +157,7 @@ class SlickProjectionDocExample {
dbConfig,
handler = () => new GroupedShoppingCartHandler(repository))
.withGroup(groupAfterEnvelopes = 20, groupAfterDuration = 500.millis)
- //#grouped
+ // #grouped
}
}
diff --git
a/examples/src/test/scala/docs/state/DurableStateStoreDocExample.scala
b/examples/src/test/scala/docs/state/DurableStateStoreDocExample.scala
index 248d0d7..ab3941c 100644
--- a/examples/src/test/scala/docs/state/DurableStateStoreDocExample.scala
+++ b/examples/src/test/scala/docs/state/DurableStateStoreDocExample.scala
@@ -12,7 +12,7 @@ object DurableStateStoreDocExample {
private val system = ActorSystem[Nothing](Behaviors.empty, "Example")
object IllustrateEventsByTagSourceProvider {
- //#changesByTagSourceProvider
+ // #changesByTagSourceProvider
import akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore
import akka.persistence.query.DurableStateChange
import akka.persistence.query.Offset
@@ -22,7 +22,7 @@ object DurableStateStoreDocExample {
val sourceProvider: SourceProvider[Offset,
DurableStateChange[AccountEntity.Account]] =
DurableStateSourceProvider
.changesByTag[AccountEntity.Account](system,
JdbcDurableStateStore.Identifier, "bank-accounts-1")
- //#changesByTagSourceProvider
+ // #changesByTagSourceProvider
}
object IllustrateEventsBySlicesSourceProvider {
@@ -30,7 +30,7 @@ object DurableStateStoreDocExample {
val Identifier = "akka.persistence.r2dbc.query"
}
- //#changesBySlicesSourceProvider
+ // #changesBySlicesSourceProvider
import akka.persistence.query.DurableStateChange
import akka.persistence.query.Offset
import akka.projection.state.scaladsl.DurableStateSourceProvider
@@ -54,6 +54,6 @@ object DurableStateStoreDocExample {
entityType,
minSlice,
maxSlice)
- //#changesBySlicesSourceProvider
+ // #changesBySlicesSourceProvider
}
}
diff --git a/examples/src/test/scala/docs/testkit/TestKitDocExample.scala
b/examples/src/test/scala/docs/testkit/TestKitDocExample.scala
index 7504061..3063067 100644
--- a/examples/src/test/scala/docs/testkit/TestKitDocExample.scala
+++ b/examples/src/test/scala/docs/testkit/TestKitDocExample.scala
@@ -35,7 +35,7 @@ abstract
class TestKitDocExample extends ScalaTestWithActorTestKit {
private val projectionTestKit = ProjectionTestKit(system)
- //#testkit
+ // #testkit
case class CartView(id: String)
@@ -49,24 +49,24 @@ class TestKitDocExample extends ScalaTestWithActorTestKit {
val projection = TestProjection(ProjectionId("test", "00"), sourceProvider =
null, handler = null)
{
- //#testkit-run
+ // #testkit-run
projectionTestKit.run(projection) {
// confirm that cart checkout was inserted in db
cartViewRepository.findById("abc-def").futureValue
}
- //#testkit-run
+ // #testkit-run
}
{
- //#testkit-run-max-interval
+ // #testkit-run-max-interval
projectionTestKit.run(projection, max = 5.seconds, interval = 300.millis) {
// confirm that cart checkout was inserted in db
cartViewRepository.findById("abc-def").futureValue
}
- //#testkit-run-max-interval
+ // #testkit-run-max-interval
}
- //#testkit-sink-probe
+ // #testkit-sink-probe
projectionTestKit.runWithTestSink(projection) { sinkProbe =>
sinkProbe.request(1)
sinkProbe.expectNext(Done)
@@ -75,12 +75,12 @@ class TestKitDocExample extends ScalaTestWithActorTestKit {
// confirm that cart checkout was inserted in db
cartViewRepository.findById("abc-def").futureValue
- //#testkit-sink-probe
+ // #testkit-sink-probe
{
val handler: Handler[(Int, String)] = null
- //#testkit-testprojection
+ // #testkit-testprojection
val testData = Source((0, "abc") :: (1, "def") :: Nil)
val extractOffset = (envelope: (Int, String)) => envelope._1
@@ -92,7 +92,7 @@ class TestKitDocExample extends ScalaTestWithActorTestKit {
projectionTestKit.run(projection) {
// assert logic ..
}
- //#testkit-testprojection
+ // #testkit-testprojection
}
//#testkit
}
diff --git a/project/AkkaDisciplinePlugin.scala
b/project/AkkaDisciplinePlugin.scala
index 0c55bfc..ad69cdd 100644
--- a/project/AkkaDisciplinePlugin.scala
+++ b/project/AkkaDisciplinePlugin.scala
@@ -24,24 +24,24 @@ object AkkaDisciplinePlugin extends AutoPlugin {
if (enabled) {
Seq(
Compile / scalacOptions ++= (
- if (!nonFatalWarningsFor(name.value)) Seq("-Xfatal-warnings")
- else Seq.empty
- ),
+ if (!nonFatalWarningsFor(name.value)) Seq("-Xfatal-warnings")
+ else Seq.empty
+ ),
Test / scalacOptions --= testUndicipline,
Compile / scalacOptions ++=
(CrossVersion.partialVersion(scalaVersion.value) match {
- case Some((2, 13)) =>
- disciplineScalacOptions -- Set(
- "-Ywarn-inaccessible",
- "-Ywarn-infer-any",
- "-Ywarn-nullary-override",
- "-Ywarn-nullary-unit",
- "-Ypartial-unification",
- "-Yno-adapted-args")
- case Some((2, 12)) =>
- disciplineScalacOptions
- case _ =>
- Nil
- }).toSeq,
+ case Some((2, 13)) =>
+ disciplineScalacOptions -- Set(
+ "-Ywarn-inaccessible",
+ "-Ywarn-infer-any",
+ "-Ywarn-nullary-override",
+ "-Ywarn-nullary-unit",
+ "-Ypartial-unification",
+ "-Yno-adapted-args")
+ case Some((2, 12)) =>
+ disciplineScalacOptions
+ case _ =>
+ Nil
+ }).toSeq,
// Discipline is not needed for the docs compilation run (which uses
// different compiler phases from the regular run), and in particular
// '-Ywarn-unused:explicits' breaks 'sbt ++2.13.0-M5 akka-actor/doc'
@@ -61,7 +61,7 @@ object AkkaDisciplinePlugin extends AutoPlugin {
*/
val undisciplineScalacOptions = Set()
- /** These options are desired, but some are excluded for the time being*/
+ /** These options are desired, but some are excluded for the time being */
val disciplineScalacOptions = Set(
"-Xfatal-warnings",
"-feature",
diff --git a/project/Common.scala b/project/Common.scala
index d0c7bc3..b45e105 100644
--- a/project/Common.scala
+++ b/project/Common.scala
@@ -23,14 +23,14 @@ object Common extends AutoPlugin {
homepage := Some(url("https://pekko.apache.org/")),
// apiURL defined in projectSettings because version.value is not
correct here
scmInfo := Some(
- ScmInfo(
- url("https://github.com/apache/incubator-pekko-projection"),
- "[email protected]:apache/incubator-pekko-projection.git")),
+ ScmInfo(
+ url("https://github.com/apache/incubator-pekko-projection"),
+ "[email protected]:apache/incubator-pekko-projection.git")),
developers += Developer(
- "contributors",
- "Contributors",
- "[email protected]",
-
url("https://github.com/apache/incubator-pekko-projection/graphs/contributors")),
+ "contributors",
+ "Contributors",
+ "[email protected]",
+
url("https://github.com/apache/incubator-pekko-projection/graphs/contributors")),
licenses := Seq(("Apache-2.0",
url("https://www.apache.org/licenses/LICENSE-2.0"))),
description := "Apache Pekko Projection.")
@@ -41,19 +41,19 @@ object Common extends AutoPlugin {
scalaVersion := Dependencies.Scala213,
javacOptions ++= List("-Xlint:unchecked", "-Xlint:deprecation"),
Compile / doc / scalacOptions := scalacOptions.value ++ Seq(
- "-doc-title",
- "Apache Pekko Projection",
- "-doc-version",
- version.value,
- "-sourcepath",
- (baseDirectory in ThisBuild).value.toString,
- "-doc-source-url", {
- val branch = if (isSnapshot.value) "main" else s"v${version.value}"
-
s"https://github.com/apache/incubator-pekko-projection/tree/${branch}€{FILE_PATH_EXT}#L€{FILE_LINE}"
- },
- "-skip-packages",
- "akka.pattern" // for some reason Scaladoc creates this
- ),
+ "-doc-title",
+ "Apache Pekko Projection",
+ "-doc-version",
+ version.value,
+ "-sourcepath",
+ (baseDirectory in ThisBuild).value.toString,
+ "-doc-source-url", {
+ val branch = if (isSnapshot.value) "main" else s"v${version.value}"
+
s"https://github.com/apache/incubator-pekko-projection/tree/${branch}€{FILE_PATH_EXT}#L€{FILE_LINE}"
+ },
+ "-skip-packages",
+ "akka.pattern" // for some reason Scaladoc creates this
+ ),
autoAPIMappings := true,
apiURL :=
Some(url(s"https://doc.akka.io/api/akka-projection/${projectInfoVersion.value}")),
// show full stack traces and test case durations
diff --git a/project/CopyrightHeader.scala b/project/CopyrightHeader.scala
index b18d9f5..78595a4 100644
--- a/project/CopyrightHeader.scala
+++ b/project/CopyrightHeader.scala
@@ -17,8 +17,8 @@ object CopyrightHeader extends AutoPlugin {
Seq(
headerLicense := Some(HeaderLicense.Custom(headerFor(CurrentYear))),
headerMappings := headerMappings.value ++ Map(
- HeaderFileType.scala -> cStyleComment,
- HeaderFileType.java -> cStyleComment)))
+ HeaderFileType.scala -> cStyleComment,
+ HeaderFileType.java -> cStyleComment)))
})
// We hard-code this so PR's created in year X will not suddenly fail in X+1.
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 5fa61f0..93c7ebf 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -100,34 +100,34 @@ object Dependencies {
val core =
deps ++= Seq(
- Compile.akkaStream,
- Compile.akkaActorTyped,
- Compile.akkaProtobufV3,
- // akka-persistence-query is only needed for OffsetSerialization and
to provide a typed EventEnvelope that
- // references the Offset type from akka-persistence.
- Compile.akkaPersistenceQuery,
- Test.akkaTypedTestkit,
- Test.logback,
- Test.scalatest)
+ Compile.akkaStream,
+ Compile.akkaActorTyped,
+ Compile.akkaProtobufV3,
+ // akka-persistence-query is only needed for OffsetSerialization and to
provide a typed EventEnvelope that
+ // references the Offset type from akka-persistence.
+ Compile.akkaPersistenceQuery,
+ Test.akkaTypedTestkit,
+ Test.logback,
+ Test.scalatest)
val coreTest =
deps ++= Seq(
- Test.akkaTypedTestkit,
- Test.akkaStreamTestkit,
- Test.scalatest,
- Test.scalatestJUnit,
- Test.junit,
- Test.logback)
+ Test.akkaTypedTestkit,
+ Test.akkaStreamTestkit,
+ Test.scalatest,
+ Test.scalatestJUnit,
+ Test.junit,
+ Test.logback)
val testKit =
deps ++= Seq(
- Compile.akkaTypedTestkit,
- Compile.akkaStreamTestkit,
- Compile.collectionCompat,
- Test.scalatest,
- Test.scalatestJUnit,
- Test.junit,
- Test.logback)
+ Compile.akkaTypedTestkit,
+ Compile.akkaStreamTestkit,
+ Compile.collectionCompat,
+ Test.scalatest,
+ Test.scalatestJUnit,
+ Test.junit,
+ Test.logback)
val eventsourced =
deps ++= Seq(Compile.akkaPersistenceQuery)
@@ -137,65 +137,65 @@ object Dependencies {
val jdbc =
deps ++= Seq(
- Compile.akkaPersistenceQuery,
- Test.akkaTypedTestkit,
- Test.h2Driver,
- Test.postgresDriver,
- Test.postgresContainer,
- Test.mysqlDriver,
- Test.mysqlContainer,
- Test.msSQLServerDriver,
- Test.msSQLServerContainer,
- Test.oracleDriver,
- Test.oracleDbContainer,
- Test.logback)
+ Compile.akkaPersistenceQuery,
+ Test.akkaTypedTestkit,
+ Test.h2Driver,
+ Test.postgresDriver,
+ Test.postgresContainer,
+ Test.mysqlDriver,
+ Test.mysqlContainer,
+ Test.msSQLServerDriver,
+ Test.msSQLServerContainer,
+ Test.oracleDriver,
+ Test.oracleDbContainer,
+ Test.logback)
val slick =
deps ++= Seq(
- Compile.slick,
- Compile.akkaPersistenceQuery,
- Test.akkaTypedTestkit,
- Test.h2Driver,
- Test.postgresDriver,
- Test.postgresContainer,
- Test.mysqlDriver,
- Test.mysqlContainer,
- Test.msSQLServerDriver,
- Test.msSQLServerContainer,
- Test.oracleDriver,
- Test.oracleDbContainer,
- Test.logback)
+ Compile.slick,
+ Compile.akkaPersistenceQuery,
+ Test.akkaTypedTestkit,
+ Test.h2Driver,
+ Test.postgresDriver,
+ Test.postgresContainer,
+ Test.mysqlDriver,
+ Test.mysqlContainer,
+ Test.msSQLServerDriver,
+ Test.msSQLServerContainer,
+ Test.oracleDriver,
+ Test.oracleDbContainer,
+ Test.logback)
val cassandra =
deps ++= Seq(
- Compile.alpakkaCassandra,
- Compile.akkaPersistenceQuery,
- Test.akkaTypedTestkit,
- Test.logback,
- Test.cassandraContainer,
- Test.scalatestJUnit)
+ Compile.alpakkaCassandra,
+ Compile.akkaPersistenceQuery,
+ Test.akkaTypedTestkit,
+ Test.logback,
+ Test.cassandraContainer,
+ Test.scalatestJUnit)
val kafka =
deps ++= Seq(
- Compile.alpakkaKafka,
- Compile.jackson,
- Test.scalatest,
- Test.akkaTypedTestkit,
- Test.akkaStreamTestkit,
- Test.alpakkaKafkaTestkit,
- Test.logback,
- Test.scalatestJUnit)
+ Compile.alpakkaKafka,
+ Compile.jackson,
+ Test.scalatest,
+ Test.akkaTypedTestkit,
+ Test.akkaStreamTestkit,
+ Test.alpakkaKafkaTestkit,
+ Test.logback,
+ Test.scalatestJUnit)
val examples =
deps ++= Seq(
- Examples.akkaPersistenceTyped,
- Examples.akkaClusterShardingTyped,
- Examples.akkaPersistenceCassandra,
- Examples.akkaPersistenceJdbc,
- Examples.akkaSerializationJackson,
- Examples.hibernate,
- Test.h2Driver,
- Test.akkaTypedTestkit,
- Test.logback,
- Test.cassandraContainer)
+ Examples.akkaPersistenceTyped,
+ Examples.akkaClusterShardingTyped,
+ Examples.akkaPersistenceCassandra,
+ Examples.akkaPersistenceJdbc,
+ Examples.akkaSerializationJackson,
+ Examples.hibernate,
+ Test.h2Driver,
+ Test.akkaTypedTestkit,
+ Test.logback,
+ Test.cassandraContainer)
}
diff --git a/project/Protobuf.scala b/project/Protobuf.scala
index 355434a..8fbf457 100644
--- a/project/Protobuf.scala
+++ b/project/Protobuf.scala
@@ -47,7 +47,7 @@ object Protobuf {
val targets = target.value
val cache = targets / "protoc" / "cache"
- (sourceDirs.zip(targetDirs)).map {
+ sourceDirs.zip(targetDirs).map {
case (src, dst) =>
val relative = src
.relativeTo(sources)
@@ -62,8 +62,8 @@ object Protobuf {
_ => true,
transformFile(
_.replace("com.google.protobuf", "akka.protobufv3.internal")
- // this is the one thing that protobufGenerate doesn't fully
qualify and causes
- // api doc generation to fail
+ // this is the one thing that protobufGenerate doesn't fully
qualify and causes
+ // api doc generation to fail
.replace(
"UnusedPrivateParameter",
"akka.protobufv3.internal.GeneratedMessageV3.UnusedPrivateParameter")),
@@ -83,9 +83,10 @@ object Protobuf {
}
private def checkProtocVersion(protoc: String, protocVersion: String, log:
Logger): Unit = {
- val res = callProtoc(protoc, Seq("--version"), log, { (p, l) =>
- p !! l
- })
+ val res = callProtoc(protoc, Seq("--version"), log,
+ { (p, l) =>
+ p !! l
+ })
val version = res.split(" ").last.trim
if (version != protocVersion) {
sys.error("Wrong protoc version! Expected %s but got
%s".format(protocVersion, version))
@@ -114,7 +115,8 @@ object Protobuf {
protoc,
Seq("-I" + srcDir.absolutePath,
"--java_out=%s".format(targetDir.absolutePath)) ++
protoPathArg ++ protoFiles.map(_.absolutePath),
- log, { (p, l) =>
+ log,
+ { (p, l) =>
p ! l
})
if (exitCode != 0)
@@ -151,7 +153,7 @@ object Protobuf {
updated
} else Set.empty
}
- val sources = (sourceDir.allPaths).get.toSet
+ val sources = sourceDir.allPaths.get.toSet
runTransform(sources)
targetDir
}
diff --git
a/projection-cassandra/src/it/scala/akka/projection/cassandra/CassandraOffsetStoreSpec.scala
b/projection-cassandra/src/it/scala/akka/projection/cassandra/CassandraOffsetStoreSpec.scala
index 003732b..65716d6 100644
---
a/projection-cassandra/src/it/scala/akka/projection/cassandra/CassandraOffsetStoreSpec.scala
+++
b/projection-cassandra/src/it/scala/akka/projection/cassandra/CassandraOffsetStoreSpec.scala
@@ -191,7 +191,7 @@ class CassandraOffsetStoreSpec
val projectionId = ProjectionId("projection-with-akka-seq", "00")
- val timeOffset =
TimeBasedUUID(UUID.fromString("49225740-2019-11ea-a752-ffae2393b6e4"))
//2019-12-16T15:32:36.148Z[UTC]
+ val timeOffset =
TimeBasedUUID(UUID.fromString("49225740-2019-11ea-a752-ffae2393b6e4")) //
2019-12-16T15:32:36.148Z[UTC]
withClue("check - save offset") {
offsetStore.saveOffset(projectionId, timeOffset).futureValue
}
diff --git
a/projection-cassandra/src/it/scala/akka/projection/cassandra/CassandraProjectionSpec.scala
b/projection-cassandra/src/it/scala/akka/projection/cassandra/CassandraProjectionSpec.scala
index 3d51d6a..1a73d95 100644
---
a/projection-cassandra/src/it/scala/akka/projection/cassandra/CassandraProjectionSpec.scala
+++
b/projection-cassandra/src/it/scala/akka/projection/cassandra/CassandraProjectionSpec.scala
@@ -165,13 +165,13 @@ class CassandraProjectionSpec
override protected def afterAll(): Unit = {
Await.ready(for {
- s <- session.underlying()
- // reason for setSchemaMetadataEnabled is that it speed up tests
- _ <- s.setSchemaMetadataEnabled(false).toScala
- _ <- session.executeDDL(s"DROP keyspace ${offsetStore.keyspace}")
- _ <- session.executeDDL(s"DROP keyspace ${repository.keyspace}")
- _ <- s.setSchemaMetadataEnabled(null).toScala
- } yield Done, 30.seconds)
+ s <- session.underlying()
+ // reason for setSchemaMetadataEnabled is that it speed up tests
+ _ <- s.setSchemaMetadataEnabled(false).toScala
+ _ <- session.executeDDL(s"DROP keyspace ${offsetStore.keyspace}")
+ _ <- session.executeDDL(s"DROP keyspace ${repository.keyspace}")
+ _ <- s.setSchemaMetadataEnabled(null).toScala
+ } yield Done, 30.seconds)
super.afterAll()
}
@@ -384,7 +384,6 @@ class CassandraProjectionSpec
.withSaveOffset(10, 2.seconds)
projectionTestKit.runWithTestSink(projection) { sinkProbe =>
-
eventually {
sourceProbe.get should not be null
}
diff --git
a/projection-core-test/src/test/scala/akka/projection/ProjectionBehaviorSpec.scala
b/projection-core-test/src/test/scala/akka/projection/ProjectionBehaviorSpec.scala
index 80c8003..1061941 100644
---
a/projection-core-test/src/test/scala/akka/projection/ProjectionBehaviorSpec.scala
+++
b/projection-core-test/src/test/scala/akka/projection/ProjectionBehaviorSpec.scala
@@ -451,10 +451,11 @@ class ProjectionBehaviorSpec extends
ScalaTestWithActorTestKit("""
"work with ProjectionManagement extension" in {
val projectionId1 = ProjectionId("test-projection-ext", "1")
- val (testProbe1, _, srcRef1) = setupTestProjection(projectionId1,
earlyMgmtCommand = () => {
- // immediate request should work (via retries)
- ProjectionManagement(system).getOffset[Int](projectionId1).futureValue
shouldBe None
- })
+ val (testProbe1, _, srcRef1) = setupTestProjection(projectionId1,
+ earlyMgmtCommand = () => {
+ // immediate request should work (via retries)
+
ProjectionManagement(system).getOffset[Int](projectionId1).futureValue shouldBe
None
+ })
val projectionId2 = ProjectionId("test-projection-ext", "2")
val (testProbe2, _, srcRef2) = setupTestProjection(projectionId2)
diff --git
a/projection-core-test/src/test/scala/akka/projection/internal/OffsetSerializationSpec.scala
b/projection-core-test/src/test/scala/akka/projection/internal/OffsetSerializationSpec.scala
index 2319a3d..bbdacf2 100644
---
a/projection-core-test/src/test/scala/akka/projection/internal/OffsetSerializationSpec.scala
+++
b/projection-core-test/src/test/scala/akka/projection/internal/OffsetSerializationSpec.scala
@@ -101,7 +101,7 @@ class OffsetSerializationSpec
}
"convert offsets of type akka.persistence.query.TimeBasedUUID" in {
- //2019-12-16T15:32:36.148Z[UTC]
+ // 2019-12-16T15:32:36.148Z[UTC]
val uuidString = "49225740-2019-11ea-a752-ffae2393b6e4"
val timeOffset = query.TimeBasedUUID(UUID.fromString(uuidString))
toStorageRepresentation(id, timeOffset) shouldBe SingleOffset(id,
TimeBasedUUIDManifest, uuidString)
diff --git
a/projection-core-test/src/test/scala/akka/projection/internal/TelemetryProviderSpec.scala
b/projection-core-test/src/test/scala/akka/projection/internal/TelemetryProviderSpec.scala
index 92b5187..2443dca 100644
---
a/projection-core-test/src/test/scala/akka/projection/internal/TelemetryProviderSpec.scala
+++
b/projection-core-test/src/test/scala/akka/projection/internal/TelemetryProviderSpec.scala
@@ -14,7 +14,6 @@ import akka.projection.internal.metrics.tools.InMemTelemetry
import org.scalatest.wordspec.AnyWordSpecLike
/**
- *
*/
object TelemetryProviderSpec {
val projectionId = ProjectionId("TelemetryProviderSpec-projection", "noKey")
diff --git
a/projection-core-test/src/test/scala/akka/projection/internal/metrics/ServiceTimeMetricSpec.scala
b/projection-core-test/src/test/scala/akka/projection/internal/metrics/ServiceTimeMetricSpec.scala
index 0b4ca37..5d6e0a1 100644
---
a/projection-core-test/src/test/scala/akka/projection/internal/metrics/ServiceTimeMetricSpec.scala
+++
b/projection-core-test/src/test/scala/akka/projection/internal/metrics/ServiceTimeMetricSpec.scala
@@ -43,7 +43,7 @@ class ServiceTimeAndProcessingCountMetricAtLeastOnceSpec
extends ServiceTimeAndP
runInternal(tt.projectionState) {
instruments.afterProcessInvocations.get should
be(defaultNumberOfEnvelopes)
- instruments.lastServiceTimeInNanos.get() should be > (0L)
+ instruments.lastServiceTimeInNanos.get() should be > 0L
}
}
"reports measures for all envelopes (with afterEnvelops optimization)"
in {
@@ -53,7 +53,7 @@ class ServiceTimeAndProcessingCountMetricAtLeastOnceSpec
extends ServiceTimeAndP
runInternal(tt.projectionState) {
// afterProcess invocations happen per envelope (not in a
groupWithin!)
instruments.afterProcessInvocations.get should
be(defaultNumberOfEnvelopes)
- instruments.lastServiceTimeInNanos.get() should be > (0L)
+ instruments.lastServiceTimeInNanos.get() should be > 0L
}
}
@@ -183,7 +183,7 @@ class ServiceTimeAndProcessingCountMetricAtLeastOnceSpec
extends ServiceTimeAndP
// previous envelope). As a consequence, the
`afterProcessInvocations` count is
// nondeterministic and we can only assert it'll be some value
between 8 and 10 (both
// included)
- instruments.afterProcessInvocations.get should be >= (8)
+ instruments.afterProcessInvocations.get should be >= 8
}
}
}
diff --git
a/projection-core-test/src/test/scala/akka/projection/internal/metrics/tools/InMemTelemetry.scala
b/projection-core-test/src/test/scala/akka/projection/internal/metrics/tools/InMemTelemetry.scala
index d923e5f..d4b1ac2 100644
---
a/projection-core-test/src/test/scala/akka/projection/internal/metrics/tools/InMemTelemetry.scala
+++
b/projection-core-test/src/test/scala/akka/projection/internal/metrics/tools/InMemTelemetry.scala
@@ -19,7 +19,6 @@ import akka.projection.ProjectionId
import akka.projection.internal.Telemetry
/**
- *
*/
class InMemTelemetry(projectionId: ProjectionId, system: ActorSystem[_])
extends Telemetry {
private val instruments: InMemInstruments =
InMemInstrumentsRegistry(system).forId(projectionId)
@@ -70,9 +69,10 @@ object InMemInstrumentsRegistry extends
ExtensionId[InMemInstrumentsRegistry] {
class InMemInstrumentsRegistry(system: ActorSystem[_]) extends Extension {
private val instrumentMap = new ConcurrentHashMap[ProjectionId,
InMemInstruments]()
def forId(projectionId: ProjectionId): InMemInstruments = {
- instrumentMap.computeIfAbsent(projectionId, new
function.Function[ProjectionId, InMemInstruments] {
- override def apply(t: ProjectionId): InMemInstruments = new
InMemInstruments
- })
+ instrumentMap.computeIfAbsent(projectionId,
+ new function.Function[ProjectionId, InMemInstruments] {
+ override def apply(t: ProjectionId): InMemInstruments = new
InMemInstruments
+ })
}
// these are added to use the constructor argument and keep the
AkkaDisciplinePlugin happy
diff --git
a/projection-core-test/src/test/scala/akka/projection/internal/metrics/tools/TestHandlers.scala
b/projection-core-test/src/test/scala/akka/projection/internal/metrics/tools/TestHandlers.scala
index 03b0db6..3176506 100644
---
a/projection-core-test/src/test/scala/akka/projection/internal/metrics/tools/TestHandlers.scala
+++
b/projection-core-test/src/test/scala/akka/projection/internal/metrics/tools/TestHandlers.scala
@@ -14,7 +14,6 @@ import akka.projection.scaladsl.Handler
import akka.stream.scaladsl.FlowWithContext
/**
- *
*/
object TestHandlers {
@@ -46,7 +45,7 @@ object TestHandlers {
new Handler[Envelope] {
override def process(envelope: Envelope): Future[Done] = {
nextProcessStrategy match {
- case SomeFailures(nextFail :: tail) if (nextFail ==
envelope.offset) =>
+ case SomeFailures(nextFail :: tail) if nextFail == envelope.offset
=>
nextProcessStrategy = SomeFailures(tail)
throw TelemetryException
case _ => Future.successful(Done)
@@ -71,11 +70,11 @@ object TestHandlers {
override def process(envelopes: immutable.Seq[Envelope]): Future[Done]
= {
nextProcessStrategy match {
case SomeFailures(nextFail :: tail)
- if (envelopes
+ if envelopes
.map {
_.offset
}
- .contains(nextFail)) =>
+ .contains(nextFail) =>
nextProcessStrategy = SomeFailures(tail)
Future.failed(TelemetryException)
case _ =>
@@ -99,7 +98,7 @@ object TestHandlers {
FlowWithContext[Envelope, ProjectionContext]
.map { envelope =>
nextProcessStrategy match {
- case SomeFailures(nextFail :: tail) if (envelope.offset == nextFail)
=>
+ case SomeFailures(nextFail :: tail) if envelope.offset == nextFail =>
nextProcessStrategy = SomeFailures(tail)
throw TelemetryException
case _ =>
diff --git a/projection-core/src/main/scala/akka/projection/Projection.scala
b/projection-core/src/main/scala/akka/projection/Projection.scala
index 7faaafe..05639bb 100644
--- a/projection-core/src/main/scala/akka/projection/Projection.scala
+++ b/projection-core/src/main/scala/akka/projection/Projection.scala
@@ -106,9 +106,10 @@ private[projection] object RunningProjection {
RestartSource
.onFailuresWithBackoff(settings.restartBackoff) { () =>
source()
- .recoverWithRetries(1, {
- case AbortProjectionException => Source.empty // don't restart
- })
+ .recoverWithRetries(1,
+ {
+ case AbortProjectionException => Source.empty // don't restart
+ })
}
}
diff --git
a/projection-core/src/main/scala/akka/projection/ProjectionContext.scala
b/projection-core/src/main/scala/akka/projection/ProjectionContext.scala
index 43f224a..c045938 100644
--- a/projection-core/src/main/scala/akka/projection/ProjectionContext.scala
+++ b/projection-core/src/main/scala/akka/projection/ProjectionContext.scala
@@ -5,6 +5,5 @@
package akka.projection
/**
- *
*/
trait ProjectionContext
diff --git
a/projection-core/src/main/scala/akka/projection/internal/SourceProviderAdapter.scala
b/projection-core/src/main/scala/akka/projection/internal/SourceProviderAdapter.scala
index 94987ae..29519e5 100644
---
a/projection-core/src/main/scala/akka/projection/internal/SourceProviderAdapter.scala
+++
b/projection-core/src/main/scala/akka/projection/internal/SourceProviderAdapter.scala
@@ -26,8 +26,8 @@ import akka.stream.scaladsl.Source
extends scaladsl.SourceProvider[Offset, Envelope] {
def source(offset: () => Future[Option[Offset]]): Future[Source[Envelope,
NotUsed]] = {
- // the parasitic context is used to convert the Optional to Option and a
java streams Source to a scala Source,
- // it _should_ not be used for the blocking operation of getting offsets
themselves
+ // the parasitic context is used to convert the Optional to Option and a
java streams Source to a scala Source,
+ // it _should_ not be used for the blocking operation of getting offsets
themselves
val ec = akka.dispatch.ExecutionContexts.parasitic
val offsetAdapter = new Supplier[CompletionStage[Optional[Offset]]] {
override def get(): CompletionStage[Optional[Offset]] =
offset().map(_.asJava)(ec).toJava
diff --git
a/projection-core/src/main/scala/akka/projection/internal/Telemetry.scala
b/projection-core/src/main/scala/akka/projection/internal/Telemetry.scala
index 0dcaf3e..b956b9e 100644
--- a/projection-core/src/main/scala/akka/projection/internal/Telemetry.scala
+++ b/projection-core/src/main/scala/akka/projection/internal/Telemetry.scala
@@ -28,7 +28,8 @@ import akka.util.ccompat.JavaConverters._
@InternalStableApi
trait Telemetry {
- /** Invoked when a projection is stopped. The reason for stopping is
unspecified, can be a
+ /**
+ * Invoked when a projection is stopped. The reason for stopping is
unspecified, can be a
* graceful stop or a failure (see [[Telemetry.failed]]).
*/
def stopped(): Unit
diff --git
a/projection-core/src/main/scala/akka/projection/scaladsl/ProjectionManagement.scala
b/projection-core/src/main/scala/akka/projection/scaladsl/ProjectionManagement.scala
index 583dde2..9756e0f 100644
---
a/projection-core/src/main/scala/akka/projection/scaladsl/ProjectionManagement.scala
+++
b/projection-core/src/main/scala/akka/projection/scaladsl/ProjectionManagement.scala
@@ -49,10 +49,11 @@ import akka.util.Timeout
"projection-" + projectionName
private def topic(projectionName: String):
ActorRef[Topic.Command[ProjectionManagementCommand]] = {
- topics.computeIfAbsent(projectionName, _ => {
- val name = topicName(projectionName)
- system.systemActorOf(Topic[ProjectionManagementCommand](name), name)
- })
+ topics.computeIfAbsent(projectionName,
+ _ => {
+ val name = topicName(projectionName)
+ system.systemActorOf(Topic[ProjectionManagementCommand](name), name)
+ })
}
/**
diff --git
a/projection-jdbc/src/it/scala/akka/projection/jdbc/JdbcProjectionSpec.scala
b/projection-jdbc/src/it/scala/akka/projection/jdbc/JdbcProjectionSpec.scala
index d59c1a9..5ca13df 100644
--- a/projection-jdbc/src/it/scala/akka/projection/jdbc/JdbcProjectionSpec.scala
+++ b/projection-jdbc/src/it/scala/akka/projection/jdbc/JdbcProjectionSpec.scala
@@ -847,7 +847,6 @@ class JdbcProjectionSpec
.withSaveOffset(10, 1.minute)
projectionTestKit.runWithTestSink(projection) { sinkProbe =>
-
eventually {
sourceProbe.get should not be null
}
@@ -892,7 +891,6 @@ class JdbcProjectionSpec
.withSaveOffset(10, 2.seconds)
projectionTestKit.runWithTestSink(projection) { sinkProbe =>
-
eventually {
sourceProbe.get should not be null
}
diff --git
a/projection-jdbc/src/main/scala/akka/projection/jdbc/JdbcSession.scala
b/projection-jdbc/src/main/scala/akka/projection/jdbc/JdbcSession.scala
index d9d4bd3..050a3e4 100644
--- a/projection-jdbc/src/main/scala/akka/projection/jdbc/JdbcSession.scala
+++ b/projection-jdbc/src/main/scala/akka/projection/jdbc/JdbcSession.scala
@@ -24,7 +24,6 @@ import akka.japi.function.{ Function => JFunction }
* When using plain JDBC, one can initialize a connection directly, but when
relying on a JDBC framework like JPA it will depend on the
* chosen implementation. Hibernate for instance provides indirect access to
the underlying connection through a
* lambda call and therefore can be used (see [[JdbcSession#withConnection]]
method). Other JPA implementations may not provide this feature.
- *
*/
@ApiMayChange
trait JdbcSession {
diff --git
a/projection-jdbc/src/main/scala/akka/projection/jdbc/internal/JdbcOffsetStore.scala
b/projection-jdbc/src/main/scala/akka/projection/jdbc/internal/JdbcOffsetStore.scala
index 3d0b16b..f0c73c6 100644
---
a/projection-jdbc/src/main/scala/akka/projection/jdbc/internal/JdbcOffsetStore.scala
+++
b/projection-jdbc/src/main/scala/akka/projection/jdbc/internal/JdbcOffsetStore.scala
@@ -97,18 +97,15 @@ private[projection] class JdbcOffsetStore[S <: JdbcSession](
def readOffset[Offset](projectionId: ProjectionId): Future[Option[Offset]] =
withConnection(jdbcSessionFactory) { conn =>
-
if (verboseLogging)
logger.debug("reading offset for [{}], using connection id [{}]",
projectionId, System.identityHashCode(conn))
// init Statement in try-with-resource
tryWithResource(conn.prepareStatement(settings.dialect.readOffsetQuery))
{ stmt =>
-
stmt.setString(1, projectionId.name)
// init ResultSet in try-with-resource
tryWithResource(stmt.executeQuery()) { resultSet =>
-
val buffer = ListBuffer.empty[SingleOffset]
while (resultSet.next()) {
@@ -223,7 +220,6 @@ private[projection] class JdbcOffsetStore[S <: JdbcSession](
def readManagementState(projectionId: ProjectionId):
Future[Option[ManagementState]] = {
withConnection(jdbcSessionFactory) { conn =>
-
if (verboseLogging)
logger.debug(
"reading ManagementState for [{}], using connection id [{}]",
@@ -232,7 +228,6 @@ private[projection] class JdbcOffsetStore[S <: JdbcSession](
// init Statement in try-with-resource
tryWithResource(conn.prepareStatement(settings.dialect.readManagementStateQuery))
{ stmt =>
-
stmt.setString(1, projectionId.name)
stmt.setString(2, projectionId.key)
diff --git
a/projection-jdbc/src/main/scala/akka/projection/jdbc/internal/JdbcProjectionImpl.scala
b/projection-jdbc/src/main/scala/akka/projection/jdbc/internal/JdbcProjectionImpl.scala
index 209db62..f464c21 100644
---
a/projection-jdbc/src/main/scala/akka/projection/jdbc/internal/JdbcProjectionImpl.scala
+++
b/projection-jdbc/src/main/scala/akka/projection/jdbc/internal/JdbcProjectionImpl.scala
@@ -57,7 +57,6 @@ private[projection] object JdbcProjectionImpl {
sessionFactory: () => S,
handlerFactory: () => JdbcHandler[Envelope, S],
offsetStore: JdbcOffsetStore[S]): () => Handler[Envelope] = { () =>
-
new AdaptedJdbcHandler(handlerFactory(), offsetStore.executionContext) {
override def process(envelope: Envelope): Future[Done] = {
val offset = sourceProvider.extractOffset(envelope)
@@ -96,7 +95,6 @@ private[projection] object JdbcProjectionImpl {
sessionFactory: () => S,
handlerFactory: () => JdbcHandler[immutable.Seq[Envelope], S],
offsetStore: JdbcOffsetStore[S]): () => Handler[immutable.Seq[Envelope]]
= { () =>
-
new AdaptedJdbcHandler(handlerFactory(), offsetStore.executionContext) {
override def process(envelopes: immutable.Seq[Envelope]): Future[Done] =
{
val offset = sourceProvider.extractOffset(envelopes.last)
@@ -212,7 +210,7 @@ private[projection] class JdbcProjectionImpl[Offset,
Envelope, S <: JdbcSession]
val newStrategy = offsetStrategy match {
case s: ExactlyOnce => s.copy(recoveryStrategy = Some(recoveryStrategy))
case s: AtLeastOnce => s.copy(recoveryStrategy = Some(recoveryStrategy))
- //NOTE: AtMostOnce has its own withRecoveryStrategy variant
+ // NOTE: AtMostOnce has its own withRecoveryStrategy variant
// this method is not available for AtMostOnceProjection
case s: AtMostOnce => s
}
diff --git
a/projection-jdbc/src/main/scala/akka/projection/jdbc/javadsl/JdbcProjection.scala
b/projection-jdbc/src/main/scala/akka/projection/jdbc/javadsl/JdbcProjection.scala
index 46ff4f3..e8ac368 100644
---
a/projection-jdbc/src/main/scala/akka/projection/jdbc/javadsl/JdbcProjection.scala
+++
b/projection-jdbc/src/main/scala/akka/projection/jdbc/javadsl/JdbcProjection.scala
@@ -43,7 +43,6 @@ object JdbcProjection {
*
* It stores the offset in a relational database table using JDBC in the
same transaction
* as the user defined `handler`.
- *
*/
def exactlyOnce[Offset, Envelope, S <: JdbcSession](
projectionId: ProjectionId,
diff --git
a/projection-jdbc/src/test/scala/akka/projection/jdbc/JdbcOffsetStoreSpec.scala
b/projection-jdbc/src/test/scala/akka/projection/jdbc/JdbcOffsetStoreSpec.scala
index ab9ef73..9672a57 100644
---
a/projection-jdbc/src/test/scala/akka/projection/jdbc/JdbcOffsetStoreSpec.scala
+++
b/projection-jdbc/src/test/scala/akka/projection/jdbc/JdbcOffsetStoreSpec.scala
@@ -148,7 +148,6 @@ abstract class JdbcOffsetStoreSpec(specConfig:
JdbcSpecConfig)
private def selectLastUpdated(projectionId: ProjectionId): Instant = {
withConnection(specConfig.jdbcSessionFactory _) { conn =>
-
val statement = selectLastStatement
// init statement in try-with-resource
@@ -158,7 +157,6 @@ abstract class JdbcOffsetStoreSpec(specConfig:
JdbcSpecConfig)
// init ResultSet in try-with-resource
tryWithResource(stmt.executeQuery()) { resultSet =>
-
if (resultSet.next()) {
val millisSinceEpoch = resultSet.getLong(6)
Instant.ofEpochMilli(millisSinceEpoch)
diff --git
a/projection-kafka/src/it/scala/akka/projection/kafka/integration/KafkaToSlickIntegrationSpec.scala
b/projection-kafka/src/it/scala/akka/projection/kafka/integration/KafkaToSlickIntegrationSpec.scala
index 7439ec9..a2a8041 100644
---
a/projection-kafka/src/it/scala/akka/projection/kafka/integration/KafkaToSlickIntegrationSpec.scala
+++
b/projection-kafka/src/it/scala/akka/projection/kafka/integration/KafkaToSlickIntegrationSpec.scala
@@ -185,12 +185,13 @@ class KafkaToSlickIntegrationSpec extends
KafkaSpecBase(ConfigFactory.load().wit
// repository will fail to insert the "AddToCart" event type once only
val failedOnce = new AtomicBoolean
- val failingRepository = new EventTypeCountRepository(dbConfig,
doTransientFailure = eventType => {
- if (!failedOnce.get && eventType == EventType.AddToCart) {
- failedOnce.set(true)
- true
- } else false
- })
+ val failingRepository = new EventTypeCountRepository(dbConfig,
+ doTransientFailure = eventType => {
+ if (!failedOnce.get && eventType == EventType.AddToCart) {
+ failedOnce.set(true)
+ true
+ } else false
+ })
val slickProjection =
SlickProjection
diff --git
a/projection-kafka/src/main/scala/akka/projection/kafka/internal/KafkaSourceProviderImpl.scala
b/projection-kafka/src/main/scala/akka/projection/kafka/internal/KafkaSourceProviderImpl.scala
index 498ed7f..a811a24 100644
---
a/projection-kafka/src/main/scala/akka/projection/kafka/internal/KafkaSourceProviderImpl.scala
+++
b/projection-kafka/src/main/scala/akka/projection/kafka/internal/KafkaSourceProviderImpl.scala
@@ -78,9 +78,10 @@ import org.apache.kafka.common.record.TimestampType
metadataClient: MetadataClientAdapter): Source[ConsumerRecord[K, V],
Consumer.Control] =
Consumer
.plainPartitionedManualOffsetSource(settings, subscription,
getOffsetsOnAssign(readOffsets, metadataClient))
- .flatMapMerge(numPartitions, {
- case (_, partitionedSource) => partitionedSource
- })
+ .flatMapMerge(numPartitions,
+ {
+ case (_, partitionedSource) => partitionedSource
+ })
override def source(readOffsets: ReadOffsets):
Future[Source[ConsumerRecord[K, V], NotUsed]] = {
// get the total number of partitions to configure the `breadth`
parameter, or we could just use a really large
@@ -136,7 +137,7 @@ import org.apache.kafka.common.record.TimestampType
case Some(groupOffsets) =>
val filteredMap = groupOffsets.entries.collect {
case (topicPartitionKey, offset) if
assignedTps.contains(keyToPartition(topicPartitionKey)) =>
- (keyToPartition(topicPartitionKey) ->
(offset.asInstanceOf[Long] + 1L))
+ keyToPartition(topicPartitionKey) ->
(offset.asInstanceOf[Long] + 1L)
}
Future.successful(filteredMap)
case None => metadataClient.getBeginningOffsets(assignedTps)
diff --git
a/projection-slick/src/it/scala/akka/projection/slick/SlickContainerOffsetStoreSpec.scala
b/projection-slick/src/it/scala/akka/projection/slick/SlickContainerOffsetStoreSpec.scala
index 528c029..cb48e4d 100644
---
a/projection-slick/src/it/scala/akka/projection/slick/SlickContainerOffsetStoreSpec.scala
+++
b/projection-slick/src/it/scala/akka/projection/slick/SlickContainerOffsetStoreSpec.scala
@@ -114,7 +114,7 @@ object SlickContainerOffsetStoreSpec {
// related to
https://github.com/testcontainers/testcontainers-java/issues/2313
// otherwise we get ORA-01882: timezone region not found
System.setProperty("oracle.jdbc.timezoneAsRegion", "false")
-
+
val container = initContainer(new
OracleContainer("oracleinanutshell/oracle-xe-11g:1.0.0"))
override def config: Config =
diff --git
a/projection-slick/src/main/scala/akka/projection/slick/internal/SlickProjectionImpl.scala
b/projection-slick/src/main/scala/akka/projection/slick/internal/SlickProjectionImpl.scala
index 8451f49..660abf4 100644
---
a/projection-slick/src/main/scala/akka/projection/slick/internal/SlickProjectionImpl.scala
+++
b/projection-slick/src/main/scala/akka/projection/slick/internal/SlickProjectionImpl.scala
@@ -130,7 +130,7 @@ private[projection] class SlickProjectionImpl[Offset,
Envelope, P <: JdbcProfile
offsetStrategy match {
case s: ExactlyOnce => s.copy(recoveryStrategy =
Some(recoveryStrategy))
case s: AtLeastOnce => s.copy(recoveryStrategy =
Some(recoveryStrategy))
- //NOTE: AtMostOnce has its own withRecoveryStrategy variant
+ // NOTE: AtMostOnce has its own withRecoveryStrategy variant
// this method is not available for AtMostOnceProjection
case s: AtMostOnce => s
}
diff --git
a/projection-slick/src/test/scala/akka/projection/slick/SlickOffsetStoreSpec.scala
b/projection-slick/src/test/scala/akka/projection/slick/SlickOffsetStoreSpec.scala
index e50c5ac..22eef64 100644
---
a/projection-slick/src/test/scala/akka/projection/slick/SlickOffsetStoreSpec.scala
+++
b/projection-slick/src/test/scala/akka/projection/slick/SlickOffsetStoreSpec.scala
@@ -250,7 +250,7 @@ abstract class SlickOffsetStoreSpec(specConfig:
SlickSpecConfig)
val projectionId = genRandomProjectionId()
- val timeOffset =
TimeBasedUUID(UUID.fromString("49225740-2019-11ea-a752-ffae2393b6e4"))
//2019-12-16T15:32:36.148Z[UTC]
+ val timeOffset =
TimeBasedUUID(UUID.fromString("49225740-2019-11ea-a752-ffae2393b6e4")) //
2019-12-16T15:32:36.148Z[UTC]
withClue("check - save offset") {
dbConfig.db.run(offsetStore.saveOffset(projectionId,
timeOffset)).futureValue
}
diff --git
a/projection-slick/src/test/scala/akka/projection/slick/SlickProjectionSpec.scala
b/projection-slick/src/test/scala/akka/projection/slick/SlickProjectionSpec.scala
index a642e73..d197269 100644
---
a/projection-slick/src/test/scala/akka/projection/slick/SlickProjectionSpec.scala
+++
b/projection-slick/src/test/scala/akka/projection/slick/SlickProjectionSpec.scala
@@ -505,7 +505,6 @@ class SlickProjectionSpec
}
projectionTestKit.runWithTestSink(slickProjectionFailing) { sinkProbe =>
-
sinkProbe.request(1000)
eventuallyExpectError(sinkProbe).getClass shouldBe
classOf[JdbcSQLIntegrityConstraintViolationException]
}
@@ -999,7 +998,6 @@ class SlickProjectionSpec
.withSaveOffset(10, 1.minute)
projectionTestKit.runWithTestSink(slickProjection) { sinkProbe =>
-
eventually {
sourceProbe.get should not be null
}
@@ -1049,7 +1047,6 @@ class SlickProjectionSpec
.withSaveOffset(10, 2.seconds)
projectionTestKit.runWithTestSink(slickProjection) { sinkProbe =>
-
eventually {
sourceProbe.get should not be null
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org