Author: kturner
Date: Fri Sep 21 16:24:08 2012
New Revision: 1388563
URL: http://svn.apache.org/viewvc?rev=1388563&view=rev
Log:
ACCUMULO-706 Made batchwriter timeout trigger when tablet sever is responsive,
but never accepts any data. Also, now only modified socket timeout if user
timeout is less.
Modified:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
Modified:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java?rev=1388563&r1=1388562&r2=1388563&view=diff
==============================================================================
---
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
(original)
+++
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
Fri Sep 21 16:24:08 2012
@@ -52,6 +52,7 @@ import org.apache.accumulo.core.client.T
import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.client.TimedOutException;
import
org.apache.accumulo.core.client.impl.TabletLocator.TabletServerMutations;
+import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.constraints.Violations;
import org.apache.accumulo.core.data.ConstraintViolationSummary;
import org.apache.accumulo.core.data.KeyExtent;
@@ -170,7 +171,7 @@ public class TabletServerBatchWriter {
firstErrorTime = null;
}
- void errorOccured(Exception e) {
+ void wroteNothing() {
if (firstErrorTime == null) {
firstErrorTime = activityTime;
} else if (System.currentTimeMillis() - firstErrorTime > timeOut) {
@@ -178,6 +179,10 @@ public class TabletServerBatchWriter {
}
}
+ void errorOccured(Exception e) {
+ wroteNothing();
+ }
+
public long getTimeOut() {
return timeOut;
}
@@ -807,7 +812,8 @@ public class TabletServerBatchWriter {
try {
TabletClientService.Iface client;
- if (timeoutTracker.getTimeOut() < Long.MAX_VALUE)
+
+ if (timeoutTracker.getTimeOut() <
instance.getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT))
client = ThriftUtil.getTServerClient(location,
instance.getConfiguration(), timeoutTracker.getTimeOut());
else
client = ThriftUtil.getTServerClient(location,
instance.getConfiguration());
@@ -830,7 +836,6 @@ public class TabletServerBatchWriter {
} else {
long usid = client.startUpdate(tinfo, credentials);
- timeoutTracker.madeProgress();
List<TMutation> updates = new ArrayList<TMutation>();
for (Entry<KeyExtent,List<Mutation>> entry : tabMuts.entrySet()) {
@@ -844,22 +849,23 @@ public class TabletServerBatchWriter {
}
client.applyUpdates(tinfo, usid, entry.getKey().toThrift(),
updates);
- timeoutTracker.madeProgress();
updates.clear();
size = 0;
}
}
UpdateErrors updateErrors = client.closeUpdate(tinfo, usid);
- timeoutTracker.madeProgress();
Map<KeyExtent,Long> failures =
Translator.translate(updateErrors.failedExtents, Translator.TKET);
updatedConstraintViolations(Translator.translate(updateErrors.violationSummaries,
Translator.TCVST));
updateAuthorizationFailures(Translator.translate(updateErrors.authorizationFailures,
Translator.TKET));
+ long totalCommitted = 0;
+
for (Entry<KeyExtent,Long> entry : failures.entrySet()) {
KeyExtent failedExtent = entry.getKey();
int numCommitted = (int) (long) entry.getValue();
+ totalCommitted += numCommitted;
String table = failedExtent.getTableId().toString();
@@ -868,6 +874,14 @@ public class TabletServerBatchWriter {
ArrayList<Mutation> mutations = (ArrayList<Mutation>)
tabMuts.get(failedExtent);
allFailures.addAll(table, mutations.subList(numCommitted,
mutations.size()));
}
+
+ if (failures.keySet().containsAll(tabMuts.keySet()) &&
totalCommitted == 0) {
+ // nothing was successfully written
+ timeoutTracker.wroteNothing();
+ } else {
+ // successfully wrote something to tablet server
+ timeoutTracker.madeProgress();
+ }
}
return allFailures;
} finally {