This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a703900af HDDS-10037. Reduce buffer copying in OMRatisHelper. (#5894)
2a703900af is described below

commit 2a703900af08e4598ba037c7b85eb22dbd4c3998
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Jan 1 07:20:05 2024 -0800

    HDDS-10037. Reduce buffer copying in OMRatisHelper. (#5894)
---
 .../hdds/utils/io/ByteBufferInputStream.java       | 75 ++++++++++++++++++++++
 .../hadoop/ozone/om/helpers/OMRatisHelper.java     | 68 ++++++++++----------
 .../ozone/om/ratis/OzoneManagerRatisServer.java    |  3 +-
 3 files changed, 109 insertions(+), 37 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/io/ByteBufferInputStream.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/io/ByteBufferInputStream.java
new file mode 100644
index 0000000000..d2301bed2c
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/io/ByteBufferInputStream.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.utils.io;
+
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/** Warp a {@link ByteBuffer} as an {@link InputStream}. */
+public class ByteBufferInputStream extends InputStream {
+  private final ByteBuffer buffer;
+
+  public ByteBufferInputStream(ByteBuffer buffer) {
+    this.buffer = buffer.asReadOnlyBuffer();
+  }
+
+  @Override
+  public int read() throws IOException {
+    if (!buffer.hasRemaining()) {
+      return -1;
+    }
+    return buffer.get() & 0xFF;
+  }
+
+  @Override
+  public int read(@Nonnull byte[] array, int offset, int length) throws 
IOException {
+    assertArrayIndex(array, offset, length);
+
+    if (length == 0) {
+      return 0;
+    }
+    final int remaining = buffer.remaining();
+    if (remaining <= 0) {
+      return -1;
+    }
+    final int min = Math.min(remaining, length);
+    buffer.get(array, offset, min);
+    return min;
+  }
+
+  static void assertArrayIndex(@Nonnull byte[] array, int offset, int length) {
+    Objects.requireNonNull(array, "array == null");
+    if (offset < 0) {
+      throw new ArrayIndexOutOfBoundsException("offset = " + offset + " < 0");
+    } else if (length < 0) {
+      throw new ArrayIndexOutOfBoundsException("length = " + length + " < 0");
+    }
+    final int end = offset + length;
+    if (end < 0) {
+      throw new ArrayIndexOutOfBoundsException(
+          "Overflow: offset+length > Integer.MAX_VALUE, offset=" + offset + ", 
length=" + length);
+    } else if (end > array.length) {
+      throw new ArrayIndexOutOfBoundsException(
+          "offset+length > array.length = " + array.length + ", offset=" + 
offset + ", length=" + length);
+    }
+  }
+}
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java
index bd379c207a..eb7ce0f699 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java
@@ -17,22 +17,23 @@
 
 package org.apache.hadoop.ozone.om.helpers;
 
-import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.TextFormat;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .OMRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .OMResponse;
+import org.apache.hadoop.hdds.utils.io.ByteBufferInputStream;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
 /**
- * Ratis helper methods for OM Ratis server and client.
+ * Helper methods for converting between proto 2 (OM) and proto 3 (Ratis) 
messages.
  */
 public final class OMRatisHelper {
   private static final Logger LOG = LoggerFactory.getLogger(
@@ -41,50 +42,47 @@ public final class OMRatisHelper {
   private OMRatisHelper() {
   }
 
-  static RaftPeerId getRaftPeerId(String omId) {
-    return RaftPeerId.valueOf(omId);
-  }
-
+  /** Convert the given proto 2 request to a proto 3 {@link ByteString}. */
   public static ByteString convertRequestToByteString(OMRequest request) {
-    byte[] requestBytes = request.toByteArray();
-    return ByteString.copyFrom(requestBytes);
+    return 
UnsafeByteOperations.unsafeWrap(request.toByteString().asReadOnlyByteBuffer());
   }
 
-  public static OMRequest convertByteStringToOMRequest(ByteString byteString)
-      throws InvalidProtocolBufferException {
-    byte[] bytes = byteString.toByteArray();
-    return OMRequest.parseFrom(bytes);
+  /** Convert the given proto 3 {@link ByteString} to a proto 2 request. */
+  public static OMRequest convertByteStringToOMRequest(ByteString bytes) 
throws IOException {
+    final ByteBuffer buffer = bytes.asReadOnlyByteBuffer();
+    return OMRequest.parseFrom(new ByteBufferInputStream(buffer));
   }
 
+  /** Convert the given proto 2 response to a proto 3 {@link ByteString}. */
   public static Message convertResponseToMessage(OMResponse response) {
-    byte[] requestBytes = response.toByteArray();
-    return Message.valueOf(ByteString.copyFrom(requestBytes));
+    return () -> 
UnsafeByteOperations.unsafeWrap(response.toByteString().asReadOnlyByteBuffer());
   }
 
-  public static OMResponse getOMResponseFromRaftClientReply(
-      RaftClientReply reply) throws InvalidProtocolBufferException {
-    byte[] bytes = reply.getMessage().getContent().toByteArray();
-    return OMResponse.newBuilder(OMResponse.parseFrom(bytes))
+  /** Convert the given proto 3 {@link ByteString} to a proto 2 response. */
+  public static OMResponse convertByteStringToOMResponse(ByteString bytes) 
throws IOException {
+    final ByteBuffer buffer = bytes.asReadOnlyByteBuffer();
+    return OMResponse.parseFrom(new ByteBufferInputStream(buffer));
+  }
+
+  /** Convert the given reply with proto 3 {@link ByteString} to a proto 2 
response. */
+  public static OMResponse getOMResponseFromRaftClientReply(RaftClientReply 
reply) throws IOException {
+    final OMResponse response = 
convertByteStringToOMResponse(reply.getMessage().getContent());
+    if (reply.getReplierId().equals(response.getLeaderOMNodeId())) {
+      return response;
+    }
+    return OMResponse.newBuilder(response)
         .setLeaderOMNodeId(reply.getReplierId())
         .build();
   }
 
-  /**
-   * Convert StateMachineLogEntryProto to String.
-   * @param proto - {@link StateMachineLogEntryProto}
-   * @return String
-   */
+  /** Convert the given {@link StateMachineLogEntryProto} to a short {@link 
String}. */
   public static String smProtoToString(StateMachineLogEntryProto proto) {
-    StringBuilder builder = new StringBuilder();
     try {
-      builder.append(TextFormat.shortDebugString(
-          OMRatisHelper.convertByteStringToOMRequest(proto.getLogData())));
-
+      final OMRequest request = 
convertByteStringToOMRequest(proto.getLogData());
+      return TextFormat.shortDebugString(request);
     } catch (Throwable ex) {
       LOG.info("smProtoToString failed", ex);
-      builder.append("smProtoToString failed with");
-      builder.append(ex.getMessage());
+      return "Failed to smProtoToString: " + ex;
     }
-    return builder.toString();
   }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index d6a5b3810c..e6a747b35a 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.ozone.om.ratis;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
-import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.ServiceException;
 
 import java.io.File;
@@ -526,7 +525,7 @@ public final class OzoneManagerRatisServer {
 
     try {
       return OMRatisHelper.getOMResponseFromRaftClientReply(reply);
-    } catch (InvalidProtocolBufferException ex) {
+    } catch (IOException ex) {
       if (ex.getMessage() != null) {
         throw new ServiceException(ex.getMessage(), ex);
       } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to