HBASE-12729 Backport HBASE-5162 (Basic client pushback mechanism) to 0.98
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/85e7270b Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/85e7270b Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/85e7270b Branch: refs/heads/0.98 Commit: 85e7270b6f8cfb593c1cba137241f2aad986a3e1 Parents: d081756 Author: Andrew Purtell <[email protected]> Authored: Mon Jan 19 17:02:27 2015 -0800 Committer: Andrew Purtell <[email protected]> Committed: Mon Jan 19 17:02:27 2015 -0800 ---------------------------------------------------------------------- .../hadoop/hbase/client/AsyncProcess.java | 199 +++-- .../hadoop/hbase/client/ClientScanner.java | 5 +- .../hadoop/hbase/client/ClientSmallScanner.java | 4 +- .../hadoop/hbase/client/DelayingRunner.java | 117 +++ .../apache/hadoop/hbase/client/HBaseAdmin.java | 4 +- .../hadoop/hbase/client/HConnectionManager.java | 32 +- .../org/apache/hadoop/hbase/client/HTable.java | 4 +- .../hadoop/hbase/client/HTableMultiplexer.java | 8 +- .../apache/hadoop/hbase/client/MultiAction.java | 17 + .../org/apache/hadoop/hbase/client/Result.java | 17 + .../hadoop/hbase/client/ResultStatsUtil.java | 88 ++ .../hadoop/hbase/client/RpcRetryingCaller.java | 4 +- .../hbase/client/RpcRetryingCallerFactory.java | 43 +- .../hbase/client/ServerStatisticTracker.java | 74 ++ .../hbase/client/StatisticsHConnection.java | 39 + .../client/StatsTrackingRpcRetryingCaller.java | 63 ++ .../client/backoff/ClientBackoffPolicy.java | 42 + .../backoff/ClientBackoffPolicyFactory.java | 59 ++ .../backoff/ExponentialClientBackoffPolicy.java | 71 ++ .../hbase/client/backoff/ServerStatistics.java | 68 ++ .../hbase/protobuf/ResponseConverter.java | 19 +- .../client/TestClientExponentialBackoff.java | 110 +++ .../hadoop/hbase/client/TestDelayingRunner.java | 62 ++ .../org/apache/hadoop/hbase/HConstants.java | 5 + .../hbase/protobuf/generated/ClientProtos.java | 808 ++++++++++++++++++- hbase-protocol/src/main/protobuf/Client.proto | 
10 + .../hbase/mapreduce/LoadIncrementalHFiles.java | 2 +- .../hadoop/hbase/regionserver/HRegion.java | 33 +- .../hbase/regionserver/HRegionServer.java | 25 +- .../regionserver/wal/WALEditsReplaySink.java | 5 +- .../hadoop/hbase/client/TestClientPushback.java | 160 ++++ 31 files changed, 2074 insertions(+), 123 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index d206c71..bd15837 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -46,6 +46,8 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.backoff.ServerStatistics; +import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -281,6 +283,19 @@ class AsyncProcess<CResult> { * @param atLeastOne true if we should submit at least a subset. */ public void submit(List<? extends Row> rows, boolean atLeastOne) throws InterruptedIOException { + submit(rows, atLeastOne, null); + } + + /** + * Extract from the rows list what we can submit. The rows we can not submit are kept in the + * list. + * + * @param rows - the submitted row. Modified by the method: we remove the rows we took. + * @param atLeastOne true if we should submit at least a subset. 
+ * @param batchCallback Batch callback. Only called on success + */ + public void submit(List<? extends Row> rows, boolean atLeastOne, + Batch.Callback<CResult> batchCallback) throws InterruptedIOException { if (rows.isEmpty()) { return; } @@ -331,7 +346,7 @@ class AsyncProcess<CResult> { } while (retainedActions.isEmpty() && atLeastOne && !hasError()); HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker(); - sendMultiAction(retainedActions, actionsByServer, 1, errorsByServer); + sendMultiAction(retainedActions, actionsByServer, 1, errorsByServer, batchCallback); } /** @@ -520,7 +535,7 @@ class AsyncProcess<CResult> { } if (!actionsByServer.isEmpty()) { - sendMultiAction(initialActions, actionsByServer, numAttempt, errorsByServer); + sendMultiAction(initialActions, actionsByServer, numAttempt, errorsByServer, null); } } @@ -535,61 +550,141 @@ class AsyncProcess<CResult> { public void sendMultiAction(final List<Action<Row>> initialActions, Map<HRegionLocation, MultiAction<Row>> actionsByServer, final int numAttempt, - final HConnectionManager.ServerErrorTracker errorsByServer) { + final HConnectionManager.ServerErrorTracker errorsByServer, + Batch.Callback<CResult> batchCallback) { // Send the queries and add them to the inProgress list // This iteration is by server (the HRegionLocation comparator is by server portion only). for (Map.Entry<HRegionLocation, MultiAction<Row>> e : actionsByServer.entrySet()) { - final HRegionLocation loc = e.getKey(); - final MultiAction<Row> multiAction = e.getValue(); - incTaskCounters(multiAction.getRegions(), loc.getServerName()); - Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", new Runnable() { - @Override - public void run() { - MultiResponse res; + HRegionLocation loc = e.getKey(); + MultiAction<Row> multiAction = e.getValue(); + Collection<? 
extends Runnable> runnables = getNewMultiActionRunnable(initialActions, loc, + multiAction, numAttempt, errorsByServer, batchCallback); + for (Runnable runnable: runnables) { + try { + incTaskCounters(multiAction.getRegions(), loc.getServerName()); + this.pool.submit(runnable); + } catch (RejectedExecutionException ree) { + // This should never happen. But as the pool is provided by the end user, let's secure + // this a little. + decTaskCounters(multiAction.getRegions(), loc.getServerName()); + LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." + + " Server is " + loc.getServerName(), ree); + // We're likely to fail again, but this will increment the attempt counter, so it will + // finish. + receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, ree, errorsByServer); + } + } + } + } + + private Runnable getNewSingleServerRunnable( + final List<Action<Row>> initialActions, + final HRegionLocation loc, + final MultiAction<Row> multiAction, + final int numAttempt, + final HConnectionManager.ServerErrorTracker errorsByServer, + final Batch.Callback<CResult> batchCallback) { + return new Runnable() { + @Override + public void run() { + MultiResponse res; + try { + MultiServerCallable<Row> callable = createCallable(loc, multiAction); try { - MultiServerCallable<Row> callable = createCallable(loc, multiAction); - try { - res = createCaller(callable).callWithoutRetries(callable, timeout); - } catch (IOException e) { - // The service itself failed . It may be an error coming from the communication - // layer, but, as well, a functional error raised by the server. - receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, e, - errorsByServer); - return; - } catch (Throwable t) { - // This should not happen. Let's log & retry anyway. - LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." + - " Retrying. 
Server is " + loc.getServerName() + ", tableName=" + tableName, t); - receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, t, - errorsByServer); - return; - } + res = createCaller(callable).callWithoutRetries(callable, timeout); + } catch (IOException e) { + // The service itself failed . It may be an error coming from the communication + // layer, but, as well, a functional error raised by the server. + receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, e, + errorsByServer); + return; + } catch (Throwable t) { + // This should not happen. Let's log & retry anyway. + LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." + + " Retrying. Server is " + loc.getServerName() + ", tableName=" + tableName, t); + receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, t, + errorsByServer); + return; + } - // Nominal case: we received an answer from the server, and it's not an exception. - receiveMultiAction(initialActions, multiAction, loc, res, numAttempt, errorsByServer); + // Nominal case: we received an answer from the server, and it's not an exception. + receiveMultiAction(initialActions, multiAction, loc, res, numAttempt, errorsByServer, + batchCallback); - } finally { - decTaskCounters(multiAction.getRegions(), loc.getServerName()); - } + } finally { + decTaskCounters(multiAction.getRegions(), loc.getServerName()); } - }); - - try { - this.pool.submit(runnable); - } catch (RejectedExecutionException ree) { - // This should never happen. But as the pool is provided by the end user, let's secure - // this a little. - decTaskCounters(multiAction.getRegions(), loc.getServerName()); - LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." + - " Server is " + loc.getServerName(), ree); - // We're likely to fail again, but this will increment the attempt counter, so it will - // finish. 
- receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, ree, errorsByServer); } + }; + } + + private Collection<? extends Runnable> getNewMultiActionRunnable( + final List<Action<Row>> initialActions, + final HRegionLocation loc, + final MultiAction<Row> multiAction, + final int numAttempt, + final HConnectionManager.ServerErrorTracker errorsByServer, + final Batch.Callback<CResult> batchCallback) { + // no stats to manage, just do the standard action + if (!(AsyncProcess.this.hConnection instanceof StatisticsHConnection) || + ((StatisticsHConnection)AsyncProcess.this.hConnection).getStatisticsTracker() == null) { + List<Runnable> toReturn = new ArrayList<Runnable>(1); + toReturn.add(Trace.wrap("AsyncProcess.sendMultiAction", + getNewSingleServerRunnable(initialActions, loc, multiAction, numAttempt, + errorsByServer, batchCallback))); + return toReturn; + } else { + // group the actions by the amount of delay + Map<Long, DelayingRunner> actions = new HashMap<Long, DelayingRunner>(multiAction + .size()); + + // split up the actions + for (Map.Entry<byte[], List<Action<Row>>> e : multiAction.actions.entrySet()) { + Long backoff = getBackoff(loc); + DelayingRunner runner = actions.get(backoff); + if (runner == null) { + actions.put(backoff, new DelayingRunner(backoff, e)); + } else { + runner.add(e); + } + } + + List<Runnable> toReturn = new ArrayList<Runnable>(actions.size()); + for (DelayingRunner runner : actions.values()) { + String traceText = "AsyncProcess.sendMultiAction"; + Runnable runnable = getNewSingleServerRunnable(initialActions, loc, runner.getActions(), + numAttempt, errorsByServer, batchCallback); + // use a delay runner only if we need to sleep for some time + if (runner.getSleepTime() > 0) { + runner.setRunner(runnable); + traceText = "AsyncProcess.clientBackoff.sendMultiAction"; + runnable = runner; + } + runnable = Trace.wrap(traceText, runnable); + toReturn.add(runnable); + } + return toReturn; } } /** + * @param server server 
location where the target region is hosted + * @param regionName name of the region which we are going to write some data + * @return the amount of time the client should wait until it submit a request to the + * specified server and region + */ + private Long getBackoff(HRegionLocation location) { + Preconditions.checkState(AsyncProcess.this.hConnection instanceof StatisticsHConnection, + "AsyncProcess connection should be a StatisticsHConnection"); + ServerStatisticTracker tracker = ((StatisticsHConnection)AsyncProcess.this.hConnection) + .getStatisticsTracker(); + ServerStatistics stats = tracker.getStats(location.getServerName()); + return ((StatisticsHConnection)AsyncProcess.this.hConnection).getBackoffPolicy() + .getBackoffTime(location.getServerName(), location.getRegionInfo().getRegionName(), + stats); + } + + /** * Create a callable. Isolated to be easily overridden in the tests. */ protected MultiServerCallable<Row> createCallable(final HRegionLocation location, @@ -738,7 +833,8 @@ class AsyncProcess<CResult> { private void receiveMultiAction(List<Action<Row>> initialActions, MultiAction<Row> multiAction, HRegionLocation location, MultiResponse responses, int numAttempt, - HConnectionManager.ServerErrorTracker errorsByServer) { + HConnectionManager.ServerErrorTracker errorsByServer, + Batch.Callback<CResult> batchCallback) { assert responses != null; // Success or partial success @@ -780,12 +876,17 @@ class AsyncProcess<CResult> { toReplay.add(correspondingAction); } } else { // success - if (callback != null) { + if (callback != null || batchCallback != null) { int index = regionResult.getFirst(); Action<Row> correspondingAction = initialActions.get(index); Row row = correspondingAction.getAction(); - //noinspection unchecked - this.callback.success(index, resultsForRS.getKey(), row, (CResult) result); + if (callback != null) { + //noinspection unchecked + this.callback.success(index, resultsForRS.getKey(), row, (CResult) result); + } + if (batchCallback 
!= null) { + batchCallback.update(resultsForRS.getKey(), row.getRow(), (CResult) result); + } } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index bea2a1b..7821f46 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -107,7 +107,10 @@ public class ClientScanner extends AbstractClientScanner { */ public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName, HConnection connection) throws IOException { - this(conf, scan, tableName, connection, RpcRetryingCallerFactory.instantiate(conf), + this(conf, scan, tableName, connection, + RpcRetryingCallerFactory.instantiate(conf, + connection instanceof StatisticsHConnection ? 
+ ((StatisticsHConnection)connection).getStatisticsTracker() : null), RpcControllerFactory.instantiate(conf)); } http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java index 31df3b0..ad9380f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java @@ -83,7 +83,9 @@ public class ClientSmallScanner extends ClientScanner { */ public ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName, HConnection connection) throws IOException { - this(conf, scan, tableName, connection, RpcRetryingCallerFactory.instantiate(conf), + this(conf, scan, tableName, connection, RpcRetryingCallerFactory.instantiate(conf, + connection instanceof StatisticsHConnection ? + ((StatisticsHConnection)connection).getStatisticsTracker() : null), RpcControllerFactory.instantiate(conf)); } http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java new file mode 100644 index 0000000..22c0461 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +/** + * A wrapper for a runnable for a group of actions for a single regionserver. + * <p> + * This can be used to build up the actions that should be taken and then + * </p> + * <p> + * This class exists to simulate using a ScheduledExecutorService with just a regular + * ExecutorService and Runnables. It is used for legacy reasons in the the client; this could + * only be removed if we change the expectations in HTable around the pool the client is able to + * pass in and even if we deprecate the current APIs would require keeping this class around + * for the interim to bridge between the legacy ExecutorServices and the scheduled pool. 
+ * </p> + */ +@InterfaceAudience.Private +public class DelayingRunner<T> implements Runnable { + private static final Log LOG = LogFactory.getLog(DelayingRunner.class); + + private final Object sleepLock = new Object(); + private boolean triggerWake = false; + private long sleepTime; + private MultiAction<T> actions = new MultiAction<T>(); + private Runnable runnable; + + public DelayingRunner(long sleepTime, Entry<byte[], List<Action<T>>> e) { + this.sleepTime = sleepTime; + add(e); + } + + public void setRunner(Runnable runner) { + this.runnable = runner; + } + + @Override + public void run() { + if (!sleep()) { + LOG.warn( + "Interrupted while sleeping for expected sleep time " + sleepTime + " ms"); + } + //TODO maybe we should consider switching to a listenableFuture for the actual callable and + // then handling the results/errors as callbacks. That way we can decrement outstanding tasks + // even if we get interrupted here, but for now, we still need to run so we decrement the + // outstanding tasks + this.runnable.run(); + } + + /** + * Sleep for an expected amount of time. + * <p> + * This is nearly a copy of what the Sleeper does, but with the ability to know if you + * got interrupted while sleeping. + * </p> + * + * @return <tt>true</tt> if the sleep completely entirely successfully, + * but otherwise <tt>false</tt> if the sleep was interrupted. + */ + private boolean sleep() { + long now = EnvironmentEdgeManager.currentTimeMillis(); + long startTime = now; + long waitTime = sleepTime; + while (waitTime > 0) { + long woke = -1; + try { + synchronized (sleepLock) { + if (triggerWake) break; + sleepLock.wait(waitTime); + } + woke = EnvironmentEdgeManager.currentTimeMillis(); + } catch (InterruptedException iex) { + return false; + } + // Recalculate waitTime. + woke = (woke == -1) ? 
EnvironmentEdgeManager.currentTimeMillis() : woke; + waitTime = waitTime - (woke - startTime); + } + return true; + } + + public void add(Map.Entry<byte[], List<Action<T>>> e) { + actions.add(e.getKey(), e.getValue()); + } + + public MultiAction<T> getActions() { + return actions; + } + + public long getSleepTime() { + return sleepTime; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index effa069..e3af5e0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -214,7 +214,9 @@ public class HBaseAdmin implements Abortable, Closeable { HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); this.retryLongerMultiplier = this.conf.getInt( "hbase.client.retries.longer.multiplier", 10); - this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf); + this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf, + connection instanceof StatisticsHConnection ? 
+ ((StatisticsHConnection)connection).getStatisticsTracker() : null); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java index da7a122..b06dc6f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -66,6 +66,8 @@ import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase; +import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; +import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RegionOpeningException; @@ -592,7 +594,7 @@ public class HConnectionManager { @edu.umd.cs.findbugs.annotations.SuppressWarnings( value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION", justification="Access to the conncurrent hash map is under a lock so should be fine.") - public static class HConnectionImplementation implements HConnection, Closeable { + public static class HConnectionImplementation implements StatisticsHConnection, Closeable { static final Log LOG = LogFactory.getLog(HConnectionImplementation.class); private final long pause; private final int numTries; @@ -664,6 +666,11 @@ public class HConnectionManager { private RpcControllerFactory rpcControllerFactory; + // single tracker per connection + private final 
ServerStatisticTracker stats; + + private final ClientBackoffPolicy backoffPolicy; + /** * Cluster registry of basic info such as clusterid and meta region location. */ @@ -720,7 +727,7 @@ public class HConnectionManager { } } - this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf); + this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, this.stats); this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); } @@ -760,13 +767,16 @@ public class HConnectionManager { this.nonceGenerator = new NoNonceGenerator(); } + this.stats = ServerStatisticTracker.create(conf); this.usePrefetch = conf.getBoolean(HConstants.HBASE_CLIENT_PREFETCH, HConstants.DEFAULT_HBASE_CLIENT_PREFETCH); this.prefetchRegionLimit = conf.getInt( HConstants.HBASE_CLIENT_PREFETCH_LIMIT, HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT); - this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf); this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); + this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, this.stats); + this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); + } @Override @@ -2418,11 +2428,12 @@ public class HConnectionManager { // For tests. protected <R> AsyncProcess createAsyncProcess(TableName tableName, ExecutorService pool, AsyncProcess.AsyncProcessCallback<R> callback, Configuration conf) { - return new AsyncProcess<R>(this, tableName, pool, callback, conf, - RpcRetryingCallerFactory.instantiate(conf), RpcControllerFactory.instantiate(conf)); + RpcControllerFactory controllerFactory = RpcControllerFactory.instantiate(conf); + RpcRetryingCallerFactory callerFactory = RpcRetryingCallerFactory.instantiate(conf, this.stats); + return new AsyncProcess<R>(this, tableName, pool, callback, conf, callerFactory, + controllerFactory); } - /** * Fill the result array for the interfaces using it. 
*/ @@ -2461,6 +2472,15 @@ public class HConnectionManager { } } + @Override + public ServerStatisticTracker getStatisticsTracker() { + return this.stats; + } + + @Override + public ClientBackoffPolicy getBackoffPolicy() { + return this.backoffPolicy; + } /* * Return the number of cached region for a table. It will only be called http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 1d8a037..9cc6d7b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -379,7 +379,9 @@ public class HTable implements HTableInterface { this.scannerCaching = tableConfiguration.getScannerCaching(); if (this.rpcCallerFactory == null) { - this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration); + this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration, + this.connection instanceof StatisticsHConnection ? 
+ ((StatisticsHConnection)this.connection).getStatisticsTracker() : null); } if (this.rpcControllerFactory == null) { this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration); http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java index 709e5fd..7f5a4ca 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java @@ -406,14 +406,16 @@ public class HTableMultiplexer { private final ScheduledExecutorService executor; private final int maxRetryInQueue; private final AtomicInteger retryInQueue = new AtomicInteger(0); - + public FlushWorker(Configuration conf, HConnection conn, HRegionLocation addr, HTableMultiplexer multiplexer, int perRegionServerBufferQueueSize, ExecutorService pool, ScheduledExecutorService executor) { this.addr = addr; this.multiplexer = multiplexer; this.queue = new LinkedBlockingQueue<PutStatus>(perRegionServerBufferQueueSize); - RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf); + RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, + conn instanceof StatisticsHConnection ? 
+ ((StatisticsHConnection)conn).getStatisticsTracker() : null); RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf); this.ap = new AsyncProcess<Object>(conn, null, pool, this, conf, rpcCallerFactory, rpcControllerFactory); @@ -519,7 +521,7 @@ public class HTableMultiplexer { try { HConnectionManager.ServerErrorTracker errorsByServer = new HConnectionManager.ServerErrorTracker(1, 10); - ap.sendMultiAction(retainedActions, actionsByServer, 10, errorsByServer); + ap.sendMultiAction(retainedActions, actionsByServer, 10, errorsByServer, null); ap.waitUntilDone(); if (ap.hasError()) { http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java index e1cfc9a..0af351f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java @@ -76,6 +76,23 @@ public final class MultiAction<R> { rsActions.add(a); } + /** + * Add an list of Actions to this container based on it's regionName. If the regionName + * is wrong, the initial execution will fail, but will be automatically + * retried after looking up the correct region. 
+ * + * @param regionName + * @param a + */ + public void add(byte[] regionName, List<Action<R>> a) { + List<Action<R>> rsActions = actions.get(regionName); + if (rsActions == null) { + rsActions = new ArrayList<Action<R>>(a.size()); + actions.put(regionName, rsActions); + } + rsActions.addAll(a); + } + public void setNonceGroup(long nonceGroup) { this.nonceGroup = nonceGroup; } http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java index b995e28..c220718 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.util.Bytes; /** @@ -93,6 +94,7 @@ public class Result implements CellScannable, CellScanner { * Index for where we are when Result is acting as a {@link CellScanner}. */ private int cellScannerIndex = INITIAL_CELLSCANNER_INDEX; + private ClientProtos.RegionLoadStats loadStats; /** * Creates an empty Result w/ no KeyValue payload; returns null if you call {@link #rawCells()}. 
@@ -839,4 +841,19 @@ public class Result implements CellScannable, CellScanner { this.exists = exists; } + /** + * Add load information about the region to the information about the result + * @param loadStats statistics about the current region from which this was returned + */ + public void addResults(ClientProtos.RegionLoadStats loadStats) { + this.loadStats = loadStats; + } + + /** + * @return the associated statistics about the region from which this was returned. Can be + * <tt>null</tt> if stats are disabled. + */ + public ClientProtos.RegionLoadStats getStats() { + return loadStats; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java new file mode 100644 index 0000000..3398d7d --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.util.Pair; + +/** + * Static helpers that copy the region load statistics attached to a {@link Result} into a tracker + */ [email protected] +public final class ResultStatsUtil { + + private ResultStatsUtil() { + //private ctor for util class + } + + /** + * Update the stats for the specified region if the result is a {@link + * Result} that carries load statistics + * + * @param r object that contains the result and possibly the statistics about the region + * @param serverStats stats tracker to update from the result + * @param server server from which the result was obtained + * @param regionName full region name for the stats. + * @return the object that was passed in as <tt>r</tt>, unchanged, whether or not + * any statistics were recorded + */ + public static <T> T updateStats(T r, ServerStatisticTracker serverStats, + ServerName server, byte[] regionName) { + if (!(r instanceof Result)) { + return r; + } + Result result = (Result) r; + // early exit if there are no stats to collect + ClientProtos.RegionLoadStats stats = result.getStats(); + if (stats == null) { + return r; + } + serverStats.updateRegionStats(server, regionName, stats); + return r; + } + + public static <T> T updateStats(T r, ServerStatisticTracker stats, + HRegionLocation regionLocation) { + // Writes submitted using multi() will receive MultiResponses + if (r instanceof MultiResponse) { + MultiResponse mr = (MultiResponse) r; + for (Map.Entry<byte[], List<Pair<Integer, Object>>> e: mr.getResults().entrySet()) { + byte[] regionName = e.getKey(); + for (Pair<Integer, Object> regionResult : e.getValue()) { + Object o = regionResult.getSecond(); + if (o instanceof Result)
{ + Result result = (Result) o; + ClientProtos.RegionLoadStats loadStats = result.getStats(); + if (loadStats != null) { + stats.updateRegionStats(regionLocation.getServerName(), regionName, loadStats); + // The first result carrying stats is enough for this region; go to the next region + break; + } + } + } + } + } + return r; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java index 9552992..95fbb92 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java @@ -39,9 +39,7 @@ import org.apache.hadoop.ipc.RemoteException; import com.google.protobuf.ServiceException; /** - * Runs an rpc'ing {@link RetryingCallable}. Sets into rpc client - * threadlocal outstanding timeouts as so we don't persist too much. - * Dynamic rather than static so can set the generic appropriately. + * Dynamic rather than static so can set the generic return type appropriately.
*/ @InterfaceAudience.Private @edu.umd.cs.findbugs.annotations.SuppressWarnings http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index 53d5c58..dd9c725 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -19,11 +19,13 @@ package org.apache.hadoop.hbase.client; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.ReflectionUtils; /** * Factory to create an {@link RpcRetryingCaller} */ [email protected] public class RpcRetryingCallerFactory { /** Configuration key for a custom {@link RpcRetryingCaller} */ @@ -32,31 +34,62 @@ public class RpcRetryingCallerFactory { private final long pause; private final int retries; private final int startLogErrorsCnt; + private final boolean enableBackPressure; + private ServerStatisticTracker stats; public RpcRetryingCallerFactory(Configuration conf) { + this(conf, null); + } + + public RpcRetryingCallerFactory(Configuration conf, ServerStatisticTracker stats) { this.conf = conf; + this.stats = stats; pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT); + enableBackPressure = 
conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, + HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); + } + + /** + * Set the tracker that should be used for tracking statistics about the server + */ + public void setStatisticTracker(ServerStatisticTracker statisticTracker) { + this.stats = statisticTracker; } public <T> RpcRetryingCaller<T> newCaller() { // We store the values in the factory instance. This way, constructing new objects // is cheap as it does not require parsing a complex structure. - return new RpcRetryingCaller<T>(pause, retries, startLogErrorsCnt); + RpcRetryingCaller<T> caller; + if (enableBackPressure && this.stats != null) { + caller = new StatsTrackingRpcRetryingCaller<T>(pause, retries, startLogErrorsCnt, + this.stats); + } else { + caller = new RpcRetryingCaller<T>(pause, retries, startLogErrorsCnt); + } + return caller; } - public static RpcRetryingCallerFactory instantiate(Configuration configuration) { + public static RpcRetryingCallerFactory instantiate(Configuration configuration, + ServerStatisticTracker stats) { String clazzName = RpcRetryingCallerFactory.class.getName(); String rpcCallerFactoryClazz = configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName); if (rpcCallerFactoryClazz.equals(clazzName)) { - return new RpcRetryingCallerFactory(configuration); + return new RpcRetryingCallerFactory(configuration, stats); + } + try { + return ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz, + new Class[] { Configuration.class, ServerStatisticTracker.class }, + new Object[] { configuration, stats }); + } catch (UnsupportedOperationException e) { + return ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz, + new Class[] { Configuration.class }, + new Object[] { configuration }); } - return ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz, - new Class[] { Configuration.class }, new Object[] { configuration }); } } 
http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java new file mode 100644 index 0000000..0c7b683 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.backoff.ServerStatistics; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Tracks the region load statistics of multiple servers, keyed by {@link ServerName} + */ [email protected] +public class ServerStatisticTracker { + + private final Map<ServerName, ServerStatistics> stats = + new ConcurrentHashMap<ServerName, ServerStatistics>(); + + public void updateRegionStats(ServerName server, byte[] region, ClientProtos.RegionLoadStats + currentStats) { + ServerStatistics stat = stats.get(server); + + if (stat == null) { + // look again under the lock so that only one stats object is created per server + synchronized (this) { + stat = stats.get(server); + // we don't have stats for that server yet, so we need to make some + if (stat == null) { + stat = new ServerStatistics(); + stats.put(server, stat); + } + } + } + stat.update(region, currentStats); + } + + public ServerStatistics getStats(ServerName server) { + return this.stats.get(server); + } + + public static ServerStatisticTracker create(Configuration conf) { + if (!conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, + HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE)) { + return null; + } + return new ServerStatisticTracker(); + } + + @VisibleForTesting + ServerStatistics getServerStatsForTesting(ServerName server) { + return stats.get(server); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatisticsHConnection.java ---------------------------------------------------------------------- diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatisticsHConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatisticsHConnection.java new file mode 100644 index 0000000..79bf9f7 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatisticsHConnection.java @@ -0,0 +1,39 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; + +/** + * A server statistics tracking aware HConnection. 
+ */ [email protected] +public interface StatisticsHConnection extends HConnection { + /** + * @return the current statistics tracker associated with this connection + */ + ServerStatisticTracker getStatisticsTracker(); + + /** + * @return the configured client backoff policy + */ + ClientBackoffPolicy getBackoffPolicy(); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java new file mode 100644 index 0000000..2a5f0ec --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +import java.io.IOException; + +/** + * An {@link RpcRetryingCaller} that will update the per-region stats for the call on return, + * if stats are available. Each call is delegated to the superclass method of the same name. + */ [email protected] +public class StatsTrackingRpcRetryingCaller<T> extends RpcRetryingCaller<T> { + private final ServerStatisticTracker stats; + + public StatsTrackingRpcRetryingCaller(long pause, int retries, int startLogErrorsCnt, + ServerStatisticTracker stats) { + super(pause, retries, startLogErrorsCnt); + this.stats = stats; + } + + @Override + public T callWithRetries(RetryingCallable<T> callable, int callTimeout) + throws IOException, RuntimeException { + T result = super.callWithRetries(callable, callTimeout); + return updateStatsAndUnwrap(result, callable); + } + + @Override + public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout) + throws IOException, RuntimeException { + T result = super.callWithoutRetries(callable, callTimeout); // not callWithRetries + return updateStatsAndUnwrap(result, callable); + } + + private T updateStatsAndUnwrap(T result, RetryingCallable<T> callable) { + // don't track stats about requests that aren't to regionservers + if (!(callable instanceof RegionServerCallable)) { + return result; + } + + RegionServerCallable<T> regionCallable = (RegionServerCallable<T>) callable; + HRegionLocation location = regionCallable.getLocation(); + return ResultStatsUtil.updateStats(result, stats, location); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java new file mode 100644 index 0000000..94e434f --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.backoff; + +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Configurable policy for the amount of time a client should wait for a new request to the + * server when given the server load statistics. 
+ * <p> + * Must have a single-argument constructor that takes a {@link org.apache.hadoop.conf.Configuration} + * </p> + */ [email protected] [email protected] +public interface ClientBackoffPolicy { + + public static final String BACKOFF_POLICY_CLASS = + "hbase.client.statistics.backoff-policy"; + + /** + * @return the number of ms to wait on the client based on the supplied server statistics + */ + public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java new file mode 100644 index 0000000..879a0e2 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client.backoff; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.ReflectionUtils; + [email protected] [email protected] +public final class ClientBackoffPolicyFactory { + + private static final Log LOG = LogFactory.getLog(ClientBackoffPolicyFactory.class); + + private ClientBackoffPolicyFactory() { + } + + public static ClientBackoffPolicy create(Configuration conf) { + // create the backoff policy + String className = + conf.get(ClientBackoffPolicy.BACKOFF_POLICY_CLASS, NoBackoffPolicy.class + .getName()); + return ReflectionUtils.instantiateWithCustomCtor(className, + new Class<?>[] { Configuration.class }, new Object[] { conf }); + } + + /** + * Default backoff policy that doesn't create any backoff for the client, regardless of load + */ + public static class NoBackoffPolicy implements ClientBackoffPolicy { + public NoBackoffPolicy(Configuration conf){ + // necessary to meet contract + } + + @Override + public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) { + return 0; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java new file mode 100644 index 0000000..6e75670 --- /dev/null +++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.backoff; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Simple exponential backoff policy on for the client that uses a percent^4 times the + * max backoff to generate the backoff time. 
+ */ [email protected] [email protected] +public class ExponentialClientBackoffPolicy implements ClientBackoffPolicy { + + private static final Log LOG = LogFactory.getLog(ExponentialClientBackoffPolicy.class); + + private static final long ONE_MINUTE = 60 * 1000; + public static final long DEFAULT_MAX_BACKOFF = 5 * ONE_MINUTE; + public static final String MAX_BACKOFF_KEY = "hbase.client.exponential-backoff.max"; + private long maxBackoff; + + public ExponentialClientBackoffPolicy(Configuration conf) { + this.maxBackoff = conf.getLong(MAX_BACKOFF_KEY, DEFAULT_MAX_BACKOFF); + } + + @Override + public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) { + // no stats for the server yet, so don't backoff + if (stats == null) { + return 0; + } + + ServerStatistics.RegionStatistics regionStats = stats.getStatsForRegion(region); + // no stats for the region yet - don't backoff + if (regionStats == null) { + return 0; + } + + // raise the load, taken as a fraction of 100 percent, to the fourth power. The closer the + // load gets to 100 percent, the closer the multiplier gets to 1 (the full max backoff) + double percent = regionStats.getMemstoreLoadPercent() / 100.0; + double multiplier = Math.pow(percent, 4.0); + // shouldn't ever happen, but just in case something changes in the statistic data + if (multiplier > 1) { + LOG.warn("Somehow got a backoff multiplier greater than the allowed backoff. 
Forcing back " + + "down to the max backoff"); + multiplier = 1; + } + return (long) (multiplier * maxBackoff); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java new file mode 100644 index 0000000..a3b8e11 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client.backoff; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.util.Bytes; + +import java.util.Map; +import java.util.TreeMap; + +/** + * Track the statistics of the regions of a single server, keyed by region name + */ [email protected] +public class ServerStatistics { + + private Map<byte[], RegionStatistics> + stats = new TreeMap<byte[], RegionStatistics>(Bytes.BYTES_COMPARATOR); + + /** + * Good enough attempt. Last writer wins. It doesn't really matter which one gets to update, + * as something gets set. NOTE(review): the TreeMap is unsynchronized - confirm callers cope + * @param region name of the region whose statistics are being recorded + * @param currentStats load statistics to store for that region, replacing any earlier value + */ + public void update(byte[] region, ClientProtos.RegionLoadStats currentStats) { + RegionStatistics regionStat = this.stats.get(region); + if(regionStat == null){ + regionStat = new RegionStatistics(); + this.stats.put(region, regionStat); + } + + regionStat.update(currentStats); + } + + @InterfaceAudience.Private + public RegionStatistics getStatsForRegion(byte[] regionName){ + return stats.get(regionName); + } + + public static class RegionStatistics{ + private int memstoreLoad = 0; + + public void update(ClientProtos.RegionLoadStats currentStats) { + this.memstoreLoad = currentStats.getMemstoreLoad(); + } + + public int getMemstoreLoadPercent(){ + return this.memstoreLoad; + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java index 367d5f9..887c4c0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java @@ -116,19 +116,23 
@@ public final class ResponseConverter { } for (ResultOrException roe : actionResult.getResultOrExceptionList()) { + Object responseValue; if (roe.hasException()) { - results.add(regionName, new Pair<Integer, Object>(roe.getIndex(), - ProtobufUtil.toException(roe.getException()))); + responseValue = ProtobufUtil.toException(roe.getException()); } else if (roe.hasResult()) { - results.add(regionName, new Pair<Integer, Object>(roe.getIndex(), - ProtobufUtil.toResult(roe.getResult(), cells))); + responseValue = ProtobufUtil.toResult(roe.getResult(), cells); + // add the load stats, if we got any + if (roe.hasLoadStats()) { + ((Result) responseValue).addResults(roe.getLoadStats()); + } } else if (roe.hasServiceResult()) { - results.add(regionName, roe.getIndex(), roe.getServiceResult()); + responseValue = roe.getServiceResult(); } else { // no result & no exception. Unexpected. throw new IllegalStateException("No result & no exception roe=" + roe + " for region " + actions.getRegion()); } + results.add(regionName, roe.getIndex(), responseValue); } } @@ -151,11 +155,14 @@ public final class ResponseConverter { * Wrap a throwable to an action result. 
* * @param r + * @param stats * @return an action result builder */ - public static ResultOrException.Builder buildActionResult(final ClientProtos.Result r) { + public static ResultOrException.Builder buildActionResult(final ClientProtos.Result r, + ClientProtos.RegionLoadStats stats) { ResultOrException.Builder builder = ResultOrException.newBuilder(); if (r != null) builder.setResult(r); + if (stats != null) builder.setLoadStats(stats); return builder; } http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java new file mode 100644 index 0000000..24cb661 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy; +import org.apache.hadoop.hbase.client.backoff.ServerStatistics; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestClientExponentialBackoff { + + ServerName server = Mockito.mock(ServerName.class); + byte[] regionname = Bytes.toBytes("region"); + + @Test + public void testNulls() { + Configuration conf = new Configuration(false); + ExponentialClientBackoffPolicy backoff = new ExponentialClientBackoffPolicy(conf); + assertEquals(0, backoff.getBackoffTime(null, null, null)); + + // server name doesn't matter to calculation, but check it now anyways + assertEquals(0, backoff.getBackoffTime(server, null, null)); + assertEquals(0, backoff.getBackoffTime(server, regionname, null)); + + // check when no stats for the region yet + ServerStatistics stats = new ServerStatistics(); + assertEquals(0, backoff.getBackoffTime(server, regionname, stats)); + } + + @Test + public void testMaxLoad() { + Configuration conf = new Configuration(false); + ExponentialClientBackoffPolicy backoff = new ExponentialClientBackoffPolicy(conf); + + ServerStatistics stats = new ServerStatistics(); + update(stats, 100); + assertEquals(ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF, backoff.getBackoffTime(server, + regionname, stats)); + + // another policy with a different max timeout + long max = 100; + conf.setLong(ExponentialClientBackoffPolicy.MAX_BACKOFF_KEY, max); + ExponentialClientBackoffPolicy backoffShortTimeout = new ExponentialClientBackoffPolicy(conf); + assertEquals(max, backoffShortTimeout.getBackoffTime(server, regionname, stats)); + + // test 
beyond 100 still doesn't exceed the max + update(stats, 101); + assertEquals(ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF, backoff.getBackoffTime(server, + regionname, stats)); + assertEquals(max, backoffShortTimeout.getBackoffTime(server, regionname, stats)); + + // and that when we are below 100, it's less than the max timeout + update(stats, 99); + assertTrue(backoff.getBackoffTime(server, + regionname, stats) < ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF); + assertTrue(backoffShortTimeout.getBackoffTime(server, regionname, stats) < max); + } + + /** + * Make sure that we get results in the order that we expect - backoff for a load of 1 should + * be less than backoff for 10, which should be less than that for 50. + */ + @Test + public void testResultOrdering() { + Configuration conf = new Configuration(false); + // make the max timeout really high so we get differentiation between load factors + conf.setLong(ExponentialClientBackoffPolicy.MAX_BACKOFF_KEY, Integer.MAX_VALUE); + ExponentialClientBackoffPolicy backoff = new ExponentialClientBackoffPolicy(conf); + + ServerStatistics stats = new ServerStatistics(); + long previous = backoff.getBackoffTime(server, regionname, stats); + for (int i = 1; i <= 100; i++) { + update(stats, i); + long next = backoff.getBackoffTime(server, regionname, stats); + assertTrue( + "Previous backoff time" + previous + " >= " + next + ", the next backoff time for " + + "load " + i, previous < next); + previous = next; + } + } + + private void update(ServerStatistics stats, int load) { + ClientProtos.RegionLoadStats stat = ClientProtos.RegionLoadStats.newBuilder() + .setMemstoreLoad + (load).build(); + stats.update(regionname, stat); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestDelayingRunner.java ---------------------------------------------------------------------- diff --git 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestDelayingRunner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestDelayingRunner.java new file mode 100644 index 0000000..308d7ef --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestDelayingRunner.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.*; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestDelayingRunner { + + private static final TableName DUMMY_TABLE = + TableName.valueOf("DUMMY_TABLE"); + private static final byte[] DUMMY_BYTES_1 = "DUMMY_BYTES_1".getBytes(); + private static final byte[] DUMMY_BYTES_2 = "DUMMY_BYTES_2".getBytes(); + private static HRegionInfo hri1 = + new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1); + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testDelayingRunner() throws Exception{ + MultiAction<Row> ma = new MultiAction<Row>(); + ma.add(hri1.getRegionName(), new Action<Row>(new Put(DUMMY_BYTES_1), 0)); + final AtomicLong endTime = new AtomicLong(); + final long sleepTime = 1000; + DelayingRunner runner = new DelayingRunner(sleepTime, ma.actions.entrySet().iterator().next()); + runner.setRunner(new Runnable() { + @Override + public void run() { + endTime.set(EnvironmentEdgeManager.currentTimeMillis()); + } + }); + long startTime = EnvironmentEdgeManager.currentTimeMillis(); + runner.run(); + long delay = endTime.get() - startTime; + assertTrue("DelayingRunner did not delay long enough", delay >= sleepTime); + assertFalse("DelayingRunner delayed too long", delay > sleepTime + sleepTime*0.2); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 269a285..c181ad5 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1037,6 +1037,11 @@ public final class HConstants { /** Configuration key for setting replication codec class name */ public static final String REPLICATION_CODEC_CONF_KEY = "hbase.replication.rpc.codec"; + /** Config key for if the server should send backpressure and if the client should listen to + * that backpressure from the server */ + public static final String ENABLE_CLIENT_BACKPRESSURE = "hbase.client.backpressure.enabled"; + public static final boolean DEFAULT_ENABLE_CLIENT_BACKPRESSURE = false; + private HConstants() { // Can't be instantiated with this ctor. }
