Hi Akka Team,
Using the akka cluster samples, I defined a cluster router (group) and I
want to validate my cluster using unit test.
The problem is that the master receive the job (an Akka message) to execute
and when I send the job to the workers, it forwards to the dead lettters :
[JVM-1] [WARN] [SECURITY][03/30/2017 17:48:05.969] [FGSClusterSpec-akka.
remote.default-remote-dispatcher-15]
[akka.serialization.Serialization(akka://FGSClusterSpec)]
Using the default Java serializer for class
[com.bioserenity.fgs.actors.ClusterClientActor$JobDump] which is not
recommended because of performance implications. Use another serializer or
disable this warning using the setting
'akka.actor.warn-about-java-serializer-usage'
[JVM-3] [INFO] [03/30/2017 17:48:05.984] [FGSClusterSpec-akka.actor.default-
dispatcher-16] [akka.tcp://FGSClusterSpec@localhost:2553/user/master]
Master akka://FGSClusterSpec/user/master receive following job:
JobDump(7aa7c95c-654a-4dd2-8bd2-721f591a838f,Some(0),Some(10000000))
[JVM-3] [WARN] [03/30/2017 17:48:05.987] [FGSClusterSpec-akka.actor.default-
dispatcher-16] [akka://FGSClusterSpec/deadLetters] received dead letter
from Actor[akka://FGSClusterSpec/user/master#876373281]:
JobDump(7aa7c95c-654a-4dd2-8bd2-721f591a838f,Some(0),Some(10000000))
[JVM-3] [INFO] [03/30/2017 17:48:05.989] [FGSClusterSpec-akka.actor.default-
dispatcher-19] [akka://FGSClusterSpec/deadLetters] Message
[com.bioserenity.fgs.actors.ClusterClientActor$JobDump] from
Actor[akka://FGSClusterSpec/user/master#876373281] to
Actor[akka://FGSClusterSpec/deadLetters] was not delivered. [1] dead
letters encountered. This logging can be turned off or adjusted with
configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.
My Master code :
/**
* Master actors of all workers of the cluster
* @param totalInstance number of workers by node
* @param routees workers path
*/
case class Master
(
totalInstance : Int,
routees : scala.collection.immutable.Seq[String]
) extends Actor with ActorLogging {
/**
* Defined in the code the router instead to file configuration. The main
* reason of this choice is easier to create routees (workers) path.
*/
val routerType = RoundRobinGroup(paths = routees.toList)
val clusterGroupSettings =
ClusterRouterGroupSettings(
totalInstances = totalInstance,
routeesPaths = routees,
allowLocalRoutees = false,
useRole = Some("workers"))
val clusterGroup = ClusterRouterGroup(routerType,clusterGroupSettings).
props()
val workerRouter = context.actorOf(clusterGroup, name="workerRouter")
log.info("Create Router {} with following routees: {}", workerRouter.path,
routees.mkString("---"))
def receive = {
case jDump : JobDump =>
log.info("Master {} receive following job: {}", self.path, jDump)
workerRouter ! jDump
}
}
My Worker :
/**
* @param awsBucket bucket to store device data
* @param gapMinValueMicroSec min value to not exceed between bcp packets
* @param persistentCache cache storing temporary device data
* @param referentialService service to communicate with referential
* @param bffIO instance of bff to write chunks
* @return Props of worker actors
*/
case class Worker
(
awsBucket : String,
gapMinValueMicroSec : Long,
persistentCache: PersistentCache[DeviceId, BCPPackets],
referentialService: IReferentialService,
bffIO : BFF_IO
) extends Actor with ActorLogging {
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
log.info(
"""
|
**********************************************************************
| Actor {} restart !
| Reason: {}
| Message: {}
""".stripMargin, self.path, reason, message)
}
def receive = {
case jDump: JobDump =>
val replyTo = sender()
log.info("Worker {} receive following job: {}", self.path, jDump)
val jobDumpResult = this.executeJobDump(jDump)
this.sendJobResultToSender(jDump, jobDumpResult, replyTo)
}
}
In the logs, the worker is never reached and maybe the main reason is that
my worker is not created to be used in the Cluster. But below you can see
that I'm creating the worker before the master so I cannot fully understand
why my worker is never reached.
object FGSClusterSpecConfig extends MultiNodeConfig {
Logger.getLogger("akka").setLevel(Level.INFO)
// register the named roles (nodes) of the test
val first = role("first")
val second = role("second")
val third = role("thrid")
def nodeList = Seq(first, second, third)
val portOffset = 2551
val numberOfPorts = nodeList.size
val portList = (portOffset to (portOffset+numberOfPorts)).toList
nodeList
.zip(portList)
.foreach { case (role, port) =>
nodeConfig(role) {
ConfigFactory.parseString(s"""
# Define port for each node configuration
akka.remote.netty.tcp.port = $port
# Disable legacy metrics in akka-cluster.
akka.cluster.metrics.enabled=off
# Enable metrics extension in akka-cluster-metrics.
akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
# Sigar native library extract location during tests.
akka.cluster.metrics.native-library-extract-folder=target/native/${role.name}
""")
}
}
commonConfig(ConfigFactory.parseString(
"""
akka.actor.provider = cluster
akka.loglevel=INFO
akka.stdout-logleve=INFO
akka.actor.debug.receive=off
""".stripMargin))
}
class FGSSpecMultiJvmNode1 extends FGSClusterSpec
class FGSSpecMultiJvmNode2 extends FGSClusterSpec
class FGSSpecMultiJvmNode3 extends FGSClusterSpec
abstract class FGSClusterSpec extends MultiNodeSpec(FGSClusterSpecConfig)
with WordSpecLike
with Matchers
with BeforeAndAfterAll
with ImplicitSender
with MockitoSugar {
import FGSClusterSpecConfig._
override def initialParticipants: Int = roles.size
val mockPersistentCache = mock[PersistentCache[DeviceId, BCPPackets]]
val mockReferential = mock[IReferentialService]
val workerProps =
Worker.props(awsBucket = "foobar",
gapMinValueMicroSec = 1e9.toLong,
persistentCache = mockPersistentCache,
referentialService = mockReferential,
bffIO = mock[BFF_IO]
)
override def beforeAll() = multiNodeSpecBeforeAll()
override def afterAll() = multiNodeSpecAfterAll()
"A FGS cluster" must {
"illustrate how to startup cluster" in within(15 seconds) {
Cluster(system).subscribe(testActor, classOf[MemberUp])
expectMsgClass(classOf[CurrentClusterState])
val firstAddress = node(first).address
val secondAddress = node(second).address
val thirdAddress = node(third).address
Cluster(system) join firstAddress
val workerActor = system.actorOf(workerProps, name = "worker")
println("workerActor path " + workerActor.path)
val routees = scala.collection.immutable.Seq("/user/worker")
system.actorOf(Master.props(100,routees), name = "master")
receiveN(3).collect { case MemberUp(m) => m.address }.toSet should be(
Set(firstAddress, secondAddress, thirdAddress))
Cluster(system).unsubscribe(testActor)
testConductor.enter("all-up")
}
"execute a job dump from one node" in within(15 seconds) {
val master = system.actorSelection(node(third) / "user" / "master")
/**
* Job Dump settings
*/
val deviceId = TestUtils.uuid
val pw = new PrintWriter("/tmp/test")
pw.write(deviceId)
pw.close
val from = Some(0L)
val to = Some(10000000L)
val jobDump = JobDump(deviceId, from, to)
awaitAssert {
/**
* Mock the job dump receive by the master and forwar to a worker
*/
mockJobDump(jobDump)
/**
* Send the job to the master
*/
master ! jobDump
/**
* Worker must return a JobSuccess
*/
expectMsg(JobSuccess(jobDump))
}
testConductor.enter("done-3")
}
}
def mockJobDump(jDump : JobDump) : Unit = {
/**
* Metadata settings
*/
val bcpRecord = TestUtils.createTestBcpRecord
val bonjour = DeviceConfigs.recordToBonjour(jDump.deviceId,bcpRecord)
val startStream =
DeviceConfigs.recordToStartStream(jDump.deviceId,bcpRecord)
val metadata = Some((bonjour, startStream))
/**
* Note that each bcp packet contain one sample
*/
val BCP_PACKETS_BY_SENSOR = 10
val bcpPackets =
TestUtils.createDummyBCPPackets(bcpRecord, BCP_PACKETS_BY_SENSOR)
/**
* Mock get metadata function of cache : setup his behavior like it
* returning metadata of a device
*/
TestUtils
.mockGetMetadataOfCache(mockPersistentCache, jDump.deviceId, metadata)
/**
* Mock getData function of cache : setup his behavior like it
returning
* data from the cache
*/
TestUtils
.mockGetData(mockPersistentCache,jDump.deviceId, jDump.from, jDump.to,
bcpPackets)
}
}
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.