Github user gemmellr commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2149#discussion_r196726944 --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNoHearbeatsTest.java --- @@ -80,4 +88,68 @@ public void inspectOpenedResource(Connection connection) { connection.close(); } + private static final String QUEUE_NAME = "queue://testHeartless"; + + // This test is validating a scenario where the client will leave with connection reset + // This is done by setting soLinger=0 on the socket, which will make the system to issue a connection.reset instead of sending a + // disconnect. + @Test(timeout = 60000) + public void testCloseConsumerOnConnectionReset() throws Exception { + + AmqpClient client = createAmqpClient(); + assertNotNull(client); + + client.setValidator(new AmqpValidator() { + + @Override + public void inspectOpenedResource(Connection connection) { + assertEquals("idle timeout was not disabled", 0, connection.getTransport().getRemoteIdleTimeout()); + } + }); + + AmqpConnection connection = addConnection(client.connect()); + assertNotNull(connection); + + connection.getStateInspector().assertValid(); + AmqpSession session = connection.createSession(); + AmqpReceiver receiver = session.createReceiver(QUEUE_NAME); + + // This test needs a remote process exiting without closing the socket + // with soLinger=0 on the socket so it will issue a connection.reset + Process p = SpawnedVMSupport.spawnVM(AmqpNoHearbeatsTest.class.getName(), "testConnectionReset"); + Assert.assertEquals(33, p.waitFor()); + + AmqpSender sender = session.createSender(QUEUE_NAME); + + for (int i = 0; i < 10; i++) { + AmqpMessage msg = new AmqpMessage(); + msg.setBytes(new byte[] {0}); + sender.send(msg); + } + + receiver.flow(20); + + for (int i = 0; i < 10; i++) { + AmqpMessage msg = receiver.receive(1, TimeUnit.SECONDS); + Assert.assertNotNull(msg); + msg.accept(); + } + } + + public static void main(String[] arg) { + if (arg.length > 0 && arg[0].equals("testConnectionReset")) { + try { + AmqpClient client = new AmqpClient(new URI("tcp://127.0.0.1:5672?transport.soLinger=0"), null, null); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + AmqpReceiver receiver = session.createReceiver(QUEUE_NAME); --- End diff -- Linked with above comments, using the actual test name as the argument (instead of the differerent "testConnectionReset" literal) would mean this queue name constant wouldnt be needed and the argument could be used directly.
---