Repository: qpid-proton Updated Branches: refs/heads/master 0c27d5ffb -> 5ec901323
PROTON-1110: add toggle for suppressing the synthetic flow events generated on sending Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/5ec90132 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/5ec90132 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/5ec90132 Branch: refs/heads/master Commit: 5ec901323eb188b840a519df3e7aeea15523c218 Parents: 6422e24 Author: Robert Gemmell <[email protected]> Authored: Tue Jan 26 13:49:51 2016 +0000 Committer: Robert Gemmell <[email protected]> Committed: Tue Jan 26 14:06:55 2016 +0000 ---------------------------------------------------------------------- .../apache/qpid/proton/engine/Transport.java | 12 ++ .../qpid/proton/engine/impl/TransportImpl.java | 15 ++- .../proton/engine/impl/TransportImplTest.java | 124 +++++++++++++++++++ 3 files changed, 150 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ec90132/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java index 2e322d6..5d8b79d 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java @@ -270,4 +270,16 @@ public interface Transport extends Endpoint long getFramesInput(); long getFramesOutput(); + + /** + * Configure whether a synthetic Flow event should be emitted when messages are sent, + * reflecting a change in the credit level on the link that may prompt other action. + * + * Defaults to true. 
+ * + * @param emitFlowEventOnSend true if a flow event should be emitted, false otherwise + */ + void setEmitFlowEventOnSend(boolean emitFlowEventOnSend); + + boolean isEmitFlowEventOnSend(); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ec90132/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java index d85794f..d132508 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java @@ -116,6 +116,7 @@ public class TransportImpl extends EndpointImpl private boolean _init; private boolean _processingStarted; + private boolean _emitFlowEventOnSend = true; private FrameHandler _frameHandler = this; private boolean _head_closed = false; @@ -611,7 +612,7 @@ public class TransportImpl extends EndpointImpl tpLink.setInProgressDelivery(delivery); } - if (snd.getLocalState() != EndpointState.CLOSED) { + if (_emitFlowEventOnSend && snd.getLocalState() != EndpointState.CLOSED) { getConnectionImpl().put(Event.Type.LINK_FLOW, snd); } } @@ -1657,4 +1658,16 @@ public class TransportImpl extends EndpointImpl public Reactor getReactor() { return _reactor; } + + @Override + public void setEmitFlowEventOnSend(boolean emitFlowEventOnSend) + { + _emitFlowEventOnSend = emitFlowEventOnSend; + } + + @Override + public boolean isEmitFlowEventOnSend() + { + return _emitFlowEventOnSend; + } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ec90132/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java 
b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java index 888f4af..a0e6766 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java @@ -24,12 +24,14 @@ import static org.apache.qpid.proton.engine.impl.TransportTestHelper.stringOfLen import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.LinkedList; import org.apache.qpid.proton.Proton; @@ -47,6 +49,7 @@ import org.apache.qpid.proton.engine.Collector; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.Event; import org.apache.qpid.proton.engine.Link; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Sender; @@ -567,6 +570,127 @@ public class TransportImplTest assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Transfer); } + @Test + public void testEmitFlowEventOnSend() + { + doEmitFlowOnSendTestImpl(true); + } + + public void testSupressFlowEventOnSend() + { + doEmitFlowOnSendTestImpl(false); + } + + void doEmitFlowOnSendTestImpl(boolean emitFlowEventOnSend) + { + MockTransportImpl transport = new MockTransportImpl(); + transport.setEmitFlowEventOnSend(emitFlowEventOnSend); + + Connection connection = Proton.connection(); + transport.bind(connection); + + Collector collector = Collector.Factory.create(); + connection.collect(collector); + + Session session = connection.session(); + session.open(); + + String 
linkName = "mySender"; + Sender sender = session.sender(linkName); + sender.open(); + + sendMessage(sender, "tag1", "content1"); + + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 0, transport.writes.size()); + + assertEvents(collector, Event.Type.CONNECTION_INIT, Event.Type.SESSION_INIT, Event.Type.SESSION_LOCAL_OPEN, + Event.Type.TRANSPORT, Event.Type.LINK_INIT, Event.Type.LINK_LOCAL_OPEN, Event.Type.TRANSPORT); + + // Now open the connection, expect the Open, Begin and Attach frames but + // no Transfer yet, as the peer has not attached or granted any credit. + connection.open(); + + pumpMockTransport(transport); + + assertEvents(collector, Event.Type.CONNECTION_LOCAL_OPEN, Event.Type.TRANSPORT); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size()); + + assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open); + assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin); + assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach); + + transport.handleFrame(new TransportFrame(0, new Open(), null)); + + Begin begin = new Begin(); + begin.setRemoteChannel(UnsignedShort.valueOf((short) 0)); + transport.handleFrame(new TransportFrame(0, begin, null)); + + Attach attach = new Attach(); + attach.setHandle(UnsignedInteger.ZERO); + attach.setRole(Role.RECEIVER); + attach.setName(linkName); + attach.setInitialDeliveryCount(UnsignedInteger.ZERO); + transport.handleFrame(new TransportFrame(0, attach, null)); + + Flow flow = new Flow(); + flow.setHandle(UnsignedInteger.ZERO); + flow.setDeliveryCount(UnsignedInteger.ZERO); + flow.setNextIncomingId(UnsignedInteger.ONE); + flow.setNextOutgoingId(UnsignedInteger.ZERO); + flow.setIncomingWindow(UnsignedInteger.valueOf(1024)); + flow.setOutgoingWindow(UnsignedInteger.valueOf(1024)); + flow.setLinkCredit(UnsignedInteger.valueOf(10)); + + transport.handleFrame(new 
TransportFrame(0, flow, null)); + + assertEvents(collector, Event.Type.CONNECTION_REMOTE_OPEN, Event.Type.SESSION_REMOTE_OPEN, + Event.Type.LINK_REMOTE_OPEN, Event.Type.LINK_FLOW); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size()); + + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size()); + assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Transfer); + + if(emitFlowEventOnSend) + { + assertEvents(collector, Event.Type.LINK_FLOW); + } + else + { + assertNoEvents(collector); + } + } + + private void assertNoEvents(Collector collector) + { + assertEvents(collector); + } + + private void assertEvents(Collector collector, Event.Type... expectedEventTypes) + { + + if(expectedEventTypes.length == 0) + { + assertNull("Expected no events, but at least one was present: " + collector.peek(), collector.peek()); + } + else + { + ArrayList<Event.Type> eventTypesList = new ArrayList<Event.Type>(); + Event event = null; + while ((event = collector.peek()) != null) { + eventTypesList.add(event.getType()); + collector.pop(); + } + + assertArrayEquals("Unexpected event types: " + eventTypesList, expectedEventTypes, eventTypesList.toArray(new Event.Type[0])); + } + } + private void pumpMockTransport(MockTransportImpl transport) { while(transport.pending() > 0) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
