Repository: qpid-broker-j Updated Branches: refs/heads/master 955a79b7d -> 660c206de
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java index 7fa98c5..a0eab61 100644 --- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java @@ -21,10 +21,8 @@ package org.apache.qpid.tests.protocol.v1_0; import java.net.InetSocketAddress; -import java.util.List; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; -import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils; import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; import org.apache.qpid.server.protocol.v1_0.type.messaging.Header; import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; @@ -94,41 +92,27 @@ public class Utils final Header header = new Header(); messageEncoder.setHeader(header); messageEncoder.addData(messageContent); - List<QpidByteBuffer> payload = messageEncoder.getPayload(); - long size = QpidByteBufferUtils.remaining(payload); - - QpidByteBuffer[] result = new QpidByteBuffer[numberOfParts]; - int chunkSize = (int) size / numberOfParts; - int lastChunkSize = (int) size - chunkSize * (numberOfParts - 1); - for (int i = 0; i < numberOfParts; i++) - { - result[i] = QpidByteBuffer.allocate(false, i == numberOfParts - 1 ? lastChunkSize : chunkSize); - } - - int currentBufferIndex = 0; - for (QpidByteBuffer p : payload) + final QpidByteBuffer[] result; + try (QpidByteBuffer payload = messageEncoder.getPayload()) { - final int limit = p.limit(); + long size = (long) payload.remaining(); - while (p.hasRemaining()) + result = new QpidByteBuffer[numberOfParts]; + int chunkSize = (int) size / numberOfParts; + int lastChunkSize = (int) size - chunkSize * (numberOfParts - 1); + for (int i = 0; i < numberOfParts; i++) { - QpidByteBuffer currentBuffer = result[currentBufferIndex]; - if (currentBuffer.hasRemaining()) + result[i] = QpidByteBuffer.allocate(false, i == numberOfParts - 1 ? lastChunkSize : chunkSize); + final int remaining = result[i].remaining(); + try (QpidByteBuffer view = payload.view(0, remaining)) { - int length = Math.min(p.remaining(), currentBuffer.remaining()); - p.limit(p.position() + length); - currentBuffer.put(p.slice()); - p.position(p.position() + length); - p.limit(limit); - } - - if (!currentBuffer.hasRemaining()) - { - currentBuffer.flip(); - currentBufferIndex++; + result[i].put(view); } + result[i].flip(); + payload.position(payload.position() + remaining); } } + return result; } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java index 8cc6f92..8a150fa 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java @@ -77,34 +77,39 @@ public class DecodeErrorTest extends BrokerAdminUsingTestBase try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { - List<QpidByteBuffer> combinedPayload = new ArrayList<>(); + List<QpidByteBuffer> payloads = new ArrayList<>(); final HeaderSection headerSection = new Header().createEncodingRetainingSection(); - combinedPayload.addAll(headerSection.getEncodedForm()); + payloads.add(headerSection.getEncodedForm()); headerSection.dispose(); final StringWriter stringWriter = new StringWriter("string in between annotation sections"); QpidByteBuffer encodedString = QpidByteBuffer.allocate(stringWriter.getEncodedSize()); stringWriter.writeToBuffer(encodedString); encodedString.flip(); - combinedPayload.add(encodedString); + payloads.add(encodedString); final DeliveryAnnotationsSection deliveryAnnotationsSection = new DeliveryAnnotations(Collections.emptyMap()).createEncodingRetainingSection(); - combinedPayload.addAll(deliveryAnnotationsSection.getEncodedForm()); + payloads.add(deliveryAnnotationsSection.getEncodedForm()); deliveryAnnotationsSection.dispose(); - final Detach detachResponse = transport.newInteraction() - .negotiateProtocol().consumeResponse() - .open().consumeResponse(Open.class) - .begin().consumeResponse(Begin.class) - .attachRole(Role.SENDER) - .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME) - .attach().consumeResponse(Attach.class) - .consumeResponse(Flow.class) - .transferMessageFormat(UnsignedInteger.ZERO) - .transferPayload(combinedPayload) - .transfer() - .consumeResponse() - .getLatestResponse(Detach.class); + final Detach detachResponse; + try (QpidByteBuffer combinedPayload = QpidByteBuffer.concatenate(payloads)) + { + detachResponse = transport.newInteraction() + .negotiateProtocol().consumeResponse() + .open().consumeResponse(Open.class) + .begin().consumeResponse(Begin.class) + .attachRole(Role.SENDER) + .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME) + .attach().consumeResponse(Attach.class) + .consumeResponse(Flow.class) + .transferMessageFormat(UnsignedInteger.ZERO) + .transferPayload(combinedPayload) + .transfer() + .consumeResponse() + .getLatestResponse(Detach.class); + } + payloads.forEach(QpidByteBuffer::dispose); assertThat(detachResponse.getError(), is(notNullValue())); assertThat(detachResponse.getError().getCondition(), is(equalTo(DECODE_ERROR))); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java index 6a373d1..97fe3bb 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java @@ -26,7 +26,6 @@ import static org.hamcrest.Matchers.is; import static org.junit.Assert.fail; import java.net.InetSocketAddress; -import java.util.Collections; import org.junit.Before; import org.junit.Test; @@ -83,18 +82,22 @@ public class MessageFormat extends BrokerAdminUsingTestBase .consumeResponse(Flow.class) .transferMore(true) .transferMessageFormat(UnsignedInteger.ZERO) - .transferPayload(Collections.singletonList(payloads[0])) + .transferPayload(payloads[0]) .transfer() .consumeResponse(null, Flow.class, Disposition.class) .transferDeliveryTag(null) .transferDeliveryId(null) .transferMore(false) .transferMessageFormat(UnsignedInteger.ONE) - .transferPayload(Collections.singletonList(payloads[1])) + .transferPayload(payloads[1]) .transfer() .consumeResponse(Detach.class, End.class, Close.class) .getLatestResponse(); + for (final QpidByteBuffer payload : payloads) + { + payload.dispose(); + } assertThat(latestResponse, is(notNullValue())); final Object responseBody = latestResponse.getBody(); final Error error; http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java index 5c7a7f5..bcd155f 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java @@ -29,7 +29,6 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.isOneOf; import java.net.InetSocketAddress; -import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -51,13 +50,13 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; import org.apache.qpid.server.protocol.v1_0.type.transport.Open; import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; -import org.apache.qpid.tests.utils.BrokerAdmin; import org.apache.qpid.tests.protocol.v1_0.FrameTransport; import org.apache.qpid.tests.protocol.v1_0.Interaction; -import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; import org.apache.qpid.tests.protocol.v1_0.Response; import org.apache.qpid.tests.protocol.v1_0.SpecificationTest; import org.apache.qpid.tests.protocol.v1_0.Utils; +import org.apache.qpid.tests.utils.BrokerAdmin; +import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; public class MultiTransferTest extends BrokerAdminUsingTestBase { @@ -111,18 +110,22 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL) .attach().consumeResponse(Attach.class) .consumeResponse(Flow.class) - .transferPayload(Collections.singletonList(payloads[0])) + .transferPayload(payloads[0]) .transferDeliveryId(deliveryId) .transferDeliveryTag(deliveryTag) .transferMore(true) .transfer() .sync() .transferMore(false) - .transferPayload(Collections.singletonList(payloads[1])) + .transferPayload(payloads[1]) .transfer() .consumeResponse() .getLatestResponse(Disposition.class); + for (final QpidByteBuffer payload : payloads) + { + payload.dispose(); + } assertThat(disposition.getFirst(), is(equalTo(deliveryId))); assertThat(disposition.getLast(), isOneOf(null, deliveryId)); assertThat(disposition.getSettled(), is(equalTo(false))); @@ -155,30 +158,34 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase .transferDeliveryId(deliveryId) .transferDeliveryTag(deliveryTag) .transferMore(true) - .transferPayload(Collections.singletonList(payloads[0])) + .transferPayload(payloads[0]) .transfer() .sync() .transferDeliveryId(deliveryId) .transferDeliveryTag(null) .transferMore(true) - .transferPayload(Collections.singletonList(payloads[1])) + .transferPayload(payloads[1]) .transfer() .sync() .transferDeliveryId(null) .transferDeliveryTag(deliveryTag) .transferMore(true) - .transferPayload(Collections.singletonList(payloads[2])) + .transferPayload(payloads[2]) .transfer() .sync() .transferDeliveryId(null) .transferDeliveryTag(null) .transferMore(false) - .transferPayload(Collections.singletonList(payloads[3])) + .transferPayload(payloads[3]) .transfer() .consumeResponse(); Disposition disposition = interaction.getLatestResponse(Disposition.class); + for (final QpidByteBuffer payload : payloads) + { + payload.dispose(); + } assertThat(disposition.getFirst(), is(equalTo(deliveryId))); assertThat(disposition.getLast(), isOneOf(null, deliveryId)); assertThat(disposition.getSettled(), is(equalTo(false))); @@ -212,7 +219,7 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL) .attach().consumeResponse(Attach.class) .consumeResponse(Flow.class) - .transferPayload(Collections.singletonList(payloads[0])) + .transferPayload(payloads[0]) .transferDeliveryId(deliveryId) .transferDeliveryTag(deliveryTag) .transferMore(true) @@ -223,6 +230,10 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase .transferAborted(true) .transfer(); + for (final QpidByteBuffer payload : payloads) + { + payload.dispose(); + } Response<?> latestResponse = interaction.consumeResponse(new Class<?>[] {null}).getLatestResponse(); assertThat(latestResponse, is(nullValue())); } @@ -272,7 +283,7 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase .transferDeliveryId(deliverId1) .transferDeliveryTag(deliveryTag1) .transferMore(true) - .transferPayload(Collections.singletonList(messagePayload1[0])) + .transferPayload(messagePayload1[0]) .transfer() .sync() @@ -280,7 +291,7 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase .transferDeliveryId(deliveryId2) .transferDeliveryTag(deliveryTag2) .transferMore(true) - .transferPayload(Collections.singletonList(messagePayload2[0])) + .transferPayload(messagePayload2[0]) .transfer() .sync() @@ -288,7 +299,7 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase .transferDeliveryId(deliverId1) .transferDeliveryTag(deliveryTag1) .transferMore(false) - .transferPayload(Collections.singletonList(messagePayload1[1])) + .transferPayload(messagePayload1[1]) .transfer() .sync() @@ -296,10 +307,19 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase .transferDeliveryId(deliveryId2) .transferDeliveryTag(deliveryTag2) .transferMore(false) - .transferPayload(Collections.singletonList(messagePayload2[1])) + .transferPayload(messagePayload2[1]) .transfer() .sync(); + for (final QpidByteBuffer payload : messagePayload1) + { + payload.dispose(); + } + for (final QpidByteBuffer payload : messagePayload2) + { + payload.dispose(); + } + Map<UnsignedInteger, Disposition> dispositionMap = new HashMap<>(); for (int i = 0; i < 2; i++) { @@ -349,16 +369,24 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase .transferDeliveryId(deliverId1) .transferDeliveryTag(deliveryTag1) .transferMore(true) - .transferPayload(Collections.singletonList(messagePayload1[0])) + .transferPayload(messagePayload1[0]) .transfer() .sync() .transferDeliveryId(deliveryId2) .transferDeliveryTag(deliveryTag2) .transferMore(true) - .transferPayload(Collections.singletonList(messagePayload2[0])) + .transferPayload(messagePayload2[0]) .transfer() .sync(); + for (final QpidByteBuffer payload : messagePayload1) + { + payload.dispose(); + } + for (final QpidByteBuffer payload : messagePayload2) + { + payload.dispose(); + } interaction.consumeResponse(Detach.class, End.class, Close.class); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java index f081006..1853446 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java @@ -94,7 +94,6 @@ public class OutcomeTest extends BrokerAdminUsingTestBase .flow() .receiveDelivery() .decodeLatestDelivery(); - ; Object secondDeliveryPayload = interaction.getDecodedLatestDelivery(); assertThat(secondDeliveryPayload, is(equalTo("message2"))); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java index 5ce5e5d..eb72532 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java @@ -47,7 +47,7 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; -import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils; +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.protocol.v1_0.type.Binary; import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody; import org.apache.qpid.server.protocol.v1_0.type.Outcome; @@ -618,7 +618,11 @@ public class TransferTest extends BrokerAdminUsingTestBase assertThat(first.getMore(), is(equalTo(true))); messageDecoder.addTransfer(first); - final long firstRemaining = QpidByteBufferUtils.remaining(first.getPayload()); + final long firstRemaining; + try (QpidByteBuffer payload = first.getPayload()) + { + firstRemaining = payload.remaining(); + } Received state = new Received(); state.setSectionNumber(UnsignedInteger.ZERO); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java index 5769255..35ee4e7 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java @@ -67,13 +67,13 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode; import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer; -import org.apache.qpid.tests.utils.BrokerAdmin; import org.apache.qpid.tests.protocol.v1_0.FrameTransport; import org.apache.qpid.tests.protocol.v1_0.Interaction; -import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; import org.apache.qpid.tests.protocol.v1_0.Response; import org.apache.qpid.tests.protocol.v1_0.SpecificationTest; import org.apache.qpid.tests.protocol.v1_0.Utils; +import org.apache.qpid.tests.utils.BrokerAdmin; +import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase { @@ -715,7 +715,7 @@ public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase .transferDeliveryId(UnsignedInteger.ZERO) .transferDeliveryTag(deliveryTag) .transferMore(true) - .transferPayload(Collections.singletonList(messagePayload[0])) + .transferPayload(messagePayload[0]) .transfer(); // 3. detach the link @@ -740,9 +740,14 @@ public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase .transfer() .sync() .transferMore(false) - .transferPayload(Collections.singletonList(messagePayload[1])) + .transferPayload(messagePayload[1]) .transfer(); + for (final QpidByteBuffer payload : messagePayload) + { + payload.dispose(); + } + boolean settled = false; do { http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java ---------------------------------------------------------------------- diff --git a/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java b/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java index b06eb3e..d27eb1b 100644 --- a/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java +++ b/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java @@ -310,6 +310,7 @@ public class ProtocolNegotiationTest extends QpidBrokerTestCase ConnectionHeartbeat heartbeat = new ConnectionHeartbeat(); ServerDisassembler serverDisassembler = new ServerDisassembler(sender, Frame.HEADER_SIZE + 1); serverDisassembler.command(null, heartbeat); + serverDisassembler.closed(); } else if(isBrokerPre010()) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
