HBASE-12730 Backport HBASE-5162 (Basic client pushback mechanism) to branch-1
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/04a003d6 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/04a003d6 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/04a003d6 Branch: refs/heads/branch-1 Commit: 04a003d6a2b628bc551a55ae556c1b44485e3022 Parents: 05f4e0c Author: Andrew Purtell <[email protected]> Authored: Thu Jan 22 14:50:40 2015 -0800 Committer: Andrew Purtell <[email protected]> Committed: Thu Jan 22 14:50:40 2015 -0800 ---------------------------------------------------------------------- .../hadoop/hbase/client/AsyncProcess.java | 120 ++- .../hadoop/hbase/client/ClusterConnection.java | 13 +- .../hadoop/hbase/client/ConnectionAdapter.java | 12 + .../hadoop/hbase/client/ConnectionManager.java | 26 +- .../hadoop/hbase/client/DelayingRunner.java | 116 +++ .../org/apache/hadoop/hbase/client/HTable.java | 5 +- .../apache/hadoop/hbase/client/MultiAction.java | 17 +- .../org/apache/hadoop/hbase/client/Result.java | 20 + .../hadoop/hbase/client/ResultStatsUtil.java | 76 ++ .../hbase/client/RpcRetryingCallerFactory.java | 48 +- .../hbase/client/ServerStatisticTracker.java | 75 ++ .../client/StatsTrackingRpcRetryingCaller.java | 78 ++ .../client/backoff/ClientBackoffPolicy.java | 42 + .../backoff/ClientBackoffPolicyFactory.java | 59 ++ .../backoff/ExponentialClientBackoffPolicy.java | 71 ++ .../hbase/client/backoff/ServerStatistics.java | 68 ++ .../hbase/ipc/RegionCoprocessorRpcChannel.java | 2 +- .../hbase/protobuf/ResponseConverter.java | 16 +- .../client/TestClientExponentialBackoff.java | 110 +++ .../hadoop/hbase/client/TestDelayingRunner.java | 62 ++ .../org/apache/hadoop/hbase/HConstants.java | 6 + .../hbase/protobuf/generated/ClientProtos.java | 810 ++++++++++++++++++- hbase-protocol/src/main/protobuf/Client.proto | 10 + .../hbase/mapreduce/LoadIncrementalHFiles.java | 3 +- .../hadoop/hbase/regionserver/HRegion.java | 34 +- .../hbase/regionserver/RSRpcServices.java | 19 +- .../regionserver/wal/WALEditsReplaySink.java | 2 +- .../hbase/client/HConnectionTestingUtility.java | 2 +- .../hadoop/hbase/client/TestClientPushback.java | 151 ++++ 29 files changed, 1984 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index eef4035..c72fb0f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.Iterator; @@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.backoff.ServerStatistics; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.util.Bytes; @@ -313,7 +315,8 @@ class AsyncProcess { * Uses default ExecutorService for this AP (must have been created with one). */ public <CResult> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows, - boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults) throws InterruptedIOException { + boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults) + throws InterruptedIOException { return submit(null, tableName, rows, atLeastOne, callback, needResults); } @@ -374,7 +377,7 @@ class AsyncProcess { locationErrors = new ArrayList<Exception>(); locationErrorRows = new ArrayList<Integer>(); LOG.error("Failed to get region location ", ex); - // This action failed before creating ars. Add it to retained but do not add to submit list. + // This action failed before creating ars. Retain it, but do not add to submit list. // We will then add it to ars in an already-failed state. retainedActions.add(new Action<Row>(r, ++posInList)); locationErrors.add(ex); @@ -918,14 +921,12 @@ class AsyncProcess { return loc; } - - /** * Send a multi action structure to the servers, after a delay depending on the attempt * number. Asynchronous. * * @param actionsByServer the actions structured by regions - * @param numAttempt the attempt number. + * @param numAttempt the attempt number. * @param actionsForReplicaThread original actions for replica thread; null on non-first call. */ private void sendMultiAction(Map<ServerName, MultiAction<Row>> actionsByServer, @@ -935,33 +936,98 @@ class AsyncProcess { int actionsRemaining = actionsByServer.size(); // This iteration is by server (the HRegionLocation comparator is by server portion only). for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) { - final ServerName server = e.getKey(); - final MultiAction<Row> multiAction = e.getValue(); + ServerName server = e.getKey(); + MultiAction<Row> multiAction = e.getValue(); incTaskCounters(multiAction.getRegions(), server); - Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", - new SingleServerRequestRunnable(multiAction, numAttempt, server)); - if ((--actionsRemaining == 0) && reuseThread) { - runnable.run(); - } else { - try { - pool.submit(runnable); - } catch (RejectedExecutionException ree) { - // This should never happen. But as the pool is provided by the end user, let's secure - // this a little. - decTaskCounters(multiAction.getRegions(), server); - LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." + - " Server is " + server.getServerName(), ree); - // We're likely to fail again, but this will increment the attempt counter, so it will - // finish. - receiveGlobalFailure(multiAction, server, numAttempt, ree); + Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction, + numAttempt); + // make sure we correctly count the number of runnables before we try to reuse the send + // thread, in case we had to split the request into different runnables because of backoff + if (runnables.size() > actionsRemaining) { + actionsRemaining = runnables.size(); + } + + // run all the runnables + for (Runnable runnable : runnables) { + if ((--actionsRemaining == 0) && reuseThread) { + runnable.run(); + } else { + try { + pool.submit(runnable); + } catch (RejectedExecutionException ree) { + // This should never happen. But as the pool is provided by the end user, let's secure + // this a little. + decTaskCounters(multiAction.getRegions(), server); + LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." + + " Server is " + server.getServerName(), ree); + // We're likely to fail again, but this will increment the attempt counter, so it will + // finish. + receiveGlobalFailure(multiAction, server, numAttempt, ree); + } } } } + if (actionsForReplicaThread != null) { startWaitingForReplicaCalls(actionsForReplicaThread); } } + private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server, + MultiAction<Row> multiAction, + int numAttempt) { + // no stats to manage, just do the standard action + if (AsyncProcess.this.connection.getStatisticsTracker() == null) { + return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", + new SingleServerRequestRunnable(multiAction, numAttempt, server))); + } + + // group the actions by the amount of delay + Map<Long, DelayingRunner> actions = new HashMap<Long, DelayingRunner>(multiAction + .size()); + + // split up the actions + for (Map.Entry<byte[], List<Action<Row>>> e : multiAction.actions.entrySet()) { + Long backoff = getBackoff(server, e.getKey()); + DelayingRunner runner = actions.get(backoff); + if (runner == null) { + actions.put(backoff, new DelayingRunner(backoff, e)); + } else { + runner.add(e); + } + } + + List<Runnable> toReturn = new ArrayList<Runnable>(actions.size()); + for (DelayingRunner runner : actions.values()) { + String traceText = "AsyncProcess.sendMultiAction"; + Runnable runnable = + new SingleServerRequestRunnable(runner.getActions(), numAttempt, server); + // use a delay runner only if we need to sleep for some time + if (runner.getSleepTime() > 0) { + runner.setRunner(runnable); + traceText = "AsyncProcess.clientBackoff.sendMultiAction"; + runnable = runner; + } + runnable = Trace.wrap(traceText, runnable); + toReturn.add(runnable); + + } + return toReturn; + } + + /** + * @param server server location where the target region is hosted + * @param regionName name of the region which we are going to write some data + * @return the amount of time the client should wait until it submit a request to the + * specified server and region + */ + private Long getBackoff(ServerName server, byte[] regionName) { + ServerStatisticTracker tracker = AsyncProcess.this.connection.getStatisticsTracker(); + ServerStatistics stats = tracker.getStats(server); + return AsyncProcess.this.connection.getBackoffPolicy() + .getBackoffTime(server, regionName, stats); + } + /** * Starts waiting to issue replica calls on a different thread; or issues them immediately. */ @@ -1169,6 +1235,13 @@ class AsyncProcess { ++failed; } } else { + // update the stats about the region, if its a user table. We don't want to slow down + // updates to meta tables, especially from internal updates (master, etc). + if (AsyncProcess.this.connection.getStatisticsTracker() != null) { + result = ResultStatsUtil.updateStats(result, + AsyncProcess.this.connection.getStatisticsTracker(), server, regionName); + } + if (callback != null) { try { //noinspection unchecked @@ -1497,7 +1570,6 @@ class AsyncProcess { } } - @VisibleForTesting /** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */ protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture( http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java index 62570b9..05d5c63 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java @@ -29,6 +29,8 @@ import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService; @@ -288,5 +290,14 @@ public interface ClusterConnection extends HConnection { * @return true if this is a managed connection. */ boolean isManaged(); -} + /** + * @return the current statistics tracker associated with this connection + */ + ServerStatisticTracker getStatisticsTracker(); + + /** + * @return the configured client backoff policy + */ + ClientBackoffPolicy getBackoffPolicy(); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java index 7c8f8d8..a1b71f4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java @@ -30,6 +30,8 @@ import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; @@ -442,4 +444,14 @@ abstract class ConnectionAdapter implements ClusterConnection { public boolean isManaged() { return wrappedConnection.isManaged(); } + + @Override + public ServerStatisticTracker getStatisticsTracker() { + return wrappedConnection.getStatisticsTracker(); + } + + @Override + public ClientBackoffPolicy getBackoffPolicy() { + return wrappedConnection.getBackoffPolicy(); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index 7bfa972..581bde8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -64,6 +64,8 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase; +import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; +import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RegionOpeningException; @@ -537,6 +539,8 @@ class ConnectionManager { final int rpcTimeout; private NonceGenerator nonceGenerator = null; private final AsyncProcess asyncProcess; + // single tracker per connection + private final ServerStatisticTracker stats; private volatile boolean closed; private volatile boolean aborted; @@ -592,6 +596,8 @@ class ConnectionManager { */ Registry registry; + private final ClientBackoffPolicy backoffPolicy; + HConnectionImplementation(Configuration conf, boolean managed) throws IOException { this(conf, managed, null, null); } @@ -666,9 +672,11 @@ class ConnectionManager { } else { this.nonceGenerator = new NoNonceGenerator(); } + stats = ServerStatisticTracker.create(conf); this.asyncProcess = createAsyncProcess(this.conf); this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build(); - this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor); + this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats); + this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); } @Override @@ -2184,7 +2192,8 @@ class ConnectionManager { protected AsyncProcess createAsyncProcess(Configuration conf) { // No default pool available. return new AsyncProcess(this, conf, this.batchPool, - RpcRetryingCallerFactory.instantiate(conf), false, RpcControllerFactory.instantiate(conf)); + RpcRetryingCallerFactory.instantiate(conf, this.getStatisticsTracker()), false, + RpcControllerFactory.instantiate(conf)); } @Override @@ -2192,6 +2201,16 @@ class ConnectionManager { return asyncProcess; } + @Override + public ServerStatisticTracker getStatisticsTracker() { + return this.stats; + } + + @Override + public ClientBackoffPolicy getBackoffPolicy() { + return this.backoffPolicy; + } + /* * Return the number of cached region for a table. It will only be called * from a unit test. @@ -2469,7 +2488,8 @@ class ConnectionManager { @Override public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) { - return RpcRetryingCallerFactory.instantiate(conf, this.interceptor); + return RpcRetryingCallerFactory + .instantiate(conf, this.interceptor, this.getStatisticsTracker()); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java new file mode 100644 index 0000000..83c73b6 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import java.util.List; +import java.util.Map; + +/** + * A wrapper for a runnable for a group of actions for a single regionserver. + * <p> + * This can be used to build up the actions that should be taken and then + * </p> + * <p> + * This class exists to simulate using a ScheduledExecutorService with just a regular + * ExecutorService and Runnables. It is used for legacy reasons in the the client; this could + * only be removed if we change the expectations in HTable around the pool the client is able to + * pass in and even if we deprecate the current APIs would require keeping this class around + * for the interim to bridge between the legacy ExecutorServices and the scheduled pool. + * </p> + */ [email protected] +public class DelayingRunner<T> implements Runnable { + private static final Log LOG = LogFactory.getLog(DelayingRunner.class); + + private final Object sleepLock = new Object(); + private boolean triggerWake = false; + private long sleepTime; + private MultiAction<T> actions = new MultiAction<T>(); + private Runnable runnable; + + public DelayingRunner(long sleepTime, Map.Entry<byte[], List<Action<T>>> e) { + this.sleepTime = sleepTime; + add(e); + } + + public void setRunner(Runnable runner) { + this.runnable = runner; + } + + @Override + public void run() { + if (!sleep()) { + LOG.warn( + "Interrupted while sleeping for expected sleep time " + sleepTime + " ms"); + } + //TODO maybe we should consider switching to a listenableFuture for the actual callable and + // then handling the results/errors as callbacks. That way we can decrement outstanding tasks + // even if we get interrupted here, but for now, we still need to run so we decrement the + // outstanding tasks + this.runnable.run(); + } + + /** + * Sleep for an expected amount of time. + * <p> + * This is nearly a copy of what the Sleeper does, but with the ability to know if you + * got interrupted while sleeping. + * </p> + * + * @return <tt>true</tt> if the sleep completely entirely successfully, + * but otherwise <tt>false</tt> if the sleep was interrupted. + */ + private boolean sleep() { + long now = EnvironmentEdgeManager.currentTime(); + long startTime = now; + long waitTime = sleepTime; + while (waitTime > 0) { + long woke = -1; + try { + synchronized (sleepLock) { + if (triggerWake) break; + sleepLock.wait(waitTime); + } + woke = EnvironmentEdgeManager.currentTime(); + } catch (InterruptedException iex) { + return false; + } + // Recalculate waitTime. + woke = (woke == -1) ? EnvironmentEdgeManager.currentTime() : woke; + waitTime = waitTime - (woke - startTime); + } + return true; + } + + public void add(Map.Entry<byte[], List<Action<T>>> e) { + actions.add(e.getKey(), e.getValue()); + } + + public MultiAction<T> getActions() { + return actions; + } + + public long getSleepTime() { + return sleepTime; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 9307667..e124b26 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -1872,8 +1872,9 @@ public class HTable implements HTableInterface { AsyncProcess asyncProcess = new AsyncProcess(connection, configuration, pool, - RpcRetryingCallerFactory.instantiate(configuration), true, - RpcControllerFactory.instantiate(configuration)); + RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()), + true, RpcControllerFactory.instantiate(configuration)); + AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs, new Callback<ClientProtos.CoprocessorServiceResult>() { @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java index eefe40d..0a9055e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Set; @@ -68,12 +69,24 @@ public final class MultiAction<R> { * @param a */ public void add(byte[] regionName, Action<R> a) { + add(regionName, Arrays.asList(a)); + } + + /** + * Add an Action to this container based on it's regionName. If the regionName + * is wrong, the initial execution will fail, but will be automatically + * retried after looking up the correct region. + * + * @param regionName + * @param actionList list of actions to add for the region + */ + public void add(byte[] regionName, List<Action<R>> actionList){ List<Action<R>> rsActions = actions.get(regionName); if (rsActions == null) { - rsActions = new ArrayList<Action<R>>(); + rsActions = new ArrayList<Action<R>>(actionList.size()); actions.put(regionName, rsActions); } - rsActions.add(a); + rsActions.addAll(actionList); } public void setNonceGroup(long nonceGroup) { http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java index 401710e..6730863 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java @@ -37,6 +37,9 @@ import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.util.Bytes; /** @@ -94,6 +97,7 @@ public class Result implements CellScannable, CellScanner { * Index for where we are when Result is acting as a {@link CellScanner}. */ private int cellScannerIndex = INITIAL_CELLSCANNER_INDEX; + private ClientProtos.RegionLoadStats stats; /** * Creates an empty Result w/ no KeyValue payload; returns null if you call {@link #rawCells()}. @@ -871,4 +875,20 @@ public class Result implements CellScannable, CellScanner { public boolean isStale() { return stale; } + + /** + * Add load information about the region to the information about the result + * @param loadStats statistics about the current region from which this was returned + */ + public void addResults(ClientProtos.RegionLoadStats loadStats) { + this.stats = loadStats; + } + + /** + * @return the associated statistics about the region from which this was returned. Can be + * <tt>null</tt> if stats are disabled. + */ + public ClientProtos.RegionLoadStats getStats() { + return stats; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java new file mode 100644 index 0000000..3caa63e --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; + +/** + * A {@link Result} with some statistics about the server/region status + */ [email protected] +public final class ResultStatsUtil { + + private ResultStatsUtil() { + //private ctor for util class + } + + /** + * Update the stats for the specified region if the result is an instance of {@link + * ResultStatsUtil} + * + * @param r object that contains the result and possibly the statistics about the region + * @param serverStats stats tracker to update from the result + * @param server server from which the result was obtained + * @param regionName full region name for the stats. + * @return the underlying {@link Result} if the passed result is an {@link + * ResultStatsUtil} or just returns the result; + */ + public static <T> T updateStats(T r, ServerStatisticTracker serverStats, + ServerName server, byte[] regionName) { + if (!(r instanceof Result)) { + return r; + } + Result result = (Result) r; + // early exit if there are no stats to collect + ClientProtos.RegionLoadStats stats = result.getStats(); + if(stats == null){ + return r; + } + + if (regionName != null) { + serverStats.updateRegionStats(server, regionName, stats); + } + + return r; + } + + public static <T> T updateStats(T r, ServerStatisticTracker stats, + HRegionLocation regionLocation) { + byte[] regionName = null; + ServerName server = null; + if (regionLocation != null) { + server = regionLocation.getServerName(); + regionName = regionLocation.getRegionInfo().getRegionName(); + } + + return updateStats(r, stats, server, regionName); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index 9f05997..1bf7bb0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -35,6 +35,8 @@ public class RpcRetryingCallerFactory { private final int retries; private final RetryingCallerInterceptor interceptor; private final int startLogErrorsCnt; + private final boolean enableBackPressure; + private ServerStatisticTracker stats; public RpcRetryingCallerFactory(Configuration conf) { this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR); @@ -49,27 +51,57 @@ public class RpcRetryingCallerFactory { startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT); this.interceptor = interceptor; + enableBackPressure = conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, + HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); + } + + /** + * Set the tracker that should be used for tracking statistics about the server + */ + public void setStatisticTracker(ServerStatisticTracker statisticTracker) { + this.stats = statisticTracker; } public <T> RpcRetryingCaller<T> newCaller() { // We store the values in the factory instance. This way, constructing new objects // is cheap as it does not require parsing a complex structure. - return new RpcRetryingCaller<T>(pause, retries, interceptor, startLogErrorsCnt); + RpcRetryingCaller<T> caller = new RpcRetryingCaller<T>(pause, retries, interceptor, + startLogErrorsCnt); + + // wrap it with stats, if we are tracking them + if (enableBackPressure && this.stats != null) { + caller = new StatsTrackingRpcRetryingCaller<T>(pause, retries, interceptor, + startLogErrorsCnt, stats); + } + + return caller; } public static RpcRetryingCallerFactory instantiate(Configuration configuration) { - return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR); + return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null); } - + public static RpcRetryingCallerFactory instantiate(Configuration configuration, - RetryingCallerInterceptor interceptor) { + ServerStatisticTracker stats) { + return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats); + } + + public static RpcRetryingCallerFactory instantiate(Configuration configuration, + RetryingCallerInterceptor interceptor, ServerStatisticTracker stats) { String clazzName = RpcRetryingCallerFactory.class.getName(); String rpcCallerFactoryClazz = configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName); + RpcRetryingCallerFactory factory; if (rpcCallerFactoryClazz.equals(clazzName)) { - return new RpcRetryingCallerFactory(configuration, interceptor); + factory = new RpcRetryingCallerFactory(configuration, interceptor); + } else { + factory = ReflectionUtils.instantiateWithCustomCtor( + rpcCallerFactoryClazz, new Class[] { Configuration.class }, + new Object[] { configuration }); } - return ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz, - new Class[] { Configuration.class }, new Object[] { configuration }); + + // setting for backwards compat with existing caller factories, rather than in the ctor + factory.setStatisticTracker(stats); + return factory; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java new file mode 100644 index 0000000..42da0b3 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.backoff.ServerStatistics; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Tracks the statistics for multiple regions + */ [email protected] +public class ServerStatisticTracker { + + private final ConcurrentHashMap<ServerName, ServerStatistics> stats = + new ConcurrentHashMap<ServerName, ServerStatistics>(); + + public void updateRegionStats(ServerName server, byte[] region, ClientProtos.RegionLoadStats + currentStats) { + ServerStatistics stat = stats.get(server); + + if (stat == null) { + stat = stats.get(server); + // We don't have stats for that server yet, so we need to make an entry. + // If we race with another thread it's a harmless unnecessary allocation. + if (stat == null) { + stat = new ServerStatistics(); + ServerStatistics old = stats.putIfAbsent(server, stat); + if (old != null) { + stat = old; + } + } + } + stat.update(region, currentStats); + } + + public ServerStatistics getStats(ServerName server) { + return this.stats.get(server); + } + + public static ServerStatisticTracker create(Configuration conf) { + if (!conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, + HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE)) { + return null; + } + return new ServerStatisticTracker(); + } + + @VisibleForTesting + ServerStatistics getServerStatsForTesting(ServerName server) { + return stats.get(server); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java new file mode 100644 index 0000000..fc175bb --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +import java.io.IOException; + +/** + * An {@link RpcRetryingCaller} that will update the per-region stats for the call on return, + * if stats are available + */ [email protected] +public class StatsTrackingRpcRetryingCaller<T> extends RpcRetryingCaller<T> { + private final ServerStatisticTracker stats; + + public StatsTrackingRpcRetryingCaller(long pause, int retries, int startLogErrorsCnt, + ServerStatisticTracker stats) { + super(pause, retries, startLogErrorsCnt); + this.stats = stats; + } + + public StatsTrackingRpcRetryingCaller(long pause, int retries, + RetryingCallerInterceptor interceptor, int startLogErrorsCnt, + ServerStatisticTracker stats) { + super(pause, retries, interceptor, startLogErrorsCnt); + this.stats = stats; + } + + @Override + public T callWithRetries(RetryingCallable<T> callable, int callTimeout) + throws IOException, RuntimeException { + T result = super.callWithRetries(callable, callTimeout); + return updateStatsAndUnwrap(result, callable); + } + + @Override + public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout) + throws IOException, RuntimeException { + T result = super.callWithRetries(callable, callTimeout); + return updateStatsAndUnwrap(result, callable); + } + + private T updateStatsAndUnwrap(T result, RetryingCallable<T> callable) { + // don't track stats about requests that aren't to regionservers + if (!(callable instanceof RegionServerCallable)) { + return result; + } + + // mutli-server callables span multiple regions, so they don't have a location, + // but they are region server callables, so we have to handle them when we process the + // result in AsyncProcess#receiveMultiAction, not in here + if (callable instanceof MultiServerCallable) { + return result; + } + + // update the stats for the single server callable + RegionServerCallable<T> regionCallable = (RegionServerCallable) callable; + HRegionLocation location = regionCallable.getLocation(); + return ResultStatsUtil.updateStats(result, stats, location); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java new file mode 100644 index 0000000..94e434f --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.backoff; + +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Configurable policy for the amount of time a client should wait for a new request to the + * server when given the server load statistics. + * <p> + * Must have a single-argument constructor that takes a {@link org.apache.hadoop.conf.Configuration} + * </p> + */ [email protected] [email protected] +public interface ClientBackoffPolicy { + + public static final String BACKOFF_POLICY_CLASS = + "hbase.client.statistics.backoff-policy"; + + /** + * @return the number of ms to wait on the client based on the + */ + public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java new file mode 100644 index 0000000..879a0e2 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.backoff; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.ReflectionUtils; + [email protected] [email protected] +public final class ClientBackoffPolicyFactory { + + private static final Log LOG = LogFactory.getLog(ClientBackoffPolicyFactory.class); + + private ClientBackoffPolicyFactory() { + } + + public static ClientBackoffPolicy create(Configuration conf) { + // create the backoff policy + String className = + conf.get(ClientBackoffPolicy.BACKOFF_POLICY_CLASS, NoBackoffPolicy.class + .getName()); + return ReflectionUtils.instantiateWithCustomCtor(className, + new Class<?>[] { Configuration.class }, new Object[] { conf }); + } + + /** + * Default backoff policy that doesn't create any backoff for the client, regardless of load + */ + public static class NoBackoffPolicy implements ClientBackoffPolicy { + public NoBackoffPolicy(Configuration conf){ + // necessary to meet contract + } + + @Override + public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) { + return 0; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java new file mode 100644 index 0000000..6e75670 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.backoff; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Simple exponential backoff policy on for the client that uses a percent^4 times the + * max backoff to generate the backoff time. + */ [email protected] [email protected] +public class ExponentialClientBackoffPolicy implements ClientBackoffPolicy { + + private static final Log LOG = LogFactory.getLog(ExponentialClientBackoffPolicy.class); + + private static final long ONE_MINUTE = 60 * 1000; + public static final long DEFAULT_MAX_BACKOFF = 5 * ONE_MINUTE; + public static final String MAX_BACKOFF_KEY = "hbase.client.exponential-backoff.max"; + private long maxBackoff; + + public ExponentialClientBackoffPolicy(Configuration conf) { + this.maxBackoff = conf.getLong(MAX_BACKOFF_KEY, DEFAULT_MAX_BACKOFF); + } + + @Override + public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) { + // no stats for the server yet, so don't backoff + if (stats == null) { + return 0; + } + + ServerStatistics.RegionStatistics regionStats = stats.getStatsForRegion(region); + // no stats for the region yet - don't backoff + if (regionStats == null) { + return 0; + } + + // square the percent as a value less than 1. Closer we move to 100 percent, + // the percent moves to 1, but squaring causes the exponential curve + double percent = regionStats.getMemstoreLoadPercent() / 100.0; + double multiplier = Math.pow(percent, 4.0); + // shouldn't ever happen, but just incase something changes in the statistic data + if (multiplier > 1) { + LOG.warn("Somehow got a backoff multiplier greater than the allowed backoff. Forcing back " + + "down to the max backoff"); + multiplier = 1; + } + return (long) (multiplier * maxBackoff); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java new file mode 100644 index 0000000..a3b8e11 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.backoff; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.util.Bytes; + +import java.util.Map; +import java.util.TreeMap; + +/** + * Track the statistics for a single region + */ [email protected] +public class ServerStatistics { + + private Map<byte[], RegionStatistics> + stats = new TreeMap<byte[], RegionStatistics>(Bytes.BYTES_COMPARATOR); + + /** + * Good enough attempt. Last writer wins. It doesn't really matter which one gets to update, + * as something gets set + * @param region + * @param currentStats + */ + public void update(byte[] region, ClientProtos.RegionLoadStats currentStats) { + RegionStatistics regionStat = this.stats.get(region); + if(regionStat == null){ + regionStat = new RegionStatistics(); + this.stats.put(region, regionStat); + } + + regionStat.update(currentStats); + } + + @InterfaceAudience.Private + public RegionStatistics getStatsForRegion(byte[] regionName){ + return stats.get(regionName); + } + + public static class RegionStatistics{ + private int memstoreLoad = 0; + + public void update(ClientProtos.RegionLoadStats currentStats) { + this.memstoreLoad = currentStats.getMemstoreLoad(); + } + + public int getMemstoreLoadPercent(){ + return this.memstoreLoad; + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java index 026c64b..cabe5e6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java @@ -61,7 +61,7 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{ this.connection = conn; this.table = table; this.row = row; - this.rpcFactory = RpcRetryingCallerFactory.instantiate(conn.getConfiguration()); + this.rpcFactory = RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null); this.operationTimeout = conn.getConfiguration().getInt( HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java index 725736a..9493668 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java @@ -114,17 +114,23 @@ public final class ResponseConverter { } for (ResultOrException roe : actionResult.getResultOrExceptionList()) { + Object responseValue; if (roe.hasException()) { - results.add(regionName, roe.getIndex(), ProtobufUtil.toException(roe.getException())); + responseValue = ProtobufUtil.toException(roe.getException()); } else if (roe.hasResult()) { - results.add(regionName, roe.getIndex(), ProtobufUtil.toResult(roe.getResult(), cells)); + responseValue = ProtobufUtil.toResult(roe.getResult(), cells); + // add the load stats, if we got any + if (roe.hasLoadStats()) { + ((Result) responseValue).addResults(roe.getLoadStats()); + } } else if (roe.hasServiceResult()) { - results.add(regionName, roe.getIndex(), roe.getServiceResult()); + responseValue = roe.getServiceResult(); } else { // no result & no exception. Unexpected. throw new IllegalStateException("No result & no exception roe=" + roe + " for region " + actions.getRegion()); } + results.add(regionName, roe.getIndex(), responseValue); } } @@ -149,9 +155,11 @@ public final class ResponseConverter { * @param r * @return an action result builder */ - public static ResultOrException.Builder buildActionResult(final ClientProtos.Result r) { + public static ResultOrException.Builder buildActionResult(final ClientProtos.Result r, + ClientProtos.RegionLoadStats stats) { ResultOrException.Builder builder = ResultOrException.newBuilder(); if (r != null) builder.setResult(r); + if(stats != null) builder.setLoadStats(stats); return builder; } http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java new file mode 100644 index 0000000..88e409d --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy; +import org.apache.hadoop.hbase.client.backoff.ServerStatistics; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestClientExponentialBackoff { + + ServerName server = Mockito.mock(ServerName.class); + byte[] regionname = Bytes.toBytes("region"); + + @Test + public void testNulls() { + Configuration conf = new Configuration(false); + ExponentialClientBackoffPolicy backoff = new ExponentialClientBackoffPolicy(conf); + assertEquals(0, backoff.getBackoffTime(null, null, null)); + + // server name doesn't matter to calculation, but check it now anyways + assertEquals(0, backoff.getBackoffTime(server, null, null)); + assertEquals(0, backoff.getBackoffTime(server, regionname, null)); + + // check when no stats for the region yet + ServerStatistics stats = new ServerStatistics(); + assertEquals(0, backoff.getBackoffTime(server, regionname, stats)); + } + + @Test + public void testMaxLoad() { + Configuration conf = new Configuration(false); + ExponentialClientBackoffPolicy backoff = new ExponentialClientBackoffPolicy(conf); + + ServerStatistics stats = new ServerStatistics(); + update(stats, 100); + assertEquals(ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF, backoff.getBackoffTime(server, + regionname, stats)); + + // another policy with a different max timeout + long max = 100; + conf.setLong(ExponentialClientBackoffPolicy.MAX_BACKOFF_KEY, max); + ExponentialClientBackoffPolicy backoffShortTimeout = new ExponentialClientBackoffPolicy(conf); + assertEquals(max, backoffShortTimeout.getBackoffTime(server, regionname, stats)); + + // test beyond 100 still doesn't exceed the max + update(stats, 101); + assertEquals(ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF, backoff.getBackoffTime(server, + regionname, stats)); + assertEquals(max, backoffShortTimeout.getBackoffTime(server, regionname, stats)); + + // and that when we are below 100, its less than the max timeout + update(stats, 99); + assertTrue(backoff.getBackoffTime(server, + regionname, stats) < ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF); + assertTrue(backoffShortTimeout.getBackoffTime(server, regionname, stats) < max); + } + + /** + * Make sure that we get results in the order that we expect - backoff for a load of 1 should + * less than backoff for 10, which should be less than that for 50. + */ + @Test + public void testResultOrdering() { + Configuration conf = new Configuration(false); + // make the max timeout really high so we get differentiation between load factors + conf.setLong(ExponentialClientBackoffPolicy.MAX_BACKOFF_KEY, Integer.MAX_VALUE); + ExponentialClientBackoffPolicy backoff = new ExponentialClientBackoffPolicy(conf); + + ServerStatistics stats = new ServerStatistics(); + long previous = backoff.getBackoffTime(server, regionname, stats); + for (int i = 1; i <= 100; i++) { + update(stats, i); + long next = backoff.getBackoffTime(server, regionname, stats); + assertTrue( + "Previous backoff time" + previous + " >= " + next + ", the next backoff time for " + + "load " + i, previous < next); + previous = next; + } + } + + private void update(ServerStatistics stats, int load) { + ClientProtos.RegionLoadStats stat = ClientProtos.RegionLoadStats.newBuilder() + .setMemstoreLoad + (load).build(); + stats.update(regionname, stat); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestDelayingRunner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestDelayingRunner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestDelayingRunner.java new file mode 100644 index 0000000..4348100 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestDelayingRunner.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.*; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestDelayingRunner { + + private static final TableName DUMMY_TABLE = + TableName.valueOf("DUMMY_TABLE"); + private static final byte[] DUMMY_BYTES_1 = "DUMMY_BYTES_1".getBytes(); + private static final byte[] DUMMY_BYTES_2 = "DUMMY_BYTES_2".getBytes(); + private static HRegionInfo hri1 = + new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1); + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testDelayingRunner() throws Exception{ + MultiAction<Row> ma = new MultiAction<Row>(); + ma.add(hri1.getRegionName(), new Action<Row>(new Put(DUMMY_BYTES_1), 0)); + final AtomicLong endTime = new AtomicLong(); + final long sleepTime = 1000; + DelayingRunner runner = new DelayingRunner(sleepTime, ma.actions.entrySet().iterator().next()); + runner.setRunner(new Runnable() { + @Override + public void run() { + endTime.set(EnvironmentEdgeManager.currentTime()); + } + }); + long startTime = EnvironmentEdgeManager.currentTime(); + runner.run(); + long delay = endTime.get() - startTime; + assertTrue("DelayingRunner did not delay long enough", delay >= sleepTime); + assertFalse("DelayingRunner delayed too long", delay > sleepTime + sleepTime*0.2); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index b18e148..22543ae 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1115,6 +1115,12 @@ public final class HConstants { public static final String HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL = "hbase.client.fast.fail.interceptor.impl"; + /** Config key for if the server should send backpressure and if the client should listen to + * that backpressure from the server */ + public static final String ENABLE_CLIENT_BACKPRESSURE = "hbase.client.backpressure.enabled"; + public static final boolean DEFAULT_ENABLE_CLIENT_BACKPRESSURE = false; + + private HConstants() { // Can't be instantiated with this ctor. }
