This is an automated email from the ASF dual-hosted git repository. fanningpj pushed a commit to branch revert-44-wip/orcart-example in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
// commit 0744270dd525b0a41ce11cc6a8446fc750d18495
// Author:     gosubpl <[email protected]>
// AuthorDate: Fri Jun 15 09:00:50 2018 +0200
//
//     alternate ShoppingCart based on ORMultiMap (#44)
//
// The original notification carried a diff adding two new files
// (2 files changed, 259 insertions(+)):
//   .../sample/distributeddata/ShoppingORCart.scala      | 143 +  (index 0000000..aefbf4e)
//   .../distributeddata/ShoppingORCartSpec.scala         | 116 +  (index 0000000..3a7ca2e)
// Both files are reproduced below as plain Scala, separated by banner comments.

// =====================================================================================
// File: akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingORCart.scala
// =====================================================================================
package sample.distributeddata

import java.util.UUID

import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.ddata.{DistributedData, ORMultiMap, ORMultiMapKey}

import scala.collection.immutable.HashSet

/**
 * Protocol, data types and consistency settings used by the [[ShoppingORCart]] actor.
 */
object ShoppingORCart {
  import akka.cluster.ddata.Replicator._

  /** Creates the `Props` of a cart actor for the given user. */
  def props(userId: String): Props = Props(new ShoppingORCart(userId))

  /** Asks for the current content of the cart; the reply is a [[Cart]]. */
  case object GetCart

  /** Sets the quantity of `item.productId` in the cart to `item.quantity`. */
  final case class ChangeItemQuantity(item: LineItem)

  /** Removes every unit of the given product from the cart. */
  final case class RemoveItem(productId: String)

  /** Reply to [[GetCart]]: one [[LineItem]] per product currently in the cart. */
  final case class Cart(items: Set[LineItem])

  /** A product together with the number of units of it. */
  final case class LineItem(productId: String, title: String, quantity: Int)

  /**
   * A single unit of a product as stored in the replicated multi-map.
   * `uid` makes otherwise identical units distinct set elements, so the
   * quantity of a product is the number of bindings stored under its id.
   */
  final case class SingleLineItem(productId: String, title: String, uid: String)

  //#read-write-majority-orcart
  private val timeout = 3.seconds
  private val readMajority = ReadMajority(timeout)
  private val writeMajority = WriteMajority(timeout)
  //#read-write-majority-orcart

  /**
   * Builds an identifier for a new [[SingleLineItem]] by concatenating the
   * node's `selfUniqueAddress` with a freshly generated random UUID.
   */
  def getUniqueId(implicit cluster: Cluster): String =
    cluster.selfUniqueAddress.toString + UUID.randomUUID().toString

}

/**
 * Shopping cart backed by an `ORMultiMap` (created with `emptyWithValueDeltas`)
 * that maps a product id to one [[ShoppingORCart.SingleLineItem]] per unit.
 *
 * @param userId used to derive the key (`"cart-" + userId`) of the replicated data
 */
class ShoppingORCart(userId: String) extends Actor {
  import ShoppingORCart._
  import akka.cluster.ddata.Replicator._

  val replicator: ActorRef = DistributedData(context.system).replicator
  implicit val cluster: Cluster = Cluster(context.system)

  val DataKey = ORMultiMapKey[String, SingleLineItem]("cart-" + userId)

  def receive: Receive = receiveGetCart
    .orElse[Any, Unit](receiveAddItem)
    .orElse[Any, Unit](receiveRemoveItem)
    .orElse[Any, Unit](receiveOther)

