dao-jun commented on code in PR #22760:
URL: https://github.com/apache/pulsar/pull/22760#discussion_r1610518281
##########
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java:
##########
@@ -122,38 +123,22 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
// ByteBuf are automatically released after a write. If the ByteBufPair ref count is increased and it
// gets written multiple times, the individual buffers refcount should be reflected as well.
try {
- ctx.write(b.getFirst().retainedDuplicate(), ctx.voidPromise());
- ctx.write(b.getSecond().retainedDuplicate(), promise);
+ ctx.write(readOnlyRetainedDuplicate(b.getFirst()), ctx.voidPromise());
+ ctx.write(readOnlyRetainedDuplicate(b.getSecond()), promise);
} finally {
ReferenceCountUtil.safeRelease(b);
}
} else {
ctx.write(msg, promise);
}
}
- }
-
- @Sharable
- @SuppressWarnings("checkstyle:JavadocType")
- public static class CopyingEncoder extends ChannelOutboundHandlerAdapter {
- @Override
- public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
- if (msg instanceof ByteBufPair) {
- ByteBufPair b = (ByteBufPair) msg;
- // Some handlers in the pipeline will modify the bytebufs passed in to them (i.e. SslHandler).
- // For these handlers, we need to pass a copy of the buffers as the source buffers may be cached
- // for multiple requests.
- try {
- ctx.write(b.getFirst().copy(), ctx.voidPromise());
- ctx.write(b.getSecond().copy(), promise);
- } finally {
- ReferenceCountUtil.safeRelease(b);
- }
- } else {
- ctx.write(msg, promise);
- }
+ // .asReadOnly() is needed to prevent SslHandler from modifying the input buffers.
+ private static ByteBuf readOnlyRetainedDuplicate(ByteBuf buf) {
+ // If the buffer is already read-only, .asReadOnly() will return the same buffer.
+ // That's why the additional .retainedDuplicate() is needed to ensure that the returned buffer
+ // has independent readIndex and writeIndex.
+ return buf.asReadOnly().retainedDuplicate();
Review Comment:
Oh, I checked the source code. It looked a little strange, and I didn't
understand it at first.
```java
private static boolean attemptCopyToCumulation(ByteBuf cumulation, ByteBuf next, int wrapDataSize) {
    final int inReadableBytes = next.readableBytes();
    final int cumulationCapacity = cumulation.capacity();
    if (wrapDataSize - cumulation.readableBytes() >= inReadableBytes &&
            // Avoid using the same buffer if next's data would make cumulation exceed the wrapDataSize.
            // Only copy if there is enough space available and the capacity is large enough, and attempt to
            // resize if the capacity is small.
            (cumulation.isWritable(inReadableBytes) && cumulationCapacity >= wrapDataSize ||
                    cumulationCapacity < wrapDataSize &&
                            ensureWritableSuccess(cumulation.ensureWritable(inReadableBytes, false)))) {
        cumulation.writeBytes(next);
        next.release();
        return true;
    }
    return false;
}
```
We should return false immediately if `cumulation` is not writable.
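
To illustrate the early return I have in mind, here is a minimal sketch. It is only an analogy: it uses `java.nio.ByteBuffer` instead of Netty's `ByteBuf` so that it runs without extra dependencies, and the class name `CumulationGuardSketch` and the simplified `attemptCopy` method are hypothetical, not the Netty code quoted above. In Netty the equivalent guard would presumably be a check such as `cumulation.isReadOnly()` at the top of the method.

```java
import java.nio.ByteBuffer;

public class CumulationGuardSketch {

    // Simplified, hypothetical stand-in for attemptCopyToCumulation.
    // Returns true only if the bytes of 'next' were copied into 'cumulation'.
    static boolean attemptCopy(ByteBuffer cumulation, ByteBuffer next) {
        // Early exit: a read-only cumulation can never accept more bytes,
        // so there is no point in evaluating the capacity checks.
        if (cumulation.isReadOnly()) {
            return false;
        }
        // Not enough free space left in the cumulation buffer.
        if (cumulation.remaining() < next.remaining()) {
            return false;
        }
        cumulation.put(next);
        return true;
    }

    public static void main(String[] args) {
        ByteBuffer next = ByteBuffer.wrap(new byte[] {1, 2, 3});

        // A read-only cumulation is rejected immediately.
        ByteBuffer readOnly = ByteBuffer.allocate(16).asReadOnlyBuffer();
        System.out.println(attemptCopy(readOnly, next.duplicate()));

        // A writable cumulation with enough room accepts the copy.
        ByteBuffer writable = ByteBuffer.allocate(16);
        System.out.println(attemptCopy(writable, next.duplicate()));
    }
}
```

With the guard first, the read-only case never reaches the capacity or resize logic, which makes the intent easier to follow than the combined condition.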