Hi folks,
I've a question: did someone experienced Akka persistence query on
Cassandra to be stalled without any symptoms ?
My setup is:
Akka Persistence - 2.4.18
Akka Cassandra Persistence - 0.11
3 x Cassandra - 3.7
I run a persistence query as shown below inside an actor (actor is
supervised and restarted if exited):
// events generated by this publisher are expected to be not tagged by the
adapter
val (switch, stream) = readJournal.eventsByTag(tag.toString, initialUuid)
.mapAsync(writer.parallelism) { envelope =>
// ... process event here end-ups with envelope.offset
}
.mapAsync(1) { offset => self ? SetOffset(offset) }
.recover { case e =>
logger.error(s"the stream $persistenceId has completed with error,
stopping...", e)
context.stop(self)
}
.viaMat(KillSwitches.single)(Keep.right)
.toMat(Sink.ignore)(Keep.both)
.run()
Nothing unusual I hope, but I think, I should use Sink.actorRef here but
anyway.
So my problem is quiet rarely (I suspect it was 2-3 times in this year, so
I don't know how to reproduce) event processing is stopped without any
symptoms (actor is alive, nothing interesting/relevant in app/cassandra log
messages, nothing I was able to find). After restart it works normally
again.
I tried to dig into Cassandra Persistence and I've question about:
https://github.com/akka/akka-persistence-cassandra/blob/master/core/src/main/scala/akka/persistence/cassandra/query/EventsByTagFetcher.scala
(line 83 in latest codebase):
override def preStart(): Unit = {
val boundStmt = preparedSelect.ps.bind(tag, timeBucket.key, fromOffset,
toOffset, selectLimit: Integer)
boundStmt.setFetchSize(settings.fetchSize)
val init: Future[ResultSet] = preparedSelect.session.executeAsync(
boundStmt)
init.map(InitResultSet.apply).pipeTo(self)
}
What should happened if session.executeAsync will throw exception or return
failed future ? Or it's impossible case ?
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.