I solved my issue by changing the role names!
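To spell that out: the router below asks for `useRole = Some("workers")`, but nothing in the multi-node config ever assigns a cluster role, so no member matched and the group router ended up with zero routees. As a sketch (the exact role name is up to you, and this is the idea rather than my exact change), the nodes that host `Worker` actors need that role in their per-node config:

```
# Sketch: add inside nodeConfig(...) for the nodes that host Worker actors,
# so those members carry the role that ClusterRouterGroupSettings.useRole expects.
akka.cluster.roles = ["workers"]
```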

Le jeudi 30 mars 2017 18:01:51 UTC+2, Kilic Ali-Firat a écrit :
>
> Hi Akka Team, 
>
> Using the akka cluster samples, I defined a cluster router (group) and I 
> want to validate my cluster using unit test. 
>
> The problem is that the master receives the job (an Akka message) to 
> execute, but when it sends the job to the workers, the message ends up in 
> dead letters: 
>
>  [JVM-1] [WARN] [SECURITY][03/30/2017 17:48:05.969] [FGSClusterSpec-akka.
> remote.default-remote-dispatcher-15] [akka.serialization.Serialization(
> akka://FGSClusterSpec)] Using the default Java serializer for class 
> [com.bioserenity.fgs.actors.ClusterClientActor$JobDump] which is not 
> recommended because of performance implications. Use another serializer or 
> disable this warning using the setting 
> 'akka.actor.warn-about-java-serializer-usage'
> [JVM-3] [INFO] [03/30/2017 17:48:05.984] [FGSClusterSpec-akka.actor.
> default-dispatcher-16] [akka.tcp://FGSClusterSpec@localhost:2553/user/master] 
> Master akka://FGSClusterSpec/user/master receive following job: 
> JobDump(7aa7c95c-654a-4dd2-8bd2-721f591a838f,Some(0),Some(10000000))
> [JVM-3] [WARN] [03/30/2017 17:48:05.987] [FGSClusterSpec-akka.actor.
> default-dispatcher-16] [akka://FGSClusterSpec/deadLetters] received dead 
> letter from Actor[akka://FGSClusterSpec/user/master#876373281]: 
> JobDump(7aa7c95c-654a-4dd2-8bd2-721f591a838f,Some(0),Some(10000000))
> [JVM-3] [INFO] [03/30/2017 17:48:05.989] [FGSClusterSpec-akka.actor.
> default-dispatcher-19] [akka://FGSClusterSpec/deadLetters] Message 
> [com.bioserenity.fgs.actors.ClusterClientActor$JobDump] from 
> Actor[akka://FGSClusterSpec/user/master#876373281] to 
> Actor[akka://FGSClusterSpec/deadLetters] was not delivered. [1] dead 
> letters encountered. This logging can be turned off or adjusted with 
> configuration settings 'akka.log-dead-letters' and 
> 'akka.log-dead-letters-during-shutdown'.
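This dead-letter pattern (a message from `user/master` routed straight to `deadLetters`) is what a group router produces when it currently has no routees. The following is an illustrative pure-Scala model of the selection rule (assumed names, not Akka's internal code) showing why the settings in the Master below can yield an empty routee set:

```scala
// Illustrative model (not Akka internals): a cluster group router only routes
// to nodes that pass both the locality check and the role check.
def nodeEligible(
    useRole: Option[String],      // ClusterRouterGroupSettings.useRole
    nodeRoles: Set[String],       // the member's akka.cluster.roles
    isLocal: Boolean,
    allowLocalRoutees: Boolean): Boolean =
  (allowLocalRoutees || !isLocal) && useRole.forall(nodeRoles.contains)

// With useRole = Some("workers") and allowLocalRoutees = false, a member
// that carries no cluster roles is always rejected:
val eligible = List(Set.empty[String], Set.empty[String], Set.empty[String])
  .map(roles => nodeEligible(Some("workers"), roles,
    isLocal = false, allowLocalRoutees = false))
println(eligible) // List(false, false, false) -> zero routees -> dead letters
```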
>
>
> My Master code : 
>
>
> /**
>   * Master actors of all workers of the cluster
>   * @param totalInstance number of workers by node
>   * @param routees workers path
>   */
> case class Master
> (
>   totalInstance : Int,
>   routees : scala.collection.immutable.Seq[String]
> ) extends Actor with ActorLogging {
>
>
>   /**
>     * Define the router in code instead of in the configuration file; the
>     * main reason for this choice is that it makes it easier to build the
>     * routee (worker) paths.
>     */
>   val routerType = RoundRobinGroup(paths = routees.toList)
>   val clusterGroupSettings =
>     ClusterRouterGroupSettings(
>       totalInstances = totalInstance,
>       routeesPaths = routees,
>       allowLocalRoutees = false,
>       useRole = Some("workers"))
>   val clusterGroup =
>     ClusterRouterGroup(routerType, clusterGroupSettings).props()
>   val workerRouter = context.actorOf(clusterGroup, name = "workerRouter")
>
>   log.info("Create Router {} with following routees: {}",
>     workerRouter.path, routees.mkString("---"))
>
>   def receive = {
>     case jDump : JobDump =>
>       log.info("Master {} receive following job: {}", self.path, jDump)
>       workerRouter ! jDump
>   }
> }
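Since `allowLocalRoutees = false`, the relative path `/user/worker` is only looked up on remote members (restricted further by `useRole`). A hypothetical sketch of that resolution, in plain Scala with assumed names rather than Akka's API:

```scala
// Hypothetical model of how a relative routee path is combined with remote
// member addresses; the names here are assumptions, not Akka's API.
final case class MemberAddress(system: String, host: String, port: Int) {
  override def toString: String = s"akka.tcp://$system@$host:$port"
}

def resolveRoutees(remoteMembers: List[MemberAddress], routeePath: String): List[String] =
  remoteMembers.map(m => s"$m$routeePath")

val remotes = List(
  MemberAddress("FGSClusterSpec", "localhost", 2552),
  MemberAddress("FGSClusterSpec", "localhost", 2553))
println(resolveRoutees(remotes, "/user/worker"))
// List(akka.tcp://FGSClusterSpec@localhost:2552/user/worker,
//      akka.tcp://FGSClusterSpec@localhost:2553/user/worker)
```

So a worker actor must actually exist at `/user/worker` on each eligible member, and that member must match `useRole`, or the router stays empty.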
>
> My Worker : 
>
>
> /**
>   * @param awsBucket bucket to store device data
>   * @param gapMinValueMicroSec min value to not exceed between bcp packets
>   * @param persistentCache cache storing temporary device data
>   * @param referentialService service to communicate with referential
>   * @param bffIO instance of bff to write chunks
>   * @return Props of worker actors
>   */
> case class Worker
> (
>   awsBucket : String,
>   gapMinValueMicroSec : Long,
>   persistentCache: PersistentCache[DeviceId, BCPPackets],
>   referentialService: IReferentialService,
>   bffIO : BFF_IO
> ) extends Actor with ActorLogging {
>
>
>
>
>   override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
>     log.info(
>       """
>         | **********************************************************************
>         | Actor {} restart !
>         | Reason: {}
>         | Message: {}
>       """.stripMargin, self.path, reason, message)
>   }
>
>   def receive = {
>     case jDump: JobDump =>
>       val replyTo = sender()
>       log.info("Worker {} receive following job: {}", self.path, jDump)
>       val jobDumpResult = this.executeJobDump(jDump)
>       this.sendJobResultToSender(jDump, jobDumpResult, replyTo)
>   }
> }
>
> In the logs, the worker is never reached; maybe the reason is that my 
> worker is not created in a way that lets it be used in the cluster. But as 
> you can see below, I create the worker before the master, so I cannot 
> fully understand why the worker is never reached. 
>
>
> object FGSClusterSpecConfig extends MultiNodeConfig {
>
>
>   Logger.getLogger("akka").setLevel(Level.INFO)
>   // register the named roles (nodes) of the test
>   val first = role("first")
>   val second = role("second")
>   val third = role("third")
>
>
>   def nodeList = Seq(first, second, third)
>   val portOffset = 2551
>   val numberOfPorts = nodeList.size
>   val portList = (portOffset until (portOffset + numberOfPorts)).toList
>
>
>
>
>   nodeList
>     .zip(portList)
>     .foreach { case (role, port) =>
>       nodeConfig(role) {
>         ConfigFactory.parseString(s"""
>           # Define port for each node configuration
>           akka.remote.netty.tcp.port = $port
>           # Disable legacy metrics in akka-cluster.
>           akka.cluster.metrics.enabled=off
>           # Enable metrics extension in akka-cluster-metrics.
>           akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
>           # Sigar native library extract location during tests.
>           akka.cluster.metrics.native-library-extract-folder=target/native/${role.name}
>         """)
>       }
>     }
>
>
>   commonConfig(ConfigFactory.parseString(
>     """
>        akka.actor.provider = cluster
>        akka.loglevel=INFO
>        akka.stdout-loglevel=INFO
>        akka.actor.debug.receive=off
>     """.stripMargin))
> }
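A side note on `portList`: Scala's `to` is inclusive while `until` is exclusive, so zipping one port per node needs exactly `nodeList.size` ports (with an inclusive range, the surplus port is only harmless because `zip` truncates). A quick check:

```scala
val portOffset = 2551
val numberOfPorts = 3

// `to` is inclusive: yields numberOfPorts + 1 elements.
val inclusivePorts = (portOffset to (portOffset + numberOfPorts)).toList
// `until` is exclusive: exactly one port per node.
val exclusivePorts = (portOffset until (portOffset + numberOfPorts)).toList

println(inclusivePorts) // List(2551, 2552, 2553, 2554)
println(exclusivePorts) // List(2551, 2552, 2553)
```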
>
>
> class FGSSpecMultiJvmNode1 extends FGSClusterSpec
> class FGSSpecMultiJvmNode2 extends FGSClusterSpec
> class FGSSpecMultiJvmNode3 extends FGSClusterSpec
>
>
> abstract class FGSClusterSpec extends MultiNodeSpec(FGSClusterSpecConfig)
>   with WordSpecLike
>   with Matchers
>   with BeforeAndAfterAll
>   with ImplicitSender
>   with MockitoSugar {
>
>
>
>
>   import FGSClusterSpecConfig._
>
>
>   override def initialParticipants: Int = roles.size
>
>
>   val mockPersistentCache = mock[PersistentCache[DeviceId, BCPPackets]]
>   val mockReferential = mock[IReferentialService]
>
>
>   val workerProps =
>     Worker.props(awsBucket = "foobar",
>       gapMinValueMicroSec = 1e9.toLong,
>       persistentCache = mockPersistentCache,
>       referentialService = mockReferential,
>       bffIO = mock[BFF_IO]
>     )
>
>   override def beforeAll() = multiNodeSpecBeforeAll()
>
>   override def afterAll() = multiNodeSpecAfterAll()
>
>
>   "A FGS cluster" must {
>     "illustrate how to startup cluster" in within(15 seconds) {
>       Cluster(system).subscribe(testActor, classOf[MemberUp])
>       expectMsgClass(classOf[CurrentClusterState])
>
>
>       val firstAddress = node(first).address
>       val secondAddress = node(second).address
>       val thirdAddress = node(third).address
>
>
>       Cluster(system) join firstAddress
>
>
>       val workerActor = system.actorOf(workerProps, name = "worker")
>       println("workerActor path " + workerActor.path)
>       val routees = scala.collection.immutable.Seq("/user/worker")
>       system.actorOf(Master.props(100,routees), name = "master")
>
>
>       receiveN(3).collect { case MemberUp(m) => m.address }.toSet should be(
>         Set(firstAddress, secondAddress, thirdAddress))
>
>
>       Cluster(system).unsubscribe(testActor)
>       testConductor.enter("all-up")
>     }
>
>
>     "execute a job dump from one node" in within(15 seconds) {
>         val master = system.actorSelection(node(third) / "user" / "master")
>
>
>         /**
>           * Job Dump settings
>           */
>         val deviceId = TestUtils.uuid
>         val pw = new PrintWriter("/tmp/test")
>         pw.write(deviceId)
>         pw.close
>         val from = Some(0L)
>         val to = Some(10000000L)
>         val jobDump = JobDump(deviceId, from, to)
>
>
>         awaitAssert {
>
>
>           /**
>             * Mock the job dump received by the master and forwarded to a
>             * worker
>             */
>           mockJobDump(jobDump)
>
>
>           /**
>             * Send the job to the master
>             */
>           master ! jobDump
>
>
>           /**
>             * Worker must return a JobSuccess
>             */
>           expectMsg(JobSuccess(jobDump))
>         }
>       testConductor.enter("done-3")
>       }
>
>
>   }
>
>
>   def mockJobDump(jDump : JobDump) : Unit = {
>
>
>     /**
>       * Metadata settings
>       */
>     val bcpRecord = TestUtils.createTestBcpRecord
>     val bonjour = DeviceConfigs.recordToBonjour(jDump.deviceId,bcpRecord)
>     val startStream =
>       DeviceConfigs.recordToStartStream(jDump.deviceId,bcpRecord)
>     val metadata = Some((bonjour, startStream))
>     /**
>       * Note that each bcp packet contains one sample
>       */
>     val BCP_PACKETS_BY_SENSOR = 10
>     val bcpPackets =
>       TestUtils.createDummyBCPPackets(bcpRecord, BCP_PACKETS_BY_SENSOR)
>
>
>     /**
>       * Mock the getMetadata function of the cache: set up its behavior so
>       * that it returns the metadata of a device
>       */
>     TestUtils
>       .mockGetMetadataOfCache(mockPersistentCache, jDump.deviceId, metadata)
>
>
>     /**
>       * Mock the getData function of the cache: set up its behavior so that
>       * it returns data from the cache
>       */
>     TestUtils
>       .mockGetData(mockPersistentCache, jDump.deviceId, jDump.from,
>         jDump.to, bcpPackets)
>
>
>   }
>
>
> }
>
>
>
>
>

--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.