  //#get-orcart
  /** Handles [[ShoppingORCart.GetCart]] and the replicator's replies to the resulting `Get`. */
  def receiveGetCart: Receive = {
    case GetCart =>
      // The original sender travels as the request context so the reply can be routed back.
      replicator ! Get(DataKey, readMajority, Some(sender()))

    case g @ GetSuccess(DataKey, Some(replyTo: ActorRef)) =>
      // One LineItem per product; its quantity is the number of stored bindings.
      // Empty binding sets are skipped instead of calling `head` on them.
      val lineItems = g.get(DataKey).entries.values.collect {
        case bindings if bindings.nonEmpty =>
          val sample = bindings.head
          LineItem(sample.productId, sample.title, bindings.size)
      }.toSet
      replyTo ! Cart(lineItems)

    case NotFound(DataKey, Some(replyTo: ActorRef)) =>
      replyTo ! Cart(Set.empty)

    case GetFailure(DataKey, Some(replyTo: ActorRef)) =>
      // ReadMajority failure, try again with local read
      replicator ! Get(DataKey, ReadLocal, Some(replyTo))
  }
  //#get-orcart

  //#add-item-orcart
  /** Handles [[ShoppingORCart.ChangeItemQuantity]] by sending an `Update` built from [[updateCart]]. */
  def receiveAddItem: Receive = {
    case cmd @ ChangeItemQuantity(item) =>
      val update = Update(DataKey, ORMultiMap.emptyWithValueDeltas[String, SingleLineItem], writeMajority, Some(cmd)) {
        cart => updateCart(cart, item)
      }
      replicator ! update
  }
  //#add-item-orcart

  /**
   * Returns `data` adjusted so that exactly `item.quantity` bindings are stored
   * under `item.productId`.
   *
   * Missing units are added as new [[ShoppingORCart.SingleLineItem]]s with unique
   * ids; surplus units are removed one binding at a time. A quantity of zero or
   * less is treated as zero, so it never stores an empty binding set and never
   * tries to remove more bindings than exist.
   *
   * @param data the current replicated cart
   * @param item the product and the quantity it should end up with
   * @return the updated cart, or `data` itself when the quantity already matches
   */
  def updateCart(data: ORMultiMap[String, SingleLineItem], item: LineItem): ORMultiMap[String, SingleLineItem] = {
    val current = data.get(item.productId).getOrElse(Set.empty[SingleLineItem])
    val target = math.max(item.quantity, 0)
    val missing = target - current.size

    if (missing > 0)
      (1 to missing).foldLeft(data) { (cart, _) =>
        cart.addBinding(item.productId, SingleLineItem(item.productId, item.title, getUniqueId))
      }
    else if (missing < 0)
      current.take(-missing).foldLeft(data) { (cart, surplus) =>
        cart.removeBinding(item.productId, surplus)
      }
    else
      data
  }

  //#remove-item-orcart
  /** Handles [[ShoppingORCart.RemoveItem]] and the replicator's replies to the preceding `Get`. */
  def receiveRemoveItem: Receive = {
    case cmd: RemoveItem =>
      // Try to fetch latest from a majority of nodes first, since ORMap
      // remove must have seen the item to be able to remove it.
      replicator ! Get(DataKey, readMajority, Some(cmd))

    case GetSuccess(DataKey, Some(RemoveItem(productId))) =>
      removeProduct(productId)

    case GetFailure(DataKey, Some(RemoveItem(productId))) =>
      // ReadMajority failed, fall back to best effort local value
      removeProduct(productId)

    case NotFound(DataKey, Some(RemoveItem(_))) =>
      // nothing to remove
  }

  /** Sends the `Update` that drops all bindings stored under `productId`. */
  private def removeProduct(productId: String): Unit =
    replicator ! Update(DataKey, ORMultiMap.emptyWithValueDeltas[String, SingleLineItem], writeMajority, None) {
      _ - productId // be careful, use ORMultiMap.emptyWithValueDeltas for safety, LWWMap and vanilla ORMultiMap can result in merge anomaly
    }
  //#remove-item-orcart

  /** Handles the replicator's replies to the `Update`s sent by this actor. */
  def receiveOther: Receive = {
    case _: UpdateSuccess[_] | _: UpdateTimeout[_] =>
      // UpdateTimeout, will eventually be replicated
    case e: UpdateFailure[_] => throw new IllegalStateException("Unexpected failure: " + e)
  }

}

// =====================================================================================
// File: akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ShoppingORCartSpec.scala
// Same package (sample.distributeddata) as the section above; when this section is
// split back into its own file it needs its own `package sample.distributeddata` clause.
// =====================================================================================

