Hi Akka Team,

It's again me about the multi-nodes test that I'm doing to validate my 
cluster. I solve my issues about Master-Worker communication but I have a 
really weird behavior in my test. The below piece of code seems to be 
execute 3 three times : 

 
awaitAssert {


          /**
            * Send the job to the master
            */
          master ! jobDump


          /**
            * Worker must return a JobSuccess
            */
          checkAssertionsOnRecord(deviceId)


}

Just to be quick, I have two Actors (Master and Worker) with the following 
implements : 


case class Master(totalInstance : Int,routees : scala.collection.immutable.
Seq[String],nodeRole : Option[String]) extends Actor with ActorLogging {


  val routerType = RoundRobinGroup(Nil)
  val clusterGroupSettings = ClusterRouterGroupSettings(totalInstances = 
totalInstance,routeesPaths = routees,allowLocalRoutees = true,useRole = 
nodeRole)
  val clusterGroup = ClusterRouterGroup(routerType,clusterGroupSettings).
props()
  val workerRouter = context.actorOf(clusterGroup, name="workerRouter")


  def receive = {


    case jDump : JobDump =>
      log.debug("Master {} receive from {} following job: {}", self.path, 
sender(), jDump)
      workerRouter ! jDump
 }
}


case class Worker(awsBucket : String,gapMinValueMicroSec : Long,
persistentCache: PersistentCache[DeviceId, BCPPackets],referentialService: 
IReferentialService,bffIO : BFF_IO) extends Actor with ActorLogging {




  def receive = {
    case jDump: JobDump =>
      val replyTo = sender()
      log.info("Worker {} receive following job: {}", self.path, jDump)
      val jobDumpResult = this.executeJobDump(jDump)
      this.sendJobResultToSender(jDump, jobDumpResult, replyTo)
 }
}


To make easier the debug, I limit the test to one node (this is why I set 
to to true allowLocalRoutees for the Master) like this :


object FGSClusterSpecConfig extends MultiNodeConfig with LazyLogging {


  // register the named roles (nodes) of the test
  val first = role("first")
  def nodeList = Seq(first)
  val portOffset = 2551
  val numberOfPorts = nodeList.size
  val portList = (portOffset to (portOffset+numberOfPorts)).toList


    nodeList
      .zip(portList)
      .foreach { case (role, port) =>
      nodeConfig(role) {
        ConfigFactory.parseString(s"""
      # Define port for each node configuration
      akka.remote.netty.tcp.port = $port
      # Disable legacy metrics in akka-cluster.
      akka.cluster.metrics.enabled=off
      # Enable metrics extension in akka-cluster-metrics.
      akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
      # Sigar native library extract location during tests.
      
akka.cluster.metrics.native-library-extract-folder=target/native/${role.name}
          """)
      }
    }
  commonConfig(ConfigFactory.parseString(
    """
       akka.actor.provider = cluster
       akka.loglevel=DEBUG
    """.stripMargin))
}

class FGSSpecMultiJvmNode1 extends FGSClusterSpec

