Updated Branches: refs/heads/master b6fcf7b4a -> 8945f1677
ACCUMULO-1152 add tserver lock check to tablet location cache Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/8945f167 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/8945f167 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/8945f167 Branch: refs/heads/master Commit: 8945f1677d066d0a0d8871d1d5e44433c4bb11e5 Parents: b6fcf7b Author: Keith Turner <[email protected]> Authored: Tue Sep 24 10:07:34 2013 -0400 Committer: Keith Turner <[email protected]> Committed: Tue Sep 24 17:42:54 2013 -0400 ---------------------------------------------------------------------- .../core/client/impl/ConditionalWriterImpl.java | 2 +- .../core/client/impl/RootTabletLocator.java | 56 ++++- .../core/client/impl/TabletLocator.java | 35 ++- .../core/client/impl/TabletLocatorImpl.java | 157 +++++++++---- .../core/client/impl/ZookeeperLockChecker.java | 56 +++++ .../core/client/mock/MockTabletLocator.java | 2 +- .../core/metadata/MetadataLocationObtainer.java | 34 +-- .../core/client/impl/TabletLocatorImplTest.java | 229 ++++++++++++++++--- .../server/client/BulkImporterTest.java | 2 +- 9 files changed, 443 insertions(+), 130 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/8945f167/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java index ab56b4b..d7eb144 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java @@ -658,7 +658,7 @@ class ConditionalWriterImpl implements ConditionalWriter { while (true) { if (!ZooLock.isLockHeld(ServerClient.getZooCache(instance), lid)) { - // TODO if ACCUMULO-1152 adds a tserver lock check to the tablet location cache, then this invalidation would prevent future attempts to contact the + // ACCUMULO-1152 added a tserver lock check to the tablet location cache, so this invalidation prevents future attempts to contact the // tserver even its gone zombie and is still running w/o a lock locator.invalidateCache(location); return; http://git-wip-us.apache.org/repos/asf/accumulo/blob/8945f167/core/src/main/java/org/apache/accumulo/core/client/impl/RootTabletLocator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/RootTabletLocator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/RootTabletLocator.java index 0a96bab..1224207 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/RootTabletLocator.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/RootTabletLocator.java @@ -21,36 +21,45 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.impl.TabletLocatorImpl.TabletServerLockChecker; import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.security.Credentials; +import org.apache.accumulo.core.util.OpTimer; import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooCache; import org.apache.hadoop.io.Text; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; public class RootTabletLocator extends TabletLocator { private Instance instance; + private TabletServerLockChecker lockChecker; - RootTabletLocator(Instance instance) { + RootTabletLocator(Instance instance, TabletServerLockChecker lockChecker) { this.instance = instance; + this.lockChecker = lockChecker; } @Override public <T extends Mutation> void binMutations(Credentials credentials, List<T> mutations, Map<String,TabletServerMutations<T>> binnedMutations, List<T> failures) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - String rootTabletLocation = instance.getRootTabletLocation(); + TabletLocation rootTabletLocation = getRootTabletLocation(); if (rootTabletLocation != null) { - TabletServerMutations<T> tsm = new TabletServerMutations<T>(); + TabletServerMutations<T> tsm = new TabletServerMutations<T>(rootTabletLocation.tablet_session); for (T mutation : mutations) { tsm.addMutation(RootTable.EXTENT, mutation); } - binnedMutations.put(rootTabletLocation, tsm); + binnedMutations.put(rootTabletLocation.tablet_location, tsm); } else { failures.addAll(mutations); } @@ -60,10 +69,10 @@ public class RootTabletLocator extends TabletLocator { public List<Range> binRanges(Credentials credentials, List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - String rootTabletLocation = instance.getRootTabletLocation(); + TabletLocation rootTabletLocation = getRootTabletLocation(); if (rootTabletLocation != null) { for (Range range : ranges) { - TabletLocatorImpl.addRange(binnedRanges, rootTabletLocation, RootTable.EXTENT, range); + TabletLocatorImpl.addRange(binnedRanges, rootTabletLocation.tablet_location, RootTable.EXTENT, range); } return Collections.emptyList(); } @@ -77,23 +86,46 @@ public class RootTabletLocator extends TabletLocator { public void invalidateCache(Collection<KeyExtent> keySet) {} @Override - public void invalidateCache(String server) {} + public void invalidateCache(String server) { + ZooCache zooCache = ZooCache.getInstance(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()); + String root = ZooUtil.getRoot(instance) + Constants.ZTSERVERS; + zooCache.clear(root + "/" + server); + } @Override public void invalidateCache() {} + protected TabletLocation getRootTabletLocation() { + String zRootLocPath = ZooUtil.getRoot(instance) + RootTable.ZROOT_TABLET_LOCATION; + ZooCache zooCache = ZooCache.getInstance(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()); + + OpTimer opTimer = new OpTimer(Logger.getLogger(this.getClass()), Level.TRACE).start("Looking up root tablet location in zookeeper."); + byte[] loc = zooCache.get(zRootLocPath); + opTimer.stop("Found root tablet at " + (loc == null ? null : new String(loc)) + " in %DURATION%"); + + if (loc == null) { + return null; + } + + String[] tokens = new String(loc).split("\\|"); + + if (lockChecker.isLockHeld(tokens[0], tokens[1])) + return new TabletLocation(RootTable.EXTENT, tokens[0], tokens[1]); + else + return null; + } + @Override public TabletLocation locateTablet(Credentials credentials, Text row, boolean skipRow, boolean retry) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - String location = instance.getRootTabletLocation(); + TabletLocation location = getRootTabletLocation(); // Always retry when finding the root tablet while (retry && location == null) { UtilWaitThread.sleep(500); - location = instance.getRootTabletLocation(); + location = getRootTabletLocation(); } - if (location != null) - return new TabletLocation(RootTable.EXTENT, location); - return null; + + return location; } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/8945f167/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java index 294d702..e396d82 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java @@ -100,11 +100,11 @@ public abstract class TabletLocator { MetadataLocationObtainer mlo = new MetadataLocationObtainer(instance); if (tableId.toString().equals(RootTable.ID)) { - tl = new RootTabletLocator(instance); + tl = new RootTabletLocator(instance, new ZookeeperLockChecker(instance)); } else if (tableId.toString().equals(MetadataTable.ID)) { - tl = new TabletLocatorImpl(new Text(MetadataTable.ID), getLocator(instance, new Text(RootTable.ID)), mlo); + tl = new TabletLocatorImpl(new Text(MetadataTable.ID), getLocator(instance, new Text(RootTable.ID)), mlo, new ZookeeperLockChecker(instance)); } else { - tl = new TabletLocatorImpl(tableId, getLocator(instance, new Text(MetadataTable.ID)), mlo); + tl = new TabletLocatorImpl(tableId, getLocator(instance, new Text(MetadataTable.ID)), mlo, new ZookeeperLockChecker(instance)); } locators.put(key, tl); } @@ -152,18 +152,20 @@ public abstract class TabletLocator { public final KeyExtent tablet_extent; public final String tablet_location; + public final String tablet_session; - public TabletLocation(KeyExtent tablet_extent, String tablet_location) { - ArgumentChecker.notNull(tablet_extent, tablet_location); + public TabletLocation(KeyExtent tablet_extent, String tablet_location, String session) { + ArgumentChecker.notNull(tablet_extent, tablet_location, session); this.tablet_extent = tablet_extent; this.tablet_location = dedupeLocation(tablet_location); + this.tablet_session = dedupeLocation(session); } @Override public boolean equals(Object o) { if (o instanceof TabletLocation) { TabletLocation otl = (TabletLocation) o; - return tablet_extent.equals(otl.tablet_extent) && tablet_location.equals(otl.tablet_location); + return tablet_extent.equals(otl.tablet_extent) && tablet_location.equals(otl.tablet_location) && tablet_session.equals(otl.tablet_session); } return false; } @@ -175,25 +177,30 @@ public abstract class TabletLocator { @Override public String toString() { - return "(" + tablet_extent + "," + tablet_location + ")"; + return "(" + tablet_extent + "," + tablet_location + "," + tablet_session + ")"; } @Override public int compareTo(TabletLocation o) { int result = tablet_extent.compareTo(o.tablet_extent); - if (result == 0) + if (result == 0) { result = tablet_location.compareTo(o.tablet_location); + if (result == 0) + result = tablet_session.compareTo(o.tablet_session); + } return result; } } public static class TabletServerMutations<T extends Mutation> { private Map<KeyExtent,List<T>> mutations; - - public TabletServerMutations() { - mutations = new HashMap<KeyExtent,List<T>>(); + private String tserverSession; + + public TabletServerMutations(String tserverSession) { + this.tserverSession = tserverSession; + this.mutations = new HashMap<KeyExtent,List<T>>(); } - + public void addMutation(KeyExtent ke, T m) { List<T> mutList = mutations.get(ke); if (mutList == null) { @@ -207,5 +214,9 @@ public abstract class TabletLocator { public Map<KeyExtent,List<T>> getMutations() { return mutations; } + + final String getSession() { + return tserverSession; + } } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/8945f167/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java index d8873e6..6f4e598 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -41,6 +42,7 @@ import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.security.Credentials; import org.apache.accumulo.core.util.OpTimer; +import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.TextUtil; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.hadoop.io.Text; @@ -85,12 +87,14 @@ public class TabletLocatorImpl extends TabletLocator { protected TabletLocator parent; protected TreeMap<Text,TabletLocation> metaCache = new TreeMap<Text,TabletLocation>(endRowComparator); protected TabletLocationObtainer locationObtainer; + private TabletServerLockChecker lockChecker; protected Text lastTabletRow; private TreeSet<KeyExtent> badExtents = new TreeSet<KeyExtent>(); private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock rLock = rwLock.readLock(); private final Lock wLock = rwLock.writeLock(); + public static interface TabletLocationObtainer { /** @@ -103,10 +107,52 @@ public class TabletLocatorImpl extends TabletLocator { throws AccumuloSecurityException, AccumuloException; } - public TabletLocatorImpl(Text table, TabletLocator parent, TabletLocationObtainer tlo) { + public static interface TabletServerLockChecker { + boolean isLockHeld(String tserver, String session); + + void invalidateCache(String server); + } + + private class LockCheckerSession { + + private HashSet<Pair<String,String>> okLocks = new HashSet<Pair<String,String>>(); + private HashSet<Pair<String,String>> invalidLocks = new HashSet<Pair<String,String>>(); + + private TabletLocation checkLock(TabletLocation tl) { + // the goal of this class is to minimize calls out to lockChecker under that assumption that its a resource synchronized among many threads... want to + // avoid fine grained synchronization when binning lots of mutations or ranges... remember decisions from the lockChecker in thread local unsynchronized + // memory + + if (tl == null) + return null; + + Pair<String,String> lock = new Pair<String,String>(tl.tablet_location, tl.tablet_session); + + if (okLocks.contains(lock)) + return tl; + + if (invalidLocks.contains(lock)) + return null; + + if (lockChecker.isLockHeld(tl.tablet_location, tl.tablet_session)) { + okLocks.add(lock); + return tl; + } + + if (log.isTraceEnabled()) + log.trace("Tablet server " + tl.tablet_location + " " + tl.tablet_session + " no longer holds its lock"); + + invalidLocks.add(lock); + + return null; + } + } + + public TabletLocatorImpl(Text table, TabletLocator parent, TabletLocationObtainer tlo, TabletServerLockChecker tslc) { this.tableId = table; this.parent = parent; this.locationObtainer = tlo; + this.lockChecker = tslc; this.lastTabletRow = new Text(tableId); lastTabletRow.append(new byte[] {'<'}, 0, 1); @@ -123,9 +169,11 @@ public class TabletLocatorImpl extends TabletLocator { ArrayList<T> notInCache = new ArrayList<T>(); Text row = new Text(); + LockCheckerSession lcSession = new LockCheckerSession(); + rLock.lock(); try { - processInvalidated(credentials); + processInvalidated(credentials, lcSession); // for this to be efficient rows need to be in sorted order, but always sorting is slow... therefore only sort the // stuff not in the cache.... it is most efficient to pass _locateTablet rows in sorted order @@ -136,11 +184,8 @@ public class TabletLocatorImpl extends TabletLocator { for (T mutation : mutations) { row.set(mutation.getRow()); TabletLocation tl = locateTabletInCache(row); - if (tl == null) + if (tl == null || !addMutation(binnedMutations, mutation, tl, lcSession)) notInCache.add(mutation); - else - addMutation(binnedMutations, mutation, tl); - } } finally { rLock.unlock(); @@ -167,43 +212,55 @@ public class TabletLocatorImpl extends TabletLocator { row.set(mutation.getRow()); - TabletLocation tl = _locateTablet(credentials, row, false, false, false); + TabletLocation tl = _locateTablet(credentials, row, false, false, false, lcSession); - if (tl == null) { + if (tl == null || !addMutation(binnedMutations, mutation, tl, lcSession)) { failures.add(mutation); failed = true; - } else { - addMutation(binnedMutations, mutation, tl); } } } finally { wLock.unlock(); } } - + if (opTimer != null) opTimer.stop("Binned " + mutations.size() + " mutations for table " + tableId + " to " + binnedMutations.size() + " tservers in %DURATION%"); } - - private <T extends Mutation> void addMutation(Map<String,TabletServerMutations<T>> binnedMutations, T mutation, TabletLocation tl) { + + private <T extends Mutation> boolean addMutation(Map<String,TabletServerMutations<T>> binnedMutations, T mutation, TabletLocation tl, + LockCheckerSession lcSession) { TabletServerMutations<T> tsm = binnedMutations.get(tl.tablet_location); if (tsm == null) { - tsm = new TabletServerMutations<T>(); - binnedMutations.put(tl.tablet_location, tsm); + // do lock check once per tserver here to make binning faster + boolean lockHeld = lcSession.checkLock(tl) != null; + if (lockHeld) { + tsm = new TabletServerMutations<T>(tl.tablet_session); + binnedMutations.put(tl.tablet_location, tsm); + } else { + return false; + } } - tsm.addMutation(tl.tablet_extent, mutation); + // its possible the same tserver could be listed with different sessions + if (tsm.getSession().equals(tl.tablet_session)) { + tsm.addMutation(tl.tablet_extent, mutation); + return true; + } + + return false; } - private List<Range> binRanges(Credentials credentials, List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges, boolean useCache) - throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + private List<Range> binRanges(Credentials credentials, List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges, boolean useCache, + LockCheckerSession lcSession) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { List<Range> failures = new ArrayList<Range>(); List<TabletLocation> tabletLocations = new ArrayList<TabletLocation>(); boolean lookupFailed = false; l1: for (Range range : ranges) { + tabletLocations.clear(); Text startRow; @@ -216,9 +273,9 @@ public class TabletLocatorImpl extends TabletLocator { TabletLocation tl = null; if (useCache) - tl = locateTabletInCache(startRow); + tl = lcSession.checkLock(locateTabletInCache(startRow)); else if (!lookupFailed) - tl = _locateTablet(credentials, startRow, false, false, false); + tl = _locateTablet(credentials, startRow, false, false, false, lcSession); if (tl == null) { failures.add(range); @@ -233,9 +290,9 @@ public class TabletLocatorImpl extends TabletLocator { if (useCache) { Text row = new Text(tl.tablet_extent.getEndRow()); row.append(new byte[] {0}, 0, 1); - tl = locateTabletInCache(row); + tl = lcSession.checkLock(locateTabletInCache(row)); } else { - tl = _locateTablet(credentials, tl.tablet_extent.getEndRow(), true, false, false); + tl = _locateTablet(credentials, tl.tablet_extent.getEndRow(), true, false, false, lcSession); } if (tl == null) { @@ -246,7 +303,7 @@ public class TabletLocatorImpl extends TabletLocator { } tabletLocations.add(tl); } - + for (TabletLocation tl2 : tabletLocations) { TabletLocatorImpl.addRange(binnedRanges, tl2.tablet_location, tl2.tablet_extent, range); } @@ -269,17 +326,19 @@ public class TabletLocatorImpl extends TabletLocator { if (log.isTraceEnabled()) opTimer = new OpTimer(log, Level.TRACE).start("Binning " + ranges.size() + " ranges for table " + tableId); + LockCheckerSession lcSession = new LockCheckerSession(); + List<Range> failures; rLock.lock(); try { - processInvalidated(credentials); + processInvalidated(credentials, lcSession); // for this to be optimal, need to look ranges up in sorted order when // ranges are not present in cache... however do not want to always // sort ranges... therefore try binning ranges using only the cache // and sort whatever fails and retry - failures = binRanges(credentials, ranges, binnedRanges, true); + failures = binRanges(credentials, ranges, binnedRanges, true, lcSession); } finally { rLock.unlock(); } @@ -291,7 +350,7 @@ public class TabletLocatorImpl extends TabletLocator { // try lookups again wLock.lock(); try { - failures = binRanges(credentials, failures, binnedRanges, false); + failures = binRanges(credentials, failures, binnedRanges, false, lcSession); } finally { wLock.unlock(); } @@ -299,7 +358,7 @@ public class TabletLocatorImpl extends TabletLocator { if (opTimer != null) opTimer.stop("Binned " + ranges.size() + " ranges for table " + tableId + " to " + binnedRanges.size() + " tservers in %DURATION%"); - + return failures; } @@ -342,6 +401,8 @@ public class TabletLocatorImpl extends TabletLocator { wLock.unlock(); } + lockChecker.invalidateCache(server); + if (log.isTraceEnabled()) log.trace("invalidated " + invalidatedCount + " cache entries table=" + tableId + " server=" + server); @@ -372,10 +433,9 @@ public class TabletLocatorImpl extends TabletLocator { while (true) { - TabletLocation tl; - - tl = _locateTablet(credentials, row, skipRow, retry, true); - + LockCheckerSession lcSession = new LockCheckerSession(); + TabletLocation tl = _locateTablet(credentials, row, skipRow, retry, true, lcSession); + if (retry && tl == null) { UtilWaitThread.sleep(100); if (log.isTraceEnabled()) @@ -390,7 +450,8 @@ public class TabletLocatorImpl extends TabletLocator { } } - private void lookupTabletLocation(Credentials credentials, Text row, boolean retry) throws AccumuloException, AccumuloSecurityException, + private void lookupTabletLocation(Credentials credentials, Text row, boolean retry, LockCheckerSession lcSession) throws AccumuloException, + AccumuloSecurityException, TableNotFoundException { Text metadataRow = new Text(tableId); metadataRow.append(new byte[] {';'}, 0, 1); @@ -428,7 +489,8 @@ public class TabletLocatorImpl extends TabletLocator { // create new location if current prevEndRow == endRow if ((lastEndRow != null) && (ke.getPrevEndRow() != null) && ke.getPrevEndRow().equals(lastEndRow)) { - locToCache = new TabletLocation(new KeyExtent(ke.getTableId(), ke.getEndRow(), lastEndRow), tabletLocation.tablet_location); + locToCache = new TabletLocation(new KeyExtent(ke.getTableId(), ke.getEndRow(), lastEndRow), tabletLocation.tablet_location, + tabletLocation.tablet_session); } else { locToCache = tabletLocation; } @@ -436,13 +498,13 @@ public class TabletLocatorImpl extends TabletLocator { // save endRow for next iteration lastEndRow = locToCache.tablet_extent.getEndRow(); - updateCache(locToCache); + updateCache(locToCache, lcSession); } } } - private void updateCache(TabletLocation tabletLocation) { + private void updateCache(TabletLocation tabletLocation, LockCheckerSession lcSession) { if (!tabletLocation.tablet_extent.getTableId().equals(tableId)) { // sanity check throw new IllegalStateException("Unexpected extent returned " + tableId + " " + tabletLocation.tablet_extent); @@ -461,6 +523,10 @@ public class TabletLocatorImpl extends TabletLocator { // clear out any overlapping extents in cache removeOverlapping(metaCache, tabletLocation.tablet_extent); + // do not add to cache unless lock is held + if (lcSession.checkLock(tabletLocation) == null) + return; + // add it to cache Text er = tabletLocation.tablet_extent.getEndRow(); if (er == null) @@ -510,20 +576,22 @@ public class TabletLocatorImpl extends TabletLocator { extents.remove(overlapping); } } - + private TabletLocation locateTabletInCache(Text row) { Entry<Text,TabletLocation> entry = metaCache.ceilingEntry(row); if (entry != null) { KeyExtent ke = entry.getValue().tablet_extent; - if (ke.getPrevEndRow() == null || ke.getPrevEndRow().compareTo(row) < 0) + if (ke.getPrevEndRow() == null || ke.getPrevEndRow().compareTo(row) < 0) { return entry.getValue(); + } } return null; } - protected TabletLocation _locateTablet(Credentials credentials, Text row, boolean skipRow, boolean retry, boolean lock) throws AccumuloException, + protected TabletLocation _locateTablet(Credentials credentials, Text row, boolean skipRow, boolean retry, boolean lock, LockCheckerSession lcSession) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { if (skipRow) { @@ -536,8 +604,8 @@ public class TabletLocatorImpl extends TabletLocator { if (lock) rLock.lock(); try { - processInvalidated(credentials); - tl = locateTabletInCache(row); + processInvalidated(credentials, lcSession); + tl = lcSession.checkLock(locateTabletInCache(row)); } finally { if (lock) rLock.unlock(); @@ -548,9 +616,9 @@ public class TabletLocatorImpl extends TabletLocator { wLock.lock(); try { // not in cache, so obtain info - lookupTabletLocation(credentials, row, retry); + lookupTabletLocation(credentials, row, retry, lcSession); - tl = locateTabletInCache(row); + tl = lcSession.checkLock(locateTabletInCache(row)); } finally { if (lock) wLock.unlock(); @@ -560,7 +628,8 @@ public class TabletLocatorImpl extends TabletLocator { return tl; } - private void processInvalidated(Credentials credentials) throws AccumuloSecurityException, AccumuloException, TableNotFoundException { + private void processInvalidated(Credentials credentials, LockCheckerSession lcSession) throws AccumuloSecurityException, AccumuloException, + TableNotFoundException { if (badExtents.size() == 0) return; @@ -595,7 +664,7 @@ public class TabletLocatorImpl extends TabletLocator { List<TabletLocation> locations = locationObtainer.lookupTablets(credentials, tserver, binnedRanges.get(tserver), parent); for (TabletLocation tabletLocation : locations) { - updateCache(tabletLocation); + updateCache(tabletLocation, lcSession); } } } finally { http://git-wip-us.apache.org/repos/asf/accumulo/blob/8945f167/core/src/main/java/org/apache/accumulo/core/client/impl/ZookeeperLockChecker.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ZookeeperLockChecker.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ZookeeperLockChecker.java new file mode 100644 index 0000000..85bd171 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ZookeeperLockChecker.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client.impl; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.impl.TabletLocatorImpl.TabletServerLockChecker; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooCache; +import org.apache.accumulo.fate.zookeeper.ZooLock; +import org.apache.zookeeper.KeeperException; + +/** + * + */ +public class ZookeeperLockChecker implements TabletServerLockChecker { + + private ZooCache zc; + private String root; + + ZookeeperLockChecker(Instance instance) { + zc = ZooCache.getInstance(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()); + this.root = ZooUtil.getRoot(instance) + Constants.ZTSERVERS; + } + + @Override + public boolean isLockHeld(String tserver, String session) { + try { + return ZooLock.getSessionId(zc, root + "/" + tserver) == Long.parseLong(session, 16); + } catch (KeeperException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public void invalidateCache(String tserver) { + zc.clear(root + "/" + tserver); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/8945f167/core/src/main/java/org/apache/accumulo/core/client/mock/MockTabletLocator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTabletLocator.java b/core/src/main/java/org/apache/accumulo/core/client/mock/MockTabletLocator.java index b3458ca..6bd01a9 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTabletLocator.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockTabletLocator.java @@ -43,7 +43,7 @@ public class MockTabletLocator extends TabletLocator { @Override public <T extends Mutation> void binMutations(Credentials credentials, List<T> mutations, Map<String,TabletServerMutations<T>> binnedMutations, List<T> failures) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - TabletServerMutations<T> tsm = new TabletServerMutations<T>(); + TabletServerMutations<T> tsm = new TabletServerMutations<T>("5"); for (T m : mutations) tsm.addMutation(new KeyExtent(), m); binnedMutations.put("", tsm); http://git-wip-us.apache.org/repos/asf/accumulo/blob/8945f167/core/src/main/java/org/apache/accumulo/core/metadata/MetadataLocationObtainer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/MetadataLocationObtainer.java b/core/src/main/java/org/apache/accumulo/core/metadata/MetadataLocationObtainer.java index def9507..7d6312d 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/MetadataLocationObtainer.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/MetadataLocationObtainer.java @@ -54,7 +54,6 @@ import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.Credentials; import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; import org.apache.accumulo.core.util.OpTimer; -import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.TextUtil; import org.apache.hadoop.io.Text; import org.apache.log4j.Level; @@ -81,8 +80,6 @@ public class MetadataLocationObtainer implements TabletLocationObtainer { throws AccumuloSecurityException, AccumuloException { try { - ArrayList<TabletLocation> list = new ArrayList<TabletLocation>(); - OpTimer opTimer = null; if (log.isTraceEnabled()) opTimer = new OpTimer(log, Level.TRACE).start("Looking up in " + src.tablet_extent.getTableId() + " row=" + TextUtil.truncate(row) + " extent=" @@ -118,13 +115,7 @@ public class MetadataLocationObtainer implements TabletLocationObtainer { // System.out.println("results "+results.keySet()); - Pair<SortedMap<KeyExtent,Text>,List<KeyExtent>> metadata = MetadataLocationObtainer.getMetadataLocationEntries(results); - - for (Entry<KeyExtent,Text> entry : metadata.getFirst().entrySet()) { - list.add(new TabletLocation(entry.getKey(), entry.getValue().toString())); - } - - return new TabletLocations(list, metadata.getSecond()); + return MetadataLocationObtainer.getMetadataLocationEntries(results); } catch (AccumuloServerException ase) { if (log.isTraceEnabled()) @@ -158,9 +149,7 @@ public class MetadataLocationObtainer implements TabletLocationObtainer { throws AccumuloSecurityException, AccumuloException { final TreeMap<Key,Value> results = new TreeMap<Key,Value>(); - - ArrayList<TabletLocation> list = new ArrayList<TabletLocation>(); - + ResultReceiver rr = new ResultReceiver() { @Override @@ -204,23 +193,18 @@ public class MetadataLocationObtainer implements TabletLocationObtainer { throw e; } - SortedMap<KeyExtent,Text> metadata = MetadataLocationObtainer.getMetadataLocationEntries(results).getFirst(); - - for (Entry<KeyExtent,Text> entry : metadata.entrySet()) { - list.add(new TabletLocation(entry.getKey(), entry.getValue().toString())); - } - - return list; + return MetadataLocationObtainer.getMetadataLocationEntries(results).getLocations(); } - public static Pair<SortedMap<KeyExtent,Text>,List<KeyExtent>> getMetadataLocationEntries(SortedMap<Key,Value> entries) { + public static TabletLocations getMetadataLocationEntries(SortedMap<Key,Value> entries) { Key key; Value val; Text location = null; + Text session = null; Value prevRow = null; KeyExtent ke; - SortedMap<KeyExtent,Text> results = new TreeMap<KeyExtent,Text>(); + List<TabletLocation> results = new ArrayList<TabletLocation>(); ArrayList<KeyExtent> locationless = new ArrayList<KeyExtent>(); Text lastRowFromKey = new Text(); @@ -236,6 +220,7 @@ public class MetadataLocationObtainer implements TabletLocationObtainer { if (key.compareRow(lastRowFromKey) != 0) { prevRow = null; location = null; + session = null; key.getRow(lastRowFromKey); } @@ -248,6 +233,7 @@ public class MetadataLocationObtainer implements TabletLocationObtainer { throw new IllegalStateException("Tablet has multiple locations : " + lastRowFromKey); } location = new Text(val.toString()); + session = new Text(colq); } else if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.equals(colf, colq)) { prevRow = new Value(val); } @@ -255,7 +241,7 @@ public class MetadataLocationObtainer implements TabletLocationObtainer { if (prevRow != null) { ke = new KeyExtent(key.getRow(), prevRow); if (location != null) - results.put(ke, location); + results.add(new TabletLocation(ke, location.toString(), session.toString())); else locationless.add(ke); @@ -264,6 +250,6 @@ public class MetadataLocationObtainer implements TabletLocationObtainer { } } - return new Pair<SortedMap<KeyExtent,Text>,List<KeyExtent>>(results, locationless); + return new TabletLocations(results, locationless); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/8945f167/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java b/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java index 13350d5..f4fe0a6 100644 --- a/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java @@ -39,6 +39,7 @@ import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation; import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocations; import org.apache.accumulo.core.client.impl.TabletLocator.TabletServerMutations; import org.apache.accumulo.core.client.impl.TabletLocatorImpl.TabletLocationObtainer; +import org.apache.accumulo.core.client.impl.TabletLocatorImpl.TabletServerLockChecker; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.data.Key; @@ -52,7 +53,6 @@ import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.security.Credentials; -import org.apache.accumulo.core.util.Pair; import org.apache.hadoop.io.Text; public class TabletLocatorImplTest extends TestCase { @@ -111,7 +111,7 @@ public class TabletLocatorImplTest extends TestCase { for (int i = 0; i < data.length; i += 2) { KeyExtent ke = (KeyExtent) data[i]; String loc = (String) data[i + 1]; - mcke.put(ke, new TabletLocation(ke, loc)); + mcke.put(ke, new TabletLocation(ke, loc, "1")); } return mcke; @@ -132,16 +132,16 @@ public class TabletLocatorImplTest extends TestCase { return mc; } - static TabletLocatorImpl createLocators(TServers tservers, String rootTabLoc, String metaTabLoc, String table, Object... data) { + static TabletLocatorImpl createLocators(TServers tservers, String rootTabLoc, String metaTabLoc, String table, TabletServerLockChecker tslc, Object... data) { TreeMap<KeyExtent,TabletLocation> mcke = createMetaCacheKE(data); TestTabletLocationObtainer ttlo = new TestTabletLocationObtainer(tservers); TestInstance testInstance = new TestInstance("instance1", "tserver1"); - RootTabletLocator rtl = new RootTabletLocator(testInstance); - TabletLocatorImpl rootTabletCache = new TabletLocatorImpl(new Text(MetadataTable.ID), rtl, ttlo); - TabletLocatorImpl tab1TabletCache = new TabletLocatorImpl(new Text(table), rootTabletCache, ttlo); + RootTabletLocator rtl = new TestRootTabletLocator(testInstance); + TabletLocatorImpl rootTabletCache = new TabletLocatorImpl(new Text(MetadataTable.ID), rtl, ttlo, new YesLockChecker()); + TabletLocatorImpl tab1TabletCache = new TabletLocatorImpl(new Text(table), rootTabletCache, ttlo, tslc); setLocation(tservers, rootTabLoc, RTE, MTE, metaTabLoc); @@ -153,6 +153,10 @@ public class TabletLocatorImplTest extends TestCase { } + static TabletLocatorImpl createLocators(TServers tservers, String rootTabLoc, String metaTabLoc, String table, Object... data) { + return createLocators(tservers, rootTabLoc, metaTabLoc, table, new YesLockChecker(), data); + } + static TabletLocatorImpl createLocators(String table, Object... data) { TServers tservers = new TServers(); return createLocators(tservers, "tserver1", "tserver2", table, data); @@ -475,9 +479,7 @@ public class TabletLocatorImplTest extends TestCase { // System.out.println("lookupTablet("+src+","+row+","+stopRow+","+ parent+")"); // System.out.println(tservers); - - ArrayList<TabletLocation> list = new ArrayList<TabletLocation>(); - + Map<KeyExtent,SortedMap<Key,Value>> tablets = tservers.get(src.tablet_location); if (tablets == null) { @@ -500,13 +502,7 @@ public class TabletLocatorImplTest extends TestCase { SortedMap<Key,Value> results = tabletData.tailMap(startKey).headMap(stopKey); - Pair<SortedMap<KeyExtent,Text>,List<KeyExtent>> metadata = MetadataLocationObtainer.getMetadataLocationEntries(results); - - for (Entry<KeyExtent,Text> entry : metadata.getFirst().entrySet()) { - list.add(new TabletLocation(entry.getKey(), entry.getValue().toString())); - } - - return new TabletLocations(list, metadata.getSecond()); + return MetadataLocationObtainer.getMetadataLocationEntries(results); } @Override @@ -556,18 +552,41 @@ public class TabletLocatorImplTest extends TestCase { if (failures.size() > 0) parent.invalidateCache(failures); - SortedMap<KeyExtent,Text> metadata = MetadataLocationObtainer.getMetadataLocationEntries(results).getFirst(); - - for (Entry<KeyExtent,Text> entry : metadata.entrySet()) { - list.add(new TabletLocation(entry.getKey(), entry.getValue().toString())); - } - - return list; + return MetadataLocationObtainer.getMetadataLocationEntries(results).getLocations(); } } + static class YesLockChecker implements TabletServerLockChecker { + @Override + public boolean isLockHeld(String tserver, String session) { + return true; + } + + @Override + public void invalidateCache(String server) {} + } + + static class TestRootTabletLocator extends RootTabletLocator { + + private Instance instance; + + TestRootTabletLocator(Instance instance) { + super(instance, new YesLockChecker()); + this.instance = instance; + } + + @Override + protected TabletLocation getRootTabletLocation() { + return new TabletLocation(RootTable.EXTENT, instance.getRootTabletLocation(), "1"); + } + + @Override + public void invalidateCache(String server) {} + + } + static void createEmptyTablet(TServers tservers, String server, KeyExtent tablet) { Map<KeyExtent,SortedMap<Key,Value>> tablets = tservers.tservers.get(server); if (tablets == null) { @@ -584,6 +603,23 @@ public class TabletLocatorImplTest extends TestCase { } } + static void clearLocation(TServers tservers, String server, KeyExtent tablet, KeyExtent ke, String instance) { + Map<KeyExtent,SortedMap<Key,Value>> tablets = tservers.tservers.get(server); + if (tablets == null) { + return; + } + + SortedMap<Key,Value> tabletData = tablets.get(tablet); + if (tabletData == null) { + return; + } + + Text mr = ke.getMetadataEntry(); + Key lk = new Key(mr, TabletsSection.CurrentLocationColumnFamily.NAME, new Text(instance)); + tabletData.remove(lk); + + } + static void setLocation(TServers tservers, String server, KeyExtent tablet, KeyExtent ke, String location, String instance) { Map<KeyExtent,SortedMap<Key,Value>> tablets = tservers.tservers.get(server); if (tablets == null) { @@ -645,9 +681,9 @@ public class TabletLocatorImplTest extends TestCase { TestTabletLocationObtainer ttlo = new TestTabletLocationObtainer(tservers); TestInstance testInstance = new TestInstance("instance1", "tserver1"); - RootTabletLocator rtl = new RootTabletLocator(testInstance); - TabletLocatorImpl rootTabletCache = new TabletLocatorImpl(new Text(MetadataTable.ID), rtl, ttlo); - TabletLocatorImpl tab1TabletCache = new TabletLocatorImpl(new Text("tab1"), rootTabletCache, ttlo); + RootTabletLocator rtl = new TestRootTabletLocator(testInstance); + TabletLocatorImpl rootTabletCache = new TabletLocatorImpl(new Text(MetadataTable.ID), rtl, ttlo, new YesLockChecker()); + TabletLocatorImpl tab1TabletCache = new TabletLocatorImpl(new Text("tab1"), rootTabletCache, ttlo, new YesLockChecker()); locateTabletTest(tab1TabletCache, "r1", null, null, credentials); @@ -1177,9 +1213,9 @@ public class TabletLocatorImplTest extends TestCase { TestTabletLocationObtainer ttlo = new TestTabletLocationObtainer(tservers); TestInstance testInstance = new TestInstance("instance1", "tserver1"); - RootTabletLocator rtl = new RootTabletLocator(testInstance); - TabletLocatorImpl rootTabletCache = new TabletLocatorImpl(new Text(MetadataTable.ID), rtl, ttlo); - TabletLocatorImpl tab0TabletCache = new TabletLocatorImpl(new Text("0"), rootTabletCache, ttlo); + RootTabletLocator rtl = new TestRootTabletLocator(testInstance); + TabletLocatorImpl rootTabletCache = new TabletLocatorImpl(new Text(MetadataTable.ID), rtl, ttlo, new YesLockChecker()); + TabletLocatorImpl tab0TabletCache = new TabletLocatorImpl(new Text("0"), rootTabletCache, ttlo, new YesLockChecker()); setLocation(tservers, "tserver1", RTE, mte1, "tserver2"); setLocation(tservers, "tserver1", RTE, mte2, "tserver3"); @@ -1204,9 +1240,9 @@ public class TabletLocatorImplTest extends TestCase { TestTabletLocationObtainer ttlo = new TestTabletLocationObtainer(tservers); TestInstance testInstance = new TestInstance("instance1", "tserver1"); - RootTabletLocator rtl = new RootTabletLocator(testInstance); - TabletLocatorImpl rootTabletCache = new TabletLocatorImpl(new Text(MetadataTable.ID), rtl, ttlo); - TabletLocatorImpl tab0TabletCache = new TabletLocatorImpl(new Text("0"), rootTabletCache, ttlo); + RootTabletLocator rtl = new TestRootTabletLocator(testInstance); + TabletLocatorImpl rootTabletCache = new TabletLocatorImpl(new Text(MetadataTable.ID), rtl, ttlo, new YesLockChecker()); + TabletLocatorImpl tab0TabletCache = new TabletLocatorImpl(new Text("0"), rootTabletCache, ttlo, new YesLockChecker()); setLocation(tservers, "tserver1", RTE, mte1, "tserver2"); setLocation(tservers, "tserver1", RTE, mte2, "tserver3"); @@ -1234,10 +1270,10 @@ public class TabletLocatorImplTest extends TestCase { TestTabletLocationObtainer ttlo = new TestTabletLocationObtainer(tservers); TestInstance testInstance = new TestInstance("instance1", "tserver1"); - RootTabletLocator rtl = new RootTabletLocator(testInstance); + RootTabletLocator rtl = new TestRootTabletLocator(testInstance); - TabletLocatorImpl rootTabletCache = new TabletLocatorImpl(new Text(MetadataTable.ID), rtl, ttlo); - TabletLocatorImpl tab0TabletCache = new TabletLocatorImpl(new Text("1"), rootTabletCache, ttlo); + TabletLocatorImpl rootTabletCache = new TabletLocatorImpl(new Text(MetadataTable.ID), rtl, ttlo, new YesLockChecker()); + TabletLocatorImpl tab0TabletCache = new TabletLocatorImpl(new Text("1"), rootTabletCache, ttlo, new YesLockChecker()); setLocation(tservers, "tserver1", RTE, mte1, "tserver2"); setLocation(tservers, "tserver1", RTE, mte2, "tserver3"); @@ -1274,4 +1310,127 @@ public class TabletLocatorImplTest extends TestCase { } } + + public void testLostLock() throws Exception { + + final HashSet<String> activeLocks = new HashSet<String>(); + + TServers tservers = new TServers(); + TabletLocatorImpl metaCache = createLocators(tservers, "tserver1", "tserver2", "foo", new TabletServerLockChecker() { + @Override + public boolean isLockHeld(String tserver, String session) { + return activeLocks.contains(tserver + ":" + session); + } + + @Override + public void invalidateCache(String server) {} + }); + + KeyExtent ke1 = nke("foo", null, null); + setLocation(tservers, "tserver2", MTE, ke1, "L1", "5"); + + activeLocks.add("L1:5"); + + locateTabletTest(metaCache, "a", ke1, "L1", credentials); + locateTabletTest(metaCache, "a", ke1, "L1", credentials); + + activeLocks.clear(); + + locateTabletTest(metaCache, "a", null, null, credentials); + locateTabletTest(metaCache, "a", null, null, credentials); + locateTabletTest(metaCache, "a", null, null, credentials); + + clearLocation(tservers, "tserver2", MTE, ke1, "5"); + setLocation(tservers, "tserver2", MTE, ke1, "L2", "6"); + + activeLocks.add("L2:6"); + + locateTabletTest(metaCache, "a", ke1, "L2", credentials); + locateTabletTest(metaCache, "a", ke1, "L2", credentials); + + clearLocation(tservers, "tserver2", MTE, ke1, "6"); + + locateTabletTest(metaCache, "a", ke1, "L2", credentials); + + setLocation(tservers, "tserver2", MTE, ke1, "L3", "7"); + + locateTabletTest(metaCache, "a", ke1, "L2", credentials); + + activeLocks.clear(); + + locateTabletTest(metaCache, "a", null, null, credentials); + locateTabletTest(metaCache, "a", null, null, credentials); + + activeLocks.add("L3:7"); + + locateTabletTest(metaCache, "a", ke1, "L3", credentials); + locateTabletTest(metaCache, "a", ke1, "L3", credentials); + + List<Mutation> ml = nml(nm("a", "cf1:cq1=v1", "cf1:cq2=v2"), nm("w", "cf1:cq3=v3")); + Map<String,Map<KeyExtent,List<String>>> emb = cemb(nol("a", "L3", ke1), nol("w", "L3", ke1)); + runTest(metaCache, ml, emb); + + clearLocation(tservers, "tserver2", MTE, ke1, "7"); + + runTest(metaCache, ml, emb); + + activeLocks.clear(); + + emb.clear(); + + runTest(metaCache, ml, emb, "a", "w"); + runTest(metaCache, ml, emb, "a", "w"); + + KeyExtent ke11 = nke("foo", "m", null); + KeyExtent ke12 = nke("foo", null, "m"); + + setLocation(tservers, "tserver2", MTE, ke11, "L1", "8"); + setLocation(tservers, "tserver2", MTE, ke12, "L2", "9"); + + runTest(metaCache, ml, emb, "a", "w"); + + activeLocks.add("L1:8"); + + emb = cemb(nol("a", "L1", ke11)); + runTest(metaCache, ml, emb, "w"); + + activeLocks.add("L2:9"); + + emb = cemb(nol("a", "L1", ke11), nol("w", "L2", ke12)); + runTest(metaCache, ml, emb); + + List<Range> ranges = nrl(new Range("a"), nr("b", "o"), nr("r", "z")); + Map<String,Map<KeyExtent,List<Range>>> expected = createExpectedBinnings("L1", nol(ke11, nrl(new Range("a"), nr("b", "o"))), "L2", + nol(ke12, nrl(nr("b", "o"), nr("r", "z")))); + + runTest(null, ranges, metaCache, expected); + + activeLocks.remove("L2:9"); + + expected = createExpectedBinnings("L1", nol(ke11, nrl(new Range("a")))); + runTest(null, ranges, metaCache, expected, nrl(nr("b", "o"), nr("r", "z"))); + + activeLocks.clear(); + + expected = createExpectedBinnings(); + runTest(null, ranges, metaCache, expected, nrl(new Range("a"), nr("b", "o"), nr("r", "z"))); + + clearLocation(tservers, "tserver2", MTE, ke11, "8"); + clearLocation(tservers, "tserver2", MTE, ke12, "9"); + setLocation(tservers, "tserver2", MTE, ke11, "L3", "10"); + setLocation(tservers, "tserver2", MTE, ke12, "L4", "11"); + + runTest(null, ranges, metaCache, expected, nrl(new Range("a"), nr("b", "o"), nr("r", "z"))); + + activeLocks.add("L3:10"); + + expected = createExpectedBinnings("L3", nol(ke11, nrl(new Range("a")))); + runTest(null, ranges, metaCache, expected, nrl(nr("b", "o"), nr("r", "z"))); + + activeLocks.add("L4:11"); + + expected = createExpectedBinnings("L3", nol(ke11, nrl(new Range("a"), nr("b", "o"))), "L4", nol(ke12, nrl(nr("b", "o"), nr("r", "z")))); + runTest(null, ranges, metaCache, expected); + } + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/8945f167/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java b/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java index fb4a3dc..80b3eda 100644 --- a/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java +++ b/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java @@ -63,7 +63,7 @@ public class BulkImporterTest { @Override public TabletLocation locateTablet(Credentials credentials, Text row, boolean skipRow, boolean retry) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - return new TabletLocation(fakeMetaData.tailSet(new KeyExtent(tableId, row, null)).first(), "localhost"); + return new TabletLocation(fakeMetaData.tailSet(new KeyExtent(tableId, row, null)).first(), "localhost", "1"); } @Override
