Author: jimk Date: Sat Sep 15 14:27:10 2007 New Revision: 575986 URL: http://svn.apache.org/viewvc?rev=575986&view=rev Log: HADOOP-1888 NullPointerException in HMemcacheScanner (reprise)
Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHMemcache.java Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=575986&r1=575985&r2=575986&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original) +++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Sat Sep 15 14:27:10 2007 @@ -47,7 +47,7 @@ HADOOP-1847 Many HBase tests do not fail well. (phase 2) HADOOP-1870 Once file system failure has been detected, don't check it again and get on with shutting down the hbase cluster. - HADOOP-1888 NullPointerException in HMemcacheScanner + HADOOP-1888 NullPointerException in HMemcacheScanner (reprise) HADOOP-1903 Possible data loss if Exception happens between snapshot and flush to disk. 
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java?rev=575986&r1=575985&r2=575986&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java Sat Sep 15 14:27:10 2007 @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -43,10 +44,10 @@ // Note that since these structures are always accessed with a lock held, // no additional synchronization is required. - TreeMap<HStoreKey, byte []> memcache = new TreeMap<HStoreKey, byte []>(); - final ArrayList<TreeMap<HStoreKey, byte []>> history = - new ArrayList<TreeMap<HStoreKey, byte []>>(); - TreeMap<HStoreKey, byte []> snapshot = null; + volatile SortedMap<HStoreKey, byte []> memcache; + List<SortedMap<HStoreKey, byte []>> history = + Collections.synchronizedList(new ArrayList<SortedMap<HStoreKey, byte []>>()); + volatile SortedMap<HStoreKey, byte []> snapshot = null; final HLocking lock = new HLocking(); @@ -62,14 +63,16 @@ */ public HMemcache() { super(); + memcache = + Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>()); } /** represents the state of the memcache at a specified point in time */ static class Snapshot { - final TreeMap<HStoreKey, byte []> memcacheSnapshot; + final SortedMap<HStoreKey, byte []> memcacheSnapshot; final long sequenceId; - Snapshot(final TreeMap<HStoreKey, byte[]> memcache, final Long i) { + Snapshot(final SortedMap<HStoreKey, byte[]> memcache, final Long i) { super(); this.memcacheSnapshot = memcache; this.sequenceId = i.longValue(); @@ -103,8 +106,11 @@ new 
Snapshot(memcache, Long.valueOf(log.startCacheFlush())); // From here on, any failure is catastrophic requiring replay of hlog this.snapshot = memcache; - history.add(memcache); - memcache = new TreeMap<HStoreKey, byte []>(); + synchronized (history) { + history.add(memcache); + } + memcache = + Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>()); // Reset size of this memcache. this.size.set(0); return retval; @@ -126,14 +132,8 @@ if(snapshot == null) { throw new IOException("Snapshot not present!"); } - for (Iterator<TreeMap<HStoreKey, byte []>> it = history.iterator(); - it.hasNext(); ) { - - TreeMap<HStoreKey, byte []> cur = it.next(); - if (snapshot == cur) { - it.remove(); - break; - } + synchronized (history) { + history.remove(snapshot); } this.snapshot = null; } finally { @@ -182,12 +182,14 @@ this.lock.obtainReadLock(); try { ArrayList<byte []> results = get(memcache, key, numVersions); - for (int i = history.size() - 1; i >= 0; i--) { - if (numVersions > 0 && results.size() >= numVersions) { - break; + synchronized (history) { + for (int i = history.size() - 1; i >= 0; i--) { + if (numVersions > 0 && results.size() >= numVersions) { + break; + } + results.addAll(results.size(), + get(history.get(i), key, numVersions - results.size())); } - results.addAll(results.size(), - get(history.get(i), key, numVersions - results.size())); } return (results.size() == 0) ? 
null : ImmutableBytesWritable.toArray(results); @@ -210,9 +212,11 @@ this.lock.obtainReadLock(); try { internalGetFull(memcache, key, results); - for (int i = history.size() - 1; i >= 0; i--) { - TreeMap<HStoreKey, byte []> cur = history.get(i); - internalGetFull(cur, key, results); + synchronized (history) { + for (int i = history.size() - 1; i >= 0; i--) { + SortedMap<HStoreKey, byte []> cur = history.get(i); + internalGetFull(cur, key, results); + } } return results; @@ -221,7 +225,7 @@ } } - void internalGetFull(TreeMap<HStoreKey, byte []> map, HStoreKey key, + void internalGetFull(SortedMap<HStoreKey, byte []> map, HStoreKey key, TreeMap<Text, byte []> results) { SortedMap<HStoreKey, byte []> tailMap = map.tailMap(key); for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) { @@ -252,7 +256,7 @@ * @return Ordered list of items found in passed <code>map</code>. If no * matching values, returns an empty list (does not return null). */ - ArrayList<byte []> get(final TreeMap<HStoreKey, byte []> map, + ArrayList<byte []> get(final SortedMap<HStoreKey, byte []> map, final HStoreKey key, final int numVersions) { ArrayList<byte []> result = new ArrayList<byte []>(); // TODO: If get is of a particular version -- numVersions == 1 -- we @@ -289,10 +293,12 @@ this.lock.obtainReadLock(); try { List<HStoreKey> results = getKeys(this.memcache, origin, versions); - for (int i = history.size() - 1; i >= 0; i--) { - results.addAll(results.size(), getKeys(history.get(i), origin, - versions == HConstants.ALL_VERSIONS ? versions : - (versions - results.size()))); + synchronized (history) { + for (int i = history.size() - 1; i >= 0; i--) { + results.addAll(results.size(), getKeys(history.get(i), origin, + versions == HConstants.ALL_VERSIONS ? versions : + (versions - results.size()))); + } } return results; } finally { @@ -308,7 +314,7 @@ * equal or older timestamp. If no keys, returns an empty List. Does not * return null. 
*/ - private List<HStoreKey> getKeys(final TreeMap<HStoreKey, byte []> map, + private List<HStoreKey> getKeys(final SortedMap<HStoreKey, byte []> map, final HStoreKey origin, final int versions) { List<HStoreKey> result = new ArrayList<HStoreKey>(); SortedMap<HStoreKey, byte []> tailMap = map.tailMap(origin); @@ -360,7 +366,7 @@ ////////////////////////////////////////////////////////////////////////////// class HMemcacheScanner extends HAbstractScanner { - final TreeMap<HStoreKey, byte []> backingMaps[]; + SortedMap<HStoreKey, byte []> backingMaps[]; final Iterator<HStoreKey> keyIterators[]; @SuppressWarnings("unchecked") @@ -370,14 +376,16 @@ super(timestamp, targetCols); lock.obtainReadLock(); try { - this.backingMaps = new TreeMap[history.size() + 1]; + synchronized (history) { + this.backingMaps = new SortedMap[history.size() + 1]; - // Note that since we iterate through the backing maps from 0 to n, we - // need to put the memcache first, the newest history second, ..., etc. + // Note that since we iterate through the backing maps from 0 to n, we + // need to put the memcache first, the newest history second, ..., etc. - backingMaps[0] = memcache; - for (int i = history.size() - 1; i > 0; i--) { - backingMaps[i + 1] = history.get(i); + backingMaps[0] = memcache; + for (int i = history.size() - 1; i >= 0; i--) { + backingMaps[i + 1] = history.get(i); + } } this.keyIterators = new Iterator[backingMaps.length]; @@ -388,9 +396,13 @@ HStoreKey firstKey = new HStoreKey(firstRow); for (int i = 0; i < backingMaps.length; i++) { - keyIterators[i] = firstRow.getLength() != 0 ? 
- backingMaps[i].tailMap(firstKey).keySet().iterator() : - backingMaps[i].keySet().iterator(); + if (firstRow != null && firstRow.getLength() != 0) { + keyIterators[i] = + backingMaps[i].tailMap(firstKey).keySet().iterator(); + + } else { + keyIterators[i] = backingMaps[i].keySet().iterator(); + } while (getNext(i)) { if (!findFirstRow(i, firstRow)) { Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?rev=575986&r1=575985&r2=575986&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Sat Sep 15 14:27:10 2007 @@ -1615,6 +1615,7 @@ return multipleMatchers; } + /** {@inheritDoc} */ public boolean next(HStoreKey key, SortedMap<Text, byte[]> results) throws IOException { // Filtered flag is set by filters. 
If a cell has been 'filtered out' Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java?rev=575986&r1=575985&r2=575986&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java Sat Sep 15 14:27:10 2007 @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Random; +import java.util.SortedMap; import java.util.TreeMap; import java.util.Vector; import java.util.Map.Entry; @@ -439,13 +440,13 @@ * @param logCacheFlushId flush sequence number * @throws IOException */ - void flushCache(final TreeMap<HStoreKey, byte []> inputCache, + void flushCache(final SortedMap<HStoreKey, byte []> inputCache, final long logCacheFlushId) throws IOException { flushCacheHelper(inputCache, logCacheFlushId, true); } - void flushCacheHelper(TreeMap<HStoreKey, byte []> inputCache, + void flushCacheHelper(SortedMap<HStoreKey, byte []> inputCache, long logCacheFlushId, boolean addToAvailableMaps) throws IOException { synchronized(flushLock) { @@ -1123,7 +1124,7 @@ * @param key * @param numVersions Number of versions to fetch. Must be > 0. 
* @param memcache Checked for deletions - * @return + * @return values for the specified versions * @throws IOException */ byte [][] get(HStoreKey key, int numVersions, final HMemcache memcache) @@ -1171,10 +1172,11 @@ break; } } - while ((readval = new ImmutableBytesWritable()) != null && + for (readval = new ImmutableBytesWritable(); map.next(readkey, readval) && readkey.matchesRowCol(key) && - !hasEnoughVersions(numVersions, results)) { + !hasEnoughVersions(numVersions, results); + readval = new ImmutableBytesWritable()) { if (!isDeleted(readkey, readval.get(), memcache, deletes)) { results.add(readval.get()); } @@ -1212,10 +1214,11 @@ * @throws IOException */ List<HStoreKey> getKeys(final HStoreKey origin, List<HStoreKey> allKeys, - final int versions) - throws IOException { - if (allKeys == null) { - allKeys = new ArrayList<HStoreKey>(); + final int versions) throws IOException { + + List<HStoreKey> keys = allKeys; + if (keys == null) { + keys = new ArrayList<HStoreKey>(); } // This code below is very close to the body of the get method. 
this.lock.obtainReadLock(); @@ -1238,23 +1241,24 @@ continue; } if (!isDeleted(readkey, readval.get(), null, null) && - !allKeys.contains(readkey)) { - allKeys.add(new HStoreKey(readkey)); + !keys.contains(readkey)) { + keys.add(new HStoreKey(readkey)); } - while ((readval = new ImmutableBytesWritable()) != null && + for (readval = new ImmutableBytesWritable(); map.next(readkey, readval) && - readkey.matchesRowCol(origin)) { + readkey.matchesRowCol(origin); + readval = new ImmutableBytesWritable()) { if (!isDeleted(readkey, readval.get(), null, null) && - !allKeys.contains(readkey)) { - allKeys.add(new HStoreKey(readkey)); - if (versions != ALL_VERSIONS && allKeys.size() >= versions) { + !keys.contains(readkey)) { + keys.add(new HStoreKey(readkey)); + if (versions != ALL_VERSIONS && keys.size() >= versions) { break; } } } } } - return allKeys; + return keys; } finally { this.lock.releaseReadLock(); } Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java?rev=575986&r1=575985&r2=575986&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java Sat Sep 15 14:27:10 2007 @@ -50,7 +50,8 @@ private FileSystem fs; private Path parentdir; private MasterThread masterThread = null; - ArrayList<RegionServerThread> regionThreads; + ArrayList<RegionServerThread> regionThreads = + new ArrayList<RegionServerThread>(); private boolean deleteOnExit = true; /** @@ -125,7 +126,7 @@ this.parentdir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR)); fs.mkdirs(parentdir); this.masterThread = startMaster(this.conf); - this.regionThreads = startRegionServers(this.conf, nRegionNodes); + 
this.regionThreads.addAll(startRegionServers(this.conf, nRegionNodes)); } catch(IOException e) { shutdown(); throw e; @@ -357,17 +358,15 @@ if(masterThread != null) { masterThread.getMaster().shutdown(); } - if (regionServerThreads != null) { - synchronized(regionServerThreads) { - if (regionServerThreads != null) { - for(Thread t: regionServerThreads) { - if (t.isAlive()) { - try { - t.join(); - } catch (InterruptedException e) { - // continue - } - } + // regionServerThreads can never be null because they are initialized when + // the class is constructed. + synchronized(regionServerThreads) { + for(Thread t: regionServerThreads) { + if (t.isAlive()) { + try { + t.join(); + } catch (InterruptedException e) { + // continue } } } @@ -381,8 +380,7 @@ } LOG.info("Shutdown " + ((masterThread != null)? masterThread.getName(): "0 masters") + " " + - ((regionServerThreads == null)? 0: regionServerThreads.size()) + - " region server(s)"); + regionServerThreads.size() + " region server(s)"); } void shutdown() { Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHMemcache.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHMemcache.java?rev=575986&r1=575985&r2=575986&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHMemcache.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHMemcache.java Sat Sep 15 14:27:10 2007 @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.Map; +import java.util.SortedMap; import java.util.TreeMap; import junit.framework.TestCase; @@ -97,7 +98,7 @@ // Save off old state. 
int oldHistorySize = hmc.history.size(); - TreeMap<HStoreKey, byte []> oldMemcache = hmc.memcache; + SortedMap<HStoreKey, byte []> oldMemcache = hmc.memcache; // Run snapshot. Snapshot s = hmc.snapshotMemcacheForLog(log); // Make some assertions about what just happened.