Hello,
I have an Actor that receives an ActorRef in its constructor:
class TenantProcessor(equipmentProcessor: ActorRef) extends PersistentActor
And i'm trying make a multi node test, but it not works. My code is
val idExtractor: ShardRegion.IdExtractor = {
case msg: EvaluateCoordinate => (msg.coordinate.tenantId, msg)
case msg: AddOrganization => (msg.tenantId, msg)
case msg: AddEvent => (msg.tenantId, msg)
case msg: AddEventCheckSettings => (msg.tenantId, msg)
}
val shardResolver: ShardRegion.ShardResolver = {
case msg: EvaluateCoordinate => (msg.coordinate.tenantId.hashCode %
50).toString
case msg: AddOrganization => (msg.tenantId.hashCode % 50).toString
case msg: AddEvent => (msg.tenantId.hashCode % 50).toString
case msg: AddEventCheckSettings => (msg.tenantId.hashCode % 50).toString
}
And my test is:
runOn(dispatcher1){
val probe = TestProbe()
val tenantProcessor = ClusterSharding(system).start(
TenantProcessor.shardRegion,
Some(TenantProcessor.props(probe.ref)),
TenantProcessor.idExtractor,
TenantProcessor.shardResolver
)
tenantProcessor ! AddOrganization(tenantId1, organization1Tenant1)
tenantProcessor ! AddOrganization(tenantId1, organization2Tenant1)
tenantProcessor ! AddEvent(tenantId1, event1Tenant1)
tenantProcessor ! AddEvent(tenantId1, event2Tenant1)
tenantProcessor ! AddEventCheckSettings(tenantId1, SpeedViolationCheck,
eventCheckSettings1Tenant1)
tenantProcessor ! AddEventCheckSettings(tenantId1, RoutePathDeviationCheck,
eventCheckSettings2Tenant1)
val prepareEvents = PrepareEvents(coordinate, Set(event1Tenant1,
event2Tenant1), eventCheckSettingsMap)
tenantProcessor ! EvaluateCoordinate(coordinate)
probe.expectMsg(10 seconds, prepareEvents)
}
enterBarrier("after-3")
}
the test throws TimeoutException,
Is there some problem pass probe.ref to actor?
Log of mult-node-test:
multi-jvm:test
[info] * com.greenmile.event.EventLogProcessorSpecTest
[JVM-1] Run starting. Expected test count is: 3
[JVM-1] EventLogProcessorSpecTestMultiJvmNode1:
[JVM-1] EventLogOpened EventLogClosed
[JVM-2] Run starting. Expected test count is: 3
[JVM-2] EventLogProcessorSpecTestMultiJvmNode2:
[JVM-4] Run starting. Expected test count is: 3
[JVM-2] EventLogOpened EventLogClosed
[JVM-4] EventLogProcessorSpecTestMultiJvmNode4:
[JVM-4] EventLogOpened EventLogClosed
[JVM-3] Run starting. Expected test count is: 3
[JVM-3] EventLogProcessorSpecTestMultiJvmNode3:
[JVM-3] EventLogOpened EventLogClosed
[JVM-4] - must setup shared journal
[JVM-2] - must setup shared journal
[JVM-1] - must setup shared journal
[JVM-3] - must setup shared journal
[JVM-2] - must join cluster
[JVM-4] - must join cluster
[JVM-3] - must join cluster
[JVM-1] - must join cluster
[JVM-1] [WARN] [09/04/2015 09:40:49.660]
[EventLogProcessorSpecTest-akka.actor.default-dispatcher-4]
[akka://EventLogProcessorSpecTest/user/sharding/TenantProcessor%24Coordinator/singleton/coordinator]
received dead letter from
Actor[akka://EventLogProcessorSpecTest/user/sharding/TenantProcessor%24#1179122014]:
Register(Actor[akka://EventLogProcessorSpecTest/user/sharding/TenantProcessor%24#1179122014])
[JVM-1] - must be generated with two tenants, three equipments, two events
and parent organization *** FAILED ***
[JVM-1] java.lang.AssertionError: assertion failed: timeout (10 seconds)
during expectMsg while waiting for
PrepareEvents(Coordinate(1,2,1,80.0,Position(-3.7751698,-38.495213),Fri Sep
04 09:40:48 BRT 2015),Set(Event(1,1,Set(SpeedViolationCheck)),
Event(2,2,Set(RoutePathDeviationCheck))),Map(SpeedViolationCheck ->
SpeedViolationSettings(50.0), RoutePathDeviationCheck ->
RoutePathDeviationSettings(200.0)))
[JVM-1] at scala.Predef$.assert(Predef.scala:165)
[JVM-1] at
akka.testkit.TestKitBase$class.expectMsg_internal(TestKit.scala:338)
[JVM-1] at akka.testkit.TestKitBase$class.expectMsg(TestKit.scala:324)
[JVM-1] at akka.testkit.TestKit.expectMsg(TestKit.scala:718)
[JVM-1] at
com.greenmile.event.EventLogProcessorSpecTest$$anonfun$1$$anonfun$apply$mcV$sp$6$$anonfun$apply$mcV$sp$7.apply$mcV$sp(EventLogProcessorSpecTest.scala:167)
[JVM-1] at
akka.remote.testkit.MultiNodeSpec.runOn(MultiNodeSpec.scala:338)
[JVM-1] at
com.greenmile.event.EventLogProcessorSpecTest$$anonfun$1$$anonfun$apply$mcV$sp$6.apply$mcV$sp(EventLogProcessorSpecTest.scala:147)
[JVM-1] at
com.greenmile.event.EventLogProcessorSpecTest$$anonfun$1$$anonfun$apply$mcV$sp$6.apply(EventLogProcessorSpecTest.scala:145)
[JVM-1] at
com.greenmile.event.EventLogProcessorSpecTest$$anonfun$1$$anonfun$apply$mcV$sp$6.apply(EventLogProcessorSpecTest.scala:145)
[JVM-1] at
org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
[JVM-1] ...
[JVM-2] - must be generated with two tenants, three equipments, two events
and parent organization
[JVM-3] - must be generated with two tenants, three equipments, two events
and parent organization
[JVM-4] - must be generated with two tenants, three equipments, two events
and parent organization
[JVM-2] set 04, 2015 9:40:59 AM
org.jboss.netty.channel.DefaultChannelPipeline
[JVM-2] ADVERTĂNCIA: An exception was thrown by an exception handler.
[JVM-2] java.util.concurrent.RejectedExecutionException: Worker has already
been shutdown
[JVM-2] at
org.jboss.netty.channel.socket.nio.AbstractNioSelector.registerTask(AbstractNioSelector.java:120)
[JVM-2] at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:72)
[JVM-2] at
org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36)
[JVM-2] at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:56)
[JVM-2] at
org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36)
[JVM-2] at
org.jboss.netty.channel.socket.nio.AbstractNioChannelSink.execute(AbstractNioChannelSink.java:34)
[JVM-2] at
org.jboss.netty.channel.Channels.fireExceptionCaughtLater(Channels.java:496)
[JVM-2] at
org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:46)
[JVM-2] at
org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54)
[JVM-2] at
org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54)
[JVM-2] at
org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54)
[JVM-2] at org.jboss.netty.channel.Channels.close(Channels.java:812)
[JVM-2] at
org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:197)
[JVM-2] at
akka.remote.testconductor.ClientFSM$$anonfun$5.applyOrElse(Player.scala:266)
[JVM-2] at
akka.remote.testconductor.ClientFSM$$anonfun$5.applyOrElse(Player.scala:264)
[JVM-2] at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
[JVM-2] at akka.actor.FSM$class.terminate(FSM.scala:672)
[JVM-2] at akka.actor.FSM$class.postStop(FSM.scala:658)
[JVM-2] at
akka.remote.testconductor.ClientFSM.postStop(Player.scala:145)
[JVM-2] at akka.actor.Actor$class.aroundPostStop(Actor.scala:477)
[JVM-2] at
akka.remote.testconductor.ClientFSM.aroundPostStop(Player.scala:145)
[JVM-2] at
akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
[JVM-2] at
akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
[JVM-2] at akka.actor.ActorCell.terminate(ActorCell.scala:369)
[JVM-2] at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
[JVM-2] at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
[JVM-2] at
akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
[JVM-2] at akka.dispatch.Mailbox.run(Mailbox.scala:219)
[JVM-2] at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
[JVM-2] at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[JVM-2] at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[JVM-2] at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[JVM-2] at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[JVM-2]
[JVM-2] [WARN] [09/04/2015 09:40:59.761]
[EventLogProcessorSpecTest-akka.actor.default-dispatcher-16]
[akka://EventLogProcessorSpecTest/deadLetters] received dead letter from
Actor[akka://EventLogProcessorSpecTest/system/cluster/core/daemon#312241262]:
GossipStatus(UniqueAddress(akka.tcp://EventLogProcessorSpecTest@localhost:47584,-164915594),VectorClock(c28d4633c25b0f1bc011779f2668abd3
-> 6))
[JVM-1] [WARN] [09/04/2015 09:40:59.779]
[EventLogProcessorSpecTest-akka.actor.default-dispatcher-2]
[akka://EventLogProcessorSpecTest/user/controller/127.0.0.1:58267-server2]
received dead letter from
Actor[akka://EventLogProcessorSpecTest/deadLetters]: ClientDisconnected
[JVM-2] [WARN] [09/04/2015 09:40:59.775]
[EventLogProcessorSpecTest-akka.actor.default-dispatcher-6]
[akka://EventLogProcessorSpecTest/deadLetters] received dead letter from
Actor[akka://EventLogProcessorSpecTest/system/cluster/core/daemon/heartbeatSender#-1988843924]:
Heartbeat(akka.tcp://EventLogProcessorSpecTest@localhost:47584)
[JVM-2] [WARN] [09/04/2015 09:40:59.784]
[EventLogProcessorSpecTest-akka.actor.default-dispatcher-6]
[akka://EventLogProcessorSpecTest/deadLetters] received dead letter from
Actor[akka://EventLogProcessorSpecTest/system/cluster/core/daemon/heartbeatSender#-1988843924]:
Heartbeat(akka.tcp://EventLogProcessorSpecTest@localhost:47584)
[JVM-2] [WARN] [09/04/2015 09:40:59.807]
[EventLogProcessorSpecTest-akka.actor.default-dispatcher-6]
[akka://EventLogProcessorSpecTest/deadLetters] received dead letter from
Actor[akka://EventLogProcessorSpecTest/system/cluster/core/daemon/heartbeatSender#-1988843924]:
Heartbeat(akka.tcp://EventLogProcessorSpecTest@localhost:47584)
[JVM-4] [WARN] [09/04/2015 09:40:59.820]
[EventLogProcessorSpecTest-akka.actor.default-dispatcher-2]
[akka://EventLogProcessorSpecTest/deadLetters] received dead letter from
Actor[akka://EventLogProcessorSpecTest/system/cluster/core/daemon/heartbeatSender#281951575]:
Heartbeat(akka.tcp://EventLogProcessorSpecTest@localhost:50158)
[JVM-4] [WARN] [09/04/2015 09:40:59.822]
[EventLogProcessorSpecTest-akka.actor.default-dispatcher-2]
[akka://EventLogProcessorSpecTest/deadLetters] received dead letter from
Actor[akka://EventLogProcessorSpecTest/system/cluster/core/daemon/heartbeatSender#281951575]:
Heartbeat(akka.tcp://EventLogProcessorSpecTest@localhost:50158)
[JVM-4] [WARN] [09/04/2015 09:40:59.822]
[EventLogProcessorSpecTest-akka.actor.default-dispatcher-2]
[akka://EventLogProcessorSpecTest/deadLetters] received dead letter from
Actor[akka://EventLogProcessorSpecTest/system/cluster/core/daemon/heartbeatSender#281951575]:
Heartbeat(akka.tcp://EventLogProcessorSpecTest@localhost:50158)
[JVM-3] [WARN] [09/04/2015 09:40:59.838]
[EventLogProcessorSpecTest-akka.actor.default-dispatcher-4]
[akka://EventLogProcessorSpecTest/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FEventLogProcessorSpecTest%40localhost%3A47584-1/endpointWriter]
received dead letter from
Actor[akka://EventLogProcessorSpecTest/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FEventLogProcessorSpecTest%40localhost%3A47584-1/endpointWriter#-1663866776]:
AckIdleCheckTimer
[JVM-3] [WARN] [09/04/2015 09:40:59.841]
[EventLogProcessorSpecTest-akka.actor.default-dispatcher-4]
[akka://EventLogProcessorSpecTest/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FEventLogProcessorSpecTest%40localhost%3A50158-2/endpointWriter]
received dead letter from
Actor[akka://EventLogProcessorSpecTest/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FEventLogProcessorSpecTest%40localhost%3A50158-2/endpointWriter#896565979]:
AckIdleCheckTimer
[JVM-3] [WARN] [09/04/2015 09:40:59.841]
[EventLogProcessorSpecTest-akka.actor.default-dispatcher-4]
[akka://EventLogProcessorSpecTest/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FEventLogProcessorSpecTest%40localhost%3A41922-0/endpointWriter]
received dead letter from
Actor[akka://EventLogProcessorSpecTest/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FEventLogProcessorSpecTest%40localhost%3A41922-0/endpointWriter#-443303242]:
AckIdleCheckTimer
[JVM-3] [WARN] [09/04/2015 09:40:59.848]
[EventLogProcessorSpecTest-akka.actor.default-dispatcher-4]
[akka://EventLogProcessorSpecTest/deadLetters] received dead letter from
Actor[akka://EventLogProcessorSpecTest/system/cluster/core/daemon/heartbeatSender#-233589672]:
Heartbeat(akka.tcp://EventLogProcessorSpecTest@localhost:55763)
[JVM-3] [WARN] [09/04/2015 09:40:59.849]
[EventLogProcessorSpecTest-akka.actor.default-dispatcher-4]
[akka://EventLogProcessorSpecTest/deadLetters] received dead letter from
Actor[akka://EventLogProcessorSpecTest/system/cluster/core/daemon/heartbeatSender#-233589672]:
Heartbeat(akka.tcp://EventLogProcessorSpecTest@localhost:55763)
[JVM-3] [WARN] [09/04/2015 09:40:59.849]
[EventLogProcessorSpecTest-akka.actor.default-dispatcher-4]
[akka://EventLogProcessorSpecTest/deadLetters] received dead letter from
Actor[akka://EventLogProcessorSpecTest/system/cluster/core/daemon/heartbeatSender#-233589672]:
Heartbeat(akka.tcp://EventLogProcessorSpecTest@localhost:55763)
[JVM-4] Run completed in 15 seconds, 73 milliseconds.
[JVM-4] Total number of tests run: 3
[JVM-4] Suites: completed 1, aborted 0
[JVM-4] Tests: succeeded 3, failed 0, canceled 0, ignored 0, pending 0
[JVM-4] All tests passed.
[JVM-1] Run completed in 15 seconds, 231 milliseconds.
[JVM-1] Total number of tests run: 3
[JVM-1] Suites: completed 1, aborted 0
[JVM-1] Tests: succeeded 2, failed 1, canceled 0, ignored 0, pending 0
[JVM-1] *** 1 TEST FAILED ***
[JVM-3] Run completed in 15 seconds, 267 milliseconds.
[JVM-3] Total number of tests run: 3
[JVM-3] Suites: completed 1, aborted 0
[JVM-3] Tests: succeeded 3, failed 0, canceled 0, ignored 0, pending 0
[JVM-3] All tests passed.
[JVM-2] Run completed in 16 seconds, 309 milliseconds.
[JVM-2] Total number of tests run: 3
[JVM-2] Suites: completed 1, aborted 0
[JVM-2] Tests: succeeded 3, failed 0, canceled 0, ignored 0, pending 0
[JVM-2] All tests passed.
[error] Failed: com.greenmile.event.EventLogProcessorSpecTestMultiJvmNode1
[error] Failed: Total 4, Failed 1, Errors 0, Passed 3
[error] Failed tests:
[error] com.greenmile.event.EventLogProcessorSpecTest
[error] (multi-jvm:test) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 18 s, completed 04/09/2015 09:41:00
Complete code test:
object EventLogMultiNodeConfig extends MultiNodeConfig {
val dispatcher1 = role("dispatcher1")
val dispatcher2 = role("dispatcher2")
val backend1 = role("backend1")
val backend2 = role("backend2")
commonConfig(ConfigFactory.parseString("""
| akka.cluster.metrics.enabled=off
| akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
| akka.remote.log-remote-lifecycle-events = off
| akka.persistence.journal.plugin =
"akka.persistence.journal.leveldb-shared"
| akka.persistence.journal.leveldb-shared.store {
| native = off
| dir = "target/test-shared-journal"
| }
| akka.persistence.snapshot-store.plugin =
"akka.persistence.snapshot-store.local"
| akka.persistence.snapshot-store.local.dir = "target/test-snapshots"
|
""".stripMargin))
}
class EventLogProcessorSpecTestMultiJvmNode1 extends EventLogProcessorSpecTest
class EventLogProcessorSpecTestMultiJvmNode2 extends EventLogProcessorSpecTest
class EventLogProcessorSpecTestMultiJvmNode3 extends EventLogProcessorSpecTest
class EventLogProcessorSpecTestMultiJvmNode4 extends EventLogProcessorSpecTest
abstract class EventLogProcessorSpecTest extends
MultiNodeSpec(EventLogMultiNodeConfig)
with WordSpecLike with Matchers with BeforeAndAfterAll with ImplicitSender{
import EventLogMultiNodeConfig._
// TENANT 1
// TenantProcessor
val tenantId1 = "1"
val organization1Tenant1 = Organization(1, None)
val organization2Tenant1 = Organization(2, Some(1))
val event1Tenant1 = Event(1, 1, Set(SpeedViolationCheck))
val event2Tenant1 = Event(2, 2, Set(RoutePathDeviationCheck))
val eventCheckSettings1Tenant1 = SpeedViolationSettings(50)
val eventCheckSettings2Tenant1 = RoutePathDeviationSettings(200)
val eventCheckSettingsMap: Map[EventCheckType, EventCheckSettings] = Map(
SpeedViolationCheck -> eventCheckSettings1Tenant1,
RoutePathDeviationCheck -> eventCheckSettings2Tenant1
)
val equipmentId1Tenant1 = 1
val timestamp1Equipment1Tenant1 = new Date
val coordinate = Coordinate(tenantId1, organization2Tenant1.id,
equipmentId1Tenant1, 80, Position(-3.7751698F,-38.495211F),
timestamp1Equipment1Tenant1)
// EquipmentProcessor
val route = List(new Point(-3.7619644,-38.4853622), new
Point(-3.7867265,-38.4831774))
val encodedRoutePathTenant1 = PolylineEncoder.encode(route)
val storageLocations = List(
"akka.persistence.journal.leveldb.dir",
"akka.persistence.journal.leveldb-shared.store.dir",
"akka.persistence.snapshot-store.local.dir").map(s => new
File(system.settings.config.getString(s)))
override def initialParticipants = roles.size
override def beforeAll() = multiNodeSpecBeforeAll()
override def afterAll() = multiNodeSpecAfterAll()
override protected def atStartup() = {
runOn(dispatcher1) {
storageLocations.foreach(dir => FileUtils.deleteDirectory(dir))
}
}
override protected def afterTermination() = {
runOn(dispatcher1) {
storageLocations.foreach(dir => FileUtils.deleteDirectory(dir))
}
}
def join(from: RoleName, to: RoleName): Unit = {
runOn(from){
Cluster(system) join node(to).address
}
enterBarrier(from.name + "-joined")
}
"EventLogOpened EventLogClosed" must {
"setup shared journal" in {
Persistence(system)
runOn(dispatcher1) {
system.actorOf(Props[SharedLeveldbStore], "store")
}
enterBarrier("persistence-started")
runOn(dispatcher2, backend1, backend2) {
system.actorSelection(node(dispatcher1) / "user" / "store") !
Identify(None)
val sharedStore = expectMsgType[ActorIdentity].ref.get
SharedLeveldbJournal.setStore(sharedStore, system)
}
enterBarrier("after-1")
}
"join cluster" in within(15 seconds) {
join(dispatcher1,dispatcher1)
join(dispatcher2,dispatcher1)
join(backend1,dispatcher1)
join(backend2,dispatcher1)
}
"be generated with two tenants, three equipments, two events and parent
organization" in {
runOn(dispatcher1){
val probe = TestProbe()
val tenantProcessor = ClusterSharding(system).start(
TenantProcessor.shardRegion,
Some(TenantProcessor.props(probe.ref)),
TenantProcessor.idExtractor,
TenantProcessor.shardResolver
)
tenantProcessor ! AddOrganization(tenantId1, organization1Tenant1)
tenantProcessor ! AddOrganization(tenantId1, organization2Tenant1)
tenantProcessor ! AddEvent(tenantId1, event1Tenant1)
tenantProcessor ! AddEvent(tenantId1, event2Tenant1)
tenantProcessor ! AddEventCheckSettings(tenantId1, SpeedViolationCheck,
eventCheckSettings1Tenant1)
tenantProcessor ! AddEventCheckSettings(tenantId1,
RoutePathDeviationCheck, eventCheckSettings2Tenant1)
val prepareEvents = PrepareEvents(coordinate, Set(event1Tenant1,
event2Tenant1), eventCheckSettingsMap)
tenantProcessor ! EvaluateCoordinate(coordinate)
probe.expectMsg(10 seconds, prepareEvents)
}
enterBarrier("after-3")
}
}
}
I did a exaclty equals test without use multi node testkit and without use
shard and test works perfectly. I reaaly don't know what is happening.
Thank any help
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.