duongkame commented on code in PR #971:
URL: https://github.com/apache/ratis/pull/971#discussion_r1406929832
##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ZeroCopyMessageMarshaller.java:
##########
@@ -74,55 +95,116 @@ public InputStream stream(T value) {
@Override
public T parse(InputStream stream) {
+ final T message;
try {
- if (stream instanceof KnownLength
- && stream instanceof Detachable
- && stream instanceof HasByteBuffer
- && ((HasByteBuffer) stream).byteBufferSupported()) {
- int size = stream.available();
- // Stream is now detached here and should be closed later.
- InputStream detachedStream = ((Detachable) stream).detach();
- try {
- // This mark call is to keep buffer while traversing buffers using
skip.
- detachedStream.mark(size);
- List<ByteString> byteStrings = new LinkedList<>();
- while (detachedStream.available() != 0) {
- ByteBuffer buffer = ((HasByteBuffer)
detachedStream).getByteBuffer();
- byteStrings.add(UnsafeByteOperations.unsafeWrap(buffer));
- detachedStream.skip(buffer.remaining());
- }
- detachedStream.reset();
- CodedInputStream codedInputStream =
ByteString.copyFrom(byteStrings).newCodedInput();
- codedInputStream.enableAliasing(true);
- codedInputStream.setSizeLimit(Integer.MAX_VALUE);
- // fast path (no memory copy)
- T message;
- try {
- message = parseFrom(codedInputStream);
- } catch (InvalidProtocolBufferException ipbe) {
- throw Status.INTERNAL
- .withDescription("Invalid protobuf byte sequence")
- .withCause(ipbe)
- .asRuntimeException();
- }
- unclosedStreams.put(message, detachedStream);
- detachedStream = null;
- return message;
- } finally {
- if (detachedStream != null) {
- detachedStream.close();
- }
- }
- }
+ // fast path (no memory copy)
+ message = parseZeroCopy(stream);
} catch (IOException e) {
- throw new RuntimeException(e);
+ throw Status.INTERNAL
+ .withDescription("Failed to parseZeroCopy")
+ .withCause(e)
+ .asRuntimeException();
}
+ if (message != null) {
+ zeroCopyCount.accept(message);
+ return message;
+ }
+
// slow path
- return marshaller.parse(stream);
+ final T copied = marshaller.parse(stream);
+ nonZeroCopyCount.accept(copied);
+ return copied;
+ }
+
+ /** Release the underlying buffers in the given message. */
+ public void release(T message) {
Review Comment:
This could impact scenarios where releasing messages is not straightforward,
e.g. in GrpcClientProtocolService or GrpcServerProtocol service, we can't
release the protobuf objects right in `onNext()` but have to keep them in a
separate component, let's call it the `ZeroCopyStreamCleaner`, and release
stream handles based on events like log applied/comitted.
In such scenarios, the difference is.
- With the new approach, we need to keep the protobuf objects in
`ZeroCopyStreamCleaner` and the popStream in `ZeroCopyMessageMarshaller`.
- Or we can just pop the input stream handle and keep it in
`ZeroCopyStreamCleaner`. The protobuf object can be GCed right after onNext.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]