Author: jimk Date: Sat Dec 1 20:58:03 2007 New Revision: 600240 URL: http://svn.apache.org/viewvc?rev=600240&view=rev Log: HADOOP-2309 ConcurrentModificationException doing get of all region start keys
Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/onelab/filter/Filter.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/onelab/filter/RetouchedBloomFilter.java lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=600240&r1=600239&r2=600240&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original) +++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Sat Dec 1 20:58:03 2007 @@ -47,6 +47,7 @@ (Edward Yoon via Stack) HADOOP-2320 Committed TestGet2 is managled (breaks build). 
HADOOP-2322 getRow(row, TS) client interface not properly connected + HADOOP-2309 ConcurrentModificationException doing get of all region start keys IMPROVEMENTS HADOOP-2401 Add convenience put method that takes writable Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java?rev=600240&r1=600239&r2=600240&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java Sat Dec 1 20:58:03 2007 @@ -28,6 +28,8 @@ import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -113,10 +115,10 @@ private Set<Text> closedTables; // Set of tables currently being located - private HashSet<Text> tablesBeingLocated; + private Set<Text> tablesBeingLocated; // Known region HServerAddress.toString() -> HRegionInterface - private HashMap<String, HRegionInterface> servers; + private Map<String, HRegionInterface> servers; /** * constructor @@ -145,13 +147,14 @@ this.master = null; this.masterChecked = false; - this.tablesToServers = Collections.synchronizedMap( - new HashMap<Text, SortedMap<Text, HRegionLocation>>()); + this.tablesToServers = + new ConcurrentHashMap<Text, SortedMap<Text, HRegionLocation>>(); this.closedTables = Collections.synchronizedSet(new HashSet<Text>()); - this.tablesBeingLocated = new HashSet<Text>(); + this.tablesBeingLocated = Collections.synchronizedSet( + new HashSet<Text>()); - this.servers = new HashMap<String, HRegionInterface>(); + this.servers = new 
ConcurrentHashMap<String, HRegionInterface>(); } /** {@inheritDoc} */ Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?rev=600240&r1=600239&r2=600240&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Sat Dec 1 20:58:03 2007 @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -29,8 +28,8 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; -import java.util.Vector; import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -142,13 +141,13 @@ LOG.info("starting merge of regions: " + a.getRegionName() + " and " + b.getRegionName() + " into new region " + newRegionInfo.toString()); - Map<Text, Vector<HStoreFile>> byFamily = - new TreeMap<Text, Vector<HStoreFile>>(); + Map<Text, List<HStoreFile>> byFamily = - new TreeMap<Text, List<HStoreFile>>(); byFamily = filesByFamily(byFamily, a.close()); byFamily = filesByFamily(byFamily, b.close()); - for (Map.Entry<Text, Vector<HStoreFile>> es : byFamily.entrySet()) { + for (Map.Entry<Text, List<HStoreFile>> es : byFamily.entrySet()) { Text colFamily = es.getKey(); - Vector<HStoreFile> srcFiles = es.getValue(); + List<HStoreFile> srcFiles = es.getValue(); HStoreFile dst = new HStoreFile(conf, merges, HRegionInfo.encodeRegionName(newRegionInfo.getRegionName()), colFamily, Math.abs(rand.nextLong())); @@ -175,12 +174,12 @@ * @param
storeFiles Store files to process. * @return Returns <code>byFamily</code> */ - private static Map<Text, Vector<HStoreFile>> filesByFamily( - Map<Text, Vector<HStoreFile>> byFamily, Vector<HStoreFile> storeFiles) { + private static Map<Text, List<HStoreFile>> filesByFamily( + Map<Text, List<HStoreFile>> byFamily, List<HStoreFile> storeFiles) { for(HStoreFile src: storeFiles) { - Vector<HStoreFile> v = byFamily.get(src.getColFamily()); + List<HStoreFile> v = byFamily.get(src.getColFamily()); if(v == null) { - v = new Vector<HStoreFile>(); + v = new ArrayList<HStoreFile>(); byFamily.put(src.getColFamily(), v); } v.add(src); @@ -192,11 +191,11 @@ // Members ////////////////////////////////////////////////////////////////////////////// - volatile Map<Text, Long> rowsToLocks = new HashMap<Text, Long>(); - volatile Map<Long, Text> locksToRows = new HashMap<Long, Text>(); - volatile Map<Text, HStore> stores = new HashMap<Text, HStore>(); + volatile Map<Text, Long> rowsToLocks = new ConcurrentHashMap<Text, Long>(); + volatile Map<Long, Text> locksToRows = new ConcurrentHashMap<Long, Text>(); + volatile Map<Text, HStore> stores = new ConcurrentHashMap<Text, HStore>(); volatile Map<Long, TreeMap<HStoreKey, byte []>> targetColumns = - new HashMap<Long, TreeMap<HStoreKey, byte []>>(); + new ConcurrentHashMap<Long, TreeMap<HStoreKey, byte []>>(); final AtomicLong memcacheSize = new AtomicLong(0); @@ -359,7 +358,7 @@ * * @throws IOException */ - public Vector<HStoreFile> close() throws IOException { + public List<HStoreFile> close() throws IOException { return close(false); } @@ -377,7 +376,7 @@ * * @throws IOException */ - Vector<HStoreFile> close(boolean abort) throws IOException { + List<HStoreFile> close(boolean abort) throws IOException { if (isClosed()) { LOG.info("region " + this.regionInfo.getRegionName() + " already closed"); return null; @@ -421,7 +420,7 @@ internalFlushcache(snapshotMemcaches()); } - Vector<HStoreFile> result = new Vector<HStoreFile>(); + 
List<HStoreFile> result = new ArrayList<HStoreFile>(); for (HStore store: stores.values()) { result.addAll(store.close()); } @@ -571,7 +570,7 @@ // Now close the HRegion. Close returns all store files or null if not // supposed to close (? What to do in this case? Implement abort of close?) // Close also does wait on outstanding rows and calls a flush just-in-case. - Vector<HStoreFile> hstoreFilesToSplit = close(); + List<HStoreFile> hstoreFilesToSplit = close(); if (hstoreFilesToSplit == null) { LOG.warn("Close came back null (Implement abort of close?)"); throw new RuntimeException("close returned empty vector of HStoreFiles"); @@ -909,6 +908,7 @@ // A. Flush memcache to all the HStores. // Keep running vector of all store files that includes both old and the // just-made new flush store file. + for (HStore hstore: stores.values()) { hstore.flushCache(sequenceId); } Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java?rev=600240&r1=600239&r2=600240&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java Sat Dec 1 20:58:03 2007 @@ -99,9 +99,11 @@ void snapshot() { this.lock.writeLock().lock(); try { - if (memcache.size() != 0) { - snapshot.putAll(memcache); - memcache.clear(); + synchronized (memcache) { + if (memcache.size() != 0) { + snapshot.putAll(memcache); + memcache.clear(); + } } } finally { this.lock.writeLock().unlock(); @@ -149,9 +151,14 @@ List<byte[]> get(final HStoreKey key, final int numVersions) { this.lock.readLock().lock(); try { - ArrayList<byte []> results = internalGet(memcache, key, numVersions); - results.addAll(results.size(), + List<byte []> results; + synchronized 
(memcache) { + results = internalGet(memcache, key, numVersions); + } + synchronized (snapshot) { + results.addAll(results.size(), internalGet(snapshot, key, numVersions - results.size())); + } return results; } finally { @@ -170,8 +177,12 @@ void getFull(HStoreKey key, SortedMap<Text, byte[]> results) { this.lock.readLock().lock(); try { - internalGetFull(memcache, key, results); - internalGetFull(snapshot, key, results); + synchronized (memcache) { + internalGetFull(memcache, key, results); + } + synchronized (snapshot) { + internalGetFull(snapshot, key, results); + } } finally { this.lock.readLock().unlock(); @@ -248,11 +259,15 @@ List<HStoreKey> getKeys(final HStoreKey origin, final int versions) { this.lock.readLock().lock(); try { - List<HStoreKey> results = - internalGetKeys(this.memcache, origin, versions); - results.addAll(results.size(), internalGetKeys(snapshot, origin, - versions == HConstants.ALL_VERSIONS ? versions : - (versions - results.size()))); + List<HStoreKey> results; + synchronized (memcache) { + results = internalGetKeys(this.memcache, origin, versions); + } + synchronized (snapshot) { + results.addAll(results.size(), internalGetKeys(snapshot, origin, + versions == HConstants.ALL_VERSIONS ? versions : + (versions - results.size()))); + } return results; } finally { Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java?rev=600240&r1=600239&r2=600240&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java Sat Dec 1 20:58:03 2007 @@ -104,10 +104,12 @@ * @return Region server added. 
*/ public RegionServerThread addRegionServer() throws IOException { - RegionServerThread t = new RegionServerThread(new HRegionServer(conf), - this.regionThreads.size()); - this.regionThreads.add(t); - return t; + synchronized (regionThreads) { + RegionServerThread t = new RegionServerThread(new HRegionServer(conf), + this.regionThreads.size()); + this.regionThreads.add(t); + return t; + } } /** runs region servers */ @@ -146,8 +148,10 @@ * @return Name of region server that just went down. */ public String waitOnRegionServer(int serverNumber) { - RegionServerThread regionServerThread = - this.regionThreads.remove(serverNumber); + RegionServerThread regionServerThread; + synchronized (regionThreads) { + regionServerThread = this.regionThreads.remove(serverNumber); + } while (regionServerThread.isAlive()) { try { LOG.info("Waiting on " + @@ -193,8 +197,10 @@ */ public String startup() { this.master.start(); - for (RegionServerThread t: this.regionThreads) { - t.start(); + synchronized (regionThreads) { + for (RegionServerThread t: this.regionThreads) { + t.start(); + } } return this.master.getMasterAddress().toString(); } Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java?rev=600240&r1=600239&r2=600240&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java Sat Dec 1 20:58:03 2007 @@ -23,7 +23,7 @@ import java.io.IOException; import java.io.OutputStream; import java.io.StringWriter; -import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.Iterator; import java.util.Map; import 
java.util.Properties; @@ -105,7 +105,8 @@ } } - private HashMap<String, ColumnConf> columnMap = new HashMap<String, ColumnConf>(); + private Map<String, ColumnConf> columnMap = + new ConcurrentHashMap<String, ColumnConf>(); public Iterator<String> columnNameIterator() { return columnMap.keySet().iterator(); Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/onelab/filter/Filter.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/onelab/filter/Filter.java?rev=600240&r1=600239&r2=600240&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/onelab/filter/Filter.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/onelab/filter/Filter.java Sat Dec 1 20:58:03 2007 @@ -52,8 +52,8 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; +import java.util.List; import org.apache.hadoop.io.WritableComparable; /** @@ -146,7 +146,7 @@ * Adds a list of keys to <i>this</i> filter. * @param keys The list of keys. 
*/ - public void add(ArrayList<Key> keys){ + public void add(List<Key> keys){ if(keys == null) { throw new IllegalArgumentException("ArrayList<Key> may not be null"); } Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/onelab/filter/RetouchedBloomFilter.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/onelab/filter/RetouchedBloomFilter.java?rev=600240&r1=600239&r2=600240&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/onelab/filter/RetouchedBloomFilter.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/onelab/filter/RetouchedBloomFilter.java Sat Dec 1 20:58:03 2007 @@ -52,6 +52,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.Random; /** @@ -76,12 +78,12 @@ /** * KeyList vector (or ElementList Vector, as defined in the paper) of false positives. */ - ArrayList<Key>[] fpVector; + List<Key>[] fpVector; /** * KeyList vector of keys recorded in the filter. */ - ArrayList<Key>[] keyVector; + List<Key>[] keyVector; /** * Ratio vector. @@ -158,7 +160,7 @@ * Adds a list of false positive information to <i>this</i> retouched Bloom filter. * @param keys The list of false positive. */ - public void addFalsePositive(ArrayList<Key> keys){ + public void addFalsePositive(List<Key> keys){ if(keys == null) { throw new NullPointerException("ArrayList<Key> can not be null"); } @@ -306,8 +308,8 @@ throw new ArrayIndexOutOfBoundsException(index); } - ArrayList<Key> kl = keyVector[index]; - ArrayList<Key> fpl = fpVector[index]; + List<Key> kl = keyVector[index]; + List<Key> fpl = fpVector[index]; // update key list int listSize = kl.size(); @@ -339,7 +341,7 @@ * @param k The key to remove. * @param vector The counting vector associated to the key. 
*/ - private void removeKey(Key k, ArrayList<Key>[] vector) { + private void removeKey(Key k, List<Key>[] vector) { if(k == null) { throw new NullPointerException("Key can not be null"); } @@ -369,7 +371,7 @@ }//end for - i }//end computeRatio() - private double getWeight(ArrayList<Key> keyList) { + private double getWeight(List<Key> keyList) { double weight = 0.0; for(Key k: keyList) { weight += k.getWeight(); @@ -382,13 +384,13 @@ */ @SuppressWarnings("unchecked") private void createVector() { - fpVector = new ArrayList[vectorSize]; - keyVector = new ArrayList[vectorSize]; + fpVector = new List[vectorSize]; + keyVector = new List[vectorSize]; ratio = new double[vectorSize]; for(int i = 0; i < vectorSize; i++) { - fpVector[i] = new ArrayList<Key>(); - keyVector[i] = new ArrayList<Key>(); + fpVector[i] = Collections.synchronizedList(new ArrayList<Key>()); + keyVector[i] = Collections.synchronizedList(new ArrayList<Key>()); ratio[i] = 0.0; }//end for -i }//end createVector() @@ -422,14 +424,14 @@ public void write(DataOutput out) throws IOException { super.write(out); for(int i = 0; i < fpVector.length; i++) { - ArrayList<Key> list = fpVector[i]; + List<Key> list = fpVector[i]; out.writeInt(list.size()); for(Key k: list) { k.write(out); } } for(int i = 0; i < keyVector.length; i++) { - ArrayList<Key> list = keyVector[i]; + List<Key> list = keyVector[i]; out.writeInt(list.size()); for(Key k: list) { k.write(out); @@ -446,7 +448,7 @@ super.readFields(in); createVector(); for(int i = 0; i < fpVector.length; i++) { - ArrayList<Key> list = fpVector[i]; + List<Key> list = fpVector[i]; int size = in.readInt(); for(int j = 0; j < size; j++) { Key k = new Key(); @@ -455,7 +457,7 @@ } } for(int i = 0; i < keyVector.length; i++) { - ArrayList<Key> list = keyVector[i]; + List<Key> list = keyVector[i]; int size = in.readInt(); for(int j = 0; j < size; j++) { Key k = new Key(); @@ -478,8 +480,8 @@ RetouchedBloomFilter other = (RetouchedBloomFilter)o; for(int i = 0; result == 0 && 
i < fpVector.length; i++) { - ArrayList<Key> mylist = fpVector[i]; - ArrayList<Key> otherlist = other.fpVector[i]; + List<Key> mylist = fpVector[i]; + List<Key> otherlist = other.fpVector[i]; for(int j = 0; result == 0 && j < mylist.size(); j++) { result = mylist.get(j).compareTo(otherlist.get(j)); @@ -487,8 +489,8 @@ } for(int i = 0; result == 0 && i < keyVector.length; i++) { - ArrayList<Key> mylist = keyVector[i]; - ArrayList<Key> otherlist = other.keyVector[i]; + List<Key> mylist = keyVector[i]; + List<Key> otherlist = other.keyVector[i]; for(int j = 0; result == 0 && j < mylist.size(); j++) { result = mylist.get(j).compareTo(otherlist.get(j)); Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java?rev=600240&r1=600239&r2=600240&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java Sat Dec 1 20:58:03 2007 @@ -372,27 +372,63 @@ TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>(); while(scanner.next(key, results)) { + if (LOG.isDebugEnabled()) { + if (results.size() > 2 ) { + LOG.debug("Too many results, expected 2 got " + results.size()); + } + } byte[] firstValue = null; byte[] secondValue = null; int count = 0; - for(Map.Entry<Text, byte[]> e: results.entrySet()) { if (count == 0) firstValue = e.getValue(); if (count == 1) secondValue = e.getValue(); count++; + if (count == 2) { + break; + } } - // verify second value is the reverse of the first - assertNotNull(firstValue); - assertNotNull(secondValue); - assertEquals(firstValue.length, secondValue.length); - byte[] secondReversed = new 
byte[secondValue.length]; - for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) { - secondReversed[i] = secondValue[j]; + String first = ""; + if (firstValue == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("row=" + key.getRow() + ": first value is null"); + } + fail(); + + } else { + first = new String(firstValue, HConstants.UTF8_ENCODING); + if (LOG.isDebugEnabled()) { + LOG.debug("row=" + key.getRow() + ": first value=" + first); + } + } + + String second = ""; + if (secondValue == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("row=" + key.getRow() + ": second value is null"); + } + fail(); + + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("row=" + key.getRow() + ": second value=" + + new String(secondValue, HConstants.UTF8_ENCODING)); + } + byte[] secondReversed = new byte[secondValue.length]; + for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) { + secondReversed[i] = secondValue[j]; + } + second = new String(secondReversed, HConstants.UTF8_ENCODING); + } + if (first.compareTo(second) != 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("second key is not the reverse of first"); + } + fail(); } - assertTrue(Arrays.equals(firstValue, secondReversed)); } } finally {