Author: sershe
Date: Thu Apr 10 20:57:59 2014
New Revision: 1586468
URL: http://svn.apache.org/r1586468
Log:
HBASE-10794 multi-get should handle replica location missing from cache
Modified:
hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
hbase/branches/hbase-10070/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
Modified:
hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
URL:
http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java?rev=1586468&r1=1586467&r2=1586468&view=diff
==============================================================================
---
hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
(original)
+++
hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
Thu Apr 10 20:57:59 2014
@@ -24,6 +24,7 @@ import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -340,7 +341,15 @@ class AsyncProcess {
Row r = it.next();
HRegionLocation loc = null;
try {
- loc = findDestLocation(tableName, r,
true).getDefaultRegionLocation();
+ if (r == null) throw new IllegalArgumentException("#" + id + ", row
cannot be null");
+ // Make sure we get 0-s replica.
+ RegionLocations locs = hConnection.locateRegion(
+ tableName, r.getRow(), true, true,
RegionReplicaUtil.DEFAULT_REPLICA_ID);
+ if (locs == null || locs.isEmpty() ||
locs.getDefaultRegionLocation() == null) {
+ throw new IOException("#" + id + ", no location found, aborting
submit for" +
+ " tableName=" + tableName + " rowkey=" +
Arrays.toString(r.getRow()));
+ }
+ loc = locs.getDefaultRegionLocation();
} catch (IOException ex) {
if (locationErrors == null) {
locationErrors = new ArrayList<Exception>();
@@ -377,10 +386,11 @@ class AsyncProcess {
for (int i = 0; i < locationErrors.size(); ++i) {
int originalIndex = locationErrorRows.get(i);
Row row = retainedActions.get(originalIndex).getAction();
- ars.manageError(originalIndex, row, false, locationErrors.get(i),
null);
+ ars.manageError(originalIndex, row,
+ Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
}
}
- ars.sendMultiAction(actionsByServer, 1, null);
+ ars.sendMultiAction(actionsByServer, 1, null, false);
return ars;
}
@@ -407,24 +417,6 @@ class AsyncProcess {
}
/**
- * Find the destination.
- * @param tableName the requisite table.
- * @param row the row
- * @return the destination.
- */
- private RegionLocations findDestLocation(
- TableName tableName, Row row, boolean checkPrimary) throws IOException {
- if (row == null) throw new IllegalArgumentException("#" + id + ", row
cannot be null");
- RegionLocations loc = hConnection.locateRegionAll(tableName, row.getRow());
- if (loc == null
- || (checkPrimary && (loc.isEmpty() || loc.getDefaultRegionLocation()
== null))) {
- throw new IOException("#" + id + ", no location found, aborting submit
for" +
- " tableName=" + tableName + " rowkey=" +
Arrays.toString(row.getRow()));
- }
- return loc;
- }
-
- /**
* Check if we should send new operations to this region or region server.
* We're taking into account the past decision; if we have already accepted
* operation on a given region, we accept all operations for this region.
@@ -575,17 +567,29 @@ class AsyncProcess {
if (done) return; // Done within primary timeout
Map<ServerName, MultiAction<Row>> actionsByServer =
new HashMap<ServerName, MultiAction<Row>>();
+ List<Action<Row>> unknownLocActions = new ArrayList<Action<Row>>();
if (replicaGetIndices == null) {
for (int i = 0; i < results.length; ++i) {
- addReplicaActions(i, actionsByServer);
+ addReplicaActions(i, actionsByServer, unknownLocActions);
}
} else {
for (int i = 0; i < replicaGetIndices.length; ++i) {
- addReplicaActions(replicaGetIndices[i], actionsByServer);
+ addReplicaActions(replicaGetIndices[i], actionsByServer,
unknownLocActions);
+ }
+ }
+ if (!actionsByServer.isEmpty()) {
+ sendMultiAction(actionsByServer, 1, null,
unknownLocActions.isEmpty());
+ }
+ if (!unknownLocActions.isEmpty()) {
+ actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
+ for (Action<Row> action : unknownLocActions) {
+ addReplicaActionsAgain(action, actionsByServer);
+ }
+ // Some actions may have completely failed, they are handled inside
addAgain.
+ if (!actionsByServer.isEmpty()) {
+ sendMultiAction(actionsByServer, 1, null, true);
}
}
- if (actionsByServer.isEmpty()) return; // Nothing to do - done or no
replicas found.
- sendMultiAction(actionsByServer, 1, null);
}
/**
@@ -593,33 +597,14 @@ class AsyncProcess {
* @param index Index of the original action.
* @param actionsByServer The map by server to add it to.
*/
- private void addReplicaActions(
- int index, Map<ServerName, MultiAction<Row>> actionsByServer) {
+ private void addReplicaActions(int index, Map<ServerName,
MultiAction<Row>> actionsByServer,
+ List<Action<Row>> unknownReplicaActions) {
if (results[index] != null) return; // opportunistic. Never goes from
non-null to null.
Action<Row> action = initialActions.get(index);
- RegionLocations loc = null;
- try {
- // For perf, we assume that this location coming from cache, since
we just got location
- // from meta for the primary call. If it turns out to not be the
case, we'd need local
- // cache since we want to keep as little time as possible before
replica call.
- loc = findDestLocation(tableName, action.getAction(), false);
- } catch (IOException ex) {
- manageError(action.getOriginalIndex(), action.getAction(), false,
ex, null);
- LOG.error("Cannot get location - no replica calls for some actions",
ex);
- return;
- }
+ RegionLocations loc = findAllLocationsOrFail(action, true);
+ if (loc == null) return;
HRegionLocation[] locs = loc.getRegionLocations();
- int replicaCount = 0;
- for (int i = 1; i < locs.length; ++i) {
- replicaCount += (locs[i] != null) ? 1 : 0;
- }
- if (replicaCount == 0) {
- // we could have got the replica back (if the server went down and
the replica moved)
- try {
- loc = hConnection.locateRegion(tableName,
action.getAction().getRow(), false, true);
- } catch (IOException e) {
- manageError(action.getOriginalIndex(), action.getAction(), false,
e, null);
- }
+ if (locs.length == 1) {
LOG.warn("No replicas found for " + action.getAction());
return;
}
@@ -629,14 +614,30 @@ class AsyncProcess {
// but that would require additional synchronization w.r.t.
returning to caller.
if (results[index] != null) return;
// We set the number of calls here. After that any path must call
setResult/setError.
- results[index] = new ReplicaResultState(replicaCount + 1);
+ // True even for replicas that are not found - if we refuse to send
we MUST set error.
+ results[index] = new ReplicaResultState(locs.length);
}
for (int i = 1; i < locs.length; ++i) {
- if (locs[i] == null) continue;
- addAction(locs[i].getServerName(),
locs[i].getRegionInfo().getRegionName(),
- new Action<Row>(action, i), actionsByServer, nonceGroup);
+ Action<Row> replicaAction = new Action<Row>(action, i);
+ if (locs[i] != null) {
+ addAction(locs[i].getServerName(),
locs[i].getRegionInfo().getRegionName(),
+ replicaAction, actionsByServer, nonceGroup);
+ } else {
+ unknownReplicaActions.add(replicaAction);
+ }
}
}
+
+ /**
+ * Second pass for a replica action whose location was unknown when the replica calls were
+ * first grouped: looks the replica location up again and, if one is found, adds the action
+ * to actionsByServer.
+ * @param action a non-default replica action
+ * @param actionsByServer the map by server to add the action to
+ */
+ private void addReplicaActionsAgain(
+ Action<Row> action, Map<ServerName, MultiAction<Row>>
actionsByServer) {
+ if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) {
+ throw new AssertionError("Cannot have default replica here");
+ }
+ HRegionLocation loc = getReplicaLocationOrFail(action);
+ if (loc == null) return; // No usable location; nothing is queued for this action.
+ addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(),
+ action, actionsByServer, nonceGroup);
+ }
}
/**
@@ -790,22 +791,14 @@ class AsyncProcess {
* @param numAttempt - the current numAttempt (first attempt is 1)
*/
private void groupAndSendMultiAction(List<Action<Row>> currentActions, int
numAttempt) {
- // group per location => regions server
- final Map<ServerName, MultiAction<Row>> actionsByServer =
+ Map<ServerName, MultiAction<Row>> actionsByServer =
new HashMap<ServerName, MultiAction<Row>>();
boolean isReplica = false;
+ List<Action<Row>> unknownReplicaActions = null;
for (Action<Row> action : currentActions) {
- RegionLocations locs = null;
- try {
- locs = findDestLocation(tableName, action.getAction(), false);
- } catch (IOException ex) {
- // There are multiple retries in locateRegion already. No need to
add new.
- // We can't continue with this row, hence it's the last retry.
- manageError(action.getOriginalIndex(), action.getAction(), false,
ex, null);
- continue;
- }
-
+ RegionLocations locs = findAllLocationsOrFail(action, true);
+ if (locs == null) continue;
boolean isReplicaAction =
!RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
if (isReplica && !isReplicaAction) {
// This is the property of the current implementation, not a
requirement.
@@ -814,34 +807,89 @@ class AsyncProcess {
isReplica = isReplicaAction;
HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
if (loc == null || loc.getServerName() == null) {
- try {
- locs = hConnection.locateRegion(tableName,
action.getAction().getRow(), false, true, action.getReplicaId());
- loc = locs.getRegionLocation(action.getReplicaId());
- } catch (IOException e) {
- // There are multiple retries in locateRegion already. No need to
add new.
- // We can't continue with this row, hence it's the last retry.
- manageError(action.getOriginalIndex(), action.getAction(), false,
e, null);
- continue;
- }
- if (loc == null || loc.getServerName() == null) {
- // On retry, we couldn't find location for some replica we saw
before.
- String str = "Cannot find location for replica " +
action.getReplicaId();
- LOG.error(str);
- manageError(action.getOriginalIndex(), action.getAction(),
- false, new IOException(str), null);
- continue;
+ if (isReplica) {
+ if (unknownReplicaActions == null) {
+ unknownReplicaActions = new ArrayList<Action<Row>>();
+ }
+ unknownReplicaActions.add(action);
+ } else {
+ // TODO: relies on primary location always being fetched
+ manageLocationError(action, null);
}
+ } else {
+ byte[] regionName = loc.getRegionInfo().getRegionName();
+ addAction(loc.getServerName(), regionName, action, actionsByServer,
nonceGroup);
}
- byte[] regionName = loc.getRegionInfo().getRegionName();
- addAction(loc.getServerName(), regionName, action, actionsByServer,
nonceGroup);
}
- // If this is a first attempt to group and send, no replicas, we need
replica thread.
+ boolean doStartReplica = (numAttempt == 1 && !isReplica &&
hasAnyReplicaGets);
+ boolean hasUnknown = unknownReplicaActions != null &&
!unknownReplicaActions.isEmpty();
+
if (!actionsByServer.isEmpty()) {
- boolean doStartReplica = (numAttempt == 1 && !isReplica &&
hasAnyReplicaGets);
- sendMultiAction(actionsByServer, numAttempt, doStartReplica ?
currentActions : null);
+ // If this is a first attempt to group and send, no replicas, we need
replica thread.
+ sendMultiAction(actionsByServer, numAttempt, (doStartReplica &&
!hasUnknown)
+ ? currentActions : null, numAttempt > 1 && !hasUnknown);
+ }
+
+ if (hasUnknown) {
+ actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
+ for (Action<Row> action : unknownReplicaActions) {
+ HRegionLocation loc = getReplicaLocationOrFail(action);
+ if (loc == null) continue;
+ byte[] regionName = loc.getRegionInfo().getRegionName();
+ addAction(loc.getServerName(), regionName, action, actionsByServer,
nonceGroup);
+ }
+ if (!actionsByServer.isEmpty()) {
+ sendMultiAction(
+ actionsByServer, numAttempt, doStartReplica ? currentActions :
null, true);
+ }
}
}
+ /**
+ * Finds a usable location (one with a server name) for the replica the action targets.
+ * The cache is consulted first; if that yields no usable location, the lookup is repeated
+ * bypassing the cache. If there is still none, the action is failed via manageLocationError.
+ * @param action the action to locate
+ * @return the replica location, or null if the action could not be placed
+ */
+ private HRegionLocation getReplicaLocationOrFail(Action<Row> action) {
+ // Try to get the location once more. Go to the cache first, because the previous
+ // calls in the loop might have populated it.
+ int replicaId = action.getReplicaId();
+ RegionLocations locs = findAllLocationsOrFail(action, true);
+ if (locs == null) return null; // findAllLocationsOrFail reports lookup exceptions itself
+ HRegionLocation loc = locs.getRegionLocation(replicaId);
+ if (loc == null || loc.getServerName() == null) {
+ locs = findAllLocationsOrFail(action, false);
+ if (locs == null) return null; // findAllLocationsOrFail reports lookup exceptions itself
+ loc = locs.getRegionLocation(replicaId);
+ }
+ if (loc == null || loc.getServerName() == null) {
+ manageLocationError(action, null);
+ return null;
+ }
+ return loc;
+ }
+
+ /**
+ * Logs that no location could be found for the action's replica and fails the action with
+ * Retry.NO_LOCATION_PROBLEM, so it is not retried.
+ * @param action the action that could not be placed
+ * @param ex the lookup failure, or null to fail the action with an IOException built from
+ * the log message
+ */
+ private void manageLocationError(Action<Row> action, Exception ex) {
+ String msg = "Cannot get replica " + action.getReplicaId()
+ + " location for " + action.getAction();
+ if (ex == null) {
+ LOG.error(msg);
+ ex = new IOException(msg);
+ } else {
+ // Keep the underlying cause in the log, not only in the errors handed back to the caller.
+ LOG.error(msg, ex);
+ }
+ manageError(action.getOriginalIndex(), action.getAction(),
+ Retry.NO_LOCATION_PROBLEM, ex, null);
+ }
+
+ /**
+ * Looks up the region locations for the action's row and replica id.
+ * Whenever null is returned the action has already been failed via manageLocationError,
+ * so callers can simply skip the action.
+ * @param action the action whose row is located; must wrap a non-null row
+ * @param useCache whether the lookup may be served from the location cache
+ * @return the region locations, or null if they could not be determined
+ */
+ private RegionLocations findAllLocationsOrFail(Action<Row> action, boolean useCache) {
+ if (action.getAction() == null) throw new IllegalArgumentException("#" + id +
+ ", row cannot be null");
+ RegionLocations loc = null;
+ try {
+ loc = hConnection.locateRegion(
+ tableName, action.getAction().getRow(), useCache, true, action.getReplicaId());
+ } catch (IOException ex) {
+ manageLocationError(action, ex);
+ return null;
+ }
+ if (loc == null) {
+ // A null result must fail the action too: callers treat null as "already handled", so
+ // returning it silently would leave the action with neither a result nor an error.
+ manageLocationError(action, null);
+ }
+ return loc;
+ }
+
+
+
/**
* Send a multi action structure to the servers, after a delay depending
on the attempt
* number. Asynchronous.
@@ -851,7 +899,7 @@ class AsyncProcess {
* @param actionsForReplicaThread original actions for replica thread;
null on non-first call.
*/
private void sendMultiAction(Map<ServerName, MultiAction<Row>>
actionsByServer,
- int numAttempt, List<Action<Row>> actionsForReplicaThread) {
+ int numAttempt, List<Action<Row>> actionsForReplicaThread, boolean
reuseThread) {
// Run the last item on the same thread if we are already on a send
thread.
// We hope most of the time it will be the only item, so we can cut down
on threads.
int actionsRemaining = actionsByServer.size();
@@ -862,8 +910,7 @@ class AsyncProcess {
incTaskCounters(multiAction.getRegions(), server);
Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction",
new SingleServerRequestRunnable(multiAction, numAttempt, server));
- --actionsRemaining;
- if ((numAttempt > 1) && actionsRemaining == 0) {
+ if ((--actionsRemaining == 0) && reuseThread) {
runnable.run();
} else {
try {
@@ -916,25 +963,24 @@ class AsyncProcess {
* @param server the location, if any (can be null)
- * @return true if the action can be retried, false otherwise.
+ * @return Retry.YES if the action can be retried; otherwise the reason it will not be.
*/
- public boolean manageError(int originalIndex, Row row, boolean canRetry,
+ public Retry manageError(int originalIndex, Row row, Retry canRetry,
Throwable throwable, ServerName server) {
- if (canRetry && throwable != null && throwable instanceof DoNotRetryIOException) {
- canRetry = false;
+ // instanceof already yields false for a null throwable.
+ if (canRetry == Retry.YES && throwable instanceof DoNotRetryIOException) {
+ canRetry = Retry.NO_NOT_RETRIABLE;
}
- if (canRetry && hardRetryLimit != null) {
- canRetry = hardRetryLimit.decrementAndGet() >= 0;
+ // The hard retry limit is only decremented for actions that would otherwise be retried.
+ if (canRetry == Retry.YES
+ && hardRetryLimit != null && hardRetryLimit.decrementAndGet() < 0) {
+ canRetry = Retry.NO_RETRIES_EXHAUSTED;
}
- if (!canRetry) {
+ if (canRetry != Retry.YES) {
// Batch.Callback<Res> was not called on failure in 0.94. We keep this.
setError(originalIndex, row, throwable, server);
- } else {
- // See if we are dealing with a replica action that was completed from other server.
- // Doesn't have to be synchronized, worst case we'd retry and be unable to set result.
- canRetry = !isActionComplete(originalIndex, row);
+ } else if (isActionComplete(originalIndex, row)) {
+ // A replica action may already have been completed by another server. This check is not
+ // synchronized; worst case we retry and are then unable to set the result.
+ canRetry = Retry.NO_OTHER_SUCCEEDED;
}
-
return canRetry;
}
@@ -949,8 +995,12 @@ class AsyncProcess {
private void receiveGlobalFailure(
MultiAction<Row> rsActions, ServerName server, int numAttempt, Throwable t) {
errorsByServer.reportServerError(server);
- boolean canRetry = errorsByServer.canRetryMore(numAttempt);
+ Retry canRetry = errorsByServer.canRetryMore(numAttempt)
+ ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
+ int failed = 0, stopped = 0;
+ // isReplica (used only for logging) is taken from the first action seen in the loop below,
+ // so firstAction must start out true; otherwise isReplica would always stay false.
+ boolean isReplica = false;
+ boolean firstAction = true;
List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
byte[] regionName = e.getKey();
@@ -960,35 +1010,36 @@ class AsyncProcess {
// TODO: depending on type of exception we might not want to update
cache at all?
hConnection.updateCachedLocations(tableName, regionName, row, null,
server);
for (Action<Row> action : e.getValue()) {
- if (manageError(action.getOriginalIndex(), action.getAction(),
canRetry, t, server)) {
+ if (firstAction) {
+ firstAction = false;
+ isReplica =
!RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
+ }
+ Retry retry = manageError(
+ action.getOriginalIndex(), action.getAction(), canRetry, t,
server);
+ if (retry == Retry.YES) {
toReplay.add(action);
+ } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
+ ++stopped;
+ } else {
+ ++failed;
}
}
}
- logAndResubmit(server, toReplay, numAttempt, rsActions.size(), t);
+ if (toReplay.isEmpty()) {
+ logNoResubmit(server, numAttempt, rsActions.size(), t, isReplica,
failed, stopped);
+ } else {
+ resubmit(server, toReplay, numAttempt, rsActions.size(), t, isReplica);
+ }
}
/**
* Log as much info as possible, and, if there is something to replay,
* submit it again after a back off sleep.
+ * @param isReplica
*/
- private void logAndResubmit(ServerName oldServer, List<Action<Row>>
toReplay,
- int numAttempt, int failureCount, Throwable throwable) {
- if (toReplay.isEmpty()) {
- // it's either a success or a last failure
- if (failureCount != 0) {
- // We have a failure but nothing to retry. We're done, it's a final
failure..
- LOG.warn(createLog(numAttempt, failureCount, toReplay.size(),
- oldServer, throwable, -1, false,
errorsByServer.getStartTrackingTime()));
- } else if (numAttempt > startLogErrorsCnt + 1) {
- // The operation was successful, but needed several attempts. Let's
log this.
- LOG.info(createLog(numAttempt, failureCount, 0,
- oldServer, throwable, -1, false,
errorsByServer.getStartTrackingTime()));
- }
- return;
- }
-
+ private void resubmit(ServerName oldServer, List<Action<Row>> toReplay,
+ int numAttempt, int failureCount, Throwable throwable, boolean
isReplica) {
// We have something to replay. We're going to sleep a little before.
// We have two contradicting needs here:
@@ -1001,7 +1052,7 @@ class AsyncProcess {
// We use this value to have some logs when we have multiple failures, but not too many
// logs, as errors are to be expected when a region moves, splits and so on
+ // createLog appends startTime as-is, so pass the formatted start time rather than null.
LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
- oldServer, throwable, backOffTime, true, errorsByServer.getStartTrackingTime()));
+ oldServer, throwable, backOffTime, true,
+ new Date(errorsByServer.getStartTrackingTime()).toString(), isReplica, -1, -1));
}
try {
@@ -1015,6 +1066,21 @@ class AsyncProcess {
groupAndSendMultiAction(toReplay, numAttempt + 1);
}
+ /**
+ * Logs the outcome of a server response or failure when nothing is being resubmitted.
+ * A message is produced only when failureCount is non-zero or numAttempt exceeds
+ * startLogErrorsCnt + 1; it is logged at WARN if any action failed for good, INFO otherwise.
+ * @param failed number of actions that will not be retried (final failures)
+ * @param stopped number of actions not retried because they were already completed
+ * (e.g. by another replica)
+ */
+ private void logNoResubmit(ServerName oldServer, int numAttempt,
+ int failureCount, Throwable throwable, boolean isReplica, int failed,
int stopped) {
+ if (failureCount != 0 || numAttempt > startLogErrorsCnt + 1) {
+ String timeStr = new
Date(errorsByServer.getStartTrackingTime()).toString();
+ String logMessage = createLog(numAttempt, failureCount, 0, oldServer,
+ throwable, -1, false, timeStr, isReplica, failed, stopped);
+ if (failed != 0) {
+ // Only log final failures as warning
+ LOG.warn(logMessage);
+ } else {
+ LOG.info(logMessage);
+ }
+ }
+ }
+
/**
* Called when we receive the result of a server query.
*
@@ -1039,6 +1105,9 @@ class AsyncProcess {
boolean canRetry = true;
// Go by original action.
+ int failed = 0, stopped = 0;
+ // isReplica (used only for logging) is taken from the first action seen in the loops below,
+ // so firstAction must start out true; otherwise isReplica would always stay false.
+ boolean isReplica = false;
+ boolean firstAction = true;
for (Map.Entry<byte[], List<Action<Row>>> regionEntry : multiAction.actions.entrySet()) {
byte[] regionName = regionEntry.getKey();
Map<Integer, Object> regionResults = responses.getResults().get(regionName);
@@ -1052,6 +1121,10 @@ class AsyncProcess {
}
boolean regionFailureRegistered = false;
for (Action<Row> sentAction : regionEntry.getValue()) {
+ if (firstAction) {
+ firstAction = false;
+ isReplica =
!RegionReplicaUtil.isDefaultReplica(sentAction.getReplicaId());
+ }
Object result = regionResults.get(sentAction.getOriginalIndex());
// Failure: retry if it's make sense else update the errors lists
if (result == null || result instanceof Throwable) {
@@ -1068,9 +1141,14 @@ class AsyncProcess {
canRetry = errorsByServer.canRetryMore(numAttempt);
}
++failureCount;
- if (manageError(
- sentAction.getOriginalIndex(), row, canRetry,
(Throwable)result, server)) {
+ Retry retry = manageError(sentAction.getOriginalIndex(), row,
+ canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED,
(Throwable)result, server);
+ if (retry == Retry.YES) {
toReplay.add(sentAction);
+ } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
+ ++stopped;
+ } else {
+ ++failed;
}
} else {
if (callback != null) {
@@ -1108,39 +1186,58 @@ class AsyncProcess {
failureCount += actions.size();
for (Action<Row> action : actions) {
+ if (firstAction) {
+ firstAction = false;
+ isReplica =
!RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
+ }
Row row = action.getAction();
- if (manageError(action.getOriginalIndex(), row, canRetry, throwable,
server)) {
+ Retry retry = manageError(action.getOriginalIndex(), row,
+ canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, throwable,
server);
+ if (retry == Retry.YES) {
toReplay.add(action);
+ } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
+ ++stopped;
+ } else {
+ ++failed;
}
}
}
- logAndResubmit(server, toReplay, numAttempt, failureCount, throwable);
+ if (toReplay.isEmpty()) {
+ logNoResubmit(server, numAttempt, failureCount, throwable, isReplica,
failed, stopped);
+ } else {
+ resubmit(server, toReplay, numAttempt, failureCount, throwable,
isReplica);
+ }
}
private String createLog(int numAttempt, int failureCount, int replaySize,
ServerName sn,
- Throwable error, long backOffTime, boolean
willRetry, String startTime){
+ Throwable error, long backOffTime, boolean willRetry, String startTime,
+ boolean isReplica, int failed, int stopped) {
StringBuilder sb = new StringBuilder();
-
- sb.append("#").append(id).append(", table=").append(tableName).
- append(",
attempt=").append(numAttempt).append("/").append(numTries).append(" ");
+ sb.append("#").append(id).append(", table=").append(tableName).append(",
")
+ .append(isReplica ? "replica, " : "primary,
").append("attempt=").append(numAttempt)
+ .append("/").append(numTries).append(" ");
if (failureCount > 0 || error != null){
sb.append("failed ").append(failureCount).append(" ops").append(",
last exception: ").
append(error == null ? "null" : error);
} else {
- sb.append("SUCCEEDED");
+ sb.append("succeeded");
}
- sb.append(" on ").append(sn);
-
- sb.append(", tracking started ").append(startTime);
+ sb.append(" on ").append(sn).append(", tracking started
").append(startTime);
if (willRetry) {
sb.append(", retrying after ").append(backOffTime).append(" ms").
- append(", replay ").append(replaySize).append(" ops.");
+ append(", replay ").append(replaySize).append(" ops");
} else if (failureCount > 0) {
- sb.append(" - FAILED, NOT RETRYING ANYMORE");
+ if (stopped > 0) {
+ sb.append("; not retrying ").append(stopped).append(" due to success
from other replica");
+ }
+ if (failed > 0) {
+ sb.append("; not retrying ").append(failed).append(" - final
failure");
+ }
+
}
return sb.toString();
@@ -1162,7 +1259,7 @@ class AsyncProcess {
decActionCounter(index);
return; // Simple case, no replica requests.
} else if ((state = trySetResultSimple(
- index, action.getAction(), result, isStale)) == null) {
+ index, action.getAction(), false, result, null, isStale)) == null) {
return; // Simple case, no replica requests.
}
assert state != null;
@@ -1201,8 +1298,8 @@ class AsyncProcess {
errors.add(throwable, row, server);
decActionCounter(index);
return; // Simple case, no replica requests.
- } else if ((state = trySetResultSimple(index, row, throwable, false)) ==
null) {
- errors.add(throwable, row, server);
+ } else if ((state = trySetResultSimple(
+ index, row, true, throwable, server, false)) == null) {
return; // Simple case, no replica requests.
}
assert state != null;
@@ -1261,8 +1358,8 @@ class AsyncProcess {
* Tries to set the result or error for a particular action as if there
were no replica calls.
* @return null if successful; replica state if there were in fact replica
calls.
*/
- private ReplicaResultState trySetResultSimple(
- int index, Row row, Object result, boolean isFromReplica) {
+ private ReplicaResultState trySetResultSimple(int index, Row row, boolean
isError,
+ Object result, ServerName server, boolean isFromReplica) {
Object resObj = null;
if (!isReplicaGet(row)) {
if (isFromReplica) {
@@ -1279,11 +1376,20 @@ class AsyncProcess {
}
}
}
+
+ ReplicaResultState rrs =
+ (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj
: null;
+ if (rrs == null && isError) {
+ // The resObj is not replica state (null or already set).
+ errors.add((Throwable)result, row, server);
+ }
+
if (resObj == null) {
+ // resObj is null - no replica calls were made.
decActionCounter(index);
return null;
}
- return (resObj instanceof ReplicaResultState) ?
(ReplicaResultState)resObj : null;
+ return rrs;
}
private void decActionCounter(int index) {
@@ -1523,4 +1629,15 @@ class AsyncProcess {
private static boolean isReplicaGet(Row row) {
return (row instanceof Get) && (((Get)row).getConsistency() ==
Consistency.TIMELINE);
}
+
+ /**
+ * Result of manageError: whether an action will be retried and, if not, why. The specific
+ * "NO_" reason only serves to make logging clearer; the callers in this class distinguish
+ * only YES, NO_OTHER_SUCCEEDED and everything else.
+ */
+ private enum Retry {
+ YES, // The action can be retried.
+ NO_LOCATION_PROBLEM, // The region location for the action could not be determined.
+ NO_NOT_RETRIABLE, // The error was a DoNotRetryIOException.
+ NO_RETRIES_EXHAUSTED, // The retry limit (per-server tracking or hard limit) was reached.
+ NO_OTHER_SUCCEEDED // The action was already completed (e.g. by another replica).
+ }
}
Modified:
hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
URL:
http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java?rev=1586468&r1=1586467&r2=1586468&view=diff
==============================================================================
---
hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
(original)
+++
hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
Thu Apr 10 20:57:59 2014
@@ -268,10 +268,4 @@ interface ClusterConnection extends HCon
* @return Default AsyncProcess associated with this connection.
*/
AsyncProcess getAsyncProcess();
-
- /**
- * @return All locations for a particular region.
- */
- RegionLocations locateRegionAll(TableName tableName, byte[] row) throws
IOException;
-
}
\ No newline at end of file
Modified:
hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
URL:
http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java?rev=1586468&r1=1586467&r2=1586468&view=diff
==============================================================================
---
hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
(original)
+++
hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
Thu Apr 10 20:57:59 2014
@@ -1060,15 +1060,9 @@ class ConnectionManager {
}
@Override
- public RegionLocations locateRegionAll(
- final TableName tableName, final byte[] row) throws IOException{
- return locateRegion(tableName, row, true, true);
- }
-
- @Override
public HRegionLocation locateRegion(
final TableName tableName, final byte[] row) throws IOException{
- RegionLocations locations = locateRegionAll(tableName, row);
+ // Fetch all locations for the region, then reduce them with RegionLocations.getRegionLocation()
+ // (null if the lookup itself returned null).
+ RegionLocations locations = locateRegion(tableName, row, true, true);
return locations == null ? null : locations.getRegionLocation();
}
@@ -2564,12 +2558,12 @@ class ConnectionManager {
new ConcurrentHashMap<ServerName, ServerErrors>();
private final long canRetryUntil;
private final int maxRetries;
- private final String startTrackingTime;
+ private final long startTrackingTime;
public ServerErrorTracker(long timeout, int maxRetries) {
this.maxRetries = maxRetries;
- this.canRetryUntil = EnvironmentEdgeManager.currentTimeMillis() + timeout;
- this.startTrackingTime = new Date().toString();
+ // Take a single reading of the same clock used for the retry deadline, so the logged
+ // tracking start time and canRetryUntil always agree.
+ this.startTrackingTime = EnvironmentEdgeManager.currentTimeMillis();
+ this.canRetryUntil = this.startTrackingTime + timeout;
}
/**
@@ -2616,7 +2610,7 @@ class ConnectionManager {
}
}
- String getStartTrackingTime() {
+ /** @return the tracking start time captured by the constructor, in milliseconds. */
+ long getStartTrackingTime() {
return startTrackingTime;
}
Modified:
hbase/branches/hbase-10070/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
URL:
http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java?rev=1586468&r1=1586467&r2=1586468&view=diff
==============================================================================
---
hbase/branches/hbase-10070/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
(original)
+++
hbase/branches/hbase-10070/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
Thu Apr 10 20:57:59 2014
@@ -35,8 +35,10 @@ import org.apache.hadoop.hbase.client.co
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
import org.mockito.Mockito;
import com.google.common.collect.Lists;
@@ -313,7 +315,8 @@ public class TestAsyncProcess {
}
@Override
- public RegionLocations locateRegionAll(TableName tableName, byte[] row)
throws IOException {
+ // Every lookup, for any row, cache setting or replica id, resolves to loc1.
+ public RegionLocations locateRegion(TableName tableName,
+ byte[] row, boolean useCache, boolean retry, int replicaId) throws
IOException {
return new RegionLocations(loc1);
}
}
@@ -332,7 +335,8 @@ public class TestAsyncProcess {
}
@Override
- public RegionLocations locateRegionAll(TableName tableName, byte[] row)
throws IOException {
+ public RegionLocations locateRegion(TableName tableName,
+ byte[] row, boolean useCache, boolean retry, int replicaId) throws
IOException {
int i = 0;
for (HRegionLocation hr : hrl){
if (Arrays.equals(row, hr.getRegionInfo().getStartKey())) {
@@ -346,6 +350,9 @@ public class TestAsyncProcess {
}
+ @Rule
+ public Timeout timeout = new Timeout(10000); // 10 seconds max per method
tested
+
@Test
public void testSubmit() throws Exception {
ClusterConnection hc = createHConnection();
@@ -592,8 +599,8 @@ public class TestAsyncProcess {
private static void setMockLocation(ClusterConnection hc, byte[] row,
RegionLocations result) throws IOException {
+ // Stub locateRegion(DUMMY_TABLE, row, <any useCache>, <any retry>, <any replicaId>).
- Mockito.when(hc.locateRegionAll(
- Mockito.eq(DUMMY_TABLE), Mockito.eq(row))).thenReturn(result);
+ Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row),
+ Mockito.anyBoolean(), Mockito.anyBoolean(),
Mockito.anyInt())).thenReturn(result);
}
private static ClusterConnection createHConnectionCommon() {
@@ -945,7 +952,7 @@ public class TestAsyncProcess {
for (int i = 0; i < expecteds.length; ++i) {
Object actual = actuals[i];
RR expected = expecteds[i];
- Assert.assertEquals(expected == RR.FAILED, actual instanceof Throwable);
+ // String.valueOf keeps a missing (null) result from turning the assertion message itself
+ // into a NullPointerException that hides which result was wrong.
+ Assert.assertEquals(String.valueOf(actual), expected == RR.FAILED, actual instanceof Throwable);
if (expected != RR.FAILED && expected != RR.DONT_CARE) {
Assert.assertEquals(expected == RR.TRUE, ((Result)actual).isStale());
}
Modified:
hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
URL:
http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java?rev=1586468&r1=1586467&r2=1586468&view=diff
==============================================================================
---
hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
(original)
+++
hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
Thu Apr 10 20:57:59 2014
@@ -480,9 +480,4 @@ class CoprocessorHConnection implements
public AsyncProcess getAsyncProcess() {
return delegate.getAsyncProcess();
}
-
- @Override
- public RegionLocations locateRegionAll(TableName tableName, byte[] row)
throws IOException {
- return delegate.locateRegionAll(tableName, row);
- }
}
\ No newline at end of file
Modified:
hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
URL:
http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java?rev=1586468&r1=1586467&r2=1586468&view=diff
==============================================================================
---
hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
(original)
+++
hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
Thu Apr 10 20:57:59 2014
@@ -108,8 +108,9 @@ public class HConnectionTestingUtility {
thenReturn(loc);
Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[])
Mockito.any())).
thenReturn(loc);
- Mockito.when(c.locateRegionAll((TableName) Mockito.any(), (byte[])
Mockito.any())).
- thenReturn(new RegionLocations(loc));
+ Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[])
Mockito.any(),
+ Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt()))
+ .thenReturn(new RegionLocations(loc));
if (admin != null) {
// If a call to getAdmin, return this implementation.
Mockito.when(c.getAdmin(Mockito.any(ServerName.class))).