I am using Akka persistence to remember a sequence number from an event
stream. The code in my receiveCommand that writes to the Cassandra journal
is below:
persistAsync(MessageProcessed(message.eventSequenceNumber,
mdMessage.trackingId, message.eventTimeStamp)) {
mesgProcessed =>
{
log.debug(s"Processing message for $organization, last message seqNum:
${state.messageSequence} as ${state.time}")
if ((mesgProcessed.messageSequence > state.messageSequence) &&
!config.getBoolean(ComputeConfigKeys.devMode)) {
if (mesgProcessed.messageSequence != state.messageSequence + 1) {
log.warning("Event Sequence is not in order, last event id ='" +
state.messageSequence + "', this events sequence id = '" +
mesgProcessed.messageSequence + "'")
context.parent ! OrganizationReset(organization)
}
context.parent ! MDMessage(mdContext, mdMessage)
state = mesgProcessed
} else {
context.parent ! MDMessage(mdContext, mdMessage)
state = mesgProcessed
}
}
}
In low message volume this code works exactly as expect. This week we
started ramping up the load and eventually the system failed throwing this
exception:
[report.compute.md.CtipsMessageDistributorClient$MessageProcessed] with
sequence number [2767] for persistenceId [/messaging/hawks/cti].
java.util.concurrent.ExecutionException:
com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large
at
com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
at
com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
at
com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
at
akka.persistence.cassandra.package$$anon$1$$anonfun$run$1.apply(package.scala:17)
at scala.util.Try$.apply(Try.scala:192)
at akka.persistence.cassandra.package$$anon$1.run(package.scala:17)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:409)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: Batch
too large
at
com.datastax.driver.core.Responses$Error.asException(Responses.java:136)
at
com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:179)
at
com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:184)
at
com.datastax.driver.core.RequestHandler.access$2500(RequestHandler.java:43)
at
com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:798)
at
com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:617)
at
com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1005)
at
com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:928)
at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
at
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
at
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
at
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:277)
at
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:264)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:962)
at
io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:879)
at
io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:360)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:276)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
at java.lang.Thread.run(Thread.java:745)
Has anyone experience this? Is this a Cassandra tuning issue our should I
drop the 'persistAsync' in favor of the more traditional persist call?
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.