You can try to reduce cassandra-journal.max-message-batch-size https://github.com/akka/akka-persistence-cassandra/blob/master/src/main/resources/reference.conf#L188
Or try to increase the limit in Cassandra http://stackoverflow.com/questions/34699841/what-is-the-batch-limit-in-cassandra There is also a discussion here: https://github.com/akka/akka-persistence-cassandra/issues/96 Regards, Patrik On Thu, Oct 27, 2016 at 7:21 PM, Richard Ney <[email protected]> wrote: > I am using Akka persistence to remember a sequence number from an event > stream. The code in my receiveCommand that writes to the Cassandra journal > is below: > > persistAsync(MessageProcessed(message.eventSequenceNumber, > mdMessage.trackingId, message.eventTimeStamp)) { > mesgProcessed => > { > log.debug(s"Processing message for $organization, last message seqNum: > ${state.messageSequence} as ${state.time}") > if ((mesgProcessed.messageSequence > state.messageSequence) && > !config.getBoolean(ComputeConfigKeys.devMode)) { > if (mesgProcessed.messageSequence != state.messageSequence + 1) { > log.warning("Event Sequence is not in order, last event id ='" + > state.messageSequence + "', this events sequence id = '" + > mesgProcessed.messageSequence + "'") > context.parent ! OrganizationReset(organization) > } > context.parent ! MDMessage(mdContext, mdMessage) > state = mesgProcessed > } else { > context.parent ! MDMessage(mdContext, mdMessage) > state = mesgProcessed > } > } > } > > > In low message volume this code works exactly as expect. This week we > started ramping up the load and eventually the system failed throwing this > exception: > > [report.compute.md.CtipsMessageDistributorClient$MessageProcessed] with > sequence number [2767] for persistenceId [/messaging/hawks/cti]. > java.util.concurrent.ExecutionException: com.datastax.driver.core. > exceptions.InvalidQueryException: Batch too large > at com.google.common.util.concurrent.AbstractFuture$ > Sync.getValue(AbstractFuture.java:299) > at com.google.common.util.concurrent.AbstractFuture$ > Sync.get(AbstractFuture.java:286) > at com.google.common.util.concurrent.AbstractFuture.get( > AbstractFuture.java:116) > at akka.persistence.cassandra.package$$anon$1$$anonfun$run$ > 1.apply(package.scala:17) > at scala.util.Try$.apply(Try.scala:192) > at akka.persistence.cassandra.package$$anon$1.run(package.scala:17) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec( > AbstractDispatcher.scala:409) > at scala.concurrent.forkjoin.ForkJoinTask.doExec( > ForkJoinTask.java:260) > at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue. > runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker( > ForkJoinPool.java:1979) > at scala.concurrent.forkjoin.ForkJoinWorkerThread.run( > ForkJoinWorkerThread.java:107) > Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: > Batch too large > at com.datastax.driver.core.Responses$Error.asException( > Responses.java:136) > at com.datastax.driver.core.DefaultResultSetFuture.onSet( > DefaultResultSetFuture.java:179) > at com.datastax.driver.core.RequestHandler.setFinalResult( > RequestHandler.java:184) > at com.datastax.driver.core.RequestHandler.access$2500( > RequestHandler.java:43) > at com.datastax.driver.core.RequestHandler$SpeculativeExecution. > setFinalResult(RequestHandler.java:798) > at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet( > RequestHandler.java:617) > at com.datastax.driver.core.Connection$Dispatcher. > channelRead0(Connection.java:1005) > at com.datastax.driver.core.Connection$Dispatcher. > channelRead0(Connection.java:928) > at io.netty.channel.SimpleChannelInboundHandler.channelRead( > SimpleChannelInboundHandler.java:105) > at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead( > AbstractChannelHandlerContext.java:292) > at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead( > AbstractChannelHandlerContext.java:278) > at io.netty.handler.timeout.IdleStateHandler.channelRead( > IdleStateHandler.java:266) > at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead( > AbstractChannelHandlerContext.java:292) > at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead( > AbstractChannelHandlerContext.java:278) > at io.netty.handler.codec.MessageToMessageDecoder.channelRead( > MessageToMessageDecoder.java:103) > at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead( > AbstractChannelHandlerContext.java:292) > at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead( > AbstractChannelHandlerContext.java:278) > at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead( > ByteToMessageDecoder.java:277) > at io.netty.handler.codec.ByteToMessageDecoder.channelRead( > ByteToMessageDecoder.java:264) > at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead( > AbstractChannelHandlerContext.java:292) > at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead( > AbstractChannelHandlerContext.java:278) > at io.netty.channel.DefaultChannelPipeline.fireChannelRead( > DefaultChannelPipeline.java:962) > at io.netty.channel.epoll.AbstractEpollStreamChannel$ > EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:879) > at io.netty.channel.epoll.EpollEventLoop.processReady( > EpollEventLoop.java:360) > at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:276) > at io.netty.util.concurrent.SingleThreadEventExecutor$2. > run(SingleThreadEventExecutor.java:112) > at java.lang.Thread.run(Thread.java:745) > > Has anyone experience this? Is this a Cassandra tuning issue our should I > drop the 'persistAsync' in favor of the more traditional persist call? > > -- > >>>>>>>>>> Read the docs: http://akka.io/docs/ > >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/ > current/additional/faq.html > >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user > --- > You received this message because you are subscribed to the Google Groups > "Akka User List" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to [email protected]. > To post to this group, send email to [email protected]. > Visit this group at https://groups.google.com/group/akka-user. > For more options, visit https://groups.google.com/d/optout. > -- Patrik Nordwall Akka Tech Lead Lightbend <http://www.lightbend.com/> - Reactive apps on the JVM Twitter: @patriknw -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
