This is an automated email from the ASF dual-hosted git repository. fanningpj pushed a commit to branch wip-28011-sharding-rolling-update-patriknw in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit b305b1f43b93da3144840547ade0d31fda237db0 Author: Patrik Nordwall <[email protected]> AuthorDate: Tue Oct 29 09:23:24 2019 +0100 Try with Akka 2.5.26 --- akka-sample-sharding-scala/build.sbt | 4 +-- .../src/main/resources/application.conf | 7 ++++- .../src/main/scala/sample/sharding/Device.scala | 2 ++ .../src/main/scala/sample/sharding/Devices.scala | 30 ++++++++++++++-------- .../main/scala/sample/sharding/ShardingApp.scala | 2 +- 5 files changed, 30 insertions(+), 15 deletions(-) diff --git a/akka-sample-sharding-scala/build.sbt b/akka-sample-sharding-scala/build.sbt index 73aff9d..cce8b17 100644 --- a/akka-sample-sharding-scala/build.sbt +++ b/akka-sample-sharding-scala/build.sbt @@ -1,7 +1,7 @@ import com.typesafe.sbt.SbtMultiJvm.multiJvmSettings import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm -val akkaVersion = "2.6.0-RC1" +val akkaVersion = "2.5.26" lazy val `akka-sample-sharding-scala` = project .in(file(".")) @@ -20,7 +20,7 @@ lazy val `akka-sample-sharding-scala` = project javaOptions in run ++= Seq("-Xms128m", "-Xmx1024m"), libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion, - "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion, +// "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion, "com.typesafe.akka" %% "akka-slf4j" % akkaVersion, "ch.qos.logback" % "logback-classic" % "1.2.3", "org.scalatest" %% "scalatest" % "3.0.7" % Test diff --git a/akka-sample-sharding-scala/src/main/resources/application.conf b/akka-sample-sharding-scala/src/main/resources/application.conf index 853f3c9..bfb5b46 100644 --- a/akka-sample-sharding-scala/src/main/resources/application.conf +++ b/akka-sample-sharding-scala/src/main/resources/application.conf @@ -7,13 +7,15 @@ akka { provider = cluster serialization-bindings { - "sample.sharding.Message" = jackson-cbor + #"sample.sharding.Message" = jackson-cbor } } # For the sample, just bind to loopback and do not allow access from the network # the port is overridden by the logic in main class remote.artery { + enabled = on + transport = tcp canonical.port = 0 canonical.hostname = 127.0.0.1 } @@ -26,6 +28,9 @@ akka { min-nr-of-members = 3 + gossip-interval = 100ms + leader-actions-interval = 100 ms + log-info-verbose = on } diff --git a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala index 1e166a2..4451d47 100644 --- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala +++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala @@ -59,6 +59,8 @@ class Device extends Actor with ActorLogging { else values.sum / values.size override def postStop(): Unit = { +// log.info("Stopping Device {}", self.path.name) +// Thread.sleep(200) // FIXME slow stopping log.info("Stopped Device {}", self.path.name) } } diff --git a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala index d5a8b22..d16079c 100644 --- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala +++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala @@ -58,26 +58,34 @@ class Devices extends Actor with ActorLogging with Timers { def receive = { case Start => - timers.startTimerWithFixedDelay(UpdateDevice, UpdateDevice, 100.millis) - timers.startTimerWithFixedDelay( +// timers.startTimerWithFixedDelay(UpdateDevice, UpdateDevice, 100.millis) +// timers.startTimerWithFixedDelay( +// ReadTemperatures, +// ReadTemperatures, +// 15.seconds +// ) + timers.startPeriodicTimer(UpdateDevice, UpdateDevice, 200.millis) + timers.startPeriodicTimer( ReadTemperatures, ReadTemperatures, 15.seconds ) case UpdateDevice => - seqNr += 1 - //val deviceId = random.nextInt(numberOfDevices) - val deviceId = (seqNr % numberOfDevices).toInt - sequenceNumbers = sequenceNumbers.updated(seqNr, deviceId) - val temperature = 5 + 30 * random.nextDouble() - val msg = Device.RecordTemperature(deviceId, temperature, System.nanoTime(), seqNr) - log.info(s"Sending $msg") - deviceRegion ! msg + (1 to numberOfDevices).foreach {_ => + seqNr += 1 + //val deviceId = random.nextInt(numberOfDevices) + val deviceId = (seqNr % numberOfDevices).toInt + sequenceNumbers = sequenceNumbers.updated(seqNr, deviceId) + val temperature = 5 + 30 * random.nextDouble() + val msg = Device.RecordTemperature(deviceId, temperature, System.nanoTime(), seqNr) + log.info(s"Sending $msg") + deviceRegion ! msg + } case Device.RecordTemperatureAck(deviceId, startTime, seqNr) => val durationMs = (System.nanoTime() - startTime) / 1000 / 1000 - if (durationMs > 500) + if (durationMs > 300) log.info("Delayed ack of device {} seqNr {} after {} ms", deviceId, seqNr, durationMs) else log.info("Ack of device {} seqNr {} after {} ms", deviceId, seqNr, durationMs) diff --git a/akka-sample-sharding-scala/src/main/scala/sample/sharding/ShardingApp.scala b/akka-sample-sharding-scala/src/main/scala/sample/sharding/ShardingApp.scala index fa91ce7..932461c 100644 --- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/ShardingApp.scala +++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/ShardingApp.scala @@ -20,7 +20,7 @@ object ShardingApp { val config = ConfigFactory .parseString(s""" akka.remote.artery.canonical.port=$port - sample.sending-temperatures = ${port == "2555"} + sample.sending-temperatures = ${port == "2553"} """) .withFallback(ConfigFactory.load()) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
