A lot of things have been fixed since that version. Try the latest release; see the README for version info.
/Patrik

> On 6 Sep 2017, at 16:19, Sergei Akhapkin <[email protected]> wrote:
>
> Hi folks,
>
> I have a question: has anyone experienced an Akka persistence query on Cassandra
> stalling without any symptoms?
>
> My setup is:
> Akka Persistence - 2.4.18
> Akka Cassandra Persistence - 0.11
> 3 x Cassandra - 3.7
>
> I run a persistence query, as shown below, inside an actor (the actor is supervised
> and restarted if it exits):
>
> // events generated by this publisher are expected to be not tagged by the adapter
> val (switch, stream) = readJournal.eventsByTag(tag.toString, initialUuid)
>   .mapAsync(writer.parallelism) { envelope =>
>     // ... process the event here, ending up with envelope.offset
>   }
>   .mapAsync(1) { offset => self ? SetOffset(offset) }
>   .recover { case e =>
>     logger.error(s"the stream $persistenceId has completed with error, stopping...", e)
>     context.stop(self)
>   }
>   .viaMat(KillSwitches.single)(Keep.right)
>   .toMat(Sink.ignore)(Keep.both)
>   .run()
>
> Nothing unusual, I hope, though I think I should use Sink.actorRef here.
>
> So my problem is that, quite rarely (I suspect it happened 2-3 times this year, so I
> don't know how to reproduce it), event processing stops without any symptoms
> (the actor is alive, nothing interesting or relevant in the app or Cassandra logs,
> nothing I was able to find). After a restart it works normally again.
>
> I tried to dig into Cassandra Persistence and I have a question about
> https://github.com/akka/akka-persistence-cassandra/blob/master/core/src/main/scala/akka/persistence/cassandra/query/EventsByTagFetcher.scala
> (line 83 in the latest codebase):
>
> override def preStart(): Unit = {
>   val boundStmt = preparedSelect.ps.bind(tag, timeBucket.key, fromOffset, toOffset, selectLimit: Integer)
>   boundStmt.setFetchSize(settings.fetchSize)
>   val init: Future[ResultSet] = preparedSelect.session.executeAsync(boundStmt)
>   init.map(InitResultSet.apply).pipeTo(self)
> }
>
> What should happen if session.executeAsync throws an exception or returns a
> failed future? Or is that case impossible?
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
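
On the question about a failed future in preStart, here is a minimal sketch using only the Scala standard library. `FakeResultSet`, `executeAsync(fail)` and the local `InitResultSet` are hypothetical stand-ins for the driver and plugin types, not the real ones. It only illustrates that `map` is skipped when the future fails, so the failure itself is what gets piped. Akka's `pipeTo` delivers a failed future to the recipient as `akka.actor.Status.Failure`; whether the fetcher handles that message is not visible in the quoted snippet.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object FailedFutureSketch {
  // Hypothetical stand-in for the Cassandra driver's ResultSet.
  final case class FakeResultSet(rows: Int)

  // Hypothetical stand-in for the plugin's InitResultSet message.
  final case class InitResultSet(rs: FakeResultSet)

  // Hypothetical stand-in for session.executeAsync; returns a failed future on request.
  def executeAsync(fail: Boolean): Future[FakeResultSet] =
    if (fail) Future.failed(new RuntimeException("simulated query failure"))
    else Future.successful(FakeResultSet(rows = 3))

  // Mirrors init.map(InitResultSet.apply): on failure the map function never runs
  // and the resulting future carries the original exception.
  def outcome(fail: Boolean): Try[InitResultSet] =
    Await.ready(executeAsync(fail).map(InitResultSet.apply), 1.second).value.get

  def main(args: Array[String]): Unit = {
    println(outcome(fail = false)) // a Success wrapping InitResultSet
    println(outcome(fail = true))  // a Failure wrapping the simulated exception
  }
}
```

In an actor, the second case would arrive as a `Status.Failure` message rather than an `InitResultSet`, so a receive block that matches only `InitResultSet` would leave it unhandled.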
