http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 3740b7f..634c101 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -18,62 +18,108 @@ package org.apache.hadoop.hbase.ipc; +import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE; +import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException; + import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.protobuf.BlockingRpcChannel; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcChannel; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; +import io.netty.util.HashedWheelTimer; + import java.io.IOException; -import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.net.SocketTimeoutException; import java.net.UnknownHostException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.KeyValueCodec; -import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; +import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.PoolMap; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.security.token.TokenSelector; /** * Provides the basics for a RpcClient implementation like configuration and Logging. + * <p> + * Locking schema of the current IPC implementation + * <ul> + * <li>There is a lock in {@link AbstractRpcClient} to protect the fetching or creating + * connection.</li> + * <li>There is a lock in {@link Call} to make sure that we can only finish the call once.</li> + * <li>The same for {@link HBaseRpcController} as {@link Call}. And see the comment of + * {@link HBaseRpcController#notifyOnCancel(RpcCallback, HBaseRpcController.CancellationCallback)} + * of how to deal with cancel.</li> + * <li>For connection implementation, the construction of a connection should be as fast as possible + * because the creation is protected under a lock. Connect to remote side when needed. There is no + * forced locking schema for a connection implementation.</li> + * <li>For the locking order, the {@link Call} and {@link HBaseRpcController}'s lock should be held + * at last. So the callbacks in {@link Call} and {@link HBaseRpcController} should be execute + * outside the lock in {@link Call} and {@link HBaseRpcController} which means the implementations + * of the callbacks are free to hold any lock.</li> + * </ul> */ @InterfaceAudience.Private -public abstract class AbstractRpcClient implements RpcClient { +public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient { // Log level is being changed in tests public static final Log LOG = LogFactory.getLog(AbstractRpcClient.class); + protected static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer( + Threads.newDaemonThreadFactory("RpcClient-timer"), 10, TimeUnit.MILLISECONDS); + + private static final ScheduledExecutorService IDLE_CONN_SWEEPER = Executors + .newScheduledThreadPool(1, Threads.newDaemonThreadFactory("Idle-Rpc-Conn-Sweeper")); + + protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDLERS = new HashMap<>(); + + static { + TOKEN_HANDLERS.put(Kind.HBASE_AUTH_TOKEN, new AuthenticationTokenSelector()); + } + + protected boolean running = true; // if client runs + protected final Configuration conf; - protected String clusterId; + protected final String clusterId; protected final SocketAddress localAddr; protected final MetricsConnection metrics; - protected UserProvider userProvider; - protected final IPCUtil ipcUtil; + protected final UserProvider userProvider; + protected final CellBlockBuilder cellBlockBuilder; protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this // time (in ms), it will be closed at any moment. - protected final int maxRetries; //the max. no. of retries for socket connections + protected final int maxRetries; // the max. no. of retries for socket connections protected final long failureSleep; // Time to sleep before retry on failure. protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm protected final boolean tcpKeepAlive; // if T then use keepalives @@ -81,10 +127,18 @@ public abstract class AbstractRpcClient implements RpcClient { protected final CompressionCodec compressor; protected final boolean fallbackAllowed; + protected final FailedServers failedServers; + protected final int connectTO; protected final int readTO; protected final int writeTO; + protected final PoolMap<ConnectionId, T> connections; + + private final AtomicInteger callIdCnt = new AtomicInteger(0); + + private final ScheduledFuture<?> cleanupIdleConnectionTask; + private int maxConcurrentCallsPerServer; private static final LoadingCache<InetSocketAddress, AtomicInteger> concurrentCounterCache = @@ -97,7 +151,6 @@ public abstract class AbstractRpcClient implements RpcClient { /** * Construct an IPC client for the cluster <code>clusterId</code> - * * @param conf configuration * @param clusterId the cluster id * @param localAddr client socket bind address. @@ -110,17 +163,18 @@ public abstract class AbstractRpcClient implements RpcClient { this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true); this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT; this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, - HConstants.DEFAULT_HBASE_CLIENT_PAUSE); + HConstants.DEFAULT_HBASE_CLIENT_PAUSE); this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0); this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true); - this.ipcUtil = new IPCUtil(conf); + this.cellBlockBuilder = new CellBlockBuilder(conf); this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes this.conf = conf; this.codec = getCodec(); this.compressor = getCompressor(conf); this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, - IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT); + IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT); + this.failedServers = new FailedServers(conf); this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT); this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ); this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE); @@ -129,25 +183,47 @@ public abstract class AbstractRpcClient implements RpcClient { HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD, HConstants.DEFAULT_HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD); - // login the server principal (if using secure Hadoop) + this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf)); + + this.cleanupIdleConnectionTask = IDLE_CONN_SWEEPER.scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + cleanupIdleConnections(); + } + }, minIdleTimeBeforeClose, minIdleTimeBeforeClose, TimeUnit.MILLISECONDS); + if (LOG.isDebugEnabled()) { - LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + - ", tcpKeepAlive=" + this.tcpKeepAlive + - ", tcpNoDelay=" + this.tcpNoDelay + - ", connectTO=" + this.connectTO + - ", readTO=" + this.readTO + - ", writeTO=" + this.writeTO + - ", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose + - ", maxRetries=" + this.maxRetries + - ", fallbackAllowed=" + this.fallbackAllowed + - ", bind address=" + (this.localAddr != null ? this.localAddr : "null")); + LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + ", tcpKeepAlive=" + + this.tcpKeepAlive + ", tcpNoDelay=" + this.tcpNoDelay + ", connectTO=" + this.connectTO + + ", readTO=" + this.readTO + ", writeTO=" + this.writeTO + ", minIdleTimeBeforeClose=" + + this.minIdleTimeBeforeClose + ", maxRetries=" + this.maxRetries + ", fallbackAllowed=" + + this.fallbackAllowed + ", bind address=" + + (this.localAddr != null ? this.localAddr : "null")); + } + } + + private void cleanupIdleConnections() { + long closeBeforeTime = EnvironmentEdgeManager.currentTime() - minIdleTimeBeforeClose; + synchronized (connections) { + for (T conn : connections.values()) { + // remove connection if it has not been chosen by anyone for more than maxIdleTime, and the + // connection itself has already shutdown. The latter check is because that we may still + // have some pending calls on connection so we should not shutdown the connection outside. + // The connection itself will disconnect if there is no pending call for maxIdleTime. + if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) { + LOG.info("Cleanup idle connection to " + conn.remoteId().address); + connections.removeValue(conn.remoteId(), conn); + conn.cleanupConnection(); + } + } } } @VisibleForTesting public static String getDefaultCodec(final Configuration c) { // If "hbase.client.default.rpc.codec" is empty string -- you can't set it to null because - // Configuration will complain -- then no default codec (and we'll pb everything). Else + // Configuration will complain -- then no default codec (and we'll pb everything). Else // default is KeyValueCodec return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName()); } @@ -160,9 +236,11 @@ public abstract class AbstractRpcClient implements RpcClient { // For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND // "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding. String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf)); - if (className == null || className.length() == 0) return null; + if (className == null || className.length() == 0) { + return null; + } try { - return (Codec)Class.forName(className).newInstance(); + return (Codec) Class.forName(className).newInstance(); } catch (Exception e) { throw new RuntimeException("Failed getting codec " + className, e); } @@ -173,6 +251,12 @@ public abstract class AbstractRpcClient implements RpcClient { return this.codec != null; } + // for writing tests that want to throw exception when connecting. + @VisibleForTesting + boolean isTcpNoDelay() { + return tcpNoDelay; + } + /** * Encapsulate the ugly casting and RuntimeException conversion in private method. * @param conf configuration @@ -180,142 +264,289 @@ public abstract class AbstractRpcClient implements RpcClient { */ private static CompressionCodec getCompressor(final Configuration conf) { String className = conf.get("hbase.client.rpc.compressor", null); - if (className == null || className.isEmpty()) return null; + if (className == null || className.isEmpty()) { + return null; + } try { - return (CompressionCodec)Class.forName(className).newInstance(); + return (CompressionCodec) Class.forName(className).newInstance(); } catch (Exception e) { throw new RuntimeException("Failed getting compressor " + className, e); } } /** - * Return the pool type specified in the configuration, which must be set to - * either {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or - * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}, - * otherwise default to the former. - * - * For applications with many user threads, use a small round-robin pool. For - * applications with few user threads, you may want to try using a - * thread-local pool. In any case, the number of {@link org.apache.hadoop.hbase.ipc.RpcClient} - * instances should not exceed the operating system's hard limit on the number of - * connections. - * + * Return the pool type specified in the configuration, which must be set to either + * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or + * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}, otherwise default to the + * former. For applications with many user threads, use a small round-robin pool. For applications + * with few user threads, you may want to try using a thread-local pool. In any case, the number + * of {@link org.apache.hadoop.hbase.ipc.RpcClient} instances should not exceed the operating + * system's hard limit on the number of connections. * @param config configuration * @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal} */ - protected static PoolMap.PoolType getPoolType(Configuration config) { - return PoolMap.PoolType - .valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), PoolMap.PoolType.RoundRobin, - PoolMap.PoolType.ThreadLocal); + private static PoolMap.PoolType getPoolType(Configuration config) { + return PoolMap.PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), + PoolMap.PoolType.RoundRobin, PoolMap.PoolType.ThreadLocal); } /** - * Return the pool size specified in the configuration, which is applicable only if - * the pool type is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}. - * + * Return the pool size specified in the configuration, which is applicable only if the pool type + * is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}. * @param config configuration * @return the maximum pool size */ - protected static int getPoolSize(Configuration config) { + private static int getPoolSize(Configuration config) { return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1); } + private int nextCallId() { + int id, next; + do { + id = callIdCnt.get(); + next = id < Integer.MAX_VALUE ? id + 1 : 0; + } while (!callIdCnt.compareAndSet(id, next)); + return id; + } + /** * Make a blocking call. Throws exceptions if there are network problems or if the remote code * threw an exception. - * * @param ticket Be careful which ticket you pass. A new user will mean a new Connection. - * {@link UserProvider#getCurrent()} makes a new instance of User each time so - * will be a - * new Connection each time. + * {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a + * new Connection each time. * @return A pair with the Message response and the Cell data (if any). */ - Message callBlockingMethod(Descriptors.MethodDescriptor md, PayloadCarryingRpcController pcrc, + private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController hrc, Message param, Message returnType, final User ticket, final InetSocketAddress isa) throws ServiceException { - if (pcrc == null) { - pcrc = new PayloadCarryingRpcController(); + BlockingRpcCallback<Message> done = new BlockingRpcCallback<>(); + callMethod(md, hrc, param, returnType, ticket, isa, done); + Message val; + try { + val = done.get(); + } catch (IOException e) { + throw new ServiceException(e); } + if (hrc.failed()) { + throw new ServiceException(hrc.getFailed()); + } else { + return val; + } + } - Pair<Message, CellScanner> val; - AtomicInteger counter = concurrentCounterCache.getUnchecked(isa); - int count = counter.incrementAndGet(); - try { - if (count > maxConcurrentCallsPerServer) { - throw new ServerTooBusyException(isa, count); + /** + * Get a connection from the pool, or create a new one and add it to the pool. Connections to a + * given host/port are reused. + */ + private T getConnection(ConnectionId remoteId) throws IOException { + if (failedServers.isFailedServer(remoteId.getAddress())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Not trying to connect to " + remoteId.address + + " this server is in the failed servers list"); } - final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); - cs.setStartTime(EnvironmentEdgeManager.currentTime()); - val = call(pcrc, md, param, returnType, ticket, isa, cs); - // Shove the results into controller so can be carried across the proxy/pb service void. - pcrc.setCellScanner(val.getSecond()); - - cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime()); - if (metrics != null) { - metrics.updateRpc(md, param, cs); + throw new FailedServerException( + "This server is in the failed servers list: " + remoteId.address); + } + T conn; + synchronized (connections) { + if (!running) { + throw new StoppedRpcClientException(); } - if (LOG.isTraceEnabled()) { - LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms"); + conn = connections.get(remoteId); + if (conn == null) { + conn = createConnection(remoteId); + connections.put(remoteId, conn); } - return val.getFirst(); - } catch (Throwable e) { - throw new ServiceException(e); - } finally { - counter.decrementAndGet(); + conn.setLastTouched(EnvironmentEdgeManager.currentTime()); } + return conn; } /** - * Make a call, passing <code>param</code>, to the IPC server running at - * <code>address</code> which is servicing the <code>protocol</code> protocol, - * with the <code>ticket</code> credentials, returning the value. - * Throws exceptions if there are network problems or if the remote code - * threw an exception. - * - * @param ticket Be careful which ticket you pass. A new user will mean a new Connection. - * {@link UserProvider#getCurrent()} makes a new instance of User each time so - * will be a - * new Connection each time. - * @return A pair with the Message response and the Cell data (if any). - * @throws InterruptedException - * @throws java.io.IOException + * Not connected. */ - protected abstract Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, - Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket, - InetSocketAddress isa, MetricsConnection.CallStats callStats) - throws IOException, InterruptedException; + protected abstract T createConnection(ConnectionId remoteId) throws IOException; + + private void onCallFinished(Call call, HBaseRpcController hrc, InetSocketAddress addr, + RpcCallback<Message> callback) { + call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime()); + if (metrics != null) { + metrics.updateRpc(call.md, call.param, call.callStats); + } + if (LOG.isTraceEnabled()) { + LOG.trace( + "Call: " + call.md.getName() + ", callTime: " + call.callStats.getCallTimeMs() + "ms"); + } + if (call.error != null) { + if (call.error instanceof RemoteException) { + call.error.fillInStackTrace(); + hrc.setFailed(call.error); + } else { + hrc.setFailed(wrapException(addr, call.error)); + } + callback.run(null); + } else { + hrc.setDone(call.cells); + callback.run(call.response); + } + } + private void callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc, + final Message param, Message returnType, final User ticket, final InetSocketAddress addr, + final RpcCallback<Message> callback) { + final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); + cs.setStartTime(EnvironmentEdgeManager.currentTime()); + final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr); + Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType, + hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() { + @Override + public void run(Call call) { + counter.decrementAndGet(); + onCallFinished(call, hrc, addr, callback); + } + }, cs); + ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr); + int count = counter.incrementAndGet(); + try { + if (count > maxConcurrentCallsPerServer) { + throw new ServerTooBusyException(addr, count); + } + T connection = getConnection(remoteId); + connection.sendRequest(call, hrc); + } catch (Exception e) { + call.setException(toIOE(e)); + } + } + + private InetSocketAddress createAddr(ServerName sn) throws UnknownHostException { + InetSocketAddress addr = new InetSocketAddress(sn.getHostname(), sn.getPort()); + if (addr.isUnresolved()) { + throw new UnknownHostException("can not resolve " + sn.getServerName()); + } + return addr; + } + + /** + * Interrupt the connections to the given ip:port server. This should be called if the server is + * known as actually dead. This will not prevent current operation to be retried, and, depending + * on their own behavior, they may retry on the same server. This can be a feature, for example at + * startup. In any case, they're likely to get connection refused (if the process died) or no + * route to host: i.e. their next retries should be faster and with a safe exception. + */ @Override - public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket, - int defaultOperationTimeout) throws UnknownHostException { - return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout); + public void cancelConnections(ServerName sn) { + synchronized (connections) { + for (T connection : connections.values()) { + ConnectionId remoteId = connection.remoteId(); + if (remoteId.address.getPort() == sn.getPort() && + remoteId.address.getHostName().equals(sn.getHostname())) { + LOG.info("The server on " + sn.toString() + " is dead - stopping the connection " + + connection.remoteId); + connections.removeValue(remoteId, connection); + connection.shutdown(); + } + } + } } /** - * Takes an Exception and the address we were trying to connect to and return an IOException with - * the input exception as the cause. The new exception provides the stack trace of the place where - * the exception is thrown and some extra diagnostics information. If the exception is - * ConnectException or SocketTimeoutException, return a new one of the same type; Otherwise return - * an IOException. - * @param addr target address - * @param exception the relevant exception - * @return an exception to throw + * Configure an hbase rpccontroller + * @param controller to configure + * @param channelOperationTimeout timeout for operation + * @return configured controller */ - protected IOException wrapException(InetSocketAddress addr, Exception exception) { - if (exception instanceof ConnectException) { - // connection refused; include the host:port in the error - return (ConnectException) new ConnectException("Call to " + addr - + " failed on connection exception: " + exception).initCause(exception); - } else if (exception instanceof SocketTimeoutException) { - return (SocketTimeoutException) new SocketTimeoutException("Call to " + addr - + " failed because " + exception).initCause(exception); - } else if (exception instanceof ConnectionClosingException) { - return (ConnectionClosingException) new ConnectionClosingException("Call to " + addr - + " failed on local exception: " + exception).initCause(exception); + static HBaseRpcController configureHBaseRpcController( + RpcController controller, int channelOperationTimeout) { + HBaseRpcController hrc; + if (controller != null && controller instanceof HBaseRpcController) { + hrc = (HBaseRpcController) controller; + if (!hrc.hasCallTimeout()) { + hrc.setCallTimeout(channelOperationTimeout); + } } else { - return (IOException) new IOException("Call to " + addr + " failed on local exception: " - + exception).initCause(exception); + hrc = new HBaseRpcControllerImpl(); + hrc.setCallTimeout(channelOperationTimeout); + } + return hrc; + } + + protected abstract void closeInternal(); + + @Override + public void close() { + if (LOG.isDebugEnabled()) { + LOG.debug("Stopping rpc client"); + } + Collection<T> connToClose; + synchronized (connections) { + if (!running) { + return; + } + running = false; + connToClose = connections.values(); + connections.clear(); + } + cleanupIdleConnectionTask.cancel(true); + for (T conn : connToClose) { + conn.shutdown(); + } + closeInternal(); + for (T conn : connToClose) { + conn.cleanupConnection(); + } + } + + @Override + public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket, + int rpcTimeout) throws UnknownHostException { + return new BlockingRpcChannelImplementation(this, createAddr(sn), ticket, rpcTimeout); + } + + @Override + public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) + throws UnknownHostException { + return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout); + } + + private static class AbstractRpcChannel { + + protected final InetSocketAddress addr; + + protected final AbstractRpcClient<?> rpcClient; + + protected final User ticket; + + protected final int rpcTimeout; + + protected AbstractRpcChannel(AbstractRpcClient<?> rpcClient, InetSocketAddress addr, + User ticket, int rpcTimeout) { + this.addr = addr; + this.rpcClient = rpcClient; + this.ticket = ticket; + this.rpcTimeout = rpcTimeout; + } + + /** + * Configure an rpc controller + * @param controller to configure + * @return configured rpc controller + */ + protected HBaseRpcController configureRpcController(RpcController controller) { + HBaseRpcController hrc; + // TODO: Ideally we should not use an RpcController other than HBaseRpcController at client + // side. And now we may use ServerRpcController. + if (controller != null && controller instanceof HBaseRpcController) { + hrc = (HBaseRpcController) controller; + if (!hrc.hasCallTimeout()) { + hrc.setCallTimeout(rpcTimeout); + } + } else { + hrc = new HBaseRpcControllerImpl(); + hrc.setCallTimeout(rpcTimeout); + } + return hrc; } } @@ -323,42 +554,42 @@ public abstract class AbstractRpcClient implements RpcClient { * Blocking rpc channel that goes via hbase rpc. */ @VisibleForTesting - public static class BlockingRpcChannelImplementation implements BlockingRpcChannel { - private final InetSocketAddress isa; - private final AbstractRpcClient rpcClient; - private final User ticket; - private final int channelOperationTimeout; + public static class BlockingRpcChannelImplementation extends AbstractRpcChannel + implements BlockingRpcChannel { - /** - * @param channelOperationTimeout - the default timeout when no timeout is given - */ - protected BlockingRpcChannelImplementation(final AbstractRpcClient rpcClient, - final ServerName sn, final User ticket, int channelOperationTimeout) - throws UnknownHostException { - this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort()); - if (this.isa.isUnresolved()) { - throw new UnknownHostException(sn.getHostname()); - } - this.rpcClient = rpcClient; - this.ticket = ticket; - this.channelOperationTimeout = channelOperationTimeout; + protected BlockingRpcChannelImplementation(AbstractRpcClient<?> rpcClient, + InetSocketAddress addr, User ticket, int rpcTimeout) { + super(rpcClient, addr, ticket, rpcTimeout); } @Override public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller, Message param, Message returnType) throws ServiceException { - PayloadCarryingRpcController pcrc; - if (controller != null && controller instanceof PayloadCarryingRpcController) { - pcrc = (PayloadCarryingRpcController) controller; - if (!pcrc.hasCallTimeout()) { - pcrc.setCallTimeout(channelOperationTimeout); - } - } else { - pcrc = new PayloadCarryingRpcController(); - pcrc.setCallTimeout(channelOperationTimeout); - } + return rpcClient.callBlockingMethod(md, configureRpcController(controller), + param, returnType, ticket, addr); + } + } + + /** + * Async rpc channel that goes via hbase rpc. + */ + public static class RpcChannelImplementation extends AbstractRpcChannel implements + RpcChannel { - return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa); + protected RpcChannelImplementation(AbstractRpcClient<?> rpcClient, InetSocketAddress addr, + User ticket, int rpcTimeout) throws UnknownHostException { + super(rpcClient, addr, ticket, rpcTimeout); + } + + @Override + public void callMethod(Descriptors.MethodDescriptor md, RpcController controller, + Message param, Message returnType, RpcCallback<Message> done) { + // This method does not throw any exceptions, so the caller must provide a + // HBaseRpcController which is used to pass the exceptions. + this.rpcClient.callMethod(md, + configureRpcController(Preconditions.checkNotNull(controller, + "RpcController can not be null for async rpc call")), + param, returnType, ticket, addr, done); } } -} +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java deleted file mode 100644 index a5da0dc..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java +++ /dev/null @@ -1,141 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.ipc; - -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import io.netty.channel.EventLoop; -import io.netty.util.concurrent.DefaultPromise; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.MetricsConnection; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.ExceptionUtil; -import org.apache.hadoop.ipc.RemoteException; - -import java.io.IOException; - -/** - * Represents an Async Hbase call and its response. - * - * Responses are passed on to its given doneHandler and failures to the rpcController - */ [email protected] -public class AsyncCall extends DefaultPromise<Message> { - private static final Log LOG = LogFactory.getLog(AsyncCall.class.getName()); - - final int id; - - final Descriptors.MethodDescriptor method; - final Message param; - final PayloadCarryingRpcController controller; - final Message responseDefaultType; - final long startTime; - final long rpcTimeout; - final MetricsConnection.CallStats callStats; - - /** - * Constructor - * - * @param eventLoop for call - * @param connectId connection id - * @param md the method descriptor - * @param param parameters to send to Server - * @param controller controller for response - * @param responseDefaultType the default response type - */ - public AsyncCall(EventLoop eventLoop, int connectId, Descriptors.MethodDescriptor md, Message - param, PayloadCarryingRpcController controller, Message responseDefaultType, - MetricsConnection.CallStats callStats) { - super(eventLoop); - - this.id = connectId; - - this.method = md; - this.param = param; - this.controller = controller; - this.responseDefaultType = responseDefaultType; - - this.startTime = EnvironmentEdgeManager.currentTime(); - this.rpcTimeout = controller.hasCallTimeout() ? controller.getCallTimeout() : 0; - this.callStats = callStats; - } - - /** - * Get the start time - * - * @return start time for the call - */ - public long getStartTime() { - return this.startTime; - } - - @Override - public String toString() { - return "callId=" + this.id + ", method=" + this.method.getName() + - ", rpcTimeout=" + this.rpcTimeout + ", param {" + - (this.param != null ? ProtobufUtil.getShortTextFormat(this.param) : "") + "}"; - } - - /** - * Set success with a cellBlockScanner - * - * @param value to set - * @param cellBlockScanner to set - */ - public void setSuccess(Message value, CellScanner cellBlockScanner) { - if (cellBlockScanner != null) { - controller.setCellScanner(cellBlockScanner); - } - - if (LOG.isTraceEnabled()) { - long callTime = EnvironmentEdgeManager.currentTime() - startTime; - LOG.trace("Call: " + method.getName() + ", callTime: " + callTime + "ms"); - } - - this.setSuccess(value); - } - - /** - * Set failed - * - * @param exception to set - */ - public void setFailed(IOException exception) { - if (ExceptionUtil.isInterrupt(exception)) { - exception = ExceptionUtil.asInterrupt(exception); - } - if (exception instanceof RemoteException) { - exception = ((RemoteException) exception).unwrapRemoteException(); - } - - this.setFailure(exception); - } - - /** - * Get the rpc timeout - * - * @return current timeout for this call - */ - public long getRpcTimeout() { - return rpcTimeout; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java deleted file mode 100644 index 878d8b8..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java +++ /dev/null @@ -1,785 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.ipc; - -import java.io.IOException; -import java.net.ConnectException; -import java.net.InetSocketAddress; -import java.net.SocketException; -import java.nio.ByteBuffer; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.TimeUnit; - -import javax.security.sasl.SaslException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.MetricsConnection; -import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; -import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind; -import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; -import org.apache.hadoop.hbase.protobuf.generated.TracingProtos; -import org.apache.hadoop.hbase.security.AuthMethod; -import org.apache.hadoop.hbase.security.SaslClientHandler; -import org.apache.hadoop.hbase.security.SaslUtil; -import org.apache.hadoop.hbase.security.SecurityInfo; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.security.token.TokenSelector; -import org.apache.htrace.Span; -import org.apache.htrace.Trace; - -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import com.google.protobuf.RpcCallback; - -import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufOutputStream; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; -import io.netty.util.concurrent.GenericFutureListener; -import io.netty.util.concurrent.Promise; - -/** - * Netty RPC channel - */ [email protected] -public class AsyncRpcChannel { - private static final Log LOG = LogFactory.getLog(AsyncRpcChannel.class.getName()); - - private static final int MAX_SASL_RETRIES = 5; - - protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDDLERS - = new HashMap<>(); - - static { - TOKEN_HANDDLERS.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN, - new AuthenticationTokenSelector()); - } - - final AsyncRpcClient client; - - // Contains the channel to work with. - // Only exists when connected - private Channel channel; - - String name; - final User ticket; - final String serviceName; - final InetSocketAddress address; - - private int ioFailureCounter = 0; - private int connectFailureCounter = 0; - - boolean useSasl; - AuthMethod authMethod; - private int reloginMaxBackoff; - private Token<? extends TokenIdentifier> token; - private String serverPrincipal; - - // NOTE: closed and connected flags below are only changed when a lock on pendingCalls - private final Map<Integer, AsyncCall> pendingCalls = new HashMap<Integer, AsyncCall>(); - private boolean connected = false; - private boolean closed = false; - - private Timeout cleanupTimer; - - private final TimerTask timeoutTask = new TimerTask() { - @Override - public void run(Timeout timeout) throws Exception { - cleanupCalls(); - } - }; - - /** - * Constructor for netty RPC channel - * @param bootstrap to construct channel on - * @param client to connect with - * @param ticket of user which uses connection - * @param serviceName name of service to connect to - * @param address to connect to - */ - public AsyncRpcChannel(Bootstrap bootstrap, final AsyncRpcClient client, User ticket, - String serviceName, InetSocketAddress address) { - this.client = client; - - this.ticket = ticket; - this.serviceName = serviceName; - this.address = address; - - this.channel = connect(bootstrap).channel(); - - name = ("IPC Client (" + channel.hashCode() + ") to " + address.toString() - + ((ticket == null) ? " from unknown user" : (" from " + ticket.getName()))); - } - - /** - * Connect to channel - * @param bootstrap to connect to - * @return future of connection - */ - private ChannelFuture connect(final Bootstrap bootstrap) { - return bootstrap.remoteAddress(address).connect() - .addListener(new GenericFutureListener<ChannelFuture>() { - @Override - public void operationComplete(final ChannelFuture f) throws Exception { - if (!f.isSuccess()) { - if (f.cause() instanceof SocketException) { - retryOrClose(bootstrap, connectFailureCounter++, f.cause()); - } else { - retryOrClose(bootstrap, ioFailureCounter++, f.cause()); - } - return; - } - channel = f.channel(); - - setupAuthorization(); - - ByteBuf b = channel.alloc().directBuffer(6); - createPreamble(b, authMethod); - channel.writeAndFlush(b).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); - if (useSasl) { - UserGroupInformation ticket = AsyncRpcChannel.this.ticket.getUGI(); - if (authMethod == AuthMethod.KERBEROS) { - if (ticket != null && ticket.getRealUser() != null) { - ticket = ticket.getRealUser(); - } - } - SaslClientHandler saslHandler; - if (ticket == null) { - throw new FatalConnectionException("ticket/user is null"); - } - final UserGroupInformation realTicket = ticket; - saslHandler = ticket.doAs(new PrivilegedExceptionAction<SaslClientHandler>() { - @Override - public SaslClientHandler run() throws IOException { - return getSaslHandler(realTicket, bootstrap); - } - }); - if (saslHandler != null) { - // Sasl connect is successful. Let's set up Sasl channel handler - channel.pipeline().addFirst(saslHandler); - } else { - // fall back to simple auth because server told us so. - authMethod = AuthMethod.SIMPLE; - useSasl = false; - } - } else { - startHBaseConnection(f.channel()); - } - } - }); - } - - /** - * Start HBase connection - * @param ch channel to start connection on - */ - private void startHBaseConnection(Channel ch) { - ch.pipeline().addLast("frameDecoder", - new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); - ch.pipeline().addLast(new AsyncServerResponseHandler(this)); - try { - writeChannelHeader(ch).addListener(new GenericFutureListener<ChannelFuture>() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - close(future.cause()); - return; - } - List<AsyncCall> callsToWrite; - synchronized (pendingCalls) { - connected = true; - callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values()); - } - for (AsyncCall call : callsToWrite) { - writeRequest(call); - } - } - }); - } catch (IOException e) { - close(e); - } - } - - /** - * Start HBase connection with sasl encryption - * @param ch channel to start connection on - */ - private void startConnectionWithEncryption(Channel ch) { - // for rpc encryption, the order of ChannelInboundHandler should be: - // LengthFieldBasedFrameDecoder->SaslClientHandler->LengthFieldBasedFrameDecoder - // Don't skip the first 4 bytes for length in beforeUnwrapDecoder, - // SaslClientHandler will handler this - ch.pipeline().addFirst("beforeUnwrapDecoder", - new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 0)); - ch.pipeline().addLast("afterUnwrapDecoder", - new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); - ch.pipeline().addLast(new AsyncServerResponseHandler(this)); - List<AsyncCall> callsToWrite; - synchronized (pendingCalls) { - connected = true; - callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values()); - } - for (AsyncCall call : callsToWrite) { - writeRequest(call); - } - } - - /** - * Get SASL handler - * @param bootstrap to reconnect to - * @return new SASL handler - * @throws java.io.IOException if handler failed to create - */ - private SaslClientHandler getSaslHandler(final UserGroupInformation realTicket, - final Bootstrap bootstrap) throws IOException { - return new SaslClientHandler(realTicket, authMethod, token, serverPrincipal, - client.fallbackAllowed, - client.conf.get("hbase.rpc.protection", - SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)), - getChannelHeaderBytes(authMethod), - new SaslClientHandler.SaslExceptionHandler() { - @Override - public void handle(int retryCount, Random random, Throwable cause) { - try { - // Handle Sasl failure. Try to potentially get new credentials - handleSaslConnectionFailure(retryCount, cause, realTicket); - - // Try to reconnect - client.newTimeout(new TimerTask() { - @Override - public void run(Timeout timeout) throws Exception { - connect(bootstrap); - } - }, random.nextInt(reloginMaxBackoff) + 1, TimeUnit.MILLISECONDS); - } catch (IOException | InterruptedException e) { - close(e); - } - } - }, new SaslClientHandler.SaslSuccessfulConnectHandler() { - @Override - public void onSuccess(Channel channel) { - startHBaseConnection(channel); - } - - @Override - public void onSaslProtectionSucess(Channel channel) { - startConnectionWithEncryption(channel); - } - }); - } - - /** - * Retry to connect or close - * - * @param bootstrap to connect with - * @param connectCounter amount of tries - * @param e exception of fail - */ - private void retryOrClose(final Bootstrap bootstrap, int connectCounter, Throwable e) { - if (connectCounter < client.maxRetries) { - client.newTimeout(new TimerTask() { - @Override public void run(Timeout timeout) throws Exception { - connect(bootstrap); - } - }, client.failureSleep, TimeUnit.MILLISECONDS); - } else { - client.failedServers.addToFailedServers(address); - close(e); - } - } - - /** - * Calls method on channel - * @param method to call - * @param controller to run call with - * @param request to send - * @param responsePrototype to construct response with - */ - public Promise<Message> callMethod(final Descriptors.MethodDescriptor method, - final PayloadCarryingRpcController controller, final Message request, - final Message responsePrototype, MetricsConnection.CallStats callStats) { - final AsyncCall call = new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(), - method, request, controller, responsePrototype, callStats); - controller.notifyOnCancel(new RpcCallback<Object>() { - @Override - public void run(Object parameter) { - // TODO: do not need to call AsyncCall.setFailed? - synchronized (pendingCalls) { - pendingCalls.remove(call.id); - } - } - }); - // TODO: this should be handled by PayloadCarryingRpcController. - if (controller.isCanceled()) { - // To finish if the call was cancelled before we set the notification (race condition) - call.cancel(true); - return call; - } - - synchronized (pendingCalls) { - if (closed) { - Promise<Message> promise = channel.eventLoop().newPromise(); - promise.setFailure(new ConnectException()); - return promise; - } - pendingCalls.put(call.id, call); - // Add timeout for cleanup if none is present - if (cleanupTimer == null && call.getRpcTimeout() > 0) { - cleanupTimer = client.newTimeout(timeoutTask, call.getRpcTimeout(), TimeUnit.MILLISECONDS); - } - if (!connected) { - return call; - } - } - writeRequest(call); - return call; - } - - AsyncCall removePendingCall(int id) { - synchronized (pendingCalls) { - return pendingCalls.remove(id); - } - } - - /** - * Write the channel header - * @param channel to write to - * @return future of write - * @throws java.io.IOException on failure to write - */ - private ChannelFuture writeChannelHeader(Channel channel) throws IOException { - RPCProtos.ConnectionHeader header = getChannelHeader(authMethod); - int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header); - - ByteBuf b = channel.alloc().directBuffer(totalSize); - - b.writeInt(header.getSerializedSize()); - b.writeBytes(header.toByteArray()); - - return channel.writeAndFlush(b); - } - - private byte[] getChannelHeaderBytes(AuthMethod authMethod) { - RPCProtos.ConnectionHeader header = getChannelHeader(authMethod); - ByteBuffer b = ByteBuffer.allocate(header.getSerializedSize() + 4); - b.putInt(header.getSerializedSize()); - b.put(header.toByteArray()); - return b.array(); - } - - private RPCProtos.ConnectionHeader getChannelHeader(AuthMethod authMethod) { - RPCProtos.ConnectionHeader.Builder headerBuilder = RPCProtos.ConnectionHeader.newBuilder() - .setServiceName(serviceName); - - RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), authMethod); - if (userInfoPB != null) { - headerBuilder.setUserInfo(userInfoPB); - } - - if (client.codec != null) { - headerBuilder.setCellBlockCodecClass(client.codec.getClass().getCanonicalName()); - } - if (client.compressor != null) { - headerBuilder.setCellBlockCompressorClass(client.compressor.getClass().getCanonicalName()); - } - - headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo()); - return headerBuilder.build(); - } - - /** - * Write request to channel - * @param call to write - */ - private void writeRequest(final AsyncCall call) { - try { - final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader - .newBuilder(); - requestHeaderBuilder.setCallId(call.id).setMethodName(call.method.getName()) - .setRequestParam(call.param != null); - - if (Trace.isTracing()) { - Span s = Trace.currentSpan(); - requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder() - .setParentId(s.getSpanId()).setTraceId(s.getTraceId())); - } - - ByteBuffer cellBlock = client.buildCellBlock(call.controller.cellScanner()); - if (cellBlock != null) { - final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = RPCProtos.CellBlockMeta - .newBuilder(); - cellBlockBuilder.setLength(cellBlock.limit()); - requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build()); - } - // Only pass priority if there one. Let zero be same as no priority. - if (call.controller.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) { - requestHeaderBuilder.setPriority(call.controller.getPriority()); - } - requestHeaderBuilder.setTimeout(call.rpcTimeout > Integer.MAX_VALUE ? - Integer.MAX_VALUE : (int)call.rpcTimeout); - RPCProtos.RequestHeader rh = requestHeaderBuilder.build(); - - int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param); - if (cellBlock != null) { - totalSize += cellBlock.remaining(); - } - - ByteBuf b = channel.alloc().directBuffer(4 + totalSize); - try (ByteBufOutputStream out = new ByteBufOutputStream(b)) { - call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, cellBlock)); - } - - channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id)); - } catch (IOException e) { - close(e); - } - } - - /** - * Set up server authorization - * @throws java.io.IOException if auth setup failed - */ - private void setupAuthorization() throws IOException { - SecurityInfo securityInfo = SecurityInfo.getInfo(serviceName); - this.useSasl = client.userProvider.isHBaseSecurityEnabled(); - - this.token = null; - if (useSasl && securityInfo != null) { - AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind(); - if (tokenKind != null) { - TokenSelector<? extends TokenIdentifier> tokenSelector = TOKEN_HANDDLERS.get(tokenKind); - if (tokenSelector != null) { - token = tokenSelector.selectToken(new Text(client.clusterId), - ticket.getUGI().getTokens()); - } else if (LOG.isDebugEnabled()) { - LOG.debug("No token selector found for type " + tokenKind); - } - } - String serverKey = securityInfo.getServerPrincipal(); - if (serverKey == null) { - throw new IOException("Can't obtain server Kerberos config key from SecurityInfo"); - } - this.serverPrincipal = SecurityUtil.getServerPrincipal(client.conf.get(serverKey), - address.getAddress().getCanonicalHostName().toLowerCase(Locale.ROOT)); - if (LOG.isDebugEnabled()) { - LOG.debug("RPC Server Kerberos principal name for service=" + serviceName + " is " - + serverPrincipal); - } - } - - if (!useSasl) { - authMethod = AuthMethod.SIMPLE; - } else if (token != null) { - authMethod = AuthMethod.DIGEST; - } else { - authMethod = AuthMethod.KERBEROS; - } - - if (LOG.isDebugEnabled()) { - LOG.debug( - "Use " + authMethod + " authentication for service " + serviceName + ", sasl=" + useSasl); - } - reloginMaxBackoff = client.conf.getInt("hbase.security.relogin.maxbackoff", 5000); - } - - /** - * Build the user information - * @param ugi User Group Information - * @param authMethod Authorization method - * @return UserInformation protobuf - */ - private RPCProtos.UserInformation buildUserInfo(UserGroupInformation ugi, AuthMethod authMethod) { - if (ugi == null || authMethod == AuthMethod.DIGEST) { - // Don't send user for token auth - return null; - } - RPCProtos.UserInformation.Builder userInfoPB = RPCProtos.UserInformation.newBuilder(); - if (authMethod == AuthMethod.KERBEROS) { - // Send effective user for Kerberos auth - userInfoPB.setEffectiveUser(ugi.getUserName()); - } else if (authMethod == AuthMethod.SIMPLE) { - // Send both effective user and real user for simple auth - userInfoPB.setEffectiveUser(ugi.getUserName()); - if (ugi.getRealUser() != null) { - userInfoPB.setRealUser(ugi.getRealUser().getUserName()); - } - } - return userInfoPB.build(); - } - - /** - * Create connection preamble - * @param byteBuf to write to - * @param authMethod to write - */ - private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) { - byteBuf.writeBytes(HConstants.RPC_HEADER); - byteBuf.writeByte(HConstants.RPC_CURRENT_VERSION); - byteBuf.writeByte(authMethod.code); - } - - private void close0(Throwable e) { - List<AsyncCall> toCleanup; - synchronized (pendingCalls) { - if (closed) { - return; - } - closed = true; - toCleanup = new ArrayList<AsyncCall>(pendingCalls.values()); - pendingCalls.clear(); - } - IOException closeException = null; - if (e != null) { - if (e instanceof IOException) { - closeException = (IOException) e; - } else { - closeException = new IOException(e); - } - } - // log the info - if (LOG.isDebugEnabled() && closeException != null) { - LOG.debug(name + ": closing ipc connection to " + address, closeException); - } - if (cleanupTimer != null) { - cleanupTimer.cancel(); - cleanupTimer = null; - } - for (AsyncCall call : toCleanup) { - call.setFailed(closeException != null ? closeException - : new ConnectionClosingException( - "Call id=" + call.id + " on server " + address + " aborted: connection is closing")); - } - channel.disconnect().addListener(ChannelFutureListener.CLOSE); - if (LOG.isDebugEnabled()) { - LOG.debug(name + ": closed"); - } - } - - /** - * Close connection - * @param e exception on close - */ - public void close(final Throwable e) { - client.removeConnection(this); - - // Move closing from the requesting thread to the channel thread - if (channel.eventLoop().inEventLoop()) { - close0(e); - } else { - channel.eventLoop().execute(new Runnable() { - @Override - public void run() { - close0(e); - } - }); - } - } - - /** - * Clean up calls. - */ - private void cleanupCalls() { - List<AsyncCall> toCleanup = new ArrayList<AsyncCall>(); - long currentTime = EnvironmentEdgeManager.currentTime(); - long nextCleanupTaskDelay = -1L; - synchronized (pendingCalls) { - for (Iterator<AsyncCall> iter = pendingCalls.values().iterator(); iter.hasNext();) { - AsyncCall call = iter.next(); - long timeout = call.getRpcTimeout(); - if (timeout > 0) { - if (currentTime - call.getStartTime() >= timeout) { - iter.remove(); - toCleanup.add(call); - } else { - if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) { - nextCleanupTaskDelay = timeout; - } - } - } - } - if (nextCleanupTaskDelay > 0) { - cleanupTimer = client.newTimeout(timeoutTask, nextCleanupTaskDelay, TimeUnit.MILLISECONDS); - } else { - cleanupTimer = null; - } - } - for (AsyncCall call : toCleanup) { - call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime=" - + (currentTime - call.getStartTime()) + ", rpcTimeout=" + call.getRpcTimeout())); - } - } - - /** - * Check if the connection is alive - * @return true if alive - */ - public boolean isAlive() { - return channel.isOpen(); - } - - /** - * Check if user should authenticate over Kerberos - * @return true if should be authenticated over Kerberos - * @throws java.io.IOException on failure of check - */ - private synchronized boolean shouldAuthenticateOverKrb() throws IOException { - UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); - UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); - UserGroupInformation realUser = currentUser.getRealUser(); - return authMethod == AuthMethod.KERBEROS && loginUser != null && - // Make sure user logged in using Kerberos either keytab or TGT - loginUser.hasKerberosCredentials() && - // relogin only in case it is the login user (e.g. JT) - // or superuser (like oozie). - (loginUser.equals(currentUser) || loginUser.equals(realUser)); - } - - /** - * If multiple clients with the same principal try to connect to the same server at the same time, - * the server assumes a replay attack is in progress. This is a feature of kerberos. In order to - * work around this, what is done is that the client backs off randomly and tries to initiate the - * connection again. The other problem is to do with ticket expiry. To handle that, a relogin is - * attempted. - * <p> - * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} method. In case when the - * user doesn't have valid credentials, we don't need to retry (from cache or ticket). In such - * cases, it is prudent to throw a runtime exception when we receive a SaslException from the - * underlying authentication implementation, so there is no retry from other high level (for eg, - * HCM or HBaseAdmin). - * </p> - * @param currRetries retry count - * @param ex exception describing fail - * @param user which is trying to connect - * @throws java.io.IOException if IO fail - * @throws InterruptedException if thread is interrupted - */ - private void handleSaslConnectionFailure(final int currRetries, final Throwable ex, - final UserGroupInformation user) throws IOException, InterruptedException { - user.doAs(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws IOException, InterruptedException { - if (shouldAuthenticateOverKrb()) { - if (currRetries < MAX_SASL_RETRIES) { - LOG.debug("Exception encountered while connecting to the server : " + ex); - // try re-login - if (UserGroupInformation.isLoginKeytabBased()) { - UserGroupInformation.getLoginUser().reloginFromKeytab(); - } else { - UserGroupInformation.getLoginUser().reloginFromTicketCache(); - } - - // Should reconnect - return null; - } else { - String msg = "Couldn't setup connection for " - + UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal; - LOG.warn(msg); - throw (IOException) new IOException(msg).initCause(ex); - } - } else { - LOG.warn("Exception encountered while connecting to " + "the server : " + ex); - } - if (ex instanceof RemoteException) { - throw (RemoteException) ex; - } - if (ex instanceof SaslException) { - String msg = "SASL authentication failed." - + " The most likely cause is missing or invalid credentials." + " Consider 'kinit'."; - LOG.fatal(msg, ex); - throw new RuntimeException(msg, ex); - } - throw new IOException(ex); - } - }); - } - - public int getConnectionHashCode() { - return ConnectionId.hashCode(ticket, serviceName, address); - } - - @Override - public int hashCode() { - return getConnectionHashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof AsyncRpcChannel) { - AsyncRpcChannel channel = (AsyncRpcChannel) obj; - return channel.hashCode() == obj.hashCode(); - } - return false; - } - - @Override - public String toString() { - return this.address.toString() + "/" + this.serviceName + "/" + this.ticket; - } - - /** - * Listens to call writes and fails if write failed - */ - private static final class CallWriteListener implements ChannelFutureListener { - private final AsyncRpcChannel rpcChannel; - private final int id; - - public CallWriteListener(AsyncRpcChannel asyncRpcChannel, int id) { - this.rpcChannel = asyncRpcChannel; - this.id = id; - } - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - AsyncCall call = rpcChannel.removePendingCall(id); - if (call != null) { - if (future.cause() instanceof IOException) { - call.setFailed((IOException) future.cause()); - } else { - call.setFailed(new IOException(future.cause())); - } - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java deleted file mode 100644 index e12e298..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java +++ /dev/null @@ -1,499 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.ipc; - -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.epoll.EpollEventLoopGroup; -import io.netty.channel.epoll.EpollSocketChannel; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.util.HashedWheelTimer; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; -import io.netty.util.concurrent.Promise; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.MetricsConnection; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.JVM; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.PoolMap; -import org.apache.hadoop.hbase.util.Threads; - -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcChannel; -import com.google.protobuf.RpcController; - -/** - * Netty client for the requests and responses - */ [email protected](HBaseInterfaceAudience.CONFIG) -public class AsyncRpcClient extends AbstractRpcClient { - - private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class); - - public static final String CLIENT_MAX_THREADS = "hbase.rpc.client.threads.max"; - public static final String USE_NATIVE_TRANSPORT = "hbase.rpc.client.nativetransport"; - public static final String USE_GLOBAL_EVENT_LOOP_GROUP = "hbase.rpc.client.globaleventloopgroup"; - - private static final HashedWheelTimer WHEEL_TIMER = - new HashedWheelTimer(Threads.newDaemonThreadFactory("AsyncRpcChannel-timer"), - 100, TimeUnit.MILLISECONDS); - - private static final ChannelInitializer<SocketChannel> DEFAULT_CHANNEL_INITIALIZER = - new ChannelInitializer<SocketChannel>() { - @Override - protected void initChannel(SocketChannel ch) throws Exception { - //empty initializer - } - }; - - protected final AtomicInteger callIdCnt = new AtomicInteger(); - - private final PoolMap<Integer, AsyncRpcChannel> connections; - - final FailedServers failedServers; - - @VisibleForTesting - final Bootstrap bootstrap; - - private final boolean useGlobalEventLoopGroup; - - @VisibleForTesting - static Pair<EventLoopGroup, Class<? extends Channel>> GLOBAL_EVENT_LOOP_GROUP; - - private synchronized static Pair<EventLoopGroup, Class<? extends Channel>> - getGlobalEventLoopGroup(Configuration conf) { - if (GLOBAL_EVENT_LOOP_GROUP == null) { - GLOBAL_EVENT_LOOP_GROUP = createEventLoopGroup(conf); - if (LOG.isDebugEnabled()) { - LOG.debug("Create global event loop group " - + GLOBAL_EVENT_LOOP_GROUP.getFirst().getClass().getSimpleName()); - } - } - return GLOBAL_EVENT_LOOP_GROUP; - } - - private static Pair<EventLoopGroup, Class<? extends Channel>> createEventLoopGroup( - Configuration conf) { - // Max amount of threads to use. 0 lets Netty decide based on amount of cores - int maxThreads = conf.getInt(CLIENT_MAX_THREADS, 0); - - // Config to enable native transport. Does not seem to be stable at time of implementation - // although it is not extensively tested. - boolean epollEnabled = conf.getBoolean(USE_NATIVE_TRANSPORT, false); - - // Use the faster native epoll transport mechanism on linux if enabled - if (epollEnabled && JVM.isLinux() && JVM.isAmd64()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Create EpollEventLoopGroup with maxThreads = " + maxThreads); - } - return new Pair<EventLoopGroup, Class<? extends Channel>>(new EpollEventLoopGroup(maxThreads, - Threads.newDaemonThreadFactory("AsyncRpcChannel")), EpollSocketChannel.class); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Create NioEventLoopGroup with maxThreads = " + maxThreads); - } - return new Pair<EventLoopGroup, Class<? extends Channel>>(new NioEventLoopGroup(maxThreads, - Threads.newDaemonThreadFactory("AsyncRpcChannel")), NioSocketChannel.class); - } - } - - /** - * Constructor for tests - * - * @param configuration to HBase - * @param clusterId for the cluster - * @param localAddress local address to connect to - * @param metrics the connection metrics - * @param channelInitializer for custom channel handlers - */ - protected AsyncRpcClient(Configuration configuration, String clusterId, - SocketAddress localAddress, MetricsConnection metrics, - ChannelInitializer<SocketChannel> channelInitializer) { - super(configuration, clusterId, localAddress, metrics); - - if (LOG.isDebugEnabled()) { - LOG.debug("Starting async Hbase RPC client"); - } - - Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass; - this.useGlobalEventLoopGroup = conf.getBoolean(USE_GLOBAL_EVENT_LOOP_GROUP, true); - if (useGlobalEventLoopGroup) { - eventLoopGroupAndChannelClass = getGlobalEventLoopGroup(configuration); - } else { - eventLoopGroupAndChannelClass = createEventLoopGroup(configuration); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Use " + (useGlobalEventLoopGroup ? "global" : "individual") + " event loop group " - + eventLoopGroupAndChannelClass.getFirst().getClass().getSimpleName()); - } - - this.connections = new PoolMap<>(getPoolType(configuration), getPoolSize(configuration)); - this.failedServers = new FailedServers(configuration); - - int operationTimeout = configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); - - // Configure the default bootstrap. - this.bootstrap = new Bootstrap(); - bootstrap.group(eventLoopGroupAndChannelClass.getFirst()) - .channel(eventLoopGroupAndChannelClass.getSecond()) - .option(ChannelOption.TCP_NODELAY, tcpNoDelay) - .option(ChannelOption.SO_KEEPALIVE, tcpKeepAlive) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, operationTimeout); - if (channelInitializer == null) { - channelInitializer = DEFAULT_CHANNEL_INITIALIZER; - } - bootstrap.handler(channelInitializer); - if (localAddress != null) { - bootstrap.localAddress(localAddress); - } - } - - /** Used in test only. */ - AsyncRpcClient(Configuration configuration) { - this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null); - } - - /** Used in test only. */ - AsyncRpcClient(Configuration configuration, - ChannelInitializer<SocketChannel> channelInitializer) { - this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null, channelInitializer); - } - - /** - * Constructor - * - * @param configuration to HBase - * @param clusterId for the cluster - * @param localAddress local address to connect to - * @param metrics the connection metrics - */ - public AsyncRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress, - MetricsConnection metrics) { - this(configuration, clusterId, localAddress, metrics, null); - } - - /** - * Make a call, passing <code>param</code>, to the IPC server running at - * <code>address</code> which is servicing the <code>protocol</code> protocol, - * with the <code>ticket</code> credentials, returning the value. - * Throws exceptions if there are network problems or if the remote code - * threw an exception. - * - * @param ticket Be careful which ticket you pass. A new user will mean a new Connection. - * {@link org.apache.hadoop.hbase.security.UserProvider#getCurrent()} makes a new - * instance of User each time so will be a new Connection each time. - * @return A pair with the Message response and the Cell data (if any). - * @throws InterruptedException if call is interrupted - * @throws java.io.IOException if a connection failure is encountered - */ - @Override - protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, - Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket, - InetSocketAddress addr, MetricsConnection.CallStats callStats) - throws IOException, InterruptedException { - if (pcrc == null) { - pcrc = new PayloadCarryingRpcController(); - } - final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket); - - Promise<Message> promise = connection.callMethod(md, pcrc, param, returnType, callStats); - long timeout = pcrc.hasCallTimeout() ? pcrc.getCallTimeout() : 0; - try { - Message response = timeout > 0 ? promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get(); - return new Pair<>(response, pcrc.cellScanner()); - } catch (ExecutionException e) { - if (e.getCause() instanceof IOException) { - throw (IOException) e.getCause(); - } else { - throw wrapException(addr, (Exception) e.getCause()); - } - } catch (TimeoutException e) { - CallTimeoutException cte = new CallTimeoutException(promise.toString()); - throw wrapException(addr, cte); - } - } - - /** - * Call method async - */ - private void callMethod(final Descriptors.MethodDescriptor md, - final PayloadCarryingRpcController pcrc, final Message param, Message returnType, User ticket, - InetSocketAddress addr, final RpcCallback<Message> done) { - final AsyncRpcChannel connection; - try { - connection = createRpcChannel(md.getService().getName(), addr, ticket); - final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); - GenericFutureListener<Future<Message>> listener = - new GenericFutureListener<Future<Message>>() { - @Override - public void operationComplete(Future<Message> future) throws Exception { - cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime()); - if (metrics != null) { - metrics.updateRpc(md, param, cs); - } - if (LOG.isTraceEnabled()) { - LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms"); - } - if (!future.isSuccess()) { - Throwable cause = future.cause(); - if (cause instanceof IOException) { - pcrc.setFailed((IOException) cause); - } else { - pcrc.setFailed(new IOException(cause)); - } - } else { - try { - done.run(future.get()); - } catch (ExecutionException e) { - Throwable cause = e.getCause(); - if (cause instanceof IOException) { - pcrc.setFailed((IOException) cause); - } else { - pcrc.setFailed(new IOException(cause)); - } - } catch (InterruptedException e) { - pcrc.setFailed(new IOException(e)); - } - } - } - }; - cs.setStartTime(EnvironmentEdgeManager.currentTime()); - connection.callMethod(md, pcrc, param, returnType, cs).addListener(listener); - } catch (StoppedRpcClientException|FailedServerException e) { - pcrc.setFailed(e); - } - } - - private boolean closed = false; - - /** - * Close netty - */ - public void close() { - if (LOG.isDebugEnabled()) { - LOG.debug("Stopping async HBase RPC client"); - } - - synchronized (connections) { - if (closed) { - return; - } - closed = true; - for (AsyncRpcChannel conn : connections.values()) { - conn.close(null); - } - } - // do not close global EventLoopGroup. - if (!useGlobalEventLoopGroup) { - bootstrap.group().shutdownGracefully(); - } - } - - /** - * Create a cell scanner - * - * @param cellBlock to create scanner for - * @return CellScanner - * @throws java.io.IOException on error on creation cell scanner - */ - public CellScanner createCellScanner(byte[] cellBlock) throws IOException { - return ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock); - } - - /** - * Build cell block - * - * @param cells to create block with - * @return ByteBuffer with cells - * @throws java.io.IOException if block creation fails - */ - public ByteBuffer buildCellBlock(CellScanner cells) throws IOException { - return ipcUtil.buildCellBlock(this.codec, this.compressor, cells); - } - - /** - * Creates an RPC client - * - * @param serviceName name of servicce - * @param location to connect to - * @param ticket for current user - * @return new RpcChannel - * @throws StoppedRpcClientException when Rpc client is stopped - * @throws FailedServerException if server failed - */ - private AsyncRpcChannel createRpcChannel(String serviceName, InetSocketAddress location, - User ticket) throws StoppedRpcClientException, FailedServerException { - // Check if server is failed - if (this.failedServers.isFailedServer(location)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Not trying to connect to " + location + - " this server is in the failed servers list"); - } - throw new FailedServerException( - "This server is in the failed servers list: " + location); - } - - int hashCode = ConnectionId.hashCode(ticket,serviceName,location); - - AsyncRpcChannel rpcChannel; - synchronized (connections) { - if (closed) { - throw new StoppedRpcClientException(); - } - rpcChannel = connections.get(hashCode); - if (rpcChannel == null || !rpcChannel.isAlive()) { - rpcChannel = new AsyncRpcChannel(this.bootstrap, this, ticket, serviceName, location); - connections.put(hashCode, rpcChannel); - } - } - - return rpcChannel; - } - - /** - * Interrupt the connections to the given ip:port server. This should be called if the server - * is known as actually dead. This will not prevent current operation to be retried, and, - * depending on their own behavior, they may retry on the same server. This can be a feature, - * for example at startup. In any case, they're likely to get connection refused (if the - * process died) or no route to host: i.e. there next retries should be faster and with a - * safe exception. - * - * @param sn server to cancel connections for - */ - @Override - public void cancelConnections(ServerName sn) { - synchronized (connections) { - for (AsyncRpcChannel rpcChannel : connections.values()) { - if (rpcChannel.isAlive() && - rpcChannel.address.getPort() == sn.getPort() && - rpcChannel.address.getHostName().contentEquals(sn.getHostname())) { - LOG.info("The server on " + sn.toString() + - " is dead - stopping the connection " + rpcChannel.toString()); - rpcChannel.close(null); - } - } - } - } - - /** - * Remove connection from pool - */ - public void removeConnection(AsyncRpcChannel connection) { - int connectionHashCode = connection.hashCode(); - synchronized (connections) { - // we use address as cache key, so we should check here to prevent removing the - // wrong connection - AsyncRpcChannel connectionInPool = this.connections.get(connectionHashCode); - if (connectionInPool != null && connectionInPool.equals(connection)) { - this.connections.remove(connectionHashCode); - } else if (LOG.isDebugEnabled()) { - LOG.debug(String.format("%s already removed, expected instance %08x, actual %08x", - connection.toString(), System.identityHashCode(connection), - System.identityHashCode(connectionInPool))); - } - } - } - - /** - * Creates a "channel" that can be used by a protobuf service. Useful setting up - * protobuf stubs. - * - * @param sn server name describing location of server - * @param user which is to use the connection - * @param rpcTimeout default rpc operation timeout - * - * @return A rpc channel that goes via this rpc client instance. - */ - public RpcChannel createRpcChannel(final ServerName sn, final User user, int rpcTimeout) { - return new RpcChannelImplementation(this, sn, user, rpcTimeout); - } - - /** - * Blocking rpc channel that goes via hbase rpc. - */ - @VisibleForTesting - public static class RpcChannelImplementation implements RpcChannel { - private final InetSocketAddress isa; - private final AsyncRpcClient rpcClient; - private final User ticket; - private final int channelOperationTimeout; - - /** - * @param channelOperationTimeout - the default timeout when no timeout is given - */ - protected RpcChannelImplementation(final AsyncRpcClient rpcClient, - final ServerName sn, final User ticket, int channelOperationTimeout) { - this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort()); - this.rpcClient = rpcClient; - this.ticket = ticket; - this.channelOperationTimeout = channelOperationTimeout; - } - - @Override - public void callMethod(Descriptors.MethodDescriptor md, RpcController controller, - Message param, Message returnType, RpcCallback<Message> done) { - PayloadCarryingRpcController pcrc; - if (controller != null) { - pcrc = (PayloadCarryingRpcController) controller; - if (!pcrc.hasCallTimeout()) { - pcrc.setCallTimeout(channelOperationTimeout); - } - } else { - pcrc = new PayloadCarryingRpcController(); - pcrc.setCallTimeout(channelOperationTimeout); - } - - this.rpcClient.callMethod(md, pcrc, param, returnType, this.ticket, this.isa, done); - } - } - - Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { - return WHEEL_TIMER.newTimeout(task, delay, unit); - } -}
