Hi Akka Team,
It's me again, about the multi-node test that I'm writing to validate my
cluster. I solved my issues with Master-Worker communication, but I'm seeing
really weird behavior in my test. The piece of code below seems to be
executed three times:
awaitAssert {
  // Send the job to the master
  master ! jobDump
  // The worker must return a JobSuccess
  checkAssertionsOnRecord(deviceId)
}
To keep it short, I have two actors (Master and Worker) with the following
implementations:
import akka.actor.{Actor, ActorLogging}
import akka.cluster.routing.{ClusterRouterGroup, ClusterRouterGroupSettings}
import akka.routing.RoundRobinGroup

case class Master(totalInstance: Int,
                  routees: scala.collection.immutable.Seq[String],
                  nodeRole: Option[String]) extends Actor with ActorLogging {

  // Group router: it does not create the workers, it looks them up by path
  val routerType = RoundRobinGroup(Nil)
  val clusterGroupSettings = ClusterRouterGroupSettings(
    totalInstances = totalInstance,
    routeesPaths = routees,
    allowLocalRoutees = true,
    useRole = nodeRole)
  val clusterGroup = ClusterRouterGroup(routerType, clusterGroupSettings).props()
  val workerRouter = context.actorOf(clusterGroup, name = "workerRouter")

  def receive = {
    case jDump: JobDump =>
      log.debug("Master {} receive from {} following job: {}", self.path, sender(), jDump)
      workerRouter ! jDump
  }
}
case class Worker(awsBucket: String,
                  gapMinValueMicroSec: Long,
                  persistentCache: PersistentCache[DeviceId, BCPPackets],
                  referentialService: IReferentialService,
                  bffIO: BFF_IO) extends Actor with ActorLogging {

  def receive = {
    case jDump: JobDump =>
      val replyTo = sender()
      log.info("Worker {} receive following job: {}", self.path, jDump)
      val jobDumpResult = this.executeJobDump(jDump)
      this.sendJobResultToSender(jDump, jobDumpResult, replyTo)
  }
}
To make debugging easier, I limited the test to one node (this is why I set
allowLocalRoutees to true for the Master), like this:
object FGSClusterSpecConfig extends MultiNodeConfig with LazyLogging {
  // register the named roles (nodes) of the test
  val first = role("first")

  def nodeList = Seq(first)

  val portOffset = 2551
  val numberOfPorts = nodeList.size
  // `until` (instead of `to`) yields exactly one port per node
  val portList = (portOffset until (portOffset + numberOfPorts)).toList

  nodeList
    .zip(portList)
    .foreach { case (role, port) =>
      nodeConfig(role) {
        ConfigFactory.parseString(s"""
          # Define port for each node configuration
          akka.remote.netty.tcp.port = $port
          # Disable legacy metrics in akka-cluster.
          akka.cluster.metrics.enabled=off
          # Enable metrics extension in akka-cluster-metrics.
          akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
          # Sigar native library extract location during tests.
          akka.cluster.metrics.native-library-extract-folder=target/native/${role.name}
          """)
      }
    }

  commonConfig(ConfigFactory.parseString(
    """
    akka.actor.provider = cluster
    akka.loglevel=DEBUG
    """.stripMargin))
}
class FGSSpecMultiJvmNode1 extends FGSClusterSpec

abstract class FGSClusterSpec extends MultiNodeSpec(FGSClusterSpecConfig)
  with WordSpecLike with Matchers with BeforeAndAfterAll with MockitoSugar
  with ImplicitSender with LazyLogging {

  import FGSClusterSpecConfig._

  override def initialParticipants: Int = roles.size
  override def beforeAll() = multiNodeSpecBeforeAll()
  override def afterAll() = multiNodeSpecAfterAll()

  val mockReferential = mock[IReferentialService]
  val workerProps =
    Worker.props(
      awsBucket = Val.AWSBUCKET,
      gapMinValueMicroSec = Val.dummyGapMinValueMicroSec,
      persistentCache = Val.mockPersistentCache,
      referentialService = mockReferential,
      bffIO = Val.bffIO
    )

  "A FGS cluster" must {

    "illustrate how to startup cluster" in within(15.seconds) {
      Cluster(system).subscribe(testActor, classOf[MemberUp])
      expectMsgClass(classOf[CurrentClusterState])

      val firstAddress = node(first).address
      Cluster(system) join firstAddress

      val workerActor = system.actorOf(workerProps, name = "worker")
      val routees = scala.collection.immutable.Seq("/user/worker")
      system.actorOf(Master.props(1, routees, None), name = "master")

      receiveN(roles.size)
        .collect { case MemberUp(m) => m.address }
        .toSet should be(Set(firstAddress))

      Cluster(system).unsubscribe(testActor)
      testConductor.enter("all-up")
    }

    "execute a job dump from one node" in within(15.seconds) {
      log.info(
        """
          |********************************************************************
          | BEGIN TEST
          |********************************************************************
        """.stripMargin)

      runOn(first) {
        val master = system.actorSelection(node(first) / "user" / "master")

        // Job dump settings
        val deviceId = getClass.getSimpleName
        val from = Some(0L)
        val to = Some(10000000L)
        val jobDump = JobDump(deviceId, from, to)

        // Mock the job dump received by the master and forwarded to a worker
        val jobRemove = mockJobDump(jobDump)
        mockRemoveData(jobRemove)

        awaitAssert {
          // Send the job to the master
          master ! jobDump
          // The worker must return a JobSuccess
          checkAssertionsOnRecord(deviceId)
        }
      }
      testConductor.enter("done-2")
    }
  }
}
And now in the logs, I see this:
[JVM-1] [DEBUG] [03/31/2017 15:36:37.888] [New I/O worker #10] [akka.remote.testconductor.PlayerHandler(akka://FGSClusterSpec)] channel [id: 0x1df9d48f, /127.0.0.1:54134 => /127.0.0.1:4711] written 13
[JVM-1] [DEBUG] [03/31/2017 15:36:37.889] [New I/O worker #7] [akka.remote.testconductor.ConductorHandler(akka://FGSClusterSpec)] message from /127.0.0.1:54134: GetAddress(RoleName(first))
[JVM-1] [DEBUG] [03/31/2017 15:36:37.889] [New I/O worker #10] [akka.remote.testconductor.PlayerHandler(akka://FGSClusterSpec)] message from /127.0.0.1:4711: AddressReply(RoleName(first),akka.tcp://FGSClusterSpec@localhost:2551)
[JVM-1] [DEBUG] [03/31/2017 15:36:37.941] [FGSClusterSpec-akka.actor.default-dispatcher-15] [akka.tcp://FGSClusterSpec@localhost:2551/user/master] Master akka://FGSClusterSpec/user/master receive from Actor[akka://FGSClusterSpec/system/testActor-1#1142810589] following job: JobDump(FGSSpecMultiJvmNode1,Some(0),Some(10000000))
[JVM-1] [INFO] [03/31/2017 15:36:37.942] [FGSClusterSpec-akka.actor.default-dispatcher-19] [akka.tcp://FGSClusterSpec@localhost:2551/user/worker] Worker akka://FGSClusterSpec/user/worker receive following job: JobDump(FGSSpecMultiJvmNode1,Some(0),Some(10000000))
[JVM-1] [DEBUG] [03/31/2017 15:36:38.608] [FGSClusterSpec-akka.actor.default-dispatcher-19] [akka.tcp://FGSClusterSpec@localhost:2551/user/master] Master akka://FGSClusterSpec/user/master receive from Actor[akka://FGSClusterSpec/system/testActor-1#1142810589] following job: JobDump(FGSSpecMultiJvmNode1,Some(0),Some(10000000))
[JVM-1] [INFO] [03/31/2017 15:36:38.609] [FGSClusterSpec-akka.actor.default-dispatcher-19] [akka.tcp://FGSClusterSpec@localhost:2551/user/worker] Worker akka://FGSClusterSpec/user/worker receive following job: JobDump(FGSSpecMultiJvmNode1,Some(0),Some(10000000))
[JVM-1] [DEBUG] [03/31/2017 15:36:38.806] [FGSClusterSpec-akka.actor.default-dispatcher-19] [akka.tcp://FGSClusterSpec@localhost:2551/user/master] Master akka://FGSClusterSpec/user/master receive from Actor[akka://FGSClusterSpec/system/testActor-1#1142810589] following job: JobDump(FGSSpecMultiJvmNode1,Some(0),Some(10000000))
[JVM-1] [INFO] [03/31/2017 15:36:38.807] [FGSClusterSpec-akka.actor.default-dispatcher-2] [akka.tcp://FGSClusterSpec@localhost:2551/user/worker] Worker akka://FGSClusterSpec/user/worker receive following job: JobDump(FGSSpecMultiJvmNode1,Some(0),Some(10000000))
The master receives *the same message three times* and I really cannot
understand why. I limited the test to one node, so I expect only one job to
be executed, not three. Is the way I wrote my test wrong, or is it something
else?
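
One thing I am not sure about: as far as I understand, TestKit's awaitAssert
re-evaluates its whole block until the block stops throwing or the timeout
expires. If that is right, a send placed inside the block would be repeated on
every retry. Here is a minimal sketch of that idea in plain Scala, with no Akka
involved. The names retryUntilPasses, sendInsideBlock and sendOutsideBlock are
hypothetical helpers that I made up to imitate the behavior; this is not Akka's
implementation, and the "passes on the third check" condition is an assumption
chosen only to mirror the three sends in my logs.

```scala
object AwaitAssertSketch {

  // Hypothetical stand-in for awaitAssert: evaluates `block` again each time
  // it throws an AssertionError, until it passes or the attempts run out.
  def retryUntilPasses[A](attemptsLeft: Int)(block: => A): A =
    try block
    catch {
      case _: AssertionError if attemptsLeft > 1 =>
        retryUntilPasses(attemptsLeft - 1)(block)
    }

  // The send sits inside the retried block, as in my test.
  def sendInsideBlock(): Int = {
    var sends = 0
    var checks = 0
    retryUntilPasses(10) {
      sends += 1 // plays the role of `master ! jobDump`
      checks += 1
      // Assumption: the result only becomes visible on the third check
      if (checks < 3) throw new AssertionError("result not ready yet")
    }
    sends
  }

  // The send happens once, before the retried block; only the check is retried.
  def sendOutsideBlock(): Int = {
    var sends = 0
    var checks = 0
    sends += 1 // plays the role of `master ! jobDump`
    retryUntilPasses(10) {
      checks += 1
      if (checks < 3) throw new AssertionError("result not ready yet")
    }
    sends
  }

  def main(args: Array[String]): Unit = {
    println(s"sends with the send inside the block:  ${sendInsideBlock()}")  // 3
    println(s"sends with the send outside the block: ${sendOutsideBlock()}") // 1
  }
}
```

If this is really what happens in my test, then moving `master ! jobDump`
before awaitAssert and keeping only checkAssertionsOnRecord(deviceId) inside
the block should result in a single job, but I would appreciate a confirmation.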