Author: stack
Date: Wed May 25 04:18:12 2011
New Revision: 1127374
URL: http://svn.apache.org/viewvc?rev=1127374&view=rev
Log:
HBASE-3894 Thread contention over row locks set monitor
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1127374&r1=1127373&r2=1127374&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Wed May 25 04:18:12 2011
@@ -112,6 +112,7 @@ Release 0.91.0 - Unreleased
HBASE-3912 [Stargate] Columns not handle by Scan
HBASE-3903 A successful write to client write-buffer may be lost or not
visible (Doug Meil)
+ HBASE-3894 Thread contention over row locks set monitor (Dave Latham)
IMPROVEMENTS
HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack)
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1127374&r1=1127373&r2=1127374&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed May 25 04:18:12 2011
@@ -1,5 +1,5 @@
/*
- * Copyright 2010 The Apache Software Foundation
+ * Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -38,11 +38,12 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Random;
-import java.util.Set;
import java.util.TreeMap;
-import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -89,6 +90,7 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.HashedBytes;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -159,11 +161,11 @@ public class HRegion implements HeapSize
// Members
//////////////////////////////////////////////////////////////////////////////
- private final Set<byte[]> lockedRows =
- new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
- private final Map<Integer, byte []> lockIds =
- new HashMap<Integer, byte []>();
- private int lockIdGenerator = 1;
+ private final ConcurrentHashMap<HashedBytes, CountDownLatch> lockedRows = // locked row -> latch that is counted down when that row's lock is released
+ new ConcurrentHashMap<HashedBytes, CountDownLatch>();
+ private final ConcurrentHashMap<Integer, HashedBytes> lockIds = // lock id handed to the caller -> the row it locks
+ new ConcurrentHashMap<Integer, HashedBytes>();
+ private final AtomicInteger lockIdGenerator = new AtomicInteger(1); // source of candidate lock ids; reseeded randomly when a candidate is already taken
static private Random rand = new Random();
protected final Map<byte [], Store> stores =
@@ -197,8 +199,6 @@ public class HRegion implements HeapSize
final Path regiondir;
KeyValue.KVComparator comparator;
- private Pair<Long,Long> lastCompactInfo = null;
-
/*
* Data structure of write state flags used coordinating flushes,
* compactions and closes.
@@ -232,6 +232,9 @@ public class HRegion implements HeapSize
boolean isFlushRequested() {
return this.flushRequested;
}
+
+ static final long HEAP_SIZE = ClassSize.align( // shallow size: object header plus 5 boolean fields as counted here (TODO confirm count matches WriteState's fields)
+ ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
}
final WriteState writestate = new WriteState();
@@ -2365,41 +2368,41 @@ public class HRegion implements HeapSize
* null if unavailable.
*/
private Integer internalObtainRowLock(final byte[] row, boolean waitForLock)
- throws IOException {
+ throws IOException {
checkRow(row);
startRegionOperation();
try {
- synchronized (lockedRows) {
- while (lockedRows.contains(row)) {
+ HashedBytes rowKey = new HashedBytes(row);
+ CountDownLatch rowLatch = new CountDownLatch(1);
+
+ // loop until we acquire the row lock (unless !waitForLock)
+ while (true) {
+ CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey,
rowLatch);
+ if (existingLatch == null) {
+ break;
+ } else {
+ // row already locked
if (!waitForLock) {
return null;
}
try {
- lockedRows.wait();
+ existingLatch.await();
} catch (InterruptedException ie) {
// Empty
}
}
- // generate a new lockid. Attempt to insert the new [lockid, row].
- // if this lockid already exists in the map then revert and retry
- // We could have first done a lockIds.get, and if it does not exist
only
- // then do a lockIds.put, but the hope is that the lockIds.put will
- // mostly return null the first time itself because there won't be
- // too many lockId collisions.
- byte [] prev = null;
- Integer lockId = null;
- do {
- lockId = new Integer(lockIdGenerator++);
- prev = lockIds.put(lockId, row);
- if (prev != null) {
- lockIds.put(lockId, prev); // revert old value
- lockIdGenerator = rand.nextInt(); // generate new start point
- }
- } while (prev != null);
-
- lockedRows.add(row);
- lockedRows.notifyAll();
- return lockId;
+ }
+
+ // loop until we generate an unused lock id
+ while (true) {
+ Integer lockId = lockIdGenerator.incrementAndGet();
+ HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, rowKey);
+ if (existingRowKey == null) {
+ return lockId;
+ } else {
+ // lockId already in use, jump generator to a new spot
+ lockIdGenerator.set(rand.nextInt());
+ }
}
} finally {
closeRegionOperation();
@@ -2411,22 +2414,28 @@ public class HRegion implements HeapSize
* @param lockid
* @return Row that goes with <code>lockid</code>
*/
- byte [] getRowFromLock(final Integer lockid) {
- synchronized (lockedRows) {
- return lockIds.get(lockid);
- }
+ byte[] getRowFromLock(final Integer lockid) {
+ HashedBytes rowKey = lockIds.get(lockid); // lockIds is a ConcurrentHashMap, so no monitor is taken here
+ return rowKey == null ? null : rowKey.getBytes(); // null when lockid is not currently registered
}
-
+
/**
* Release the row lock!
- * @param lockid The lock ID to release.
+ * @param lockId The lock ID to release.
*/
- public void releaseRowLock(final Integer lockid) {
- synchronized (lockedRows) {
- byte[] row = lockIds.remove(lockid);
- lockedRows.remove(row);
- lockedRows.notifyAll();
+ public void releaseRowLock(final Integer lockId) {
+ HashedBytes rowKey = lockIds.remove(lockId); // atomic remove: a repeated release of the same id finds null and only logs a warning
+ if (rowKey == null) {
+ LOG.warn("Release unknown lockId: " + lockId);
+ return;
}
+ CountDownLatch rowLatch = lockedRows.remove(rowKey); // mark the row unlocked before waking waiters so their putIfAbsent retry can succeed
+ if (rowLatch == null) {
+ LOG.error("Releases row not locked, lockId: " + lockId + " row: "
+ + rowKey);
+ return;
+ }
+ rowLatch.countDown(); // wakes threads blocked in existingLatch.await() while waiting for this row
}
/**
@@ -2434,13 +2443,8 @@ public class HRegion implements HeapSize
- * @param lockid
+ * @param lockId
* @return boolean
*/
- boolean isRowLocked(final Integer lockid) {
- synchronized (lockedRows) {
- if (lockIds.get(lockid) != null) {
- return true;
- }
- return false;
- }
+ boolean isRowLocked(final Integer lockId) {
+ return lockIds.containsKey(lockId); // true from a successful obtain until the matching releaseRowLock
}
/**
@@ -2567,7 +2571,8 @@ public class HRegion implements HeapSize
}
}
- public synchronized boolean next(List<KeyValue> outResults, int limit)
+ @Override
+ public synchronized boolean next(List<KeyValue> outResults, int limit)
throws IOException {
if (this.filterClosed) {
throw new UnknownScannerException("Scanner was closed (timed out?) " +
@@ -2596,7 +2601,8 @@ public class HRegion implements HeapSize
}
}
- public synchronized boolean next(List<KeyValue> outResults)
+ @Override
+ public synchronized boolean next(List<KeyValue> outResults)
throws IOException {
// apply the batching limit by default
return next(outResults, batch);
@@ -2692,7 +2698,8 @@ public class HRegion implements HeapSize
currentRow, 0, currentRow.length) <= isScan);
}
- public synchronized void close() {
+ @Override
+ public synchronized void close() {
if (storeHeap != null) {
storeHeap.close();
storeHeap = null;
@@ -3493,30 +3500,32 @@ public class HRegion implements HeapSize
}
+ /** Shallow size of an HRegion instance: object header, primitive fields and references. */
public static final long FIXED_OVERHEAD = ClassSize.align(
- (4 * Bytes.SIZEOF_LONG) + ClassSize.ARRAY +
- ClassSize.align(26 * ClassSize.REFERENCE) + ClassSize.OBJECT +
- ClassSize.align(Bytes.SIZEOF_INT));
-
- public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
- (ClassSize.OBJECT * 2) + (2 * ClassSize.ATOMIC_BOOLEAN) +
- ClassSize.ATOMIC_LONG + ClassSize.ATOMIC_INTEGER +
-
- // Using TreeMap for TreeSet
- ClassSize.TREEMAP +
-
- // Using TreeMap for HashMap
- ClassSize.TREEMAP +
-
- ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY +
- ClassSize.align(ClassSize.OBJECT +
- (5 * Bytes.SIZEOF_BOOLEAN)) +
- (3 * ClassSize.REENTRANT_LOCK));
+ ClassSize.OBJECT + // this
+ (4 * Bytes.SIZEOF_LONG) + // memstoreFlushSize, lastFlushTime, blockingMemStoreSize, threadWakeFrequency
+ Bytes.SIZEOF_BOOLEAN + // splitRequest
+ ClassSize.ARRAY + // splitPoint
+ (26 * ClassSize.REFERENCE));
+
+ /**
+ * FIXED_OVERHEAD plus the fixed-size objects an HRegion owns (close lock,
+ * atomics, row-lock maps, write state, store map, locks, recent-flush list,
+ * rwcc). Store contents are added separately in heapSize().
+ */
+ public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
+ ClassSize.OBJECT + // closeLock
+ (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
+ ClassSize.ATOMIC_LONG + // memStoreSize
+ ClassSize.ATOMIC_INTEGER + // lockIdGenerator
+ (2 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, lockIds
+ WriteState.HEAP_SIZE + // writestate
+ ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
+ (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
+ ClassSize.ARRAYLIST + // recentFlushes
+ ReadWriteConsistencyControl.FIXED_SIZE // rwcc
+ ;
+
+ @Override
public long heapSize() {
long heapSize = DEEP_OVERHEAD;
for(Store store : this.stores.values()) {
heapSize += store.heapSize();
}
+ // this does not take into account row locks, recent flushes, rwcc entries
return heapSize;
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java?rev=1127374&r1=1127373&r2=1127374&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java Wed May 25 04:18:12 2011
@@ -20,7 +20,9 @@
package org.apache.hadoop.hbase.regionserver;
import java.util.LinkedList;
-import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
/**
* Manages the read/write consistency within memstore. This provides
@@ -158,4 +160,10 @@ public class ReadWriteConsistencyControl
return this.writeNumber;
}
}
+
+ public static final long FIXED_SIZE = ClassSize.align( // shallow size of this class; summed into HRegion.DEEP_OVERHEAD
+ ClassSize.OBJECT +
+ 2 * Bytes.SIZEOF_LONG + // presumably the two long fields of this class — TODO confirm against its declarations
+ 2 * ClassSize.REFERENCE); // presumably the two reference fields (e.g. the LinkedList) — TODO confirm
+
}
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java?rev=1127374&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java Wed May 25 04:18:12 2011
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.Arrays;
+
+/**
+ * This class encapsulates a byte array and overrides hashCode and equals so
+ * that its identity is based on the data rather than the array instance.
+ * <p>
+ * The array is held by reference, not copied, and the hash code is computed
+ * once in the constructor. Callers must therefore not modify the array after
+ * wrapping it; otherwise an instance used as a hash key will no longer be
+ * found.
+ */
+public class HashedBytes {
+
+  private final byte[] bytes;
+  private final int hashCode;
+
+  /**
+   * @param bytes the array to wrap; it is not copied and must not be modified
+   *   afterwards
+   */
+  public HashedBytes(byte[] bytes) {
+    this.bytes = bytes;
+    hashCode = Bytes.hashCode(bytes);
+  }
+
+  /** @return the wrapped array itself (not a copy) */
+  public byte[] getBytes() {
+    return bytes;
+  }
+
+  /** @return the value computed by Bytes.hashCode when this object was built */
+  @Override
+  public int hashCode() {
+    return hashCode;
+  }
+
+  /**
+   * Two instances are equal when they are of exactly the same class and their
+   * arrays hold the same bytes (compared with Arrays.equals).
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    HashedBytes other = (HashedBytes) obj;
+    return Arrays.equals(bytes, other.bytes);
+  }
+
+  /** @return the bytes rendered by Bytes.toStringBinary */
+  @Override
+  public String toString() {
+    return Bytes.toStringBinary(bytes);
+  }
+}
\ No newline at end of file