Author: kturner Date: Tue Nov 15 21:51:02 2011 New Revision: 1202436 URL: http://svn.apache.org/viewvc?rev=1202436&view=rev Log: ACCUMULO-120 made special case for single mutation in batch writer
Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java?rev=1202436&r1=1202435&r2=1202436&view=diff ============================================================================== --- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java (original) +++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java Tue Nov 15 21:51:02 2011 @@ -54,7 +54,9 @@ import org.apache.accumulo.core.data.thr import org.apache.accumulo.core.master.state.tables.TableState; import org.apache.accumulo.core.security.thrift.AuthInfo; import org.apache.accumulo.core.security.thrift.ThriftSecurityException; +import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException; import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; +import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.util.ThriftUtil; import org.apache.hadoop.io.Text; @@ -752,42 +754,54 @@ public class TabletServerBatchWriter { try { MutationSet allFailures = new MutationSet(); - long usid = client.startUpdate(null, credentials); - - List<TMutation> updates = new ArrayList<TMutation>(); - for (Entry<KeyExtent,List<Mutation>> entry : tabMuts.entrySet()) { - long size = 0; - Iterator<Mutation> iter = entry.getValue().iterator(); - while (iter.hasNext()) { - while (size < MUTATION_BATCH_SIZE && iter.hasNext()) { - Mutation mutation = iter.next(); - updates.add(mutation.toThrift()); - size += mutation.numBytes(); - } - - client.applyUpdates(null, usid, entry.getKey().toThrift(), updates); - updates.clear(); - size = 0; + if (tabMuts.size() == 1 && tabMuts.values().iterator().next().size() == 1) { + Entry<KeyExtent,List<Mutation>> entry = tabMuts.entrySet().iterator().next(); + + try { + client.update(null, credentials, entry.getKey().toThrift(), entry.getValue().get(0).toThrift()); + } catch (NotServingTabletException e) { + allFailures.addAll(entry.getKey().getTableId().toString(), entry.getValue()); + } catch (ConstraintViolationException e) { + updatedConstraintViolations(Translator.translate(e.violationSummaries, Translator.TCVST)); } - } - - UpdateErrors updateErrors = client.closeUpdate(null, usid); - Map<KeyExtent,Long> failures = Translator.translate(updateErrors.failedExtents, Translator.TKET); - updatedConstraintViolations(Translator.translate(updateErrors.violationSummaries, Translator.TCVST)); - updateAuthorizationFailures(Translator.translate(updateErrors.authorizationFailures, Translator.TKET)); - - for (Entry<KeyExtent,Long> entry : failures.entrySet()) { - KeyExtent failedExtent = entry.getKey(); - int numCommitted = (int) (long) entry.getValue(); + } else { + + long usid = client.startUpdate(null, credentials); - String table = failedExtent.getTableId().toString(); + List<TMutation> updates = new ArrayList<TMutation>(); + for (Entry<KeyExtent,List<Mutation>> entry : tabMuts.entrySet()) { + long size = 0; + Iterator<Mutation> iter = entry.getValue().iterator(); + while (iter.hasNext()) { + while (size < MUTATION_BATCH_SIZE && iter.hasNext()) { + Mutation mutation = iter.next(); + updates.add(mutation.toThrift()); + size += mutation.numBytes(); + } + + client.applyUpdates(null, usid, entry.getKey().toThrift(), updates); + updates.clear(); + size = 0; + } + } - TabletLocator.getInstance(instance, credentials, new Text(table)).invalidateCache(failedExtent); + UpdateErrors updateErrors = client.closeUpdate(null, usid); + Map<KeyExtent,Long> failures = Translator.translate(updateErrors.failedExtents, Translator.TKET); + updatedConstraintViolations(Translator.translate(updateErrors.violationSummaries, Translator.TCVST)); + updateAuthorizationFailures(Translator.translate(updateErrors.authorizationFailures, Translator.TKET)); - ArrayList<Mutation> mutations = (ArrayList<Mutation>) tabMuts.get(failedExtent); - allFailures.addAll(table, mutations.subList(numCommitted, mutations.size())); + for (Entry<KeyExtent,Long> entry : failures.entrySet()) { + KeyExtent failedExtent = entry.getKey(); + int numCommitted = (int) (long) entry.getValue(); + + String table = failedExtent.getTableId().toString(); + + TabletLocator.getInstance(instance, credentials, new Text(table)).invalidateCache(failedExtent); + + ArrayList<Mutation> mutations = (ArrayList<Mutation>) tabMuts.get(failedExtent); + allFailures.addAll(table, mutations.subList(numCommitted, mutations.size())); + } } - return allFailures; } finally { ThriftUtil.returnClient((TServiceClient) client);