This is an automated email from the ASF dual-hosted git repository.
ivandika3 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 32a88e76aa6 HDDS-15205. Cut WritableRPCEngine (#10213)
32a88e76aa6 is described below
commit 32a88e76aa6c3c132d0c95554871fb8e031a4261
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Sat May 9 08:52:45 2026 +0200
HDDS-15205. Cut WritableRPCEngine (#10213)
---
.../src/main/java/org/apache/hadoop/ipc_/RPC.java | 18 +-
.../main/java/org/apache/hadoop/ipc_/Server.java | 40 +-
.../org/apache/hadoop/ipc_/WritableRpcEngine.java | 630 ---------------------
.../src/main/proto/RpcHeader.proto | 2 +-
4 files changed, 45 insertions(+), 645 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RPC.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RPC.java
index 2c544716f95..8f6711ccb84 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RPC.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RPC.java
@@ -83,7 +83,7 @@ public class RPC {
final static int RPC_SERVICE_CLASS_DEFAULT = 0;
public enum RpcKind {
RPC_BUILTIN ((short) 1), // Used for built in calls by tests
- RPC_WRITABLE ((short) 2), // Use WritableRpcEngine
+ RPC_WRITABLE ((short) 2), // ignored
RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine
final static short MAX_SIZE = RPC_PROTOCOL_BUFFER.value; // used for array size
private final short value;
@@ -216,8 +216,7 @@ static synchronized RpcEngine getProtocolEngine(Class<?>
protocol,
Configuration conf) {
RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
if (engine == null) {
- Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),
- WritableRpcEngine.class);
+ Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(), ProtobufRpcEngine.class);
engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
PROTOCOL_ENGINES.put(protocol, engine);
}
@@ -1034,6 +1033,19 @@ Map<ProtoNameVer, ProtoClassProtoImpl>
getProtocolImplMap(RPC.RpcKind rpcKind) {
return protocolImplMapArray.get(rpcKind.ordinal());
}
+ /**
+ * Returns {@code true} only if at least one protocol has been registered
+ * on this server instance for the given {@link RPC.RpcKind}.
+ * Used to reject incoming requests for unsupported RPC kinds before any
+ * deserialization of the request payload takes place.
+ * @param rpcKind the RPC kind from the incoming request header.
+ * @return {@code true} if at least one protocol is registered for this kind.
+ */
+ boolean hasRegisteredProtocols(RPC.RpcKind rpcKind) {
+ Map<ProtoNameVer, ProtoClassProtoImpl> implMap = getProtocolImplMap(rpcKind);
+ return implMap != null && !implMap.isEmpty();
+ }
+
// Register protocol and its impl for rpc calls
void registerProtocolAndImpl(RpcKind rpcKind, Class<?> protocolClass,
Object protocolImpl) {
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Server.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Server.java
index e0e4517ad58..2f767ae4bdf 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Server.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Server.java
@@ -2654,15 +2654,33 @@ private void checkRpcHeaders(RpcRequestHeaderProto
header)
private void processRpcRequest(RpcRequestHeaderProto header,
RpcWritable.Buffer buffer) throws RpcServerException,
InterruptedException {
- Class<? extends Writable> rpcRequestClass =
+ if (header.getRpcKind() == RpcKindProto.RPC_WRITABLE) {
+ final String err = "WritableRpcEngine is not supported.";
+ LOG.warn("{} Client: {}", err, getHostAddress());
+ throw new FatalRpcServerException(RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
+ }
+ // Reject requests for RPC kinds with no registered protocols on this
+ // server instance. This prevents deserialization of untrusted payloads
+ // for unsupported kinds. See HADOOP-19864.
+ if (Server.this instanceof RPC.Server) {
+ RPC.Server server = (RPC.Server) Server.this;
+ final RPC.RpcKind kind = ProtoUtil.convert(header.getRpcKind());
+ if (!server.hasRegisteredProtocols(kind)) {
+ final String err = "No protocols registered on this server for RpcKind "
+ + header.getRpcKind()
+ + ". Rejecting request without deserialization.";
+ LOG.info("{} Client: {}", err, getHostAddress());
+ throw new FatalRpcServerException(
+ RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
+ }
+ }
+ Class<? extends Writable> rpcRequestClass =
getRpcRequestWrapper(header.getRpcKind());
if (rpcRequestClass == null) {
- LOG.warn("Unknown rpc kind " + header.getRpcKind() +
- " from client " + getHostAddress());
- final String err = "Unknown rpc kind in rpc header" +
- header.getRpcKind();
+ LOG.warn("Unknown rpc kind {} from client {}", header.getRpcKind(), getHostAddress());
throw new FatalRpcServerException(
- RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
+ RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+ "Unknown rpc kind in rpc header " + header.getRpcKind());
}
Writable rpcRequest;
try { //Read the rpc request
@@ -2670,12 +2688,12 @@ private void processRpcRequest(RpcRequestHeaderProto
header,
} catch (RpcServerException rse) { // lets tests inject failures.
throw rse;
} catch (Throwable t) { // includes runtime exception from newInstance
- LOG.warn("Unable to read call parameters for client " +
- getHostAddress() + "on connection protocol " +
- this.protocolName + " for rpcKind " + header.getRpcKind(), t);
- String err = "IPC server unable to read call parameters: "+
t.getMessage();
+ LOG.warn(
+ "Unable to read call parameters for client {} on connection protocol {} for rpcKind {}",
+ getHostAddress(), this.protocolName, header.getRpcKind(), t);
throw new FatalRpcServerException(
- RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err);
+ RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST,
+ "IPC server unable to read call parameters: "+ t.getMessage());
}
CallerContext callerContext = null;
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/WritableRpcEngine.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/WritableRpcEngine.java
deleted file mode 100644
index d23e59b4a1f..00000000000
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/WritableRpcEngine.java
+++ /dev/null
@@ -1,630 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ipc_;
-
-import java.lang.reflect.Proxy;
-import java.lang.reflect.Method;
-import java.lang.reflect.InvocationTargetException;
-
-import java.net.InetSocketAddress;
-import java.io.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.net.SocketFactory;
-
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.ipc_.Client.ConnectionId;
-import org.apache.hadoop.ipc_.RPC.RpcInvoker;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.SecretManager;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.conf.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** An RpcEngine implementation for Writable data. */
-@Deprecated
-public class WritableRpcEngine implements RpcEngine {
- private static final Logger LOG = LoggerFactory.getLogger(RPC.class);
-
- //writableRpcVersion should be updated if there is a change
- //in format of the rpc messages.
-
- // 2L - added declared class to Invocation
- public static final long writableRpcVersion = 2L;
-
- /**
- * Whether or not this class has been initialized.
- */
- private static boolean isInitialized = false;
-
- static {
- ensureInitialized();
- }
-
- /**
- * Initialize this class if it isn't already.
- */
- public static synchronized void ensureInitialized() {
- if (!isInitialized) {
- initialize();
- }
- }
-
- /**
- * Register the rpcRequest deserializer for WritableRpcEngine
- */
- private static synchronized void initialize() {
-
org.apache.hadoop.ipc_.Server.registerProtocolEngine(RPC.RpcKind.RPC_WRITABLE,
- Invocation.class, new Server.WritableRpcInvoker());
- isInitialized = true;
- }
-
-
- /** A method invocation, including the method name and its parameters.*/
- private static class Invocation implements Writable, Configurable {
- private String methodName;
- private Class<?>[] parameterClasses;
- private Object[] parameters;
- private Configuration conf;
- private long clientVersion;
- private int clientMethodsHash;
- private String declaringClassProtocolName;
-
- //This could be different from static writableRpcVersion when received
- //at server, if client is using a different version.
- private long rpcVersion;
-
- @SuppressWarnings("unused") // called when deserializing an invocation
- public Invocation() {}
-
- public Invocation(Method method, Object[] parameters) {
- this.methodName = method.getName();
- this.parameterClasses = method.getParameterTypes();
- this.parameters = parameters;
- rpcVersion = writableRpcVersion;
- if (method.getDeclaringClass().equals(VersionedProtocol.class)) {
- //VersionedProtocol is exempted from version check.
- clientVersion = 0;
- clientMethodsHash = 0;
- } else {
- this.clientVersion =
RPC.getProtocolVersion(method.getDeclaringClass());
- this.clientMethodsHash = ProtocolSignature.getFingerprint(method
- .getDeclaringClass().getMethods());
- }
- this.declaringClassProtocolName =
- RPC.getProtocolName(method.getDeclaringClass());
- }
-
- /** The name of the method invoked. */
- public String getMethodName() { return methodName; }
-
- /** The parameter classes. */
- public Class<?>[] getParameterClasses() { return parameterClasses; }
-
- /** The parameter instances. */
- public Object[] getParameters() { return parameters; }
-
- private long getProtocolVersion() {
- return clientVersion;
- }
-
- @SuppressWarnings("unused")
- private int getClientMethodsHash() {
- return clientMethodsHash;
- }
-
- /**
- * Returns the rpc version used by the client.
- * @return rpcVersion
- */
- public long getRpcVersion() {
- return rpcVersion;
- }
-
- @Override
- @SuppressWarnings("deprecation")
- public void readFields(DataInput in) throws IOException {
- rpcVersion = in.readLong();
- declaringClassProtocolName = UTF8.readString(in);
- methodName = UTF8.readString(in);
- clientVersion = in.readLong();
- clientMethodsHash = in.readInt();
- parameters = new Object[in.readInt()];
- parameterClasses = new Class[parameters.length];
- ObjectWritable objectWritable = new ObjectWritable();
- for (int i = 0; i < parameters.length; i++) {
- parameters[i] =
- ObjectWritable.readObject(in, objectWritable, this.conf);
- parameterClasses[i] = objectWritable.getDeclaredClass();
- }
- }
-
- @Override
- @SuppressWarnings("deprecation")
- public void write(DataOutput out) throws IOException {
- out.writeLong(rpcVersion);
- UTF8.writeString(out, declaringClassProtocolName);
- UTF8.writeString(out, methodName);
- out.writeLong(clientVersion);
- out.writeInt(clientMethodsHash);
- out.writeInt(parameterClasses.length);
- for (int i = 0; i < parameterClasses.length; i++) {
- ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
- conf, true);
- }
- }
-
- @Override
- public String toString() {
- StringBuilder buffer = new StringBuilder();
- buffer.append(methodName);
- buffer.append("(");
- for (int i = 0; i < parameters.length; i++) {
- if (i != 0)
- buffer.append(", ");
- buffer.append(parameters[i]);
- }
- buffer.append(")");
- buffer.append(", rpc version="+rpcVersion);
- buffer.append(", client version="+clientVersion);
- buffer.append(", methodsFingerPrint="+clientMethodsHash);
- return buffer.toString();
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- @Override
- public Configuration getConf() {
- return this.conf;
- }
-
- }
-
- private static ClientCache CLIENTS=new ClientCache();
-
- private static class Invoker implements RpcInvocationHandler {
- private Client.ConnectionId remoteId;
- private Client client;
- private boolean isClosed = false;
- private final AtomicBoolean fallbackToSimpleAuth;
- private final AlignmentContext alignmentContext;
-
- public Invoker(Class<?> protocol,
- InetSocketAddress address, UserGroupInformation ticket,
- Configuration conf, SocketFactory factory,
- int rpcTimeout, AtomicBoolean fallbackToSimpleAuth,
- AlignmentContext alignmentContext)
- throws IOException {
- this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
- ticket, rpcTimeout, null, conf);
- this.client = CLIENTS.getClient(conf, factory);
- this.fallbackToSimpleAuth = fallbackToSimpleAuth;
- this.alignmentContext = alignmentContext;
- }
-
- @Override
- public Object invoke(Object proxy, Method method, Object[] args)
- throws Throwable {
- long startTime = 0;
- if (LOG.isDebugEnabled()) {
- startTime = Time.monotonicNow();
- }
-
- ObjectWritable value = (ObjectWritable)
- client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args),
- remoteId, fallbackToSimpleAuth, alignmentContext);
- if (LOG.isDebugEnabled()) {
- long callTime = Time.monotonicNow() - startTime;
- LOG.debug("Call: " + method.getName() + " " + callTime);
- }
- return value.get();
- }
-
- /* close the IPC client that's responsible for this invoker's RPCs */
- @Override
- synchronized public void close() {
- if (!isClosed) {
- isClosed = true;
- CLIENTS.stopClient(client);
- }
- }
-
- @Override
- public ConnectionId getConnectionId() {
- return remoteId;
- }
- }
-
- // for unit testing only
- static Client getClient(Configuration conf) {
- return CLIENTS.getClient(conf);
- }
-
- /**
- * Construct a client-side proxy object that implements the named protocol,
- * talking to a server at the named address.
- * @param <T> Generics Type T
- * @param protocol input protocol.
- * @param clientVersion input clientVersion.
- * @param addr input addr.
- * @param ticket input ticket.
- * @param conf input configuration.
- * @param factory input factory.
- * @param rpcTimeout input rpcTimeout.
- * @param connectionRetryPolicy input connectionRetryPolicy.
- * @throws IOException raised on errors performing I/O.
- */
- @Override
- public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
- InetSocketAddress addr, UserGroupInformation ticket,
- Configuration conf, SocketFactory factory,
- int rpcTimeout, RetryPolicy connectionRetryPolicy)
- throws IOException {
- return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
- rpcTimeout, connectionRetryPolicy, null, null);
- }
-
- /**
- * Construct a client-side proxy object with a ConnectionId.
- *
- * @param <T> Generics Type T.
- * @param protocol input protocol.
- * @param clientVersion input clientVersion.
- * @param connId input ConnectionId.
- * @param conf input Configuration.
- * @param factory input factory.
- * @throws IOException raised on errors performing I/O.
- * @return ProtocolProxy.
- */
- @Override
- public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
- Client.ConnectionId connId, Configuration conf, SocketFactory factory)
- throws IOException {
- return getProxy(protocol, clientVersion, connId.getAddress(),
- connId.getTicket(), conf, factory, connId.getRpcTimeout(),
- connId.getRetryPolicy(), null, null);
- }
-
- /**
- * Construct a client-side proxy object that implements the named protocol,
- * talking to a server at the named address.
- * @param <T> Generics Type.
- * @param protocol input protocol.
- * @param clientVersion input clientVersion.
- * @param addr input addr.
- * @param ticket input ticket.
- * @param conf input configuration.
- * @param factory input factory.
- * @param rpcTimeout input rpcTimeout.
- * @param connectionRetryPolicy input connectionRetryPolicy.
- * @param fallbackToSimpleAuth input fallbackToSimpleAuth.
- * @param alignmentContext input alignmentContext.
- * @return ProtocolProxy.
- */
- @Override
- @SuppressWarnings("unchecked")
- public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
- InetSocketAddress addr, UserGroupInformation ticket,
- Configuration conf, SocketFactory factory,
- int rpcTimeout, RetryPolicy connectionRetryPolicy,
- AtomicBoolean fallbackToSimpleAuth,
- AlignmentContext alignmentContext)
- throws IOException {
-
- if (connectionRetryPolicy != null) {
- throw new UnsupportedOperationException(
- "Not supported: connectionRetryPolicy=" + connectionRetryPolicy);
- }
-
- T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
- new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
- factory, rpcTimeout, fallbackToSimpleAuth, alignmentContext));
- return new ProtocolProxy<T>(protocol, proxy, true);
- }
-
- /* Construct a server for a protocol implementation instance listening on a
- * port and address. */
- @Override
- public RPC.Server getServer(Class<?> protocolClass,
- Object protocolImpl, String bindAddress, int port,
- int numHandlers, int numReaders, int queueSizePerHandler,
- boolean verbose, Configuration conf,
- SecretManager<? extends TokenIdentifier> secretManager,
- String portRangeConfig, AlignmentContext
alignmentContext)
- throws IOException {
- return new Server(protocolClass, protocolImpl, conf, bindAddress, port,
- numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
- portRangeConfig, alignmentContext);
- }
-
-
- /** An RPC Server. */
- @Deprecated
- public static class Server extends RPC.Server {
- /**
- * Construct an RPC server.
- * @param instance the instance whose methods will be called
- * @param conf the configuration to use
- * @param bindAddress the address to bind on to listen for connection
- * @param port the port to listen for connections on
- *
- * @deprecated Use #Server(Class, Object, Configuration, String, int)
- * @throws IOException raised on errors performing I/O.
- */
- @Deprecated
- public Server(Object instance, Configuration conf, String bindAddress,
- int port) throws IOException {
- this(null, instance, conf, bindAddress, port);
- }
-
-
- /** Construct an RPC server.
- * @param protocolClass class
- * @param protocolImpl the instance whose methods will be called
- * @param conf the configuration to use
- * @param bindAddress the address to bind on to listen for connection
- * @param port the port to listen for connections on
- * @throws IOException raised on errors performing I/O.
- */
- public Server(Class<?> protocolClass, Object protocolImpl,
- Configuration conf, String bindAddress, int port)
- throws IOException {
- this(protocolClass, protocolImpl, conf, bindAddress, port, 1, -1, -1,
- false, null, null);
- }
-
- /**
- * Construct an RPC server.
- * @param protocolImpl the instance whose methods will be called
- * @param conf the configuration to use
- * @param bindAddress the address to bind on to listen for connection
- * @param port the port to listen for connections on
- * @param numHandlers the number of method handler threads to run
- * @param verbose whether each call should be logged
- * @param numReaders input numberReaders.
- * @param queueSizePerHandler input queueSizePerHandler.
- * @param secretManager input secretManager.
- *
- * @deprecated use Server#Server(Class, Object,
- * Configuration, String, int, int, int, int, boolean, SecretManager)
- * @throws IOException raised on errors performing I/O.
- */
- @Deprecated
- public Server(Object protocolImpl, Configuration conf, String bindAddress,
- int port, int numHandlers, int numReaders, int queueSizePerHandler,
- boolean verbose, SecretManager<? extends TokenIdentifier>
secretManager)
- throws IOException {
- this(null, protocolImpl, conf, bindAddress, port,
- numHandlers, numReaders, queueSizePerHandler, verbose,
- secretManager, null);
-
- }
-
- /**
- * Construct an RPC server.
- * @param protocolClass - the protocol being registered
- * can be null for compatibility with old usage (see below for details)
- * @param protocolImpl the protocol impl that will be called
- * @param conf the configuration to use
- * @param bindAddress the address to bind on to listen for connection
- * @param port the port to listen for connections on
- * @param numHandlers the number of method handler threads to run
- * @param verbose whether each call should be logged
- * @param secretManager input secretManager.
- * @param queueSizePerHandler input queueSizePerHandler.
- * @param portRangeConfig input portRangeConfig.
- * @param numReaders input numReaders.
- *
- * @deprecated use Server#Server(Class, Object,
- * Configuration, String, int, int, int, int, boolean, SecretManager)
- * @throws IOException raised on errors performing I/O.
- */
- @Deprecated
- public Server(Class<?> protocolClass, Object protocolImpl,
- Configuration conf, String bindAddress, int port,
- int numHandlers, int numReaders, int queueSizePerHandler,
- boolean verbose, SecretManager<? extends TokenIdentifier>
secretManager,
- String portRangeConfig)
- throws IOException {
- this(null, protocolImpl, conf, bindAddress, port,
- numHandlers, numReaders, queueSizePerHandler, verbose,
- secretManager, null, null);
- }
-
- /**
- * Construct an RPC server.
- * @param protocolClass - the protocol being registered
- * can be null for compatibility with old usage (see below for details)
- * @param protocolImpl the protocol impl that will be called
- * @param conf the configuration to use
- * @param bindAddress the address to bind on to listen for connection
- * @param port the port to listen for connections on
- * @param numHandlers the number of method handler threads to run
- * @param verbose whether each call should be logged
- * @param alignmentContext provides server state info on client responses
- * @param numReaders input numReaders.
- * @param portRangeConfig input portRangeConfig.
- * @param queueSizePerHandler input queueSizePerHandler.
- * @param secretManager input secretManager.
- * @throws IOException raised on errors performing I/O.
- */
- public Server(Class<?> protocolClass, Object protocolImpl,
- Configuration conf, String bindAddress, int port,
- int numHandlers, int numReaders, int queueSizePerHandler,
- boolean verbose, SecretManager<? extends TokenIdentifier>
secretManager,
- String portRangeConfig, AlignmentContext alignmentContext)
- throws IOException {
- super(bindAddress, port, null, numHandlers, numReaders,
- queueSizePerHandler, conf,
- serverNameFromClass(protocolImpl.getClass()), secretManager,
- portRangeConfig);
- setAlignmentContext(alignmentContext);
- this.verbose = verbose;
-
-
- Class<?>[] protocols;
- if (protocolClass == null) { // derive protocol from impl
- /*
- * In order to remain compatible with the old usage where a single
- * target protocolImpl is suppled for all protocol interfaces, and
- * the protocolImpl is derived from the protocolClass(es)
- * we register all interfaces extended by the protocolImpl
- */
- protocols = RPC.getProtocolInterfaces(protocolImpl.getClass());
-
- } else {
- if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) {
- throw new IOException("protocolClass "+ protocolClass +
- " is not implemented by protocolImpl which is of class " +
- protocolImpl.getClass());
- }
- // register protocol class and its super interfaces
- registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, protocolClass,
protocolImpl);
- protocols = RPC.getProtocolInterfaces(protocolClass);
- }
- for (Class<?> p : protocols) {
- if (!p.equals(VersionedProtocol.class)) {
- registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, p, protocolImpl);
- }
- }
-
- }
-
- private static void log(String value) {
- if (value!= null && value.length() > 55)
- value = value.substring(0, 55)+"...";
- LOG.info(value);
- }
-
- @Deprecated
- static class WritableRpcInvoker implements RpcInvoker {
-
- @Override
- public Writable call(org.apache.hadoop.ipc_.RPC.Server server,
- String protocolName, Writable rpcRequest, long receivedTime)
- throws IOException, RPC.VersionMismatch {
-
- Invocation call = (Invocation)rpcRequest;
- if (server.verbose) log("Call: " + call);
-
- // Verify writable rpc version
- if (call.getRpcVersion() != writableRpcVersion) {
- // Client is using a different version of WritableRpc
- throw new RpcServerException(
- "WritableRpc version mismatch, client side version="
- + call.getRpcVersion() + ", server side version="
- + writableRpcVersion);
- }
-
- long clientVersion = call.getProtocolVersion();
- final String protoName;
- ProtoClassProtoImpl protocolImpl;
- if
(call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
- // VersionProtocol methods are often used by client to figure out
- // which version of protocol to use.
- //
- // Versioned protocol methods should go the protocolName protocol
- // rather than the declaring class of the method since the
- // the declaring class is VersionedProtocol which is not
- // registered directly.
- // Send the call to the highest protocol version
- VerProtocolImpl highest = server.getHighestSupportedProtocol(
- RPC.RpcKind.RPC_WRITABLE, protocolName);
- if (highest == null) {
- throw new RpcServerException("Unknown protocol: " + protocolName);
- }
- protocolImpl = highest.protocolTarget;
- } else {
- protoName = call.declaringClassProtocolName;
-
- // Find the right impl for the protocol based on client version.
- ProtoNameVer pv =
- new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
- protocolImpl =
- server.getProtocolImplMap(RPC.RpcKind.RPC_WRITABLE).get(pv);
- if (protocolImpl == null) { // no match for Protocol AND Version
- VerProtocolImpl highest =
- server.getHighestSupportedProtocol(RPC.RpcKind.RPC_WRITABLE,
- protoName);
- if (highest == null) {
- throw new RpcServerException("Unknown protocol: " + protoName);
- } else { // protocol supported but not the version that client
wants
- throw new RPC.VersionMismatch(protoName, clientVersion,
- highest.version);
- }
- }
- }
-
- // Invoke the protocol method
- Exception exception = null;
- Call currentCall = Server.getCurCall().get();
- try {
- Method method =
- protocolImpl.protocolClass.getMethod(call.getMethodName(),
- call.getParameterClasses());
- method.setAccessible(true);
- server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
- currentCall.setDetailedMetricsName(call.getMethodName());
- Object value =
- method.invoke(protocolImpl.protocolImpl, call.getParameters());
- if (server.verbose) log("Return: "+value);
- return new ObjectWritable(method.getReturnType(), value);
-
- } catch (InvocationTargetException e) {
- Throwable target = e.getTargetException();
- if (target instanceof IOException) {
- exception = (IOException)target;
- throw (IOException)target;
- } else {
- IOException ioe = new IOException(target.toString());
- ioe.setStackTrace(target.getStackTrace());
- exception = ioe;
- throw ioe;
- }
- } catch (Throwable e) {
- if (!(e instanceof IOException)) {
- LOG.error("Unexpected throwable object ", e);
- }
- IOException ioe = new IOException(e.toString());
- ioe.setStackTrace(e.getStackTrace());
- exception = ioe;
- throw ioe;
- } finally {
- if (exception != null) {
- currentCall.setDetailedMetricsName(
- exception.getClass().getSimpleName());
- }
- }
- }
- }
- }
-
- @Override
- public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
- ConnectionId connId, Configuration conf, SocketFactory factory)
- throws IOException {
- throw new UnsupportedOperationException("This proxy is not supported");
- }
-}
diff --git a/hadoop-hdds/interface-client/src/main/proto/RpcHeader.proto
b/hadoop-hdds/interface-client/src/main/proto/RpcHeader.proto
index a803ff68f97..72e6f9c924f 100644
--- a/hadoop-hdds/interface-client/src/main/proto/RpcHeader.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/RpcHeader.proto
@@ -47,7 +47,7 @@ package hadoop.common;
*/
enum RpcKindProto {
RPC_BUILTIN = 0; // Used for built in calls by tests
- RPC_WRITABLE = 1; // Use WritableRpcEngine
+ RPC_WRITABLE = 1; // ignored
RPC_PROTOCOL_BUFFER = 2; // Use ProtobufRpcEngine
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]