[
https://issues.apache.org/jira/browse/QPID-8590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17550850#comment-17550850
]
ASF GitHub Bot commented on QPID-8590:
--------------------------------------
dakirily opened a new pull request, #128:
URL: https://github.com/apache/qpid-broker-j/pull/128
This PR addresses JIRA QPID-8590 to improve flow-to-disk mechanism: messages
should be flowed to disk when their amount exceeds overflow limit or restored
in memory when their amount is less than overflow limit. Although performance
of purging messages from queue ( cleanQueue() operation ) is increased.
> [Broker-J] Purge on flow to disk queue
> --------------------------------------
>
> Key: QPID-8590
> URL: https://issues.apache.org/jira/browse/QPID-8590
> Project: Qpid
> Issue Type: Improvement
> Components: Broker-J
> Affects Versions: qpid-java-broker-8.0.6
> Reporter: Daniil Kirilyuk
> Priority: Minor
>
> Purging of flow to disk queues is very slow (~10messages/s).
> Steps to replicate:
> * send 100000 messages to flow to disk queue with overflow count set to 10
> * initiate queue purge by REST API
> Note: When broker is stop is initiated, queue is purged much faster
> (approximatelly ~60000messages/s).
> Threaddumps show following stacktraces:
> {code:java}
> "qtp1722120611-7283" #7283 prio=5 os_prio=0 cpu=47303542.52ms
> elapsed=76831.32s tid=0x00007fcf64097800 nid=0x3d5b runnable
> [0x00007fcf3c7be000]
> java.lang.Thread.State: RUNNABLE
> at
> org.apache.qpid.server.message.AbstractServerMessageImpl.decrementReference(AbstractServerMessageImpl.java:136)
> at
> org.apache.qpid.server.message.AbstractServerMessageImpl.access$500(AbstractServerMessageImpl.java:40)
> at
> org.apache.qpid.server.message.AbstractServerMessageImpl$Reference.release(AbstractServerMessageImpl.java:356)
> - locked <0x0000000600516aa0> (a
> org.apache.qpid.server.message.AbstractServerMessageImpl$Reference)
> at
> org.apache.qpid.server.message.AbstractServerMessageImpl$Reference.close(AbstractServerMessageImpl.java:363)
> at
> org.apache.qpid.server.queue.FlowToDiskOverflowPolicyHandler$Handler.flowToDisk(FlowToDiskOverflowPolicyHandler.java:135)
> at
> org.apache.qpid.server.queue.FlowToDiskOverflowPolicyHandler$Handler.flowTailToDiskIfNecessary(FlowToDiskOverflowPolicyHandler.java:105)
> at
> org.apache.qpid.server.queue.FlowToDiskOverflowPolicyHandler$Handler.checkOverflow(FlowToDiskOverflowPolicyHandler.java:68)
> at
> org.apache.qpid.server.queue.FlowToDiskOverflowPolicyHandler$Handler.access$100(FlowToDiskOverflowPolicyHandler.java:44)
> at
> org.apache.qpid.server.queue.FlowToDiskOverflowPolicyHandler.checkOverflow(FlowToDiskOverflowPolicyHandler.java:40)
> at
> org.apache.qpid.server.queue.AbstractQueue.checkCapacity(AbstractQueue.java:2117)
> at
> org.apache.qpid.server.queue.AbstractQueueEntryList.updateStatsOnStateChange(AbstractQueueEntryList.java:99)
> at
> org.apache.qpid.server.queue.OrderedQueueEntryList.updateStatsOnStateChange(OrderedQueueEntryList.java:33)
> at
> org.apache.qpid.server.queue.QueueEntryImpl.notifyStateChange(QueueEntryImpl.java:559)
> at
> org.apache.qpid.server.queue.QueueEntryImpl.dispose(QueueEntryImpl.java:578)
> at
> org.apache.qpid.server.queue.QueueEntryImpl.delete(QueueEntryImpl.java:596)
> at
> org.apache.qpid.server.queue.AbstractQueue$7.postCommit(AbstractQueue.java:1836)
> at
> org.apache.qpid.server.txn.LocalTransaction.doPostTransactionActions(LocalTransaction.java:473)
> at
> org.apache.qpid.server.txn.LocalTransaction.commit(LocalTransaction.java:402)
> at
> org.apache.qpid.server.txn.LocalTransaction.commit(LocalTransaction.java:374)
> at
> org.apache.qpid.server.queue.AbstractQueue.clearQueue(AbstractQueue.java:1816)
> at
> org.apache.qpid.server.queue.StandardQueueImplWithAccessChecking.clearQueue(StandardQueueImplWithAccessChecking.java:152)
> at
> jdk.internal.reflect.NativeMethodAccessorImpl.invoke0([email protected]/Native
> Method)
> at
> jdk.internal.reflect.NativeMethodAccessorImpl.invoke([email protected]/NativeMethodAccessorImpl.java:62)
> at
> jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke([email protected]/DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke([email protected]/Method.java:566)
> at
> org.apache.qpid.server.model.ConfiguredObjectMethodOperation.perform(ConfiguredObjectMethodOperation.java:125)
> at
> org.apache.qpid.server.management.plugin.controller.latest.LatestManagementController.invoke(LatestManagementController.java:372)
> at
> org.apache.qpid.server.management.plugin.controller.AbstractManagementController.handlePostOrPut(AbstractManagementController.java:147)
> at
> org.apache.qpid.server.management.plugin.controller.AbstractManagementController.handlePost(AbstractManagementController.java:100)
> at
> org.apache.qpid.server.management.plugin.servlet.rest.RestServlet.doPost(RestServlet.java:136)
> at
> org.apache.qpid.server.management.plugin.servlet.rest.AbstractServlet.doPost(AbstractServlet.java:146)
> at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
> at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
> at
> org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:791)
> at
> org.eclipse.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1626)
> at
> org.apache.qpid.server.management.plugin.filter.AuthenticationCheckFilter$1.run(AuthenticationCheckFilter.java:161)
> at
> org.apache.qpid.server.management.plugin.filter.AuthenticationCheckFilter$1.run(AuthenticationCheckFilter.java:157)
> at java.security.AccessController.doPrivileged([email protected]/Native
> Method)
> at javax.security.auth.Subject.doAs([email protected]/Subject.java:423)
> at
> org.apache.qpid.server.management.plugin.filter.AuthenticationCheckFilter.doFilterChainAs(AuthenticationCheckFilter.java:156)
> at
> org.apache.qpid.server.management.plugin.filter.AuthenticationCheckFilter.doFilter(AuthenticationCheckFilter.java:126)
> at
> org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
> at
> org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601)
> at
> org.apache.qpid.server.management.plugin.filter.LoggingFilter.doFilter(LoggingFilter.java:63)
> at
> org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
> at
> org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601)
> at
> org.apache.qpid.server.management.plugin.filter.MethodFilter.doFilter(MethodFilter.java:67)
> at
> org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
> at
> org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601)
> at
> org.eclipse.jetty.servlets.CrossOriginFilter.handle(CrossOriginFilter.java:319)
> at
> org.eclipse.jetty.servlets.CrossOriginFilter.doFilter(CrossOriginFilter.java:273)
> at
> org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
> at
> org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601)
> at
> org.apache.qpid.server.management.plugin.filter.ExceptionHandlingFilter.doFilter(ExceptionHandlingFilter.java:59)
> at
> org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
> at
> org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601)
> at
> org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:548)
> at
> org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
> at
> org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1624)
> at
> org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
> at
> org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1435)
> at
> org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
> at
> org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:501)
> at
> org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1594)
> at
> org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
> at
> org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1350)
> at
> org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
> at
> org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
> at org.eclipse.jetty.server.Server.handle(Server.java:516)
> at
> org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:388)
> at
> org.eclipse.jetty.server.HttpChannel$$Lambda$220/0x00000008403d1040.dispatch(Unknown
> Source)
> at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:633)
> at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:380)
> at
> org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:273)
> at
> org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
> at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
> at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
> at
> org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
> at
> org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
> at
> org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
> at
> org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:129)
> at
> org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:375)
> at
> org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:773)
> at
> org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:905)
> at
> org.apache.qpid.server.bytebuffer.QpidByteBufferFactory.lambda$createQpidByteBufferTrackingThreadFactory$0(QpidByteBufferFactory.java:464)
> at
> org.apache.qpid.server.bytebuffer.QpidByteBufferFactory$$Lambda$73/0x0000000840157c40.run(Unknown
> Source)
> at java.lang.Thread.run([email protected]/Thread.java:834)
> {code}
> It seems that deletion of a queue entry in queue with FLOW_TO_DISK overflow
> policy triggers FlowToDiskOverflowPolicyHandler to iterate over queue entries
> leading to significant performance drop.
>
--
This message was sent by Atlassian Jira
(v8.20.7#820007)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]