http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java deleted file mode 100644 index e0c7586..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java +++ /dev/null @@ -1,126 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.ipc; - -import java.io.IOException; - -import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; -import org.apache.hadoop.ipc.RemoteException; - -import com.google.protobuf.Message; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufInputStream; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; - -/** - * Handles Hbase responses - */ -@InterfaceAudience.Private -public class AsyncServerResponseHandler extends SimpleChannelInboundHandler<ByteBuf> { - private final AsyncRpcChannel channel; - - /** - * Constructor - * @param channel on which this response handler operates - */ - public AsyncServerResponseHandler(AsyncRpcChannel channel) { - this.channel = channel; - } - - @Override - protected void channelRead0(ChannelHandlerContext ctx, ByteBuf inBuffer) throws Exception { - ByteBufInputStream in = new ByteBufInputStream(inBuffer); - int totalSize = inBuffer.readableBytes(); - // Read the header - RPCProtos.ResponseHeader responseHeader = RPCProtos.ResponseHeader.parseDelimitedFrom(in); - int id = responseHeader.getCallId(); - AsyncCall call = channel.removePendingCall(id); - if (call == null) { - // So we got a response for which we have no corresponding 'call' here on the client-side. - // We probably timed out waiting, cleaned up all references, and now the server decides - // to return a response. There is nothing we can do w/ the response at this stage. Clean - // out the wire of the response so its out of the way and we can get other responses on - // this connection. - int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader); - int whatIsLeftToRead = totalSize - readSoFar; - - // This is done through a Netty ByteBuf which has different behavior than InputStream. 
- // It does not return number of bytes read but will update pointer internally and throws an - // exception when too many bytes are to be skipped. - inBuffer.skipBytes(whatIsLeftToRead); - return; - } - - if (responseHeader.hasException()) { - RPCProtos.ExceptionResponse exceptionResponse = responseHeader.getException(); - RemoteException re = createRemoteException(exceptionResponse); - if (exceptionResponse.getExceptionClassName() - .equals(FatalConnectionException.class.getName())) { - channel.close(re); - } else { - call.setFailed(re); - } - } else { - Message value = null; - // Call may be null because it may have timedout and been cleaned up on this side already - if (call.responseDefaultType != null) { - Message.Builder builder = call.responseDefaultType.newBuilderForType(); - ProtobufUtil.mergeDelimitedFrom(builder, in); - value = builder.build(); - } - CellScanner cellBlockScanner = null; - if (responseHeader.hasCellBlockMeta()) { - int size = responseHeader.getCellBlockMeta().getLength(); - byte[] cellBlock = new byte[size]; - inBuffer.readBytes(cellBlock, 0, cellBlock.length); - cellBlockScanner = channel.client.createCellScanner(cellBlock); - } - call.setSuccess(value, cellBlockScanner); - call.callStats.setResponseSizeBytes(totalSize); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - channel.close(cause); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - channel.close(new IOException("connection closed")); - } - - /** - * @param e Proto exception - * @return RemoteException made from passed <code>e</code> - */ - private RemoteException createRemoteException(final RPCProtos.ExceptionResponse e) { - String innerExceptionClassName = e.getExceptionClassName(); - boolean doNotRetry = e.getDoNotRetry(); - return e.hasHostname() ? 
- // If a hostname then add it to the RemoteWithExtrasException - new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(), - e.getPort(), doNotRetry) - : new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry); - } -}
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java new file mode 100644 index 0000000..d27602e --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import com.google.common.annotations.VisibleForTesting; + +import java.io.IOException; +import java.net.SocketAddress; + +import javax.net.SocketFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.MetricsConnection; +import org.apache.hadoop.net.NetUtils; + +/** + * Does RPC against a cluster. Manages connections per regionserver in the cluster. 
+ * <p> + * See HBaseServer + */ +@InterfaceAudience.Private +public class BlockingRpcClient extends AbstractRpcClient<BlockingRpcConnection> { + + protected final SocketFactory socketFactory; // how to create sockets + + /** + * Used in test only. Construct an IPC client for the default cluster id + * ({@code HConstants.CLUSTER_ID_DEFAULT}) with the default SocketFactory, no local bind address + * and no connection metrics. + */ + @VisibleForTesting + BlockingRpcClient(Configuration conf) { + this(conf, HConstants.CLUSTER_ID_DEFAULT, null, null); + } + + /** + * Construct an IPC client for the cluster {@code clusterId} with the default SocketFactory. This + * constructor is called with reflection by the RpcClientFactory to create an instance. + * @param conf configuration + * @param clusterId the cluster id + * @param localAddr client socket bind address. + * @param metrics the connection metrics + */ + public BlockingRpcClient(Configuration conf, String clusterId, SocketAddress localAddr, + MetricsConnection metrics) { + super(conf, clusterId, localAddr, metrics); + this.socketFactory = NetUtils.getDefaultSocketFactory(conf); + } + + /** + * Creates a connection. Can be overridden by a subclass for testing. + * @param remoteId - the ConnectionId to use for the connection creation. 
+ */ + protected BlockingRpcConnection createConnection(ConnectionId remoteId) throws IOException { + return new BlockingRpcConnection(this, remoteId); + } + + @Override + protected void closeInternal() { + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java new file mode 100644 index 0000000..4dc121c --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java @@ -0,0 +1,730 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.ipc; + +import static org.apache.hadoop.hbase.ipc.IPCUtil.buildRequestHeader; +import static org.apache.hadoop.hbase.ipc.IPCUtil.createRemoteException; +import static org.apache.hadoop.hbase.ipc.IPCUtil.getTotalSizeWhenWrittenDelimited; +import static org.apache.hadoop.hbase.ipc.IPCUtil.isFatalConnectionException; +import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled; +import static org.apache.hadoop.hbase.ipc.IPCUtil.write; + +import com.google.protobuf.Message; +import com.google.protobuf.Message.Builder; +import com.google.protobuf.RpcCallback; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayDeque; +import java.util.Locale; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; + +import javax.security.sasl.SaslException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; +import org.apache.hadoop.hbase.io.ByteArrayOutputStream; +import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta; +import 
org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader; +import org.apache.hadoop.hbase.security.HBaseSaslRpcClient; +import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ExceptionUtil; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.htrace.Trace; +import org.apache.htrace.TraceScope; + +/** + * Thread that reads responses and notifies callers. Each connection owns a socket connected to a + * remote address. Calls are multiplexed through this socket: responses may be delivered out of + * order. + */ +@InterfaceAudience.Private +class BlockingRpcConnection extends RpcConnection implements Runnable { + + private static final Log LOG = LogFactory.getLog(BlockingRpcConnection.class); + + private final BlockingRpcClient rpcClient; + + private final String threadName; + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", + justification = "We are always under lock actually") + private Thread thread; + + // connected socket. protected for writing UT. + protected Socket socket = null; + private DataInputStream in; + private DataOutputStream out; + + private HBaseSaslRpcClient saslRpcClient; + + // currently active calls + private final ConcurrentMap<Integer, Call> calls = new ConcurrentHashMap<>(); + + private final CallSender callSender; + + private boolean closed = false; + + private byte[] connectionHeaderPreamble; + + private byte[] connectionHeaderWithLength; + + /** + * If the client wants to interrupt its calls easily (i.e. 
call Thread#interrupt), it gets into a + * java issue: an interruption during a write closes the socket/channel. A way to avoid this is to + * use a different thread for writing. This way, on interruptions, we either cancel the writes or + * ignore the answer if the write is already done, but we don't stop the write in the middle. This + * adds a thread per region server in the client, so it's kept as an option. + * <p> + * The implementation is simple: the client threads add their calls to the queue, and then wait + * for an answer. The CallSender blocks on the queue, and writes the calls one after the other. On + * interruption, the client cancels its call. The CallSender checks that the call has not been + * canceled before writing it. + * </p> + * <p> + * When the connection closes, all the calls not yet sent are dismissed. The client thread is + * notified with an appropriate exception, as if the call was already sent but the answer not yet + * received. + * </p> + */ + private class CallSender extends Thread { + + private final Queue<Call> callsToWrite; + + private final int maxQueueSize; + + /** + * @param name base name for this thread; {@code " - writer"} is appended to it + * @param conf configuration supplying {@code hbase.ipc.client.write.queueSize} (default 1000) + */ + public CallSender(String name, Configuration conf) { + int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000); + callsToWrite = new ArrayDeque<>(queueSize); + this.maxQueueSize = queueSize; + setDaemon(true); + setName(name + " - writer"); + } + + /** + * Queues a call for the writer thread and wakes up the threads waiting on the connection + * monitor. The caller must hold the lock on the enclosing connection, as required by + * {@code notifyAll}. + * @throws IOException if the write queue already holds {@code maxQueueSize} calls + */ + public void sendCall(final Call call) throws IOException { + if (callsToWrite.size() >= maxQueueSize) { + throw new IOException("Can't add the call " + call.id + + " to the write queue. callsToWrite.size()=" + callsToWrite.size()); + } + callsToWrite.offer(call); + BlockingRpcConnection.this.notifyAll(); + } + + /** + * Cancels a call: removes it from the write queue (if it is still queued) and from the pending + * calls, then fails it with a {@link CallCancelledException}. + */ + public void remove(Call call) { + // Remove this very call; the no-arg remove() would drop the queue head (some other call) and + // throw NoSuchElementException when the queue is empty. + callsToWrite.remove(call); + // By removing the call from the expected call list, we make the list smaller, but + // it means as well that we don't know how many calls we cancelled. 
+ calls.remove(call.id); + call.setException(new CallCancelledException("Call id=" + call.id + ", waitTime=" + + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout=" + + call.timeout)); + } + + /** + * Reads calls from the queue and writes them to the socket, holding the connection monitor, + * until the connection is marked closed. + */ + @Override + public void run() { + synchronized (BlockingRpcConnection.this) { + while (!closed) { + if (callsToWrite.isEmpty()) { + // We should use another monitor object here for better performance since the read + // thread also uses BlockingRpcConnection.this. But this makes the locking schema more + // complicated, can do it later as an optimization. + try { + BlockingRpcConnection.this.wait(); + } catch (InterruptedException e) { + // deliberately ignored: the loop condition re-checks 'closed' right after this + } + // check if we need to quit, so continue the main loop instead of falling through. + continue; + } + Call call = callsToWrite.poll(); + if (call.isDone()) { + continue; + } + try { + tracedWriteRequest(call); + } catch (IOException e) { + // exception here means the call has not been added to the pendingCalls yet, so we need + // to fail it by our own. + if (LOG.isDebugEnabled()) { + LOG.debug("call write error for call #" + call.id, e); + } + call.setException(e); + closeConn(e); + } + } + } + } + + /** + * Cleans up the calls not yet sent when we finish. 
+ */ + public void cleanup(IOException e) { + IOException ie = + new ConnectionClosingException("Connection to " + remoteId.address + " is closing."); + for (Call call : callsToWrite) { + call.setException(ie); + } + callsToWrite.clear(); + } + } + + BlockingRpcConnection(BlockingRpcClient rpcClient, ConnectionId remoteId) throws IOException { + super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId, + rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor); + this.rpcClient = rpcClient; + if (remoteId.getAddress().isUnresolved()) { + throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName()); + } + + this.connectionHeaderPreamble = getConnectionHeaderPreamble(); + ConnectionHeader header = getConnectionHeader(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(4 + header.getSerializedSize()); + DataOutputStream dos = new DataOutputStream(baos); + dos.writeInt(header.getSerializedSize()); + header.writeTo(dos); + assert baos.size() == 4 + header.getSerializedSize(); + this.connectionHeaderWithLength = baos.getBuffer(); + + UserGroupInformation ticket = remoteId.ticket.getUGI(); + this.threadName = "IPC Client (" + this.rpcClient.socketFactory.hashCode() + + ") connection to " + remoteId.getAddress().toString() + + ((ticket == null) ? " from an unknown user" : (" from " + ticket.getUserName())); + + if (this.rpcClient.conf.getBoolean(BlockingRpcClient.SPECIFIC_WRITE_THREAD, false)) { + callSender = new CallSender(threadName, this.rpcClient.conf); + callSender.start(); + } else { + callSender = null; + } + } + + // protected for write UT. 
+ protected void setupConnection() throws IOException { + short ioFailures = 0; + short timeoutFailures = 0; + while (true) { + try { + this.socket = this.rpcClient.socketFactory.createSocket(); + this.socket.setTcpNoDelay(this.rpcClient.isTcpNoDelay()); + this.socket.setKeepAlive(this.rpcClient.tcpKeepAlive); + if (this.rpcClient.localAddr != null) { + this.socket.bind(this.rpcClient.localAddr); + } + NetUtils.connect(this.socket, remoteId.getAddress(), this.rpcClient.connectTO); + this.socket.setSoTimeout(this.rpcClient.readTO); + return; + } catch (SocketTimeoutException toe) { + /* + * The max number of retries is 45, which amounts to 20s*45 = 15 minutes retries. + */ + handleConnectionFailure(timeoutFailures++, this.rpcClient.maxRetries, toe); + } catch (IOException ie) { + handleConnectionFailure(ioFailures++, this.rpcClient.maxRetries, ie); + } + } + } + + /** + * Handle connection failures If the current number of retries is equal to the max number of + * retries, stop retrying and throw the exception; Otherwise backoff N seconds and try connecting + * again. This Method is only called from inside setupIOstreams(), which is synchronized. Hence + * the sleep is synchronized; the locks will be retained. 
+ * @param curRetries current number of retries + * @param maxRetries max number of retries allowed + * @param ioe failure reason + * @throws IOException if max number of retries is reached + */ + private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe) + throws IOException { + closeSocket(); + + // throw the exception if the maximum number of retries is reached + if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) { + throw ioe; + } + + // otherwise back off and retry + try { + Thread.sleep(this.rpcClient.failureSleep); + } catch (InterruptedException ie) { + ExceptionUtil.rethrowIfInterrupt(ie); + } + + LOG.info("Retrying connect to server: " + remoteId.getAddress() + " after sleeping " + + this.rpcClient.failureSleep + "ms. Already tried " + curRetries + " time(s)."); + } + + /* + * wait till someone signals us to start reading RPC response or it is idle too long, it is marked + * as to be closed, or the client is marked as not running. + * @return true if it is time to read a response; false otherwise. + */ + private synchronized boolean waitForWork() { + // beware of the concurrent access to the calls list: we can add calls, but as well + // remove them. 
+ long waitUntil = EnvironmentEdgeManager.currentTime() + this.rpcClient.minIdleTimeBeforeClose; + for (;;) { + if (thread == null) { + return false; + } + if (!calls.isEmpty()) { + return true; + } + if (EnvironmentEdgeManager.currentTime() >= waitUntil) { + closeConn( + new IOException("idle connection closed with " + calls.size() + " pending request(s)")); + return false; + } + try { + wait(Math.min(this.rpcClient.minIdleTimeBeforeClose, 1000)); + } catch (InterruptedException e) { + } + } + } + + @Override + public void run() { + if (LOG.isTraceEnabled()) { + LOG.trace(threadName + ": starting, connections " + this.rpcClient.connections.size()); + } + while (waitForWork()) { + readResponse(); + } + if (LOG.isTraceEnabled()) { + LOG.trace(threadName + ": stopped, connections " + this.rpcClient.connections.size()); + } + } + + private void disposeSasl() { + if (saslRpcClient != null) { + saslRpcClient.dispose(); + saslRpcClient = null; + } + } + + private boolean setupSaslConnection(final InputStream in2, final OutputStream out2) + throws IOException { + saslRpcClient = new HBaseSaslRpcClient(authMethod, token, serverPrincipal, + this.rpcClient.fallbackAllowed, this.rpcClient.conf.get("hbase.rpc.protection", + QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT))); + return saslRpcClient.saslConnect(in2, out2); + } + + /** + * If multiple clients with the same principal try to connect to the same server at the same time, + * the server assumes a replay attack is in progress. This is a feature of kerberos. In order to + * work around this, what is done is that the client backs off randomly and tries to initiate the + * connection again. The other problem is to do with ticket expiry. To handle that, a relogin is + * attempted. + * <p> + * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} method. In case when the + * user doesn't have valid credentials, we don't need to retry (from cache or ticket). 
In such + * cases, it is prudent to throw a runtime exception when we receive a SaslException from the + * underlying authentication implementation, so there is no retry from other high level (for eg, + * HCM or HBaseAdmin). + * </p> + */ + private void handleSaslConnectionFailure(final int currRetries, final int maxRetries, + final Exception ex, final UserGroupInformation user) + throws IOException, InterruptedException { + closeSocket(); + user.doAs(new PrivilegedExceptionAction<Object>() { + @Override + public Object run() throws IOException, InterruptedException { + if (shouldAuthenticateOverKrb()) { + if (currRetries < maxRetries) { + if (LOG.isDebugEnabled()) { + LOG.debug("Exception encountered while connecting to " + "the server : " + ex); + } + // try re-login + relogin(); + disposeSasl(); + // have granularity of milliseconds + // we are sleeping with the Connection lock held but since this + // connection instance is being used for connecting to the server + // in question, it is okay + Thread.sleep(ThreadLocalRandom.current().nextInt(reloginMaxBackoff) + 1); + return null; + } else { + String msg = "Couldn't setup connection for " + + UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal; + LOG.warn(msg, ex); + throw (IOException) new IOException(msg).initCause(ex); + } + } else { + LOG.warn("Exception encountered while connecting to " + "the server : " + ex); + } + if (ex instanceof RemoteException) { + throw (RemoteException) ex; + } + if (ex instanceof SaslException) { + String msg = "SASL authentication failed." + + " The most likely cause is missing or invalid credentials." + " Consider 'kinit'."; + LOG.fatal(msg, ex); + throw new RuntimeException(msg, ex); + } + throw new IOException(ex); + } + }); + } + + private void setupIOstreams() throws IOException { + if (socket != null) { + // The connection is already available. Perfect. 
+ return; + } + + if (this.rpcClient.failedServers.isFailedServer(remoteId.getAddress())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Not trying to connect to " + remoteId.address + + " this server is in the failed servers list"); + } + throw new FailedServerException( + "This server is in the failed servers list: " + remoteId.address); + } + + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Connecting to " + remoteId.address); + } + + short numRetries = 0; + final short MAX_RETRIES = 5; + while (true) { + setupConnection(); + InputStream inStream = NetUtils.getInputStream(socket); + // This creates a socket with a write timeout. This timeout cannot be changed. + OutputStream outStream = NetUtils.getOutputStream(socket, this.rpcClient.writeTO); + // Write out the preamble -- MAGIC, version, and auth to use. + writeConnectionHeaderPreamble(outStream); + if (useSasl) { + final InputStream in2 = inStream; + final OutputStream out2 = outStream; + UserGroupInformation ticket = getUGI(); + boolean continueSasl; + if (ticket == null) { + throw new FatalConnectionException("ticket/user is null"); + } + try { + continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() { + @Override + public Boolean run() throws IOException { + return setupSaslConnection(in2, out2); + } + }); + } catch (Exception ex) { + ExceptionUtil.rethrowIfInterrupt(ex); + handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, ticket); + continue; + } + if (continueSasl) { + // Sasl connect is successful. Let's set up Sasl i/o streams. + inStream = saslRpcClient.getInputStream(inStream); + outStream = saslRpcClient.getOutputStream(outStream); + } else { + // fall back to simple auth because server told us so. + // do not change authMethod and useSasl here, we should start from secure when + // reconnecting because regionserver may change its sasl config after restart. 
+ } + } + this.in = new DataInputStream(new BufferedInputStream(inStream)); + this.out = new DataOutputStream(new BufferedOutputStream(outStream)); + // Now write out the connection header + writeConnectionHeader(); + break; + } + } catch (Throwable t) { + closeSocket(); + IOException e = ExceptionUtil.asInterrupt(t); + if (e == null) { + this.rpcClient.failedServers.addToFailedServers(remoteId.address); + if (t instanceof LinkageError) { + // probably the hbase hadoop version does not match the running hadoop version + e = new DoNotRetryIOException(t); + } else if (t instanceof IOException) { + e = (IOException) t; + } else { + e = new IOException("Could not set up IO Streams to " + remoteId.address, t); + } + } + throw e; + } + + // start the receiver thread after the socket connection has been set up + thread = new Thread(this, threadName); + thread.setDaemon(true); + thread.start(); + } + + /** + * Write the RPC header: {@code <MAGIC WORD -- 'HBas'> <ONEBYTE_VERSION> <ONEBYTE_AUTH_TYPE>} + */ + private void writeConnectionHeaderPreamble(OutputStream out) throws IOException { + out.write(connectionHeaderPreamble); + out.flush(); + } + + /** + * Write the connection header. + */ + private void writeConnectionHeader() throws IOException { + this.out.write(connectionHeaderWithLength); + this.out.flush(); + } + + private void tracedWriteRequest(Call call) throws IOException { + try (TraceScope ignored = Trace.startSpan("RpcClientImpl.tracedWriteRequest", call.span)) { + writeRequest(call); + } + } + + /** + * Initiates a call by sending the parameter to the remote server. Note: this is not called from + * the Connection thread, but by other threads. 
+ * @see #readResponse() + */ + private void writeRequest(Call call) throws IOException { + ByteBuffer cellBlock = + this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec, this.compressor, call.cells); + CellBlockMeta cellBlockMeta; + if (cellBlock != null) { + cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.limit()).build(); + } else { + cellBlockMeta = null; + } + RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta); + + setupIOstreams(); + + // Now we're going to write the call. We take the lock, then check that the connection + // is still valid, and, if so we do the write to the socket. If the write fails, we don't + // know where we stand, we have to close the connection. + if (Thread.interrupted()) { + throw new InterruptedIOException(); + } + + calls.put(call.id, call); // We put first as we don't want the connection to become idle. + // from here, we do not throw any exception to upper layer as the call has been tracked in the + // pending calls map. + try { + call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock)); + } catch (IOException e) { + closeConn(e); + return; + } + notifyAll(); + } + + /* + * Receive a response. There is only one receiver, so no synchronization on in is needed. + */ + private void readResponse() { + Call call = null; + boolean expectedCall = false; + try { + // See HBaseServer.Call.setResponse for where we write out the response. + // Total size of the response. Needed below to skip unexpected responses and for call stats. + int totalSize = in.readInt(); + + // Read the header + ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in); + int id = responseHeader.getCallId(); + call = calls.remove(id); // call.done has to be set before leaving this method + expectedCall = (call != null && !call.isDone()); + if (!expectedCall) { + // So we got a response for which we have no corresponding 'call' here on the client-side.
+ // We probably timed out waiting, cleaned up all references, and now the server decides + // to return a response. There is nothing we can do w/ the response at this stage. Clean + // out the wire of the response so it's out of the way and we can get other responses on + // this connection. + int readSoFar = getTotalSizeWhenWrittenDelimited(responseHeader); + int whatIsLeftToRead = totalSize - readSoFar; + IOUtils.skipFully(in, whatIsLeftToRead); + if (call != null) { + call.callStats.setResponseSizeBytes(totalSize); + call.callStats + .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime()); + } + return; + } + if (responseHeader.hasException()) { + ExceptionResponse exceptionResponse = responseHeader.getException(); + RemoteException re = createRemoteException(exceptionResponse); + call.setException(re); + call.callStats.setResponseSizeBytes(totalSize); + call.callStats + .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime()); + if (isFatalConnectionException(exceptionResponse)) { + synchronized (this) { + closeConn(re); + } + } + } else { + Message value = null; + if (call.responseDefaultType != null) { + Builder builder = call.responseDefaultType.newBuilderForType(); + ProtobufUtil.mergeDelimitedFrom(builder, in); + value = builder.build(); + } + CellScanner cellBlockScanner = null; + if (responseHeader.hasCellBlockMeta()) { + int size = responseHeader.getCellBlockMeta().getLength(); + byte[] cellBlock = new byte[size]; + IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length); + cellBlockScanner = this.rpcClient.cellBlockBuilder.createCellScanner(this.codec, + this.compressor, cellBlock); + } + call.setResponse(value, cellBlockScanner); + call.callStats.setResponseSizeBytes(totalSize); + call.callStats + .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime()); + } + } catch (IOException e) { + if (expectedCall) { + call.setException(e); + } + if (e instanceof
SocketTimeoutException) { + // Clean up open calls but don't treat this as a fatal condition, + // since we expect certain responses to not make it by the specified + // {@link ConnectionId#rpcTimeout}. + if (LOG.isTraceEnabled()) { + LOG.trace("ignored", e); + } + } else { + synchronized (this) { + closeConn(e); + } + } + } + } + + @Override + protected synchronized void callTimeout(Call call) { + // remove the timed out call from the pending calls map + calls.remove(call.id); + } + + // Just close the socket and its input and output streams. + private void closeSocket() { + IOUtils.closeStream(out); + IOUtils.closeStream(in); + IOUtils.closeSocket(socket); + out = null; + in = null; + socket = null; + } + + // Close the socket, interrupt the thread, and fail all pending calls with the given exception. + private void closeConn(IOException e) { + if (thread == null) { + return; + } + thread.interrupt(); + thread = null; + closeSocket(); + if (callSender != null) { + callSender.cleanup(e); + } + for (Call call : calls.values()) { + call.setException(e); + } + calls.clear(); + } + + // release all resources, the connection will not be used any more.
+ @Override + public synchronized void shutdown() { + closed = true; + if (callSender != null) { + callSender.interrupt(); + } + closeConn(new IOException("connection to " + remoteId.address + " closed")); + } + + @Override + public void cleanupConnection() { + // intentionally a no-op + } + + @Override + public synchronized void sendRequest(final Call call, HBaseRpcController pcrc) + throws IOException { + pcrc.notifyOnCancel(new RpcCallback<Object>() { + + @Override + public void run(Object parameter) { + setCancelled(call); + synchronized (BlockingRpcConnection.this) { + if (callSender != null) { + callSender.remove(call); + } else { + calls.remove(call.id); + } + } + } + }, new CancellationCallback() { + + @Override + public void run(boolean cancelled) throws IOException { + if (cancelled) { + setCancelled(call); + return; + } + scheduleTimeoutTask(call); + if (callSender != null) { + callSender.sendCall(call); + } else { + tracedWriteRequest(call); + } + } + }); + } + + @Override + public synchronized boolean isActive() { + return thread != null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java new file mode 100644 index 0000000..c628c31 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * The connection is exposed to the upper layer before it is initialized, so we need to buffer the calls + * passed in and write them out once the connection is established.
+ */ [email protected] +class BufferCallBeforeInitHandler extends ChannelDuplexHandler { + + private enum BufferCallAction { + FLUSH, FAIL + } + + public static final class BufferCallEvent { + + public final BufferCallAction action; + + public final IOException error; + + private BufferCallEvent(BufferCallBeforeInitHandler.BufferCallAction action, + IOException error) { + this.action = action; + this.error = error; + } + + public static BufferCallBeforeInitHandler.BufferCallEvent success() { + return SUCCESS_EVENT; + } + + public static BufferCallBeforeInitHandler.BufferCallEvent fail(IOException error) { + return new BufferCallEvent(BufferCallAction.FAIL, error); + } + } + + private static final BufferCallEvent SUCCESS_EVENT = new BufferCallEvent(BufferCallAction.FLUSH, + null); + + private final Map<Integer, Call> id2Call = new HashMap<>(); + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + if (msg instanceof Call) { + Call call = (Call) msg; + id2Call.put(call.id, call); + // The call is already being tracked, so here we mark the write operation as successful. + // We will fail the call directly if we cannot write it out. + promise.trySuccess(); + } else { + ctx.write(msg, promise); + } + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof BufferCallEvent) { + BufferCallEvent bcEvt = (BufferCallBeforeInitHandler.BufferCallEvent) evt; + switch (bcEvt.action) { + case FLUSH: + for (Call call : id2Call.values()) { + ctx.write(call); + } + break; + case FAIL: + for (Call call : id2Call.values()) { + call.setException(bcEvt.error); + } + break; + } + ctx.flush(); + ctx.pipeline().remove(this); + } else if (evt instanceof CallEvent) { + // Just remove the call for now, until we add call events other than timeout and cancel.
+ id2Call.remove(((CallEvent) evt).call.id); + } else { + ctx.fireUserEventTriggered(evt); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java index 5f90837..a6203d5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java @@ -19,36 +19,50 @@ package org.apache.hadoop.hbase.ipc; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; + +import io.netty.util.Timeout; + +import java.io.IOException; + import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; - -import java.io.IOException; +import org.apache.htrace.Span; +import org.apache.htrace.Trace; /** A call waiting for a value. */ @InterfaceAudience.Private -public class Call { - final int id; // call id - final Message param; // rpc request method param object +class Call { + final int id; // call id + final Message param; // rpc request method param object /** - * Optionally has cells when making call. Optionally has cells set on response. Used - * passing cells to the rpc and receiving the response. + * Optionally has cells when making call. Optionally has cells set on response. Used passing cells + * to the rpc and receiving the response. */ CellScanner cells; - Message response; // value, null if error - // The return type. Used to create shell into which we deserialize the response if any.
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", + justification = "Direct access is only allowed after done") + Message response; // value, null if error + // The return type. Used to create shell into which we deserialize the response if any. Message responseDefaultType; - IOException error; // exception, null if value - volatile boolean done; // true when call is done + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", + justification = "Direct access is only allowed after done") + IOException error; // exception, null if value + private boolean done; // true when call is done final Descriptors.MethodDescriptor md; final int timeout; // timeout in millisecond for this call; 0 means infinite. + final int priority; final MetricsConnection.CallStats callStats; + final RpcCallback<Call> callback; + final Span span; + Timeout timeoutTask; protected Call(int id, final Descriptors.MethodDescriptor md, Message param, - final CellScanner cells, final Message responseDefaultType, int timeout, - MetricsConnection.CallStats callStats) { + final CellScanner cells, final Message responseDefaultType, int timeout, int priority, + RpcCallback<Call> callback, MetricsConnection.CallStats callStats) { this.param = param; this.md = md; this.cells = cells; @@ -57,73 +71,74 @@ public class Call { this.responseDefaultType = responseDefaultType; this.id = id; this.timeout = timeout; + this.priority = priority; + this.callback = callback; + this.span = Trace.currentSpan(); + } + + @Override + public String toString() { + return "callId: " + this.id + " methodName: " + this.md.getName() + " param {" + + (this.param != null ? ProtobufUtil.getShortTextFormat(this.param) : "") + "}"; } /** - * Check if the call did timeout. Set an exception (includes a notify) if it's the case. - * @return true if the call is on timeout, false otherwise.
+ * Called from timeoutTask; runs the callback directly so the timeout task does not cancel itself. */ - public boolean checkAndSetTimeout() { - if (timeout == 0){ - return false; - } - - long waitTime = EnvironmentEdgeManager.currentTime() - getStartTime(); - if (waitTime >= timeout) { - IOException ie = new CallTimeoutException("Call id=" + id + - ", waitTime=" + waitTime + ", operationTimeout=" + timeout + " expired."); - setException(ie); // includes a notify - return true; - } else { - return false; + public void setTimeout(IOException error) { + synchronized (this) { + if (done) { + return; + } + this.done = true; + this.error = error; } + callback.run(this); } - public int remainingTime() { - if (timeout == 0) { - return Integer.MAX_VALUE; + private void callComplete() { + if (timeoutTask != null) { + timeoutTask.cancel(); } - - int remaining = timeout - (int) (EnvironmentEdgeManager.currentTime() - getStartTime()); - return remaining > 0 ? remaining : 0; + callback.run(this); } - @Override - public String toString() { - return "callId: " + this.id + " methodName: " + this.md.getName() + " param {" + - (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") + "}"; - } - - /** Indicate when the call is complete and the - * value or error are available. Notifies by default. */ - protected synchronized void callComplete() { - this.done = true; - notify(); // notify caller - } - - /** Set the exception when there is an error. - * Notify the caller the call is done. - * + /** + * Set the exception when there is an error. Notify the caller the call is done. * @param error exception thrown by the call; either local or remote */ public void setException(IOException error) { - this.error = error; + synchronized (this) { + if (done) { + return; + } + this.done = true; + this.error = error; + } callComplete(); } /** - * Set the return value when there is no error. - * Notify the caller the call is done. - * + * Set the return value when there is no error. Notify the caller the call is done.
* @param response return value of the call. * @param cells Can be null */ public void setResponse(Message response, final CellScanner cells) { - this.response = response; - this.cells = cells; + synchronized (this) { + if (done) { + return; + } + this.done = true; + this.response = response; + this.cells = cells; + } callComplete(); } + public synchronized boolean isDone() { + return done; + } + public long getStartTime() { return this.callStats.getStartTime(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallCancelledException.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallCancelledException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallCancelledException.java new file mode 100644 index 0000000..a6777c0 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallCancelledException.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Indicates that a call was cancelled on the client side. + */ [email protected] [email protected] +public class CallCancelledException extends HBaseIOException { + + private static final long serialVersionUID = 309775809470318208L; + + public CallCancelledException(String message) { + super(message); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallEvent.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallEvent.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallEvent.java new file mode 100644 index 0000000..1c2ea32 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallEvent.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Used to tell the netty handler that a call has timed out or has been cancelled. + */ [email protected] +class CallEvent { + + public enum Type { + TIMEOUT, CANCELLED + } + + final Type type; + + final Call call; + + CallEvent(Type type, Call call) { + this.type = type; + this.call = call; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java new file mode 100644 index 0000000..0dac2d1 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java @@ -0,0 +1,293 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.ipc; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufOutputStream; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.io.BoundedByteBufferPool; +import org.apache.hadoop.hbase.io.ByteBufferInputStream; +import org.apache.hadoop.hbase.io.ByteBufferOutputStream; +import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.io.compress.CodecPool; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionInputStream; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; + +/** + * Helper class for building cell blocks and for creating cell scanners over them. + */ [email protected] +class CellBlockBuilder { + + // LOG is being used in TestCellBlockBuilder + static final Log LOG = LogFactory.getLog(CellBlockBuilder.class); + + private final Configuration conf; + + /** + * How much we think the decompressor will expand the original compressed content. + */ + private final int cellBlockDecompressionMultiplier; + + private final int cellBlockBuildingInitialBufferSize; + + public CellBlockBuilder(Configuration conf) { + this.conf = conf; + this.cellBlockDecompressionMultiplier = + conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3); + + // Guess that 16k is a good size for rpc buffer. Could go bigger. See the TODO below in + // #buildCellBlock.
+ this.cellBlockBuildingInitialBufferSize = + ClassSize.align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024)); + } + + private interface OutputStreamSupplier { + + OutputStream get(int expectedSize); + + int size(); + } + + private static final class ByteBufferOutputStreamSupplier implements OutputStreamSupplier { + + private ByteBufferOutputStream baos; + + @Override + public OutputStream get(int expectedSize) { + baos = new ByteBufferOutputStream(expectedSize); + return baos; + } + + @Override + public int size() { + return baos.size(); + } + } + + /** + * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or + * <code>compressor</code>. + * @param codec used to encode the cells; required when <code>cellScanner</code> is not null + * @param compressor used to compress the encoded cells; can be null + * @param cellScanner source of the cells; can be null, in which case null is returned + * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using + * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has + * been flipped and is ready for reading. Use limit to find total size. + * @throws IOException if encoding fails or there is a cell scanner but no codec + */ + public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor, + final CellScanner cellScanner) throws IOException { + ByteBufferOutputStreamSupplier supplier = new ByteBufferOutputStreamSupplier(); + if (buildCellBlock(codec, compressor, cellScanner, supplier)) { + ByteBuffer bb = supplier.baos.getByteBuffer(); + // If no cells, don't mess around. Just return null (could be a bunch of existence checking + // gets or something -- stuff that does not return a cell). + return bb.hasRemaining() ?
bb : null; + } else { + return null; + } + } + + private static final class ByteBufOutputStreamSupplier implements OutputStreamSupplier { + + private final ByteBufAllocator alloc; + + private ByteBuf buf; + + public ByteBufOutputStreamSupplier(ByteBufAllocator alloc) { + this.alloc = alloc; + } + + @Override + public OutputStream get(int expectedSize) { + buf = alloc.buffer(expectedSize); + return new ByteBufOutputStream(buf); + } + + @Override + public int size() { + return buf.writerIndex(); + } + } + + public ByteBuf buildCellBlock(Codec codec, CompressionCodec compressor, CellScanner cellScanner, + ByteBufAllocator alloc) throws IOException { + ByteBufOutputStreamSupplier supplier = new ByteBufOutputStreamSupplier(alloc); + if (buildCellBlock(codec, compressor, cellScanner, supplier)) { + return supplier.buf; + } else { + return null; + } + } + + private boolean buildCellBlock(final Codec codec, final CompressionCodec compressor, + final CellScanner cellScanner, OutputStreamSupplier supplier) throws IOException { + if (cellScanner == null) { + return false; + } + if (codec == null) { + throw new CellScannerButNoCodecException(); + } + int bufferSize = cellBlockBuildingInitialBufferSize; + encodeCellsTo(supplier.get(bufferSize), cellScanner, codec, compressor); + if (LOG.isTraceEnabled() && bufferSize < supplier.size()) { + LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + supplier.size() + + "; up hbase.ipc.cellblock.building.initial.buffersize?"); + } + return true; + } + + private void encodeCellsTo(OutputStream os, CellScanner cellScanner, Codec codec, + CompressionCodec compressor) throws IOException { + Compressor poolCompressor = null; + try { + if (compressor != null) { + if (compressor instanceof Configurable) { + ((Configurable) compressor).setConf(this.conf); + } + poolCompressor = CodecPool.getCompressor(compressor); + os = compressor.createOutputStream(os, poolCompressor); + } + Codec.Encoder encoder = codec.getEncoder(os); +
while (cellScanner.advance()) { + encoder.write(cellScanner.current()); + } + encoder.flush(); + } catch (BufferOverflowException | IndexOutOfBoundsException e) { + throw new DoNotRetryIOException(e); + } finally { + os.close(); + if (poolCompressor != null) { + CodecPool.returnCompressor(poolCompressor); + } + } + } + + /** + * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or + * <code>compressor</code>. + * @param codec used to encode the cells; required when <code>cellScanner</code> is not null + * @param compressor used to compress the encoded cells; can be null + * @param cellScanner source of the cells; can be null, in which case null is returned + * @param pool Pool of ByteBuffers to make use of. Can be null and then we'll allocate our own + * ByteBuffer. + * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using + * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has + * been flipped and is ready for reading. Use limit to find total size. If + * <code>pool</code> was not null, then this returned ByteBuffer came from there and + * should be returned to the pool when done.
+ * @throws IOException if encoding fails or there is a cell scanner but no codec + */ + public ByteBuffer buildCellBlock(Codec codec, CompressionCodec compressor, + CellScanner cellScanner, BoundedByteBufferPool pool) throws IOException { + if (cellScanner == null) { + return null; + } + if (codec == null) { + throw new CellScannerButNoCodecException(); + } + ByteBufferOutputStream bbos; + ByteBuffer bb = null; + if (pool != null) { + bb = pool.getBuffer(); + bbos = new ByteBufferOutputStream(bb); + } else { + bbos = new ByteBufferOutputStream(cellBlockBuildingInitialBufferSize); + } + encodeCellsTo(bbos, cellScanner, codec, compressor); + if (bbos.size() == 0) { + if (pool != null) { + pool.putBuffer(bb); + } + return null; + } + return bbos.getByteBuffer(); + } + + /** + * @param codec to use for cellblock + * @param cellBlock to decode + * @return CellScanner to work against the content of <code>cellBlock</code> + * @throws IOException if decompressing the cell block fails + */ + public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, + final byte[] cellBlock) throws IOException { + return createCellScanner(codec, compressor, ByteBuffer.wrap(cellBlock)); + } + + /** + * @param codec to use for cellblock + * @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be + * position()'ed at the start of the cell block and limit()'ed at the end. + * @return CellScanner to work against the content of <code>cellBlock</code> + * @throws IOException if decompressing the cell block fails + */ + public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, + ByteBuffer cellBlock) throws IOException { + if (compressor != null) { + cellBlock = decompress(compressor, cellBlock); + } + // Not making the Decoder over the ByteBuffer purposefully. The Decoder over the BB will + // make Cells directly over the passed BB. This method is called at client side and we don't + // want the Cells to share the same byte[] where the RPC response is being read.
Caching of any + // of the Cells at user's app level will make it not possible to GC the response byte[] + return codec.getDecoder(new ByteBufferInputStream(cellBlock)); + } + + private ByteBuffer decompress(CompressionCodec compressor, ByteBuffer cellBlock) + throws IOException { + // GZIPCodec fails w/ NPE if no configuration. + if (compressor instanceof Configurable) { + ((Configurable) compressor).setConf(this.conf); + } + Decompressor poolDecompressor = CodecPool.getDecompressor(compressor); + CompressionInputStream cis = + compressor.createInputStream(new ByteBufferInputStream(cellBlock), poolDecompressor); + ByteBufferOutputStream bbos; + try { + // TODO: This is ugly. The buffer will be resized on us if we guess wrong. + // TODO: Reuse buffers. NOTE(review): cis is never closed in this method -- confirm that is intended. + bbos = + new ByteBufferOutputStream(cellBlock.remaining() * this.cellBlockDecompressionMultiplier); + IOUtils.copy(cis, bbos); + bbos.close(); + cellBlock = bbos.getByteBuffer(); + } finally { + CodecPool.returnDecompressor(poolDecompressor); + } + return cellBlock; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellScannerButNoCodecException.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellScannerButNoCodecException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellScannerButNoCodecException.java new file mode 100644 index 0000000..ffd27b3 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellScannerButNoCodecException.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Thrown if there is a cellscanner but no codec to encode it with. + */ +@SuppressWarnings("serial") [email protected] [email protected] +public class CellScannerButNoCodecException extends HBaseIOException { +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DefaultNettyEventLoopConfig.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DefaultNettyEventLoopConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DefaultNettyEventLoopConfig.java new file mode 100644 index 0000000..f710d54 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DefaultNettyEventLoopConfig.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.concurrent.DefaultThreadFactory; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Pair; + +/** + * The default netty event loop config + */ [email protected] +class DefaultNettyEventLoopConfig { + + public static final Pair<EventLoopGroup, Class<? extends Channel>> GROUP_AND_CHANNEL_CLASS = Pair + .<EventLoopGroup, Class<? 
extends Channel>> newPair( + new NioEventLoopGroup(0, + new DefaultThreadFactory("Default-IPC-NioEventLoopGroup", true, Thread.MAX_PRIORITY)), + NioSocketChannel.class); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java new file mode 100644 index 0000000..aaaea1f --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import com.google.protobuf.RpcCallback; + +import java.io.IOException; + +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Simple delegating controller for use with the {@link RpcControllerFactory} to help override + * standard behavior of a {@link HBaseRpcController}. 
Used testing. + */ +@InterfaceAudience.Private +public class DelegatingHBaseRpcController implements HBaseRpcController { + + private final HBaseRpcController delegate; + + public DelegatingHBaseRpcController(HBaseRpcController delegate) { + this.delegate = delegate; + } + + @Override + public void reset() { + delegate.reset(); + } + + @Override + public boolean failed() { + return delegate.failed(); + } + + @Override + public String errorText() { + return delegate.errorText(); + } + + @Override + public void startCancel() { + delegate.startCancel(); + } + + @Override + public void setFailed(String reason) { + delegate.setFailed(reason); + } + + @Override + public boolean isCanceled() { + return delegate.isCanceled(); + } + + @Override + public void notifyOnCancel(RpcCallback<Object> callback) { + delegate.notifyOnCancel(callback); + } + + @Override + public CellScanner cellScanner() { + return delegate.cellScanner(); + } + + @Override + public void setCellScanner(CellScanner cellScanner) { + delegate.setCellScanner(cellScanner); + } + + @Override + public void setPriority(int priority) { + delegate.setPriority(priority); + } + + @Override + public void setPriority(TableName tn) { + delegate.setPriority(tn); + } + + @Override + public int getPriority() { + return delegate.getPriority(); + } + + @Override + public int getCallTimeout() { + return delegate.getCallTimeout(); + } + + @Override + public void setCallTimeout(int callTimeout) { + delegate.setCallTimeout(callTimeout); + } + + @Override + public boolean hasCallTimeout() { + return delegate.hasCallTimeout(); + } + + @Override + public void setFailed(IOException e) { + delegate.setFailed(e); + } + + @Override + public IOException getFailed() { + return delegate.getFailed(); + } + + @Override + public void setDone(CellScanner cellScanner) { + delegate.setDone(cellScanner); + } + + @Override + public void notifyOnCancel(RpcCallback<Object> callback, CancellationCallback action) + throws IOException { + 
delegate.notifyOnCancel(callback, action); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java deleted file mode 100644 index ad4224b..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.ipc; - -import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; - -/** - * Simple delegating controller for use with the {@link RpcControllerFactory} to help override - * standard behavior of a {@link PayloadCarryingRpcController}. 
- */ -@InterfaceAudience.Private -public class DelegatingPayloadCarryingRpcController extends PayloadCarryingRpcController { - private PayloadCarryingRpcController delegate; - - public DelegatingPayloadCarryingRpcController(PayloadCarryingRpcController delegate) { - this.delegate = delegate; - } - - @Override - public CellScanner cellScanner() { - return delegate.cellScanner(); - } - - @Override - public void setCellScanner(final CellScanner cellScanner) { - delegate.setCellScanner(cellScanner); - } - - @Override - public void setPriority(int priority) { - delegate.setPriority(priority); - } - - @Override - public void setPriority(final TableName tn) { - delegate.setPriority(tn); - } - - @Override - public int getPriority() { - return delegate.getPriority(); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java new file mode 100644 index 0000000..721148b --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Indicate that the rpc server tells client to fallback to simple auth but client is disabled to do + * so. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class FallbackDisallowedException extends HBaseIOException { + + private static final long serialVersionUID = -6942845066279358253L; + + public FallbackDisallowedException() { + super("Server asks us to fall back to SIMPLE auth, " + + "but this client is configured to only allow secure connections."); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java new file mode 100644 index 0000000..2c4b335 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; + +import java.io.IOException; + +import org.apache.hadoop.hbase.CellScannable; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Optionally carries Cells across the proxy/service interface down into ipc. On its way out it + * optionally carries a set of result Cell data. We stick the Cells here when we want to avoid + * having to protobuf them (for performance reasons). This class is used ferrying data across the + * proxy/protobuf service chasm. Also does call timeout. Used by client and server ipc'ing. + */ +@InterfaceAudience.Private +public interface HBaseRpcController extends RpcController, CellScannable { + + static final int PRIORITY_UNSET = -1; + + /** + * Only used to send cells to rpc server, the returned cells should be set by + * {@link #setDone(CellScanner)}. + */ + void setCellScanner(CellScanner cellScanner); + + /** + * @param priority Priority for this request; should fall roughly in the range + * {@link HConstants#NORMAL_QOS} to {@link HConstants#HIGH_QOS} + */ + void setPriority(int priority); + + /** + * @param tn Set priority based off the table we are going against. 
+ */ + void setPriority(final TableName tn); + + /** + * @return The priority of this request + */ + int getPriority(); + + int getCallTimeout(); + + void setCallTimeout(int callTimeout); + + boolean hasCallTimeout(); + + /** + * Set failed with an exception to pass on. For use in async rpc clients + * @param e exception to set with + */ + void setFailed(IOException e); + + /** + * Return the failed exception, null if not failed. + */ + IOException getFailed(); + + /** + * <b>IMPORTANT:</b> always call this method if the call finished without any exception to tell + * the {@code HBaseRpcController} that we are done. + */ + void setDone(CellScanner cellScanner); + + /** + * A little different from the basic RpcController: + * <ol> + * <li>You can register multiple callbacks to an {@code HBaseRpcController}.</li> + * <li>The callback will not be called if the rpc call is finished without any cancellation.</li> + * <li>You can call me at client side also.</li> + * </ol> + */ + @Override + void notifyOnCancel(RpcCallback<Object> callback); + + interface CancellationCallback { + void run(boolean cancelled) throws IOException; + } + + /** + * If not cancelled, add the callback to cancellation callback list. And then execute the action + * with the cancellation state as a parameter. The implementation should guarantee that the + * cancellation state does not change during this call. 
+ */ + void notifyOnCancel(RpcCallback<Object> callback, CancellationCallback action) throws IOException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java new file mode 100644 index 0000000..a976473 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.ipc; + +import com.google.protobuf.RpcCallback; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.CellScannable; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Optionally carries Cells across the proxy/service interface down into ipc. On its way out it + * optionally carries a set of result Cell data. We stick the Cells here when we want to avoid + * having to protobuf them (for performance reasons). This class is used ferrying data across the + * proxy/protobuf service chasm. Also does call timeout. Used by client and server ipc'ing. + */ +@InterfaceAudience.Private +public class HBaseRpcControllerImpl implements HBaseRpcController { + /** + * The time, in ms before the call should expire. + */ + private Integer callTimeout; + + private boolean done = false; + + private boolean cancelled = false; + + private final List<RpcCallback<Object>> cancellationCbs = new ArrayList<>(); + + private IOException exception; + + /** + * Priority to set on this request. Set it here in controller so available composing the request. + * This is the ordained way of setting priorities going forward. We will be undoing the old + * annotation-based mechanism. + */ + private int priority = PRIORITY_UNSET; + + /** + * They are optionally set on construction, cleared after we make the call, and then optionally + * set on response with the result. We use this lowest common denominator access to Cells because + * sometimes the scanner is backed by a List of Cells and other times, it is backed by an encoded + * block that implements CellScanner. 
+ */ + private CellScanner cellScanner; + + public HBaseRpcControllerImpl() { + this((CellScanner) null); + } + + public HBaseRpcControllerImpl(final CellScanner cellScanner) { + this.cellScanner = cellScanner; + } + + public HBaseRpcControllerImpl(final List<CellScannable> cellIterables) { + this.cellScanner = cellIterables == null ? null : CellUtil.createCellScanner(cellIterables); + } + + /** + * @return One-shot cell scanner (you cannot back it up and restart) + */ + @Override + public CellScanner cellScanner() { + return cellScanner; + } + + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", + justification = "The only possible race method is startCancel") + @Override + public void setCellScanner(final CellScanner cellScanner) { + this.cellScanner = cellScanner; + } + + @Override + public void setPriority(int priority) { + this.priority = priority; + } + + @Override + public void setPriority(final TableName tn) { + setPriority( + tn != null && tn.isSystemTable() ? HConstants.SYSTEMTABLE_QOS : HConstants.NORMAL_QOS); + } + + @Override + public int getPriority() { + return priority; + } + + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", + justification = "The only possible race method is startCancel") + @Override + public void reset() { + priority = 0; + cellScanner = null; + exception = null; + callTimeout = null; + // In the implementations of some callable with replicas, rpc calls are executed in a executor + // and we could cancel the operation from outside which means there could be a race between + // reset and startCancel. Although I think the race should be handled by the callable since the + // reset may clear the cancel state... 
+ synchronized (this) { + done = false; + cancelled = false; + cancellationCbs.clear(); + } + } + + @Override + public int getCallTimeout() { + if (callTimeout != null) { + return callTimeout.intValue(); + } else { + return 0; + } + } + + @Override + public void setCallTimeout(int callTimeout) { + this.callTimeout = callTimeout; + } + + @Override + public boolean hasCallTimeout() { + return callTimeout != null; + } + + @Override + public synchronized String errorText() { + if (!done || exception == null) { + return null; + } + return exception.getMessage(); + } + + @Override + public synchronized boolean failed() { + return done && this.exception != null; + } + + @Override + public synchronized boolean isCanceled() { + return cancelled; + } + + @Override + public void notifyOnCancel(RpcCallback<Object> callback) { + synchronized (this) { + if (done) { + return; + } + if (!cancelled) { + cancellationCbs.add(callback); + return; + } + } + // run it directly as we have already been cancelled. + callback.run(null); + } + + @Override + public synchronized void setFailed(String reason) { + if (done) { + return; + } + done = true; + exception = new IOException(reason); + } + + @Override + public synchronized void setFailed(IOException e) { + if (done) { + return; + } + done = true; + exception = e; + } + + @Override + public synchronized IOException getFailed() { + return done ? exception : null; + } + + @Override + public synchronized void setDone(CellScanner cellScanner) { + if (done) { + return; + } + done = true; + this.cellScanner = cellScanner; + } + + @Override + public void startCancel() { + // As said above in the comment of reset, the cancellationCbs maybe cleared by reset, so we need + // to copy it. 
+ List<RpcCallback<Object>> cbs; + synchronized (this) { + if (done) { + return; + } + done = true; + cancelled = true; + cbs = new ArrayList<>(cancellationCbs); + } + for (RpcCallback<?> cb : cbs) { + cb.run(null); + } + } + + @Override + public synchronized void notifyOnCancel(RpcCallback<Object> callback, CancellationCallback action) + throws IOException { + if (cancelled) { + action.run(true); + } else { + cancellationCbs.add(callback); + action.run(false); + } + } + +} \ No newline at end of file
