QPID-8038: [Broker-J] [AMQP 0-x/1.0] Add heartbeating/idle tests to protocol suites
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/99fa51f0 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/99fa51f0 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/99fa51f0 Branch: refs/heads/master Commit: 99fa51f01cbd03e5712821bcdd782e59584c175f Parents: a9e61c1 Author: Keith Wall <[email protected]> Authored: Thu Jan 4 10:13:29 2018 +0000 Committer: Keith Wall <[email protected]> Committed: Thu Jan 4 12:15:45 2018 +0000 ---------------------------------------------------------------------- .../protocol/v0_10/ConnectionInteraction.java | 14 +++ .../tests/protocol/v0_10/ConnectionTest.java | 82 ++++++++++++++ .../protocol/v0_8/ConnectionInteraction.java | 16 +++ .../protocol/v0_8/ExchangeInteraction.java | 6 + .../qpid/tests/protocol/v0_8/Interaction.java | 6 + .../tests/protocol/v0_8/ConnectionTest.java | 111 +++++++++++++++++++ .../qpid/tests/protocol/v1_0/EmptyResponse.java | 31 ++++++ .../qpid/tests/protocol/v1_0/FrameDecoder.java | 4 + .../qpid/tests/protocol/v1_0/Interaction.java | 20 ++++ .../v1_0/transport/connection/OpenTest.java | 55 +++++++++ 10 files changed, 345 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java index d7b54b0..56d7498 100644 --- a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java +++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.tests.protocol.v0_10; +import org.apache.qpid.server.protocol.v0_10.transport.ConnectionHeartbeat; import org.apache.qpid.server.protocol.v0_10.transport.ConnectionOpen; import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStartOk; import org.apache.qpid.server.protocol.v0_10.transport.ConnectionTuneOk; @@ -33,6 +34,7 @@ public class ConnectionInteraction private ConnectionStartOk _startOk; private ConnectionTuneOk _tuneOk; private ConnectionOpen _open; + private ConnectionHeartbeat _connectionHeartbeat; public ConnectionInteraction(final Interaction interaction) { @@ -40,6 +42,7 @@ public class ConnectionInteraction _startOk = new ConnectionStartOk(); _tuneOk = new ConnectionTuneOk(); _open = new ConnectionOpen(); + _connectionHeartbeat = new ConnectionHeartbeat(); } public Interaction startOk() throws Exception @@ -69,6 +72,12 @@ public class ConnectionInteraction return this; } + public ConnectionInteraction tuneOkHeartbeat(final int heartbeat) + { + _tuneOk.setHeartbeat(heartbeat); + return this; + } + public ConnectionInteraction tuneOkMaxFrameSize(final int maxFrameSize) { _tuneOk.setMaxFrameSize(maxFrameSize); @@ -80,4 +89,9 @@ public class ConnectionInteraction _startOk.setResponse(response); return this; } + + public Interaction heartbeat() throws Exception + { + return _interaction.sendPerformative(_connectionHeartbeat); + } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java index d273f02..1a800f6 100644 --- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java +++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java @@ -20,8 +20,12 @@ */ package org.apache.qpid.tests.protocol.v0_10; +import static org.hamcrest.CoreMatchers.both; import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.Assume.assumeThat; import java.net.InetSocketAddress; @@ -30,6 +34,7 @@ import org.junit.Before; import org.junit.Test; import org.apache.qpid.server.protocol.v0_10.transport.ConnectionClose; +import org.apache.qpid.server.protocol.v0_10.transport.ConnectionHeartbeat; import org.apache.qpid.server.protocol.v0_10.transport.ConnectionOpenOk; import org.apache.qpid.server.protocol.v0_10.transport.ConnectionSecure; import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStart; @@ -181,4 +186,81 @@ public class ConnectionTest extends BrokerAdminUsingTestBase } } + @Test + @SpecificationTest(section = "9.connection", + description = "The heartbeat control may be used to generate artificial network traffic when a connection " + + "is idle.") + public void heartbeating() throws Exception + { + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + ConnectionTune response = interaction.negotiateProtocol().consumeResponse() + .consumeResponse(ConnectionStart.class) + .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk() + .consumeResponse().getLatestResponse(ConnectionTune.class); + + assumeThat(response.hasHeartbeatMin(), is(true)); + assumeThat(response.hasHeartbeatMax(), is(true)); + assumeThat(response.getHeartbeatMin(), is(greaterThanOrEqualTo(0))); + assumeThat(response.getHeartbeatMax(), is(greaterThanOrEqualTo(1))); + + final int heartbeatPeriod = 1; + + interaction.connection() + .tuneOkChannelMax(response.getChannelMax()) + .tuneOkMaxFrameSize(response.getMaxFrameSize()) + .tuneOkHeartbeat(heartbeatPeriod) + .tuneOk() + .connection().open() + .consumeResponse(ConnectionOpenOk.class); + + final long startTime = System.currentTimeMillis(); + interaction.consumeResponse().getLatestResponse(ConnectionHeartbeat.class); + final long actualHeartbeatDelay = System.currentTimeMillis() - startTime; + assertThat("Heartbeat not received within expected time frame", + ((int)actualHeartbeatDelay / 1000), + is(both(greaterThanOrEqualTo(heartbeatPeriod)).and(lessThanOrEqualTo(heartbeatPeriod * 2)))); + interaction.connection().heartbeat(); + + interaction.consumeResponse(ConnectionHeartbeat.class) + .connection().heartbeat(); + } + } + + + @Test + @SpecificationTest(section = "9.connection", + description = "If a connection is idle for more than twice the negotiated heartbeat delay, the peers MAY " + + "be considered disconnected.") + public void heartbeatingIncomingIdle() throws Exception + { + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + ConnectionTune response = interaction.negotiateProtocol().consumeResponse() + .consumeResponse(ConnectionStart.class) + .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk() + .consumeResponse().getLatestResponse(ConnectionTune.class); + + assumeThat(response.hasHeartbeatMin(), is(true)); + assumeThat(response.hasHeartbeatMax(), is(true)); + assumeThat(response.getHeartbeatMin(), is(greaterThanOrEqualTo(0))); + assumeThat(response.getHeartbeatMax(), is(greaterThanOrEqualTo(1))); + + final int heartbeatPeriod = 1; + + interaction.connection() + .tuneOkChannelMax(response.getChannelMax()) + .tuneOkMaxFrameSize(response.getMaxFrameSize()) + .tuneOkHeartbeat(heartbeatPeriod) + .tuneOk() + .connection().open() + .consumeResponse(ConnectionOpenOk.class); + + interaction.consumeResponse().getLatestResponse(ConnectionHeartbeat.class); + + transport.assertNoMoreResponsesAndChannelClosed(); + } + } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java index 236c49a..3c4943e 100644 --- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java +++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java @@ -24,8 +24,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import org.apache.qpid.server.protocol.ErrorCodes; import org.apache.qpid.server.protocol.v0_8.AMQShortString; import org.apache.qpid.server.protocol.v0_8.FieldTable; +import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody; import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenBody; import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartOkBody; import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneOkBody; @@ -43,6 +45,11 @@ public class ConnectionInteraction private int _tuneOkHeartbeat; private String _openVirtualHost; + private int _closeReplyCode = ErrorCodes.REPLY_SUCCESS; + private String _closeReplyText; + private int _closeClassId; + private int _closeMethodId; + public ConnectionInteraction(final Interaction interaction) { _interaction = interaction; @@ -106,4 +113,13 @@ public class ConnectionInteraction null, false)); } + + public Interaction close() throws Exception + { + return _interaction.sendPerformative(new ConnectionCloseBody(_interaction.getProtocolVersion(), + _closeReplyCode, + AMQShortString.valueOf(_closeReplyText), + _closeClassId, + _closeMethodId)); + } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java index fedd481..b35a1a9 100644 --- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java +++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java @@ -84,6 +84,12 @@ public class ExchangeInteraction return this; } + public ExchangeInteraction declareNoWait(final boolean noWait) + { + _declareNoWait = noWait; + return this; + } + public ExchangeInteraction declareArguments(final Map<String,Object> args) { _declareArguments = args == null ? Collections.emptyMap() : new HashMap<>(args); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java index 5e89af8..bb38fa0 100644 --- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java +++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.tests.protocol.v0_8; +import org.apache.qpid.server.protocol.ProtocolVersion; import org.apache.qpid.server.protocol.v0_8.transport.AMQBody; import org.apache.qpid.server.protocol.v0_8.transport.AMQDataBlock; import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame; @@ -65,6 +66,11 @@ public class Interaction extends AbstractInteraction<Interaction> return _protocolHeader; } + public ProtocolVersion getProtocolVersion() + { + return ((FrameTransport) getTransport()).getProtocolVersion(); + } + public Interaction sendPerformative(final AMQBody amqBody) throws Exception { return sendPerformative(_channelId, amqBody); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java index 9c693a3..81dbb85 100644 --- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java +++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java @@ -20,9 +20,12 @@ */ package org.apache.qpid.tests.protocol.v0_8; +import static org.hamcrest.CoreMatchers.both; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import java.net.InetSocketAddress; @@ -31,10 +34,12 @@ import org.junit.Test; import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody; import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody; +import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseOkBody; import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenOkBody; import org.apache.qpid.server.protocol.v0_8.transport.ConnectionSecureBody; import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartBody; import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneBody; +import org.apache.qpid.server.protocol.v0_8.transport.HeartbeatBody; import org.apache.qpid.tests.protocol.ChannelClosedResponse; import org.apache.qpid.tests.protocol.SpecificationTest; import org.apache.qpid.tests.utils.BrokerAdmin; @@ -246,4 +251,110 @@ public class ConnectionTest extends BrokerAdminUsingTestBase .consumeResponse(ConnectionCloseBody.class, ChannelClosedResponse.class); } } + + @Test + @SpecificationTest(section = "4.2.7", + description = "Heartbeat frames tell the recipient that the sender is still alive. The rate and timing of" + + " heartbeat frames is negotiated during connection tuning.") + public void heartbeating() throws Exception + { + try(FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + ConnectionTuneBody response = interaction.negotiateProtocol() + .consumeResponse(ConnectionStartBody.class) + .connection().startOkMechanism("ANONYMOUS") + .startOk() + .consumeResponse().getLatestResponse(ConnectionTuneBody.class); + + final Long heartbeatPeriod = 1L; + + interaction.connection() + .tuneOkChannelMax(response.getChannelMax()) + .tuneOkFrameMax(response.getFrameMax()) + .tuneOkHeartbeat(heartbeatPeriod.intValue()) + .tuneOk() + .connection().open() + .consumeResponse(ConnectionOpenOkBody.class); + + final long startTime = System.currentTimeMillis(); + interaction.consumeResponse().getLatestResponse(HeartbeatBody.class); + final long actualHeartbeatDelay = System.currentTimeMillis() - startTime; + assertThat("Heartbeat not received within expected time frame", + actualHeartbeatDelay / 1000, + is(both(greaterThanOrEqualTo(heartbeatPeriod)).and(lessThanOrEqualTo(heartbeatPeriod * 2)))); + interaction.sendPerformative(new HeartbeatBody()); + + interaction.consumeResponse(HeartbeatBody.class) + .sendPerformative(new HeartbeatBody()); + } + } + + @Test + @SpecificationTest(section = "4.2.7", description = "Any sent octet is a valid substitute for a heartbeat") + public void heartbeatingIncomingTrafficIsNonHeartbeat() throws Exception + { + try(FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + ConnectionTuneBody response = interaction.negotiateProtocol() + .consumeResponse(ConnectionStartBody.class) + .connection().startOkMechanism("ANONYMOUS") + .startOk() + .consumeResponse().getLatestResponse(ConnectionTuneBody.class); + + final Long heartbeatPeriod = 1L; + + interaction.connection() + .tuneOkChannelMax(response.getChannelMax()) + .tuneOkFrameMax(response.getFrameMax()) + .tuneOkHeartbeat(heartbeatPeriod.intValue()) + .tuneOk() + .connection().open() + .consumeResponse(ConnectionOpenOkBody.class) + .channel().open() + .consumeResponse(ChannelOpenOkBody.class) + .consumeResponse(HeartbeatBody.class) + .exchange().declarePassive(true).declareNoWait(true).declare() + .consumeResponse(HeartbeatBody.class) + .sendPerformative(new HeartbeatBody()) + .exchange().declarePassive(true).declareNoWait(true).declare(); + + interaction.connection() + .close() + .consumeResponse().getLatestResponse(ConnectionCloseOkBody.class); + } + } + + @Test + @SpecificationTest(section = "4.2.7", + description = " If a peer detects no incoming traffic (i.e. received octets) for two heartbeat intervals " + + "or longer, it should close the connection without following the Connection.Close/Close-Ok handshaking") + public void heartbeatingNoIncomingTraffic() throws Exception + { + try(FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + ConnectionTuneBody response = interaction.negotiateProtocol() + .consumeResponse(ConnectionStartBody.class) + .connection().startOkMechanism("ANONYMOUS") + .startOk() + .consumeResponse().getLatestResponse(ConnectionTuneBody.class); + + final Long heartbeatPeriod = 1L; + + interaction.connection() + .tuneOkChannelMax(response.getChannelMax()) + .tuneOkFrameMax(response.getFrameMax()) + .tuneOkHeartbeat(heartbeatPeriod.intValue()) + .tuneOk() + .connection().open() + .consumeResponse(ConnectionOpenOkBody.class) + .consumeResponse(HeartbeatBody.class); + + // Do not reflect a heartbeat so incoming line will be silent thus + // requiring the broker to close the connection. + transport.assertNoMoreResponsesAndChannelClosed(); + } + } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmptyResponse.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmptyResponse.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmptyResponse.java new file mode 100644 index 0000000..c233e28 --- /dev/null +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmptyResponse.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.tests.protocol.v1_0; + +import org.apache.qpid.tests.protocol.Response; + +public class EmptyResponse implements Response<EmptyResponse> +{ + @Override + public EmptyResponse getBody() + { + return this; + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java index 0c94ad7..a8ab32e 100644 --- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java @@ -241,6 +241,10 @@ public class FrameDecoder implements InputDecoder resetInputHandlerAfterSaslOutcome(); } } + else if (val == null) + { + response = new EmptyResponse(); + } else { throw new UnsupportedOperationException("Unexpected frame type : " + val.getClass()); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java index 7a372a7..03f496a 100644 --- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java @@ -82,6 +82,10 @@ import org.apache.qpid.tests.protocol.Response; public class Interaction extends AbstractInteraction<Interaction> { + private static final FrameBody EMPTY_FRAME = (channel, conn) -> { + throw new UnsupportedOperationException(); + }; + private static final Set<String> CONTAINER_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final Begin _begin; private final End _end; @@ -269,6 +273,12 @@ public class Interaction extends AbstractInteraction<Interaction> return this; } + public Interaction openIdleTimeOut(final int idleTimeOut) + { + _open.setIdleTimeOut(UnsignedInteger.valueOf(idleTimeOut)); + return this; + } + public Interaction openProperties(final Map<Symbol, Object> properties) { _open.setProperties(properties); @@ -1071,4 +1081,14 @@ public class Interaction extends AbstractInteraction<Interaction> return new InteractionTransactionalState(handle); } + /////////// + // Empty // + /////////// + + public Interaction emptyFrame() throws Exception + { + sendPerformative(EMPTY_FRAME, UnsignedShort.ZERO); + return this; + } + } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java index ab570da..a744cb1 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java @@ -21,11 +21,13 @@ package org.apache.qpid.tests.protocol.v1_0.transport.connection; import static org.hamcrest.CoreMatchers.both; +import static org.hamcrest.CoreMatchers.either; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; import java.net.InetSocketAddress; @@ -41,6 +43,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Open; import org.apache.qpid.tests.utils.BrokerAdmin; import org.apache.qpid.tests.protocol.v1_0.FrameTransport; import org.apache.qpid.tests.protocol.v1_0.Interaction; +import org.apache.qpid.tests.protocol.v1_0.EmptyResponse; import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; import org.apache.qpid.tests.protocol.SpecificationTest; @@ -94,6 +97,58 @@ public class OpenTest extends BrokerAdminUsingTestBase } @Test + @SpecificationTest(section = "2.4.5", + description = "Implementations MUST be prepared to handle empty frames arriving on any valid channel") + public void emptyFrame() throws Exception + { + final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); + try (FrameTransport transport = new FrameTransport(addr).connect()) + { + Interaction interaction = transport.newInteraction(); + interaction.negotiateProtocol().consumeResponse() + .openContainerId("testContainerId") + .open().consumeResponse(Open.class) + .emptyFrame() + .doCloseConnection(); + } + } + + @Test + @SpecificationTest(section = "2.4.5", + description = "Connections are subject to an idle timeout threshold.") + public void idleTimeout() throws Exception + { + final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); + try (FrameTransport transport = new FrameTransport(addr).connect()) + { + Interaction interaction = transport.newInteraction(); + final int idleTimeOut = 1000; + Open responseOpen = interaction.negotiateProtocol().consumeResponse() + .openContainerId("testContainerId") + .openIdleTimeOut(idleTimeOut) + .open().consumeResponse() + .getLatestResponse(Open.class); + + final int peerIdleTimeOut = responseOpen.getIdleTimeOut().intValue(); + assertThat(peerIdleTimeOut, is(either(equalTo(0)).or(greaterThanOrEqualTo(idleTimeOut)))); + + final long startTime = System.currentTimeMillis(); + interaction.consumeResponse(EmptyResponse.class); + final long actualHeartbeatDelay = System.currentTimeMillis() - startTime; + assertThat("Empty frame not received within expected time frame", + ((int)actualHeartbeatDelay / 1000), + is(both(greaterThanOrEqualTo(peerIdleTimeOut)).and(lessThanOrEqualTo(peerIdleTimeOut * 2)))); + + if (peerIdleTimeOut > 0) + { + interaction.emptyFrame(); + } + + interaction.doCloseConnection(); + } + } + + @Test @SpecificationTest(section = "2.4.1", description = "The open frame can only be sent on channel 0. §2.7.1: A peer that receives a channel number outside the supported range MUST close the connection with the framing-error error-code.") public void failOpenOnChannelNotZero() throws Exception --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
