This is an automated email from the ASF dual-hosted git repository. fanningpj pushed a commit to branch aws-testing in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit eb6696c150868f4acff4d180ce06b12ff5fc37c3 Author: Christopher Batey <[email protected]> AuthorDate: Mon Jan 13 09:04:08 2020 +0000 Modifications for running in aws --- akka-sample-cqrs-scala/.gitignore | 1 + akka-sample-cqrs-scala/build.sbt | 16 +- akka-sample-cqrs-scala/project/build.properties | 2 +- akka-sample-cqrs-scala/project/plugins.sbt | 3 + .../src/main/resources/application.conf | 41 ----- akka-sample-cqrs-scala/src/main/resources/aws.conf | 43 +++++ .../resources/{application.conf => common.conf} | 22 +-- .../src/main/resources/logback.xml | 1 + .../main/scala/sample/cqrs/EventProcessor.scala | 4 +- .../src/main/scala/sample/cqrs/Main.scala | 9 +- .../scala/sample/cqrs/EventProcessorSpec.scala | 85 ---------- .../test/scala/sample/cqrs/IntegrationSpec.scala | 174 --------------------- .../test/scala/sample/cqrs/ShoppingCartSpec.scala | 85 ---------- akka-sample-cqrs-scala/upload-app.sh | 2 + 14 files changed, 82 insertions(+), 406 deletions(-) diff --git a/akka-sample-cqrs-scala/.gitignore b/akka-sample-cqrs-scala/.gitignore new file mode 100644 index 0000000..c3c3e19 --- /dev/null +++ b/akka-sample-cqrs-scala/.gitignore @@ -0,0 +1 @@ +akka-sample-cqrs-scala-1.0 diff --git a/akka-sample-cqrs-scala/build.sbt b/akka-sample-cqrs-scala/build.sbt index 4e0b98a..d87bc6c 100644 --- a/akka-sample-cqrs-scala/build.sbt +++ b/akka-sample-cqrs-scala/build.sbt @@ -1,5 +1,7 @@ +import NativePackagerHelper._ + val AkkaVersion = "2.6.1" -val AkkaPersistenceCassandraVersion = "0.98+118-c0a98f49-SNAPSHOT" +val AkkaPersistenceCassandraVersion = "0.98+118-c0a98f49+20200109-1155-SNAPSHOT" val AkkaHttpVersion = "10.1.10" lazy val `akka-sample-cqrs-scala` = project @@ -33,3 +35,15 @@ lazy val `akka-sample-cqrs-scala` = project testOptions in Test += Tests.Argument("-oDF"), logBuffered in Test := false, licenses := Seq(("CC0", url("http://creativecommons.org/publicdomain/zero/1.0")))) + .settings( + assemblyMergeStrategy in assembly := { + case "application.conf" => 
MergeStrategy.concat + case "META-INF/io.netty.versions.properties" => MergeStrategy.first + case "module-info.class" => MergeStrategy.discard + case x => + val oldStrategy = (assemblyMergeStrategy in assembly).value + oldStrategy(x) + } + ).enablePlugins(UniversalPlugin, JavaAppPackaging) + + diff --git a/akka-sample-cqrs-scala/project/build.properties b/akka-sample-cqrs-scala/project/build.properties index 6adcdc7..00b48d9 100644 --- a/akka-sample-cqrs-scala/project/build.properties +++ b/akka-sample-cqrs-scala/project/build.properties @@ -1 +1 @@ -sbt.version=1.3.3 +sbt.version=1.3.6 diff --git a/akka-sample-cqrs-scala/project/plugins.sbt b/akka-sample-cqrs-scala/project/plugins.sbt new file mode 100644 index 0000000..8180cb2 --- /dev/null +++ b/akka-sample-cqrs-scala/project/plugins.sbt @@ -0,0 +1,3 @@ +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10") +addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.5.2") + diff --git a/akka-sample-cqrs-scala/src/main/resources/application.conf b/akka-sample-cqrs-scala/src/main/resources/application.conf index 998eefe..f0f2a01 100644 --- a/akka-sample-cqrs-scala/src/main/resources/application.conf +++ b/akka-sample-cqrs-scala/src/main/resources/application.conf @@ -1,13 +1,4 @@ akka { - loglevel = DEBUG - - actor { - provider = cluster - - serialization-bindings { - "sample.cqrs.CborSerializable" = jackson-cbor - } - } # For the sample, just bind to loopback and do not allow access from the network # the port is overridden by the logic in main class @@ -25,37 +16,5 @@ akka { roles = ["write-model", "read-model"] } - # use Cassandra to store both snapshots and the events of the persistent actors - persistence { - journal.plugin = "cassandra-journal" - snapshot-store.plugin = "cassandra-snapshot-store" - } - -} - -cassandra-journal { - events-by-tag { - bucket-size = "Day" - } -} - -cassandra-query-journal { - first-time-bucket = "20191023T00:00" } -#latency testing 
-cassandra-journal.events-by-tag.flush-interval = 0s -cassandra-journal.pubsub-notification = on -cassandra-query-journal.events-by-tag.eventual-consistency-delay = 50ms -cassandra-query-journal.refresh-interval = 3s - -event-processor { - id = "EventProcessor" // type name of sharded event processor - keep-alive-interval = 2 seconds // event-processors ping interval - tag-prefix = "tag" // even processor tag prefix - parallelism = 1 // number of event processors -} - -shopping.http.port = 0 -shopping.askTimeout = 5 s - diff --git a/akka-sample-cqrs-scala/src/main/resources/aws.conf b/akka-sample-cqrs-scala/src/main/resources/aws.conf new file mode 100644 index 0000000..c9116fc --- /dev/null +++ b/akka-sample-cqrs-scala/src/main/resources/aws.conf @@ -0,0 +1,43 @@ +include "common.conf" + +akka { + remote.artery { + canonical.port = 2551 + } + + cluster { + seed-nodes = [ + "akka://[email protected]:2551" + ] + + roles = ["write-model", "read-model"] + } +} + +// config for the 1.0x branch +datastax-java-driver { + profiles { + cassandra-journal { + basic.request { + consistency = LOCAL_ONE + # the journal does not use any counters or collections + default-idempotence = true + } + } + } +} + +datastax-java-driver { + basic.contact-points = [ "cassandra.eu-north-1.amazonaws.com:9142" ] + advanced.ssl-engine-factory { + class = DefaultSslEngineFactory + } + basic.load-balancing-policy.local-datacenter = "dc1" + + advanced.auth-provider { + class = PlainTextAuthProvider + username = ${CASSANDRA_USERNAME} + password = ${CASSANDRA_PASSWORD} + + } +} \ No newline at end of file diff --git a/akka-sample-cqrs-scala/src/main/resources/application.conf b/akka-sample-cqrs-scala/src/main/resources/common.conf similarity index 83% copy from akka-sample-cqrs-scala/src/main/resources/application.conf copy to akka-sample-cqrs-scala/src/main/resources/common.conf index 998eefe..1ccfa5e 100644 --- a/akka-sample-cqrs-scala/src/main/resources/application.conf +++ 
b/akka-sample-cqrs-scala/src/main/resources/common.conf @@ -11,17 +11,8 @@ akka { # For the sample, just bind to loopback and do not allow access from the network # the port is overridden by the logic in main class - remote.artery { - canonical.port = 0 - canonical.hostname = 127.0.0.1 - } cluster { - seed-nodes = [ - "akka://Shopping@127.0.0.1:2551", - "akka://Shopping@127.0.0.1:2552" - ] - roles = ["write-model", "read-model"] } @@ -41,6 +32,10 @@ cassandra-journal { cassandra-query-journal { first-time-bucket = "20191023T00:00" + + // events-by-tag { + // verbose-debug-logging = true + // } } #latency testing @@ -59,3 +54,12 @@ event-processor { shopping.http.port = 0 shopping.askTimeout = 5 s + +// config for the 1.0x branch + +datastax-java-driver { + basic.contact-points = [ "cassandra.eu-north-1.amazonaws.com" ] + advanced.ssl-engine-factory { + class = DefaultSslEngineFactory + } +} \ No newline at end of file diff --git a/akka-sample-cqrs-scala/src/main/resources/logback.xml b/akka-sample-cqrs-scala/src/main/resources/logback.xml index 1c57325..8f718d9 100644 --- a/akka-sample-cqrs-scala/src/main/resources/logback.xml +++ b/akka-sample-cqrs-scala/src/main/resources/logback.xml @@ -12,6 +12,7 @@ --> <logger name="com.datastax.driver.core.Connection" level="WARN"/> <logger name="com.codahale.metrics" level="INFO"/> + <logger name="akka.persistence.cassandra" level="DEBUG"/> <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender"> <queueSize>1024</queueSize> diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessor.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessor.scala index 160bfcf..8f1600e 100644 --- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessor.scala +++ b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessor.scala @@ -122,7 +122,7 @@ abstract class EventProcessorStream[Event: ClassTag]( private def readOffset(): Future[Offset] = { session .selectOne( - "SELECT 
timeUuidOffset FROM akka_cqrs_sample.offsetStore WHERE eventProcessorId = ? AND tag = ?", + "SELECT timeUuidOffset FROM akka.offsetStore WHERE eventProcessorId = ? AND tag = ?", eventProcessorId, tag) .map(extractOffset) @@ -142,7 +142,7 @@ abstract class EventProcessorStream[Event: ClassTag]( } private def prepareWriteOffset(): Future[PreparedStatement] = { - session.prepare("INSERT INTO akka_cqrs_sample.offsetStore (eventProcessorId, tag, timeUuidOffset) VALUES (?, ?, ?)") + session.prepare("INSERT INTO akka.offsetStore (eventProcessorId, tag, timeUuidOffset) VALUES (?, ?, ?)") } private def writeOffset(offset: Offset)(implicit ec: ExecutionContext): Future[Done] = { diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/Main.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/Main.scala index ad46815..fbe9e52 100644 --- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/Main.scala +++ b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/Main.scala @@ -60,15 +60,9 @@ object Main { def createTables(system: ActorSystem[_]): Unit = { val session = CassandraSessionExtension(system).session - // TODO use real replication strategy in real application - val keyspaceStmt = """ - CREATE KEYSPACE IF NOT EXISTS akka_cqrs_sample - WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 } - """ - val offsetTableStmt = """ - CREATE TABLE IF NOT EXISTS akka_cqrs_sample.offsetStore ( + CREATE TABLE IF NOT EXISTS akka.offsetStore ( eventProcessorId text, tag text, timeUuidOffset timeuuid, @@ -77,7 +71,6 @@ object Main { """ // ok to block here, main thread - Await.ready(session.executeDDL(keyspaceStmt), 30.seconds) Await.ready(session.executeDDL(offsetTableStmt), 30.seconds) } diff --git a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/EventProcessorSpec.scala b/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/EventProcessorSpec.scala deleted file mode 100644 index 8bea0a2..0000000 --- 
a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/EventProcessorSpec.scala +++ /dev/null @@ -1,85 +0,0 @@ -package sample.cqrs - -import java.io.File - -import akka.actor.testkit.typed.scaladsl.LoggingTestKit -import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit -import akka.actor.typed.eventstream.EventStream -import akka.persistence.cassandra.testkit.CassandraLauncher -import com.typesafe.config.ConfigFactory -import org.apache.commons.io.FileUtils -import org.scalatest.WordSpecLike - -class EventProcessorSpec extends ScalaTestWithActorTestKit(ConfigFactory.parseString(s""" - akka.actor.provider = local - cassandra-journal { - port = 19042 - } - cassandra-snapshot-store { - port = 19042 - } - cassandra-query-journal { - refresh-interval = 500 ms - events-by-tag { - eventual-consistency-delay = 200 ms - } - } - akka.actor.testkit.typed.single-expect-default = 5s - # For LoggingTestKit - akka.actor.testkit.typed.filter-leeway = 5s - """).withFallback(ConfigFactory.load())) with WordSpecLike { - - val databaseDirectory = new File("target/cassandra-EventProcessorSpec") - - override protected def beforeAll(): Unit = { - CassandraLauncher.start( - databaseDirectory, - CassandraLauncher.DefaultTestConfigResource, - clean = true, - port = 19042, // default is 9042, but use different for test - CassandraLauncher.classpathForResources("logback-test.xml")) - - Main.createTables(system) - - super.beforeAll() - } - - override protected def afterAll(): Unit = { - super.afterAll() - CassandraLauncher.stop() - FileUtils.deleteDirectory(databaseDirectory) - } - - "The events from the Shopping Cart" should { - "be consumed by the event processor" in { - val cart1 = testKit.spawn(ShoppingCart("cart-1", Set("tag-0"))) - val probe = testKit.createTestProbe[ShoppingCart.Confirmation] - - val eventProbe = testKit.createTestProbe[ShoppingCart.Event]() - testKit.system.eventStream ! 
EventStream.Subscribe(eventProbe.ref) - - testKit.spawn( - EventProcessor( - new ShoppingCartEventProcessorStream(system, system.executionContext, "EventProcessor", "tag-0"))) - cart1 ! ShoppingCart.AddItem("foo", 42, probe.ref) - probe.expectMessageType[ShoppingCart.Accepted] - eventProbe.expectMessage(ShoppingCart.ItemAdded("cart-1", "foo", 42)) - - cart1 ! ShoppingCart.AddItem("bar", 17, probe.ref) - probe.expectMessageType[ShoppingCart.Accepted] - eventProbe.expectMessage(ShoppingCart.ItemAdded("cart-1", "bar", 17)) - cart1 ! ShoppingCart.AdjustItemQuantity("bar", 18, probe.ref) - probe.expectMessageType[ShoppingCart.Accepted] - eventProbe.expectMessage(ShoppingCart.ItemQuantityAdjusted("cart-1", "bar", 18)) - - val cart2 = testKit.spawn(ShoppingCart("cart-2", Set("tag-0"))) - // also verify that EventProcessor is logging - LoggingTestKit.info("consumed ItemAdded(cart-2,another,1)").intercept { - cart2 ! ShoppingCart.AddItem("another", 1, probe.ref) - probe.expectMessageType[ShoppingCart.Accepted] - } - eventProbe.expectMessage(ShoppingCart.ItemAdded("cart-2", "another", 1)) - } - } - -} diff --git a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/IntegrationSpec.scala b/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/IntegrationSpec.scala deleted file mode 100644 index d14083a..0000000 --- a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/IntegrationSpec.scala +++ /dev/null @@ -1,174 +0,0 @@ -package sample.cqrs - -import java.io.File - -import scala.concurrent.duration._ - -import akka.actor.testkit.typed.scaladsl.ActorTestKit -import akka.actor.typed.eventstream.EventStream -import akka.cluster.MemberStatus -import akka.cluster.sharding.typed.scaladsl.ClusterSharding -import akka.cluster.typed.Cluster -import akka.cluster.typed.Join -import akka.persistence.cassandra.testkit.CassandraLauncher -import com.typesafe.config.Config -import com.typesafe.config.ConfigFactory -import org.apache.commons.io.FileUtils -import org.scalatest.BeforeAndAfterAll 
-import org.scalatest.Matchers -import org.scalatest.TestSuite -import org.scalatest.WordSpecLike -import org.scalatest.concurrent.Eventually -import org.scalatest.concurrent.PatienceConfiguration -import org.scalatest.concurrent.ScalaFutures -import org.scalatest.time.Span - -object IntegrationSpec { - val config: Config = ConfigFactory.parseString(s""" - akka.cluster { - seed-nodes = [] - } - cassandra-journal { - port = 19042 - } - cassandra-snapshot-store { - port = 19042 - } - cassandra-query-journal { - refresh-interval = 500 ms - events-by-tag { - eventual-consistency-delay = 200 ms - } - } - event-processor { - keep-alive-interval = 1 seconds - } - akka.actor.testkit.typed.single-expect-default = 5s - # For LoggingTestKit - akka.actor.testkit.typed.filter-leeway = 5s - """).withFallback(ConfigFactory.load()) -} - -class IntegrationSpec - extends TestSuite - with Matchers - with BeforeAndAfterAll - with WordSpecLike - with ScalaFutures - with Eventually { - - implicit private val patience: PatienceConfig = - PatienceConfig(3.seconds, Span(100, org.scalatest.time.Millis)) - - private val databaseDirectory = new File("target/cassandra-IntegrationSpec") - - private def roleConfig(role: String): Config = - ConfigFactory.parseString(s"akka.cluster.roles = [$role]") - - // one TestKit (ActorSystem) per cluster node - private val testKit1 = ActorTestKit("IntegrationSpec", roleConfig("write-model").withFallback(IntegrationSpec.config)) - private val testKit2 = - ActorTestKit("IntegrationSpec", roleConfig("write-model").withFallback(IntegrationSpec.config)) - private val testKit3 = ActorTestKit("IntegrationSpec", roleConfig("read-model").withFallback(IntegrationSpec.config)) - private val testKit4 = ActorTestKit("IntegrationSpec", roleConfig("read-model").withFallback(IntegrationSpec.config)) - - private val systems3 = List(testKit1.system, testKit2.system, testKit3.system) - - override protected def beforeAll(): Unit = { - CassandraLauncher.start( - 
databaseDirectory, - CassandraLauncher.DefaultTestConfigResource, - clean = true, - port = 19042, // default is 9042, but use different for test - CassandraLauncher.classpathForResources("logback-test.xml")) - - Main.createTables(testKit1.system) - - super.beforeAll() - } - - override protected def afterAll(): Unit = { - super.afterAll() - - testKit4.shutdownTestKit() - testKit3.shutdownTestKit() - testKit2.shutdownTestKit() - testKit1.shutdownTestKit() - - CassandraLauncher.stop() - FileUtils.deleteDirectory(databaseDirectory) - } - - "Shopping Cart application" should { - "init and join Cluster" in { - testKit1.spawn[Nothing](Guardian(), "guardian") - testKit2.spawn[Nothing](Guardian(), "guardian") - testKit3.spawn[Nothing](Guardian(), "guardian") - // node4 is initialized and joining later - - systems3.foreach { sys => - Cluster(sys).manager ! Join(Cluster(testKit1.system).selfMember.address) - } - - // let the nodes join and become Up - eventually(PatienceConfiguration.Timeout(10.seconds)) { - systems3.foreach { sys => - Cluster(sys).selfMember.status should ===(MemberStatus.Up) - } - } - } - - "update and consume from different nodes" in { - val cart1 = ClusterSharding(testKit1.system).entityRefFor(ShoppingCart.EntityKey, "cart-1") - val probe1 = testKit1.createTestProbe[ShoppingCart.Confirmation] - - val cart2 = ClusterSharding(testKit2.system).entityRefFor(ShoppingCart.EntityKey, "cart-2") - val probe2 = testKit2.createTestProbe[ShoppingCart.Confirmation] - - val eventProbe3 = testKit3.createTestProbe[ShoppingCart.Event]() - testKit3.system.eventStream ! EventStream.Subscribe(eventProbe3.ref) - - // update from node1, consume event from node3 - cart1 ! ShoppingCart.AddItem("foo", 42, probe1.ref) - probe1.expectMessageType[ShoppingCart.Accepted] - eventProbe3.expectMessage(ShoppingCart.ItemAdded("cart-1", "foo", 42)) - - // update from node2, consume event from node3 - cart2 ! 
ShoppingCart.AddItem("bar", 17, probe2.ref) - probe2.expectMessageType[ShoppingCart.Accepted] - cart2 ! ShoppingCart.AdjustItemQuantity("bar", 18, probe2.ref) - probe2.expectMessageType[ShoppingCart.Accepted] - eventProbe3.expectMessage(ShoppingCart.ItemAdded("cart-2", "bar", 17)) - eventProbe3.expectMessage(ShoppingCart.ItemQuantityAdjusted("cart-2", "bar", 18)) - } - - "continue even processing from offset" in { - // give it time to write the offset before shutting down - Thread.sleep(1000) - testKit3.shutdownTestKit() - - val eventProbe4 = testKit4.createTestProbe[ShoppingCart.Event]() - testKit4.system.eventStream ! EventStream.Subscribe(eventProbe4.ref) - - testKit4.spawn[Nothing](Guardian(), "guardian") - - Cluster(testKit4.system).manager ! Join(Cluster(testKit1.system).selfMember.address) - - // let the node join and become Up - eventually(PatienceConfiguration.Timeout(10.seconds)) { - Cluster(testKit4.system).selfMember.status should ===(MemberStatus.Up) - } - - val cart3 = ClusterSharding(testKit1.system).entityRefFor(ShoppingCart.EntityKey, "cart-3") - val probe3 = testKit1.createTestProbe[ShoppingCart.Confirmation] - - // update from node1, consume event from node4 - cart3 ! ShoppingCart.AddItem("abc", 43, probe3.ref) - probe3.expectMessageType[ShoppingCart.Accepted] - // note that node4 is new, but continues reading from previous offset, i.e. 
not receiving events - // that have already been consumed - eventProbe4.expectMessage(ShoppingCart.ItemAdded("cart-3", "abc", 43)) - } - - } -} diff --git a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/ShoppingCartSpec.scala b/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/ShoppingCartSpec.scala deleted file mode 100644 index 8957d75..0000000 --- a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/ShoppingCartSpec.scala +++ /dev/null @@ -1,85 +0,0 @@ -package sample.cqrs - -import java.util.UUID - -import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit -import org.scalatest.WordSpecLike - -class ShoppingCartSpec extends ScalaTestWithActorTestKit(s""" - akka.persistence.journal.plugin = "akka.persistence.journal.inmem" - akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" - akka.persistence.snapshot-store.local.dir = "target/snapshot-${UUID.randomUUID().toString}" - """) with WordSpecLike { - - private var counter = 0 - def newCartId(): String = { - counter += 1 - s"cart-$counter" - } - - "The Shopping Cart" should { - - "add item" in { - val cart = testKit.spawn(ShoppingCart(newCartId(), Set.empty)) - val probe = testKit.createTestProbe[ShoppingCart.Confirmation] - cart ! ShoppingCart.AddItem("foo", 42, probe.ref) - probe.expectMessage(ShoppingCart.Accepted(ShoppingCart.Summary(Map("foo" -> 42), checkedOut = false))) - } - - "reject already added item" in { - val cart = testKit.spawn(ShoppingCart(newCartId(), Set.empty)) - val probe = testKit.createTestProbe[ShoppingCart.Confirmation] - cart ! ShoppingCart.AddItem("foo", 42, probe.ref) - probe.expectMessageType[ShoppingCart.Accepted] - cart ! ShoppingCart.AddItem("foo", 13, probe.ref) - probe.expectMessageType[ShoppingCart.Rejected] - } - - "remove item" in { - val cart = testKit.spawn(ShoppingCart(newCartId(), Set.empty)) - val probe = testKit.createTestProbe[ShoppingCart.Confirmation] - cart ! 
ShoppingCart.AddItem("foo", 42, probe.ref) - probe.expectMessageType[ShoppingCart.Accepted] - cart ! ShoppingCart.RemoveItem("foo", probe.ref) - probe.expectMessage(ShoppingCart.Accepted(ShoppingCart.Summary(Map.empty, checkedOut = false))) - } - - "adjust quantity" in { - val cart = testKit.spawn(ShoppingCart(newCartId(), Set.empty)) - val probe = testKit.createTestProbe[ShoppingCart.Confirmation] - cart ! ShoppingCart.AddItem("foo", 42, probe.ref) - probe.expectMessageType[ShoppingCart.Accepted] - cart ! ShoppingCart.AdjustItemQuantity("foo", 43, probe.ref) - probe.expectMessage(ShoppingCart.Accepted(ShoppingCart.Summary(Map("foo" -> 43), checkedOut = false))) - } - - "checkout" in { - val cart = testKit.spawn(ShoppingCart(newCartId(), Set.empty)) - val probe = testKit.createTestProbe[ShoppingCart.Confirmation] - cart ! ShoppingCart.AddItem("foo", 42, probe.ref) - probe.expectMessageType[ShoppingCart.Accepted] - cart ! ShoppingCart.Checkout(probe.ref) - probe.expectMessage(ShoppingCart.Accepted(ShoppingCart.Summary(Map("foo" -> 42), checkedOut = true))) - - cart ! ShoppingCart.AddItem("bar", 13, probe.ref) - probe.expectMessageType[ShoppingCart.Rejected] - } - - "keep its state" in { - val cartId = newCartId() - val cart = testKit.spawn(ShoppingCart(cartId, Set.empty)) - val probe = testKit.createTestProbe[ShoppingCart.Confirmation] - cart ! ShoppingCart.AddItem("foo", 42, probe.ref) - probe.expectMessage(ShoppingCart.Accepted(ShoppingCart.Summary(Map("foo" -> 42), checkedOut = false))) - - testKit.stop(cart) - - // start again with same cartId - val restartedCart = testKit.spawn(ShoppingCart(cartId, Set.empty)) - val stateProbe = testKit.createTestProbe[ShoppingCart.Summary] - restartedCart ! 
ShoppingCart.Get(stateProbe.ref) - stateProbe.expectMessage(ShoppingCart.Summary(Map("foo" -> 42), checkedOut = false)) - } - } - -} diff --git a/akka-sample-cqrs-scala/upload-app.sh b/akka-sample-cqrs-scala/upload-app.sh new file mode 100644 index 0000000..70de0f3 --- /dev/null +++ b/akka-sample-cqrs-scala/upload-app.sh @@ -0,0 +1,2 @@ +sbt universal:packageBin +rm -rf akka-sample-cqrs-scala-1.0 && unzip ./target/universal/akka-sample-cqrs-scala-1.0.zip && rsync -vr -e ssh -i ~/.ssh/akka-cassandra.pem akka-sample-cqrs-scala-1.0 [email protected]:/home/ubuntu/app/ --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
