Added the ability to invalidate server-side conditional update sessions
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ec537137 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ec537137 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ec537137 Branch: refs/heads/ACCUMULO-1000 Commit: ec537137aa958cf87d8d11ff5fdfe05c78dea624 Parents: a169064 Author: [email protected] <[email protected]> Authored: Fri Jul 19 16:00:13 2013 -0400 Committer: [email protected] <[email protected]> Committed: Fri Jul 19 16:00:13 2013 -0400 ---------------------------------------------------------------------- .../core/client/impl/ConditionalWriterImpl.java | 120 +- .../thrift/TabletClientService.java | 2831 +++++++++++++++--- core/src/main/thrift/tabletserver.thrift | 9 +- .../server/security/SecurityOperation.java | 15 +- .../server/tabletserver/TabletServer.java | 97 +- .../test/performance/thrift/NullTserver.java | 16 +- .../accumulo/test/ConditionalWriterTest.java | 1 + 7 files changed, 2571 insertions(+), 518 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/ec537137/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java index f0d6108..31403fb 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java @@ -34,10 +34,12 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.accumulo.core.client.AccumuloException; import 
org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.ConditionalWriter; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.TableDeletedException; +import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.client.impl.TabletLocator.TabletServerMutations; import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; @@ -57,6 +59,7 @@ import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.accumulo.core.security.VisibilityEvaluator; import org.apache.accumulo.core.security.VisibilityParseException; import org.apache.accumulo.core.security.thrift.TCredentials; +import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.util.BadArgumentException; import org.apache.accumulo.core.util.ByteBufferUtil; @@ -177,7 +180,7 @@ class ConditionalWriterImpl implements ConditionalWriter { return queue; } - private void queueFailed(List<QCMutation> mutations) { + private void queueRetry(List<QCMutation> mutations) { for (QCMutation qcm : mutations) { qcm.resetDelay(); } @@ -208,7 +211,7 @@ class ConditionalWriterImpl implements ConditionalWriter { } if (failures.size() > 0) - queueFailed(failures); + queueRetry(failures); for (Entry<String,TabletServerMutations<QCMutation>> entry : binnedMutations.entrySet()) { queue(entry.getKey(), entry.getValue()); @@ -350,6 +353,8 @@ class ConditionalWriterImpl implements ConditionalWriter { Map<Long,CMK> cmidToCm = new HashMap<Long,CMK>(); MutableLong cmid = new MutableLong(0); + Long sessionId = null; + try { client = ThriftUtil.getTServerClient(location, instance.getConfiguration()); @@ -357,9 +362,11 @@ class ConditionalWriterImpl implements ConditionalWriter { CompressedIterators compressedIters = new 
CompressedIterators(); convertMutations(mutations, cmidToCm, cmid, tmutations, compressedIters); - - List<TCMResult> tresults = client.conditionalUpdate(tinfo, credentials, ByteBufferUtil.toByteBuffers(auths.getAuthorizations()), tmutations, - compressedIters.getSymbolTable()); + + //TODO create a session per tserver and keep reusing it + sessionId = client.startConditionalUpdate(tinfo, credentials, ByteBufferUtil.toByteBuffers(auths.getAuthorizations()), tableId); + + List<TCMResult> tresults = client.conditionalUpdate(tinfo, sessionId, tmutations, compressedIters.getSymbolTable()); HashSet<KeyExtent> extentsToInvalidate = new HashSet<KeyExtent>(); @@ -383,27 +390,108 @@ class ConditionalWriterImpl implements ConditionalWriter { locator.invalidateCache(ke); } - queueFailed(ignored); + queueRetry(ignored); + } catch (NoSuchScanIDException nssie){ + queueRetry(cmidToCm); } catch (ThriftSecurityException tse) { AccumuloSecurityException ase = new AccumuloSecurityException(credentials.getPrincipal(), tse.getCode(), Tables.getPrintableTableInfoFromId(instance, tableId), tse); - for (CMK cmk : cmidToCm.values()) - cmk.cm.resultQueue.add(new Result(ase, cmk.cm, location)); + queueException(location, cmidToCm, ase); } catch (TTransportException e) { locator.invalidateCache(location); - for (CMK cmk : cmidToCm.values()) - cmk.cm.resultQueue.add(new Result(Status.UNKNOWN, cmk.cm, location)); + invalidateSession(location, mutations, cmidToCm, sessionId); } catch (TApplicationException tae) { - for (CMK cmk : cmidToCm.values()) - cmk.cm.resultQueue.add(new Result(new AccumuloServerException(location, tae), cmk.cm, location)); + queueException(location, cmidToCm, new AccumuloServerException(location, tae)); } catch (TException e) { locator.invalidateCache(location); - for (CMK cmk : cmidToCm.values()) - cmk.cm.resultQueue.add(new Result(Status.UNKNOWN, cmk.cm, location)); + invalidateSession(location, mutations, cmidToCm, sessionId); } catch (Exception e) { - for (CMK cmk : 
cmidToCm.values()) - cmk.cm.resultQueue.add(new Result(e, cmk.cm, location)); + queueException(location, cmidToCm, e); + } finally { + ThriftUtil.returnClient((TServiceClient) client); + } + } + + private void queueRetry(Map<Long,CMK> cmidToCm) { + ArrayList<QCMutation> ignored = new ArrayList<QCMutation>(); + for (CMK cmk : cmidToCm.values()) + ignored.add(cmk.cm); + queueRetry(ignored); + } + + private void queueException(String location, Map<Long,CMK> cmidToCm, Exception e) { + for (CMK cmk : cmidToCm.values()) + cmk.cm.resultQueue.add(new Result(e, cmk.cm, location)); + } + + private void invalidateSession(String location, TabletServerMutations<QCMutation> mutations, Map<Long,CMK> cmidToCm, Long sessionId) { + if(sessionId == null){ + queueRetry(cmidToCm); + }else{ + try { + invalidateSession(sessionId, location, mutations); + for (CMK cmk : cmidToCm.values()) + cmk.cm.resultQueue.add(new Result(Status.UNKNOWN, cmk.cm, location)); + }catch(Exception e2){ + queueException(location, cmidToCm, e2); + } + } + } + + /* + * The purpose of this code is to ensure that a conditional mutation will not execute when its status is unknown. This allows a user to read the row when the + * status is unknown and not have to worry about the tserver applying the mutation after the scan. + * + * If a conditional mutation is taking a long time to process, then this method will wait for it to finish... unless this exceeds timeout. 
+ */ + private void invalidateSession(long sessionId, String location, TabletServerMutations<QCMutation> mutations) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + + // TODO could assume tserver will invalidate sessions after a given time period + + ArrayList<QCMutation> mutList = new ArrayList<QCMutation>(); + + for (List<QCMutation> tml : mutations.getMutations().values()) { + mutList.addAll(tml); + } + + while (true) { + Map<String,TabletServerMutations<QCMutation>> binnedMutations = new HashMap<String,TabletLocator.TabletServerMutations<QCMutation>>(); + List<QCMutation> failures = new ArrayList<QCMutation>(); + + locator.binMutations(mutList, binnedMutations, failures, credentials); + + // TODO do failures matter? not if failures only indicates tablets are not assigned + + if (!binnedMutations.containsKey(location)) { + // the tablets are at different locations now, so there is no need to invalidate the session + // TODO could be a case where tablet comes back to tserver and then UNKNOW condMut goes through + return; + } + + try { + // if the mutation is currently processing, this method will block until its done or times out + invalidateSession(sessionId, location); + return; + } catch (TApplicationException tae) { + throw new AccumuloServerException(location, tae); + } catch (TException e) { + locator.invalidateCache(location); + } + + //TODO sleep + } + + } + + private void invalidateSession(long sessionId, String location) throws TException { + TabletClientService.Iface client = null; + + TInfo tinfo = Tracer.traceInfo(); + + try { + client = ThriftUtil.getTServerClient(location, instance.getConfiguration()); + client.invalidateConditionalUpdate(tinfo, sessionId); } finally { ThriftUtil.returnClient((TServiceClient) client); }
