[ https://issues.apache.org/jira/browse/PROTON-2177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17027428#comment-17027428 ]

Robbie Gemmell commented on PROTON-2177:
----------------------------------------

I'd consider this more of a misuse than a bug. The link object can't be freed 
if the remote side hasn't attached+detached yet on an ongoing session, because 
the link is essentially still live and the peer may subsequently do so, as you 
encountered. The vertx-proton javadoc for free even gives an example of calling 
free only after the link has been both locally and remotely closed, precisely 
because of situations like this.
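
As a minimal sketch of that guidance, using the plain proton-j endpoint-state 
API (the vertx-proton close handler pattern is analogous; the surrounding 
wiring is assumed):

{code:java}
import org.apache.qpid.proton.engine.EndpointState;

// Sketch only: free the link once BOTH ends are closed, i.e. after the peer's
// detach has been processed, rather than from the local timeout path.
if (sender.getLocalState() == EndpointState.CLOSED
        && sender.getRemoteState() == EndpointState.CLOSED) {
    sender.free();
}
{code}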

For your timeout handling, I think you'd essentially need to kill the session 
itself instead, to avoid any potential 'leak' in this scenario.
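
A rough sketch of what that could look like, assuming a timeout callback of 
your own (the method name and wiring are illustrative, not from your code):

{code:java}
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;

// Illustrative timeout handler: instead of freeing the individual link,
// close and free the whole session, so a late attach/detach from the peer
// can no longer race with a locally freed link.
void onLinkEstablishmentTimeout(Session session, Sender sender) {
    sender.close();    // still signal our detach for the stuck link
    session.close();   // end the session that carries it
    session.free();    // free the session rather than the individual link
}
{code}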

> IllegalStateException when freeing link as part of timeout handling
> -------------------------------------------------------------------
>
>                 Key: PROTON-2177
>                 URL: https://issues.apache.org/jira/browse/PROTON-2177
>             Project: Qpid Proton
>          Issue Type: Bug
>          Components: proton-j
>    Affects Versions: proton-j-0.33.3
>            Reporter: Carsten Lohmann
>            Priority: Major
>
> Invoking free() on a link, or the processing of a received {{detach}} frame, 
> may result in the following exception if the preconditions described below 
> are met:
> {noformat}
> java.lang.IllegalStateException
>       at org.apache.qpid.proton.engine.impl.EndpointImpl.decref(EndpointImpl.java:54)
>       at org.apache.qpid.proton.engine.impl.LinkImpl.postFinal(LinkImpl.java:128)
>       at org.apache.qpid.proton.engine.impl.EndpointImpl.decref(EndpointImpl.java:52)
>       at org.apache.qpid.proton.engine.impl.TransportLink.clearRemoteHandle(TransportLink.java:125)
>       at org.apache.qpid.proton.engine.impl.TransportImpl.handleDetach(TransportImpl.java:1379)
>       at org.apache.qpid.proton.engine.impl.TransportImpl.handleDetach(TransportImpl.java:70)
>       at org.apache.qpid.proton.amqp.transport.Detach.invoke(Detach.java:86)
>       at org.apache.qpid.proton.engine.impl.TransportImpl.handleFrame(TransportImpl.java:1453)
>       at org.apache.qpid.proton.engine.impl.FrameParser.input(FrameParser.java:425)
>       at org.apache.qpid.proton.engine.impl.FrameParser.process(FrameParser.java:536)
>       at org.apache.qpid.proton.engine.impl.TransportImpl.process(TransportImpl.java:1570)
>       at org.apache.qpid.proton.engine.impl.TransportImpl.processInput(TransportImpl.java:1528)
> {noformat}
> The scenario:
> We have implemented the logic for creating AMQP links with a timeout 
> mechanism: after invoking {{link.open()}} we wait for a predefined time, and 
> if we haven't received the {{attach}} frame from the server by then, we call 
> {{link.close()}} and then {{link.free()}} to avoid a memory leak.
> This has mostly worked well so far. In cases where the server did not send 
> the attach frame in time, it usually still responded to {{sender.close()}} 
> with a {{detach}} frame eventually (without sending an {{attach}} in 
> between).
> But lately, when testing with a high number of links (>10000), we sometimes 
> encountered the following server behaviour (with Qpid Dispatch Router as the 
> server):
> - client sends {{attach}} frame
> - linkEstablishmentTimeout: the server doesn't respond in time, so the client 
> invokes {{link.close()}} and then {{link.free()}}
> - *server sends an attach frame*
> - server sends a detach frame
> If that happens multiple times on the same session, the above exception 
> occurs when calling {{link.free()}}. Afterwards there are 'socket closed' 
> exceptions.
> If we don't invoke {{link.free()}} as part of the linkEstablishmentTimeout 
> handling, the exception doesn't occur. But not invoking {{link.free()}} would 
> create a memory leak if the server doesn't respond at all to the {{attach}} 
> frame.
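> To make the timeout handling concrete, a stripped-down sketch (the method 
> name and the scheduling around it are illustrative, not our actual code):
> {code:java}
> // Sketch of the linkEstablishmentTimeout handling described above; the
> // method name and the scheduling around it are illustrative only.
> void onLinkEstablishmentTimeout(Sender sender) {
>     if (sender.getRemoteState() == EndpointState.UNINITIALIZED) {
>         // no attach received from the server within the timeout
>         sender.close();
>         sender.free();  // this early free() is what later leads to the IllegalStateException
>     }
> }
> {code}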
> ---
> The issue can be reproduced with this test method (to be run as an additional 
> method in the "org.apache.qpid.proton.systemtests.FreeTest" class):
> {code:java|collapse=true}
>     @Test
>     public void testFreeOnLinkEstablishmentTimeout() throws Exception {
>         LOGGER.fine(bold("======== About to create transports"));
>         getClient().transport = Proton.transport();
>         ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX);
>         getServer().transport = Proton.transport();
>         ProtocolTracerEnabler.setProtocolTracer(getServer().transport, "            " + TestLoggingHelper.SERVER_PREFIX);
>         getClient().connection = Proton.connection();
>         getClient().transport.bind(getClient().connection);
>         getServer().connection = Proton.connection();
>         getServer().transport.bind(getServer().connection);
>         LOGGER.fine(bold("======== About to open connections"));
>         getClient().connection.open();
>         getServer().connection.open();
>         doOutputInputCycle();
>         LOGGER.fine(bold("======== About to open session"));
>         getClient().session = getClient().connection.session();
>         getClient().session.open();
>         pumpClientToServer();
>         getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE));
>         assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE);
>         getServer().session.open();
>         assertEndpointState(getServer().session, ACTIVE, ACTIVE);
>         pumpServerToClient();
>         assertEndpointState(getClient().session, ACTIVE, ACTIVE);
>         for (int i = 0; i < 5; i++) {
>             LOGGER.fine("\n\n");
>             LOGGER.fine(bold("======== About to create client sender " + i + "; refcount on session: " + getSessionRefCount(getClient().session)));
>             getClient().source = new Source();
>             getClient().source.setAddress(null);
>             getClient().target = new Target();
>             getClient().target.setAddress("myQueue");
>             getClient().sender = getClient().session.sender("sender" + i);
>             getClient().sender.setTarget(getClient().target);
>             getClient().sender.setSource(getClient().source);
>             getClient().sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
>             getClient().sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
>             assertEndpointState(getClient().sender, UNINITIALIZED, UNINITIALIZED);
>             getClient().sender.open();
>             assertEndpointState(getClient().sender, ACTIVE, UNINITIALIZED);
>             pumpClientToServer();
>             LOGGER.fine(bold("======== client: invoke close & free already, as would be done in linkEstablishmentTimeout handling"));
>             getClient().sender.close();
>             getClient().sender.free();  // skipping this, the error doesn't occur; but how to prevent a memory leak if no attach frame ever is received?
>             // write to outputBuffer already
>             ByteBuffer clientBuffer = getClient().transport.getOutputBuffer();
>             LOGGER.fine(bold("======== About to set up server receiver (not having gotten the detach yet)"));
>             getServer().receiver = (Receiver) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE));
>             getServer().receiver.setSenderSettleMode(getServer().receiver.getRemoteSenderSettleMode());
>             getServer().receiver.setReceiverSettleMode(getServer().receiver.getRemoteReceiverSettleMode());
>             org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget = getServer().receiver.getRemoteTarget();
>             assertTerminusEquals(getClient().target, serverRemoteTarget);
>             getServer().receiver.setTarget(serverRemoteTarget);
>             assertEndpointState(getServer().receiver, UNINITIALIZED, ACTIVE);
>             getServer().receiver.open();
>             assertEndpointState(getServer().receiver, ACTIVE, ACTIVE);
>             // write server attach to server outputBuffer already
>             ByteBuffer serverBuffer = getServer().transport.getOutputBuffer();
>             LOGGER.fine(bold("~~~~~~~~~~ now write the yet-unsent clientBuffer (with 'detach') to the server"));
>             getServer().transport.getInputBuffer().put(clientBuffer);
>             getClient().transport.outputConsumed();
>             getServer().transport.processInput().checkIsOk();
>             LOGGER.fine(bold("~~~~~~~~~~ now write the yet-unsent serverBuffer (with 'attach') to the client"));
>             getClient().transport.getInputBuffer().put(serverBuffer);
>             getClient().transport.processInput().checkIsOk();
>             getServer().transport.outputConsumed();
>             // assert the server got the detach
>             assertEndpointState(getServer().receiver, ACTIVE, CLOSED);
>             // let the server respond with a detach
>             getServer().receiver.close();
>             LOGGER.fine(bold("~~~~~~~~~~ pump detach to client"));
>             pumpServerToClient();
>             assertEndpointState(getClient().sender, CLOSED, CLOSED);
>             getClient().sender.free();
>         }
>         getClient().transport.unbind();
>         LOGGER.fine(bold("======== About to close and free client's connection"));
>         getClient().connection.close();
>         getClient().connection.free();
>     }
>
>     private Integer getSessionRefCount(final Session protonSession) throws NoSuchFieldException, IllegalAccessException {
>         if (!(protonSession instanceof EndpointImpl)) {
>             return null;
>         }
>         final Field refcountField = EndpointImpl.class.getDeclaredField("refcount");
>         refcountField.setAccessible(true);
>         return (Integer) refcountField.get(protonSession);
>     }
> {code}
> (Test code using vertx-proton and separate client/server classes can also be 
> provided if needed.)


