This is an automated email from the ASF dual-hosted git repository. fanningpj pushed a commit to branch wip-chbatey-latest-cass-version in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit c260dd5b864f82d634d026daf939873a312e034f Author: Christopher Batey <[email protected]> AuthorDate: Thu Jan 9 09:51:54 2020 +0000 Actually commit them --- akka-sample-cqrs-scala/build.sbt | 2 +- akka-sample-cqrs-scala/project/build.properties | 2 +- .../src/main/resources/application.conf | 6 ++++++ .../scala/sample/cqrs/CassandraSessionExtension.scala | 9 +++------ .../src/main/scala/sample/cqrs/EventProcessor.scala | 15 +++++++++++---- .../src/main/scala/sample/cqrs/ShoppingCartRoutes.scala | 7 +++---- 6 files changed, 25 insertions(+), 16 deletions(-) diff --git a/akka-sample-cqrs-scala/build.sbt b/akka-sample-cqrs-scala/build.sbt index 5c535ee..3298508 100644 --- a/akka-sample-cqrs-scala/build.sbt +++ b/akka-sample-cqrs-scala/build.sbt @@ -1,5 +1,5 @@ val AkkaVersion = "2.6.1" -val AkkaPersistenceCassandraVersion = "0.100" +val AkkaPersistenceCassandraVersion = "1.0-SNAPSHOT" val AkkaHttpVersion = "10.1.10" lazy val `akka-sample-cqrs-scala` = project diff --git a/akka-sample-cqrs-scala/project/build.properties b/akka-sample-cqrs-scala/project/build.properties index 6adcdc7..00b48d9 100644 --- a/akka-sample-cqrs-scala/project/build.properties +++ b/akka-sample-cqrs-scala/project/build.properties @@ -1 +1 @@ -sbt.version=1.3.3 +sbt.version=1.3.6 diff --git a/akka-sample-cqrs-scala/src/main/resources/application.conf b/akka-sample-cqrs-scala/src/main/resources/application.conf index 5b51db2..dcbc0f9 100644 --- a/akka-sample-cqrs-scala/src/main/resources/application.conf +++ b/akka-sample-cqrs-scala/src/main/resources/application.conf @@ -36,11 +36,17 @@ akka { cassandra-journal { events-by-tag { bucket-size = "Day" + flush-interval = 10ms } + pubsub-notification = on } cassandra-query-journal { first-time-bucket = "20191023T00:00" + events-by-tag { + eventual-consistency-delay = 150ms + } +// refresh-interval = 200ms } event-processor { diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CassandraSessionExtension.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CassandraSessionExtension.scala index b5bbfe4..e270fe3 100644 --- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CassandraSessionExtension.scala +++ b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CassandraSessionExtension.scala @@ -1,16 +1,14 @@ package sample.cqrs import scala.concurrent.Future - import akka.Done import akka.actor.typed.ActorSystem import akka.actor.typed.Extension import akka.actor.typed.ExtensionId import akka.actor.typed.scaladsl.adapter._ +import akka.cassandra.session.DefaultSessionProvider +import akka.cassandra.session.scaladsl.CassandraSession import akka.event.Logging -import akka.persistence.cassandra.ConfigSessionProvider -import akka.persistence.cassandra.session.CassandraSessionSettings -import akka.persistence.cassandra.session.scaladsl.CassandraSession object CassandraSessionExtension extends ExtensionId[CassandraSessionExtension] { @@ -27,8 +25,7 @@ class CassandraSessionExtension(system: ActorSystem[_]) extends Extension { val sessionConfig = system.settings.config.getConfig("cassandra-journal") new CassandraSession( system.toClassic, - new ConfigSessionProvider(system.toClassic, sessionConfig), - CassandraSessionSettings(sessionConfig), + new DefaultSessionProvider(system.toClassic, sessionConfig), system.executionContext, Logging(system.toClassic, getClass), metricsCategory = "sample", diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessor.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessor.scala index ec420d1..dc1a560 100644 --- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessor.scala +++ b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessor.scala @@ -4,7 +4,6 @@ import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration._ import scala.reflect.ClassTag - import akka.Done import akka.NotUsed import akka.actor.typed.ActorSystem @@ -26,8 +25,9 @@ import akka.stream.SharedKillSwitch import akka.stream.scaladsl.RestartSource import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source -import com.datastax.driver.core.PreparedStatement -import com.datastax.driver.core.Row +import com.datastax.oss.driver.api.core.cql.PreparedStatement +import com.datastax.oss.driver.api.core.cql.Row +import com.datastax.oss.driver.api.core.uuid.Uuids import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -110,6 +110,13 @@ abstract class EventProcessorStream[Event: ClassTag]( query.eventsByTag(tag, offset).mapAsync(1) { eventEnvelope => eventEnvelope.event match { case event: Event => + eventEnvelope.offset match { + case TimeBasedUUID(offset) => + // these times are from different nodes so don't rely on this being accurate + val eventDelay = System.currentTimeMillis() - Uuids.unixTimestamp(offset) + log.info("Event eventual consistency {}", eventDelay.millis) + case _ => + } processEvent(event, PersistenceId.ofUniqueId(eventEnvelope.persistenceId), eventEnvelope.sequenceNr).map(_ => eventEnvelope.offset) case other => @@ -130,7 +137,7 @@ abstract class EventProcessorStream[Event: ClassTag]( private def extractOffset(maybeRow: Option[Row]): Offset = { maybeRow match { case Some(row) => - val uuid = row.getUUID("timeUuidOffset") + val uuid = row.getUuid("timeUuidOffset") if (uuid == null) { NoOffset } else { diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala index 7bf093d..6e03597 100644 --- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala +++ b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala @@ -1,7 +1,6 @@ package sample.cqrs import scala.concurrent.Future -import scala.concurrent.duration._ import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem @@ -39,7 +38,7 @@ class ShoppingCartRoutes()(implicit system: ActorSystem[_]) { case ShoppingCart.Accepted(summary) => complete(StatusCodes.OK -> summary) case ShoppingCart.Rejected(reason) => - complete(StatusCodes.BadRequest, reason) + complete(StatusCodes.BadRequest -> reason) } } }, @@ -57,7 +56,7 @@ class ShoppingCartRoutes()(implicit system: ActorSystem[_]) { case ShoppingCart.Accepted(summary) => complete(StatusCodes.OK -> summary) case ShoppingCart.Rejected(reason) => - complete(StatusCodes.BadRequest, reason) + complete(StatusCodes.BadRequest -> reason) } } }, @@ -76,7 +75,7 @@ class ShoppingCartRoutes()(implicit system: ActorSystem[_]) { case ShoppingCart.Accepted(summary) => complete(StatusCodes.OK -> summary) case ShoppingCart.Rejected(reason) => - complete(StatusCodes.BadRequest, reason) + complete(StatusCodes.BadRequest -> reason) } } }) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
