[
https://issues.apache.org/jira/browse/PROTON-2177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17027428#comment-17027428
]
Robbie Gemmell commented on PROTON-2177:
----------------------------------------
I'd consider this more of a misuse than a bug. The link object can't be freed if
the remote side hasn't attached and detached yet on an ongoing session, because
it is essentially still live and the peer may subsequently do so, as you
encountered. The vertx-proton javadoc for free() even gives an example of
calling free() only after the link has been both locally and remotely closed,
precisely because of things like this.
For your timeout handling, I think you'd essentially need to kill the session
itself instead to avoid any potential 'leak' in this scenario.
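A minimal sketch of that rule, as plain Java with no proton-j dependency. The
names below (LinkTimeoutPolicy, State, TimeoutAction) are hypothetical and only
mirror the local/remote endpoint states used in the test further down; they are
not part of proton-j or vertx-proton. It only illustrates the decision: free
the link when both ends have closed it, otherwise fall back to closing the
session.

```java
/** Hypothetical model of the decision; not the proton-j API. */
final class LinkTimeoutPolicy {

    /** Stand-in for the endpoint states asserted in the test (assumption). */
    enum State { UNINITIALIZED, ACTIVE, CLOSED }

    /** Actions a link-establishment timeout handler could take (assumption). */
    enum TimeoutAction { FREE_LINK, CLOSE_SESSION }

    private LinkTimeoutPolicy() {}

    /** A link is treated as safe to free only once both ends have closed it. */
    static boolean safeToFree(State local, State remote) {
        return local == State.CLOSED && remote == State.CLOSED;
    }

    /** Chooses the action after the link has been closed locally on timeout. */
    static TimeoutAction onEstablishmentTimeout(State local, State remote) {
        return safeToFree(local, remote)
                ? TimeoutAction.FREE_LINK
                : TimeoutAction.CLOSE_SESSION;
    }

    public static void main(String[] args) {
        // Peer has not attached yet: remote state is still UNINITIALIZED.
        System.out.println(onEstablishmentTimeout(State.CLOSED, State.UNINITIALIZED));
        // Peer attached and detached in the meantime: both ends are closed.
        System.out.println(onEstablishmentTimeout(State.CLOSED, State.CLOSED));
    }
}
```

In real code the two states would come from the link itself, and closing the
session affects every other link on that session, so whether that trade-off is
acceptable depends on how sessions are shared in the application.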
> IllegalStateException when freeing link as part of timeout handling
> -------------------------------------------------------------------
>
> Key: PROTON-2177
> URL: https://issues.apache.org/jira/browse/PROTON-2177
> Project: Qpid Proton
> Issue Type: Bug
> Components: proton-j
> Affects Versions: proton-j-0.33.3
> Reporter: Carsten Lohmann
> Priority: Major
>
> Invoking free() on a link, or the processing of a received {{detach}} frame,
> may result in such an exception if the preconditions below are met:
> {noformat}
> java.lang.IllegalStateException
> at org.apache.qpid.proton.engine.impl.EndpointImpl.decref(EndpointImpl.java:54)
> at org.apache.qpid.proton.engine.impl.LinkImpl.postFinal(LinkImpl.java:128)
> at org.apache.qpid.proton.engine.impl.EndpointImpl.decref(EndpointImpl.java:52)
> at org.apache.qpid.proton.engine.impl.TransportLink.clearRemoteHandle(TransportLink.java:125)
> at org.apache.qpid.proton.engine.impl.TransportImpl.handleDetach(TransportImpl.java:1379)
> at org.apache.qpid.proton.engine.impl.TransportImpl.handleDetach(TransportImpl.java:70)
> at org.apache.qpid.proton.amqp.transport.Detach.invoke(Detach.java:86)
> at org.apache.qpid.proton.engine.impl.TransportImpl.handleFrame(TransportImpl.java:1453)
> at org.apache.qpid.proton.engine.impl.FrameParser.input(FrameParser.java:425)
> at org.apache.qpid.proton.engine.impl.FrameParser.process(FrameParser.java:536)
> at org.apache.qpid.proton.engine.impl.TransportImpl.process(TransportImpl.java:1570)
> at org.apache.qpid.proton.engine.impl.TransportImpl.processInput(TransportImpl.java:1528)
> {noformat}
> The scenario:
> We have implemented the logic for creating AMQP links with a
> timeout-mechanism. That means that after invoking {{link.open()}} we wait for
> a predefined time and if we haven't received the {{attach}} frame from the
> server at that point, we call {{link.close()}} and then {{link.free()}} to
> avoid having a memory leak.
> This has mostly worked well so far. In cases where the server was not sending
> the attach frame in time, after calling {{sender.close()}} the server usually
> finally responded with a {{detach}} frame (instead of sending an {{attach}}
> in between).
> But lately, when testing with a high number of links (>10000) we sometimes
> encountered such a server behaviour (with Qpid Dispatch Router as server):
> - client sends {{attach}} frame
> - linkEstablishmentTimeout: server doesn't respond in time so client invokes
> {{link.close()}} and then {{link.free()}}
> - *server sends an attach frame*
> - server sends a detach frame
> If that happens multiple times on the same session, the above exception
> occurs when calling {{link.free()}}. Afterwards there are 'socket closed'
> exceptions.
> If we don't invoke {{link.free()}} as part of the linkEstablishmentTimeout
> handling, the exception doesn't occur.
> But not invoking {{link.free()}} would create a memory leak if the server
> doesn't respond at all to the {{attach}} frame.
> ---
> The issue can be reproduced with this test method (to be run as an additional
> method in the "org.apache.qpid.proton.systemtests.FreeTest" class)
> {code:java|collapse=true}
> @Test
> public void testFreeOnLinkEstablishmentTimeout() throws Exception {
> LOGGER.fine(bold("======== About to create transports"));
> getClient().transport = Proton.transport();
> ProtocolTracerEnabler.setProtocolTracer(getClient().transport,
> TestLoggingHelper.CLIENT_PREFIX);
> getServer().transport = Proton.transport();
> ProtocolTracerEnabler.setProtocolTracer(getServer().transport,
> " " + TestLoggingHelper.SERVER_PREFIX);
> getClient().connection = Proton.connection();
> getClient().transport.bind(getClient().connection);
> getServer().connection = Proton.connection();
> getServer().transport.bind(getServer().connection);
> LOGGER.fine(bold("======== About to open connections"));
> getClient().connection.open();
> getServer().connection.open();
> doOutputInputCycle();
> LOGGER.fine(bold("======== About to open session"));
> getClient().session = getClient().connection.session();
> getClient().session.open();
> pumpClientToServer();
> getServer().session =
> getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE));
> assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE);
> getServer().session.open();
> assertEndpointState(getServer().session, ACTIVE, ACTIVE);
> pumpServerToClient();
> assertEndpointState(getClient().session, ACTIVE, ACTIVE);
> for (int i = 0; i < 5; i++) {
> LOGGER.fine("\n\n");
> LOGGER.fine(bold("======== About to create client sender " + i +
> "; refcount on session: " + getSessionRefCount(getClient().session)));
> getClient().source = new Source();
> getClient().source.setAddress(null);
> getClient().target = new Target();
> getClient().target.setAddress("myQueue");
> getClient().sender = getClient().session.sender("sender" + i);
> getClient().sender.setTarget(getClient().target);
> getClient().sender.setSource(getClient().source);
>
> getClient().sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
>
> getClient().sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
> assertEndpointState(getClient().sender, UNINITIALIZED,
> UNINITIALIZED);
> getClient().sender.open();
> assertEndpointState(getClient().sender, ACTIVE, UNINITIALIZED);
> pumpClientToServer();
> LOGGER.fine(bold("======== client: invoke close & free already, "
> + "as would be done in linkEstablishmentTimeout handling"));
> getClient().sender.close();
> // skipping this free() call, the error doesn't occur;
> // but how to prevent a memory leak if no attach frame ever is received?
> getClient().sender.free();
> // write to outputBuffer already
> ByteBuffer clientBuffer = getClient().transport.getOutputBuffer();
> LOGGER.fine(bold("======== About to set up server receiver "
> + "(not having gotten the detach yet)"));
> getServer().receiver = (Receiver)
> getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE));
>
> getServer().receiver.setSenderSettleMode(getServer().receiver.getRemoteSenderSettleMode());
>
> getServer().receiver.setReceiverSettleMode(getServer().receiver.getRemoteReceiverSettleMode());
> org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget =
> getServer().receiver.getRemoteTarget();
> assertTerminusEquals(getClient().target, serverRemoteTarget);
> getServer().receiver.setTarget(serverRemoteTarget);
> assertEndpointState(getServer().receiver, UNINITIALIZED, ACTIVE);
> getServer().receiver.open();
> assertEndpointState(getServer().receiver, ACTIVE, ACTIVE);
> // write server attach to server outputBuffer already
> ByteBuffer serverBuffer = getServer().transport.getOutputBuffer();
> LOGGER.fine(bold("~~~~~~~~~~ now write the yet-unsent "
> + "clientBuffer (with 'detach') to the server"));
> getServer().transport.getInputBuffer().put(clientBuffer);
> getClient().transport.outputConsumed();
> getServer().transport.processInput().checkIsOk();
> LOGGER.fine(bold("~~~~~~~~~~ now write the yet-unsent "
> + "serverBuffer (with 'attach') to the client"));
> getClient().transport.getInputBuffer().put(serverBuffer);
> getClient().transport.processInput().checkIsOk();
> getServer().transport.outputConsumed();
> // assert the server got the detach
> assertEndpointState(getServer().receiver, ACTIVE, CLOSED);
> // let the server respond with a detach
> getServer().receiver.close();
> LOGGER.fine(bold("~~~~~~~~~~ pump detach to client"));
> pumpServerToClient();
> assertEndpointState(getClient().sender, CLOSED, CLOSED);
> getClient().sender.free();
> }
> getClient().transport.unbind();
> LOGGER.fine(bold("======== About to close and free client's "
> + "connection"));
> getClient().connection.close();
> getClient().connection.free();
> }
> private Integer getSessionRefCount(final Session protonSession) throws
> NoSuchFieldException, IllegalAccessException {
> if (!(protonSession instanceof EndpointImpl)) {
> return null;
> }
> final Field refcountField =
> EndpointImpl.class.getDeclaredField("refcount");
> refcountField.setAccessible(true);
> return (Integer) refcountField.get(protonSession);
> }
> {code}
> (Test code using vertx-proton and separate client/server classes can also be
> provided if needed.)