This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 2a703900af HDDS-10037. Reduce buffer copying in OMRatisHelper. (#5894)
2a703900af is described below
commit 2a703900af08e4598ba037c7b85eb22dbd4c3998
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Jan 1 07:20:05 2024 -0800
HDDS-10037. Reduce buffer copying in OMRatisHelper. (#5894)
---
.../hdds/utils/io/ByteBufferInputStream.java | 75 ++++++++++++++++++++++
.../hadoop/ozone/om/helpers/OMRatisHelper.java | 68 ++++++++++----------
.../ozone/om/ratis/OzoneManagerRatisServer.java | 3 +-
3 files changed, 109 insertions(+), 37 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/io/ByteBufferInputStream.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/io/ByteBufferInputStream.java
new file mode 100644
index 0000000000..d2301bed2c
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/io/ByteBufferInputStream.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.utils.io;
+
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/** Wrap a {@link ByteBuffer} as an {@link InputStream}. */
+public class ByteBufferInputStream extends InputStream {
+ private final ByteBuffer buffer;
+
+ public ByteBufferInputStream(ByteBuffer buffer) {
+ this.buffer = buffer.asReadOnlyBuffer();
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (!buffer.hasRemaining()) {
+ return -1;
+ }
+ return buffer.get() & 0xFF;
+ }
+
+ @Override
+ public int read(@Nonnull byte[] array, int offset, int length) throws IOException {
+ assertArrayIndex(array, offset, length);
+
+ if (length == 0) {
+ return 0;
+ }
+ final int remaining = buffer.remaining();
+ if (remaining <= 0) {
+ return -1;
+ }
+ final int min = Math.min(remaining, length);
+ buffer.get(array, offset, min);
+ return min;
+ }
+
+ static void assertArrayIndex(@Nonnull byte[] array, int offset, int length) {
+ Objects.requireNonNull(array, "array == null");
+ if (offset < 0) {
+ throw new ArrayIndexOutOfBoundsException("offset = " + offset + " < 0");
+ } else if (length < 0) {
+ throw new ArrayIndexOutOfBoundsException("length = " + length + " < 0");
+ }
+ final int end = offset + length;
+ if (end < 0) {
+ throw new ArrayIndexOutOfBoundsException(
+ "Overflow: offset+length > Integer.MAX_VALUE, offset=" + offset + ", length=" + length);
+ } else if (end > array.length) {
+ throw new ArrayIndexOutOfBoundsException(
+ "offset+length > array.length = " + array.length + ", offset=" + offset + ", length=" + length);
+ }
+ }
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java
index bd379c207a..eb7ce0f699 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java
@@ -17,22 +17,23 @@
package org.apache.hadoop.ozone.om.helpers;
-import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.TextFormat;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
- .OMRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
- .OMResponse;
+import org.apache.hadoop.hdds.utils.io.ByteBufferInputStream;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
/**
- * Ratis helper methods for OM Ratis server and client.
+ * Helper methods for converting between proto 2 (OM) and proto 3 (Ratis) messages.
*/
public final class OMRatisHelper {
private static final Logger LOG = LoggerFactory.getLogger(
@@ -41,50 +42,47 @@ public final class OMRatisHelper {
private OMRatisHelper() {
}
- static RaftPeerId getRaftPeerId(String omId) {
- return RaftPeerId.valueOf(omId);
- }
-
+ /** Convert the given proto 2 request to a proto 3 {@link ByteString}. */
public static ByteString convertRequestToByteString(OMRequest request) {
- byte[] requestBytes = request.toByteArray();
- return ByteString.copyFrom(requestBytes);
+ return UnsafeByteOperations.unsafeWrap(request.toByteString().asReadOnlyByteBuffer());
}
- public static OMRequest convertByteStringToOMRequest(ByteString byteString)
- throws InvalidProtocolBufferException {
- byte[] bytes = byteString.toByteArray();
- return OMRequest.parseFrom(bytes);
+ /** Convert the given proto 3 {@link ByteString} to a proto 2 request. */
+ public static OMRequest convertByteStringToOMRequest(ByteString bytes) throws IOException {
+ final ByteBuffer buffer = bytes.asReadOnlyByteBuffer();
+ return OMRequest.parseFrom(new ByteBufferInputStream(buffer));
}
+ /** Convert the given proto 2 response to a proto 3 {@link ByteString}. */
public static Message convertResponseToMessage(OMResponse response) {
- byte[] requestBytes = response.toByteArray();
- return Message.valueOf(ByteString.copyFrom(requestBytes));
+ return () -> UnsafeByteOperations.unsafeWrap(response.toByteString().asReadOnlyByteBuffer());
}
- public static OMResponse getOMResponseFromRaftClientReply(
- RaftClientReply reply) throws InvalidProtocolBufferException {
- byte[] bytes = reply.getMessage().getContent().toByteArray();
- return OMResponse.newBuilder(OMResponse.parseFrom(bytes))
+ /** Convert the given proto 3 {@link ByteString} to a proto 2 response. */
+ public static OMResponse convertByteStringToOMResponse(ByteString bytes) throws IOException {
+ final ByteBuffer buffer = bytes.asReadOnlyByteBuffer();
+ return OMResponse.parseFrom(new ByteBufferInputStream(buffer));
+ }
+
+ /** Convert the given reply with proto 3 {@link ByteString} to a proto 2 response. */
+ public static OMResponse getOMResponseFromRaftClientReply(RaftClientReply reply) throws IOException {
+ final OMResponse response = convertByteStringToOMResponse(reply.getMessage().getContent());
+ if (reply.getReplierId().equals(response.getLeaderOMNodeId())) {
+ return response;
+ }
+ return OMResponse.newBuilder(response)
.setLeaderOMNodeId(reply.getReplierId())
.build();
}
- /**
- * Convert StateMachineLogEntryProto to String.
- * @param proto - {@link StateMachineLogEntryProto}
- * @return String
- */
+ /** Convert the given {@link StateMachineLogEntryProto} to a short {@link String}. */
public static String smProtoToString(StateMachineLogEntryProto proto) {
- StringBuilder builder = new StringBuilder();
try {
- builder.append(TextFormat.shortDebugString(
- OMRatisHelper.convertByteStringToOMRequest(proto.getLogData())));
-
+ final OMRequest request = convertByteStringToOMRequest(proto.getLogData());
+ return TextFormat.shortDebugString(request);
} catch (Throwable ex) {
LOG.info("smProtoToString failed", ex);
- builder.append("smProtoToString failed with");
- builder.append(ex.getMessage());
+ return "Failed to smProtoToString: " + ex;
}
- return builder.toString();
}
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index d6a5b3810c..e6a747b35a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.ozone.om.ratis;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
-import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.ServiceException;
import java.io.File;
@@ -526,7 +525,7 @@ public final class OzoneManagerRatisServer {
try {
return OMRatisHelper.getOMResponseFromRaftClientReply(reply);
- } catch (InvalidProtocolBufferException ex) {
+ } catch (IOException ex) {
if (ex.getMessage() != null) {
throw new ServiceException(ex.getMessage(), ex);
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]