http://git-wip-us.apache.org/repos/asf/hbase-site/blob/4131cace/devapidocs/src-html/org/apache/hadoop/hbase/client/HTableWrapper.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/client/HTableWrapper.html b/devapidocs/src-html/org/apache/hadoop/hbase/client/HTableWrapper.html index 287448e..39dd03a 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/client/HTableWrapper.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/client/HTableWrapper.html @@ -302,7 +302,27 @@ <span class="sourceLineNo">294</span> CompareOp compareOp, byte[] value, RowMutations rm) throws IOException {<a name="line.294"></a> <span class="sourceLineNo">295</span> return table.checkAndMutate(row, family, qualifier, compareOp, value, rm);<a name="line.295"></a> <span class="sourceLineNo">296</span> }<a name="line.296"></a> -<span class="sourceLineNo">297</span>}<a name="line.297"></a> +<span class="sourceLineNo">297</span><a name="line.297"></a> +<span class="sourceLineNo">298</span> @Override<a name="line.298"></a> +<span class="sourceLineNo">299</span> public void setOperationTimeout(int operationTimeout) {<a name="line.299"></a> +<span class="sourceLineNo">300</span> table.setOperationTimeout(operationTimeout);<a name="line.300"></a> +<span class="sourceLineNo">301</span> }<a name="line.301"></a> +<span class="sourceLineNo">302</span><a name="line.302"></a> +<span class="sourceLineNo">303</span> @Override<a name="line.303"></a> +<span class="sourceLineNo">304</span> public int getOperationTimeout() {<a name="line.304"></a> +<span class="sourceLineNo">305</span> return table.getOperationTimeout();<a name="line.305"></a> +<span class="sourceLineNo">306</span> }<a name="line.306"></a> +<span class="sourceLineNo">307</span><a name="line.307"></a> +<span class="sourceLineNo">308</span> @Override<a name="line.308"></a> +<span class="sourceLineNo">309</span> public void setRpcTimeout(int rpcTimeout) {<a name="line.309"></a> +<span class="sourceLineNo">310</span> table.setRpcTimeout(rpcTimeout);<a name="line.310"></a> +<span class="sourceLineNo">311</span> }<a name="line.311"></a> +<span class="sourceLineNo">312</span><a name="line.312"></a> +<span class="sourceLineNo">313</span> @Override<a name="line.313"></a> +<span class="sourceLineNo">314</span> public int getRpcTimeout() {<a name="line.314"></a> +<span class="sourceLineNo">315</span> return table.getRpcTimeout();<a name="line.315"></a> +<span class="sourceLineNo">316</span> }<a name="line.316"></a> +<span class="sourceLineNo">317</span>}<a name="line.317"></a>
http://git-wip-us.apache.org/repos/asf/hbase-site/blob/4131cace/devapidocs/src-html/org/apache/hadoop/hbase/client/MetaCache.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/client/MetaCache.html b/devapidocs/src-html/org/apache/hadoop/hbase/client/MetaCache.html index 6ca5b01..a27949d 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/client/MetaCache.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/client/MetaCache.html @@ -62,7 +62,7 @@ <span class="sourceLineNo">054</span><a name="line.54"></a> <span class="sourceLineNo">055</span> // The presence of a server in the map implies it's likely that there is an<a name="line.55"></a> <span class="sourceLineNo">056</span> // entry in cachedRegionLocations that map to this server; but the absence<a name="line.56"></a> -<span class="sourceLineNo">057</span> // of a server in this map guarentees that there is no entry in cache that<a name="line.57"></a> +<span class="sourceLineNo">057</span> // of a server in this map guarantees that there is no entry in cache that<a name="line.57"></a> <span class="sourceLineNo">058</span> // maps to the absent server.<a name="line.58"></a> <span class="sourceLineNo">059</span> // The access to this attribute must be protected by a lock on cachedRegionLocations<a name="line.59"></a> <span class="sourceLineNo">060</span> private final Set<ServerName> cachedServers = new CopyOnWriteArraySet<>();<a name="line.60"></a> http://git-wip-us.apache.org/repos/asf/hbase-site/blob/4131cace/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.CallStats.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.CallStats.html b/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.CallStats.html index 054df6f..cdba01d 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.CallStats.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.CallStats.html @@ -291,8 +291,8 @@ <span class="sourceLineNo">283</span> @VisibleForTesting protected final CallTracker putTracker;<a name="line.283"></a> <span class="sourceLineNo">284</span> @VisibleForTesting protected final CallTracker multiTracker;<a name="line.284"></a> <span class="sourceLineNo">285</span> @VisibleForTesting protected final RunnerStats runnerStats;<a name="line.285"></a> -<span class="sourceLineNo">286</span> private final Counter metaCacheNumClearServer;<a name="line.286"></a> -<span class="sourceLineNo">287</span> private final Counter metaCacheNumClearRegion;<a name="line.287"></a> +<span class="sourceLineNo">286</span> @VisibleForTesting protected final Counter metaCacheNumClearServer;<a name="line.286"></a> +<span class="sourceLineNo">287</span> @VisibleForTesting protected final Counter metaCacheNumClearRegion;<a name="line.287"></a> <span class="sourceLineNo">288</span><a name="line.288"></a> <span class="sourceLineNo">289</span> // dynamic metrics<a name="line.289"></a> <span class="sourceLineNo">290</span><a name="line.290"></a> http://git-wip-us.apache.org/repos/asf/hbase-site/blob/4131cace/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.CallTracker.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.CallTracker.html b/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.CallTracker.html index 054df6f..cdba01d 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.CallTracker.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.CallTracker.html @@ -291,8 +291,8 @@ <span class="sourceLineNo">283</span> @VisibleForTesting protected final CallTracker putTracker;<a name="line.283"></a> <span class="sourceLineNo">284</span> @VisibleForTesting protected final CallTracker multiTracker;<a name="line.284"></a> <span class="sourceLineNo">285</span> @VisibleForTesting protected final RunnerStats runnerStats;<a name="line.285"></a> -<span class="sourceLineNo">286</span> private final Counter metaCacheNumClearServer;<a name="line.286"></a> -<span class="sourceLineNo">287</span> private final Counter metaCacheNumClearRegion;<a name="line.287"></a> +<span class="sourceLineNo">286</span> @VisibleForTesting protected final Counter metaCacheNumClearServer;<a name="line.286"></a> +<span class="sourceLineNo">287</span> @VisibleForTesting protected final Counter metaCacheNumClearRegion;<a name="line.287"></a> <span class="sourceLineNo">288</span><a name="line.288"></a> <span class="sourceLineNo">289</span> // dynamic metrics<a name="line.289"></a> <span class="sourceLineNo">290</span><a name="line.290"></a> http://git-wip-us.apache.org/repos/asf/hbase-site/blob/4131cace/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.NewMetric.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.NewMetric.html b/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.NewMetric.html index 054df6f..cdba01d 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.NewMetric.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.NewMetric.html @@ -291,8 +291,8 @@ <span class="sourceLineNo">283</span> @VisibleForTesting protected final CallTracker putTracker;<a name="line.283"></a> <span class="sourceLineNo">284</span> @VisibleForTesting protected final CallTracker multiTracker;<a name="line.284"></a> <span class="sourceLineNo">285</span> @VisibleForTesting protected final RunnerStats runnerStats;<a name="line.285"></a> -<span class="sourceLineNo">286</span> private final Counter metaCacheNumClearServer;<a name="line.286"></a> -<span class="sourceLineNo">287</span> private final Counter metaCacheNumClearRegion;<a name="line.287"></a> +<span class="sourceLineNo">286</span> @VisibleForTesting protected final Counter metaCacheNumClearServer;<a name="line.286"></a> +<span class="sourceLineNo">287</span> @VisibleForTesting protected final Counter metaCacheNumClearRegion;<a name="line.287"></a> <span class="sourceLineNo">288</span><a name="line.288"></a> <span class="sourceLineNo">289</span> // dynamic metrics<a name="line.289"></a> <span class="sourceLineNo">290</span><a name="line.290"></a> http://git-wip-us.apache.org/repos/asf/hbase-site/blob/4131cace/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.RegionStats.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.RegionStats.html b/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.RegionStats.html index 054df6f..cdba01d 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.RegionStats.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.RegionStats.html @@ -291,8 +291,8 @@ <span class="sourceLineNo">283</span> @VisibleForTesting protected final CallTracker putTracker;<a name="line.283"></a> <span class="sourceLineNo">284</span> @VisibleForTesting protected final CallTracker multiTracker;<a name="line.284"></a> <span class="sourceLineNo">285</span> @VisibleForTesting protected final RunnerStats runnerStats;<a name="line.285"></a> -<span class="sourceLineNo">286</span> private final Counter metaCacheNumClearServer;<a name="line.286"></a> -<span class="sourceLineNo">287</span> private final Counter metaCacheNumClearRegion;<a name="line.287"></a> +<span class="sourceLineNo">286</span> @VisibleForTesting protected final Counter metaCacheNumClearServer;<a name="line.286"></a> +<span class="sourceLineNo">287</span> @VisibleForTesting protected final Counter metaCacheNumClearRegion;<a name="line.287"></a> <span class="sourceLineNo">288</span><a name="line.288"></a> <span class="sourceLineNo">289</span> // dynamic metrics<a name="line.289"></a> <span class="sourceLineNo">290</span><a name="line.290"></a> http://git-wip-us.apache.org/repos/asf/hbase-site/blob/4131cace/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.RunnerStats.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.RunnerStats.html b/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.RunnerStats.html index 054df6f..cdba01d 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.RunnerStats.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.RunnerStats.html @@ -291,8 +291,8 @@ <span class="sourceLineNo">283</span> @VisibleForTesting protected final CallTracker putTracker;<a name="line.283"></a> <span class="sourceLineNo">284</span> @VisibleForTesting protected final CallTracker multiTracker;<a name="line.284"></a> <span class="sourceLineNo">285</span> @VisibleForTesting protected final RunnerStats runnerStats;<a name="line.285"></a> -<span class="sourceLineNo">286</span> private final Counter metaCacheNumClearServer;<a name="line.286"></a> -<span class="sourceLineNo">287</span> private final Counter metaCacheNumClearRegion;<a name="line.287"></a> +<span class="sourceLineNo">286</span> @VisibleForTesting protected final Counter metaCacheNumClearServer;<a name="line.286"></a> +<span class="sourceLineNo">287</span> @VisibleForTesting protected final Counter metaCacheNumClearRegion;<a name="line.287"></a> <span class="sourceLineNo">288</span><a name="line.288"></a> <span class="sourceLineNo">289</span> // dynamic metrics<a name="line.289"></a> <span class="sourceLineNo">290</span><a name="line.290"></a> http://git-wip-us.apache.org/repos/asf/hbase-site/blob/4131cace/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.html b/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.html index 054df6f..cdba01d 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/client/MetricsConnection.html @@ -291,8 +291,8 @@ <span class="sourceLineNo">283</span> @VisibleForTesting protected final CallTracker putTracker;<a name="line.283"></a> <span class="sourceLineNo">284</span> @VisibleForTesting protected final CallTracker multiTracker;<a name="line.284"></a> <span class="sourceLineNo">285</span> @VisibleForTesting protected final RunnerStats runnerStats;<a name="line.285"></a> -<span class="sourceLineNo">286</span> private final Counter metaCacheNumClearServer;<a name="line.286"></a> -<span class="sourceLineNo">287</span> private final Counter metaCacheNumClearRegion;<a name="line.287"></a> +<span class="sourceLineNo">286</span> @VisibleForTesting protected final Counter metaCacheNumClearServer;<a name="line.286"></a> +<span class="sourceLineNo">287</span> @VisibleForTesting protected final Counter metaCacheNumClearRegion;<a name="line.287"></a> <span class="sourceLineNo">288</span><a name="line.288"></a> <span class="sourceLineNo">289</span> // dynamic metrics<a name="line.289"></a> <span class="sourceLineNo">290</span><a name="line.290"></a> http://git-wip-us.apache.org/repos/asf/hbase-site/blob/4131cace/devapidocs/src-html/org/apache/hadoop/hbase/client/RegionServerCallable.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/client/RegionServerCallable.html b/devapidocs/src-html/org/apache/hadoop/hbase/client/RegionServerCallable.html index 40a7cc0..a00b70e 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/client/RegionServerCallable.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/client/RegionServerCallable.html @@ -34,126 +34,133 @@ <span class="sourceLineNo">026</span>import org.apache.hadoop.hbase.HRegionInfo;<a name="line.26"></a> <span class="sourceLineNo">027</span>import org.apache.hadoop.hbase.HRegionLocation;<a name="line.27"></a> <span class="sourceLineNo">028</span>import org.apache.hadoop.hbase.TableName;<a name="line.28"></a> -<span class="sourceLineNo">029</span>import org.apache.hadoop.hbase.classification.InterfaceAudience;<a name="line.29"></a> -<span class="sourceLineNo">030</span>import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;<a name="line.30"></a> -<span class="sourceLineNo">031</span>import org.apache.hadoop.hbase.util.Bytes;<a name="line.31"></a> -<span class="sourceLineNo">032</span><a name="line.32"></a> -<span class="sourceLineNo">033</span>/**<a name="line.33"></a> -<span class="sourceLineNo">034</span> * Implementations call a RegionServer and implement {@link #call(int)}.<a name="line.34"></a> -<span class="sourceLineNo">035</span> * Passed to a {@link RpcRetryingCaller} so we retry on fail.<a name="line.35"></a> -<span class="sourceLineNo">036</span> * TODO: this class is actually tied to one region, because most of the paths make use of<a name="line.36"></a> -<span class="sourceLineNo">037</span> * the regioninfo part of location when building requests. The only reason it works for<a name="line.37"></a> -<span class="sourceLineNo">038</span> * multi-region requests (e.g. batch) is that they happen to not use the region parts.<a name="line.38"></a> -<span class="sourceLineNo">039</span> * This could be done cleaner (e.g. having a generic parameter and 2 derived classes,<a name="line.39"></a> -<span class="sourceLineNo">040</span> * RegionCallable and actual RegionServerCallable with ServerName.<a name="line.40"></a> -<span class="sourceLineNo">041</span> * @param <T> the class that the ServerCallable handles<a name="line.41"></a> -<span class="sourceLineNo">042</span> */<a name="line.42"></a> -<span class="sourceLineNo">043</span>@InterfaceAudience.Private<a name="line.43"></a> -<span class="sourceLineNo">044</span>public abstract class RegionServerCallable<T> implements RetryingCallable<T> {<a name="line.44"></a> -<span class="sourceLineNo">045</span> // Public because used outside of this package over in ipc.<a name="line.45"></a> -<span class="sourceLineNo">046</span> private static final Log LOG = LogFactory.getLog(RegionServerCallable.class);<a name="line.46"></a> -<span class="sourceLineNo">047</span> protected final Connection connection;<a name="line.47"></a> -<span class="sourceLineNo">048</span> protected final TableName tableName;<a name="line.48"></a> -<span class="sourceLineNo">049</span> protected final byte[] row;<a name="line.49"></a> -<span class="sourceLineNo">050</span> protected HRegionLocation location;<a name="line.50"></a> -<span class="sourceLineNo">051</span> private ClientService.BlockingInterface stub;<a name="line.51"></a> -<span class="sourceLineNo">052</span><a name="line.52"></a> -<span class="sourceLineNo">053</span> protected final static int MIN_WAIT_DEAD_SERVER = 10000;<a name="line.53"></a> -<span class="sourceLineNo">054</span><a name="line.54"></a> -<span class="sourceLineNo">055</span> /**<a name="line.55"></a> -<span class="sourceLineNo">056</span> * @param connection Connection to use.<a name="line.56"></a> -<span class="sourceLineNo">057</span> * @param tableName Table name to which <code>row</code> belongs.<a name="line.57"></a> -<span class="sourceLineNo">058</span> * @param row The row we want in <code>tableName</code>.<a name="line.58"></a> -<span class="sourceLineNo">059</span> */<a name="line.59"></a> -<span class="sourceLineNo">060</span> public RegionServerCallable(Connection connection, TableName tableName, byte [] row) {<a name="line.60"></a> -<span class="sourceLineNo">061</span> this.connection = connection;<a name="line.61"></a> -<span class="sourceLineNo">062</span> this.tableName = tableName;<a name="line.62"></a> -<span class="sourceLineNo">063</span> this.row = row;<a name="line.63"></a> -<span class="sourceLineNo">064</span> }<a name="line.64"></a> -<span class="sourceLineNo">065</span><a name="line.65"></a> -<span class="sourceLineNo">066</span> /**<a name="line.66"></a> -<span class="sourceLineNo">067</span> * Prepare for connection to the server hosting region with row from tablename. Does lookup<a name="line.67"></a> -<span class="sourceLineNo">068</span> * to find region location and hosting server.<a name="line.68"></a> -<span class="sourceLineNo">069</span> * @param reload Set this to true if connection should re-find the region<a name="line.69"></a> -<span class="sourceLineNo">070</span> * @throws IOException e<a name="line.70"></a> -<span class="sourceLineNo">071</span> */<a name="line.71"></a> -<span class="sourceLineNo">072</span> @Override<a name="line.72"></a> -<span class="sourceLineNo">073</span> public void prepare(final boolean reload) throws IOException {<a name="line.73"></a> -<span class="sourceLineNo">074</span> try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {<a name="line.74"></a> -<span class="sourceLineNo">075</span> this.location = regionLocator.getRegionLocation(row, reload);<a name="line.75"></a> -<span class="sourceLineNo">076</span> }<a name="line.76"></a> -<span class="sourceLineNo">077</span> if (this.location == null) {<a name="line.77"></a> -<span class="sourceLineNo">078</span> throw new IOException("Failed to find location, tableName=" + tableName +<a name="line.78"></a> -<span class="sourceLineNo">079</span> ", row=" + Bytes.toString(row) + ", reload=" + reload);<a name="line.79"></a> +<span class="sourceLineNo">029</span>import org.apache.hadoop.hbase.TableNotEnabledException;<a name="line.29"></a> +<span class="sourceLineNo">030</span>import org.apache.hadoop.hbase.classification.InterfaceAudience;<a name="line.30"></a> +<span class="sourceLineNo">031</span>import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;<a name="line.31"></a> +<span class="sourceLineNo">032</span>import org.apache.hadoop.hbase.util.Bytes;<a name="line.32"></a> +<span class="sourceLineNo">033</span><a name="line.33"></a> +<span class="sourceLineNo">034</span>/**<a name="line.34"></a> +<span class="sourceLineNo">035</span> * Implementations call a RegionServer and implement {@link #call(int)}.<a name="line.35"></a> +<span class="sourceLineNo">036</span> * Passed to a {@link RpcRetryingCaller} so we retry on fail.<a name="line.36"></a> +<span class="sourceLineNo">037</span> * TODO: this class is actually tied to one region, because most of the paths make use of<a name="line.37"></a> +<span class="sourceLineNo">038</span> * the regioninfo part of location when building requests. The only reason it works for<a name="line.38"></a> +<span class="sourceLineNo">039</span> * multi-region requests (e.g. batch) is that they happen to not use the region parts.<a name="line.39"></a> +<span class="sourceLineNo">040</span> * This could be done cleaner (e.g. having a generic parameter and 2 derived classes,<a name="line.40"></a> +<span class="sourceLineNo">041</span> * RegionCallable and actual RegionServerCallable with ServerName.<a name="line.41"></a> +<span class="sourceLineNo">042</span> * @param <T> the class that the ServerCallable handles<a name="line.42"></a> +<span class="sourceLineNo">043</span> */<a name="line.43"></a> +<span class="sourceLineNo">044</span>@InterfaceAudience.Private<a name="line.44"></a> +<span class="sourceLineNo">045</span>public abstract class RegionServerCallable<T> implements RetryingCallable<T> {<a name="line.45"></a> +<span class="sourceLineNo">046</span> // Public because used outside of this package over in ipc.<a name="line.46"></a> +<span class="sourceLineNo">047</span> private static final Log LOG = LogFactory.getLog(RegionServerCallable.class);<a name="line.47"></a> +<span class="sourceLineNo">048</span> protected final Connection connection;<a name="line.48"></a> +<span class="sourceLineNo">049</span> protected final TableName tableName;<a name="line.49"></a> +<span class="sourceLineNo">050</span> protected final byte[] row;<a name="line.50"></a> +<span class="sourceLineNo">051</span> protected HRegionLocation location;<a name="line.51"></a> +<span class="sourceLineNo">052</span> private ClientService.BlockingInterface stub;<a name="line.52"></a> +<span class="sourceLineNo">053</span><a name="line.53"></a> +<span class="sourceLineNo">054</span> protected final static int MIN_WAIT_DEAD_SERVER = 10000;<a name="line.54"></a> +<span class="sourceLineNo">055</span><a name="line.55"></a> +<span class="sourceLineNo">056</span> /**<a name="line.56"></a> +<span class="sourceLineNo">057</span> * @param connection Connection to use.<a name="line.57"></a> +<span class="sourceLineNo">058</span> * @param tableName Table name to which <code>row</code> belongs.<a name="line.58"></a> +<span class="sourceLineNo">059</span> * @param row The row we want in <code>tableName</code>.<a name="line.59"></a> +<span class="sourceLineNo">060</span> */<a name="line.60"></a> +<span class="sourceLineNo">061</span> public RegionServerCallable(Connection connection, TableName tableName, byte [] row) {<a name="line.61"></a> +<span class="sourceLineNo">062</span> this.connection = connection;<a name="line.62"></a> +<span class="sourceLineNo">063</span> this.tableName = tableName;<a name="line.63"></a> +<span class="sourceLineNo">064</span> this.row = row;<a name="line.64"></a> +<span class="sourceLineNo">065</span> }<a name="line.65"></a> +<span class="sourceLineNo">066</span><a name="line.66"></a> +<span class="sourceLineNo">067</span> /**<a name="line.67"></a> +<span class="sourceLineNo">068</span> * Prepare for connection to the server hosting region with row from tablename. Does lookup<a name="line.68"></a> +<span class="sourceLineNo">069</span> * to find region location and hosting server.<a name="line.69"></a> +<span class="sourceLineNo">070</span> * @param reload Set to true to re-check the table state<a name="line.70"></a> +<span class="sourceLineNo">071</span> * @throws IOException e<a name="line.71"></a> +<span class="sourceLineNo">072</span> */<a name="line.72"></a> +<span class="sourceLineNo">073</span> @Override<a name="line.73"></a> +<span class="sourceLineNo">074</span> public void prepare(final boolean reload) throws IOException {<a name="line.74"></a> +<span class="sourceLineNo">075</span> // check table state if this is a retry<a name="line.75"></a> +<span class="sourceLineNo">076</span> if (reload &&<a name="line.76"></a> +<span class="sourceLineNo">077</span> !tableName.equals(TableName.META_TABLE_NAME) &&<a name="line.77"></a> +<span class="sourceLineNo">078</span> getConnection().isTableDisabled(tableName)) {<a name="line.78"></a> +<span class="sourceLineNo">079</span> throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");<a name="line.79"></a> <span class="sourceLineNo">080</span> }<a name="line.80"></a> -<span class="sourceLineNo">081</span> setStub(getConnection().getClient(this.location.getServerName()));<a name="line.81"></a> -<span class="sourceLineNo">082</span> }<a name="line.82"></a> -<span class="sourceLineNo">083</span><a name="line.83"></a> -<span class="sourceLineNo">084</span> /**<a name="line.84"></a> -<span class="sourceLineNo">085</span> * @return {@link HConnection} instance used by this Callable.<a name="line.85"></a> -<span class="sourceLineNo">086</span> */<a name="line.86"></a> -<span class="sourceLineNo">087</span> HConnection getConnection() {<a name="line.87"></a> -<span class="sourceLineNo">088</span> return (HConnection) this.connection;<a name="line.88"></a> +<span class="sourceLineNo">081</span> try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {<a name="line.81"></a> +<span class="sourceLineNo">082</span> this.location = regionLocator.getRegionLocation(row);<a name="line.82"></a> +<span class="sourceLineNo">083</span> }<a name="line.83"></a> +<span class="sourceLineNo">084</span> if (this.location == null) {<a name="line.84"></a> +<span class="sourceLineNo">085</span> throw new IOException("Failed to find location, tableName=" + tableName +<a name="line.85"></a> +<span class="sourceLineNo">086</span> ", row=" + Bytes.toString(row) + ", reload=" + reload);<a name="line.86"></a> +<span class="sourceLineNo">087</span> }<a name="line.87"></a> +<span class="sourceLineNo">088</span> setStub(getConnection().getClient(this.location.getServerName()));<a name="line.88"></a> <span class="sourceLineNo">089</span> }<a name="line.89"></a> <span class="sourceLineNo">090</span><a name="line.90"></a> -<span class="sourceLineNo">091</span> protected ClientService.BlockingInterface getStub() {<a name="line.91"></a> -<span class="sourceLineNo">092</span> return this.stub;<a name="line.92"></a> -<span class="sourceLineNo">093</span> }<a name="line.93"></a> -<span class="sourceLineNo">094</span><a name="line.94"></a> -<span class="sourceLineNo">095</span> void setStub(final ClientService.BlockingInterface stub) {<a name="line.95"></a> -<span class="sourceLineNo">096</span> this.stub = stub;<a name="line.96"></a> -<span class="sourceLineNo">097</span> }<a name="line.97"></a> -<span class="sourceLineNo">098</span><a name="line.98"></a> -<span class="sourceLineNo">099</span> protected HRegionLocation getLocation() {<a name="line.99"></a> -<span class="sourceLineNo">100</span> return this.location;<a name="line.100"></a> -<span class="sourceLineNo">101</span> }<a name="line.101"></a> -<span class="sourceLineNo">102</span><a name="line.102"></a> -<span class="sourceLineNo">103</span> protected void setLocation(final HRegionLocation location) {<a name="line.103"></a> -<span class="sourceLineNo">104</span> this.location = location;<a name="line.104"></a> -<span class="sourceLineNo">105</span> }<a name="line.105"></a> -<span class="sourceLineNo">106</span><a name="line.106"></a> -<span class="sourceLineNo">107</span> public TableName getTableName() {<a name="line.107"></a> -<span class="sourceLineNo">108</span> return this.tableName;<a name="line.108"></a> -<span class="sourceLineNo">109</span> }<a name="line.109"></a> -<span class="sourceLineNo">110</span><a name="line.110"></a> -<span class="sourceLineNo">111</span> public byte [] getRow() {<a name="line.111"></a> -<span class="sourceLineNo">112</span> return this.row;<a name="line.112"></a> -<span class="sourceLineNo">113</span> }<a name="line.113"></a> -<span class="sourceLineNo">114</span><a name="line.114"></a> -<span class="sourceLineNo">115</span> @Override<a name="line.115"></a> -<span class="sourceLineNo">116</span> public void throwable(Throwable t, boolean retrying) {<a name="line.116"></a> -<span class="sourceLineNo">117</span> if (location != null) {<a name="line.117"></a> -<span class="sourceLineNo">118</span> getConnection().updateCachedLocations(tableName, location.getRegionInfo().getRegionName(),<a name="line.118"></a> -<span class="sourceLineNo">119</span> row, t, location.getServerName());<a name="line.119"></a> -<span class="sourceLineNo">120</span> }<a name="line.120"></a> -<span class="sourceLineNo">121</span> }<a name="line.121"></a> -<span class="sourceLineNo">122</span><a name="line.122"></a> -<span class="sourceLineNo">123</span> @Override<a name="line.123"></a> -<span class="sourceLineNo">124</span> public String getExceptionMessageAdditionalDetail() {<a name="line.124"></a> -<span class="sourceLineNo">125</span> return "row '" + Bytes.toString(row) + "' on table '" + tableName + "' at " + location;<a name="line.125"></a> -<span class="sourceLineNo">126</span> }<a name="line.126"></a> -<span class="sourceLineNo">127</span><a name="line.127"></a> -<span class="sourceLineNo">128</span> @Override<a name="line.128"></a> -<span class="sourceLineNo">129</span> public long sleep(long pause, int tries) {<a name="line.129"></a> -<span class="sourceLineNo">130</span> // Tries hasn't been bumped up yet so we use "tries + 1" to get right pause time<a name="line.130"></a> -<span class="sourceLineNo">131</span> long sleep = ConnectionUtils.getPauseTime(pause, tries + 1);<a name="line.131"></a> -<span class="sourceLineNo">132</span> if (sleep < MIN_WAIT_DEAD_SERVER<a name="line.132"></a> -<span class="sourceLineNo">133</span> && (location == null || getConnection().isDeadServer(location.getServerName()))) {<a name="line.133"></a> -<span class="sourceLineNo">134</span> sleep = ConnectionUtils.addJitter(MIN_WAIT_DEAD_SERVER, 0.10f);<a name="line.134"></a> -<span class="sourceLineNo">135</span> }<a name="line.135"></a> -<span class="sourceLineNo">136</span> return sleep;<a name="line.136"></a> -<span class="sourceLineNo">137</span> }<a name="line.137"></a> -<span class="sourceLineNo">138</span><a name="line.138"></a> -<span class="sourceLineNo">139</span> /**<a name="line.139"></a> -<span class="sourceLineNo">140</span> * @return the HRegionInfo for the current region<a name="line.140"></a> -<span class="sourceLineNo">141</span> */<a name="line.141"></a> -<span class="sourceLineNo">142</span> public HRegionInfo getHRegionInfo() {<a name="line.142"></a> -<span class="sourceLineNo">143</span> if (this.location == null) {<a name="line.143"></a> -<span class="sourceLineNo">144</span> return null;<a name="line.144"></a> -<span class="sourceLineNo">145</span> }<a name="line.145"></a> -<span class="sourceLineNo">146</span> return this.location.getRegionInfo();<a name="line.146"></a> -<span class="sourceLineNo">147</span> }<a name="line.147"></a> -<span class="sourceLineNo">148</span>}<a name="line.148"></a> +<span class="sourceLineNo">091</span> /**<a name="line.91"></a> +<span class="sourceLineNo">092</span> * @return {@link HConnection} instance used by this Callable.<a name="line.92"></a> +<span class="sourceLineNo">093</span> */<a name="line.93"></a> +<span class="sourceLineNo">094</span> HConnection getConnection() {<a name="line.94"></a> +<span class="sourceLineNo">095</span> return (HConnection) this.connection;<a name="line.95"></a> +<span class="sourceLineNo">096</span> }<a name="line.96"></a> +<span class="sourceLineNo">097</span><a name="line.97"></a> +<span class="sourceLineNo">098</span> protected ClientService.BlockingInterface getStub() {<a name="line.98"></a> +<span class="sourceLineNo">099</span> return this.stub;<a name="line.99"></a> +<span class="sourceLineNo">100</span> }<a name="line.100"></a> +<span class="sourceLineNo">101</span><a name="line.101"></a> +<span class="sourceLineNo">102</span> void setStub(final ClientService.BlockingInterface stub) {<a name="line.102"></a> +<span class="sourceLineNo">103</span> this.stub = stub;<a name="line.103"></a> +<span class="sourceLineNo">104</span> }<a name="line.104"></a> +<span class="sourceLineNo">105</span><a name="line.105"></a> +<span class="sourceLineNo">106</span> protected HRegionLocation getLocation() {<a name="line.106"></a> +<span class="sourceLineNo">107</span> return this.location;<a name="line.107"></a> +<span class="sourceLineNo">108</span> }<a name="line.108"></a> +<span class="sourceLineNo">109</span><a name="line.109"></a> +<span class="sourceLineNo">110</span> protected void setLocation(final HRegionLocation location) {<a name="line.110"></a> +<span class="sourceLineNo">111</span> this.location = location;<a name="line.111"></a> +<span class="sourceLineNo">112</span> }<a name="line.112"></a> +<span class="sourceLineNo">113</span><a name="line.113"></a> +<span class="sourceLineNo">114</span> public TableName getTableName() {<a name="line.114"></a> +<span class="sourceLineNo">115</span> return this.tableName;<a name="line.115"></a> +<span class="sourceLineNo">116</span> }<a name="line.116"></a> +<span class="sourceLineNo">117</span><a name="line.117"></a> +<span class="sourceLineNo">118</span> public byte [] getRow() {<a name="line.118"></a> +<span class="sourceLineNo">119</span> return this.row;<a name="line.119"></a> +<span class="sourceLineNo">120</span> }<a name="line.120"></a> +<span class="sourceLineNo">121</span><a name="line.121"></a> +<span class="sourceLineNo">122</span> @Override<a name="line.122"></a> +<span class="sourceLineNo">123</span> public void throwable(Throwable t, boolean retrying) {<a name="line.123"></a> +<span class="sourceLineNo">124</span> if (location != null) {<a name="line.124"></a> +<span class="sourceLineNo">125</span> getConnection().updateCachedLocations(tableName, location.getRegionInfo().getRegionName(),<a name="line.125"></a> +<span class="sourceLineNo">126</span> row, t, location.getServerName());<a name="line.126"></a> +<span class="sourceLineNo">127</span> }<a name="line.127"></a> +<span class="sourceLineNo">128</span> }<a name="line.128"></a> +<span class="sourceLineNo">129</span><a name="line.129"></a> +<span class="sourceLineNo">130</span> @Override<a name="line.130"></a> +<span class="sourceLineNo">131</span> public String getExceptionMessageAdditionalDetail() {<a name="line.131"></a> +<span class="sourceLineNo">132</span> return "row '" + Bytes.toString(row) + "' on table '" + tableName + "' at " + location;<a name="line.132"></a> +<span class="sourceLineNo">133</span> }<a name="line.133"></a> +<span class="sourceLineNo">134</span><a name="line.134"></a> +<span class="sourceLineNo">135</span> @Override<a name="line.135"></a> +<span class="sourceLineNo">136</span> public long sleep(long pause, int tries) {<a name="line.136"></a> +<span class="sourceLineNo">137</span> // Tries hasn't been bumped up yet so we use "tries + 1" to get right pause time<a name="line.137"></a> +<span class="sourceLineNo">138</span> long sleep = ConnectionUtils.getPauseTime(pause, tries + 1);<a name="line.138"></a> +<span class="sourceLineNo">139</span> if (sleep < MIN_WAIT_DEAD_SERVER<a name="line.139"></a> +<span class="sourceLineNo">140</span> && (location == null || getConnection().isDeadServer(location.getServerName()))) {<a name="line.140"></a> +<span class="sourceLineNo">141</span> sleep = ConnectionUtils.addJitter(MIN_WAIT_DEAD_SERVER, 0.10f);<a name="line.141"></a> +<span class="sourceLineNo">142</span> }<a name="line.142"></a> +<span class="sourceLineNo">143</span> return sleep;<a name="line.143"></a> +<span class="sourceLineNo">144</span> }<a name="line.144"></a> +<span class="sourceLineNo">145</span><a name="line.145"></a> +<span class="sourceLineNo">146</span> /**<a name="line.146"></a> +<span class="sourceLineNo">147</span> * @return the HRegionInfo for the current region<a name="line.147"></a> +<span class="sourceLineNo">148</span> */<a name="line.148"></a> +<span class="sourceLineNo">149</span> public HRegionInfo getHRegionInfo() {<a name="line.149"></a> +<span class="sourceLineNo">150</span> if (this.location == null) {<a name="line.150"></a> +<span class="sourceLineNo">151</span> return null;<a name="line.151"></a> +<span class="sourceLineNo">152</span> }<a name="line.152"></a> +<span class="sourceLineNo">153</span> return this.location.getRegionInfo();<a name="line.153"></a> +<span class="sourceLineNo">154</span> }<a name="line.154"></a> +<span class="sourceLineNo">155</span>}<a name="line.155"></a> http://git-wip-us.apache.org/repos/asf/hbase-site/blob/4131cace/devapidocs/src-html/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.html b/devapidocs/src-html/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.html index e002817..f71660f 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.html @@ -124,7 +124,7 @@ <span class="sourceLineNo">116</span> for(String addr : uniqAddr) {<a name="line.116"></a> <span class="sourceLineNo">117</span> addrs.append(addr).append(", ");<a name="line.117"></a> <span class="sourceLineNo">118</span> }<a name="line.118"></a> -<span class="sourceLineNo">119</span> return s;<a name="line.119"></a> +<span class="sourceLineNo">119</span> return addrs.toString();<a name="line.119"></a> <span class="sourceLineNo">120</span> }<a name="line.120"></a> <span class="sourceLineNo">121</span><a name="line.121"></a> <span class="sourceLineNo">122</span> public String getExhaustiveDescription() {<a name="line.122"></a> http://git-wip-us.apache.org/repos/asf/hbase-site/blob/4131cace/devapidocs/src-html/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.html b/devapidocs/src-html/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.html index 25c61a7..1701cb2 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.html @@ -41,71 +41,87 @@ <span class="sourceLineNo">033</span> protected final Configuration conf;<a name="line.33"></a> <span class="sourceLineNo">034</span> private final long pause;<a name="line.34"></a> <span class="sourceLineNo">035</span> private final int retries;<a name="line.35"></a> -<span class="sourceLineNo">036</span> private final RetryingCallerInterceptor interceptor;<a name="line.36"></a> -<span class="sourceLineNo">037</span> private final int startLogErrorsCnt;<a name="line.37"></a> -<span class="sourceLineNo">038</span> private final boolean enableBackPressure;<a name="line.38"></a> -<span class="sourceLineNo">039</span> private ServerStatisticTracker stats;<a name="line.39"></a> -<span class="sourceLineNo">040</span><a name="line.40"></a> -<span class="sourceLineNo">041</span> public RpcRetryingCallerFactory(Configuration conf) {<a name="line.41"></a> -<span class="sourceLineNo">042</span> this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR);<a name="line.42"></a> -<span class="sourceLineNo">043</span> }<a name="line.43"></a> -<span class="sourceLineNo">044</span> <a name="line.44"></a> -<span class="sourceLineNo">045</span> public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor) {<a name="line.45"></a> -<span class="sourceLineNo">046</span> this.conf = conf;<a name="line.46"></a> -<span class="sourceLineNo">047</span> pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,<a name="line.47"></a> -<span class="sourceLineNo">048</span> HConstants.DEFAULT_HBASE_CLIENT_PAUSE);<a name="line.48"></a> -<span class="sourceLineNo">049</span> retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,<a name="line.49"></a> -<span class="sourceLineNo">050</span> HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);<a name="line.50"></a> -<span class="sourceLineNo">051</span> startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY,<a name="line.51"></a> -<span class="sourceLineNo">052</span> AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT);<a name="line.52"></a> -<span class="sourceLineNo">053</span> this.interceptor = interceptor;<a name="line.53"></a> -<span class="sourceLineNo">054</span> enableBackPressure = conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,<a name="line.54"></a> -<span class="sourceLineNo">055</span> HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);<a name="line.55"></a> -<span class="sourceLineNo">056</span> }<a name="line.56"></a> -<span class="sourceLineNo">057</span><a name="line.57"></a> -<span class="sourceLineNo">058</span> /**<a name="line.58"></a> -<span class="sourceLineNo">059</span> * Set the tracker that should be used for tracking statistics about the server<a name="line.59"></a> -<span class="sourceLineNo">060</span> */<a name="line.60"></a> -<span class="sourceLineNo">061</span> public void setStatisticTracker(ServerStatisticTracker statisticTracker) {<a name="line.61"></a> -<span class="sourceLineNo">062</span> this.stats = statisticTracker;<a name="line.62"></a> -<span class="sourceLineNo">063</span> }<a name="line.63"></a> -<span class="sourceLineNo">064</span><a name="line.64"></a> -<span class="sourceLineNo">065</span> public <T> RpcRetryingCaller<T> newCaller() {<a name="line.65"></a> -<span class="sourceLineNo">066</span> // We store the values in the factory instance. This way, constructing new objects<a name="line.66"></a> -<span class="sourceLineNo">067</span> // is cheap as it does not require parsing a complex structure.<a name="line.67"></a> -<span class="sourceLineNo">068</span> RpcRetryingCaller<T> caller = new RpcRetryingCallerImpl<T>(pause, retries, interceptor,<a name="line.68"></a> -<span class="sourceLineNo">069</span> startLogErrorsCnt);<a name="line.69"></a> -<span class="sourceLineNo">070</span> return caller;<a name="line.70"></a> -<span class="sourceLineNo">071</span> }<a name="line.71"></a> -<span class="sourceLineNo">072</span><a name="line.72"></a> -<span class="sourceLineNo">073</span> public static RpcRetryingCallerFactory instantiate(Configuration configuration) {<a name="line.73"></a> -<span class="sourceLineNo">074</span> return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);<a name="line.74"></a> -<span class="sourceLineNo">075</span> }<a name="line.75"></a> -<span class="sourceLineNo">076</span><a name="line.76"></a> -<span class="sourceLineNo">077</span> public static RpcRetryingCallerFactory instantiate(Configuration configuration,<a name="line.77"></a> -<span class="sourceLineNo">078</span> ServerStatisticTracker stats) {<a name="line.78"></a> -<span class="sourceLineNo">079</span> return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats);<a name="line.79"></a> -<span class="sourceLineNo">080</span> }<a name="line.80"></a> -<span class="sourceLineNo">081</span><a name="line.81"></a> -<span class="sourceLineNo">082</span> public static RpcRetryingCallerFactory instantiate(Configuration configuration,<a name="line.82"></a> -<span class="sourceLineNo">083</span> RetryingCallerInterceptor interceptor, ServerStatisticTracker stats) {<a name="line.83"></a> -<span class="sourceLineNo">084</span> String clazzName = RpcRetryingCallerFactory.class.getName();<a name="line.84"></a> -<span class="sourceLineNo">085</span> String rpcCallerFactoryClazz =<a name="line.85"></a> -<span class="sourceLineNo">086</span> configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName);<a name="line.86"></a> -<span class="sourceLineNo">087</span> RpcRetryingCallerFactory factory;<a name="line.87"></a> -<span class="sourceLineNo">088</span> if (rpcCallerFactoryClazz.equals(clazzName)) {<a name="line.88"></a> -<span class="sourceLineNo">089</span> factory = new RpcRetryingCallerFactory(configuration, interceptor);<a name="line.89"></a> -<span class="sourceLineNo">090</span> } else {<a name="line.90"></a> -<span class="sourceLineNo">091</span> factory = ReflectionUtils.instantiateWithCustomCtor(<a name="line.91"></a> -<span class="sourceLineNo">092</span> rpcCallerFactoryClazz, new Class[] { Configuration.class },<a name="line.92"></a> -<span class="sourceLineNo">093</span> new Object[] { configuration });<a name="line.93"></a> -<span class="sourceLineNo">094</span> }<a name="line.94"></a> -<span class="sourceLineNo">095</span><a name="line.95"></a> -<span class="sourceLineNo">096</span> // setting for backwards compat with existing caller factories, rather than in the ctor<a name="line.96"></a> -<span class="sourceLineNo">097</span> factory.setStatisticTracker(stats);<a name="line.97"></a> -<span class="sourceLineNo">098</span> return factory;<a name="line.98"></a> -<span class="sourceLineNo">099</span> }<a name="line.99"></a> -<span class="sourceLineNo">100</span>}<a name="line.100"></a> +<span class="sourceLineNo">036</span> private final int rpcTimeout;<a name="line.36"></a> +<span class="sourceLineNo">037</span> private final RetryingCallerInterceptor interceptor;<a name="line.37"></a> +<span class="sourceLineNo">038</span> private final int startLogErrorsCnt;<a name="line.38"></a> +<span class="sourceLineNo">039</span> private final boolean enableBackPressure;<a name="line.39"></a> +<span class="sourceLineNo">040</span> private ServerStatisticTracker stats;<a name="line.40"></a> +<span class="sourceLineNo">041</span><a name="line.41"></a> +<span class="sourceLineNo">042</span> public RpcRetryingCallerFactory(Configuration conf) {<a name="line.42"></a> +<span class="sourceLineNo">043</span> this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR);<a name="line.43"></a> +<span class="sourceLineNo">044</span> }<a name="line.44"></a> +<span class="sourceLineNo">045</span> <a name="line.45"></a> +<span class="sourceLineNo">046</span> public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor) {<a name="line.46"></a> +<span class="sourceLineNo">047</span> this.conf = conf;<a name="line.47"></a> +<span class="sourceLineNo">048</span> pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,<a name="line.48"></a> +<span class="sourceLineNo">049</span> HConstants.DEFAULT_HBASE_CLIENT_PAUSE);<a name="line.49"></a> +<span class="sourceLineNo">050</span> retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,<a name="line.50"></a> +<span class="sourceLineNo">051</span> HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);<a name="line.51"></a> +<span class="sourceLineNo">052</span> startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY,<a name="line.52"></a> +<span class="sourceLineNo">053</span> AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT);<a name="line.53"></a> +<span class="sourceLineNo">054</span> this.interceptor = interceptor;<a name="line.54"></a> +<span class="sourceLineNo">055</span> enableBackPressure = conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,<a name="line.55"></a> +<span class="sourceLineNo">056</span> HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);<a name="line.56"></a> +<span class="sourceLineNo">057</span> rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,HConstants.DEFAULT_HBASE_RPC_TIMEOUT);<a name="line.57"></a> +<span class="sourceLineNo">058</span> }<a name="line.58"></a> +<span class="sourceLineNo">059</span><a name="line.59"></a> +<span class="sourceLineNo">060</span> /**<a name="line.60"></a> +<span class="sourceLineNo">061</span> * Set the tracker that should be used for tracking statistics about the server<a name="line.61"></a> +<span class="sourceLineNo">062</span> */<a name="line.62"></a> +<span class="sourceLineNo">063</span> public void setStatisticTracker(ServerStatisticTracker statisticTracker) {<a name="line.63"></a> +<span class="sourceLineNo">064</span> this.stats = statisticTracker;<a name="line.64"></a> +<span class="sourceLineNo">065</span> }<a name="line.65"></a> +<span class="sourceLineNo">066</span><a name="line.66"></a> +<span class="sourceLineNo">067</span> /**<a name="line.67"></a> +<span class="sourceLineNo">068</span> * Create a new RetryingCaller with specific rpc timeout.<a name="line.68"></a> +<span class="sourceLineNo">069</span> */<a name="line.69"></a> +<span class="sourceLineNo">070</span> public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) {<a name="line.70"></a> +<span class="sourceLineNo">071</span> // We store the values in the factory instance. This way, constructing new objects<a name="line.71"></a> +<span class="sourceLineNo">072</span> // is cheap as it does not require parsing a complex structure.<a name="line.72"></a> +<span class="sourceLineNo">073</span> RpcRetryingCaller<T> caller = new RpcRetryingCallerImpl<T>(pause, retries, interceptor,<a name="line.73"></a> +<span class="sourceLineNo">074</span> startLogErrorsCnt, rpcTimeout);<a name="line.74"></a> +<span class="sourceLineNo">075</span> return caller;<a name="line.75"></a> +<span class="sourceLineNo">076</span> }<a name="line.76"></a> +<span class="sourceLineNo">077</span><a name="line.77"></a> +<span class="sourceLineNo">078</span> /**<a name="line.78"></a> +<span class="sourceLineNo">079</span> * Create a new RetryingCaller with configured rpc timeout.<a name="line.79"></a> +<span class="sourceLineNo">080</span> */<a name="line.80"></a> +<span class="sourceLineNo">081</span> public <T> RpcRetryingCaller<T> newCaller() {<a name="line.81"></a> +<span class="sourceLineNo">082</span> // We store the values in the factory instance. This way, constructing new objects<a name="line.82"></a> +<span class="sourceLineNo">083</span> // is cheap as it does not require parsing a complex structure.<a name="line.83"></a> +<span class="sourceLineNo">084</span> RpcRetryingCaller<T> caller = new RpcRetryingCallerImpl<T>(pause, retries, interceptor,<a name="line.84"></a> +<span class="sourceLineNo">085</span> startLogErrorsCnt, rpcTimeout);<a name="line.85"></a> +<span class="sourceLineNo">086</span> return caller;<a name="line.86"></a> +<span class="sourceLineNo">087</span> }<a name="line.87"></a> +<span class="sourceLineNo">088</span><a name="line.88"></a> +<span class="sourceLineNo">089</span> public static RpcRetryingCallerFactory instantiate(Configuration configuration) {<a name="line.89"></a> +<span class="sourceLineNo">090</span> return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);<a name="line.90"></a> +<span class="sourceLineNo">091</span> }<a name="line.91"></a> +<span class="sourceLineNo">092</span><a name="line.92"></a> +<span class="sourceLineNo">093</span> public static RpcRetryingCallerFactory instantiate(Configuration configuration,<a name="line.93"></a> +<span class="sourceLineNo">094</span> ServerStatisticTracker stats) {<a name="line.94"></a> +<span class="sourceLineNo">095</span> return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats);<a name="line.95"></a> +<span class="sourceLineNo">096</span> }<a name="line.96"></a> +<span class="sourceLineNo">097</span><a name="line.97"></a> +<span class="sourceLineNo">098</span> public static RpcRetryingCallerFactory instantiate(Configuration configuration,<a name="line.98"></a> +<span class="sourceLineNo">099</span> RetryingCallerInterceptor interceptor, ServerStatisticTracker stats) {<a name="line.99"></a> +<span class="sourceLineNo">100</span> String clazzName = RpcRetryingCallerFactory.class.getName();<a name="line.100"></a> +<span class="sourceLineNo">101</span> String rpcCallerFactoryClazz =<a name="line.101"></a> +<span class="sourceLineNo">102</span> configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName);<a name="line.102"></a> +<span class="sourceLineNo">103</span> RpcRetryingCallerFactory factory;<a name="line.103"></a> +<span class="sourceLineNo">104</span> if (rpcCallerFactoryClazz.equals(clazzName)) {<a name="line.104"></a> +<span class="sourceLineNo">105</span> factory = new RpcRetryingCallerFactory(configuration, interceptor);<a name="line.105"></a> +<span class="sourceLineNo">106</span> } else {<a name="line.106"></a> +<span class="sourceLineNo">107</span> factory = ReflectionUtils.instantiateWithCustomCtor(<a name="line.107"></a> +<span class="sourceLineNo">108</span> rpcCallerFactoryClazz, new Class[] { Configuration.class },<a name="line.108"></a> +<span class="sourceLineNo">109</span> new Object[] { configuration });<a name="line.109"></a> +<span class="sourceLineNo">110</span> }<a name="line.110"></a> +<span class="sourceLineNo">111</span><a name="line.111"></a> +<span class="sourceLineNo">112</span> // setting for backwards compat with existing caller factories, rather than in the ctor<a name="line.112"></a> +<span class="sourceLineNo">113</span> factory.setStatisticTracker(stats);<a name="line.113"></a> +<span class="sourceLineNo">114</span> return factory;<a name="line.114"></a> +<span class="sourceLineNo">115</span> }<a name="line.115"></a> +<span class="sourceLineNo">116</span>}<a name="line.116"></a> http://git-wip-us.apache.org/repos/asf/hbase-site/blob/4131cace/devapidocs/src-html/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.html b/devapidocs/src-html/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.html index 23144ac..3d4cc71 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.html @@ -65,164 +65,175 @@ <span class="sourceLineNo">057</span><a name="line.57"></a> <span class="sourceLineNo">058</span> private final long pause;<a name="line.58"></a> <span class="sourceLineNo">059</span> private final int maxAttempts;// how many times to try<a name="line.59"></a> -<span class="sourceLineNo">060</span> private final AtomicBoolean cancelled = new AtomicBoolean(false);<a name="line.60"></a> -<span class="sourceLineNo">061</span> private final RetryingCallerInterceptor interceptor;<a name="line.61"></a> -<span class="sourceLineNo">062</span> private final RetryingCallerInterceptorContext context;<a name="line.62"></a> -<span class="sourceLineNo">063</span> private final RetryingTimeTracker tracker;<a name="line.63"></a> -<span class="sourceLineNo">064</span><a name="line.64"></a> -<span class="sourceLineNo">065</span> public RpcRetryingCallerImpl(long pause, int retries, int startLogErrorsCnt) {<a name="line.65"></a> -<span class="sourceLineNo">066</span> this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt);<a name="line.66"></a> -<span class="sourceLineNo">067</span> }<a name="line.67"></a> -<span class="sourceLineNo">068</span> <a name="line.68"></a> -<span class="sourceLineNo">069</span> public RpcRetryingCallerImpl(long pause, int retries,<a name="line.69"></a> -<span class="sourceLineNo">070</span> RetryingCallerInterceptor interceptor, int startLogErrorsCnt) {<a name="line.70"></a> -<span class="sourceLineNo">071</span> this.pause = pause;<a name="line.71"></a> -<span class="sourceLineNo">072</span> this.maxAttempts = retries + 1;<a name="line.72"></a> -<span class="sourceLineNo">073</span> this.interceptor = interceptor;<a name="line.73"></a> -<span class="sourceLineNo">074</span> context = interceptor.createEmptyContext();<a name="line.74"></a> -<span class="sourceLineNo">075</span> this.startLogErrorsCnt = startLogErrorsCnt;<a name="line.75"></a> -<span class="sourceLineNo">076</span> this.tracker = new RetryingTimeTracker();<a name="line.76"></a> -<span class="sourceLineNo">077</span> }<a name="line.77"></a> -<span class="sourceLineNo">078</span> <a name="line.78"></a> -<span class="sourceLineNo">079</span> @Override<a name="line.79"></a> -<span class="sourceLineNo">080</span> public void cancel(){<a name="line.80"></a> -<span class="sourceLineNo">081</span> cancelled.set(true);<a name="line.81"></a> -<span class="sourceLineNo">082</span> synchronized (cancelled){<a name="line.82"></a> -<span class="sourceLineNo">083</span> cancelled.notifyAll();<a name="line.83"></a> -<span class="sourceLineNo">084</span> }<a name="line.84"></a> -<span class="sourceLineNo">085</span> }<a name="line.85"></a> -<span class="sourceLineNo">086</span><a name="line.86"></a> -<span class="sourceLineNo">087</span> @Override<a name="line.87"></a> -<span class="sourceLineNo">088</span> public T callWithRetries(RetryingCallable<T> callable, int callTimeout)<a name="line.88"></a> -<span class="sourceLineNo">089</span> throws IOException, RuntimeException {<a name="line.89"></a> -<span class="sourceLineNo">090</span> List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =<a name="line.90"></a> -<span class="sourceLineNo">091</span> new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();<a name="line.91"></a> -<span class="sourceLineNo">092</span> tracker.start();<a name="line.92"></a> -<span class="sourceLineNo">093</span> context.clear();<a name="line.93"></a> -<span class="sourceLineNo">094</span> for (int tries = 0;; tries++) {<a name="line.94"></a> -<span class="sourceLineNo">095</span> long expectedSleep;<a name="line.95"></a> -<span class="sourceLineNo">096</span> try {<a name="line.96"></a> -<span class="sourceLineNo">097</span> callable.prepare(tries != 0); // if called with false, check table status on ZK<a name="line.97"></a> -<span class="sourceLineNo">098</span> interceptor.intercept(context.prepare(callable, tries));<a name="line.98"></a> -<span class="sourceLineNo">099</span> return callable.call(tracker.getRemainingTime(callTimeout));<a name="line.99"></a> -<span class="sourceLineNo">100</span> } catch (PreemptiveFastFailException e) {<a name="line.100"></a> -<span class="sourceLineNo">101</span> throw e;<a name="line.101"></a> -<span class="sourceLineNo">102</span> } catch (Throwable t) {<a name="line.102"></a> -<span class="sourceLineNo">103</span> ExceptionUtil.rethrowIfInterrupt(t);<a name="line.103"></a> -<span class="sourceLineNo">104</span> if (tries > startLogErrorsCnt) {<a name="line.104"></a> -<span class="sourceLineNo">105</span> LOG.info("Call exception, tries=" + tries + ", maxAttempts=" + maxAttempts + ", started="<a name="line.105"></a> -<span class="sourceLineNo">106</span> + (EnvironmentEdgeManager.currentTime() - tracker.getStartTime()) + " ms ago, "<a name="line.106"></a> -<span class="sourceLineNo">107</span> + "cancelled=" + cancelled.get() + ", msg="<a name="line.107"></a> -<span class="sourceLineNo">108</span> + callable.getExceptionMessageAdditionalDetail());<a name="line.108"></a> -<span class="sourceLineNo">109</span> }<a name="line.109"></a> -<span class="sourceLineNo">110</span><a name="line.110"></a> -<span class="sourceLineNo">111</span> // translateException throws exception when should not retry: i.e. when request is bad.<a name="line.111"></a> -<span class="sourceLineNo">112</span> interceptor.handleFailure(context, t);<a name="line.112"></a> -<span class="sourceLineNo">113</span> t = translateException(t);<a name="line.113"></a> -<span class="sourceLineNo">114</span> callable.throwable(t, maxAttempts != 1);<a name="line.114"></a> -<span class="sourceLineNo">115</span> RetriesExhaustedException.ThrowableWithExtraContext qt =<a name="line.115"></a> -<span class="sourceLineNo">116</span> new RetriesExhaustedException.ThrowableWithExtraContext(t,<a name="line.116"></a> -<span class="sourceLineNo">117</span> EnvironmentEdgeManager.currentTime(), toString());<a name="line.117"></a> -<span class="sourceLineNo">118</span> exceptions.add(qt);<a name="line.118"></a> -<span class="sourceLineNo">119</span> if (tries >= maxAttempts - 1) {<a name="line.119"></a> -<span class="sourceLineNo">120</span> throw new RetriesExhaustedException(tries, exceptions);<a name="line.120"></a> -<span class="sourceLineNo">121</span> }<a name="line.121"></a> -<span class="sourceLineNo">122</span> // If the server is dead, we need to wait a little before retrying, to give<a name="line.122"></a> -<span class="sourceLineNo">123</span> // a chance to the regions to be<a name="line.123"></a> -<span class="sourceLineNo">124</span> // tries hasn't been bumped up yet so we use "tries + 1" to get right pause time<a name="line.124"></a> -<span class="sourceLineNo">125</span> expectedSleep = callable.sleep(pause, tries + 1);<a name="line.125"></a> -<span class="sourceLineNo">126</span><a name="line.126"></a> -<span class="sourceLineNo">127</span> // If, after the planned sleep, there won't be enough time left, we stop now.<a name="line.127"></a> -<span class="sourceLineNo">128</span> long duration = singleCallDuration(expectedSleep);<a name="line.128"></a> -<span class="sourceLineNo">129</span> if (duration > callTimeout) {<a name="line.129"></a> -<span class="sourceLineNo">130</span> String msg = "callTimeout=" + callTimeout + ", callDuration=" + duration +<a name="line.130"></a> -<span class="sourceLineNo">131</span> ": " + callable.getExceptionMessageAdditionalDetail();<a name="line.131"></a> -<span class="sourceLineNo">132</span> throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t));<a name="line.132"></a> -<span class="sourceLineNo">133</span> }<a name="line.133"></a> -<span class="sourceLineNo">134</span> } finally {<a name="line.134"></a> -<span class="sourceLineNo">135</span> interceptor.updateFailureInfo(context);<a name="line.135"></a> -<span class="sourceLineNo">136</span> }<a name="line.136"></a> -<span class="sourceLineNo">137</span> try {<a name="line.137"></a> -<span class="sourceLineNo">138</span> if (expectedSleep > 0) {<a name="line.138"></a> -<span class="sourceLineNo">139</span> synchronized (cancelled) {<a name="line.139"></a> -<span class="sourceLineNo">140</span> if (cancelled.get()) return null;<a name="line.140"></a> -<span class="sourceLineNo">141</span> cancelled.wait(expectedSleep);<a name="line.141"></a> -<span class="sourceLineNo">142</span> }<a name="line.142"></a> -<span class="sourceLineNo">143</span> }<a name="line.143"></a> -<span class="sourceLineNo">144</span> if (cancelled.get()) return null;<a name="line.144"></a> -<span class="sourceLineNo">145</span> } catch (InterruptedException e) {<a name="line.145"></a> -<span class="sourceLineNo">146</span> throw new InterruptedIOException("Interrupted after " + tries<a name="line.146"></a> -<span class="sourceLineNo">147</span> + " tries while maxAttempts=" + maxAttempts);<a name="line.147"></a> -<span class="sourceLineNo">148</span> }<a name="line.148"></a> -<span class="sourceLineNo">149</span> }<a name="line.149"></a> -<span class="sourceLineNo">150</span> }<a name="line.150"></a> -<span class="sourceLineNo">151</span><a name="line.151"></a> -<span class="sourceLineNo">152</span> /**<a name="line.152"></a> -<span class="sourceLineNo">153</span> * @return Calculate how long a single call took<a name="line.153"></a> -<span class="sourceLineNo">154</span> */<a name="line.154"></a> -<span class="sourceLineNo">155</span> private long singleCallDuration(final long expectedSleep) {<a name="line.155"></a> -<span class="sourceLineNo">156</span> return (EnvironmentEdgeManager.currentTime() - tracker.getStartTime()) + expectedSleep;<a name="line.156"></a> -<span class="sourceLineNo">157</span> }<a name="line.157"></a> -<span class="sourceLineNo">158</span><a name="line.158"></a> -<span class="sourceLineNo">159</span> @Override<a name="line.159"></a> -<span class="sourceLineNo">160</span> public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)<a name="line.160"></a> -<span class="sourceLineNo">161</span> throws IOException, RuntimeException {<a name="line.161"></a> -<span class="sourceLineNo">162</span> // The code of this method should be shared with withRetries.<a name="line.162"></a> -<span class="sourceLineNo">163</span> try {<a name="line.163"></a> -<span class="sourceLineNo">164</span> callable.prepare(false);<a name="line.164"></a> -<span class="sourceLineNo">165</span> return callable.call(callTimeout);<a name="line.165"></a> -<span class="sourceLineNo">166</span> } catch (Throwable t) {<a name="line.166"></a> -<span class="sourceLineNo">167</span> Throwable t2 = translateException(t);<a name="line.167"></a> -<span class="sourceLineNo">168</span> ExceptionUtil.rethrowIfInterrupt(t2);<a name="line.168"></a> -<span class="sourceLineNo">169</span> // It would be nice to clear the location cache here.<a name="line.169"></a> -<span class="sourceLineNo">170</span> if (t2 instanceof IOException) {<a name="line.170"></a> -<span class="sourceLineNo">171</span> throw (IOException)t2;<a name="line.171"></a> -<span class="sourceLineNo">172</span> } else {<a name="line.172"></a> -<span class="sourceLineNo">173</span> throw new RuntimeException(t2);<a name="line.173"></a> -<span class="sourceLineNo">174</span> }<a name="line.174"></a> -<span class="sourceLineNo">175</span> }<a name="line.175"></a> -<span class="sourceLineNo">176</span> }<a name="line.176"></a> -<span class="sourceLineNo">177</span> <a name="line.177"></a> -<span class="sourceLineNo">178</span> /**<a name="line.178"></a> -<span class="sourceLineNo">179</span> * Get the good or the remote exception if any, throws the DoNotRetryIOException.<a name="line.179"></a> -<span class="sourceLineNo">180</span> * @param t the throwable to analyze<a name="line.180"></a> -<span class="sourceLineNo">181</span> * @return the translated exception, if it's not a DoNotRetryIOException<a name="line.181"></a> -<span class="sourceLineNo">182</span> * @throws DoNotRetryIOException - if we find it, we throw it instead of translating.<a name="line.182"></a> -<span class="sourceLineNo">183</span> */<a name="line.183"></a> -<span class="sourceLineNo">184</span> static Throwable translateException(Throwable t) throws DoNotRetryIOException {<a name="line.184"></a> -<span class="sourceLineNo">185</span> if (t instanceof UndeclaredThrowableException) {<a name="line.185"></a> -<span class="sourceLineNo">186</span> if (t.getCause() != null) {<a name="line.186"></a> -<span class="sourceLineNo">187</span> t = t.getCause();<a name="line.187"></a> -<span class="sourceLineNo">188</span> }<a name="line.188"></a> -<span class="sourceLineNo">189</span> }<a name="line.189"></a> -<span class="sourceLineNo">190</span> if (t instanceof RemoteException) {<a name="line.190"></a> -<span class="sourceLineNo">191</span> t = ((RemoteException)t).unwrapRemoteException();<a name="line.191"></a> +<span class="sourceLineNo">060</span> private final int rpcTimeout;// timeout for each rpc request<a name="line.60"></a> +<span class="sourceLineNo">061</span> private final AtomicBoolean cancelled = new AtomicBoolean(false);<a name="line.61"></a> +<span class="sourceLineNo">062</span> private final RetryingCallerInterceptor interceptor;<a name="line.62"></a> +<span class="sourceLineNo">063</span> private final RetryingCallerInterceptorContext context;<a name="line.63"></a> +<span class="sourceLineNo">064</span> private final RetryingTimeTracker tracker;<a name="line.64"></a> +<span class="sourceLineNo">065</span><a name="line.65"></a> +<span class="sourceLineNo">066</span> public RpcRetryingCallerImpl(long pause, int retries, int startLogErrorsCnt) {<a name="line.66"></a> +<span class="sourceLineNo">067</span> this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0);<a name="line.67"></a> +<span class="sourceLineNo">068</span> }<a name="line.68"></a> +<span class="sourceLineNo">069</span> <a name="line.69"></a> +<span class="sourceLineNo">070</span> public RpcRetryingCallerImpl(long pause, int retries,<a name="line.70"></a> +<span class="sourceLineNo">071</span> RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) {<a name="line.71"></a> +<span class="sourceLineNo">072</span> this.pause = pause;<a name="line.72"></a> +<span class="sourceLineNo">073</span> this.maxAttempts = retries + 1;<a name="line.73"></a> +<span class="sourceLineNo">074</span> this.interceptor = interceptor;<a name="line.74"></a> +<span class="sourceLineNo">075</span> context = interceptor.createEmptyContext();<a name="line.75"></a> +<span class="sourceLineNo">076</span> this.startLogErrorsCnt = startLogErrorsCnt;<a name="line.76"></a> +<span class="sourceLineNo">077</span> this.tracker = new RetryingTimeTracker();<a name="line.77"></a> +<span class="sourceLineNo">078</span> this.rpcTimeout = rpcTimeout;<a name="line.78"></a> +<span class="sourceLineNo">079</span> }<a name="line.79"></a> +<span class="sourceLineNo">080</span> <a name="line.80"></a> +<span class="sourceLineNo">081</span> @Override<a name="line.81"></a> +<span class="sourceLineNo">082</span> public void cancel(){<a name="line.82"></a> +<span class="sourceLineNo">083</span> cancelled.set(true);<a name="line.83"></a> +<span class="sourceLineNo">084</span> synchronized (cancelled){<a name="line.84"></a> +<span class="sourceLineNo">085</span> cancelled.notifyAll();<a name="line.85"></a> +<span class="sourceLineNo">086</span> }<a name="line.86"></a> +<span class="sourceLineNo">087</span> }<a name="line.87"></a> +<span class="sourceLineNo">088</span><a name="line.88"></a> +<span class="sourceLineNo">089</span> @Override<a name="line.89"></a> +<span class="sourceLineNo">090</span> public T callWithRetries(RetryingCallable<T> callable, int callTimeout)<a name="line.90"></a> +<span class="sourceLineNo">091</span> throws IOException, RuntimeException {<a name="line.91"></a> +<span class="sourceLineNo">092</span> List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =<a name="line.92"></a> +<span class="sourceLineNo">093</span> new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();<a name="line.93"></a> +<span class="sourceLineNo">094</span> tracker.start();<a name="line.94"></a> +<span class="sourceLineNo">095</span> context.clear();<a name="line.95"></a> +<span class="sourceLineNo">096</span> for (int tries = 0;; tries++) {<a name="line.96"></a> +<span class="sourceLineNo">097</span> long expectedSleep;<a name="line.97"></a> +<span class="sourceLineNo">098</span> try {<a name="line.98"></a> +<span class="sourceLineNo">099</span> // bad cache entries are cleared in the call to RetryingCallable#throwable() in catch block<a name="line.99"></a> +<span class="sourceLineNo">100</span> callable.prepare(tries != 0);<a name="line.100"></a> +<span class="sourceLineNo">101</span> interceptor.intercept(context.prepare(callable, tries));<a name="line.101"></a> +<span class="sourceLineNo">102</span> return callable.call(getTimeout(callTimeout));<a name="line.102"></a> +<span class="sourceLineNo">103</span> } catch (PreemptiveFastFailException e) {<a name="line.103"></a> +<span class="sourceLineNo">104</span> throw e;<a name="line.104"></a> +<span class="sourceLineNo">105</span> } catch (Throwable t) {<a name="line.105"></a> +<span class="sourceLineNo">106</span> ExceptionUtil.rethrowIfInterrupt(t);<a name="line.106"></a> +<span class="sourceLineNo">107</span> if (tries > startLogErrorsCnt) {<a name="line.107"></a> +<span class="sourceLineNo">108</span> LOG.info("Call exception, tries=" + tries + ", maxAttempts=" + maxAttempts + ", started="<a name="line.108"></a> +<span class="sourceLineNo">109</span> + (EnvironmentEdgeManager.currentTime() - tracker.getStartTime()) + " ms ago, "<a name="line.109"></a> +<span class="sourceLineNo">110</span> + "cancelled=" + cancelled.get() + ", msg="<a name="line.110"></a> +<span class="sourceLineNo">111</span> + callable.getExceptionMessageAdditionalDetail());<a name="line.111"></a> +<span class="sourceLineNo">112</span> }<a name="line.112"></a> +<span class="sourceLineNo">113</span><a name="line.113"></a> +<span class="sourceLineNo">114</span> // translateException throws exception when should not retry: i.e. when request is bad.<a name="line.114"></a> +<span class="sourceLineNo">115</span> interceptor.handleFailure(context, t);<a name="line.115"></a> +<span class="sourceLineNo">116</span> t = translateException(t);<a name="line.116"></a> +<span class="sourceLineNo">117</span> callable.throwable(t, maxAttempts != 1);<a name="line.117"></a> +<span class="sourceLineNo">118</span> RetriesExhaustedException.ThrowableWithExtraContext qt =<a name="line.118"></a> +<span class="sourceLineNo">119</span> new RetriesExhaustedException.ThrowableWithExtraContext(t,<a name="line.119"></a> +<span class="sourceLineNo">120</span> EnvironmentEdgeManager.currentTime(), toString());<a name="line.120"></a> +<span class="sourceLineNo">121</span> exceptions.add(qt);<a name="line.121"></a> +<span class="sourceLineNo">122</span> if (tries >= maxAttempts - 1) {<a name="line.122"></a> +<span class="sourceLineNo">123</span> throw new RetriesExhaustedException(tries, exceptions);<a name="line.123"></a> +<span class="sourceLineNo">124</span> }<a name="line.124"></a> +<span class="sourceLineNo">125</span> // If the server is dead, we need to wait a little before retrying, to give<a name="line.125"></a> +<span class="sourceLineNo">126</span> // a chance to the regions to be<a name="line.126"></a> +<span class="sourceLineNo">127</span> // tries hasn't been bumped up yet so we use "tries + 1" to get right pause time<a name="line.127"></a> +<span class="sourceLineNo">128</span> expectedSleep = callable.sleep(pause, tries + 1);<a name="line.128"></a> +<span class="sourceLineNo">129</span><a name="line.129"></a> +<span class="sourceLineNo">130</span> // If, after the planned sleep, there won't be enough time left, we stop now.<a name="line.130"></a> +<span class="sourceLineNo">131</span> long duration = singleCallDuration(expectedSleep);<a name="line.131"></a> +<span class="sourceLineNo">132</span> if (duration > callTimeout) {<a name="line.132"></a> +<span class="sourceLineNo">133</span> String msg = "callTimeout=" + callTimeout + ", callDuration=" + duration +<a name="line.133"></a> +<span class="sourceLineNo">134</span> ": " + callable.getExceptionMessageAdditionalDetail();<a name="line.134"></a> +<span class="sourceLineNo">135</span> throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t));<a name="line.135"></a> +<span class="sourceLineNo">136</span> }<a name="line.136"></a> +<span class="sourceLineNo">137</span> } finally {<a name="line.137"></a> +<span class="sourceLineNo">138</span> interceptor.updateFailureInfo(context);<a name="line.138"></a> +<span class="sourceLineNo">139</span> }<a name="line.139"></a> +<span class="sourceLineNo">140</span> try {<a name="line.140"></a> +<span class="sourceLineNo">141</span> if (expectedSleep > 0) {<a name="line.141"></a> +<span class="sourceLineNo">142</span> synchronized (cancelled) {<a name="line.142"></a> +<span class="sourceLineNo">143</span> if (cancelled.get()) return null;<a name="line.143"></a> +<span class="sourceLineNo">144</span> cancelled.wait(expectedSleep);<a name="line.144"></a> +<span class="sourceLineNo">145</span> }<a name="line.145"></a> +<span class="sourceLineNo">146</span> }<a name="line.146"></a> +<span class="sourceLineNo">147</span> if (cancelled.get()) return null;<a name="line.147"></a> +<span class="sourceLineNo">148</span> } catch (InterruptedException e) {<a name="line.148"></a> +<span class="sourceLineNo">149</span> throw new InterruptedIOException("Interrupted after " + tries<a name="line.149"></a> +<span class="sourceLineNo">150</span> + " tries while maxAttempts=" + maxAttempts);<a name="line.150"></a> +<span class="sourceLineNo">151</span> }<a name="line.151"></a> +<span class="sourceLineNo">152</span> }<a name="line.152"></a> +<span class="sourceLineNo">153</span> }<a name="line.153"></a> +<span class="sourceLineNo">154</span><a name="line.154"></a> +<span class="sourceLineNo">155</span> /**<a name="line.155"></a> +<span class="sourceLineNo">156</span> * @return Calculate how long a single call took<a name="line.156"></a> +<span class="sourceLineNo">157</span> */<a name="line.157"></a> +<span class="sourceLineNo">158</span> private long singleCallDuration(final long expectedSleep) {<a name="line.158"></a> +<span class="sourceLineNo">159</span> return (EnvironmentEdgeManager.currentTime() - tracker.getStartTime()) + expectedSleep;<a name="line.159"></a> +<span class="sourceLineNo">160</span> }<a name="line.160"></a> +<span class="sourceLineNo">161</span><a name="line.161"></a> +<span class="sourceLineNo">162</span> @Override<a name="line.162"></a> +<span class="sourceLineNo">163</span> public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)<a name="line.163"></a> +<span class="sourceLineNo">164</span> throws IOException, RuntimeException {<a name="line.164"></a> +<span class="sourceLineNo">165</span> // The code of this method should be shared with withRetries.<a name="line.165"></a> +<span class="sourceLineNo">166</span> try {<a name="line.166"></a> +<span class="sourceLineNo">167</span> callable.prepare(false);<a name="line.167"></a> +<span class="sourceLineNo">168</span> return callable.call(callTimeout);<a name="line.168"></a> +<span class="sourceLineNo">169</span> } catch (Throwable t) {<a name="line.169"></a> +<span class="sourceLineNo">170</span> Throwable t2 = translateException(t);<a name="line.170"></a> +<span class="sourceLineNo">171</span> ExceptionUtil.rethrowIfInterrupt(t2);<a name="line.171"></a> +<span class="sourceLineNo">172</span> // It would be nice to clear the location cache here.<a name="line.172"></a> +<span class="sourceLineNo">173</span> if (t2 instanceof IOException) {<a name="line.173"></a> +<span class="sourceLineNo">174</span> throw (IOException)t2;<a name="line.174"></a> +<span class="sourceLineNo">175</span> } else {<a name="line.175"></a> +<span class="sourceLineNo">176</span> throw new RuntimeException(t2);<a name="line.176"></a> +<span class="sourceLineNo">177</span> }<a name="line.177"></a> +<span class="sourceLineNo">178</span> }<a name="line.178"></a> +<span class="sourceLineNo">179</span> }<a name="line.179"></a> +<span class="sourceLineNo">180</span> <a name="line.180"></a> +<span class="sourceLineNo">181</span> /**<a name="line.181"></a> +<span class="sourceLineNo">182</span> * Get the good or the remote exception if any, throws the DoNotRetryIOException.<a name="line.182"></a> +<span class="sourceLineNo">183</span> * @param t the throwable to analyze<a name="line.183"></a> +<span class="sourceLineNo">184</span> * @return the translated exception, if it's not a DoNotRetryIOException<a name="line.184"></a> +<span class="sourceLineNo">185</span> * @throws DoNotRetryIOException - if we find it, we throw it instead of translating.<a name="line.185"></a> +<span class="sourceLineNo">186</span> */<a name="line.186"></a> +<span class="sourceLineNo">187</span> static Throwable translateException(Throwable t) throws DoNotRetryIOException {<a name="line.187"></a> +<span class="sourceLineNo">188</span> if (t instanceof UndeclaredThrowableException) {<a name="line.188"></a> +<span class="sourceLineNo">189</span> if (t.getCause() != null) {<a name="line.189"></a> +<span class="sourceLineNo">190</span> t = t.getCause();<a name="line.190"></a> +<span class="sourceLineNo">191</span> }<a name="line.191"></a> <span class="sourceLineNo">192</span> }<a name="line.192"></a> -<span class="sourceLineNo">193</span> if (t instanceof LinkageError) {<a name="line.193"></a> -<span class="sourceLineNo">194</span> throw new DoNotRetryIOException(t);<a name="line.194"></a> +<span class="sourceLineNo">193</span> if (t instanceof RemoteException) {<a name="line.193"></a> +<span class="sourceLineNo">194</span> t = ((RemoteException)t).unwrapRemoteException();<a name="line.194"></a> <span class="sourceLineNo">195</span> }<a name="line.195"></a> -<span class="sourceLineNo">196</span> if (t instanceof ServiceException) {<a name="line.196"></a> -<span class="sourceLineNo">197</span> ServiceException se = (ServiceException)t;<a name="line.197"></a> -<span class="sourceLineNo">198</span> Throwable cause = se.getCause();<a name="line.198"></a> -<span class="sourceLineNo">199</span> if (cause != null && cause instanceof DoNotRetryIOException) {<a name="line.199"></a> -<span class="sourceLineNo">200</span> throw (DoNotRetryIOException)cause;<a name="line.200"></a> -<span class="sourceLineNo">201</span> }<a name="line.201"></a> -<span class="sourceLineNo">202</span> // Don't let ServiceException out; its rpc specific.<a name="line.202"></a> -<span class="sourceLineNo">203</span> t = cause;<a name="line.203"></a> -<span class="sourceLineNo">204</span> // t could be a RemoteException so go around again.<a name="line.204"></a> -<span class="sourceLineNo">205</span> translateException(t);<a name="line.205"></a> -<span class="sourceLineNo">206</span> } else if (t instanceof DoNotRetryIOException) {<a name="line.206"></a> -<span class="sourceLineNo">207</span> throw (DoNotRetryIOException)t;<a name="line.207"></a> -<span class="sourceLineNo">208</span> }<a name="line.208"></a> -<span class="sourceLineNo">209</span> return t;<a name="line.209"></a> -<span class="sourceLineNo">210</span> }<a name="line.210"></a> -<span class="sourceLineNo">211</span><a name="line.211"></a> -<span class="sourceLineNo">212</span> @Override<a name="line.212"></a> -<span class="sourceLineNo">213</span> public String toString() {<a name="line.213"></a> -<span class="sourceLineNo">214</span> return "RpcRetryingCaller{" + "globalStartTime=" + tracker.getStartTime() +<a name="line.214"></a> -<span class="sourceLineNo">215</span> ", pause=" + pause + ", maxAttempts=" + maxAttempts + '}';<a name="line.215"></a> -<span class="sourceLineNo">216</span> }<a name="line.216"></a> -<span class="sourceLineNo">217</span>}<a name="line.217"></a> +<span class="sourceLineNo">196</span> if (t instanceof LinkageError) {<a name="line.196"></a> +<span class="sourceLineNo">197</span> throw new DoNotRetryIOException(t);<a name="line.197"></a> +<span class="sourceLineNo">198</span> }<a name="line.198"></a> +<span class="sourceLineNo">199</span> if (t instanceof ServiceException) {<a name="line.199"></a> +<span class="sourceLineNo">200</span> ServiceException se = (ServiceException)t;<a name="line.200"></a> +<span class="sourceLineNo">201</span> Throwable cause = se.getCause();<a name="line.201"></a> +<span class="sourceLineNo">202</span> if (cause != null && cause instanceof DoNotRetryIOException) {<a name="line.202"></a> +<span class="sourceLineNo">203</span> throw (DoNotRetryIOException)cause;<a name="line.203"></a> +<span class="sourceLineNo">204</span> }<a name="line.204"></a> +<span class="sourceLineNo">205</span> // Don't let ServiceException out; its rpc specific.<a name="line.205"></a> +<span class="sourceLineNo">206</span> t = cause;<a name="line.206"></a> +<span class="sourceLineNo">207</span> // t could be a RemoteException so go around again.<a name="line.207"></a> +<span class="sourceLineNo">208</span> translateException(t);<a name="line.208"></a> +<span class="sourceLineNo">209</span> } else if (t instanceof DoNotRetryIOException) {<a name="line.209"></a> +<span class="sourceLineNo">210</span> throw (DoNotRetryIOException)t;<a name="line.210"></a> +<span class="sourceLineNo">211</span> }<a name="line.211"></a> +<span class="sourceLineNo">212</span> return t;<a name="line.212"></a> +<span class="sourceLineNo">213</span> }<a name="line.213"></a> +<span class="sourceLineNo">214</span><a name="line.214"></a> +<span class="sourceLineNo">215</span> private int getTimeout(int callTimeout){<a name="line.215"></a> +<span class="sourceLineNo">216</span> int timeout = tracker.getRemainingTime(callTimeout);<a name="line.216"></a> +<span class="sourceLineNo">217</span> if (timeout <= 0 || rpcTimeout > 0 && rpcTimeout < timeout){<a name="line.217"></a> +<span class="sourceLineNo">218</span> timeout = rpcTimeout;<a name="line.218"></a> +<span class="sourceLineNo">219</span> }<a name="line.219"></a> +<span class="sourceLineNo">220</span> return timeout;<a name="line.220"></a> +<span class="sourceLineNo">221</span> }<a name="line.221"></a> +<span class="sourceLineNo">222</span><a name="line.222"></a> +<span class="sourceLineNo">223</span> @Override<a name="line.223"></a> +<span class="sourceLineNo">224</span> public String toString() {<a name="line.224"></a> +<span class="sourceLineNo">225</span> return "RpcRetryingCaller{" + "globalStartTime=" + tracker.getStartTime() +<a name="line.225"></a> +<span class="sourceLineNo">226</span> ", pause=" + pause + ", maxAttempts=" + maxAttempts + '}';<a name="line.226"></a> +<span class="sourceLineNo">227</span> }<a name="line.227"></a> +<span class="sourceLineNo">228</span>}<a name="line.228"></a>