abstract class FGSClusterSpec extends MultiNodeSpec(FGSClusterSpecConfig)
with WordSpecLikewith Matcherswith BeforeAndAfterAllwith MockitoSugarwith 
ImplicitSenderwith LazyLogging {

  import FGSClusterSpecConfig._

  override def initialParticipants: Int = roles.size
  override def beforeAll() = multiNodeSpecBeforeAll()
  override def afterAll() = multiNodeSpecAfterAll()

  val mockReferential = mock[IReferentialService]

  val workerProps =
    Worker.props(awsBucket = Val.AWSBUCKET,
      gapMinValueMicroSec = Val.dummyGapMinValueMicroSec,
      persistentCache = Val.mockPersistentCache,
      referentialService = mockReferential,
      bffIO = Val.bffIO
    )

  "A FGS cluster" must {
    "illustrate how to startup cluster" in within(15 seconds) {
      Cluster(system).subscribe(testActor, classOf[MemberUp])
      expectMsgClass(classOf[CurrentClusterState])

      val firstAddress = node(first).address
      Cluster(system) join firstAddress

      val workerActor = system.actorOf(workerProps, name = "worker")
      val routees = scala.collection.immutable.Seq("/user/worker")
      system.actorOf(Master.props(1,routees, None), name = "master")
      
       receiveN(roles.size)
        .collect { case MemberUp(m) => m.address }
        .toSet should be(Set(firstAddress))


      Cluster(system).unsubscribe(testActor)
      testConductor.enter("all-up")
    }


    "execute a job dump from one node" in within(15.seconds) {
      log.info(
        """
          | 
********************************************************************
          | BEGIN TEST
          | 
********************************************************************
        """.stripMargin)


      runOn(first) {
        val master = system.actorSelection(node(first) / "user" / "master")


        /**x
          * Job Dump settings
          */
        val deviceId = getClass.getSimpleName
        val from = Some(0L)
        val to = Some(10000000L)
        val jobDump = JobDump(deviceId, from, to)


        /**
          * Mock the job dump receive by the master and forward to a worker
          */
        val jobRemove = mockJobDump(jobDump)


        mockRemoveData(jobRemove)


        awaitAssert {


          /**
            * Send the job to the master
            */
          master ! jobDump


          /**
            * Worker must return a JobSuccess
            */
          checkAssertionsOnRecord(deviceId)
        }
      }
      testConductor.enter("done-2")
    }
  }


And now in the logs, I see this : 

[JVM-1] [DEBUG] [03/31/2017 15:36:37.888] [New I/O worker #10] 
[akka.remote.testconductor.PlayerHandler(akka://FGSClusterSpec)] channel 
[id: 0x1df9d48f, /127.0.0.1:54134 => /127.0.0.1:4711] written 13
[JVM-1] [DEBUG] [03/31/2017 15:36:37.889] [New I/O worker #7] 
[akka.remote.testconductor.ConductorHandler(akka://FGSClusterSpec)] message 
from /127.0.0.1:54134: GetAddress(RoleName(first))
[JVM-1] [DEBUG] [03/31/2017 15:36:37.889] [New I/O worker #10] 
[akka.remote.testconductor.PlayerHandler(akka://FGSClusterSpec)] message 
from /127.0.0.1:4711: 
AddressReply(RoleName(first),akka.tcp://FGSClusterSpec@localhost:2551)
[JVM-1] [DEBUG] [03/31/2017 15:36:37.941] [FGSClusterSpec-akka.actor.default
-dispatcher-15] [akka.tcp://FGSClusterSpec@localhost:2551/user/master] 
Master akka://FGSClusterSpec/user/master receive from 
Actor[akka://FGSClusterSpec/system/testActor-1#1142810589] following job: 
JobDump(FGSSpecMultiJvmNode1,Some(0),Some(10000000))
[JVM-1] [INFO] [03/31/2017 15:36:37.942] [FGSClusterSpec-akka.actor.default-
dispatcher-19] [akka.tcp://FGSClusterSpec@localhost:2551/user/worker] 
Worker akka://FGSClusterSpec/user/worker receive following job: 
JobDump(FGSSpecMultiJvmNode1,Some(0),Some(10000000))
[JVM-1] [DEBUG] [03/31/2017 15:36:38.608] [FGSClusterSpec-akka.actor.default
-dispatcher-19] [akka.tcp://FGSClusterSpec@localhost:2551/user/master] 
Master akka://FGSClusterSpec/user/master receive from 
Actor[akka://FGSClusterSpec/system/testActor-1#1142810589] following job: 
JobDump(FGSSpecMultiJvmNode1,Some(0),Some(10000000))
[JVM-1] [INFO] [03/31/2017 15:36:38.609] [FGSClusterSpec-akka.actor.default-
dispatcher-19] [akka.tcp://FGSClusterSpec@localhost:2551/user/worker] 
Worker akka://FGSClusterSpec/user/worker receive following job: 
JobDump(FGSSpecMultiJvmNode1,Some(0),Some(10000000))
[JVM-1] [DEBUG] [03/31/2017 15:36:38.806] [FGSClusterSpec-akka.actor.default
-dispatcher-19] [akka.tcp://FGSClusterSpec@localhost:2551/user/master] 
Master akka://FGSClusterSpec/user/master receive from 
Actor[akka://FGSClusterSpec/system/testActor-1#1142810589] following job: 
JobDump(FGSSpecMultiJvmNode1,Some(0),Some(10000000))
[JVM-1] [INFO] [03/31/2017 15:36:38.807] [FGSClusterSpec-akka.actor.default-
dispatcher-2] [akka.tcp://FGSClusterSpec@localhost:2551/user/worker] Worker 
akka://FGSClusterSpec/user/worker receive following job: 
JobDump(FGSSpecMultiJvmNode1,Some(0),Some(10000000))

The master receive *the same message three times* and I really cannot 
understand why. I limit the tests to one node so I'm expecting to get only 
one job to execute, not three like now.  Is it the way to write my test 
which is wrong or something else ?

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to