HBASE-15745 Refactor RPC classes to better accept async changes Signed-off-by: stack <st...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/56358a0f Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/56358a0f Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/56358a0f Branch: refs/heads/hbase-12439 Commit: 56358a0fd3e93b7ba8dc2a9cd8d92d961a1c24a9 Parents: 86ca09e Author: Jurriaan Mous <jurm...@jurmo.us> Authored: Sun May 1 15:01:01 2016 +0200 Committer: stack <st...@apache.org> Committed: Fri May 6 13:43:45 2016 -0700 ---------------------------------------------------------------------- .../client/AbstractRegionServerCallable.java | 158 ++++ .../client/FastFailInterceptorContext.java | 5 +- .../client/NoOpRetryingInterceptorContext.java | 4 +- .../hbase/client/RegionServerCallable.java | 107 +-- .../hadoop/hbase/client/RetryingCallable.java | 34 +- .../hbase/client/RetryingCallableBase.java | 60 ++ .../RetryingCallerInterceptorContext.java | 4 +- .../hadoop/hbase/ipc/AsyncRpcChannel.java | 700 +---------------- .../hadoop/hbase/ipc/AsyncRpcChannelImpl.java | 743 +++++++++++++++++++ .../apache/hadoop/hbase/ipc/AsyncRpcClient.java | 8 +- .../hbase/ipc/AsyncServerResponseHandler.java | 4 +- .../hadoop/hbase/ipc/CoprocessorRpcChannel.java | 64 +- .../hbase/ipc/MasterCoprocessorRpcChannel.java | 2 +- .../hbase/ipc/RegionCoprocessorRpcChannel.java | 12 +- .../ipc/RegionServerCoprocessorRpcChannel.java | 2 +- .../hbase/ipc/SyncCoprocessorRpcChannel.java | 79 ++ 16 files changed, 1106 insertions(+), 880 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/56358a0f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java new file mode 100644 index 0000000..ee9a781 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.SocketTimeoutException; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.RegionMovedException; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Implementations call a RegionServer. + * Passed to a {@link RpcRetryingCaller} so we retry on fail. + * TODO: this class is actually tied to one region, because most of the paths make use of + * the regioninfo part of location when building requests. The only reason it works for + * multi-region requests (e.g. 
batch) is that they happen to not use the region parts. + * This could be done more cleanly (e.g. having a generic parameter and 2 derived classes, + * RegionCallable and actual RegionServerCallable with ServerName). + * @param <T> the class that the ServerCallable handles + */ +@InterfaceAudience.Private +abstract class AbstractRegionServerCallable<T> implements RetryingCallableBase { + protected final Connection connection; + protected final TableName tableName; + protected final byte[] row; + protected HRegionLocation location; + + protected final static int MIN_WAIT_DEAD_SERVER = 10000; + + /** + * @param connection Connection to use. + * @param tableName Table name to which <code>row</code> belongs. + * @param row The row we want in <code>tableName</code>. + */ + public AbstractRegionServerCallable(Connection connection, TableName tableName, byte[] row) { + this.connection = connection; + this.tableName = tableName; + this.row = row; + } + + /** + * @return {@link ClusterConnection} instance used by this Callable. 
+ */ + ClusterConnection getConnection() { + return (ClusterConnection) this.connection; + } + + protected HRegionLocation getLocation() { + return this.location; + } + + protected void setLocation(final HRegionLocation location) { + this.location = location; + } + + public TableName getTableName() { + return this.tableName; + } + + public byte [] getRow() { + return this.row; + } + + @Override + public void throwable(Throwable t, boolean retrying) { + if (t instanceof SocketTimeoutException || + t instanceof ConnectException || + t instanceof RetriesExhaustedException || + (location != null && getConnection().isDeadServer(location.getServerName()))) { + // if thrown these exceptions, we clear all the cache entries that + // map to that slow/dead server; otherwise, let cache miss and ask + // hbase:meta again to find the new location + if (this.location != null) { + getConnection().clearCaches(location.getServerName()); + } + } else if (t instanceof RegionMovedException) { + getConnection().updateCachedLocations(tableName, row, t, location); + } else if (t instanceof NotServingRegionException && !retrying) { + // Purge cache entries for this specific region from hbase:meta cache + // since we don't call connect(true) when number of retries is 1. 
+ getConnection().deleteCachedRegionLocation(location); + } + } + + @Override + public String getExceptionMessageAdditionalDetail() { + return "row '" + Bytes.toString(row) + "' on table '" + tableName + "' at " + location; + } + + @Override + public long sleep(long pause, int tries) { + // Tries hasn't been bumped up yet so we use "tries + 1" to get right pause time + long sleep = ConnectionUtils.getPauseTime(pause, tries + 1); + if (sleep < MIN_WAIT_DEAD_SERVER + && (location == null || getConnection().isDeadServer(location.getServerName()))) { + sleep = ConnectionUtils.addJitter(MIN_WAIT_DEAD_SERVER, 0.10f); + } + return sleep; + } + + /** + * @return the HRegionInfo for the current region + */ + public HRegionInfo getHRegionInfo() { + if (this.location == null) { + return null; + } + return this.location.getRegionInfo(); + } + + /** + * Prepare for connection to the server hosting region with row from tablename. Does lookup + * to find region location and hosting server. + * @param reload Set this to true if connection should re-find the region + * @throws IOException e + */ + @Override + public void prepare(final boolean reload) throws IOException { + try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) { + this.location = regionLocator.getRegionLocation(row, reload); + } + if (this.location == null) { + throw new IOException("Failed to find location, tableName=" + tableName + + ", row=" + Bytes.toString(row) + ", reload=" + reload); + } + setClientByServiceName(this.location.getServerName()); + } + + /** + * Set the Rpc client for Client services + * @param serviceName to get client for + * @throws IOException When client could not be created + */ + abstract void setClientByServiceName(ServerName serviceName) throws IOException; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/56358a0f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java 
---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java index 3cbdfb3..c9d2324 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java @@ -118,12 +118,11 @@ class FastFailInterceptorContext extends tries = 0; } - public FastFailInterceptorContext prepare(RetryingCallable<?> callable) { + public FastFailInterceptorContext prepare(RetryingCallableBase callable) { return prepare(callable, 0); } - public FastFailInterceptorContext prepare(RetryingCallable<?> callable, - int tries) { + public FastFailInterceptorContext prepare(RetryingCallableBase callable, int tries) { if (callable instanceof RegionServerCallable) { RegionServerCallable<?> retryingCallable = (RegionServerCallable<?>) callable; server = retryingCallable.getLocation().getServerName(); http://git-wip-us.apache.org/repos/asf/hbase/blob/56358a0f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java index 1ccf43c..f8542bd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java @@ -29,14 +29,14 @@ class NoOpRetryingInterceptorContext extends RetryingCallerInterceptorContext { @Override public RetryingCallerInterceptorContext prepare( - RetryingCallable<?> callable) { + RetryingCallableBase callable) { // Do 
Nothing return this; } @Override public RetryingCallerInterceptorContext prepare( - RetryingCallable<?> callable, int tries) { + RetryingCallableBase callable, int tries) { // Do Nothing return this; } http://git-wip-us.apache.org/repos/asf/hbase/blob/56358a0f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java index 6e0752b..d878bae 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java @@ -21,15 +21,10 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.TableNotEnabledException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; -import org.apache.hadoop.hbase.util.Bytes; /** * Implementations call a RegionServer and implement {@link #call(int)}. @@ -42,16 +37,10 @@ import org.apache.hadoop.hbase.util.Bytes; * @param <T> the class that the ServerCallable handles */ @InterfaceAudience.Private -public abstract class RegionServerCallable<T> implements RetryingCallable<T> { - // Public because used outside of this package over in ipc. 
- private static final Log LOG = LogFactory.getLog(RegionServerCallable.class); - protected final Connection connection; - protected final TableName tableName; - protected final byte[] row; - protected HRegionLocation location; - private ClientService.BlockingInterface stub; +public abstract class RegionServerCallable<T> extends AbstractRegionServerCallable<T> implements + RetryingCallable<T> { - protected final static int MIN_WAIT_DEAD_SERVER = 10000; + private ClientService.BlockingInterface stub; /** * @param connection Connection to use. @@ -59,97 +48,25 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> { * @param row The row we want in <code>tableName</code>. */ public RegionServerCallable(Connection connection, TableName tableName, byte [] row) { - this.connection = connection; - this.tableName = tableName; - this.row = row; + super(connection, tableName, row); } - /** - * Prepare for connection to the server hosting region with row from tablename. Does lookup - * to find region location and hosting server. 
- * @param reload Set to true to re-check the table state - * @throws IOException e - */ - @Override - public void prepare(final boolean reload) throws IOException { - // check table state if this is a retry - if (reload && - !tableName.equals(TableName.META_TABLE_NAME) && - getConnection().isTableDisabled(tableName)) { - throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled."); - } - try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) { - this.location = regionLocator.getRegionLocation(row); - } - if (this.location == null) { - throw new IOException("Failed to find location, tableName=" + tableName + - ", row=" + Bytes.toString(row) + ", reload=" + reload); - } - setStub(getConnection().getClient(this.location.getServerName())); + void setClientByServiceName(ServerName service) throws IOException { + this.setStub(getConnection().getClient(service)); } /** - * @return {@link HConnection} instance used by this Callable. + * @return Client Rpc protobuf communication stub */ - HConnection getConnection() { - return (HConnection) this.connection; - } - protected ClientService.BlockingInterface getStub() { return this.stub; } - void setStub(final ClientService.BlockingInterface stub) { - this.stub = stub; - } - - protected HRegionLocation getLocation() { - return this.location; - } - - protected void setLocation(final HRegionLocation location) { - this.location = location; - } - - public TableName getTableName() { - return this.tableName; - } - - public byte [] getRow() { - return this.row; - } - - @Override - public void throwable(Throwable t, boolean retrying) { - if (location != null) { - getConnection().updateCachedLocations(tableName, location.getRegionInfo().getRegionName(), - row, t, location.getServerName()); - } - } - - @Override - public String getExceptionMessageAdditionalDetail() { - return "row '" + Bytes.toString(row) + "' on table '" + tableName + "' at " + location; - } - - @Override - public long 
sleep(long pause, int tries) { - // Tries hasn't been bumped up yet so we use "tries + 1" to get right pause time - long sleep = ConnectionUtils.getPauseTime(pause, tries + 1); - if (sleep < MIN_WAIT_DEAD_SERVER - && (location == null || getConnection().isDeadServer(location.getServerName()))) { - sleep = ConnectionUtils.addJitter(MIN_WAIT_DEAD_SERVER, 0.10f); - } - return sleep; - } - /** - * @return the HRegionInfo for the current region + * Set the client protobuf communication stub + * @param stub to set */ - public HRegionInfo getHRegionInfo() { - if (this.location == null) { - return null; - } - return this.location.getRegionInfo(); + void setStub(final ClientService.BlockingInterface stub) { + this.stub = stub; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/56358a0f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java index 7a32175..2377a0d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java @@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.client; -import java.io.IOException; - import org.apache.hadoop.hbase.classification.InterfaceAudience; /** @@ -29,23 +27,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; * @param <T> result class from executing <tt>this</tt> */ @InterfaceAudience.Private -public interface RetryingCallable<T> { - /** - * Prepare by setting up any connections to servers, etc., ahead of {@link #call(int)} invocation. 
- * @param reload Set this to true if need to requery locations - * @throws IOException e - */ - void prepare(final boolean reload) throws IOException; - - /** - * Called when {@link #call(int)} throws an exception and we are going to retry; take action to - * make it so we succeed on next call (clear caches, do relookup of locations, etc.). - * @param t - * @param retrying True if we are in retrying mode (we are not in retrying mode when max - * retries == 1; we ARE in retrying mode if retries > 1 even when we are the last attempt) - */ - void throwable(final Throwable t, boolean retrying); - +public interface RetryingCallable<T> extends RetryingCallableBase { /** * Computes a result, or throws an exception if unable to do so. * @@ -54,18 +36,4 @@ public interface RetryingCallable<T> { * @throws Exception if unable to compute a result */ T call(int callTimeout) throws Exception; - - /** - * @return Some details from the implementation that we would like to add to a terminating - * exception; i.e. a fatal exception is being thrown ending retries and we might like to add - * more implementation-specific detail on to the exception being thrown. 
- */ - String getExceptionMessageAdditionalDetail(); - - /** - * @param pause - * @param tries - * @return Suggestion on how much to sleep between retries - */ - long sleep(final long pause, final int tries); } http://git-wip-us.apache.org/repos/asf/hbase/blob/56358a0f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallableBase.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallableBase.java new file mode 100644 index 0000000..483f6c2 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallableBase.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * All generic methods for a Callable that can be retried. It is extended with Sync and + * Async versions. 
+ */ +@InterfaceAudience.Private +public interface RetryingCallableBase { + /** + * Prepare by setting up any connections to servers, etc., ahead of call invocation. + * @param reload Set this to true if need to requery locations + * @throws IOException e + */ + void prepare(final boolean reload) throws IOException; + + /** + * Called when call throws an exception and we are going to retry; take action to + * make it so we succeed on next call (clear caches, do relookup of locations, etc.). + * @param t throwable which was thrown + * @param retrying True if we are in retrying mode (we are not in retrying mode when max + * retries == 1; we ARE in retrying mode if retries > 1 even when we are the + * last attempt) + */ + void throwable(final Throwable t, boolean retrying); + + /** + * @return Some details from the implementation that we would like to add to a terminating + * exception; i.e. a fatal exception is being thrown ending retries and we might like to + * add more implementation-specific detail on to the exception being thrown. 
+ */ + String getExceptionMessageAdditionalDetail(); + + /** + * @param pause time to pause + * @param tries number of tries made so far + * @return Suggestion on how much to sleep between retries + */ + long sleep(final long pause, final int tries); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/56358a0f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java index a9f414f..b0ba9f5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java @@ -50,7 +50,7 @@ abstract class RetryingCallerInterceptorContext { * used for use in the current retrying call */ public abstract RetryingCallerInterceptorContext prepare( - RetryingCallable<?> callable); + RetryingCallableBase callable); /** * Telescopic extension that takes which of the many retries we are currently @@ -65,5 +65,5 @@ abstract class RetryingCallerInterceptorContext { * retrying call */ public abstract RetryingCallerInterceptorContext prepare( - RetryingCallable<?> callable, int tries); + RetryingCallableBase callable, int tries); } http://git-wip-us.apache.org/repos/asf/hbase/blob/56358a0f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java index 53eb824..60dc5e4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java +++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java @@ -17,275 +17,22 @@ */ package org.apache.hadoop.hbase.ipc; -import java.io.IOException; -import java.net.ConnectException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.TimeUnit; - -import javax.security.sasl.SaslException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.MetricsConnection; -import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; -import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind; -import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; -import org.apache.hadoop.hbase.protobuf.generated.TracingProtos; -import org.apache.hadoop.hbase.security.AuthMethod; -import org.apache.hadoop.hbase.security.SaslClientHandler; -import org.apache.hadoop.hbase.security.SaslUtil; -import org.apache.hadoop.hbase.security.SecurityInfo; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.security.token.TokenSelector; -import 
org.apache.htrace.Span; -import org.apache.htrace.Trace; - import com.google.protobuf.Descriptors; import com.google.protobuf.Message; -import com.google.protobuf.RpcCallback; -import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufOutputStream; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; -import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Promise; +import java.net.InetSocketAddress; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.MetricsConnection; + /** - * Netty RPC channel + * Interface for Async Rpc Channels */ @InterfaceAudience.Private -public class AsyncRpcChannel { - private static final Log LOG = LogFactory.getLog(AsyncRpcChannel.class.getName()); - - private static final int MAX_SASL_RETRIES = 5; - - protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDDLERS - = new HashMap<>(); - - static { - TOKEN_HANDDLERS.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN, - new AuthenticationTokenSelector()); - } - - final AsyncRpcClient client; - - // Contains the channel to work with. - // Only exists when connected - private Channel channel; - - String name; - final User ticket; - final String serviceName; - final InetSocketAddress address; - - private int failureCounter = 0; - - boolean useSasl; - AuthMethod authMethod; - private int reloginMaxBackoff; - private Token<? 
extends TokenIdentifier> token; - private String serverPrincipal; - - // NOTE: closed and connected flags below are only changed when a lock on pendingCalls - private final Map<Integer, AsyncCall> pendingCalls = new HashMap<Integer, AsyncCall>(); - private boolean connected = false; - private boolean closed = false; - - private Timeout cleanupTimer; - - private final TimerTask timeoutTask = new TimerTask() { - @Override - public void run(Timeout timeout) throws Exception { - cleanupCalls(); - } - }; - - /** - * Constructor for netty RPC channel - * @param bootstrap to construct channel on - * @param client to connect with - * @param ticket of user which uses connection - * @param serviceName name of service to connect to - * @param address to connect to - */ - public AsyncRpcChannel(Bootstrap bootstrap, final AsyncRpcClient client, User ticket, - String serviceName, InetSocketAddress address) { - this.client = client; - - this.ticket = ticket; - this.serviceName = serviceName; - this.address = address; - - this.channel = connect(bootstrap).channel(); - - name = ("IPC Client (" + channel.hashCode() + ") to " + address.toString() - + ((ticket == null) ? 
" from unknown user" : (" from " + ticket.getName()))); - } - - /** - * Connect to channel - * @param bootstrap to connect to - * @return future of connection - */ - private ChannelFuture connect(final Bootstrap bootstrap) { - return bootstrap.remoteAddress(address).connect() - .addListener(new GenericFutureListener<ChannelFuture>() { - @Override - public void operationComplete(final ChannelFuture f) throws Exception { - if (!f.isSuccess()) { - retryOrClose(bootstrap, failureCounter++, client.failureSleep, f.cause()); - return; - } - channel = f.channel(); - - setupAuthorization(); - - ByteBuf b = channel.alloc().directBuffer(6); - createPreamble(b, authMethod); - channel.writeAndFlush(b).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); - if (useSasl) { - UserGroupInformation ticket = AsyncRpcChannel.this.ticket.getUGI(); - if (authMethod == AuthMethod.KERBEROS) { - if (ticket != null && ticket.getRealUser() != null) { - ticket = ticket.getRealUser(); - } - } - SaslClientHandler saslHandler; - if (ticket == null) { - throw new FatalConnectionException("ticket/user is null"); - } - final UserGroupInformation realTicket = ticket; - saslHandler = ticket.doAs(new PrivilegedExceptionAction<SaslClientHandler>() { - @Override - public SaslClientHandler run() throws IOException { - return getSaslHandler(realTicket, bootstrap); - } - }); - if (saslHandler != null) { - // Sasl connect is successful. Let's set up Sasl channel handler - channel.pipeline().addFirst(saslHandler); - } else { - // fall back to simple auth because server told us so. 
- authMethod = AuthMethod.SIMPLE; - useSasl = false; - } - } else { - startHBaseConnection(f.channel()); - } - } - }); - } - - /** - * Start HBase connection - * @param ch channel to start connection on - */ - private void startHBaseConnection(Channel ch) { - ch.pipeline().addLast("frameDecoder", - new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); - ch.pipeline().addLast(new AsyncServerResponseHandler(this)); - try { - writeChannelHeader(ch).addListener(new GenericFutureListener<ChannelFuture>() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - close(future.cause()); - return; - } - List<AsyncCall> callsToWrite; - synchronized (pendingCalls) { - connected = true; - callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values()); - } - for (AsyncCall call : callsToWrite) { - writeRequest(call); - } - } - }); - } catch (IOException e) { - close(e); - } - } - - /** - * Get SASL handler - * @param bootstrap to reconnect to - * @return new SASL handler - * @throws java.io.IOException if handler failed to create - */ - private SaslClientHandler getSaslHandler(final UserGroupInformation realTicket, - final Bootstrap bootstrap) throws IOException { - return new SaslClientHandler(realTicket, authMethod, token, serverPrincipal, - client.fallbackAllowed, - client.conf.get("hbase.rpc.protection", - SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()), - new SaslClientHandler.SaslExceptionHandler() { - @Override - public void handle(int retryCount, Random random, Throwable cause) { - try { - // Handle Sasl failure. 
Try to potentially get new credentials - handleSaslConnectionFailure(retryCount, cause, realTicket); - - retryOrClose(bootstrap, failureCounter++, random.nextInt(reloginMaxBackoff) + 1, - cause); - } catch (IOException | InterruptedException e) { - close(e); - } - } - }, new SaslClientHandler.SaslSuccessfulConnectHandler() { - @Override - public void onSuccess(Channel channel) { - startHBaseConnection(channel); - } - }); - } - - /** - * Retry to connect or close - * @param bootstrap to connect with - * @param failureCount failure count - * @param e exception of fail - */ - private void retryOrClose(final Bootstrap bootstrap, int failureCount, long timeout, - Throwable e) { - if (failureCount < client.maxRetries) { - client.newTimeout(new TimerTask() { - @Override - public void run(Timeout timeout) throws Exception { - connect(bootstrap); - } - }, timeout, TimeUnit.MILLISECONDS); - } else { - client.failedServers.addToFailedServers(address); - close(e); - } - } +public interface AsyncRpcChannel { /** * Calls method on channel @@ -294,439 +41,32 @@ public class AsyncRpcChannel { * @param request to send * @param responsePrototype to construct response with */ - public Promise<Message> callMethod(final Descriptors.MethodDescriptor method, + Promise<Message> callMethod(final Descriptors.MethodDescriptor method, final PayloadCarryingRpcController controller, final Message request, - final Message responsePrototype, MetricsConnection.CallStats callStats) { - final AsyncCall call = new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(), - method, request, controller, responsePrototype, callStats); - controller.notifyOnCancel(new RpcCallback<Object>() { - @Override - public void run(Object parameter) { - // TODO: do not need to call AsyncCall.setFailed? - synchronized (pendingCalls) { - pendingCalls.remove(call.id); - } - } - }); - // TODO: this should be handled by PayloadCarryingRpcController. 
- if (controller.isCanceled()) { - // To finish if the call was cancelled before we set the notification (race condition) - call.cancel(true); - return call; - } - - synchronized (pendingCalls) { - if (closed) { - Promise<Message> promise = channel.eventLoop().newPromise(); - promise.setFailure(new ConnectException()); - return promise; - } - pendingCalls.put(call.id, call); - // Add timeout for cleanup if none is present - if (cleanupTimer == null && call.getRpcTimeout() > 0) { - cleanupTimer = client.newTimeout(timeoutTask, call.getRpcTimeout(), TimeUnit.MILLISECONDS); - } - if (!connected) { - return call; - } - } - writeRequest(call); - return call; - } - - AsyncCall removePendingCall(int id) { - synchronized (pendingCalls) { - return pendingCalls.remove(id); - } - } + final Message responsePrototype, MetricsConnection.CallStats callStats); /** - * Write the channel header - * @param channel to write to - * @return future of write - * @throws java.io.IOException on failure to write + * Get the EventLoop on which this channel operated + * @return EventLoop */ - private ChannelFuture writeChannelHeader(Channel channel) throws IOException { - RPCProtos.ConnectionHeader.Builder headerBuilder = RPCProtos.ConnectionHeader.newBuilder() - .setServiceName(serviceName); - - RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), authMethod); - if (userInfoPB != null) { - headerBuilder.setUserInfo(userInfoPB); - } - - if (client.codec != null) { - headerBuilder.setCellBlockCodecClass(client.codec.getClass().getCanonicalName()); - } - if (client.compressor != null) { - headerBuilder.setCellBlockCompressorClass(client.compressor.getClass().getCanonicalName()); - } - - headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo()); - RPCProtos.ConnectionHeader header = headerBuilder.build(); - - int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header); - - ByteBuf b = channel.alloc().directBuffer(totalSize); - - b.writeInt(header.getSerializedSize()); - 
b.writeBytes(header.toByteArray()); - - return channel.writeAndFlush(b); - } - - /** - * Write request to channel - * @param call to write - */ - private void writeRequest(final AsyncCall call) { - try { - final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader - .newBuilder(); - requestHeaderBuilder.setCallId(call.id).setMethodName(call.method.getName()) - .setRequestParam(call.param != null); - - if (Trace.isTracing()) { - Span s = Trace.currentSpan(); - requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder() - .setParentId(s.getSpanId()).setTraceId(s.getTraceId())); - } - - ByteBuffer cellBlock = client.buildCellBlock(call.controller.cellScanner()); - if (cellBlock != null) { - final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = RPCProtos.CellBlockMeta - .newBuilder(); - cellBlockBuilder.setLength(cellBlock.limit()); - requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build()); - } - // Only pass priority if there one. Let zero be same as no priority. 
- if (call.controller.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) { - requestHeaderBuilder.setPriority(call.controller.getPriority()); - } - - RPCProtos.RequestHeader rh = requestHeaderBuilder.build(); - - int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param); - if (cellBlock != null) { - totalSize += cellBlock.remaining(); - } - - ByteBuf b = channel.alloc().directBuffer(4 + totalSize); - try (ByteBufOutputStream out = new ByteBufOutputStream(b)) { - call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, cellBlock)); - } - - channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id)); - } catch (IOException e) { - close(e); - } - } - - /** - * Set up server authorization - * @throws java.io.IOException if auth setup failed - */ - private void setupAuthorization() throws IOException { - SecurityInfo securityInfo = SecurityInfo.getInfo(serviceName); - this.useSasl = client.userProvider.isHBaseSecurityEnabled(); - - this.token = null; - if (useSasl && securityInfo != null) { - AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind(); - if (tokenKind != null) { - TokenSelector<? 
extends TokenIdentifier> tokenSelector = TOKEN_HANDDLERS.get(tokenKind); - if (tokenSelector != null) { - token = tokenSelector.selectToken(new Text(client.clusterId), - ticket.getUGI().getTokens()); - } else if (LOG.isDebugEnabled()) { - LOG.debug("No token selector found for type " + tokenKind); - } - } - String serverKey = securityInfo.getServerPrincipal(); - if (serverKey == null) { - throw new IOException("Can't obtain server Kerberos config key from SecurityInfo"); - } - this.serverPrincipal = SecurityUtil.getServerPrincipal(client.conf.get(serverKey), - address.getAddress().getCanonicalHostName().toLowerCase()); - if (LOG.isDebugEnabled()) { - LOG.debug("RPC Server Kerberos principal name for service=" + serviceName + " is " - + serverPrincipal); - } - } - - if (!useSasl) { - authMethod = AuthMethod.SIMPLE; - } else if (token != null) { - authMethod = AuthMethod.DIGEST; - } else { - authMethod = AuthMethod.KERBEROS; - } - - if (LOG.isDebugEnabled()) { - LOG.debug( - "Use " + authMethod + " authentication for service " + serviceName + ", sasl=" + useSasl); - } - reloginMaxBackoff = client.conf.getInt("hbase.security.relogin.maxbackoff", 5000); - } - - /** - * Build the user information - * @param ugi User Group Information - * @param authMethod Authorization method - * @return UserInformation protobuf - */ - private RPCProtos.UserInformation buildUserInfo(UserGroupInformation ugi, AuthMethod authMethod) { - if (ugi == null || authMethod == AuthMethod.DIGEST) { - // Don't send user for token auth - return null; - } - RPCProtos.UserInformation.Builder userInfoPB = RPCProtos.UserInformation.newBuilder(); - if (authMethod == AuthMethod.KERBEROS) { - // Send effective user for Kerberos auth - userInfoPB.setEffectiveUser(ugi.getUserName()); - } else if (authMethod == AuthMethod.SIMPLE) { - // Send both effective user and real user for simple auth - userInfoPB.setEffectiveUser(ugi.getUserName()); - if (ugi.getRealUser() != null) { - 
userInfoPB.setRealUser(ugi.getRealUser().getUserName()); - } - } - return userInfoPB.build(); - } - - /** - * Create connection preamble - * @param byteBuf to write to - * @param authMethod to write - */ - private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) { - byteBuf.writeBytes(HConstants.RPC_HEADER); - byteBuf.writeByte(HConstants.RPC_CURRENT_VERSION); - byteBuf.writeByte(authMethod.code); - } - - private void close0(Throwable e) { - List<AsyncCall> toCleanup; - synchronized (pendingCalls) { - if (closed) { - return; - } - closed = true; - toCleanup = new ArrayList<AsyncCall>(pendingCalls.values()); - pendingCalls.clear(); - } - IOException closeException = null; - if (e != null) { - if (e instanceof IOException) { - closeException = (IOException) e; - } else { - closeException = new IOException(e); - } - } - // log the info - if (LOG.isDebugEnabled() && closeException != null) { - LOG.debug(name + ": closing ipc connection to " + address, closeException); - } - if (cleanupTimer != null) { - cleanupTimer.cancel(); - cleanupTimer = null; - } - for (AsyncCall call : toCleanup) { - call.setFailed(closeException != null ? closeException - : new ConnectionClosingException( - "Call id=" + call.id + " on server " + address + " aborted: connection is closing")); - } - channel.disconnect().addListener(ChannelFutureListener.CLOSE); - if (LOG.isDebugEnabled()) { - LOG.debug(name + ": closed"); - } - } + EventExecutor getEventExecutor(); /** * Close connection - * @param e exception on close + * @param cause of closure. */ - public void close(final Throwable e) { - client.removeConnection(this); - - // Move closing from the requesting thread to the channel thread - if (channel.eventLoop().inEventLoop()) { - close0(e); - } else { - channel.eventLoop().execute(new Runnable() { - @Override - public void run() { - close0(e); - } - }); - } - } - - /** - * Clean up calls. 
- */ - private void cleanupCalls() { - List<AsyncCall> toCleanup = new ArrayList<AsyncCall>(); - long currentTime = EnvironmentEdgeManager.currentTime(); - long nextCleanupTaskDelay = -1L; - synchronized (pendingCalls) { - for (Iterator<AsyncCall> iter = pendingCalls.values().iterator(); iter.hasNext();) { - AsyncCall call = iter.next(); - long timeout = call.getRpcTimeout(); - if (timeout > 0) { - if (currentTime - call.getStartTime() >= timeout) { - iter.remove(); - toCleanup.add(call); - } else { - if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) { - nextCleanupTaskDelay = timeout; - } - } - } - } - if (nextCleanupTaskDelay > 0) { - cleanupTimer = client.newTimeout(timeoutTask, nextCleanupTaskDelay, TimeUnit.MILLISECONDS); - } else { - cleanupTimer = null; - } - } - for (AsyncCall call : toCleanup) { - call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime=" - + (currentTime - call.getStartTime()) + ", rpcTimeout=" + call.getRpcTimeout())); - } - } + void close(Throwable cause); /** * Check if the connection is alive + * * @return true if alive */ - public boolean isAlive() { - return channel.isOpen(); - } - - /** - * Check if user should authenticate over Kerberos - * @return true if should be authenticated over Kerberos - * @throws java.io.IOException on failure of check - */ - private synchronized boolean shouldAuthenticateOverKrb() throws IOException { - UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); - UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); - UserGroupInformation realUser = currentUser.getRealUser(); - return authMethod == AuthMethod.KERBEROS && loginUser != null && - // Make sure user logged in using Kerberos either keytab or TGT - loginUser.hasKerberosCredentials() && - // relogin only in case it is the login user (e.g. JT) - // or superuser (like oozie). 
- (loginUser.equals(currentUser) || loginUser.equals(realUser)); - } + boolean isAlive(); /** - * If multiple clients with the same principal try to connect to the same server at the same time, - * the server assumes a replay attack is in progress. This is a feature of kerberos. In order to - * work around this, what is done is that the client backs off randomly and tries to initiate the - * connection again. The other problem is to do with ticket expiry. To handle that, a relogin is - * attempted. - * <p> - * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} method. In case when the - * user doesn't have valid credentials, we don't need to retry (from cache or ticket). In such - * cases, it is prudent to throw a runtime exception when we receive a SaslException from the - * underlying authentication implementation, so there is no retry from other high level (for eg, - * HCM or HBaseAdmin). - * </p> - * @param currRetries retry count - * @param ex exception describing fail - * @param user which is trying to connect - * @throws java.io.IOException if IO fail - * @throws InterruptedException if thread is interrupted + * Get the address on which this channel operates + * @return InetSocketAddress */ - private void handleSaslConnectionFailure(final int currRetries, final Throwable ex, - final UserGroupInformation user) throws IOException, InterruptedException { - user.doAs(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws IOException, InterruptedException { - if (shouldAuthenticateOverKrb()) { - if (currRetries < MAX_SASL_RETRIES) { - LOG.debug("Exception encountered while connecting to the server : " + ex); - // try re-login - if (UserGroupInformation.isLoginKeytabBased()) { - UserGroupInformation.getLoginUser().reloginFromKeytab(); - } else { - UserGroupInformation.getLoginUser().reloginFromTicketCache(); - } - - // Should reconnect - return null; - } else { - String msg = "Couldn't setup connection for " - + 
UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal; - LOG.warn(msg); - throw (IOException) new IOException(msg).initCause(ex); - } - } else { - LOG.warn("Exception encountered while connecting to " + "the server : " + ex); - } - if (ex instanceof RemoteException) { - throw (RemoteException) ex; - } - if (ex instanceof SaslException) { - String msg = "SASL authentication failed." - + " The most likely cause is missing or invalid credentials." + " Consider 'kinit'."; - LOG.fatal(msg, ex); - throw new RuntimeException(msg, ex); - } - throw new IOException(ex); - } - }); - } - - public int getConnectionHashCode() { - return ConnectionId.hashCode(ticket, serviceName, address); - } - - @Override - public int hashCode() { - return getConnectionHashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof AsyncRpcChannel) { - AsyncRpcChannel channel = (AsyncRpcChannel) obj; - return channel.hashCode() == obj.hashCode(); - } - return false; - } - - @Override - public String toString() { - return this.address.toString() + "/" + this.serviceName + "/" + this.ticket; - } - - /** - * Listens to call writes and fails if write failed - */ - private static final class CallWriteListener implements ChannelFutureListener { - private final AsyncRpcChannel rpcChannel; - private final int id; - - public CallWriteListener(AsyncRpcChannel asyncRpcChannel, int id) { - this.rpcChannel = asyncRpcChannel; - this.id = id; - } - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - AsyncCall call = rpcChannel.removePendingCall(id); - if (call != null) { - if (future.cause() instanceof IOException) { - call.setFailed((IOException) future.cause()); - } else { - call.setFailed(new IOException(future.cause())); - } - } - } - } - } + InetSocketAddress getAddress(); } 
http://git-wip-us.apache.org/repos/asf/hbase/blob/56358a0f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java new file mode 100644 index 0000000..7cc9e78 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java @@ -0,0 +1,743 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.ipc; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.EventLoop; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; +import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.concurrent.Promise; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import javax.security.sasl.SaslException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.MetricsConnection; +import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; +import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; +import org.apache.hadoop.hbase.protobuf.generated.TracingProtos; +import org.apache.hadoop.hbase.security.AuthMethod; +import org.apache.hadoop.hbase.security.SaslClientHandler; +import org.apache.hadoop.hbase.security.SaslUtil; +import org.apache.hadoop.hbase.security.SecurityInfo; +import 
org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.security.token.TokenSelector; +import org.apache.htrace.Span; +import org.apache.htrace.Trace; + +/** + * Netty RPC channel + */ +@InterfaceAudience.Private +public class AsyncRpcChannelImpl implements AsyncRpcChannel { + private static final Log LOG = LogFactory.getLog(AsyncRpcChannelImpl.class.getName()); + + private static final int MAX_SASL_RETRIES = 5; + + protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDDLERS + = new HashMap<>(); + + static { + TOKEN_HANDDLERS.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN, + new AuthenticationTokenSelector()); + } + + final AsyncRpcClient client; + + // Contains the channel to work with. + // Only exists when connected + private Channel channel; + + String name; + final User ticket; + final String serviceName; + final InetSocketAddress address; + + private int failureCounter = 0; + + boolean useSasl; + AuthMethod authMethod; + private int reloginMaxBackoff; + private Token<? 
extends TokenIdentifier> token; + private String serverPrincipal; + + // NOTE: closed and connected flags below are only changed when a lock on pendingCalls + private final Map<Integer, AsyncCall> pendingCalls = new HashMap<Integer, AsyncCall>(); + private boolean connected = false; + private boolean closed = false; + + private Timeout cleanupTimer; + + private final TimerTask timeoutTask = new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + cleanupCalls(); + } + }; + + /** + * Constructor for netty RPC channel + * @param bootstrap to construct channel on + * @param client to connect with + * @param ticket of user which uses connection + * @param serviceName name of service to connect to + * @param address to connect to + */ + public AsyncRpcChannelImpl(Bootstrap bootstrap, final AsyncRpcClient client, User ticket, + String serviceName, InetSocketAddress address) { + this.client = client; + + this.ticket = ticket; + this.serviceName = serviceName; + this.address = address; + + this.channel = connect(bootstrap).channel(); + + name = ("IPC Client (" + channel.hashCode() + ") to " + address.toString() + + ((ticket == null) ? 
" from unknown user" : (" from " + ticket.getName()))); + } + + /** + * Connect to channel + * @param bootstrap to connect to + * @return future of connection + */ + private ChannelFuture connect(final Bootstrap bootstrap) { + return bootstrap.remoteAddress(address).connect() + .addListener(new GenericFutureListener<ChannelFuture>() { + @Override + public void operationComplete(final ChannelFuture f) throws Exception { + if (!f.isSuccess()) { + retryOrClose(bootstrap, failureCounter++, client.failureSleep, f.cause()); + return; + } + channel = f.channel(); + + setupAuthorization(); + + ByteBuf b = channel.alloc().directBuffer(6); + createPreamble(b, authMethod); + channel.writeAndFlush(b).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + if (useSasl) { + UserGroupInformation ticket = AsyncRpcChannelImpl.this.ticket.getUGI(); + if (authMethod == AuthMethod.KERBEROS) { + if (ticket != null && ticket.getRealUser() != null) { + ticket = ticket.getRealUser(); + } + } + SaslClientHandler saslHandler; + if (ticket == null) { + throw new FatalConnectionException("ticket/user is null"); + } + final UserGroupInformation realTicket = ticket; + saslHandler = ticket.doAs(new PrivilegedExceptionAction<SaslClientHandler>() { + @Override + public SaslClientHandler run() throws IOException { + return getSaslHandler(realTicket, bootstrap); + } + }); + if (saslHandler != null) { + // Sasl connect is successful. Let's set up Sasl channel handler + channel.pipeline().addFirst(saslHandler); + } else { + // fall back to simple auth because server told us so. 
+ authMethod = AuthMethod.SIMPLE; + useSasl = false; + } + } else { + startHBaseConnection(f.channel()); + } + } + }); + } + + /** + * Start HBase connection + * @param ch channel to start connection on + */ + private void startHBaseConnection(Channel ch) { + ch.pipeline().addLast("frameDecoder", + new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); + ch.pipeline().addLast(new AsyncServerResponseHandler(this)); + try { + writeChannelHeader(ch).addListener(new GenericFutureListener<ChannelFuture>() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + close(future.cause()); + return; + } + List<AsyncCall> callsToWrite; + synchronized (pendingCalls) { + connected = true; + callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values()); + } + for (AsyncCall call : callsToWrite) { + writeRequest(call); + } + } + }); + } catch (IOException e) { + close(e); + } + } + + /** + * Get SASL handler + * @param bootstrap to reconnect to + * @return new SASL handler + * @throws java.io.IOException if handler failed to create + */ + private SaslClientHandler getSaslHandler(final UserGroupInformation realTicket, + final Bootstrap bootstrap) throws IOException { + return new SaslClientHandler(realTicket, authMethod, token, serverPrincipal, + client.fallbackAllowed, + client.conf.get("hbase.rpc.protection", + SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()), + new SaslClientHandler.SaslExceptionHandler() { + @Override + public void handle(int retryCount, Random random, Throwable cause) { + try { + // Handle Sasl failure. 
Try to potentially get new credentials + handleSaslConnectionFailure(retryCount, cause, realTicket); + + retryOrClose(bootstrap, failureCounter++, random.nextInt(reloginMaxBackoff) + 1, + cause); + } catch (IOException | InterruptedException e) { + close(e); + } + } + }, new SaslClientHandler.SaslSuccessfulConnectHandler() { + @Override + public void onSuccess(Channel channel) { + startHBaseConnection(channel); + } + }); + } + + /** + * Retry to connect or close + * @param bootstrap to connect with + * @param failureCount failure count + * @param e exception of fail + */ + private void retryOrClose(final Bootstrap bootstrap, int failureCount, long timeout, + Throwable e) { + if (failureCount < client.maxRetries) { + client.newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + connect(bootstrap); + } + }, timeout, TimeUnit.MILLISECONDS); + } else { + client.failedServers.addToFailedServers(address); + close(e); + } + } + + /** + * Calls method on channel + * @param method to call + * @param controller to run call with + * @param request to send + * @param responsePrototype to construct response with + */ + public Promise<Message> callMethod(final Descriptors.MethodDescriptor method, + final PayloadCarryingRpcController controller, final Message request, + final Message responsePrototype, MetricsConnection.CallStats callStats) { + final AsyncCall call = new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(), + method, request, controller, responsePrototype, callStats); + controller.notifyOnCancel(new RpcCallback<Object>() { + @Override + public void run(Object parameter) { + // TODO: do not need to call AsyncCall.setFailed? + synchronized (pendingCalls) { + pendingCalls.remove(call.id); + } + } + }); + // TODO: this should be handled by PayloadCarryingRpcController. 
+ if (controller.isCanceled()) { + // To finish if the call was cancelled before we set the notification (race condition) + call.cancel(true); + return call; + } + + synchronized (pendingCalls) { + if (closed) { + Promise<Message> promise = channel.eventLoop().newPromise(); + promise.setFailure(new ConnectException()); + return promise; + } + pendingCalls.put(call.id, call); + // Add timeout for cleanup if none is present + if (cleanupTimer == null && call.getRpcTimeout() > 0) { + cleanupTimer = client.newTimeout(timeoutTask, call.getRpcTimeout(), TimeUnit.MILLISECONDS); + } + if (!connected) { + return call; + } + } + writeRequest(call); + return call; + } + + @Override + public EventLoop getEventExecutor() { + return this.channel.eventLoop(); + } + + AsyncCall removePendingCall(int id) { + synchronized (pendingCalls) { + return pendingCalls.remove(id); + } + } + + /** + * Write the channel header + * @param channel to write to + * @return future of write + * @throws java.io.IOException on failure to write + */ + private ChannelFuture writeChannelHeader(Channel channel) throws IOException { + RPCProtos.ConnectionHeader.Builder headerBuilder = RPCProtos.ConnectionHeader.newBuilder() + .setServiceName(serviceName); + + RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), authMethod); + if (userInfoPB != null) { + headerBuilder.setUserInfo(userInfoPB); + } + + if (client.codec != null) { + headerBuilder.setCellBlockCodecClass(client.codec.getClass().getCanonicalName()); + } + if (client.compressor != null) { + headerBuilder.setCellBlockCompressorClass(client.compressor.getClass().getCanonicalName()); + } + + headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo()); + RPCProtos.ConnectionHeader header = headerBuilder.build(); + + int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header); + + ByteBuf b = channel.alloc().directBuffer(totalSize); + + b.writeInt(header.getSerializedSize()); + b.writeBytes(header.toByteArray()); + + return 
channel.writeAndFlush(b); + } + + /** + * Write request to channel + * @param call to write + */ + private void writeRequest(final AsyncCall call) { + try { + final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader + .newBuilder(); + requestHeaderBuilder.setCallId(call.id).setMethodName(call.method.getName()) + .setRequestParam(call.param != null); + + if (Trace.isTracing()) { + Span s = Trace.currentSpan(); + requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder() + .setParentId(s.getSpanId()).setTraceId(s.getTraceId())); + } + + ByteBuffer cellBlock = client.buildCellBlock(call.controller.cellScanner()); + if (cellBlock != null) { + final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = RPCProtos.CellBlockMeta + .newBuilder(); + cellBlockBuilder.setLength(cellBlock.limit()); + requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build()); + } + // Only pass priority if there one. Let zero be same as no priority. + if (call.controller.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) { + requestHeaderBuilder.setPriority(call.controller.getPriority()); + } + + RPCProtos.RequestHeader rh = requestHeaderBuilder.build(); + + int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param); + if (cellBlock != null) { + totalSize += cellBlock.remaining(); + } + + ByteBuf b = channel.alloc().directBuffer(4 + totalSize); + try (ByteBufOutputStream out = new ByteBufOutputStream(b)) { + call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, cellBlock)); + } + + channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id)); + } catch (IOException e) { + close(e); + } + } + + /** + * Set up server authorization + * @throws java.io.IOException if auth setup failed + */ + private void setupAuthorization() throws IOException { + SecurityInfo securityInfo = SecurityInfo.getInfo(serviceName); + this.useSasl = client.userProvider.isHBaseSecurityEnabled(); + + this.token = null; + if 
(useSasl && securityInfo != null) { + AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind(); + if (tokenKind != null) { + TokenSelector<? extends TokenIdentifier> tokenSelector = TOKEN_HANDDLERS.get(tokenKind); + if (tokenSelector != null) { + token = tokenSelector.selectToken(new Text(client.clusterId), + ticket.getUGI().getTokens()); + } else if (LOG.isDebugEnabled()) { + LOG.debug("No token selector found for type " + tokenKind); + } + } + String serverKey = securityInfo.getServerPrincipal(); + if (serverKey == null) { + throw new IOException("Can't obtain server Kerberos config key from SecurityInfo"); + } + this.serverPrincipal = SecurityUtil.getServerPrincipal(client.conf.get(serverKey), + address.getAddress().getCanonicalHostName().toLowerCase()); + if (LOG.isDebugEnabled()) { + LOG.debug("RPC Server Kerberos principal name for service=" + serviceName + " is " + + serverPrincipal); + } + } + + if (!useSasl) { + authMethod = AuthMethod.SIMPLE; + } else if (token != null) { + authMethod = AuthMethod.DIGEST; + } else { + authMethod = AuthMethod.KERBEROS; + } + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Use " + authMethod + " authentication for service " + serviceName + ", sasl=" + useSasl); + } + reloginMaxBackoff = client.conf.getInt("hbase.security.relogin.maxbackoff", 5000); + } + + /** + * Build the user information + * @param ugi User Group Information + * @param authMethod Authorization method + * @return UserInformation protobuf + */ + private RPCProtos.UserInformation buildUserInfo(UserGroupInformation ugi, AuthMethod authMethod) { + if (ugi == null || authMethod == AuthMethod.DIGEST) { + // Don't send user for token auth + return null; + } + RPCProtos.UserInformation.Builder userInfoPB = RPCProtos.UserInformation.newBuilder(); + if (authMethod == AuthMethod.KERBEROS) { + // Send effective user for Kerberos auth + userInfoPB.setEffectiveUser(ugi.getUserName()); + } else if (authMethod == AuthMethod.SIMPLE) { + // Send both 
effective user and real user for simple auth + userInfoPB.setEffectiveUser(ugi.getUserName()); + if (ugi.getRealUser() != null) { + userInfoPB.setRealUser(ugi.getRealUser().getUserName()); + } + } + return userInfoPB.build(); + } + + /** + * Create connection preamble + * @param byteBuf to write to + * @param authMethod to write + */ + private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) { + byteBuf.writeBytes(HConstants.RPC_HEADER); + byteBuf.writeByte(HConstants.RPC_CURRENT_VERSION); + byteBuf.writeByte(authMethod.code); + } + + private void close0(Throwable e) { + List<AsyncCall> toCleanup; + synchronized (pendingCalls) { + if (closed) { + return; + } + closed = true; + toCleanup = new ArrayList<AsyncCall>(pendingCalls.values()); + pendingCalls.clear(); + } + IOException closeException = null; + if (e != null) { + if (e instanceof IOException) { + closeException = (IOException) e; + } else { + closeException = new IOException(e); + } + } + // log the info + if (LOG.isDebugEnabled() && closeException != null) { + LOG.debug(name + ": closing ipc connection to " + address, closeException); + } + if (cleanupTimer != null) { + cleanupTimer.cancel(); + cleanupTimer = null; + } + for (AsyncCall call : toCleanup) { + call.setFailed(closeException != null ? closeException + : new ConnectionClosingException( + "Call id=" + call.id + " on server " + address + " aborted: connection is closing")); + } + channel.disconnect().addListener(ChannelFutureListener.CLOSE); + if (LOG.isDebugEnabled()) { + LOG.debug(name + ": closed"); + } + } + + /** + * Close connection + * @param e exception on close + */ + public void close(final Throwable e) { + client.removeConnection(this); + + // Move closing from the requesting thread to the channel thread + if (channel.eventLoop().inEventLoop()) { + close0(e); + } else { + channel.eventLoop().execute(new Runnable() { + @Override + public void run() { + close0(e); + } + }); + } + } + + /** + * Clean up calls. 
+ */ + private void cleanupCalls() { + List<AsyncCall> toCleanup = new ArrayList<AsyncCall>(); + long currentTime = EnvironmentEdgeManager.currentTime(); + long nextCleanupTaskDelay = -1L; + synchronized (pendingCalls) { + for (Iterator<AsyncCall> iter = pendingCalls.values().iterator(); iter.hasNext();) { + AsyncCall call = iter.next(); + long timeout = call.getRpcTimeout(); + if (timeout > 0) { + if (currentTime - call.getStartTime() >= timeout) { + iter.remove(); + toCleanup.add(call); + } else { + if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) { + nextCleanupTaskDelay = timeout; + } + } + } + } + if (nextCleanupTaskDelay > 0) { + cleanupTimer = client.newTimeout(timeoutTask, nextCleanupTaskDelay, TimeUnit.MILLISECONDS); + } else { + cleanupTimer = null; + } + } + for (AsyncCall call : toCleanup) { + call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime=" + + (currentTime - call.getStartTime()) + ", rpcTimeout=" + call.getRpcTimeout())); + } + } + + /** + * Check if the connection is alive + * @return true if alive + */ + public boolean isAlive() { + return channel.isOpen(); + } + + @Override + public InetSocketAddress getAddress() { + return this.address; + } + + /** + * Check if user should authenticate over Kerberos + * @return true if should be authenticated over Kerberos + * @throws java.io.IOException on failure of check + */ + private synchronized boolean shouldAuthenticateOverKrb() throws IOException { + UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); + UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + UserGroupInformation realUser = currentUser.getRealUser(); + return authMethod == AuthMethod.KERBEROS && loginUser != null && + // Make sure user logged in using Kerberos either keytab or TGT + loginUser.hasKerberosCredentials() && + // relogin only in case it is the login user (e.g. JT) + // or superuser (like oozie). 
+ (loginUser.equals(currentUser) || loginUser.equals(realUser)); + } + + /** + * If multiple clients with the same principal try to connect to the same server at the same time, + * the server assumes a replay attack is in progress. This is a feature of kerberos. In order to + * work around this, what is done is that the client backs off randomly and tries to initiate the + * connection again. The other problem is to do with ticket expiry. To handle that, a relogin is + * attempted. + * <p> + * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} method. In case when the + * user doesn't have valid credentials, we don't need to retry (from cache or ticket). In such + * cases, it is prudent to throw a runtime exception when we receive a SaslException from the + * underlying authentication implementation, so there is no retry from other high level (for eg, + * HCM or HBaseAdmin). + * </p> + * @param currRetries retry count + * @param ex exception describing fail + * @param user which is trying to connect + * @throws java.io.IOException if IO fail + * @throws InterruptedException if thread is interrupted + */ + private void handleSaslConnectionFailure(final int currRetries, final Throwable ex, + final UserGroupInformation user) throws IOException, InterruptedException { + user.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws IOException, InterruptedException { + if (shouldAuthenticateOverKrb()) { + if (currRetries < MAX_SASL_RETRIES) { + LOG.debug("Exception encountered while connecting to the server : " + ex); + // try re-login + if (UserGroupInformation.isLoginKeytabBased()) { + UserGroupInformation.getLoginUser().reloginFromKeytab(); + } else { + UserGroupInformation.getLoginUser().reloginFromTicketCache(); + } + + // Should reconnect + return null; + } else { + String msg = "Couldn't setup connection for " + + UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal; + LOG.warn(msg); + throw 
(IOException) new IOException(msg).initCause(ex); + } + } else { + LOG.warn("Exception encountered while connecting to " + "the server : " + ex); + } + if (ex instanceof RemoteException) { + throw (RemoteException) ex; + } + if (ex instanceof SaslException) { + String msg = "SASL authentication failed." + + " The most likely cause is missing or invalid credentials." + " Consider 'kinit'."; + LOG.fatal(msg, ex); + throw new RuntimeException(msg, ex); + } + throw new IOException(ex); + } + }); + } + + /** + * Get the hash code identifying this connection + * @return hash code computed from the ticket, service name and address + */ + public int getConnectionHashCode() { + return ConnectionId.hashCode(ticket, serviceName, address); + } + + @Override + public int hashCode() { + return getConnectionHashCode(); + } + + /** + * Two channels are considered equal when their connection hash codes match. + * @param obj object to compare this channel with + * @return true if obj is an AsyncRpcChannelImpl with the same connection hash code as this one + */ + @Override + public boolean equals(Object obj) { + if (obj instanceof AsyncRpcChannelImpl) { + AsyncRpcChannelImpl channel = (AsyncRpcChannelImpl) obj; + return this.hashCode() == channel.hashCode(); + } + return false; + } + + @Override + public String toString() { + return this.address.toString() + "/" + this.serviceName + "/" + this.ticket; + } + + /** + * Listens to call writes and fails if write failed + */ + private static final class CallWriteListener implements ChannelFutureListener { + private final AsyncRpcChannelImpl rpcChannel; + private final int id; + + public CallWriteListener(AsyncRpcChannelImpl asyncRpcChannelImpl, int id) { + this.rpcChannel = asyncRpcChannelImpl; + this.id = id; + } + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + AsyncCall call = rpcChannel.removePendingCall(id); + if (call != null) { + if (future.cause() instanceof IOException) { + call.setFailed((IOException) future.cause()); + } else { + call.setFailed(new IOException(future.cause())); + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/56358a0f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java ---------------------------------------------------------------------- diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java index c2bd457..8d9a5b3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java @@ -388,11 +388,11 @@ public class AsyncRpcClient extends AbstractRpcClient { } rpcChannel = connections.get(hashCode); if (rpcChannel != null && !rpcChannel.isAlive()) { - LOG.debug("Removing dead channel from server="+rpcChannel.address.toString()); + LOG.debug("Removing dead channel from server="+rpcChannel.getAddress().toString()); connections.remove(hashCode); } if (rpcChannel == null) { - rpcChannel = new AsyncRpcChannel(this.bootstrap, this, ticket, serviceName, location); + rpcChannel = new AsyncRpcChannelImpl(this.bootstrap, this, ticket, serviceName, location); connections.put(hashCode, rpcChannel); } } @@ -415,8 +415,8 @@ public class AsyncRpcClient extends AbstractRpcClient { synchronized (connections) { for (AsyncRpcChannel rpcChannel : connections.values()) { if (rpcChannel.isAlive() && - rpcChannel.address.getPort() == sn.getPort() && - rpcChannel.address.getHostName().contentEquals(sn.getHostname())) { + rpcChannel.getAddress().getPort() == sn.getPort() && + rpcChannel.getAddress().getHostName().contentEquals(sn.getHostname())) { LOG.info("The server on " + sn.toString() + " is dead - stopping the connection " + rpcChannel.toString()); rpcChannel.close(null); http://git-wip-us.apache.org/repos/asf/hbase/blob/56358a0f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java index e0c7586..a0928b1 100644 --- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java @@ -37,13 +37,13 @@ import io.netty.channel.SimpleChannelInboundHandler; */ @InterfaceAudience.Private public class AsyncServerResponseHandler extends SimpleChannelInboundHandler<ByteBuf> { - private final AsyncRpcChannel channel; + private final AsyncRpcChannelImpl channel; /** * Constructor * @param channel on which this response handler operates */ - public AsyncServerResponseHandler(AsyncRpcChannel channel) { + public AsyncServerResponseHandler(AsyncRpcChannelImpl channel) { this.channel = channel; } http://git-wip-us.apache.org/repos/asf/hbase/blob/56358a0f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java index b1d54a4..45c3700 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,68 +15,20 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.hadoop.hbase.ipc; -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.protobuf.ResponseConverter; - import com.google.protobuf.BlockingRpcChannel; -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcChannel; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; /** - * Base class which provides clients with an RPC connection to + * Base interface which provides clients with an RPC connection to * call coprocessor endpoint {@link com.google.protobuf.Service}s. * Note that clients should not use this class directly, except through - * {@link org.apache.hadoop.hbase.client.HTableInterface#coprocessorService(byte[])}. + * {@link org.apache.hadoop.hbase.client.Table#coprocessorService(byte[])}. 
*/ @InterfaceAudience.Public @InterfaceStability.Evolving -public abstract class CoprocessorRpcChannel implements RpcChannel, BlockingRpcChannel { - private static final Log LOG = LogFactory.getLog(CoprocessorRpcChannel.class); - - @Override - @InterfaceAudience.Private - public void callMethod(Descriptors.MethodDescriptor method, - RpcController controller, - Message request, Message responsePrototype, - RpcCallback<Message> callback) { - Message response = null; - try { - response = callExecService(controller, method, request, responsePrototype); - } catch (IOException ioe) { - LOG.warn("Call failed on IOException", ioe); - ResponseConverter.setControllerException(controller, ioe); - } - if (callback != null) { - callback.run(response); - } - } - - @Override - @InterfaceAudience.Private - public Message callBlockingMethod(Descriptors.MethodDescriptor method, - RpcController controller, - Message request, Message responsePrototype) - throws ServiceException { - try { - return callExecService(controller, method, request, responsePrototype); - } catch (IOException ioe) { - throw new ServiceException("Error calling method "+method.getFullName(), ioe); - } - } - - protected abstract Message callExecService(RpcController controller, - Descriptors.MethodDescriptor method, Message request, Message responsePrototype) - throws IOException; -} +public interface CoprocessorRpcChannel extends RpcChannel, BlockingRpcChannel { +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/56358a0f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java index ac6a022..68798ed 100644 --- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java @@ -42,7 +42,7 @@ import com.google.protobuf.RpcController; * @see org.apache.hadoop.hbase.client.HBaseAdmin#coprocessorService() */ @InterfaceAudience.Private -public class MasterCoprocessorRpcChannel extends CoprocessorRpcChannel{ +public class MasterCoprocessorRpcChannel extends SyncCoprocessorRpcChannel { private static final Log LOG = LogFactory.getLog(MasterCoprocessorRpcChannel.class); private final ClusterConnection connection; http://git-wip-us.apache.org/repos/asf/hbase/blob/56358a0f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java index 0f06af1..4d3a453 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java @@ -45,7 +45,7 @@ import com.google.protobuf.RpcController; * @see org.apache.hadoop.hbase.client.Table#coprocessorService(byte[]) */ @InterfaceAudience.Private -public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{ +public class RegionCoprocessorRpcChannel extends SyncCoprocessorRpcChannel { private static final Log LOG = LogFactory.getLog(RegionCoprocessorRpcChannel.class); private final ClusterConnection connection; @@ -57,6 +57,12 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{ private RpcRetryingCallerFactory rpcCallerFactory; private RpcControllerFactory rpcControllerFactory; + /** + * Constructor + * @param conn connection to use + * @param table to connect to + * @param row to locate 
region with + */ public RegionCoprocessorRpcChannel(ClusterConnection conn, TableName table, byte[] row) { this.connection = conn; this.table = table; @@ -114,6 +120,10 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{ return response; } + /** + * Get last region this RpcChannel communicated with + * @return region name as byte array + */ public byte[] getLastRegion() { return lastRegion; }