Author: nkeywal
Date: Fri Oct 18 19:34:18 2013
New Revision: 1533604
URL: http://svn.apache.org/r1533604
Log:
HBASE-9768 Two issues in AsyncProcess
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java?rev=1533604&r1=1533603&r2=1533604&view=diff
==============================================================================
---
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
(original)
+++
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
Fri Oct 18 19:34:18 2013
@@ -265,7 +265,18 @@ class AsyncProcess<CResult> {
new HashMap<HRegionLocation, MultiAction<Row>>();
List<Action<Row>> retainedActions = new
ArrayList<Action<Row>>(rows.size());
+ long currentTaskCnt = tasksDone.get();
+ boolean alreadyLooped = false;
+
do {
+ if (alreadyLooped){
+ // if, for whatever reason, we looped, we want to be sure that
something has changed.
+ waitForNextTaskDone(currentTaskCnt);
+ currentTaskCnt = tasksDone.get();
+ } else {
+ alreadyLooped = true;
+ }
+
// Wait until there is at least one slot for a new task.
waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);
@@ -280,8 +291,9 @@ class AsyncProcess<CResult> {
Row r = it.next();
HRegionLocation loc = findDestLocation(r, 1, posInList);
- if (loc != null && canTakeOperation(loc, regionIncluded,
serverIncluded)) {
- // loc is null if there is an error such as meta not available.
+ if (loc == null) { // loc is null if there is an error such as meta
not available.
+ it.remove();
+ } else if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
Action<Row> action = new Action<Row>(r, ++posInList);
retainedActions.add(action);
addAction(loc, action, actionsByServer);
@@ -644,6 +656,7 @@ class AsyncProcess<CResult> {
for (Map.Entry<byte[], List<Pair<Integer, Object>>> resultsForRS :
responses.getResults().entrySet()) {
+ boolean regionFailureRegistered = false;
for (Pair<Integer, Object> regionResult : resultsForRS.getValue()) {
Object result = regionResult.getSecond();
@@ -652,8 +665,9 @@ class AsyncProcess<CResult> {
throwable = (Throwable) result;
Action<Row> correspondingAction =
initialActions.get(regionResult.getFirst());
Row row = correspondingAction.getAction();
-
- if (failureCount++ == 0) { // We're doing this once per location.
+ failureCount++;
+ if (!regionFailureRegistered) { // We're doing this once per
location.
+ regionFailureRegistered= true;
hConnection.updateCachedLocations(this.tableName, row.getRow(),
result, location);
if (errorsByServer != null) {
errorsByServer.reportServerError(location);
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1533604&r1=1533603&r2=1533604&view=diff
==============================================================================
---
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
(original)
+++
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
Fri Oct 18 19:34:18 2013
@@ -58,8 +58,6 @@ import org.apache.hadoop.hbase.ipc.Paylo
import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
@@ -889,6 +887,7 @@ public class HTable implements HTableInt
*/
private void doPut(Put put) throws InterruptedIOException,
RetriesExhaustedWithDetailsException {
if (ap.hasError()){
+ writeAsyncBuffer.add(put);
backgroundFlushCommits(true);
}
@@ -907,25 +906,29 @@ public class HTable implements HTableInt
* Send the operations in the buffer to the servers. Does not wait for the
server's answer.
* If the is an error (max retried reach from a previous flush or bad
operation), it tries to
* send all operations in the buffer and sends an exception.
+ * @param synchronous - if true, sends all the writes and wait for all of
them to finish before
+ * returning.
*/
private void backgroundFlushCommits(boolean synchronous) throws
InterruptedIOException, RetriesExhaustedWithDetailsException {
try {
- // If there is an error on the operations in progress, we don't add new
operations.
- if (writeAsyncBuffer.size() > 0 && !ap.hasError()) {
+ do {
ap.submit(writeAsyncBuffer, true);
- }
+ } while (synchronous && !writeAsyncBuffer.isEmpty());
- if (synchronous || ap.hasError()) {
- if (ap.hasError() && LOG.isDebugEnabled()) {
- LOG.debug(tableName + ": One or more of the operations have failed
-" +
- " waiting for all operation in progress to finish (successfully
or not)");
- }
+ if (synchronous) {
ap.waitUntilDone();
}
if (ap.hasError()) {
+ LOG.debug(tableName + ": One or more of the operations have failed -" +
+ " waiting for all operation in progress to finish (successfully or
not)");
+ while (!writeAsyncBuffer.isEmpty()) {
+ ap.submit(writeAsyncBuffer, true);
+ }
+ ap.waitUntilDone();
+
if (!clearBufferOnFail) {
// if clearBufferOnFailed is not set, we're supposed to keep the
failed operation in the
// write buffer. This is a questionable feature kept here for
backward compatibility
@@ -1186,12 +1189,9 @@ public class HTable implements HTableInt
*/
@Override
public void flushCommits() throws InterruptedIOException,
RetriesExhaustedWithDetailsException {
- // We're looping, as if one region is overloaded we keep its operations in
the buffer.
// As we can have an operation in progress even if the buffer is empty, we
call
// backgroundFlushCommits at least one time.
- do {
- backgroundFlushCommits(true);
- } while (!writeAsyncBuffer.isEmpty());
+ backgroundFlushCommits(true);
}
/**