Hi all, i'm trying to make reliable messaging based on example in the docs 
for my actors with MongoDB as journal. 
After restart many redelivery of already delivered messages are observed.

Reproducing example consist of three actors, all with reliable messaging, 
that pass message in chain: A->B->C. The last actor C tracks incoming 
messages, prints out every 1000'th message and detects redelivery.
Log of the first high load run:
[DEBUG] [09/04/2015 18:44:54.090] [main] [EventStream(akka://TestSystem)] 
logger log1-Logging$DefaultLogger started
[DEBUG] [09/04/2015 18:44:54.091] [main] [EventStream(akka://TestSystem)] 
Default Loggers started
[INFO] [09/04/2015 18:44:54.127] [main] [Remoting] Starting remoting
[INFO] [09/04/2015 18:44:54.399] [main] [Remoting] Remoting started; 
listening on addresses :[akka.tcp://[email protected]:5150]
[INFO] [09/04/2015 18:44:54.400] [main] [Remoting] Remoting now listens on 
addresses: [akka.tcp://[email protected]:5150]
Actor[akka://TestSystem/user/C#-1715447106] Recovery completed ========
Actor[akka://TestSystem/user/B#-929620679] Recovery completed ========
Actor[akka://TestSystem/user/A#1016936722] Recovery completed ========
[DEBUG] [09/04/2015 18:44:55.112] [TestSystem-akka-contrib-persistence-
dispatcher-16] [akka.serialization.Serialization(akka://TestSystem)] Using 
serializer[akka.persistence.serialization.MessageSerializer] for message 
[akka.persistence.PersistentRepr]
[DEBUG] [09/04/2015 18:44:55.126] [TestSystem-akka-contrib-persistence-
dispatcher-16] [akka.serialization.Serialization(akka://TestSystem)] Using 
serializer[akka.serialization.JavaSerializer] for message 
[test.ReliableMessaging$MsgSent]
[DEBUG] [09/04/2015 18:44:55.440] [TestSystem-akka-contrib-persistence-
dispatcher-27] [akka.serialization.Serialization(akka://TestSystem)] Using 
serializer[akka.serialization.JavaSerializer] for message 
[test.ReliableMessaging$MsgConfirmed]
1000
2000
3000
[DEBUG] [09/04/2015 18:45:04.531] [ForkJoinPool-3-worker-7] 
[akka://TestSystem/user/C] 
Unconfirmed messages count = 0             [DEBUG] [09/04/2015 
18:45:04.531] [ForkJoinPool-3-worker-7] [akka://TestSystem/user/A] 
Unconfirmed messages count = 7088
[DEBUG] [09/04/2015 18:45:04.531] [ForkJoinPool-3-worker-3] 
[akka://TestSystem/user/B] 
Unconfirmed messages count = 1660
...
...
[DEBUG] [09/04/2015 18:45:34.486] [ForkJoinPool-3-worker-5] 
[akka://TestSystem/user/B] 
Unconfirmed messages count = 20398
[DEBUG] [09/04/2015 18:45:34.487] [ForkJoinPool-3-worker-7] 
[akka://TestSystem/user/C] 
Unconfirmed messages count = 0
[DEBUG] [09/04/2015 18:45:34.496] [ForkJoinPool-3-worker-3] 
[akka://TestSystem/user/A] 
Unconfirmed messages count = 87261
55000
56000
Redelivery 1                    *<---- some messages not confirmed due to 
high load and** starts redelivery*
Redelivery 2
Redelivery 3
Redelivery 4
Redelivery 5
...
here comes lots of redelivery
...
Redelivery 99996
Redelivery 99997
Redelivery 99998
[DEBUG] [09/04/2015 18:47:24.493] [ForkJoinPool-3-worker-5] 
[akka://TestSystem/user/B] 
Unconfirmed messages count = 36212
Redelivery 99999
100000
[DEBUG] [09/04/2015 18:47:34.483] [ForkJoinPool-3-worker-3] 
[akka://TestSystem/user/C] 
Unconfirmed messages count = 0
[DEBUG] [09/04/2015 18:47:34.493] [ForkJoinPool-3-worker-3] 
[akka://TestSystem/user/A] 
Unconfirmed messages count = 0
[DEBUG] [09/04/2015 18:47:34.493] [ForkJoinPool-3-worker-3] 
[akka://TestSystem/user/B] 
Unconfirmed messages count = 12790
[DEBUG] [09/04/2015 18:47:44.475] [ForkJoinPool-3-worker-5] 
[akka://TestSystem/user/C] 
Unconfirmed messages count = 0
[DEBUG] [09/04/2015 18:47:44.485] [ForkJoinPool-3-worker-3] 
[akka://TestSystem/user/B] 
Unconfirmed messages count = 2470
[DEBUG] [09/04/2015 18:47:44.485] [ForkJoinPool-3-worker-5] 
[akka://TestSystem/user/A] 
Unconfirmed messages count = 0
[DEBUG] [09/04/2015 18:47:54.488] [ForkJoinPool-3-worker-3] 
[akka://TestSystem/user/C] 
Unconfirmed messages count = 0
[DEBUG] [09/04/2015 18:47:54.498] [ForkJoinPool-3-worker-3] 
[akka://TestSystem/user/B] 
Unconfirmed messages count = 0
[DEBUG] [09/04/2015 18:47:54.498] [ForkJoinPool-3-worker-7] 
[akka://TestSystem/user/A] 
Unconfirmed messages count = 0

Finally all messages will be confirmed.

After that i close progarm (CTRL-C) and start once again, but not sending 
messages
Log of the second run:
[DEBUG] [09/04/2015 18:48:43.539] [main] [EventStream(akka://TestSystem)] 
logger log1-Logging$DefaultLogger started
[DEBUG] [09/04/2015 18:48:43.539] [main] [EventStream(akka://TestSystem)] 
Default Loggers started
[INFO] [09/04/2015 18:48:43.660] [main] [Remoting] Starting remoting
[INFO] [09/04/2015 18:48:44.378] [main] [Remoting] Remoting started; 
listening on addresses :[akka.tcp://[email protected]:5150]
[INFO] [09/04/2015 18:48:44.380] [main] [Remoting] Remoting now listens on 
addresses: [akka.tcp://[email protected]:5150]
[DEBUG] [09/04/2015 18:48:45.346] [TestSystem-akka-contrib-persistence-
dispatcher-15] [akka.serialization.Serialization(akka://TestSystem)] Using 
serializer[akka.persistence.serialization.MessageSerializer] for message 
[akka.persistence.PersistentRepr]
[DEBUG] [09/04/2015 18:48:45.479] [TestSystem-akka-contrib-persistence-
dispatcher-16] [LocalActorRefProvider(akka://TestSystem)] resolve of path 
sequence [/user/A#1016936722] failed
[DEBUG] [09/04/2015 18:48:45.487] [TestSystem-akka-contrib-persistence-
dispatcher-16] [LocalActorRefProvider(akka://TestSystem)] resolve of path 
sequence [/user/A#1016936722] failed
[DEBUG] [09/04/2015 18:48:45.490] [TestSystem-akka-contrib-persistence-
dispatcher-16] [LocalActorRefProvider(akka://TestSystem)] resolve of path 
sequence [/user/C#-1715447106] failed
*and nearly 800 thousands lines of such "resolve of path sequence failed"*
*after that lines starts redelivery*
....
Redelivery 99994
Redelivery 99995
Redelivery 99996
Redelivery 99997
Redelivery 99998
Redelivery 99999
100000
[DEBUG] [09/04/2015 18:51:14.509] [ForkJoinPool-3-worker-1] 
[akka://TestSystem/user/A] 
Unconfirmed messages count = 0
[DEBUG] [09/04/2015 18:51:14.509] [ForkJoinPool-3-worker-1] 
[akka://TestSystem/user/C] 
Unconfirmed messages count = 0
[DEBUG] [09/04/2015 18:51:14.509] [ForkJoinPool-3-worker-5] 
[akka://TestSystem/user/B] 
Unconfirmed messages count = 0


The main question is: why redelivery of already confirmed messages happens 
after restart?

I tried different configurations with different number of messages and 
redeliver interval and noticed that strange redelivery not happens if first 
run is not under high load, i.e. without redelivery at all.

Am i misunderstand something? It is ok to me to have redelivery, im not 
suffer without backpressure, but i want to know what am i doing wrong.

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Attachment: build.sbt
Description: Binary data

akka {
  # Options: OFF, ERROR, WARNING, INFO, DEBUG
  loglevel = "DEBUG"

  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    asdasd = asd
    enabled-transports = ["akka.remote.netty.tcp"]
    //log-sent-messages = on
    //log-received-messages = on
    netty.tcp {
      hostname = "127.0.0.1"
      port = 5150
      maximum-frame-size = 512000b
    }
  }
}

akka.persistence.at-least-once-delivery.redeliver-interval = 20s

# akka.persistence.journal.leveldb.dir = "persistence/journal"
# akka.persistence.snapshot-store.local.dir = "persistence/journal"

akka.persistence.journal.plugin = "akka-contrib-mongodb-persistence-journal"
akka.persistence.snapshot-store.plugin = 
"akka-contrib-mongodb-persistence-snapshot"
akka.contrib.persistence.mongodb.mongo.mongouri = 
"mongodb://192.168.1.9:27017/akka_persistence"

akka.contrib.persistence.mongodb.mongo.journal-collection = "persistent_journal"
akka.contrib.persistence.mongodb.mongo.journal-index = "journal_index"
akka.contrib.persistence.mongodb.mongo.snaps-collection = "persistent_snapshots"
akka.contrib.persistence.mongodb.mongo.snaps-index = "snaps_index"
akka.contrib.persistence.mongodb.mongo.journal-write-concern = "Acknowledged"

Attachment: ReliableMessaging.scala
Description: Binary data

Attachment: App.scala
Description: Binary data

Reply via email to