import scala.concurrent.duration._
import akka.cluster.Cluster
import akka.cluster.ddata.DistributedData
import akka.cluster.ddata.Replicator.GetReplicaCount
import akka.cluster.ddata.Replicator.ReplicaCount
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import com.typesafe.config.ConfigFactory

/** Roles and common configuration of the three-node [[ShoppingORCartSpec]]. */
object ShoppingORCartSpec extends MultiNodeConfig {
  val node1: RoleName = role("node-1")
  val node2: RoleName = role("node-2")
  val node3: RoleName = role("node-3")

  commonConfig(ConfigFactory.parseString("""
    akka.loglevel = INFO
    akka.actor.provider = "cluster"
    akka.log-dead-letters-during-shutdown = off
    """))

}

class ShoppingORCartSpecMultiJvmNode1 extends ShoppingORCartSpec
class ShoppingORCartSpecMultiJvmNode2 extends ShoppingORCartSpec
class ShoppingORCartSpecMultiJvmNode3 extends ShoppingORCartSpec

/**
 * Multi-node test: every node starts a [[ShoppingORCart]] for "user-1", some nodes
 * change the cart, and every node then asserts that it reads the same cart content.
 */
class ShoppingORCartSpec extends MultiNodeSpec(ShoppingORCartSpec) with STMultiNodeSpec with ImplicitSender {
  import ShoppingORCartSpec._
  import ShoppingORCart._

  override def initialParticipants: Int = roles.size

  val cluster = Cluster(system)
  val shoppingORCart = system.actorOf(ShoppingORCart.props("user-1"))

  /** Lets node `from` join the cluster at `to`, then waits for all nodes at a barrier. */
  def join(from: RoleName, to: RoleName): Unit = {
    runOn(from) {
      cluster.join(node(to).address)
    }
    enterBarrier(from.name + "-joined")
  }

  "Demo of a replicated shopping cart" must {
    "join cluster" in within(20.seconds) {
      join(node1, node1)
      join(node2, node1)
      join(node3, node1)

      awaitAssert {
        DistributedData(system).replicator ! GetReplicaCount
        expectMsg(ReplicaCount(roles.size))
      }
      enterBarrier("after-1")
    }

    "handle updates directly after start" in within(15.seconds) {
      runOn(node2) {
        shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("1", "Apples", quantity = 2))
        shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("2", "Oranges", quantity = 3))
      }
      enterBarrier("updates-done")

      awaitAssert {
        shoppingORCart ! ShoppingORCart.GetCart
        val cart = expectMsgType[Cart]
        cart.items should be(Set(LineItem("1", "Apples", quantity = 2), LineItem("2", "Oranges", quantity = 3)))
      }

      enterBarrier("after-2")
    }

    "handle updates from different nodes" in within(5.seconds) {
      runOn(node2) {
        shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("1", "Apples", quantity = 5))
        shoppingORCart ! ShoppingORCart.RemoveItem("2")
      }
      runOn(node3) {
        shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("3", "Bananas", quantity = 4))
      }
      enterBarrier("updates-done")

      awaitAssert {
        shoppingORCart ! ShoppingORCart.GetCart
        val cart = expectMsgType[Cart]
        cart.items should be(Set(LineItem("1", "Apples", quantity = 5), LineItem("3", "Bananas", quantity = 4)))
      }

      enterBarrier("after-3")
    }

    "handle more updates from different nodes" in within(5.seconds) {
      runOn(node2) {
        shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("1", "Apples", quantity = 2))
      }
      runOn(node3) {
        shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("3", "Bananas", quantity = 6))
      }
      enterBarrier("updates-done")

      awaitAssert {
        shoppingORCart ! ShoppingORCart.GetCart
        val cart = expectMsgType[Cart]
        cart.items should be(Set(LineItem("1", "Apples", quantity = 2), LineItem("3", "Bananas", quantity = 6)))
      }

      // Fourth test step: its closing barrier gets its own name instead of re-using "after-3".
      enterBarrier("after-4")
    }

  }

}

// ---------------------------------------------------------------------
// To unsubscribe, e-mail: [email protected]
// For additional commands, e-mail: [email protected]
