This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 5a496eed4fe HDDS-15154. Remove unused RPC code (#10285)
5a496eed4fe is described below
commit 5a496eed4fe4997364d0221737f3794753a90c24
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Tue May 19 12:20:30 2026 +0200
HDDS-15154. Remove unused RPC code (#10285)
---
.../main/java/org/apache/hadoop/ipc_/Client.java | 9 -
.../ipc_/ObserverRetryOnActiveException.java | 31 --
.../java/org/apache/hadoop/ipc_/ProtoUtil.java | 43 ---
.../org/apache/hadoop/ipc_/ProtobufRpcEngine.java | 43 +--
.../java/org/apache/hadoop/ipc_/ProxyCombiner.java | 151 --------
.../src/main/java/org/apache/hadoop/ipc_/RPC.java | 337 ------------------
.../java/org/apache/hadoop/ipc_/RetryCache.java | 391 ---------------------
.../java/org/apache/hadoop/ipc_/RpcEngine.java | 38 --
.../main/java/org/apache/hadoop/ipc_/Server.java | 321 +----------------
.../org/apache/hadoop/ipc_/StandbyException.java | 32 --
.../hadoop/ipc_/UnexpectedServerException.java | 48 ---
.../hadoop/ipc_/metrics/RetryCacheMetrics.java | 92 -----
12 files changed, 8 insertions(+), 1528 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Client.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Client.java
index 7a6bb7b4a3d..784bdac57db 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Client.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Client.java
@@ -1303,15 +1303,6 @@ public Client(Class<? extends Writable> valueClass,
Configuration conf,
CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT);
}
- /**
- * Construct an IPC client with the default SocketFactory.
- * @param valueClass input valueClass.
- * @param conf input Configuration.
- */
- public Client(Class<? extends Writable> valueClass, Configuration conf) {
- this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf));
- }
-
@Override
public String toString() {
return getClass().getSimpleName() + "-"
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ObserverRetryOnActiveException.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ObserverRetryOnActiveException.java
deleted file mode 100644
index b32791bb149..00000000000
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ObserverRetryOnActiveException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ipc_;
-
-
-/**
- * Thrown by a remote ObserverNode indicating the operation has failed and the
- * client should retry active namenode directly (instead of retry other
- * ObserverNodes).
- */
-public class ObserverRetryOnActiveException extends StandbyException {
- static final long serialVersionUID = 1L;
- public ObserverRetryOnActiveException(String msg) {
- super(msg);
- }
-}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProtoUtil.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProtoUtil.java
index 2fe400b7217..ce4fc4d8e4b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProtoUtil.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProtoUtil.java
@@ -18,9 +18,6 @@
package org.apache.hadoop.ipc_;
-import java.io.DataInput;
-import java.io.IOException;
-
import
org.apache.hadoop.ipc_.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
import
org.apache.hadoop.ipc_.protobuf.IpcConnectionContextProtos.UserInformationProto;
import org.apache.hadoop.ipc_.protobuf.RpcHeaderProtos.*;
@@ -31,47 +28,7 @@
public abstract class ProtoUtil {
- /**
- * Read a variable length integer in the same format that ProtoBufs encodes.
- * @param in the input stream to read from
- * @return the integer
- * @throws IOException if it is malformed or EOF.
- */
- public static int readRawVarint32(DataInput in) throws IOException {
- byte tmp = in.readByte();
- if (tmp >= 0) {
- return tmp;
- }
- int result = tmp & 0x7f;
- if ((tmp = in.readByte()) >= 0) {
- result |= tmp << 7;
- } else {
- result |= (tmp & 0x7f) << 7;
- if ((tmp = in.readByte()) >= 0) {
- result |= tmp << 14;
- } else {
- result |= (tmp & 0x7f) << 14;
- if ((tmp = in.readByte()) >= 0) {
- result |= tmp << 21;
- } else {
- result |= (tmp & 0x7f) << 21;
- result |= (tmp = in.readByte()) << 28;
- if (tmp < 0) {
- // Discard upper 32 bits.
- for (int i = 0; i < 5; i++) {
- if (in.readByte() >= 0) {
- return result;
- }
- }
- throw new IOException("Malformed varint");
- }
- }
- }
- }
- return result;
- }
-
/**
* This method creates the connection context using exactly the same logic
* as the old connection context as was done for writable where
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProtobufRpcEngine.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProtobufRpcEngine.java
index c2b7c4a01fc..990ee83d0af 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProtobufRpcEngine.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProtobufRpcEngine.java
@@ -65,32 +65,6 @@ public static AsyncGet<Message, Exception>
getAsyncReturnMessage() {
return ASYNC_RETURN_MESSAGE.get();
}
- @Override
- @SuppressWarnings("unchecked")
- public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
- ConnectionId connId, Configuration conf, SocketFactory factory)
- throws IOException {
- final Invoker invoker = new Invoker(protocol, connId, conf, factory);
- return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
- protocol.getClassLoader(), new Class[] {protocol}, invoker));
- }
-
- public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
- InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
- SocketFactory factory, int rpcTimeout) throws IOException {
- return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
- rpcTimeout, null);
- }
-
- @Override
- public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
- InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
- SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy
- ) throws IOException {
- return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
- rpcTimeout, connectionRetryPolicy, null, null);
- }
-
@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
@@ -309,22 +283,6 @@ public ConnectionId getConnectionId() {
return remoteId;
}
- protected long getClientProtocolVersion() {
- return clientProtocolVersion;
- }
-
- protected String getProtocolName() {
- return protocolName;
- }
- }
-
- static Client getClient(Configuration conf) {
- return CLIENTS.getClient(conf, SocketFactory.getDefault(),
- RpcWritable.Buffer.class);
- }
-
- public static void clearClientCache() {
- CLIENTS.clearCache();
}
@Override
@@ -556,6 +514,7 @@ static class RpcProtobufRequest extends RpcWritable.Buffer {
private volatile RequestHeaderProto requestHeader;
private Message payload;
+ @SuppressWarnings("unused") // required for Server#procesRpcRequest
public RpcProtobufRequest() {
}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProxyCombiner.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProxyCombiner.java
deleted file mode 100644
index 7a2410dc00c..00000000000
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProxyCombiner.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ipc_;
-
-import com.google.common.base.Joiner;
-import java.io.Closeable;
-import java.io.IOException;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-
-import org.apache.hadoop.io.MultipleIOException;
-import org.apache.hadoop.ipc_.Client.ConnectionId;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * A utility class used to combine two protocol proxies.
- * See {@link #combine(Class, Object...)}.
- */
-public final class ProxyCombiner {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(ProxyCombiner.class);
-
- private ProxyCombiner() { }
-
- /**
- * Combine two or more proxies which together comprise a single proxy
- * interface. This can be used for a protocol interface which {@code extends}
- * multiple other protocol interfaces. The returned proxy will implement
- * all of the methods of the combined proxy interface, delegating calls
- * to which proxy implements that method. If multiple proxies implement the
- * same method, the first in the list will be used for delegation.
- *
- * <p/>This will check that every method on the combined interface is
- * implemented by at least one of the supplied proxy objects.
- *
- * @param combinedProxyInterface The interface of the combined proxy.
- * @param proxies The proxies which should be used as delegates.
- * @param <T> The type of the proxy that will be returned.
- * @return The combined proxy.
- */
- @SuppressWarnings("unchecked")
- public static <T> T combine(Class<T> combinedProxyInterface,
- Object... proxies) {
- methodLoop:
- for (Method m : combinedProxyInterface.getMethods()) {
- for (Object proxy : proxies) {
- try {
- proxy.getClass().getMethod(m.getName(), m.getParameterTypes());
- continue methodLoop; // go to the next method
- } catch (NoSuchMethodException nsme) {
- // Continue to try the next proxy
- }
- }
- throw new IllegalStateException("The proxies specified for "
- + combinedProxyInterface + " do not cover method " + m);
- }
-
- InvocationHandler handler =
- new CombinedProxyInvocationHandler(combinedProxyInterface, proxies);
- return (T) Proxy.newProxyInstance(combinedProxyInterface.getClassLoader(),
- new Class[] {combinedProxyInterface}, handler);
- }
-
- private static final class CombinedProxyInvocationHandler
- implements RpcInvocationHandler {
-
- private final Class<?> proxyInterface;
- private final Object[] proxies;
-
- private CombinedProxyInvocationHandler(Class<?> proxyInterface,
- Object[] proxies) {
- this.proxyInterface = proxyInterface;
- this.proxies = proxies;
- }
-
- @Override
- public Object invoke(Object proxy, Method method, Object[] args)
- throws Throwable {
- Exception lastException = null;
- for (Object underlyingProxy : proxies) {
- try {
- return method.invoke(underlyingProxy, args);
- } catch (IllegalAccessException|IllegalArgumentException e) {
- lastException = e;
- } catch (InvocationTargetException ite) {
- throw ite.getCause();
- }
- }
- // This shouldn't happen since the method coverage was verified in
build()
- LOG.error("BUG: Method {} was unable to be found on any of the "
- + "underlying proxies for {}", method, proxy.getClass());
- throw new IllegalArgumentException("Method " + method + " not supported",
- lastException);
- }
-
- /**
- * Since this is incapable of returning multiple connection IDs, simply
- * return the first one. In most cases, the connection ID should be the
same
- * for all proxies.
- */
- @Override
- public ConnectionId getConnectionId() {
- return RPC.getConnectionIdForProxy(proxies[0]);
- }
-
- @Override
- public String toString() {
- return "CombinedProxy[" + proxyInterface.getSimpleName() + "]["
- + Joiner.on(",").join(proxies) + "]";
- }
-
- @Override
- public void close() throws IOException {
- MultipleIOException.Builder exceptionBuilder =
- new MultipleIOException.Builder();
- for (Object proxy : proxies) {
- if (proxy instanceof Closeable) {
- try {
- ((Closeable) proxy).close();
- } catch (IOException ioe) {
- exceptionBuilder.add(ioe);
- }
- }
- }
- if (!exceptionBuilder.isEmpty()) {
- throw exceptionBuilder.build();
- }
- }
- }
-}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RPC.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RPC.java
index 610bdbac1a3..56d647f5eb7 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RPC.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RPC.java
@@ -19,15 +19,11 @@
package org.apache.hadoop.ipc_;
import java.io.IOException;
-import java.io.InterruptedIOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
-import java.net.ConnectException;
import java.net.InetSocketAddress;
-import java.net.NoRouteToHostException;
-import java.net.SocketTimeoutException;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Map;
@@ -45,14 +41,12 @@
import org.apache.hadoop.ipc_.Client.ConnectionId;
import
org.apache.hadoop.ipc_.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
import
org.apache.hadoop.ipc_.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -250,233 +244,6 @@ public RpcErrorCodeProto getRpcErrorCodeProto() {
}
}
- /**
- * Get a proxy connection to a remote server.
- *
- * @param <T> Generics Type T.
- * @param protocol protocol class
- * @param clientVersion client version
- * @param addr remote address
- * @param conf configuration to use
- * @return the proxy
- * @throws IOException if the far end through a RemoteException
- */
- public static <T> T waitForProxy(
- Class<T> protocol,
- long clientVersion,
- InetSocketAddress addr,
- Configuration conf
- ) throws IOException {
- return waitForProtocolProxy(protocol, clientVersion, addr,
conf).getProxy();
- }
-
- /**
- * Get a protocol proxy that contains a proxy connection to a remote server
- * and a set of methods that are supported by the server.
- *
- * @param <T> Generics Type T.
- * @param protocol protocol class
- * @param clientVersion client version
- * @param addr remote address
- * @param conf configuration to use
- * @return the protocol proxy
- * @throws IOException if the far end through a RemoteException
- */
- public static <T> ProtocolProxy<T> waitForProtocolProxy(Class<T> protocol,
- long clientVersion,
- InetSocketAddress addr,
- Configuration conf) throws IOException {
- return waitForProtocolProxy(
- protocol, clientVersion, addr, conf, Long.MAX_VALUE);
- }
-
- /**
- * Get a proxy connection to a remote server.
- *
- * @param <T> Generics Type T.
- * @param protocol protocol class
- * @param clientVersion client version
- * @param addr remote address
- * @param conf configuration to use
- * @param connTimeout time in milliseconds before giving up
- * @return the proxy
- * @throws IOException if the far end through a RemoteException
- */
- public static <T> T waitForProxy(Class<T> protocol, long clientVersion,
- InetSocketAddress addr, Configuration conf,
- long connTimeout) throws IOException {
- return waitForProtocolProxy(protocol, clientVersion, addr,
- conf, connTimeout).getProxy();
- }
-
- /**
- * Get a protocol proxy that contains a proxy connection to a remote server
- * and a set of methods that are supported by the server
- *
- * @param <T> Generics Type T.
- * @param protocol protocol class
- * @param clientVersion client version
- * @param addr remote address
- * @param conf configuration to use
- * @param connTimeout time in milliseconds before giving up
- * @return the protocol proxy
- * @throws IOException if the far end through a RemoteException
- */
- public static <T> ProtocolProxy<T> waitForProtocolProxy(Class<T> protocol,
- long clientVersion,
- InetSocketAddress addr, Configuration conf,
- long connTimeout) throws IOException {
- return waitForProtocolProxy(protocol, clientVersion, addr, conf,
- getRpcTimeout(conf), null, connTimeout);
- }
-
- /**
- * Get a proxy connection to a remote server.
- *
- * @param <T> Generics Type T.
- * @param protocol protocol class
- * @param clientVersion client version
- * @param addr remote address
- * @param conf configuration to use
- * @param rpcTimeout timeout for each RPC
- * @param timeout time in milliseconds before giving up
- * @return the proxy
- * @throws IOException if the far end through a RemoteException
- */
- public static <T> T waitForProxy(Class<T> protocol,
- long clientVersion,
- InetSocketAddress addr, Configuration conf,
- int rpcTimeout,
- long timeout) throws IOException {
- return waitForProtocolProxy(protocol, clientVersion, addr,
- conf, rpcTimeout, null, timeout).getProxy();
- }
-
- /**
- * Get a protocol proxy that contains a proxy connection to a remote server
- * and a set of methods that are supported by the server.
- *
- * @param <T> Generics Type.
- * @param protocol protocol class
- * @param clientVersion client version
- * @param addr remote address
- * @param conf configuration to use
- * @param rpcTimeout timeout for each RPC
- * @param connectionRetryPolicy input connectionRetryPolicy.
- * @param timeout time in milliseconds before giving up
- * @return the proxy
- * @throws IOException if the far end through a RemoteException.
- */
- public static <T> ProtocolProxy<T> waitForProtocolProxy(Class<T> protocol,
- long clientVersion,
- InetSocketAddress addr, Configuration conf,
- int rpcTimeout,
- RetryPolicy connectionRetryPolicy,
- long timeout) throws IOException {
- long startTime = Time.now();
- IOException ioe;
- while (true) {
- try {
- return getProtocolProxy(protocol, clientVersion, addr,
- UserGroupInformation.getCurrentUser(), conf, NetUtils
- .getDefaultSocketFactory(conf), rpcTimeout, connectionRetryPolicy);
- } catch(ConnectException se) { // namenode has not been started
- LOG.info("Server at " + addr + " not available yet, Zzzzz...");
- ioe = se;
- } catch(SocketTimeoutException te) { // namenode is busy
- LOG.info("Problem connecting to server: " + addr);
- ioe = te;
- } catch(NoRouteToHostException nrthe) { // perhaps a VIP is failing over
- LOG.info("No route to host for server: " + addr);
- ioe = nrthe;
- }
- // check if timed out
- if (Time.now()-timeout >= startTime) {
- throw ioe;
- }
-
- if (Thread.currentThread().isInterrupted()) {
- // interrupted during some IO; this may not have been caught
- throw new InterruptedIOException("Interrupted waiting for the proxy");
- }
-
- // wait for retry
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw (IOException) new InterruptedIOException(
- "Interrupted waiting for the proxy").initCause(ioe);
- }
- }
- }
-
- /**
- * Construct a client-side proxy object that implements the named protocol,
- * talking to a server at the named address.
- * @param <T> Generics Type T.
- * @param protocol input protocol.
- * @param clientVersion input clientVersion.
- * @param addr input addr.
- * @param conf input Configuration.
- * @param factory input factory.
- * @throws IOException raised on errors performing I/O.
- * @return proxy.
- */
- public static <T> T getProxy(Class<T> protocol,
- long clientVersion,
- InetSocketAddress addr, Configuration conf,
- SocketFactory factory) throws IOException {
- return getProtocolProxy(
- protocol, clientVersion, addr, conf, factory).getProxy();
- }
-
- /**
- * Get a protocol proxy that contains a proxy connection to a remote server
- * and a set of methods that are supported by the server.
- *
- * @param <T> Generics Type T.
- * @param protocol protocol class
- * @param clientVersion client version
- * @param addr remote address
- * @param conf configuration to use
- * @param factory socket factory
- * @return the protocol proxy
- * @throws IOException if the far end through a RemoteException
- */
- public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
- long clientVersion,
- InetSocketAddress addr, Configuration conf,
- SocketFactory factory) throws IOException {
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
- return getProtocolProxy(protocol, clientVersion, addr, ugi, conf, factory);
- }
-
- /**
- * Construct a client-side proxy object that implements the named protocol,
- * talking to a server at the named address.
- *
- * @param <T> Generics Type T.
- * @param protocol input protocol.
- * @param clientVersion input clientVersion.
- * @param addr input addr.
- * @param ticket input tocket.
- * @param conf input conf.
- * @param factory input factory.
- * @return the protocol proxy.
- * @throws IOException raised on errors performing I/O.
- *
- */
- public static <T> T getProxy(Class<T> protocol,
- long clientVersion,
- InetSocketAddress addr,
- UserGroupInformation ticket,
- Configuration conf,
- SocketFactory factory) throws IOException {
- return getProtocolProxy(
- protocol, clientVersion, addr, ticket, conf, factory).getProxy();
- }
-
/**
* Get a protocol proxy that contains a proxy connection to a remote server
* and a set of methods that are supported by the server
@@ -501,32 +268,6 @@ public static <T> ProtocolProxy<T>
getProtocolProxy(Class<T> protocol,
factory, getRpcTimeout(conf), null);
}
- /**
- * Construct a client-side proxy that implements the named protocol,
- * talking to a server at the named address.
- *
- * @param <T> Generics Type T.
- * @param protocol protocol
- * @param clientVersion client's version
- * @param addr server address
- * @param ticket security ticket
- * @param conf configuration
- * @param factory socket factory
- * @param rpcTimeout max time for each rpc; 0 means no timeout
- * @return the proxy
- * @throws IOException if any error occurs
- */
- public static <T> T getProxy(Class<T> protocol,
- long clientVersion,
- InetSocketAddress addr,
- UserGroupInformation ticket,
- Configuration conf,
- SocketFactory factory,
- int rpcTimeout) throws IOException {
- return getProtocolProxy(protocol, clientVersion, addr, ticket,
- conf, factory, rpcTimeout, null).getProxy();
- }
-
/**
* Get a protocol proxy that contains a proxy connection to a remote server
* and a set of methods that are supported by the server.
@@ -591,63 +332,6 @@ public static <T> ProtocolProxy<T>
getProtocolProxy(Class<T> protocol,
fallbackToSimpleAuth, null);
}
- /**
- * Get a protocol proxy that contains a proxy connection to a remote server
- * and a set of methods that are supported by the server.
- *
- * @param protocol protocol
- * @param clientVersion client's version
- * @param addr server address
- * @param ticket security ticket
- * @param conf configuration
- * @param factory socket factory
- * @param rpcTimeout max time for each rpc; 0 means no timeout
- * @param connectionRetryPolicy retry policy
- * @param fallbackToSimpleAuth set to true or false during calls to indicate
- * if a secure client falls back to simple auth
- * @param alignmentContext state alignment context
- * @param <T> Generics Type T.
- * @return the proxy
- * @throws IOException if any error occurs
- */
- public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
- long clientVersion,
- InetSocketAddress addr,
- UserGroupInformation ticket,
- Configuration conf,
- SocketFactory factory,
- int rpcTimeout,
- RetryPolicy connectionRetryPolicy,
- AtomicBoolean fallbackToSimpleAuth,
- AlignmentContext alignmentContext)
- throws IOException {
- if (UserGroupInformation.isSecurityEnabled()) {
- SaslRpcServer.init(conf);
- }
- return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion,
- addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy,
- fallbackToSimpleAuth, alignmentContext);
- }
-
- /**
- * Construct a client-side proxy object with the default SocketFactory.
- *
- * @param <T> Generics Type T.
- * @param protocol input protocol.
- * @param clientVersion input clientVersion.
- * @param addr input addr.
- * @param conf input Configuration.
- * @return a proxy instance
- * @throws IOException if the thread is interrupted.
- */
- public static <T> T getProxy(Class<T> protocol,
- long clientVersion,
- InetSocketAddress addr, Configuration conf)
- throws IOException {
-
- return getProtocolProxy(protocol, clientVersion, addr, conf).getProxy();
- }
-
/**
* @return Returns the server address for a given proxy.
* @param proxy input proxy.
@@ -673,27 +357,6 @@ public static ConnectionId getConnectionIdForProxy(Object
proxy) {
return inv.getConnectionId();
}
- /**
- * Get a protocol proxy that contains a proxy connection to a remote server
- * and a set of methods that are supported by the server
- *
- * @param protocol input protocol.
- * @param clientVersion input clientVersion.
- * @param addr input addr.
- * @param conf input configuration.
- * @param <T> Generics Type T.
- * @return a protocol proxy
- * @throws IOException if the thread is interrupted.
- */
- public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
- long clientVersion,
- InetSocketAddress addr, Configuration conf)
- throws IOException {
-
- return getProtocolProxy(protocol, clientVersion, addr, conf, NetUtils
- .getDefaultSocketFactory(conf));
- }
-
/**
* Stop the proxy. Proxy must either implement {@link Closeable} or must have
* associated {@link RpcInvocationHandler}.
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RetryCache.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RetryCache.java
deleted file mode 100644
index 47edb5c26fb..00000000000
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RetryCache.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ipc_;
-
-
-import java.util.Arrays;
-import java.util.Objects;
-import java.util.UUID;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.hadoop.ipc_.metrics.RetryCacheMetrics;
-import org.apache.hadoop.util.LightWeightCache;
-import org.apache.hadoop.util.LightWeightGSet;
-import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
-
-import com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Maintains a cache of non-idempotent requests that have been successfully
- * processed by the RPC server implementation, to handle the retries. A request
- * is uniquely identified by the unique client ID + call ID of the RPC request.
- * On receiving retried request, an entry will be found in the
- * {@link RetryCache} and the previous response is sent back to the request.
- * <p>
- * To look an implementation using this cache, see HDFS FSNamesystem class.
- */
-public class RetryCache {
- public static final Logger LOG = LoggerFactory.getLogger(RetryCache.class);
- private final RetryCacheMetrics retryCacheMetrics;
- private static final int MAX_CAPACITY = 16;
-
- /**
- * CacheEntry is tracked using unique client ID and callId of the RPC
request.
- */
- public static class CacheEntry implements LightWeightCache.Entry {
- /**
- * Processing state of the requests.
- */
- private static byte INPROGRESS = 0;
- private static byte SUCCESS = 1;
- private static byte FAILED = 2;
-
- private byte state = INPROGRESS;
-
- // Store uuid as two long for better memory utilization
- private final long clientIdMsb; // Most signficant bytes
- private final long clientIdLsb; // Least significant bytes
-
- private final int callId;
- private final long expirationTime;
- private LightWeightGSet.LinkedElement next;
-
- CacheEntry(byte[] clientId, int callId, long expirationTime) {
- // ClientId must be a UUID - that is 16 octets.
- Preconditions.checkArgument(clientId.length == ClientId.BYTE_LENGTH,
- "Invalid clientId - length is " + clientId.length
- + " expected length " + ClientId.BYTE_LENGTH);
- // Convert UUID bytes to two longs
- clientIdMsb = ClientId.getMsb(clientId);
- clientIdLsb = ClientId.getLsb(clientId);
- this.callId = callId;
- this.expirationTime = expirationTime;
- }
-
- CacheEntry(byte[] clientId, int callId, long expirationTime,
- boolean success) {
- this(clientId, callId, expirationTime);
- this.state = success ? SUCCESS : FAILED;
- }
-
- private static int hashCode(long value) {
- return (int)(value ^ (value >>> 32));
- }
-
- @Override
- public int hashCode() {
- return (hashCode(clientIdMsb) * 31 + hashCode(clientIdLsb)) * 31 +
callId;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (!(obj instanceof CacheEntry)) {
- return false;
- }
- CacheEntry other = (CacheEntry) obj;
- return callId == other.callId && clientIdMsb == other.clientIdMsb
- && clientIdLsb == other.clientIdLsb;
- }
-
- @Override
- public void setNext(LinkedElement next) {
- this.next = next;
- }
-
- @Override
- public LinkedElement getNext() {
- return next;
- }
-
- synchronized void completed(boolean success) {
- state = success ? SUCCESS : FAILED;
- this.notifyAll();
- }
-
- public synchronized boolean isSuccess() {
- return state == SUCCESS;
- }
-
- @Override
- public void setExpirationTime(long timeNano) {
- // expiration time does not change
- }
-
- @Override
- public long getExpirationTime() {
- return expirationTime;
- }
-
- @Override
- public String toString() {
- return (new UUID(this.clientIdMsb, this.clientIdLsb)).toString() + ":"
- + this.callId + ":" + this.state;
- }
- }
-
- /**
- * CacheEntry with payload that tracks the previous response or parts of
- * previous response to be used for generating response for retried requests.
- */
- public static class CacheEntryWithPayload extends CacheEntry {
- private Object payload;
-
- CacheEntryWithPayload(byte[] clientId, int callId, Object payload,
- long expirationTime) {
- super(clientId, callId, expirationTime);
- this.payload = payload;
- }
-
- CacheEntryWithPayload(byte[] clientId, int callId, Object payload,
- long expirationTime, boolean success) {
- super(clientId, callId, expirationTime, success);
- this.payload = payload;
- }
-
- /** Override equals to avoid findbugs warnings */
- @Override
- public boolean equals(Object obj) {
- return super.equals(obj);
- }
-
- /** Override hashcode to avoid findbugs warnings */
- @Override
- public int hashCode() {
- return super.hashCode();
- }
-
- public Object getPayload() {
- return payload;
- }
- }
-
- private final LightWeightGSet<CacheEntry, CacheEntry> set;
- private final long expirationTime;
- private String cacheName;
-
- private final ReentrantLock lock = new ReentrantLock();
-
- /**
- * Constructor
- * @param cacheName name to identify the cache by
- * @param percentage percentage of total java heap space used by this cache
- * @param expirationTime time for an entry to expire in nanoseconds
- */
- public RetryCache(String cacheName, double percentage, long expirationTime) {
- int capacity = LightWeightGSet.computeCapacity(percentage, cacheName);
- capacity = capacity > MAX_CAPACITY ? capacity : MAX_CAPACITY;
- this.set = new LightWeightCache<CacheEntry, CacheEntry>(capacity, capacity,
- expirationTime, 0);
- this.expirationTime = expirationTime;
- this.cacheName = cacheName;
- this.retryCacheMetrics = RetryCacheMetrics.create(this);
- }
-
- private static boolean skipRetryCache() {
- // Do not track non RPC invocation or RPC requests with
- // invalid callId or clientId in retry cache
- return !Server.isRpcInvocation() || Server.getCallId() < 0
- || Arrays.equals(Server.getClientId(), RpcConstants.DUMMY_CLIENT_ID);
- }
-
- public void lock() {
- this.lock.lock();
- }
-
- public void unlock() {
- this.lock.unlock();
- }
-
- private void incrCacheClearedCounter() {
- retryCacheMetrics.incrCacheCleared();
- }
-
- public LightWeightGSet<CacheEntry, CacheEntry> getCacheSet() {
- return set;
- }
-
- public RetryCacheMetrics getMetricsForTests() {
- return retryCacheMetrics;
- }
-
- /**
- * @return This method returns cache name for metrics.
- */
- public String getCacheName() {
- return cacheName;
- }
-
- /**
- * This method handles the following conditions:
- * <ul>
- * <li>If retry is not to be processed, return null</li>
- * <li>If there is no cache entry, add a new entry {@code newEntry} and
return
- * it.</li>
- * <li>If there is an existing entry, wait for its completion. If the
- * completion state is {@link CacheEntry#FAILED}, the expectation is that the
- * thread that waited for completion, retries the request. the
- * {@link CacheEntry} state is set to {@link CacheEntry#INPROGRESS} again.
- * <li>If the completion state is {@link CacheEntry#SUCCESS}, the entry is
- * returned so that the thread that waits for it can can return previous
- * response.</li>
- * <ul>
- *
- * @return {@link CacheEntry}.
- */
- private CacheEntry waitForCompletion(CacheEntry newEntry) {
- CacheEntry mapEntry = null;
- lock.lock();
- try {
- mapEntry = set.get(newEntry);
- // If an entry in the cache does not exist, add a new one
- if (mapEntry == null) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Adding Rpc request clientId "
- + newEntry.clientIdMsb + newEntry.clientIdLsb + " callId "
- + newEntry.callId + " to retryCache");
- }
- set.put(newEntry);
- retryCacheMetrics.incrCacheUpdated();
- return newEntry;
- } else {
- retryCacheMetrics.incrCacheHit();
- }
- } finally {
- lock.unlock();
- }
- // Entry already exists in cache. Wait for completion and return its state
- Objects.requireNonNull(mapEntry, "Entry from the cache should not be
null");
- // Wait for in progress request to complete
- synchronized (mapEntry) {
- while (mapEntry.state == CacheEntry.INPROGRESS) {
- try {
- mapEntry.wait();
- } catch (InterruptedException ie) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- }
- }
- // Previous request has failed, the expectation is is that it will be
- // retried again.
- if (mapEntry.state != CacheEntry.SUCCESS) {
- mapEntry.state = CacheEntry.INPROGRESS;
- }
- }
- return mapEntry;
- }
-
- /**
- * Add a new cache entry into the retry cache. The cache entry consists of
- * clientId and callId extracted from editlog.
- *
- * @param clientId input clientId.
- * @param callId input callId.
- */
- public void addCacheEntry(byte[] clientId, int callId) {
- CacheEntry newEntry = new CacheEntry(clientId, callId, System.nanoTime()
- + expirationTime, true);
- lock.lock();
- try {
- set.put(newEntry);
- } finally {
- lock.unlock();
- }
- retryCacheMetrics.incrCacheUpdated();
- }
-
- public void addCacheEntryWithPayload(byte[] clientId, int callId,
- Object payload) {
- // since the entry is loaded from editlog, we can assume it succeeded.
- CacheEntry newEntry = new CacheEntryWithPayload(clientId, callId, payload,
- System.nanoTime() + expirationTime, true);
- lock.lock();
- try {
- set.put(newEntry);
- } finally {
- lock.unlock();
- }
- retryCacheMetrics.incrCacheUpdated();
- }
-
- private static CacheEntry newEntry(long expirationTime) {
- return new CacheEntry(Server.getClientId(), Server.getCallId(),
- System.nanoTime() + expirationTime);
- }
-
- private static CacheEntryWithPayload newEntry(Object payload,
- long expirationTime) {
- return new CacheEntryWithPayload(Server.getClientId(), Server.getCallId(),
- payload, System.nanoTime() + expirationTime);
- }
-
- /**
- * Static method that provides null check for retryCache.
- * @param cache input Cache.
- * @return CacheEntry.
- */
- public static CacheEntry waitForCompletion(RetryCache cache) {
- if (skipRetryCache()) {
- return null;
- }
- return cache != null ? cache
- .waitForCompletion(newEntry(cache.expirationTime)) : null;
- }
-
- /**
- * Static method that provides null check for retryCache.
- * @param cache input cache.
- * @param payload input payload.
- * @return CacheEntryWithPayload.
- */
- public static CacheEntryWithPayload waitForCompletion(RetryCache cache,
- Object payload) {
- if (skipRetryCache()) {
- return null;
- }
- return (CacheEntryWithPayload) (cache != null ? cache
- .waitForCompletion(newEntry(payload, cache.expirationTime)) : null);
- }
-
- public static void setState(CacheEntry e, boolean success) {
- if (e == null) {
- return;
- }
- e.completed(success);
- }
-
- public static void setState(CacheEntryWithPayload e, boolean success,
- Object payload) {
- if (e == null) {
- return;
- }
- e.payload = payload;
- e.completed(success);
- }
-
- public static void clear(RetryCache cache) {
- if (cache != null) {
- cache.set.clear();
- cache.incrCacheClearedCounter();
- }
- }
-}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RpcEngine.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RpcEngine.java
index c01f94872a5..741ea1c016b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RpcEngine.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RpcEngine.java
@@ -26,7 +26,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.ipc_.Client.ConnectionId;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -34,43 +33,6 @@
/** An RPC implementation. */
public interface RpcEngine {
- /**
- * Construct a client-side proxy object.
- *
- * @param <T> Generics Type T.
- * @param protocol input protocol.
- * @param clientVersion input clientVersion.
- * @param addr input addr.
- * @param ticket input ticket.
- * @param conf input Configuration.
- * @param factory input factory.
- * @param rpcTimeout input rpcTimeout.
- * @param connectionRetryPolicy input connectionRetryPolicy.
- * @throws IOException raised on errors performing I/O.
- * @return ProtocolProxy.
- */
- <T> ProtocolProxy<T> getProxy(Class<T> protocol,
- long clientVersion, InetSocketAddress addr,
- UserGroupInformation ticket, Configuration conf,
- SocketFactory factory, int rpcTimeout,
- RetryPolicy connectionRetryPolicy) throws IOException;
-
- /**
- * Construct a client-side proxy object with a ConnectionId.
- *
- * @param <T> Generics Type T.
- * @param protocol input protocol.
- * @param clientVersion input clientVersion.
- * @param connId input ConnectionId.
- * @param conf input Configuration.
- * @param factory input factory.
- * @throws IOException raised on errors performing I/O.
- * @return ProtocolProxy.
- */
- <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
- Client.ConnectionId connId, Configuration conf, SocketFactory factory)
- throws IOException;
-
/**
* Construct a client-side proxy object.
*
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Server.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Server.java
index aabe01b1762..cf323bb4b85 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Server.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Server.java
@@ -36,7 +36,6 @@
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
-import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.Channels;
@@ -53,7 +52,6 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -99,7 +97,6 @@
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security_.SaslMechanismFactory;
import org.apache.hadoop.security.SaslPropertiesResolver;
-import org.apache.hadoop.security_.SaslRpcClient;
import org.apache.hadoop.security_.SaslRpcServer;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
import org.apache.hadoop.security.SecurityUtil;
@@ -318,14 +315,6 @@ static Class<?> getProtocolClass(String protocolName,
Configuration conf)
return protocol;
}
- /** @return Returns the server instance called under or null. May be called
under
- * {@link #call(Writable, long)} implementations, and under {@link Writable}
- * methods of paramters and return values. Permits applications to access
- * the server context.*/
- public static Server get() {
- return SERVER.get();
- }
-
/** This is set to Call object before Handler invokes an RPC and reset
* after the call returns.
*/
@@ -348,15 +337,6 @@ public static int getCallId() {
return call != null ? call.callId : RpcConstants.INVALID_CALL_ID;
}
- /**
- * @return The current active RPC call's retry count. -1 indicates the retry
- * cache is not supported in the client side.
- */
- public static int getCallRetryCount() {
- Call call = CurCall.get();
- return call != null ? call.retryCount : RpcConstants.INVALID_RETRY_COUNT;
- }
-
/**
* @return Returns the remote side ip address when invoked inside an RPC
* Returns null incase of an error.
@@ -366,32 +346,6 @@ public static InetAddress getRemoteIp() {
return (call != null ) ? call.getHostInetAddress() : null;
}
- /**
- * Returns the SASL qop for the current call, if the current call is
- * set, and the SASL negotiation is done. Otherwise return null
- * Note this only returns established QOP for auxiliary port, and
- * returns null for primary (non-auxiliary) port.
- *
- * Also note that CurCall is thread local object. So in fact, different
- * handler threads will process different CurCall object.
- *
- * Also, only return for RPC calls, not supported for other protocols.
- * @return the QOP of the current connection.
- */
- public static String getAuxiliaryPortEstablishedQOP() {
- Call call = CurCall.get();
- if (!(call instanceof RpcCall)) {
- return null;
- }
- RpcCall rpcCall = (RpcCall)call;
- if (rpcCall.connection.isOnAuxiliaryPort()) {
- return rpcCall.connection.getEstablishedQOP();
- } else {
- // Not sending back QOP for primary port
- return null;
- }
- }
-
/**
* @return Returns the clientId from the current RPC request.
*/
@@ -417,26 +371,6 @@ public static UserGroupInformation getRemoteUser() {
return (call != null) ? call.getRemoteUser() : null;
}
- public static String getProtocol() {
- Call call = CurCall.get();
- return (call != null) ? call.getProtocol() : null;
- }
-
- /** @return Return true if the invocation was through an RPC.
- */
- public static boolean isRpcInvocation() {
- return CurCall.get() != null;
- }
-
- /**
- * @return Return the priority level assigned by call queue to an RPC
- * Returns 0 in case no priority is assigned.
- */
- public static int getPriorityLevel() {
- Call call = CurCall.get();
- return call != null? call.getPriorityLevel() : 0;
- }
-
private String bindAddress;
private int port; // port we listen on
private int handlerCount; // number of handler threads
@@ -473,10 +407,6 @@ protected ResponseBuffer initialValue() {
// maintains the set of client connections and handles idle timeouts
private ConnectionManager connectionManager;
private Listener listener = null;
- // Auxiliary listeners maintained as in a map, to allow
- // arbitrary number of of auxiliary listeners. A map from
- // the port to the listener binding to it.
- private Map<Integer, Listener> auxiliaryListenerMap;
private Responder responder = null;
private Handler[] handlers = null;
@@ -508,10 +438,6 @@ private void setPurgeIntervalNanos(int purgeInterval) {
tmpPurgeInterval, TimeUnit.MINUTES);
}
- public long getPurgeIntervalNanos() {
- return this.purgeIntervalNanos;
- }
-
/**
* Logs a Slow RPC Request.
*
@@ -593,21 +519,6 @@ void updateDeferredMetrics(String name, long
processingTime) {
rpcDetailedMetrics.addDeferredProcessingTime(name, processingTime);
}
- /**
- * A convenience method to bind to a given address and report
- * better exceptions if the address is not a valid host.
- * @param socket the socket to bind
- * @param address the address to bind to
- * @param backlog the number of connections allowed in the queue
- * @throws BindException if the address can't be bound
- * @throws UnknownHostException if the address isn't a valid host name
- * @throws IOException other random errors from bind
- */
- public static void bind(ServerSocket socket, InetSocketAddress address,
- int backlog) throws IOException {
- bind(socket, address, backlog, null, null);
- }
-
public static void bind(ServerSocket socket, InetSocketAddress address,
int backlog, Configuration conf, String rangeConf) throws IOException {
try {
@@ -640,38 +551,10 @@ public static void bind(ServerSocket socket,
InetSocketAddress address,
}
}
- int getPriorityLevel(Schedulable e) {
- return callQueue.getPriorityLevel(e);
- }
-
- int getPriorityLevel(UserGroupInformation ugi) {
- return callQueue.getPriorityLevel(ugi);
- }
-
void setPriorityLevel(UserGroupInformation ugi, int priority) {
callQueue.setPriorityLevel(ugi, priority);
}
- /**
- * Returns a handle to the rpcMetrics (required in tests)
- * @return rpc metrics
- */
- public RpcMetrics getRpcMetrics() {
- return rpcMetrics;
- }
-
- public RpcDetailedMetrics getRpcDetailedMetrics() {
- return rpcDetailedMetrics;
- }
-
- Iterable<? extends Thread> getHandlers() {
- return Arrays.asList(handlers);
- }
-
- Connection[] getConnections() {
- return connectionManager.toArray();
- }
-
/**
* Refresh the service authorization ACL for the service handled by this
server.
*
@@ -682,25 +565,6 @@ public void refreshServiceAcl(Configuration conf,
PolicyProvider provider) {
serviceAuthorizationManager.refresh(conf, provider);
}
- /**
- * Refresh the service authorization ACL for the service handled by this
server
- * using the specified Configuration.
- *
- * @param conf input Configuration.
- * @param provider input provider.
- */
- public void refreshServiceAclWithLoadedConfiguration(Configuration conf,
- PolicyProvider provider) {
- serviceAuthorizationManager.refreshWithLoadedConfiguration(conf, provider);
- }
- /**
- * Returns a handle to the serviceAuthorizationManager (required in tests)
- * @return instance of ServiceAuthorizationManager for this server
- */
- public ServiceAuthorizationManager getServiceAuthorizationManager() {
- return serviceAuthorizationManager;
- }
-
private String getQueueClassPrefix() {
return CommonConfigurationKeys.IPC_NAMESPACE + "." + port;
}
@@ -844,22 +708,6 @@ public String getHostAddress() {
return (addr != null) ? addr.getHostAddress() : null;
}
- public String getProtocol() {
- return null;
- }
-
- /**
- * Allow a IPC response to be postponed instead of sent immediately
- * after the handler returns from the proxy method. The intended use
- * case is freeing up the handler thread when the response is known,
- * but an expensive pre-condition must be satisfied before it's sent
- * to the client.
- */
- public final void postponeResponse() {
- int count = responseWaitCount.incrementAndGet();
- assert count > 0 : "response has already been sent";
- }
-
public final void sendResponse() throws IOException {
int count = responseWaitCount.decrementAndGet();
assert count >= 0 : "response has already been sent";
@@ -978,11 +826,6 @@ void setResponseFields(Writable returnValue,
this.responseParams = responseParams;
}
- @Override
- public String getProtocol() {
- return "rpc";
- }
-
@Override
public UserGroupInformation getRemoteUser() {
return connection.user;
@@ -1182,7 +1025,6 @@ private class Listener extends Thread {
private int backlogLength = conf.getInt(
CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
- private boolean isOnAuxiliaryPort;
Listener(int port) throws IOException {
address = new InetSocketAddress(bindAddress, port);
@@ -1209,13 +1051,8 @@ private class Listener extends Thread {
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
this.setName("IPC Server listener on " + port);
this.setDaemon(true);
- this.isOnAuxiliaryPort = false;
}
- void setIsAuxiliary() {
- this.isOnAuxiliaryPort = true;
- }
-
private class Reader extends Thread {
final private BlockingQueue<Connection> pendingConnections;
private final Selector readSelector;
@@ -1383,7 +1220,7 @@ void doAccept(SelectionKey key) throws
InterruptedException, IOException, OutOf
Reader reader = getReader();
Connection c = connectionManager.register(channel,
- this.listenPort, this.isOnAuxiliaryPort);
+ this.listenPort);
// If the connectionManager can't take it, close the connection.
if (c == null) {
if (channel.isOpen()) {
@@ -1804,17 +1641,14 @@ public class Connection {
IpcConnectionContextProto connectionContext;
String protocolName;
SaslServer saslServer;
- private String establishedQOP;
private AuthMethod authMethod;
private AuthProtocol authProtocol;
private boolean saslContextEstablished;
private ByteBuffer connectionHeaderBuf = null;
private ByteBuffer unwrappedData;
private ByteBuffer unwrappedDataLengthBuffer;
- private int serviceClass;
private boolean shouldClose = false;
private int ingressPort;
- private boolean isOnAuxiliaryPort;
UserGroupInformation user = null;
public UserGroupInformation attemptingUser = null; // user name before auth
@@ -1827,7 +1661,7 @@ public class Connection {
private boolean useWrap = false;
public Connection(SocketChannel channel, long lastContact,
- int ingressPort, boolean isOnAuxiliaryPort) {
+ int ingressPort) {
this.channel = channel;
this.lastContact = lastContact;
this.data = null;
@@ -1840,7 +1674,6 @@ public Connection(SocketChannel channel, long lastContact,
this.socket = channel.socket();
this.addr = socket.getInetAddress();
this.ingressPort = ingressPort;
- this.isOnAuxiliaryPort = isOnAuxiliaryPort;
if (addr == null) {
this.hostAddress = "*Unknown*";
} else {
@@ -1876,22 +1709,10 @@ public String getHostAddress() {
return hostAddress;
}
- public int getIngressPort() {
- return ingressPort;
- }
-
public InetAddress getHostInetAddress() {
return addr;
}
- public String getEstablishedQOP() {
- return establishedQOP;
- }
-
- public boolean isOnAuxiliaryPort() {
- return isOnAuxiliaryPort;
- }
-
public void setLastContact(long lastContact) {
this.lastContact = lastContact;
}
@@ -1961,7 +1782,7 @@ private void saslReadAndProcess(RpcWritable.Buffer
buffer) throws
}
/**
- * Some exceptions ({@link RetriableException} and {@link
StandbyException})
+ * Some exceptions (e.g. {@link RetriableException})
* that are wrapped as a cause of parameter e are unwrapped so that they
can
* be sent as the true cause to the client side. In case of
* {@link InvalidToken} we go one level deeper to get the true cause.
@@ -1974,8 +1795,6 @@ private Throwable getTrueCause(IOException e) {
while (cause != null) {
if (cause instanceof RetriableException) {
return cause;
- } else if (cause instanceof StandbyException) {
- return cause;
} else if (cause instanceof InvalidToken) {
// FIXME: hadoop method signatures are restricting the SASL
// callbacks to only returning InvalidToken, but some services
@@ -1999,7 +1818,7 @@ private Throwable getTrueCause(IOException e) {
* failure, premature or invalid connection context, or other
state
* errors. This exception needs to be sent to the client. This
* exception will wrap {@link RetriableException},
- * {@link InvalidToken}, {@link StandbyException} or
+ * {@link InvalidToken}, or
* {@link SaslException}.
* @throws IOException if sending reply fails
* @throws InterruptedException
@@ -2070,7 +1889,6 @@ private void saslProcess(RpcSaslProto saslMessage)
// do NOT enable wrapping until the last auth response is sent
if (saslContextEstablished) {
String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
- establishedQOP = qop;
// SASL wrapping is only used if the connection has a QOP, and
// the value is not auth. ex. auth-int & auth-priv
useWrap = (qop != null && !"auth".equalsIgnoreCase(qop));
@@ -2267,8 +2085,6 @@ public int readAndProcess() throws IOException,
InterruptedException {
return count;
}
int version = connectionHeaderBuf.get(0);
- // TODO we should add handler for service class later
- this.setServiceClass(connectionHeaderBuf.get(1));
dataLengthBuffer.flip();
// Check if it looks like the user is hitting an IPC port
@@ -2838,22 +2654,6 @@ private void sendResponse(RpcCall call) throws
IOException {
responder.doRespond(call);
}
- /**
- * Get service class for connection
- * @return the serviceClass
- */
- public int getServiceClass() {
- return serviceClass;
- }
-
- /**
- * Set service class for connection
- * @param serviceClass the serviceClass to set
- */
- public void setServiceClass(int serviceClass) {
- this.serviceClass = serviceClass;
- }
-
private synchronized void close() {
disposeSasl();
data = null;
@@ -2869,15 +2669,6 @@ private synchronized void close() {
}
}
- public void queueCall(Call call) throws IOException, InterruptedException {
- // external non-rpc calls don't need server exception wrapper.
- try {
- internalQueueCall(call);
- } catch (RpcServerException rse) {
- throw (IOException)rse.getCause();
- }
- }
-
private void internalQueueCall(Call call)
throws IOException, InterruptedException {
internalQueueCall(call, true);
@@ -3014,25 +2805,7 @@ void logException(Logger logger, Throwable e, Call call)
{
}
}
- protected Server(String bindAddress, int port,
- Class<? extends Writable> paramClass, int handlerCount,
- Configuration conf)
- throws IOException
- {
- this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer
- .toString(port), null, null);
- }
-
- protected Server(String bindAddress, int port,
- Class<? extends Writable> rpcRequestClass, int handlerCount,
- int numReaders, int queueSizePerHandler, Configuration conf,
- String serverName, SecretManager<? extends TokenIdentifier>
secretManager)
- throws IOException {
- this(bindAddress, port, rpcRequestClass, handlerCount, numReaders,
- queueSizePerHandler, conf, serverName, secretManager, null);
- }
-
- /**
+ /**
* Constructs a server listening on the named port and address. Parameters
passed must
* be of the named class. The <code>handlerCount</code> determines
* the number of handler threads that will be used to process calls.
@@ -3072,7 +2845,6 @@ protected Server(String bindAddress, int port,
this.handlerCount = handlerCount;
this.socketSendBufferSize = 0;
this.serverName = serverName;
- this.auxiliaryListenerMap = null;
this.maxDataLength =
conf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
if (queueSizePerHandler != -1) {
@@ -3137,8 +2909,6 @@ protected Server(String bindAddress, int port,
SaslRpcServer.init(conf);
saslPropsResolver = SaslPropertiesResolver.getInstance(conf);
}
-
- this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.class);
}
private synchronized void doKerberosRelogin() throws IOException {
@@ -3161,24 +2931,6 @@ private synchronized void doKerberosRelogin() throws
IOException {
}
}
- public synchronized void addAuxiliaryListener(int auxiliaryPort)
- throws IOException {
- if (auxiliaryListenerMap == null) {
- auxiliaryListenerMap = new HashMap<>();
- }
- if (auxiliaryListenerMap.containsKey(auxiliaryPort) && auxiliaryPort != 0)
{
- throw new IOException(
- "There is already a listener binding to: " + auxiliaryPort);
- }
- Listener newListener = new Listener(auxiliaryPort);
- newListener.setIsAuxiliary();
-
- // in the case of port = 0, the listener would be on a != 0 port.
- LOG.info("Adding a server listener on port " +
- newListener.getAddress().getPort());
- auxiliaryListenerMap.put(newListener.getAddress().getPort(), newListener);
- }
-
private RpcSaslProto buildNegotiateResponse(List<AuthMethod> authMethods)
throws IOException {
RpcSaslProto.Builder negotiateBuilder = RpcSaslProto.newBuilder();
@@ -3403,22 +3155,11 @@ private void wrapWithSasl(RpcCall call) throws
IOException {
Configuration getConf() {
return conf;
}
-
- /**
- * Sets the socket buffer size used for responding to RPCs.
- * @param size input size.
- */
- public void setSocketSendBufSize(int size) { this.socketSendBufferSize =
size; }
/** Starts the service. Must be called before any calls will be handled. */
public synchronized void start() {
responder.start();
listener.start();
- if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) {
- for (Listener newListener : auxiliaryListenerMap.values()) {
- newListener.start();
- }
- }
handlers = new Handler[handlerCount];
@@ -3441,12 +3182,6 @@ public synchronized void stop() {
}
listener.interrupt();
listener.doStop();
- if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) {
- for (Listener newListener : auxiliaryListenerMap.values()) {
- newListener.interrupt();
- newListener.doStop();
- }
- }
responder.interrupt();
notifyAll();
this.rpcMetrics.shutdown();
@@ -3474,23 +3209,6 @@ public synchronized InetSocketAddress
getListenerAddress() {
}
/**
- * Return the set of all the configured auxiliary socket addresses NameNode
- * RPC is listening on. If there are none, or it is not configured at all, an
- * empty set is returned.
- * @return the set of all the auxiliary addresses on which the
- * RPC server is listening on.
- */
- public synchronized Set<InetSocketAddress> getAuxiliaryListenerAddresses() {
- Set<InetSocketAddress> allAddrs = new HashSet<>();
- if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) {
- for (Listener auxListener : auxiliaryListenerMap.values()) {
- allAddrs.add(auxListener.getAddress());
- }
- }
- return allAddrs;
- }
-
- /**
* Called for each call.
* @deprecated Use {@link #call(RPC.RpcKind, String,
* Writable, long)} instead
@@ -3590,30 +3308,6 @@ public int getCallQueueLen() {
return callQueue.size();
}
- public boolean isClientBackoffEnabled() {
- return callQueue.isClientBackoffEnabled();
- }
-
- public void setClientBackoffEnabled(boolean value) {
- callQueue.setClientBackoffEnabled(value);
- }
-
- /**
- * The maximum size of the rpc call queue of this server.
- * @return The maximum size of the rpc call queue.
- */
- public int getMaxQueueSize() {
- return maxQueueSize;
- }
-
- /**
- * The number of reader threads for this server.
- * @return The number of reader threads.
- */
- public int getNumReaders() {
- return readThreads;
- }
-
/**
* When the read or write buffer size is larger than this limit, i/o will be
* done in chunks of this size. Most RPC requests and responses would be
@@ -3805,13 +3499,12 @@ Connection[] toArray() {
return connections.toArray(new Connection[0]);
}
- Connection register(SocketChannel channel, int ingressPort,
- boolean isOnAuxiliaryPort) {
+ Connection register(SocketChannel channel, int ingressPort) {
if (isFull()) {
return null;
}
Connection connection = new Connection(channel, Time.now(),
- ingressPort, isOnAuxiliaryPort);
+ ingressPort);
add(connection);
if (LOG.isDebugEnabled()) {
LOG.debug("Server connection from " + connection +
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/StandbyException.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/StandbyException.java
deleted file mode 100644
index 351e90ca18a..00000000000
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/StandbyException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ipc_;
-
-import java.io.IOException;
-
-
-/**
- * Thrown by a remote server when it is up, but is not the active server in a
- * set of servers in which only a subset may be active.
- */
-public class StandbyException extends IOException {
- static final long serialVersionUID = 0x12308AD010L;
- public StandbyException(String msg) {
- super(msg);
- }
-}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/UnexpectedServerException.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/UnexpectedServerException.java
deleted file mode 100644
index 5ac3a980910..00000000000
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/UnexpectedServerException.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ipc_;
-
-/**
- * Indicates that the RPC server encountered an undeclared exception from the
- * service
- */
-public class UnexpectedServerException extends RpcException {
- private static final long serialVersionUID = 1L;
-
- /**
- * Constructs exception with the specified detail message.
- *
- * @param messages detailed message.
- */
- UnexpectedServerException(final String message) {
- super(message);
- }
-
- /**
- * Constructs exception with the specified detail message and cause.
- *
- * @param message message.
- * @param cause that cause this exception
- * @param cause the cause (can be retried by the {@link #getCause()} method).
- * (A <tt>null</tt> value is permitted, and indicates that the cause
- * is nonexistent or unknown.)
- */
- UnexpectedServerException(final String message, final Throwable cause) {
- super(message, cause);
- }
-}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/metrics/RetryCacheMetrics.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/metrics/RetryCacheMetrics.java
deleted file mode 100644
index 321a41cbe2c..00000000000
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/metrics/RetryCacheMetrics.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ipc_.metrics;
-
-import org.apache.hadoop.ipc_.RetryCache;
-import org.apache.hadoop.metrics2.annotation.Metric;
-import org.apache.hadoop.metrics2.annotation.Metrics;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.lib.MetricsRegistry;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class is for maintaining the various RetryCache-related statistics
- * and publishing them through the metrics interfaces.
- */
-@Metrics(about="Aggregate RetryCache metrics", context="rpc")
-public class RetryCacheMetrics {
-
- static final Logger LOG = LoggerFactory.getLogger(RetryCacheMetrics.class);
- final MetricsRegistry registry;
- final String name;
-
- RetryCacheMetrics(RetryCache retryCache) {
- name = "RetryCache."+ retryCache.getCacheName();
- registry = new MetricsRegistry(name);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Initialized "+ registry);
- }
- }
-
- public String getName() { return name; }
-
- public static RetryCacheMetrics create(RetryCache cache) {
- RetryCacheMetrics m = new RetryCacheMetrics(cache);
- return DefaultMetricsSystem.instance().register(m.name, null, m);
- }
-
- @Metric("Number of RetryCache hit") MutableCounterLong cacheHit;
- @Metric("Number of RetryCache cleared") MutableCounterLong cacheCleared;
- @Metric("Number of RetryCache updated") MutableCounterLong cacheUpdated;
-
- /**
- * One cache hit event
- */
- public void incrCacheHit() {
- cacheHit.incr();
- }
-
- /**
- * One cache cleared
- */
- public void incrCacheCleared() {
- cacheCleared.incr();
- }
-
- /**
- * One cache updated
- */
- public void incrCacheUpdated() {
- cacheUpdated.incr();
- }
-
- public long getCacheHit() {
- return cacheHit.value();
- }
-
- public long getCacheCleared() {
- return cacheCleared.value();
- }
-
- public long getCacheUpdated() {
- return cacheUpdated.value();
- }
-
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]