Hi folks,

I've a question: did someone experienced Akka persistence query on 
Cassandra to be stalled without any symptoms ?

My setup is:
Akka Persistence - 2.4.18
Akka Cassandra Persistence - 0.11
3 x Cassandra - 3.7

I run a persistence query as shown below inside an actor (actor is 
supervised and restarted if exited):

// events generated by this publisher are expected to be not tagged by the 
adapter
val (switch, stream) = readJournal.eventsByTag(tag.toString, initialUuid)
 .mapAsync(writer.parallelism) { envelope => 
   // ... process event here end-ups with envelope.offset
 }
 .mapAsync(1) { offset => self ? SetOffset(offset) }
 .recover { case e =>
   logger.error(s"the stream $persistenceId has completed with error, 
stopping...", e)
   context.stop(self)
 } 
 .viaMat(KillSwitches.single)(Keep.right)
 .toMat(Sink.ignore)(Keep.both)
 .run() 


Nothing unusual I hope, but I think, I should use Sink.actorRef here but 
anyway.

So my problem is quiet rarely (I suspect it was 2-3 times in this year, so 
I don't know how to reproduce) event processing is stopped without any 
symptoms (actor is alive, nothing interesting/relevant in app/cassandra log 
messages, nothing I was able to find). After restart it works normally 
again.

I tried to dig into Cassandra Persistence and I've question about:
 
https://github.com/akka/akka-persistence-cassandra/blob/master/core/src/main/scala/akka/persistence/cassandra/query/EventsByTagFetcher.scala
 
(line 83 in latest codebase):

 override def preStart(): Unit = {
   val boundStmt = preparedSelect.ps.bind(tag, timeBucket.key, fromOffset, 
toOffset, selectLimit: Integer)
   boundStmt.setFetchSize(settings.fetchSize)
   val init: Future[ResultSet] = preparedSelect.session.executeAsync(
boundStmt)
   init.map(InitResultSet.apply).pipeTo(self)
 }

What should happened if session.executeAsync will throw exception or return 
failed future ? Or it's impossible case ?

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to