HADOOP-13483. Optimize IPC server protobuf decoding. Contributed by Daryn Sharp.
(cherry picked from commit 580a8334963709e728ed677c815fb7fef9bca70e)
Conflicts:
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a4292fe6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a4292fe6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a4292fe6
Branch: refs/heads/branch-2.8
Commit: a4292fe6187eeb9a159160983df98fcc38326188
Parents: 983d47e
Author: Kihwal Lee <[email protected]>
Authored: Wed Aug 3 14:12:02 2016 -0500
Committer: Kihwal Lee <[email protected]>
Committed: Thu Sep 1 16:00:55 2016 -0500
----------------------------------------------------------------------
.../apache/hadoop/ipc/ProtobufRpcEngine.java | 15 +-
.../java/org/apache/hadoop/ipc/RpcWritable.java | 184 +++++++++++++++++++
.../main/java/org/apache/hadoop/ipc/Server.java | 94 ++++------
.../org/apache/hadoop/ipc/TestRpcWritable.java | 129 +++++++++++++
4 files changed, 361 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4292fe6/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
----------------------------------------------------------------------
diff --git
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index ea629b1..cfdf8ab 100644
---
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
+import org.apache.hadoop.ipc.RpcWritable;
import
org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
@@ -68,7 +69,7 @@ public class ProtobufRpcEngine implements RpcEngine {
static { // Register the rpcRequest deserializer for WritableRpcEngine
org.apache.hadoop.ipc.Server.registerProtocolEngine(
- RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWrapper.class,
+ RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcWritable.Buffer.class,
new Server.ProtoBufRpcInvoker());
}
@@ -612,8 +613,9 @@ public class ProtobufRpcEngine implements RpcEngine {
*/
public Writable call(RPC.Server server, String protocol,
Writable writableRequest, long receiveTime) throws Exception {
- RpcRequestWrapper request = (RpcRequestWrapper) writableRequest;
- RequestHeaderProto rpcRequest = request.requestHeader;
+ RpcWritable.Buffer request = (RpcWritable.Buffer) writableRequest;
+ RequestHeaderProto rpcRequest =
+ request.getValue(RequestHeaderProto.getDefaultInstance());
String methodName = rpcRequest.getMethodName();
String protoName = rpcRequest.getDeclaringClassProtocolName();
long clientVersion = rpcRequest.getClientProtocolVersion();
@@ -632,9 +634,8 @@ public class ProtobufRpcEngine implements RpcEngine {
throw new RpcNoSuchMethodException(msg);
}
Message prototype = service.getRequestPrototype(methodDescriptor);
- Message param = prototype.newBuilderForType()
- .mergeFrom(request.theRequestRead).build();
-
+ Message param = request.getValue(prototype);
+
Message result;
long startTime = Time.now();
int qTime = (int) (startTime - receiveTime);
@@ -663,7 +664,7 @@ public class ProtobufRpcEngine implements RpcEngine {
exception.getClass().getSimpleName();
server.updateMetrics(detailedMetricsName, qTime, processingTime);
}
- return new RpcResponseWrapper(result);
+ return RpcWritable.wrap(result);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4292fe6/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcWritable.java
----------------------------------------------------------------------
diff --git
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcWritable.java
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcWritable.java
new file mode 100644
index 0000000..5125939
--- /dev/null
+++
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcWritable.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.Message;
+
[email protected]
+public abstract class RpcWritable implements Writable {
+
+ static RpcWritable wrap(Object o) {
+ if (o instanceof RpcWritable) {
+ return (RpcWritable)o;
+ } else if (o instanceof Message) {
+ return new ProtobufWrapper((Message)o);
+ } else if (o instanceof Writable) {
+ return new WritableWrapper((Writable)o);
+ }
+ throw new IllegalArgumentException("Cannot wrap " + o.getClass());
+ }
+
+ // don't support old inefficient Writable methods.
+ @Override
+ public final void readFields(DataInput in) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public final void write(DataOutput out) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ // methods optimized for reduced intermediate byte[] allocations.
+ abstract void writeTo(ResponseBuffer out) throws IOException;
+ abstract <T> T readFrom(ByteBuffer bb) throws IOException;
+
+ // adapter for Writables.
+ static class WritableWrapper extends RpcWritable {
+ private final Writable writable;
+
+ WritableWrapper(Writable writable) {
+ this.writable = writable;
+ }
+
+ @Override
+ public void writeTo(ResponseBuffer out) throws IOException {
+ writable.write(out);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ <T> T readFrom(ByteBuffer bb) throws IOException {
+ // create a stream that may consume up to the entire ByteBuffer.
+ DataInputStream in = new DataInputStream(new ByteArrayInputStream(
+ bb.array(), bb.position() + bb.arrayOffset(), bb.remaining()));
+ try {
+ writable.readFields(in);
+ } finally {
+ // advance over the bytes read.
+ bb.position(bb.limit() - in.available());
+ }
+ return (T)writable;
+ }
+ }
+
+ // adapter for Protobufs.
+ static class ProtobufWrapper extends RpcWritable {
+ private Message message;
+
+ ProtobufWrapper(Message message) {
+ this.message = message;
+ }
+
+ @Override
+ void writeTo(ResponseBuffer out) throws IOException {
+ int length = message.getSerializedSize();
+ length += CodedOutputStream.computeRawVarint32Size(length);
+ out.ensureCapacity(length);
+ message.writeDelimitedTo(out);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ <T> T readFrom(ByteBuffer bb) throws IOException {
+ // using the parser with a byte[]-backed coded input stream is the
+ // most efficient way to deserialize a protobuf. it has a direct
+ // path to the PB ctor that doesn't create multi-layered streams
+ // that internally buffer.
+ CodedInputStream cis = CodedInputStream.newInstance(
+ bb.array(), bb.position() + bb.arrayOffset(), bb.remaining());
+ try {
+ cis.pushLimit(cis.readRawVarint32());
+ message = message.getParserForType().parseFrom(cis);
+ cis.checkLastTagWas(0);
+ } finally {
+ // advance over the bytes read.
+ bb.position(bb.position() + cis.getTotalBytesRead());
+ }
+ return (T)message;
+ }
+ }
+
+ // adapter to allow decoding of writables and protobufs from a byte buffer.
+ static class Buffer extends RpcWritable {
+ private ByteBuffer bb;
+
+ static Buffer wrap(ByteBuffer bb) {
+ return new Buffer(bb);
+ }
+
+ Buffer() {}
+
+ Buffer(ByteBuffer bb) {
+ this.bb = bb;
+ }
+
+ @Override
+ void writeTo(ResponseBuffer out) throws IOException {
+ out.ensureCapacity(bb.remaining());
+ out.write(bb.array(), bb.position() + bb.arrayOffset(), bb.remaining());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ <T> T readFrom(ByteBuffer bb) throws IOException {
+ // effectively consume the rest of the buffer from the callers
+ // perspective.
+ this.bb = bb.slice();
+ bb.limit(bb.position());
+ return (T)this;
+ }
+
+ public <T> T newInstance(Class<T> valueClass,
+ Configuration conf) throws IOException {
+ T instance;
+ try {
+ // this is much faster than ReflectionUtils!
+ instance = valueClass.newInstance();
+ if (instance instanceof Configurable) {
+ ((Configurable)instance).setConf(conf);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return getValue(instance);
+ }
+
+ public <T> T getValue(T value) throws IOException {
+ return RpcWritable.wrap(value).readFrom(bb);
+ }
+
+ int remaining() {
+ return bb.remaining();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4292fe6/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
----------------------------------------------------------------------
diff --git
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index a0402c2..d7a2673c 100644
---
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -25,7 +25,6 @@ import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
@@ -82,8 +81,6 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
-import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcWrapper;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RPC.VersionMismatch;
import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
@@ -113,7 +110,6 @@ import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ProtoUtil;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.htrace.core.SpanId;
@@ -122,9 +118,7 @@ import org.apache.htrace.core.Tracer;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
-import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message;
-import com.google.protobuf.Message.Builder;
/** An abstract IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
@@ -1345,6 +1339,7 @@ public abstract class Server {
* A WrappedRpcServerException that is suppressed altogether
* for the purposes of logging.
*/
+ @SuppressWarnings("serial")
private static class WrappedRpcServerExceptionSuppressed
extends WrappedRpcServerException {
public WrappedRpcServerExceptionSuppressed(
@@ -1474,10 +1469,10 @@ public abstract class Server {
}
}
- private void saslReadAndProcess(DataInputStream dis) throws
+ private void saslReadAndProcess(RpcWritable.Buffer buffer) throws
WrappedRpcServerException, IOException, InterruptedException {
final RpcSaslProto saslMessage =
- decodeProtobufFromStream(RpcSaslProto.newBuilder(), dis);
+ getMessage(RpcSaslProto.getDefaultInstance(), buffer);
switch (saslMessage.getState()) {
case WRAP: {
if (!saslContextEstablished || !useWrap) {
@@ -1675,7 +1670,7 @@ public abstract class Server {
RpcConstants.INVALID_RETRY_COUNT, null, this);
setupResponse(saslCall,
RpcStatusProto.SUCCESS, null,
- new RpcResponseWrapper(message), null, null);
+ RpcWritable.wrap(message), null, null);
saslCall.sendResponse();
}
@@ -1780,7 +1775,7 @@ public abstract class Server {
dataLengthBuffer.clear();
data.flip();
boolean isHeaderRead = connectionContextRead;
- processOneRpc(data.array());
+ processOneRpc(data);
data = null;
if (!isHeaderRead) {
continue;
@@ -1897,7 +1892,7 @@ public abstract class Server {
* @throws WrappedRpcServerException - if the header cannot be
* deserialized, or the user is not authorized
*/
- private void processConnectionContext(DataInputStream dis)
+ private void processConnectionContext(RpcWritable.Buffer buffer)
throws WrappedRpcServerException {
// allow only one connection context during a session
if (connectionContextRead) {
@@ -1905,8 +1900,7 @@ public abstract class Server {
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
"Connection context already processed");
}
- connectionContext = decodeProtobufFromStream(
- IpcConnectionContextProto.newBuilder(), dis);
+ connectionContext =
getMessage(IpcConnectionContextProto.getDefaultInstance(), buffer);
protocolName = connectionContext.hasProtocol() ? connectionContext
.getProtocol() : null;
@@ -1981,7 +1975,7 @@ public abstract class Server {
if (unwrappedData.remaining() == 0) {
unwrappedDataLengthBuffer.clear();
unwrappedData.flip();
- processOneRpc(unwrappedData.array());
+ processOneRpc(unwrappedData);
unwrappedData = null;
}
}
@@ -1997,31 +1991,30 @@ public abstract class Server {
* the client that does not require verbose logging by the
* Listener thread
* @throws InterruptedException
- */
- private void processOneRpc(byte[] buf)
+ */
+ private void processOneRpc(ByteBuffer bb)
throws IOException, WrappedRpcServerException, InterruptedException {
int callId = -1;
int retry = RpcConstants.INVALID_RETRY_COUNT;
try {
- final DataInputStream dis =
- new DataInputStream(new ByteArrayInputStream(buf));
+ final RpcWritable.Buffer buffer = RpcWritable.Buffer.wrap(bb);
final RpcRequestHeaderProto header =
- decodeProtobufFromStream(RpcRequestHeaderProto.newBuilder(), dis);
+ getMessage(RpcRequestHeaderProto.getDefaultInstance(), buffer);
callId = header.getCallId();
retry = header.getRetryCount();
if (LOG.isDebugEnabled()) {
LOG.debug(" got #" + callId);
}
checkRpcHeaders(header);
-
+
if (callId < 0) { // callIds typically used during connection setup
- processRpcOutOfBandRequest(header, dis);
+ processRpcOutOfBandRequest(header, buffer);
} else if (!connectionContextRead) {
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
"Connection context not established");
} else {
- processRpcRequest(header, dis);
+ processRpcRequest(header, buffer);
}
} catch (WrappedRpcServerException wrse) { // inform client of error
Throwable ioe = wrse.getCause();
@@ -2074,7 +2067,7 @@ public abstract class Server {
* @throws InterruptedException
*/
private void processRpcRequest(RpcRequestHeaderProto header,
- DataInputStream dis) throws WrappedRpcServerException,
+ RpcWritable.Buffer buffer) throws WrappedRpcServerException,
InterruptedException {
Class<? extends Writable> rpcRequestClass =
getRpcRequestWrapper(header.getRpcKind());
@@ -2088,8 +2081,7 @@ public abstract class Server {
}
Writable rpcRequest;
try { //Read the rpc request
- rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);
- rpcRequest.readFields(dis);
+ rpcRequest = buffer.newInstance(rpcRequestClass, conf);
} catch (Throwable t) { // includes runtime exception from newInstance
LOG.warn("Unable to read call parameters for client " +
getHostAddress() + "on connection protocol " +
@@ -2169,8 +2161,8 @@ public abstract class Server {
* @throws InterruptedException
*/
private void processRpcOutOfBandRequest(RpcRequestHeaderProto header,
- DataInputStream dis) throws WrappedRpcServerException, IOException,
- InterruptedException {
+ RpcWritable.Buffer buffer) throws WrappedRpcServerException,
+ IOException, InterruptedException {
final int callId = header.getCallId();
if (callId == CONNECTION_CONTEXT_CALL_ID) {
// SASL must be established prior to connection context
@@ -2180,7 +2172,7 @@ public abstract class Server {
"Connection header sent during SASL negotiation");
}
// read and authorize the user
- processConnectionContext(dis);
+ processConnectionContext(buffer);
} else if (callId == AuthProtocol.SASL.callId) {
// if client was switched to simple, ignore first SASL message
if (authProtocol != AuthProtocol.SASL) {
@@ -2188,7 +2180,7 @@ public abstract class Server {
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
"SASL protocol not requested by client");
}
- saslReadAndProcess(dis);
+ saslReadAndProcess(buffer);
} else if (callId == PING_CALL_ID) {
LOG.debug("Received ping message");
} else {
@@ -2235,13 +2227,12 @@ public abstract class Server {
* @throws WrappedRpcServerException - deserialization failed
*/
@SuppressWarnings("unchecked")
- private <T extends Message> T decodeProtobufFromStream(Builder builder,
- DataInputStream dis) throws WrappedRpcServerException {
+ <T extends Message> T getMessage(Message message,
+ RpcWritable.Buffer buffer) throws WrappedRpcServerException {
try {
- builder.mergeDelimitedFrom(dis);
- return (T)builder.build();
+ return (T)buffer.getValue(message);
} catch (Exception ioe) {
- Class<?> protoClass = builder.getDefaultInstanceForType().getClass();
+ Class<?> protoClass = message.getClass();
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST,
"Error decoding " + protoClass.getSimpleName() + ": "+ ioe);
@@ -2632,25 +2623,20 @@ public abstract class Server {
private void setupResponse(Call call,
RpcResponseHeaderProto header, Writable rv) throws IOException {
ResponseBuffer buf = responseBuffer.get().reset();
- // adjust capacity on estimated length to reduce resizing copies
- int estimatedLen = header.getSerializedSize();
- estimatedLen += CodedOutputStream.computeRawVarint32Size(estimatedLen);
- // if it's not a wrapped protobuf, just let it grow on its own
- if (rv instanceof RpcWrapper) {
- estimatedLen += ((RpcWrapper)rv).getLength();
- }
- buf.ensureCapacity(estimatedLen);
- header.writeDelimitedTo(buf);
- if (rv != null) { // null for exceptions
- rv.write(buf);
- }
- call.setResponse(ByteBuffer.wrap(buf.toByteArray()));
- // Discard a large buf and reset it back to smaller size
- // to free up heap.
- if (buf.capacity() > maxRespSize) {
- LOG.warn("Large response size " + buf.size() + " for call "
- + call.toString());
- buf.setCapacity(INITIAL_RESP_BUF_SIZE);
+ try {
+ RpcWritable.wrap(header).writeTo(buf);
+ if (rv != null) {
+ RpcWritable.wrap(rv).writeTo(buf);
+ }
+ call.setResponse(ByteBuffer.wrap(buf.toByteArray()));
+ } finally {
+ // Discard a large buf and reset it back to smaller size
+ // to free up heap.
+ if (buf.capacity() > maxRespSize) {
+ LOG.warn("Large response size " + buf.size() + " for call "
+ + call.toString());
+ buf.setCapacity(INITIAL_RESP_BUF_SIZE);
+ }
}
}
@@ -2701,7 +2687,7 @@ public abstract class Server {
.setState(SaslState.WRAP)
.setToken(ByteString.copyFrom(token))
.build();
- setupResponse(call, saslHeader, new RpcResponseWrapper(saslMessage));
+ setupResponse(call, saslHeader, RpcWritable.wrap(saslMessage));
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4292fe6/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcWritable.java
----------------------------------------------------------------------
diff --git
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcWritable.java
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcWritable.java
new file mode 100644
index 0000000..837f579
--- /dev/null
+++
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcWritable.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.protobuf.Message;
+
+public class TestRpcWritable {//extends TestRpcBase {
+
+ static Writable writable = new LongWritable(Time.now());
+ static Message message1 =
+ EchoRequestProto.newBuilder().setMessage("testing1").build();
+ static Message message2 =
+ EchoRequestProto.newBuilder().setMessage("testing2").build();
+
+ @Test
+ public void testWritableWrapper() throws IOException {
+ // serial writable in byte buffer
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ writable.write(new DataOutputStream(baos));
+ ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+
+ // deserial
+ LongWritable actual = RpcWritable.wrap(new LongWritable())
+ .readFrom(bb);
+ Assert.assertEquals(writable, actual);
+ Assert.assertEquals(0, bb.remaining());
+ }
+
+ @Test
+ public void testProtobufWrapper() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ message1.writeDelimitedTo(baos);
+ ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+
+ Message actual = RpcWritable.wrap(EchoRequestProto.getDefaultInstance())
+ .readFrom(bb);
+ Assert.assertEquals(message1, actual);
+ Assert.assertEquals(0, bb.remaining());
+ }
+
+ @Test
+ public void testBufferWrapper() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ message1.writeDelimitedTo(dos);
+ message2.writeDelimitedTo(dos);
+ writable.write(dos);
+
+ ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+ RpcWritable.Buffer buf = RpcWritable.Buffer.wrap(bb);
+ Assert.assertEquals(baos.size(), bb.remaining());
+ Assert.assertEquals(baos.size(), buf.remaining());
+
+ Object actual = buf.getValue(EchoRequestProto.getDefaultInstance());
+ Assert.assertEquals(message1, actual);
+ Assert.assertTrue(bb.remaining() > 0);
+ Assert.assertEquals(bb.remaining(), buf.remaining());
+
+ actual = buf.getValue(EchoRequestProto.getDefaultInstance());
+ Assert.assertEquals(message2, actual);
+ Assert.assertTrue(bb.remaining() > 0);
+ Assert.assertEquals(bb.remaining(), buf.remaining());
+
+ actual = buf.newInstance(LongWritable.class, null);
+ Assert.assertEquals(writable, actual);
+ Assert.assertEquals(0, bb.remaining());
+ Assert.assertEquals(0, buf.remaining());
+ }
+
+ @Test
+ public void testBufferWrapperNested() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ writable.write(dos);
+ message1.writeDelimitedTo(dos);
+ message2.writeDelimitedTo(dos);
+ ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+ RpcWritable.Buffer buf1 = RpcWritable.Buffer.wrap(bb);
+ Assert.assertEquals(baos.size(), bb.remaining());
+ Assert.assertEquals(baos.size(), buf1.remaining());
+
+ Object actual = buf1.newInstance(LongWritable.class, null);
+ Assert.assertEquals(writable, actual);
+ int left = bb.remaining();
+ Assert.assertTrue(left > 0);
+ Assert.assertEquals(left, buf1.remaining());
+
+ // original bb now appears empty, but rpc writable has a slice of the bb.
+ RpcWritable.Buffer buf2 = buf1.newInstance(RpcWritable.Buffer.class, null);
+ Assert.assertEquals(0, bb.remaining());
+ Assert.assertEquals(0, buf1.remaining());
+ Assert.assertEquals(left, buf2.remaining());
+
+ actual = buf2.getValue(EchoRequestProto.getDefaultInstance());
+ Assert.assertEquals(message1, actual);
+ Assert.assertTrue(buf2.remaining() > 0);
+
+ actual = buf2.getValue(EchoRequestProto.getDefaultInstance());
+ Assert.assertEquals(message2, actual);
+ Assert.assertEquals(0, buf2.remaining());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]