This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch activemq-5.19.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.19.x by this push:
new 38ac99fba7 Improve handling of partial reads and EOF in Frame size
limited filter (#2154) (#2156)
38ac99fba7 is described below
commit 38ac99fba792af0a09950d33f634465613538e3e
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Tue Jun 23 20:00:38 2026 -0400
Improve handling of partial reads and EOF in Frame size limited filter
(#2154) (#2156)
This update makes the following changes/improvements:
* If the wrapped stream is finished and returns -1, this is now returned
correctly and availableBytes is not decremented.
* Partial reads will now be attempted up to the availableBytes limit.
* If 0 is passed in as the length for a partial read, then no read is
attempted and 0 is returned, in line with the InputStream contract.
(cherry picked from commit 5d9c3036b662995642b47a6ecdb4e5eb652621c9)
---
.../FrameSizeLimitedFilterInputStream.java | 34 +++++++++++++++-------
.../FrameSizeLimitedFilterInputStreamTest.java | 33 +++++++++++++++++++--
2 files changed, 54 insertions(+), 13 deletions(-)
diff --git
a/activemq-client/src/main/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStream.java
b/activemq-client/src/main/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStream.java
index 419d31561a..09818e19b5 100644
---
a/activemq-client/src/main/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStream.java
+++
b/activemq-client/src/main/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStream.java
@@ -151,7 +151,8 @@ public class FrameSizeLimitedFilterInputStream extends
InputStream {
final int read = stream.read();
- reduceAvailable(1);
+ // if -1 then the stream is done
+ reduceAvailable(read >= 0 ? 1 : -1);
return read;
}
@@ -165,16 +166,25 @@ public class FrameSizeLimitedFilterInputStream extends
InputStream {
public int read(byte[] b, int off, int len) throws IOException {
Objects.requireNonNull(stream, "The stream wrapper has not been bound
to a source input stream");
- // It is technically permissible for this method to read up to available
- // bytes if the length is greater than that but it is likely not going to
- // result in outcomes we can predict as easily so for now this is limited
- // and just throws for anything over available bytes. This could be changed
- // to call a read using Math.min(availableBytes, length) but what could
- // happen is we get into a read loop where we endlessly return end of stream
- // which won't send the signal that a read past the max limit was triggered.
- validateAvailable(len, availableBytes);
+ // If length is 0, this method is supposed to just return 0 with
+ // no bytes being read
+ if (len == 0) {
+ return 0;
+ }
+
+ final int toRead;
+ if (availableBytes > 0) {
+ // read the smaller of availableBytes or the length
+ // this method allows partial reads less than len
+ toRead = Math.min(len, availableBytes);
+ } else {
+ // we have no more remaining but there is data left so trigger error
+ toRead = 1;
+ }
- return reduceAvailable(stream.read(b, off, len));
+ validateAvailable(toRead, availableBytes);
+
+ return reduceAvailable(stream.read(b, off, toRead));
}
@Override
@@ -230,6 +240,10 @@ public class FrameSizeLimitedFilterInputStream extends
InputStream {
}
private int reduceAvailable(int amount) throws IOException {
+ if (amount == -1) {
+ return -1; // Underlying says there are no more bytes
+ }
+
try {
availableBytes = Math.subtractExact(availableBytes, amount);
} catch (ArithmeticException e) {
diff --git
a/activemq-client/src/test/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStreamTest.java
b/activemq-client/src/test/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStreamTest.java
index 9be7eb50c6..b130c87468 100644
---
a/activemq-client/src/test/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStreamTest.java
+++
b/activemq-client/src/test/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStreamTest.java
@@ -152,6 +152,27 @@ public class FrameSizeLimitedFilterInputStreamTest {
}
assertThrows(IOException.class, () -> stream.read());
+ // docs say len of 0 just returns 0 and no attempt to read is made
+ assertEquals(0, stream.read(new byte[10], 0, 0));
+ }
+ }
+
+ @Test
+ public void testReadBytesStreamLessThanLimit() throws IOException {
+ final ByteArrayInputStream bais = new
ByteArrayInputStream(createPayload());
+
+ try (FrameSizeLimitedFilterInputStream stream = new
FrameSizeLimitedFilterInputStream(Integer.MAX_VALUE, bais)) {
+ for (int i = 0; i < DEFAULT_TEST_PAYLOAD_SIZE; ++i) {
+ assertEquals(i, stream.read());
+ }
+
+ // Stream should return -1 because we are finished but less than the
+ // limit of the wrapper
+ assertEquals(-1, stream.read());
+ assertEquals(-1, stream.read(new byte[10]));
+ assertEquals(-1, stream.read(new byte[10], 0, 10));
+ // docs say len of 0 just returns 0
+ assertEquals(0, stream.read(new byte[10], 0, 0));
}
}
@@ -387,10 +408,16 @@ public class FrameSizeLimitedFilterInputStreamTest {
assertEquals(6, stream.available());
assertEquals(4, stream.read());
- assertThrows(IOException.class, () -> stream.read(new byte[10], 0,
10));
+ // partial read should work
+ byte[] data = new byte[10];
+ assertEquals(5, stream.read(data, 0, 10));
+ for (int i = 0; i < 5; i++) {
+ assertEquals(i + 5, data[i]);
+ }
- assertEquals(5, stream.available());
- assertEquals(5, stream.read());
+ assertEquals(0, stream.available());
+ // availableBytes has been exhausted so this will error
+ assertThrows(IOException.class, stream::read);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact