adoroszlai commented on code in PR #971:
URL: https://github.com/apache/ratis/pull/971#discussion_r1422648830
##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ZeroCopyMessageMarshaller.java:
##########
@@ -74,55 +95,116 @@ public InputStream stream(T value) {
@Override
public T parse(InputStream stream) {
+ final T message;
try {
- if (stream instanceof KnownLength
- && stream instanceof Detachable
- && stream instanceof HasByteBuffer
- && ((HasByteBuffer) stream).byteBufferSupported()) {
- int size = stream.available();
- // Stream is now detached here and should be closed later.
- InputStream detachedStream = ((Detachable) stream).detach();
- try {
- // This mark call is to keep buffer while traversing buffers using
skip.
- detachedStream.mark(size);
- List<ByteString> byteStrings = new LinkedList<>();
- while (detachedStream.available() != 0) {
- ByteBuffer buffer = ((HasByteBuffer)
detachedStream).getByteBuffer();
- byteStrings.add(UnsafeByteOperations.unsafeWrap(buffer));
- detachedStream.skip(buffer.remaining());
- }
- detachedStream.reset();
- CodedInputStream codedInputStream =
ByteString.copyFrom(byteStrings).newCodedInput();
- codedInputStream.enableAliasing(true);
- codedInputStream.setSizeLimit(Integer.MAX_VALUE);
- // fast path (no memory copy)
- T message;
- try {
- message = parseFrom(codedInputStream);
- } catch (InvalidProtocolBufferException ipbe) {
- throw Status.INTERNAL
- .withDescription("Invalid protobuf byte sequence")
- .withCause(ipbe)
- .asRuntimeException();
- }
- unclosedStreams.put(message, detachedStream);
- detachedStream = null;
- return message;
- } finally {
- if (detachedStream != null) {
- detachedStream.close();
- }
- }
- }
+ // fast path (no memory copy)
+ message = parseZeroCopy(stream);
} catch (IOException e) {
- throw new RuntimeException(e);
+ throw Status.INTERNAL
+ .withDescription("Failed to parseZeroCopy")
+ .withCause(e)
+ .asRuntimeException();
}
+ if (message != null) {
+ zeroCopyCount.accept(message);
+ return message;
+ }
+
// slow path
- return marshaller.parse(stream);
+ final T copied = marshaller.parse(stream);
+ nonZeroCopyCount.accept(copied);
+ return copied;
+ }
+
+ /** Release the underlying buffers in the given message. */
+ public void release(T message) {
Review Comment:
@szetszwo I would like to propose adding back the `popStream()` method for
compatibility. Ideally upgrade from 3.0.0 to 3.0.1 shouldn't require code
change for users.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]