This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.19.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.19.x by this push:
     new e6fac68191 Provide a flexible filter style input stream that limits 
read amounts (#2118) (#2121)
e6fac68191 is described below

commit e6fac68191c96f8252c625572e9bd05ec1432a79
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Mon Jun 15 16:20:56 2026 -0400

    Provide a flexible filter style input stream that limits read amounts 
(#2118) (#2121)
    
    This commit supplies a utility type that acts as a filter input stream
    style stream wrapper that can be configured with an available bytes
    window which decreases as bytes are read from the stream. This allows
    a transport or other utility to limit what can be read based on a fixed
    amount such as a max frame size option.
    
    (cherry picked from commit 81ab8875bef634ee33026ed180586d2d36040bfb)
    
    Co-authored-by: Timothy Bish <[email protected]>
---
 .../FrameSizeLimitedFilterInputStream.java         | 247 +++++++++++
 .../FrameSizeLimitedFilterInputStreamTest.java     | 492 +++++++++++++++++++++
 2 files changed, 739 insertions(+)

diff --git 
a/activemq-client/src/main/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStream.java
 
b/activemq-client/src/main/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStream.java
new file mode 100644
index 0000000000..f96b12427d
--- /dev/null
+++ 
b/activemq-client/src/main/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStream.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Objects;
+
+/**
+ * A filter-style input stream that permits reads up to a configured maximum
+ * frame size and throws once a caller asks for more than what remains. It can
+ * wrap a stream containing a protocol frame so that decoding of that frame
+ * cannot cross the boundary set as the max available bytes.
+ * <p>
+ * This is a specialized stream type that may obscure the actual state of the
+ * underlying stream: {@link #available()} reports what is left of the
+ * configured window, not what the wrapped stream can supply. The window may be
+ * configured larger than what the wrapped stream actually holds; handling that
+ * inconsistency is left to the caller. Reaching the end of the wrapped stream
+ * is reported as usual ({@code -1}) and does not change the remaining window.
+ * <p>
+ * This class performs no synchronization of its own.
+ */
+public class FrameSizeLimitedFilterInputStream extends InputStream {
+
+    private static final String UNBOUND_MESSAGE =
+        "The stream wrapper has not been bound to a source input stream";
+
+    private boolean canMark;
+
+    private int maxAvailableBytes;
+    private int availableBytes;
+
+    private int markLimit;
+    private int markRemaining;
+
+    private InputStream stream;
+
+    /**
+     * Create a new uninitialized instance of the filter stream that will fail
+     * to read until a stream and a frame size limit are configured through
+     * {@link #resetAvailable(InputStream, int)}.
+     */
+    public FrameSizeLimitedFilterInputStream() {
+        this.maxAvailableBytes = 0;
+        this.availableBytes = maxAvailableBytes;
+    }
+
+    /**
+     * Create a new instance with the given amount of available bytes that
+     * should be readable before an exception is thrown indicating that more
+     * bytes were requested from the known fixed frame size than is allowed.
+     *
+     * @param available
+     *                 The number of available bytes to allow in a given frame
+     *                 (zero or greater).
+     * @param in
+     *                 The {@link InputStream} to read from (cannot be null).
+     *
+     * @throws IllegalArgumentException if {@code available} is negative.
+     * @throws NullPointerException if {@code in} is null.
+     */
+    public FrameSizeLimitedFilterInputStream(int available, InputStream in) {
+        if (available < 0) {
+            throw new IllegalArgumentException("Available bytes needs to be a positive integer but was: " + available);
+        }
+
+        this.stream = Objects.requireNonNull(in);
+        this.canMark = in.markSupported();
+        this.maxAvailableBytes = available;
+        this.availableBytes = maxAvailableBytes;
+    }
+
+    /**
+     * Render the stream unusable until a reset is called that either changes
+     * the stream and assigns a new max or simply assigns a new max, which
+     * assumes that the underlying stream remains readable after close (true
+     * only for a subset of stream types such as byte array wrapper variants).
+     * The limit and mark state are zeroed before the wrapped stream, if any,
+     * is closed.
+     *
+     * @throws IOException if closing the wrapped stream fails.
+     */
+    @Override
+    public void close() throws IOException {
+        maxAvailableBytes = availableBytes = markLimit = markRemaining = 0;
+        canMark = false;
+        if (stream != null) {
+            stream.close();
+        }
+    }
+
+    /**
+     * Resets the number of available bytes that can be read from the
+     * underlying stream back to the currently configured maximum. The
+     * underlying stream may still throw exceptions if it cannot provide this
+     * many bytes. As a result of calling this method any currently set mark is
+     * cleared and the stream cannot be reset back to a previously available
+     * number of bytes from this point onward.
+     *
+     * @throws NullPointerException if no stream has been bound yet; the
+     *         wrapper state is left unchanged in that case.
+     */
+    public void resetAvailable() {
+        resetAvailable(maxAvailableBytes);
+    }
+
+    /**
+     * Resets the number of available bytes that can be read from the
+     * underlying stream to the new amount. The underlying stream may still
+     * throw exceptions if it cannot provide this many bytes. As a result of
+     * calling this method any currently set mark is cleared and the stream
+     * cannot be reset back to a previously available number of bytes from this
+     * point onward.
+     *
+     * @param available
+     *                 The new available number of bytes to allow from this
+     *                 stream wrapper (zero or greater).
+     *
+     * @throws IllegalArgumentException if {@code available} is negative.
+     * @throws NullPointerException if no stream has been bound yet; the
+     *         wrapper state is left unchanged in that case.
+     */
+    public void resetAvailable(int available) {
+        resetAvailable(stream, available);
+    }
+
+    /**
+     * Resets the number of available bytes and assigns a new stream to read
+     * from, which allows this type to be re-usable across command reads. The
+     * underlying stream may still throw exceptions if it cannot provide this
+     * many bytes. As a result of calling this method any currently set mark is
+     * cleared and the stream cannot be reset back to a previously available
+     * number of bytes from this point onward.
+     * <p>
+     * Both arguments are validated before any state is modified, so a rejected
+     * call leaves the wrapper exactly as it was.
+     *
+     * @param in
+     *                 The new input stream to read bytes from (cannot be
+     *                 assigned as null).
+     * @param available
+     *                 The new available number of bytes to allow from this
+     *                 stream wrapper (zero or greater).
+     *
+     * @throws IllegalArgumentException if {@code available} is negative.
+     * @throws NullPointerException if {@code in} is null.
+     */
+    public void resetAvailable(InputStream in, int available) {
+        if (available < 0) {
+            throw new IllegalArgumentException("Available bytes needs to be a positive integer but was: " + available);
+        }
+
+        final InputStream source = Objects.requireNonNull(in, "The source input stream cannot be null");
+
+        availableBytes = maxAvailableBytes = available;
+        markLimit = markRemaining = 0;
+        stream = source;
+        canMark = source.markSupported();
+    }
+
+    /**
+     * Reads a single byte if at least one byte remains in the window.
+     *
+     * @return the byte read, or {@code -1} if the wrapped stream is at its end.
+     *
+     * @throws IOException if the window is exhausted or the wrapped stream fails.
+     * @throws NullPointerException if no stream has been bound yet.
+     */
+    @Override
+    public int read() throws IOException {
+        Objects.requireNonNull(stream, UNBOUND_MESSAGE);
+
+        validateAvailable(1, availableBytes);
+
+        final int read = stream.read();
+
+        // End of stream consumes nothing, so only charge the window for a real byte.
+        if (read >= 0) {
+            reduceAvailable(1);
+        }
+
+        return read;
+    }
+
+    /**
+     * Reads up to {@code b.length} bytes; equivalent to
+     * {@code read(b, 0, b.length)}.
+     */
+    @Override
+    public int read(byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    /**
+     * Reads up to {@code len} bytes, provided {@code len} does not exceed the
+     * bytes remaining in the window.
+     *
+     * @return the number of bytes read, or {@code -1} if the wrapped stream is
+     *         at its end.
+     *
+     * @throws IOException if {@code len} exceeds the remaining window or the
+     *         wrapped stream fails.
+     * @throws NullPointerException if no stream has been bound yet.
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        Objects.requireNonNull(stream, UNBOUND_MESSAGE);
+
+        // It is technically permissible for this method to read up to available
+        // bytes if the length is greater than that but it is likely not going to
+        // result in outcomes we can predict as easily so for now this is limited
+        // and just throws for anything over available bytes. This could be changed
+        // to call a read using Math.min(availableBytes, length) but what could
+        // happen is we get into a read loop where we endlessly return end of stream
+        // which won't send the signal that a read past the max limit was triggered.
+        validateAvailable(len, availableBytes);
+
+        final int read = stream.read(b, off, len);
+
+        // A -1 (end of stream) result must not be subtracted from the window:
+        // doing so would grow the window by one byte on every such call.
+        if (read > 0) {
+            reduceAvailable(read);
+        }
+
+        return read;
+    }
+
+    /**
+     * Skips up to {@code amount} bytes, counting the skipped bytes against the
+     * window. Requests are capped at {@link Integer#MAX_VALUE}; a negative
+     * request is ignored.
+     *
+     * @return the number of bytes actually skipped, or 0 for a negative request.
+     *
+     * @throws IOException if the (capped) request exceeds the remaining window
+     *         or the wrapped stream fails.
+     * @throws NullPointerException if no stream has been bound yet.
+     */
+    @Override
+    public long skip(long amount) throws IOException {
+        if (amount < 0) {
+            return 0;
+        }
+
+        Objects.requireNonNull(stream, UNBOUND_MESSAGE);
+
+        final int safeSkipRange = (int) Math.min(Integer.MAX_VALUE, amount);
+
+        // Max frame size is limited to Integer.MAX_VALUE as we store that value as an integer
+        // so don't accept more than that amount which is valid and does allow the caller to
+        // skip that full massive frame but will fail on the next stream operation.
+        validateAvailable(safeSkipRange, availableBytes);
+
+        final long skipped = stream.skip(safeSkipRange);
+
+        // Only a positive skip consumed anything from the window.
+        if (skipped > 0) {
+            reduceAvailable((int) skipped);
+        }
+
+        return skipped;
+    }
+
+    /**
+     * @return the bytes remaining in the configured window, which is not
+     *         necessarily what the wrapped stream can supply.
+     */
+    @Override
+    public int available() throws IOException {
+        return availableBytes;
+    }
+
+    /**
+     * Marks the current position in both this wrapper and the wrapped stream.
+     * The call is ignored when the wrapped stream does not support marking or
+     * when {@code readLimit} is not positive.
+     */
+    @Override
+    public void mark(int readLimit) {
+        if (canMark && readLimit > 0) {
+            markLimit = markRemaining = readLimit;
+            stream.mark(readLimit);
+        }
+    }
+
+    /**
+     * Returns to the last mark, giving back to the window the bytes consumed
+     * since that mark, and clears the mark. When marking is unsupported or no
+     * mark is active this method silently does nothing.
+     *
+     * @throws IOException if resetting the wrapped stream fails.
+     */
+    @Override
+    public void reset() throws IOException {
+        if (canMark && markLimit > 0) {
+            availableBytes += markLimit - markRemaining;
+            markRemaining = markLimit = 0;
+            stream.reset();
+        }
+    }
+
+    /**
+     * @return whether the currently bound stream reported mark support; always
+     *         false for an unbound or closed wrapper.
+     */
+    @Override
+    public boolean markSupported() {
+        return canMark;
+    }
+
+    /** Throws if more bytes are requested than remain in the window. */
+    private static void validateAvailable(int requested, int available) throws IOException {
+        if (requested > available) {
+            throw new IOException(String.format(
+                "Cannot read more than the max available %d bytes: requested %d", available, requested));
+        }
+    }
+
+    /**
+     * Charges {@code amount} consumed bytes against the window and against the
+     * active mark, dropping the mark once its read limit has been exceeded.
+     * Callers only pass the count of bytes actually consumed.
+     */
+    private void reduceAvailable(int amount) throws IOException {
+        try {
+            availableBytes = Math.subtractExact(availableBytes, amount);
+        } catch (ArithmeticException e) {
+            throw new IOException(e);
+        }
+
+        if (markLimit > 0) {
+            markRemaining = markRemaining - amount;
+            if (markRemaining < 0) {
+                markLimit = markRemaining = 0;
+            }
+        }
+    }
+}
diff --git 
a/activemq-client/src/test/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStreamTest.java
 
b/activemq-client/src/test/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStreamTest.java
new file mode 100644
index 0000000000..9be7eb50c6
--- /dev/null
+++ 
b/activemq-client/src/test/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStreamTest.java
@@ -0,0 +1,492 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import org.junit.Test;
+
+/**
+ * Tests for FrameSizeLimitedFilterInputStream: limit enforcement on read and skip, limit resets, and mark/reset handling.
+ */
+public class FrameSizeLimitedFilterInputStreamTest {
+
+    private static final int DEFAULT_TEST_PAYLOAD_SIZE = 256;
+
+    private byte[] createPayload() { // 256 bytes where data[i] == (byte) i, so reads can be checked by position
+        final byte[] data = new byte[DEFAULT_TEST_PAYLOAD_SIZE];
+
+        for (int i = 0; i < DEFAULT_TEST_PAYLOAD_SIZE; ++i) {
+            data[i] = (byte) i;
+        }
+
+        return data;
+    }
+
+    @Test // A 1024 byte limit over a 2048 byte array reports mark support and available() equal to the limit.
+    public void testCreate() throws IOException {
+        final ByteArrayInputStream bais = new ByteArrayInputStream(new 
byte[2048]);
+
+        try (FrameSizeLimitedFilterInputStream stream = new 
FrameSizeLimitedFilterInputStream(1024, bais)) {
+            assertTrue(stream.markSupported());
+            assertEquals(1024, stream.available());
+        }
+    }
+
+    @Test // The constructor rejects a null stream (NPE) and a negative limit (IllegalArgumentException).
+    public void testCreateChecks() throws IOException {
+        assertThrows(NullPointerException.class, () -> new 
FrameSizeLimitedFilterInputStream(1024, null));
+        assertThrows(IllegalArgumentException.class, () -> new 
FrameSizeLimitedFilterInputStream(-1, new ByteArrayInputStream(new byte[0])));
+    }
+
+    @Test // The no-arg instance has no stream, so every read and skip throws NullPointerException.
+    public void testCreateUnbound() throws IOException {
+        try (FrameSizeLimitedFilterInputStream stream = new 
FrameSizeLimitedFilterInputStream()) {
+            assertThrows(NullPointerException.class, () -> stream.read());
+            assertThrows(NullPointerException.class, () -> stream.skip(1));
+            assertThrows(NullPointerException.class, () -> stream.read(new 
byte[0]));
+            assertThrows(NullPointerException.class, () -> stream.read(new 
byte[1], 0, 1));
+        }
+    }
+
+    @Test // After close() reads and skips throw IOException, marking is unsupported, and mark/reset are no-ops.
+    public void testUnusableAfterClose() throws IOException {
+        final ByteArrayInputStream bais = new ByteArrayInputStream(new 
byte[2048]);
+
+        try (FrameSizeLimitedFilterInputStream stream = new 
FrameSizeLimitedFilterInputStream(1024, bais)) {
+            stream.close();
+
+            assertThrows(IOException.class, () -> stream.read());
+            assertThrows(IOException.class, () -> stream.skip(1));
+            assertThrows(IOException.class, () -> stream.read(new byte[1]));
+            assertThrows(IOException.class, () -> stream.read(new byte[1], 0, 
1));
+
+            assertFalse(stream.markSupported());
+
+            // Should no-op and not throw
+            stream.mark(1);
+            stream.reset();
+        }
+    }
+
+    @Test // Single byte reads succeed up to the limit (Byte.MAX_VALUE); the next read throws.
+    public void testReadByte() throws IOException {
+        final ByteArrayInputStream bais = new 
ByteArrayInputStream(createPayload());
+
+        try (FrameSizeLimitedFilterInputStream stream = new 
FrameSizeLimitedFilterInputStream(Byte.MAX_VALUE, bais)) {
+            for (int i = 0; i < Byte.MAX_VALUE; ++i) {
+                assertEquals(i, stream.read());
+            }
+
+            assertThrows(IOException.class, () -> stream.read());
+        }
+    }
+
+    @Test // All three read variants throw IOException once the wrapper has been closed.
+    public void testReadFailsAfterClosed() throws IOException {
+        final ByteArrayInputStream bais = new 
ByteArrayInputStream(createPayload());
+
+        try (FrameSizeLimitedFilterInputStream stream = new 
FrameSizeLimitedFilterInputStream(Byte.MAX_VALUE, bais)) {
+            stream.close();
+
+            assertThrows(IOException.class, () -> stream.read());
+            assertThrows(IOException.class, () -> stream.read(new byte[10]));
+            assertThrows(IOException.class, () -> stream.read(new byte[10], 0, 
10));
+        }
+    }
+
+    @Test // resetAvailable() restores the previous limit, resetAvailable(n) sets a new one; reading continues at the next source byte.
+    public void testReadByteAndResetAvailable() throws IOException {
+        final ByteArrayInputStream bais = new 
ByteArrayInputStream(createPayload());
+
+        try (FrameSizeLimitedFilterInputStream stream = new 
FrameSizeLimitedFilterInputStream(1, bais)) {
+            assertEquals(0, stream.read());
+            assertThrows(IOException.class, () -> stream.read());
+
+            stream.resetAvailable();
+
+            assertEquals(1, stream.read());
+            assertThrows(IOException.class, () -> stream.read());
+
+            stream.resetAvailable(2);
+
+            assertEquals(2, stream.read());
+            assertEquals(3, stream.read());
+            assertThrows(IOException.class, () -> stream.read());
+        }
+    }
+
+    @Test // A bulk read of exactly the limit succeeds; any further read throws.
+    public void testReadBytes() throws IOException {
+        final ByteArrayInputStream bais = new 
ByteArrayInputStream(createPayload());
+
+        try (FrameSizeLimitedFilterInputStream stream = new 
FrameSizeLimitedFilterInputStream(Byte.MAX_VALUE, bais)) {
+            final byte[] sink = new byte[Byte.MAX_VALUE];
+
+            assertEquals(Byte.MAX_VALUE, stream.read(sink));
+
+            for (int i = 0; i < Byte.MAX_VALUE; ++i) {
+                assertEquals(i, sink[i]);
+            }
+
+            assertThrows(IOException.class, () -> stream.read());
+        }
+    }
+
+    @Test // Limit reset cycle exercised through read(byte[]).
+    public void testReadBytesAndResetAvailable() throws IOException {
+        final ByteArrayInputStream bais = new 
ByteArrayInputStream(createPayload());
+        final byte[] sink = new byte[1];
+
+        try (FrameSizeLimitedFilterInputStream stream = new 
FrameSizeLimitedFilterInputStream(1, bais)) {
+            assertEquals(1, stream.read(sink));
+            assertEquals(0, sink[0]);
+            assertThrows(IOException.class, () -> stream.read());
+
+            stream.resetAvailable();
+
+            assertEquals(1, stream.read(sink));
+            assertEquals(1, sink[0]);
+            assertThrows(IOException.class, () -> stream.read());
+
+            stream.resetAvailable(2);
+
+            assertEquals(1, stream.read(sink));
+            assertEquals(2, sink[0]);
+            assertEquals(1, stream.read(sink));
+            assertEquals(3, sink[0]);
+            assertThrows(IOException.class, () -> stream.read());
+        }
+    }
+
+    @Test // read(byte[], int, int) for exactly the limit succeeds; repeating the same request throws.
+    public void testReadBytesIndexed() throws IOException {
+        final ByteArrayInputStream bais = new 
ByteArrayInputStream(createPayload());
+
+        try (FrameSizeLimitedFilterInputStream stream = new 
FrameSizeLimitedFilterInputStream(Byte.MAX_VALUE, bais)) {
+            final byte[] sink = new byte[Byte.MAX_VALUE];
+
+            assertEquals(Byte.MAX_VALUE, stream.read(sink, 0, sink.length));
+
+            for (int i = 0; i < Byte.MAX_VALUE; ++i) {
+                assertEquals(i, sink[i]);
+            }
+
+            assertThrows(IOException.class, () -> stream.read(sink, 0, 
sink.length));
+        }
+    }
+
+    @Test // Limit reset cycle exercised through read(byte[], int, int).
+    public void testReadBytesIndexedAndResetAvailable() throws IOException {
+        final ByteArrayInputStream bais = new 
ByteArrayInputStream(createPayload());
+        final byte[] sink = new byte[1];
+
+        try (FrameSizeLimitedFilterInputStream stream = new 
FrameSizeLimitedFilterInputStream(1, bais)) {
+            assertEquals(1, stream.read(sink, 0, sink.length));
+            assertEquals(0, sink[0]);
+            assertThrows(IOException.class, () -> stream.read());
+
+            stream.resetAvailable();
+
+            assertEquals(1, stream.read(sink, 0, sink.length));
+            assertEquals(1, sink[0]);
+            assertThrows(IOException.class, () -> stream.read());
+
+            stream.resetAvailable(2);
+
+            assertEquals(1, stream.read(sink, 0, sink.length));
+            assertEquals(2, sink[0]);
+            assertEquals(1, stream.read(sink, 0, sink.length));
+            assertEquals(3, sink[0]);
+            assertThrows(IOException.class, () -> stream.read());
+        }
+    }
+
+    @Test // skip() counts against the limit just as reads do, across several limit resets.
+    public void testSkipBytes() throws IOException {
+        final ByteArrayInputStream bais = new 
ByteArrayInputStream(createPayload());
+
+        try (FrameSizeLimitedFilterInputStream stream = new 
FrameSizeLimitedFilterInputStream(1, bais)) {
+            assertEquals(1, stream.skip(1));
+            assertThrows(IOException.class, () -> stream.skip(1));
+
+            stream.resetAvailable();
+
+            assertEquals(1, stream.skip(1));
+            assertThrows(IOException.class, () -> stream.skip(10));
+
+            stream.resetAvailable(2);
+
+            assertEquals(2, stream.skip(2));
+            assertThrows(IOException.class, () -> stream.skip(100));
+
+            stream.resetAvailable();
+
+            assertEquals(4, stream.read());
+            assertEquals(5, stream.read());
+
+            assertThrows(IOException.class, () -> stream.skip(1));
+        }
+    }
+
+    @Test // NOTE: despite the name nothing is thrown; a negative skip returns 0 and leaves available() unchanged.
+    public void testSkipNegativeThrows() throws IOException {
+        final ByteArrayInputStream bais = new 
ByteArrayInputStream(createPayload());
+
+        try (FrameSizeLimitedFilterInputStream stream = new 
FrameSizeLimitedFilterInputStream(1, bais)) {
+            assertEquals(1, stream.available());
+            assertEquals(0, stream.skip(-1));
+            assertEquals(1, stream.available());
+        }
+    }
+
+    @Test // skip(Long.MAX_VALUE) is capped at Integer.MAX_VALUE, which uses up the whole limit.
+    public void testSkipMassiveSize() throws IOException {
+        final ByteArrayInputStream bais = new 
ByteArrayInputStream(createPayload()) {
+
+            @Override
+            public long skip(long amount) { // Stub: report the full request as skipped regardless of payload size
+                return amount;
+            }
+        };
+
+        try (FrameSizeLimitedFilterInputStream stream = new 
FrameSizeLimitedFilterInputStream(Integer.MAX_VALUE, bais)) {
+            assertEquals(Integer.MAX_VALUE, stream.skip(Long.MAX_VALUE));
+
+            assertThrows(IOException.class, () -> stream.skip(1));
+            assertThrows(IOException.class, () -> stream.read());
+            assertThrows(IOException.class, () -> stream.read(new byte[1]));
+            assertThrows(IOException.class, () -> stream.read(new byte[1], 0, 
1));
+        }
+    }
+
+    @Test // A skip larger than the limit throws without consuming it; a skip within the limit still works afterwards.
+    public void testSkipBytesPastConfiguredAvailable() throws IOException {
+        final ByteArrayInputStream bais = new 
ByteArrayInputStream(createPayload());
+
+        try (FrameSizeLimitedFilterInputStream stream = new 
FrameSizeLimitedFilterInputStream(1, bais)) {
+            assertThrows(IOException.class, () -> stream.skip(2));
+
+            assertEquals(1, stream.skip(1));
+
+            assertThrows(IOException.class, () -> stream.skip(1));
+        }
+    }
+
+    @Test // reset() after mark() restores the bytes consumed since the mark; a second reset without a new mark changes nothing.
+    public void testBasicMark() throws IOException {
+        final ByteArrayInputStream bais = new 
ByteArrayInputStream(createPayload());
+
+        assertTrue(bais.markSupported());
+
+        try (FrameSizeLimitedFilterInputStream stream = new 
FrameSizeLimitedFilterInputStream(10, bais)) {
+            assertTrue(stream.markSupported());
+
+            stream.mark(1);
+            assertEquals(0, stream.read());
+            assertEquals(9, stream.available());
+
+            stream.reset();
+            assertEquals(10, stream.available());
+            assertEquals(0, stream.read());
+            assertEquals(9, stream.available());
+
+            stream.reset(); // Mark wasn't called
+            assertEquals(9, stream.available());
+        }
+    }
+
+    @Test // mark(0) sets no mark, so reset() restores nothing and reading simply continues.
+    public void testMarkZeroIsIgnored() throws IOException {
+        final ByteArrayInputStream bais = new 
ByteArrayInputStream(createPayload());
+
+        assertTrue(bais.markSupported());
+
+        try (FrameSizeLimitedFilterInputStream stream = new 
FrameSizeLimitedFilterInputStream(10, bais)) {
+            assertTrue(stream.markSupported());
+
+            stream.mark(0);
+            assertEquals(0, stream.read());
+            assertEquals(9, stream.available());
+
+            stream.reset();
+            assertEquals(9, stream.available());
+            assertEquals(1, stream.read());
+            assertEquals(8, stream.available());
+
+            stream.reset(); // Mark wasn't called
+            assertEquals(8, stream.available());
+        }
+    }
+
+    @Test // Only the most recent mark() applies: after mark(4) a 4 byte read can be replayed following reset().
+    public void testLastMarkWins() throws IOException {
+        final ByteArrayInputStream bais = new 
ByteArrayInputStream(createPayload());
+
+        assertTrue(bais.markSupported());
+
+        try (FrameSizeLimitedFilterInputStream stream = new 
FrameSizeLimitedFilterInputStream(10, bais)) {
+            assertTrue(stream.markSupported());
+
+            stream.mark(1);
+            stream.mark(2);
+            stream.mark(3);
+            stream.mark(4);
+
+            final byte[] read1 = new byte[4];
+            final byte[] read2 = new byte[4];
+
+            stream.read(read1);
+            assertEquals(6, stream.available());
+            stream.reset();
+            assertEquals(10, stream.available());
+            stream.read(read2);
+
+            assertArrayEquals(read1, read2);
+        }
+    }
+    @Test // Reading beyond the mark's read limit drops the mark, so reset() no longer restores anything.
+    public void testReadPastMarkClearsMark() throws IOException {
+        final ByteArrayInputStream bais = new 
ByteArrayInputStream(createPayload());
+
+        assertTrue(bais.markSupported());
+
+        try (FrameSizeLimitedFilterInputStream stream = new 
FrameSizeLimitedFilterInputStream(10, bais)) {
+            assertTrue(stream.markSupported());
+
+            stream.mark(3);
+
+            assertEquals(3, stream.read(new byte[3]));
+            assertEquals(3, stream.read()); // Past mark.
+            assertEquals(6, stream.available());
+
+            stream.reset(); // Should have no effect
+
+            assertEquals(6, stream.available());
+            assertEquals(4, stream.read());
+
+            assertThrows(IOException.class, () -> stream.read(new byte[10], 0, 
10));
+
+            assertEquals(5, stream.available());
+            assertEquals(5, stream.read());
+        }
+    }
+
+    @Test // When the source reports no mark support, mark()/reset() never restore available bytes.
+    public void testMarkNotSupported() throws IOException {
+        final ByteArrayInputStream bais = new 
ByteArrayInputStream(createPayload()) {
+
+            @Override
+            public boolean markSupported() {
+                return false;
+            }
+        };
+
+        assertFalse(bais.markSupported());
+
+        try (FrameSizeLimitedFilterInputStream stream = new 
FrameSizeLimitedFilterInputStream(10, bais)) {
+            assertFalse(stream.markSupported());
+
+            stream.mark(1);
+            assertEquals(0, stream.read());
+            assertEquals(9, stream.available());
+
+            stream.reset();
+            assertEquals(9, stream.available());
+            assertEquals(1, stream.read());
+            assertEquals(8, stream.available());
+
+            stream.reset();
+            assertEquals(8, stream.available());
+
+            final byte[] first = new byte[8];
+
+            stream.mark(8);
+            stream.read(first);
+            assertEquals(0, stream.available());
+            stream.reset();
+            assertEquals(0, stream.available());
+
+            stream.resetAvailable(10);
+
+            final byte[] second = new byte[10];
+
+            stream.mark(10);
+            stream.read(second);
+            assertEquals(0, stream.available());
+            stream.reset();
+            assertEquals(0, stream.available());
+        }
+    }
+
+    @Test // resetAvailable(stream, n) rebinds to a new source and adopts its mark support and the new limit.
+    public void testResetToNewStream() throws IOException {
+        final ByteArrayInputStream bais = new ByteArrayInputStream(new 
byte[2048]);
+
+        try (FrameSizeLimitedFilterInputStream stream = new 
FrameSizeLimitedFilterInputStream(2048, bais)) {
+            assertTrue(stream.markSupported());
+            assertEquals(2048, stream.available());
+
+            final ByteArrayInputStream nextStream = new 
ByteArrayInputStream(new byte[1024]) {
+
+                @Override
+                public boolean markSupported() {
+                    return false;
+                }
+            };
+
+            assertFalse(nextStream.markSupported());
+
+            stream.resetAvailable(nextStream, 1024);
+
+            assertFalse(stream.markSupported());
+            assertEquals(1024, stream.available());
+        }
+    }
+
+    @Test // A rejected negative limit leaves the wrapper usable; later resets keep reading from the same source position.
+    public void testResetWithinSameStream() throws IOException {
+        final ByteArrayInputStream bais = new 
ByteArrayInputStream(createPayload());
+
+        try (FrameSizeLimitedFilterInputStream stream = new 
FrameSizeLimitedFilterInputStream(DEFAULT_TEST_PAYLOAD_SIZE, bais)) {
+            assertTrue(stream.markSupported());
+            assertEquals(DEFAULT_TEST_PAYLOAD_SIZE, stream.available());
+
+            assertEquals(0, stream.read());
+
+            assertThrows(IllegalArgumentException.class, () -> 
stream.resetAvailable(-1));
+            stream.resetAvailable(1);
+
+            assertEquals(1, stream.read());
+            assertThrows(IOException.class, () -> stream.read());
+
+            stream.resetAvailable(2);
+
+            assertEquals(2, stream.read());
+            assertEquals(3, stream.read());
+            assertThrows(IOException.class, () -> stream.read());
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to