This is an automated email from the ASF dual-hosted git repository.

ivandika3 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 32a88e76aa6 HDDS-15205. Cut WritableRPCEngine (#10213)
32a88e76aa6 is described below

commit 32a88e76aa6c3c132d0c95554871fb8e031a4261
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Sat May 9 08:52:45 2026 +0200

    HDDS-15205. Cut WritableRPCEngine (#10213)
---
 .../src/main/java/org/apache/hadoop/ipc_/RPC.java  |  18 +-
 .../main/java/org/apache/hadoop/ipc_/Server.java   |  40 +-
 .../org/apache/hadoop/ipc_/WritableRpcEngine.java  | 630 ---------------------
 .../src/main/proto/RpcHeader.proto                 |   2 +-
 4 files changed, 45 insertions(+), 645 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RPC.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RPC.java
index 2c544716f95..8f6711ccb84 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RPC.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RPC.java
@@ -83,7 +83,7 @@ public class RPC {
   final static int RPC_SERVICE_CLASS_DEFAULT = 0;
   public enum RpcKind {
     RPC_BUILTIN ((short) 1),         // Used for built in calls by tests
-    RPC_WRITABLE ((short) 2),        // Use WritableRpcEngine 
+    RPC_WRITABLE ((short) 2),        // ignored
     RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine
     final static short MAX_SIZE = RPC_PROTOCOL_BUFFER.value; // used for array size
     private final short value;
@@ -216,8 +216,7 @@ static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
       Configuration conf) {
     RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
     if (engine == null) {
-      Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),
-                                    WritableRpcEngine.class);
+      Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(), ProtobufRpcEngine.class);
       engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
       PROTOCOL_ENGINES.put(protocol, engine);
     }
@@ -1034,6 +1033,19 @@ Map<ProtoNameVer, ProtoClassProtoImpl> getProtocolImplMap(RPC.RpcKind rpcKind) {
      return protocolImplMapArray.get(rpcKind.ordinal());   
    }
    
+    /**
+     * Returns {@code true} only if at least one protocol has been registered
+     * on this server instance for the given {@link RPC.RpcKind}.
+     * Used to reject incoming requests for unsupported RPC kinds before any
+     * deserialization of the request payload takes place.
+     * @param rpcKind the RPC kind from the incoming request header.
+     * @return {@code true} if at least one protocol is registered for this kind.
+     */
+    boolean hasRegisteredProtocols(RPC.RpcKind rpcKind) {
+      Map<ProtoNameVer, ProtoClassProtoImpl> implMap = getProtocolImplMap(rpcKind);
+      return implMap != null && !implMap.isEmpty();
+    }
+
    // Register  protocol and its impl for rpc calls
    void registerProtocolAndImpl(RpcKind rpcKind, Class<?> protocolClass, 
        Object protocolImpl) {
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Server.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Server.java
index e0e4517ad58..2f767ae4bdf 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Server.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Server.java
@@ -2654,15 +2654,33 @@ private void checkRpcHeaders(RpcRequestHeaderProto header)
     private void processRpcRequest(RpcRequestHeaderProto header,
         RpcWritable.Buffer buffer) throws RpcServerException,
         InterruptedException {
-      Class<? extends Writable> rpcRequestClass = 
+      if (header.getRpcKind() == RpcKindProto.RPC_WRITABLE) {
+        final String err = "WritableRpcEngine is not supported.";
+        LOG.warn("{} Client: {}", err, getHostAddress());
+        throw new FatalRpcServerException(RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
+      }
+      // Reject requests for RPC kinds with no registered protocols on this
+      // server instance. This prevents deserialization of untrusted payloads
+      // for unsupported kinds. See HADOOP-19864.
+      if (Server.this instanceof RPC.Server) {
+        RPC.Server server = (RPC.Server) Server.this;
+        final RPC.RpcKind kind = ProtoUtil.convert(header.getRpcKind());
+        if (!server.hasRegisteredProtocols(kind)) {
+          final String err = "No protocols registered on this server for RpcKind "
+              + header.getRpcKind()
+              + ". Rejecting request without deserialization.";
+          LOG.info("{} Client: {}", err, getHostAddress());
+          throw new FatalRpcServerException(
+              RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
+        }
+      }
+      Class<? extends Writable> rpcRequestClass =
           getRpcRequestWrapper(header.getRpcKind());
       if (rpcRequestClass == null) {
-        LOG.warn("Unknown rpc kind "  + header.getRpcKind() + 
-            " from client " + getHostAddress());
-        final String err = "Unknown rpc kind in rpc header"  + 
-            header.getRpcKind();
+        LOG.warn("Unknown rpc kind {} from client {}", header.getRpcKind(), getHostAddress());
         throw new FatalRpcServerException(
-            RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
+            RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+            "Unknown rpc kind in rpc header " + header.getRpcKind());
       }
       Writable rpcRequest;
       try { //Read the rpc request
@@ -2670,12 +2688,12 @@ private void processRpcRequest(RpcRequestHeaderProto header,
       } catch (RpcServerException rse) { // lets tests inject failures.
         throw rse;
       } catch (Throwable t) { // includes runtime exception from newInstance
-        LOG.warn("Unable to read call parameters for client " +
-                 getHostAddress() + "on connection protocol " +
-            this.protocolName + " for rpcKind " + header.getRpcKind(),  t);
-        String err = "IPC server unable to read call parameters: "+ t.getMessage();
+        LOG.warn(
+            "Unable to read call parameters for client {} on connection protocol {} for rpcKind {}",
+            getHostAddress(), this.protocolName, header.getRpcKind(), t);
         throw new FatalRpcServerException(
-            RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err);
+            RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST,
+            "IPC server unable to read call parameters: "+ t.getMessage());
       }
         
       CallerContext callerContext = null;
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/WritableRpcEngine.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/WritableRpcEngine.java
deleted file mode 100644
index d23e59b4a1f..00000000000
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/WritableRpcEngine.java
+++ /dev/null
@@ -1,630 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ipc_;
-
-import java.lang.reflect.Proxy;
-import java.lang.reflect.Method;
-import java.lang.reflect.InvocationTargetException;
-
-import java.net.InetSocketAddress;
-import java.io.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.net.SocketFactory;
-
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.ipc_.Client.ConnectionId;
-import org.apache.hadoop.ipc_.RPC.RpcInvoker;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.SecretManager;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.conf.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** An RpcEngine implementation for Writable data. */
-@Deprecated
-public class WritableRpcEngine implements RpcEngine {
-  private static final Logger LOG = LoggerFactory.getLogger(RPC.class);
-  
-  //writableRpcVersion should be updated if there is a change
-  //in format of the rpc messages.
-  
-  // 2L - added declared class to Invocation
-  public static final long writableRpcVersion = 2L;
-  
-  /**
-   * Whether or not this class has been initialized.
-   */
-  private static boolean isInitialized = false;
-  
-  static { 
-    ensureInitialized();
-  }
-  
-  /**
-   * Initialize this class if it isn't already.
-   */
-  public static synchronized void ensureInitialized() {
-    if (!isInitialized) {
-      initialize();
-    }
-  }
-  
-  /**
-   * Register the rpcRequest deserializer for WritableRpcEngine
-   */
-  private static synchronized void initialize() {
-    
org.apache.hadoop.ipc_.Server.registerProtocolEngine(RPC.RpcKind.RPC_WRITABLE,
-        Invocation.class, new Server.WritableRpcInvoker());
-    isInitialized = true;
-  }
-
-  
-  /** A method invocation, including the method name and its parameters.*/
-  private static class Invocation implements Writable, Configurable {
-    private String methodName;
-    private Class<?>[] parameterClasses;
-    private Object[] parameters;
-    private Configuration conf;
-    private long clientVersion;
-    private int clientMethodsHash;
-    private String declaringClassProtocolName;
-    
-    //This could be different from static writableRpcVersion when received
-    //at server, if client is using a different version.
-    private long rpcVersion;
-
-    @SuppressWarnings("unused") // called when deserializing an invocation
-    public Invocation() {}
-
-    public Invocation(Method method, Object[] parameters) {
-      this.methodName = method.getName();
-      this.parameterClasses = method.getParameterTypes();
-      this.parameters = parameters;
-      rpcVersion = writableRpcVersion;
-      if (method.getDeclaringClass().equals(VersionedProtocol.class)) {
-        //VersionedProtocol is exempted from version check.
-        clientVersion = 0;
-        clientMethodsHash = 0;
-      } else {
-        this.clientVersion = 
RPC.getProtocolVersion(method.getDeclaringClass());
-        this.clientMethodsHash = ProtocolSignature.getFingerprint(method
-            .getDeclaringClass().getMethods());
-      }
-      this.declaringClassProtocolName = 
-          RPC.getProtocolName(method.getDeclaringClass());
-    }
-
-    /** The name of the method invoked. */
-    public String getMethodName() { return methodName; }
-
-    /** The parameter classes. */
-    public Class<?>[] getParameterClasses() { return parameterClasses; }
-
-    /** The parameter instances. */
-    public Object[] getParameters() { return parameters; }
-    
-    private long getProtocolVersion() {
-      return clientVersion;
-    }
-
-    @SuppressWarnings("unused")
-    private int getClientMethodsHash() {
-      return clientMethodsHash;
-    }
-    
-    /**
-     * Returns the rpc version used by the client.
-     * @return rpcVersion
-     */
-    public long getRpcVersion() {
-      return rpcVersion;
-    }
-
-    @Override
-    @SuppressWarnings("deprecation")
-    public void readFields(DataInput in) throws IOException {
-      rpcVersion = in.readLong();
-      declaringClassProtocolName = UTF8.readString(in);
-      methodName = UTF8.readString(in);
-      clientVersion = in.readLong();
-      clientMethodsHash = in.readInt();
-      parameters = new Object[in.readInt()];
-      parameterClasses = new Class[parameters.length];
-      ObjectWritable objectWritable = new ObjectWritable();
-      for (int i = 0; i < parameters.length; i++) {
-        parameters[i] = 
-            ObjectWritable.readObject(in, objectWritable, this.conf);
-        parameterClasses[i] = objectWritable.getDeclaredClass();
-      }
-    }
-
-    @Override
-    @SuppressWarnings("deprecation")
-    public void write(DataOutput out) throws IOException {
-      out.writeLong(rpcVersion);
-      UTF8.writeString(out, declaringClassProtocolName);
-      UTF8.writeString(out, methodName);
-      out.writeLong(clientVersion);
-      out.writeInt(clientMethodsHash);
-      out.writeInt(parameterClasses.length);
-      for (int i = 0; i < parameterClasses.length; i++) {
-        ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
-                                   conf, true);
-      }
-    }
-
-    @Override
-    public String toString() {
-      StringBuilder buffer = new StringBuilder();
-      buffer.append(methodName);
-      buffer.append("(");
-      for (int i = 0; i < parameters.length; i++) {
-        if (i != 0)
-          buffer.append(", ");
-        buffer.append(parameters[i]);
-      }
-      buffer.append(")");
-      buffer.append(", rpc version="+rpcVersion);
-      buffer.append(", client version="+clientVersion);
-      buffer.append(", methodsFingerPrint="+clientMethodsHash);
-      return buffer.toString();
-    }
-
-    @Override
-    public void setConf(Configuration conf) {
-      this.conf = conf;
-    }
-
-    @Override
-    public Configuration getConf() {
-      return this.conf;
-    }
-
-  }
-
-  private static ClientCache CLIENTS=new ClientCache();
-  
-  private static class Invoker implements RpcInvocationHandler {
-    private Client.ConnectionId remoteId;
-    private Client client;
-    private boolean isClosed = false;
-    private final AtomicBoolean fallbackToSimpleAuth;
-    private final AlignmentContext alignmentContext;
-
-    public Invoker(Class<?> protocol,
-                   InetSocketAddress address, UserGroupInformation ticket,
-                   Configuration conf, SocketFactory factory,
-                   int rpcTimeout, AtomicBoolean fallbackToSimpleAuth,
-                   AlignmentContext alignmentContext)
-        throws IOException {
-      this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
-          ticket, rpcTimeout, null, conf);
-      this.client = CLIENTS.getClient(conf, factory);
-      this.fallbackToSimpleAuth = fallbackToSimpleAuth;
-      this.alignmentContext = alignmentContext;
-    }
-
-    @Override
-    public Object invoke(Object proxy, Method method, Object[] args)
-      throws Throwable {
-      long startTime = 0;
-      if (LOG.isDebugEnabled()) {
-        startTime = Time.monotonicNow();
-      }
-
-      ObjectWritable value = (ObjectWritable)
-        client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args),
-          remoteId, fallbackToSimpleAuth, alignmentContext);
-      if (LOG.isDebugEnabled()) {
-        long callTime = Time.monotonicNow() - startTime;
-        LOG.debug("Call: " + method.getName() + " " + callTime);
-      }
-      return value.get();
-    }
-    
-    /* close the IPC client that's responsible for this invoker's RPCs */ 
-    @Override
-    synchronized public void close() {
-      if (!isClosed) {
-        isClosed = true;
-        CLIENTS.stopClient(client);
-      }
-    }
-
-    @Override
-    public ConnectionId getConnectionId() {
-      return remoteId;
-    }
-  }
-  
-  // for unit testing only
-  static Client getClient(Configuration conf) {
-    return CLIENTS.getClient(conf);
-  }
-  
-  /**
-   * Construct a client-side proxy object that implements the named protocol,
-   * talking to a server at the named address. 
-   * @param <T> Generics Type T
-   * @param protocol input protocol.
-   * @param clientVersion input clientVersion.
-   * @param addr input addr.
-   * @param ticket input ticket.
-   * @param conf input configuration.
-   * @param factory input factory.
-   * @param rpcTimeout input rpcTimeout.
-   * @param connectionRetryPolicy input connectionRetryPolicy.
-   * @throws IOException raised on errors performing I/O.
-   */
-  @Override
-  public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
-                         InetSocketAddress addr, UserGroupInformation ticket,
-                         Configuration conf, SocketFactory factory,
-                         int rpcTimeout, RetryPolicy connectionRetryPolicy)
-    throws IOException {
-    return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
-      rpcTimeout, connectionRetryPolicy, null, null);
-  }
-
-  /**
-   * Construct a client-side proxy object with a ConnectionId.
-   *
-   * @param <T> Generics Type T.
-   * @param protocol input protocol.
-   * @param clientVersion input clientVersion.
-   * @param connId input ConnectionId.
-   * @param conf input Configuration.
-   * @param factory input factory.
-   * @throws IOException raised on errors performing I/O.
-   * @return ProtocolProxy.
-   */
-  @Override
-  public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
-      Client.ConnectionId connId, Configuration conf, SocketFactory factory)
-      throws IOException {
-    return getProxy(protocol, clientVersion, connId.getAddress(),
-        connId.getTicket(), conf, factory, connId.getRpcTimeout(),
-        connId.getRetryPolicy(), null, null);
-  }
-
-  /**
-   * Construct a client-side proxy object that implements the named protocol,
-   * talking to a server at the named address. 
-   * @param <T> Generics Type.
-   * @param protocol input protocol.
-   * @param clientVersion input clientVersion.
-   * @param addr input addr.
-   * @param ticket input ticket.
-   * @param conf input configuration.
-   * @param factory input factory.
-   * @param rpcTimeout input rpcTimeout.
-   * @param connectionRetryPolicy input connectionRetryPolicy.
-   * @param fallbackToSimpleAuth input fallbackToSimpleAuth.
-   * @param alignmentContext input alignmentContext.
-   * @return ProtocolProxy.
-   */
-  @Override
-  @SuppressWarnings("unchecked")
-  public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
-                         InetSocketAddress addr, UserGroupInformation ticket,
-                         Configuration conf, SocketFactory factory,
-                         int rpcTimeout, RetryPolicy connectionRetryPolicy,
-                         AtomicBoolean fallbackToSimpleAuth,
-                         AlignmentContext alignmentContext)
-    throws IOException {    
-
-    if (connectionRetryPolicy != null) {
-      throw new UnsupportedOperationException(
-          "Not supported: connectionRetryPolicy=" + connectionRetryPolicy);
-    }
-
-    T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
-        new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
-            factory, rpcTimeout, fallbackToSimpleAuth, alignmentContext));
-    return new ProtocolProxy<T>(protocol, proxy, true);
-  }
-  
-  /* Construct a server for a protocol implementation instance listening on a
-   * port and address. */
-  @Override
-  public RPC.Server getServer(Class<?> protocolClass,
-                      Object protocolImpl, String bindAddress, int port,
-                      int numHandlers, int numReaders, int queueSizePerHandler,
-                      boolean verbose, Configuration conf,
-                      SecretManager<? extends TokenIdentifier> secretManager,
-                      String portRangeConfig, AlignmentContext 
alignmentContext)
-    throws IOException {
-    return new Server(protocolClass, protocolImpl, conf, bindAddress, port,
-        numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
-        portRangeConfig, alignmentContext);
-  }
-
-
-  /** An RPC Server. */
-  @Deprecated
-  public static class Server extends RPC.Server {
-    /** 
-     * Construct an RPC server.
-     * @param instance the instance whose methods will be called
-     * @param conf the configuration to use
-     * @param bindAddress the address to bind on to listen for connection
-     * @param port the port to listen for connections on
-     * 
-     * @deprecated Use #Server(Class, Object, Configuration, String, int)
-     * @throws IOException raised on errors performing I/O.
-     */
-    @Deprecated
-    public Server(Object instance, Configuration conf, String bindAddress,
-        int port) throws IOException {
-      this(null, instance, conf,  bindAddress, port);
-    }
-    
-    
-    /** Construct an RPC server.
-     * @param protocolClass class
-     * @param protocolImpl the instance whose methods will be called
-     * @param conf the configuration to use
-     * @param bindAddress the address to bind on to listen for connection
-     * @param port the port to listen for connections on
-     * @throws IOException raised on errors performing I/O.
-     */
-    public Server(Class<?> protocolClass, Object protocolImpl, 
-        Configuration conf, String bindAddress, int port) 
-      throws IOException {
-      this(protocolClass, protocolImpl, conf,  bindAddress, port, 1, -1, -1,
-          false, null, null);
-    }
-    
-    /** 
-     * Construct an RPC server.
-     * @param protocolImpl the instance whose methods will be called
-     * @param conf the configuration to use
-     * @param bindAddress the address to bind on to listen for connection
-     * @param port the port to listen for connections on
-     * @param numHandlers the number of method handler threads to run
-     * @param verbose whether each call should be logged
-     * @param numReaders input numberReaders.
-     * @param queueSizePerHandler input queueSizePerHandler.
-     * @param secretManager input secretManager.
-     * 
-     * @deprecated use Server#Server(Class, Object, 
-     *      Configuration, String, int, int, int, int, boolean, SecretManager)
-     * @throws IOException raised on errors performing I/O.
-     */
-    @Deprecated
-    public Server(Object protocolImpl, Configuration conf, String bindAddress,
-        int port, int numHandlers, int numReaders, int queueSizePerHandler,
-        boolean verbose, SecretManager<? extends TokenIdentifier> 
secretManager) 
-            throws IOException {
-       this(null, protocolImpl,  conf,  bindAddress,   port,
-                   numHandlers,  numReaders,  queueSizePerHandler,  verbose, 
-                   secretManager, null);
-   
-    }
-    
-    /** 
-     * Construct an RPC server.
-     * @param protocolClass - the protocol being registered
-     *     can be null for compatibility with old usage (see below for details)
-     * @param protocolImpl the protocol impl that will be called
-     * @param conf the configuration to use
-     * @param bindAddress the address to bind on to listen for connection
-     * @param port the port to listen for connections on
-     * @param numHandlers the number of method handler threads to run
-     * @param verbose whether each call should be logged
-     * @param secretManager input secretManager.
-     * @param queueSizePerHandler input queueSizePerHandler.
-     * @param portRangeConfig input portRangeConfig.
-     * @param numReaders input numReaders.
-     *
-     * @deprecated use Server#Server(Class, Object,
-     *      Configuration, String, int, int, int, int, boolean, SecretManager)
-     * @throws IOException raised on errors performing I/O.
-     */
-    @Deprecated
-    public Server(Class<?> protocolClass, Object protocolImpl,
-        Configuration conf, String bindAddress,  int port,
-        int numHandlers, int numReaders, int queueSizePerHandler, 
-        boolean verbose, SecretManager<? extends TokenIdentifier> 
secretManager,
-        String portRangeConfig) 
-        throws IOException {
-      this(null, protocolImpl,  conf,  bindAddress,   port,
-          numHandlers,  numReaders,  queueSizePerHandler,  verbose,
-          secretManager, null, null);
-    }
-
-    /**
-     * Construct an RPC server.
-     * @param protocolClass - the protocol being registered
-     *     can be null for compatibility with old usage (see below for details)
-     * @param protocolImpl the protocol impl that will be called
-     * @param conf the configuration to use
-     * @param bindAddress the address to bind on to listen for connection
-     * @param port the port to listen for connections on
-     * @param numHandlers the number of method handler threads to run
-     * @param verbose whether each call should be logged
-     * @param alignmentContext provides server state info on client responses
-     * @param numReaders input numReaders.
-     * @param portRangeConfig input portRangeConfig.
-     * @param queueSizePerHandler input queueSizePerHandler.
-     * @param secretManager input secretManager.
-     * @throws IOException raised on errors performing I/O.
-     */
-    public Server(Class<?> protocolClass, Object protocolImpl,
-        Configuration conf, String bindAddress,  int port,
-        int numHandlers, int numReaders, int queueSizePerHandler,
-        boolean verbose, SecretManager<? extends TokenIdentifier> 
secretManager,
-        String portRangeConfig, AlignmentContext alignmentContext)
-        throws IOException {
-      super(bindAddress, port, null, numHandlers, numReaders,
-          queueSizePerHandler, conf,
-          serverNameFromClass(protocolImpl.getClass()), secretManager,
-          portRangeConfig);
-      setAlignmentContext(alignmentContext);
-      this.verbose = verbose;
-      
-      
-      Class<?>[] protocols;
-      if (protocolClass == null) { // derive protocol from impl
-        /*
-         * In order to remain compatible with the old usage where a single
-         * target protocolImpl is suppled for all protocol interfaces, and
-         * the protocolImpl is derived from the protocolClass(es) 
-         * we register all interfaces extended by the protocolImpl
-         */
-        protocols = RPC.getProtocolInterfaces(protocolImpl.getClass());
-
-      } else {
-        if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) {
-          throw new IOException("protocolClass "+ protocolClass +
-              " is not implemented by protocolImpl which is of class " +
-              protocolImpl.getClass());
-        }
-        // register protocol class and its super interfaces
-        registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, protocolClass, 
protocolImpl);
-        protocols = RPC.getProtocolInterfaces(protocolClass);
-      }
-      for (Class<?> p : protocols) {
-        if (!p.equals(VersionedProtocol.class)) {
-          registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, p, protocolImpl);
-        }
-      }
-
-    }
-
-    private static void log(String value) {
-      if (value!= null && value.length() > 55)
-        value = value.substring(0, 55)+"...";
-      LOG.info(value);
-    }
-
-    @Deprecated
-    static class WritableRpcInvoker implements RpcInvoker {
-
-     @Override
-      public Writable call(org.apache.hadoop.ipc_.RPC.Server server,
-          String protocolName, Writable rpcRequest, long receivedTime)
-          throws IOException, RPC.VersionMismatch {
-
-        Invocation call = (Invocation)rpcRequest;
-        if (server.verbose) log("Call: " + call);
-
-        // Verify writable rpc version
-        if (call.getRpcVersion() != writableRpcVersion) {
-          // Client is using a different version of WritableRpc
-          throw new RpcServerException(
-              "WritableRpc version mismatch, client side version="
-                  + call.getRpcVersion() + ", server side version="
-                  + writableRpcVersion);
-        }
-
-        long clientVersion = call.getProtocolVersion();
-        final String protoName;
-        ProtoClassProtoImpl protocolImpl;
-        if 
(call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
-          // VersionProtocol methods are often used by client to figure out
-          // which version of protocol to use.
-          //
-          // Versioned protocol methods should go the protocolName protocol
-          // rather than the declaring class of the method since the
-          // the declaring class is VersionedProtocol which is not 
-          // registered directly.
-          // Send the call to the highest  protocol version
-          VerProtocolImpl highest = server.getHighestSupportedProtocol(
-              RPC.RpcKind.RPC_WRITABLE, protocolName);
-          if (highest == null) {
-            throw new RpcServerException("Unknown protocol: " + protocolName);
-          }
-          protocolImpl = highest.protocolTarget;
-        } else {
-          protoName = call.declaringClassProtocolName;
-
-          // Find the right impl for the protocol based on client version.
-          ProtoNameVer pv = 
-              new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
-          protocolImpl = 
-              server.getProtocolImplMap(RPC.RpcKind.RPC_WRITABLE).get(pv);
-          if (protocolImpl == null) { // no match for Protocol AND Version
-             VerProtocolImpl highest = 
-                 server.getHighestSupportedProtocol(RPC.RpcKind.RPC_WRITABLE, 
-                     protoName);
-            if (highest == null) {
-              throw new RpcServerException("Unknown protocol: " + protoName);
-            } else { // protocol supported but not the version that client 
wants
-              throw new RPC.VersionMismatch(protoName, clientVersion,
-                highest.version);
-            }
-          }
-        }
-
-        // Invoke the protocol method
-        Exception exception = null;
-        Call currentCall = Server.getCurCall().get();
-        try {
-          Method method =
-              protocolImpl.protocolClass.getMethod(call.getMethodName(),
-              call.getParameterClasses());
-          method.setAccessible(true);
-          server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
-          currentCall.setDetailedMetricsName(call.getMethodName());
-          Object value = 
-              method.invoke(protocolImpl.protocolImpl, call.getParameters());
-          if (server.verbose) log("Return: "+value);
-          return new ObjectWritable(method.getReturnType(), value);
-
-        } catch (InvocationTargetException e) {
-          Throwable target = e.getTargetException();
-          if (target instanceof IOException) {
-            exception = (IOException)target;
-            throw (IOException)target;
-          } else {
-            IOException ioe = new IOException(target.toString());
-            ioe.setStackTrace(target.getStackTrace());
-            exception = ioe;
-            throw ioe;
-          }
-        } catch (Throwable e) {
-          if (!(e instanceof IOException)) {
-            LOG.error("Unexpected throwable object ", e);
-          }
-          IOException ioe = new IOException(e.toString());
-          ioe.setStackTrace(e.getStackTrace());
-          exception = ioe;
-          throw ioe;
-        } finally {
-          if (exception != null) {
-            currentCall.setDetailedMetricsName(
-                exception.getClass().getSimpleName());
-          }
-        }
-      }
-    }
-  }
-
-  @Override
-  public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
-      ConnectionId connId, Configuration conf, SocketFactory factory)
-      throws IOException {
-    throw new UnsupportedOperationException("This proxy is not supported");
-  }
-}
diff --git a/hadoop-hdds/interface-client/src/main/proto/RpcHeader.proto b/hadoop-hdds/interface-client/src/main/proto/RpcHeader.proto
index a803ff68f97..72e6f9c924f 100644
--- a/hadoop-hdds/interface-client/src/main/proto/RpcHeader.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/RpcHeader.proto
@@ -47,7 +47,7 @@ package hadoop.common;
  */
 enum RpcKindProto {
   RPC_BUILTIN          = 0;  // Used for built in calls by tests
-  RPC_WRITABLE         = 1;  // Use WritableRpcEngine 
+  RPC_WRITABLE         = 1;  // ignored
   RPC_PROTOCOL_BUFFER  = 2;  // Use ProtobufRpcEngine
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to