This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 12ed0048c RATIS-1932. Create zero-copy Marshaller. (#962)
12ed0048c is described below
commit 12ed0048cbe2194865907da14d5ceeda26dbbdb6
Author: Duong Nguyen <[email protected]>
AuthorDate: Mon Nov 6 22:55:12 2023 -0800
RATIS-1932. Create zero-copy Marshaller. (#962)
---
.../ratis/grpc/util/ZeroCopyMessageMarshaller.java | 142 +++++++++++++++++++++
.../ratis/grpc/util/ZeroCopyReadinessChecker.java | 74 +++++++++++
2 files changed, 216 insertions(+)
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ZeroCopyMessageMarshaller.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ZeroCopyMessageMarshaller.java
new file mode 100644
index 000000000..a47920fc5
--- /dev/null
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ZeroCopyMessageMarshaller.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.util;
+
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.CodedInputStream;
+import
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.ratis.thirdparty.com.google.protobuf.MessageLite;
+import org.apache.ratis.thirdparty.com.google.protobuf.Parser;
+import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
+import org.apache.ratis.thirdparty.io.grpc.Detachable;
+import org.apache.ratis.thirdparty.io.grpc.HasByteBuffer;
+import org.apache.ratis.thirdparty.io.grpc.KnownLength;
+import
org.apache.ratis.thirdparty.io.grpc.MethodDescriptor.PrototypeMarshaller;
+import org.apache.ratis.thirdparty.io.grpc.Status;
+import org.apache.ratis.thirdparty.io.grpc.protobuf.lite.ProtoLiteUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Custom gRPC marshaller to use zero memory copy feature of gRPC when
deserializing messages. This
+ * achieves zero-copy by deserializing proto messages pointing to the buffers
in the input stream to
+ * avoid memory copy so stream should live as long as the message can be
referenced. Hence, it
+ * exposes the input stream to applications (through popStream) and
applications are responsible to
+ * close it when it's no longer needed. Otherwise, it'd cause memory leak.
+ */
+public class ZeroCopyMessageMarshaller<T extends MessageLite> implements
PrototypeMarshaller<T> {
+ private Map<T, InputStream> unclosedStreams =
+ Collections.synchronizedMap(new IdentityHashMap<>());
+ private final Parser<T> parser;
+ private final PrototypeMarshaller<T> marshaller;
+
+ public ZeroCopyMessageMarshaller(T defaultInstance) {
+ parser = (Parser<T>) defaultInstance.getParserForType();
+ marshaller = (PrototypeMarshaller<T>)
ProtoLiteUtils.marshaller(defaultInstance);
+ }
+
+ @Override
+ public Class<T> getMessageClass() {
+ return marshaller.getMessageClass();
+ }
+
+ @Override
+ public T getMessagePrototype() {
+ return marshaller.getMessagePrototype();
+ }
+
+ @Override
+ public InputStream stream(T value) {
+ return marshaller.stream(value);
+ }
+
+ @Override
+ public T parse(InputStream stream) {
+ try {
+ if (stream instanceof KnownLength
+ && stream instanceof Detachable
+ && stream instanceof HasByteBuffer
+ && ((HasByteBuffer) stream).byteBufferSupported()) {
+ int size = stream.available();
+ // Stream is now detached here and should be closed later.
+ InputStream detachedStream = ((Detachable) stream).detach();
+ try {
+ // This mark call is to keep buffer while traversing buffers using
skip.
+ detachedStream.mark(size);
+ List<ByteString> byteStrings = new LinkedList<>();
+ while (detachedStream.available() != 0) {
+ ByteBuffer buffer = ((HasByteBuffer)
detachedStream).getByteBuffer();
+ byteStrings.add(UnsafeByteOperations.unsafeWrap(buffer));
+ detachedStream.skip(buffer.remaining());
+ }
+ detachedStream.reset();
+ CodedInputStream codedInputStream =
ByteString.copyFrom(byteStrings).newCodedInput();
+ codedInputStream.enableAliasing(true);
+ codedInputStream.setSizeLimit(Integer.MAX_VALUE);
+ // fast path (no memory copy)
+ T message;
+ try {
+ message = parseFrom(codedInputStream);
+ } catch (InvalidProtocolBufferException ipbe) {
+ throw Status.INTERNAL
+ .withDescription("Invalid protobuf byte sequence")
+ .withCause(ipbe)
+ .asRuntimeException();
+ }
+ unclosedStreams.put(message, detachedStream);
+ detachedStream = null;
+ return message;
+ } finally {
+ if (detachedStream != null) {
+ detachedStream.close();
+ }
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ // slow path
+ return marshaller.parse(stream);
+ }
+
+ private T parseFrom(CodedInputStream stream) throws
InvalidProtocolBufferException {
+ T message = parser.parseFrom(stream);
+ try {
+ stream.checkLastTagWas(0);
+ return message;
+ } catch (InvalidProtocolBufferException e) {
+ e.setUnfinishedMessage(message);
+ throw e;
+ }
+ }
+
+ /**
+ * Application needs to call this function to get the stream for the message
and
+ * call stream.close() function to return it to the pool.
+ */
+ public InputStream popStream(T message) {
+ return unclosedStreams.remove(message);
+ }
+}
\ No newline at end of file
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ZeroCopyReadinessChecker.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ZeroCopyReadinessChecker.java
new file mode 100644
index 000000000..5a20d83e9
--- /dev/null
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ZeroCopyReadinessChecker.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.util;
+
+import org.apache.ratis.thirdparty.com.google.protobuf.MessageLite;
+import org.apache.ratis.thirdparty.io.grpc.KnownLength;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checker to test whether a zero-copy masharller is available from the
versions of gRPC and
+ * Protobuf.
+ */
+public final class ZeroCopyReadinessChecker {
+ static final Logger LOG =
LoggerFactory.getLogger(ZeroCopyReadinessChecker.class);
+ private static final boolean IS_ZERO_COPY_READY;
+
+ private ZeroCopyReadinessChecker() {
+ }
+
+ static {
+ // Check whether io.grpc.Detachable exists?
+ boolean detachableClassExists = false;
+ try {
+ // Try to load Detachable interface in the package where KnownLength is
in.
+ // This can be done directly by looking up io.grpc.Detachable but rather
+ // done indirectly to handle the case where gRPC is being shaded in a
+ // different package.
+ String knownLengthClassName = KnownLength.class.getName();
+ String detachableClassName =
+ knownLengthClassName.substring(0,
knownLengthClassName.lastIndexOf('.') + 1)
+ + "Detachable";
+ // check if class exists.
+ Class.forName(detachableClassName);
+ detachableClassExists = true;
+ } catch (ClassNotFoundException ex) {
+ LOG.debug("io.grpc.Detachable not found", ex);
+ }
+ // Check whether com.google.protobuf.UnsafeByteOperations exists?
+ boolean unsafeByteOperationsClassExists = false;
+ try {
+ // Same above
+ String messageLiteClassName = MessageLite.class.getName();
+ String unsafeByteOperationsClassName =
+ messageLiteClassName.substring(0,
messageLiteClassName.lastIndexOf('.') + 1)
+ + "UnsafeByteOperations";
+ // check if class exists.
+ Class.forName(unsafeByteOperationsClassName);
+ unsafeByteOperationsClassExists = true;
+ } catch (ClassNotFoundException ex) {
+ LOG.debug("com.google.protobuf.UnsafeByteOperations not found", ex);
+ }
+ IS_ZERO_COPY_READY = detachableClassExists &&
unsafeByteOperationsClassExists;
+ }
+
+ public static boolean isReady() {
+ return IS_ZERO_COPY_READY;
+ }
+}