I am using Akka Persistence to remember a sequence number from an event 
stream. The code in my receiveCommand that writes to the Cassandra journal 
is below:

persistAsync(MessageProcessed(message.eventSequenceNumber,
                              mdMessage.trackingId,
                              message.eventTimeStamp)) { mesgProcessed =>
  log.debug(s"Processing message for $organization, last message seqNum: ${state.messageSequence} at ${state.time}")
  if (mesgProcessed.messageSequence > state.messageSequence &&
      !config.getBoolean(ComputeConfigKeys.devMode)) {
    if (mesgProcessed.messageSequence != state.messageSequence + 1) {
      log.warning(s"Event sequence is not in order, last event id = '${state.messageSequence}', this event's sequence id = '${mesgProcessed.messageSequence}'")
      context.parent ! OrganizationReset(organization)
    }
  }
  // Both branches of the original if/else forwarded the message and updated
  // state, so that common work is done once here.
  context.parent ! MDMessage(mdContext, mdMessage)
  state = mesgProcessed
}
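(For context on why load matters here: unlike persist, persistAsync does not stash incoming commands while a write is in flight, so the Cassandra journal plugin can accumulate many outstanding events and write them together. A minimal sketch of the difference; `evt` and `updateState` are hypothetical placeholders, not part of the code above:)

```scala
// Sketch only; `evt` and `updateState` are hypothetical placeholders.

// persist: incoming commands are stashed until the journal write completes,
// so the number of in-flight events stays small.
persist(evt) { e => updateState(e) }

// persistAsync: the actor keeps handling commands while writes are pending;
// under load, the Cassandra journal may group the outstanding events into
// one (potentially large) batch.
persistAsync(evt) { e => updateState(e) }
```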


At low message volume this code works exactly as expected. This week we 
started ramping up the load, and eventually the system failed with this 
exception:

[report.compute.md.CtipsMessageDistributorClient$MessageProcessed] with sequence number [2767] for persistenceId [/messaging/hawks/cti].
java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large
    at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
    at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
    at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
    at akka.persistence.cassandra.package$$anon$1$$anonfun$run$1.apply(package.scala:17)
    at scala.util.Try$.apply(Try.scala:192)
    at akka.persistence.cassandra.package$$anon$1.run(package.scala:17)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:409)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:136)
    at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:179)
    at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:184)
    at com.datastax.driver.core.RequestHandler.access$2500(RequestHandler.java:43)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:798)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:617)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1005)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:928)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:277)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:264)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:962)
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:879)
    at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:360)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:276)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
    at java.lang.Thread.run(Thread.java:745)

Has anyone experienced this? Is this a Cassandra tuning issue, or should I 
drop 'persistAsync' in favor of the more traditional 'persist' call? 
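For reference, if the journal is batching too many persistAsync writes together, there seem to be two relevant knobs. A sketch, assuming the 0.x akka-persistence-cassandra plugin and a cluster still on Cassandra's default batch-size thresholds; the values shown are illustrative, not recommendations:

```
# application.conf (HOCON): cap how many messages the Cassandra journal
# plugin groups into a single batch write.
cassandra-journal {
  max-message-batch-size = 50   # illustrative value, not a recommendation
}

# cassandra.yaml: the server-side limits behind "Batch too large".
# Raising the fail threshold trades safety for headroom.
# batch_size_warn_threshold_in_kb: 5    # Cassandra default
# batch_size_fail_threshold_in_kb: 50   # Cassandra default
```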

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
