This is an automated email from the ASF dual-hosted git repository. fanningpj pushed a commit to branch wip-28011-sharding-rolling-update-patriknw in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit 2ac8b4ff3a43395e1ee592c9a1ad1f038b0ba5d1 Author: Patrik Nordwall <[email protected]> AuthorDate: Mon Oct 21 15:08:34 2019 +0200 test rolling update latency --- akka-sample-sharding-scala/build.sbt | 4 +- .../src/main/resources/application.conf | 13 ++++-- .../src/main/resources/logback.xml | 21 ++++++++++ .../src/main/scala/sample/sharding/Device.scala | 16 ++++++-- .../src/main/scala/sample/sharding/Devices.scala | 47 ++++++++++++++++------ .../main/scala/sample/sharding/ShardingApp.scala | 5 ++- 6 files changed, 85 insertions(+), 21 deletions(-) diff --git a/akka-sample-sharding-scala/build.sbt b/akka-sample-sharding-scala/build.sbt index 32145af..73aff9d 100644 --- a/akka-sample-sharding-scala/build.sbt +++ b/akka-sample-sharding-scala/build.sbt @@ -1,7 +1,7 @@ import com.typesafe.sbt.SbtMultiJvm.multiJvmSettings import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm -val akkaVersion = "2.6.0-M4" +val akkaVersion = "2.6.0-RC1" lazy val `akka-sample-sharding-scala` = project .in(file(".")) @@ -21,6 +21,8 @@ lazy val `akka-sample-sharding-scala` = project libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion, "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion, + "com.typesafe.akka" %% "akka-slf4j" % akkaVersion, + "ch.qos.logback" % "logback-classic" % "1.2.3", "org.scalatest" %% "scalatest" % "3.0.7" % Test ), mainClass in (Compile, run) := Some("sample.sharding.ShardingApp"), diff --git a/akka-sample-sharding-scala/src/main/resources/application.conf b/akka-sample-sharding-scala/src/main/resources/application.conf index fc64a62..853f3c9 100644 --- a/akka-sample-sharding-scala/src/main/resources/application.conf +++ b/akka-sample-sharding-scala/src/main/resources/application.conf @@ -1,5 +1,7 @@ akka { - loglevel = INFO + loglevel = DEBUG + loggers = ["akka.event.slf4j.Slf4jLogger"] + logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" actor { provider = cluster @@ -21,9 +23,12 @@ akka { "akka://[email protected]:2551", "akka://[email protected]:2552"] - # auto downing is NOT safe for production deployments. - # you may want to use it during development, read more about it in the docs. - auto-down-unreachable-after = 10s + + min-nr-of-members = 3 + + log-info-verbose = on } } + +sample.sending-temperatures = on diff --git a/akka-sample-sharding-scala/src/main/resources/logback.xml b/akka-sample-sharding-scala/src/main/resources/logback.xml new file mode 100644 index 0000000..bc42d63 --- /dev/null +++ b/akka-sample-sharding-scala/src/main/resources/logback.xml @@ -0,0 +1,21 @@ +<?xml version="1.0" encoding="UTF-8"?> +<configuration> + + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>[%date{ISO8601}] [%level] [%logger] [%thread] [%X{akkaSource}] - %msg%n</pattern> + </encoder> + </appender> + + <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender"> + <queueSize>1024</queueSize> + <neverBlock>true</neverBlock> + <appender-ref ref="STDOUT" /> + </appender> + + <logger name="akka.cluster.sharding" level="DEBUG" /> + + <root level="INFO"> + <appender-ref ref="ASYNC"/> + </root> +</configuration> diff --git a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala index 19b0488..1e166a2 100644 --- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala +++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala @@ -10,9 +10,12 @@ import akka.actor._ object Device { sealed trait Command extends Message - case class RecordTemperature(deviceId: Int, temperature: Double) + case class RecordTemperature(deviceId: Int, temperature: Double, startTime: Long, seqNr: Long) extends Command + case class RecordTemperatureAck(deviceId: Int, startTime: Long, seqNr: Long) + extends Command + case class GetTemperature(deviceId: Int) extends Command case class Temperature(deviceId: Int, @@ -27,15 +30,18 @@ object Device { class Device extends Actor with ActorLogging { import Device._ + log.info("Starting Device {}", self.path.name) + override def receive = counting(Vector.empty) def counting(values: Vector[Double]): Receive = { - case RecordTemperature(id, temp) => + case RecordTemperature(id, temp, startTime, seqNr) => val temperatures = values :+ temp log.info( s"Recording temperature $temp for device $id, average is ${average(temperatures)} after " + - s"${temperatures.size} readings" + s"${temperatures.size} readings. SeqNr [$seqNr]" ) + sender() ! RecordTemperatureAck(id, startTime, seqNr) context.become(counting(temperatures)) case GetTemperature(id) => @@ -51,4 +57,8 @@ class Device extends Actor with ActorLogging { private def average(values: Vector[Double]): Double = if (values.isEmpty) Double.NaN else values.sum / values.size + + override def postStop(): Unit = { + log.info("Stopped Device {}", self.path.name) + } } diff --git a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala index 6cbe318..d5a8b22 100644 --- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala +++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala @@ -12,6 +12,8 @@ object Devices { case object ReadTemperatures + case object Start + def props(): Props = Props(new Devices) } @@ -20,14 +22,14 @@ class Devices extends Actor with ActorLogging with Timers { import Devices._ private val extractEntityId: ShardRegion.ExtractEntityId = { - case msg @ Device.RecordTemperature(id, _) => (id.toString, msg) + case msg @ Device.RecordTemperature(id, _,_, _) => (id.toString, msg) case msg @ Device.GetTemperature(id) => (id.toString, msg) } - private val numberOfShards = 100 + private val numberOfShards = 10 private val extractShardId: ShardRegion.ExtractShardId = { - case Device.RecordTemperature(id, _) => + case Device.RecordTemperature(id, _, _, _) => (math.abs(id.hashCode) % numberOfShards).toString case Device.GetTemperature(id) => (math.abs(id.hashCode) % numberOfShards).toString @@ -45,23 +47,44 @@ class Devices extends Actor with ActorLogging with Timers { ) val random = new Random() - val numberOfDevices = 50 + val numberOfDevices = 10 - timers.startTimerWithFixedDelay(UpdateDevice, UpdateDevice, 1.second) - timers.startTimerWithFixedDelay( - ReadTemperatures, - ReadTemperatures, - 15.seconds - ) + if (context.system.settings.config.getBoolean("sample.sending-temperatures")) { + timers.startSingleTimer(Start, Start, 20.seconds) + } + + private var seqNr = 0L + private var sequenceNumbers = Map.empty[Long, Int] def receive = { + case Start => + timers.startTimerWithFixedDelay(UpdateDevice, UpdateDevice, 100.millis) + timers.startTimerWithFixedDelay( + ReadTemperatures, + ReadTemperatures, + 15.seconds + ) + case UpdateDevice => - val deviceId = random.nextInt(numberOfDevices) + seqNr += 1 + //val deviceId = random.nextInt(numberOfDevices) + val deviceId = (seqNr % numberOfDevices).toInt + sequenceNumbers = sequenceNumbers.updated(seqNr, deviceId) val temperature = 5 + 30 * random.nextDouble() - val msg = Device.RecordTemperature(deviceId, temperature) + val msg = Device.RecordTemperature(deviceId, temperature, System.nanoTime(), seqNr) log.info(s"Sending $msg") deviceRegion ! msg + case Device.RecordTemperatureAck(deviceId, startTime, seqNr) => + val durationMs = (System.nanoTime() - startTime) / 1000 / 1000 + if (durationMs > 500) + log.info("Delayed ack of device {} seqNr {} after {} ms", deviceId, seqNr, durationMs) + else + log.info("Ack of device {} seqNr {} after {} ms", deviceId, seqNr, durationMs) + + sequenceNumbers -= seqNr + log.info("Pending sequence numbers: {}", sequenceNumbers) + case ReadTemperatures => (0 to numberOfDevices).foreach { deviceId => deviceRegion ! Device.GetTemperature(deviceId) diff --git a/akka-sample-sharding-scala/src/main/scala/sample/sharding/ShardingApp.scala b/akka-sample-sharding-scala/src/main/scala/sample/sharding/ShardingApp.scala index 897ccf5..fa91ce7 100644 --- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/ShardingApp.scala +++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/ShardingApp.scala @@ -18,7 +18,10 @@ object ShardingApp { ports foreach { port => // Override the configuration of the port val config = ConfigFactory - .parseString("akka.remote.artery.canonical.port=" + port) + .parseString(s""" + akka.remote.artery.canonical.port=$port + sample.sending-temperatures = ${port == "2555"} + """) .withFallback(ConfigFactory.load()) // Create an Akka system --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
