On Thu, Sep 7, 2017 at 11:12 AM, Sergei Akhapkin <[email protected]>
wrote:

> Thanks, of course I will try new version, but what about my second
> question (code in EventsByTagFetcher) - is it impossible case then
> statement's execution end-ups with failed future ?
>


If an exception is thrown the actor will be stopped and the stream is
failed.
If the future is completed with failure the pipeTo will send that as
Status.Failure, which is taken care of.

/Patrik


>
>
> On Thursday, September 7, 2017 at 9:50:26 AM UTC+3, Patrik Nordwall wrote:
>>
>> A lot of things have been fixed since that version. Try latest. See
>> readme for version info.
>>
>> /Patrik
>>
>> On 6 Sep 2017, at 16:19, Sergei Akhapkin <[email protected]> wrote:
>>
>> Hi folks,
>>
>> I've a question: did someone experienced Akka persistence query on
>> Cassandra to be stalled without any symptoms ?
>>
>> My setup is:
>> Akka Persistence - 2.4.18
>> Akka Cassandra Persistence - 0.11
>> 3 x Cassandra - 3.7
>>
>> I run a persistence query as shown below inside an actor (actor is
>> supervised and restarted if exited):
>>
>> // events generated by this publisher are expected to be not tagged by
>> the adapter
>> val (switch, stream) = readJournal.eventsByTag(tag.toString, initialUuid)
>>  .mapAsync(writer.parallelism) { envelope =>
>>    // ... process event here end-ups with envelope.offset
>>  }
>>  .mapAsync(1) { offset => self ? SetOffset(offset) }
>>  .recover { case e =>
>>    logger.error(s"the stream $persistenceId has completed with error,
>> stopping...", e)
>>    context.stop(self)
>>  }
>>  .viaMat(KillSwitches.single)(Keep.right)
>>  .toMat(Sink.ignore)(Keep.both)
>>  .run()
>>
>>
>> Nothing unusual I hope, but I think, I should use Sink.actorRef here but
>> anyway.
>>
>> So my problem is quiet rarely (I suspect it was 2-3 times in this year,
>> so I don't know how to reproduce) event processing is stopped without any
>> symptoms (actor is alive, nothing interesting/relevant in app/cassandra log
>> messages, nothing I was able to find). After restart it works normally
>> again.
>>
>> I tried to dig into Cassandra Persistence and I've question about:
>>
>> https://github.com/akka/akka-persistence-cassandra/blob/mast
>> er/core/src/main/scala/akka/persistence/cassandra/query/
>> EventsByTagFetcher.scala (line 83 in latest codebase):
>>
>>  override def preStart(): Unit = {
>>    val boundStmt = preparedSelect.ps.bind(tag, timeBucket.key, fromOffset
>> , toOffset, selectLimit: Integer)
>>    boundStmt.setFetchSize(settings.fetchSize)
>>    val init: Future[ResultSet] = preparedSelect.session.executeAsync(
>> boundStmt)
>>    init.map(InitResultSet.apply).pipeTo(self)
>>  }
>>
>> What should happened if session.executeAsync will throw exception or
>> return failed future ? Or it's impossible case ?
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>> urrent/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to [email protected].
>> To post to this group, send email to [email protected].
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 

Patrik Nordwall
Akka Tech Lead
Lightbend <http://www.lightbend.com/> -  Reactive apps on the JVM
Twitter: @patriknw

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to