This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch 5.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.2 by this push:
new 8848c3052c PHOENIX-7326 Simplify LockManager and make it more
efficient (#1902)
8848c3052c is described below
commit 8848c3052c27c6926e0183bdfa3d77f5cdcc74e8
Author: Kadir Ozdemir <[email protected]>
AuthorDate: Sun Jun 9 17:12:49 2024 +0300
PHOENIX-7326 Simplify LockManager and make it more efficient (#1902)
---
.../apache/phoenix/hbase/index/LockManager.java | 212 ++++++++-------------
.../end2end/ConcurrentMutationsExtendedIT.java | 3 +
2 files changed, 81 insertions(+), 134 deletions(-)
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/LockManager.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/LockManager.java
index cb5fd22518..ec189d3d0e 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/LockManager.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/LockManager.java
@@ -21,224 +21,168 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
- * Class, copied for the most part from HRegion.getRowLockInternal implementation
- * that manages reentrant row locks based on the row key. Phoenix needs to manage
- * it's own locking due to secondary indexes needing a consistent snapshot from
+ * Manages reentrant row locks based on row keys. Phoenix needs to manage
+ * its own locking due to secondary indexes needing a consistent snapshot from
* the time the mvcc is acquired until the time it is advanced (PHOENIX-4053).
*
*/
public class LockManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(LockManager.class);
- private final ConcurrentHashMap<ImmutableBytesPtr, RowLockContext>
lockedRows =
- new ConcurrentHashMap<ImmutableBytesPtr, RowLockContext>();
+ private final ConcurrentHashMap<ImmutableBytesPtr, RowLockImpl> lockedRows
=
+ new ConcurrentHashMap<>();
public LockManager () {
}
/**
* Lock the row or throw otherwise
- * @param rowKey the row key
- * @return RowLock used to eventually release the lock
+ * @param rowKey
+ * @param waitDurationMs
+ * @return RowLock used to eventually release the lock
* @throws TimeoutIOException if the lock could not be acquired within the
* allowed rowLockWaitDuration and InterruptedException if interrupted while
* waiting to acquire lock.
*/
- public RowLock lockRow(ImmutableBytesPtr rowKey, int waitDuration) throws
IOException {
- RowLockContext rowLockContext = null;
- RowLockImpl result = null;
+ public RowLock lockRow(ImmutableBytesPtr rowKey, long waitDurationMs)
throws IOException {
+ RowLockImpl rowLock = new RowLockImpl(rowKey);
TraceScope traceScope = null;
// If we're tracing start a span to show how long this took.
if (Trace.isTracing()) {
- traceScope = Trace.startSpan("LockManager.getRowLock");
- traceScope.getSpan().addTimelineAnnotation("Getting a lock");
+ traceScope = Trace.startSpan("LockManager.lockRow");
+ traceScope.getSpan().addTimelineAnnotation("Getting a row lock");
}
-
boolean success = false;
try {
- // Keep trying until we have a lock or error out.
- // TODO: do we need to add a time component here?
- while (result == null) {
-
- // Try adding a RowLockContext to the lockedRows.
- // If we can add it then there's no other transactions
currently running.
- rowLockContext = new RowLockContext(rowKey);
- RowLockContext existingContext =
lockedRows.putIfAbsent(rowKey, rowLockContext);
-
- // if there was a running transaction then there's already a
context.
- if (existingContext != null) {
- rowLockContext = existingContext;
+ while (true) {
+ RowLockImpl existingRowLock = lockedRows.putIfAbsent(rowKey,
rowLock);
+ if (existingRowLock == null) {
+ // The row was not locked
+ success = true;
+ return rowLock;
}
-
- result = rowLockContext.newRowLock();
- }
- if (!result.getLock().tryLock(waitDuration,
TimeUnit.MILLISECONDS)) {
- if (traceScope != null) {
- traceScope.getSpan().addTimelineAnnotation("Failed to get
row lock");
+ // The row is already locked by a different thread. Wait for the lock to be released
+ // for waitDurationMs time
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ RowLockImpl usableRowLock =
existingRowLock.lock(waitDurationMs);
+ if (usableRowLock != null) {
+ success = true;
+ return usableRowLock;
}
- throw new TimeoutIOException("Timed out waiting for lock for
row: " + rowKey);
+ // The existing lock was released and removed from the hash map before the current
+ // thread attempt to lock
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ long timePassed = now - startTime;
+ if (timePassed > waitDurationMs) {
+ throw new TimeoutIOException("Timed out waiting for lock
for row: " + rowKey);
+ }
+ waitDurationMs -= timePassed;
}
- rowLockContext.setThreadName(Thread.currentThread().getName());
- success = true;
- return result;
} catch (InterruptedException ie) {
LOGGER.warn("Thread interrupted waiting for lock on row: " +
rowKey);
InterruptedIOException iie = new InterruptedIOException();
iie.initCause(ie);
- if (traceScope != null) {
- traceScope.getSpan().addTimelineAnnotation("Interrupted
exception getting row lock");
- }
Thread.currentThread().interrupt();
throw iie;
} finally {
- // On failure, clean up the counts just in case this was the thing
keeping the context alive.
- if (!success && rowLockContext != null) rowLockContext.cleanUp();
if (traceScope != null) {
+ if (!success) {
+ traceScope.getSpan().addTimelineAnnotation("Failed to get
row lock");
+ }
traceScope.close();
}
}
}
- public RowLock lockRow(byte[] row, int waitDuration) throws IOException {
- // create an object to use a a key in the row lock map
+ public RowLock lockRow(byte[] row, long waitDurationMs) throws IOException
{
ImmutableBytesPtr rowKey = new ImmutableBytesPtr(row);
- return lockRow(rowKey, waitDuration);
+ return lockRow(rowKey, waitDurationMs);
}
/**
- * Unlock the row. We need this stateless way of unlocking because
- * we have no means of passing the RowLock instances between
- * coprocessor calls (see HBASE-18482). Once we have that, we
- * can have the caller collect RowLock instances and free when
- * needed.
- * @param row the row key
- * @throws IOException
+ * Class used to represent a lock on a row.
*/
- public void unlockRow(byte[] row) throws IOException {
- ImmutableBytesPtr rowKey = new ImmutableBytesPtr(row);
- RowLockContext lockContext = lockedRows.get(rowKey);
- if (lockContext != null) {
- lockContext.releaseRowLock();
- }
- }
-
- class RowLockContext {
+ public class RowLockImpl implements RowLock {
private final ImmutableBytesPtr rowKey;
- // TODO: consider making this non atomic. It's only saving one
- // synchronization in the case of cleanup() when more than one
- // thread is holding on to the lock.
- private final AtomicInteger count = new AtomicInteger(0);
- private final ReentrantLock reentrantLock = new ReentrantLock(true);
- // TODO: remove once we can pass List<RowLock> as needed through
- // coprocessor calls.
- private volatile RowLockImpl rowLock = RowLockImpl.UNINITIALIZED;
+ private int count = 1;
+ private boolean usable = true;
+ private final ReentrantLock lock = new ReentrantLock(true);
private String threadName;
- RowLockContext(ImmutableBytesPtr rowKey) {
+ private RowLockImpl(ImmutableBytesPtr rowKey) {
this.rowKey = rowKey;
+ lock.lock();
+ threadName = Thread.currentThread().getName();
}
- RowLockImpl newRowLock() {
- count.incrementAndGet();
+ RowLockImpl lock(long waitDuration) throws InterruptedException,
TimeoutIOException {
synchronized (this) {
- if (rowLock != null) {
- rowLock = new RowLockImpl(this, reentrantLock);
- return rowLock;
- } else {
+ if (!usable) {
return null;
}
+ count++;
}
- }
-
- void releaseRowLock() {
- synchronized (this) {
- if (rowLock != null) {
- rowLock.release();
+ boolean success = false;
+ threadName = Thread.currentThread().getName();
+ try {
+ if (!lock.tryLock(waitDuration, TimeUnit.MILLISECONDS)) {
+ throw new TimeoutIOException("Timed out waiting for lock
for row: " + rowKey);
}
- }
- }
-
- void cleanUp() {
- long c = count.decrementAndGet();
- if (c <= 0) {
- synchronized (this) {
- if (count.get() <= 0 && rowLock != null){
- rowLock = null;
- RowLockContext removed = lockedRows.remove(rowKey);
- assert removed == this: "we should never remove a
different context";
- }
+ success = true;
+ } finally {
+ if (!success) {
+ cleanUp();
+ return null;
}
}
+ return this;
}
- void setThreadName(String threadName) {
- this.threadName = threadName;
- }
-
- @Override
- public String toString() {
- return "RowLockContext{" +
- "row=" + rowKey +
- ", readWriteLock=" + reentrantLock +
- ", count=" + count +
- ", threadName=" + threadName +
- '}';
- }
- }
-
- /**
- * Class used to represent a lock on a row.
- */
- public static class RowLockImpl implements RowLock {
- static final RowLockImpl UNINITIALIZED = new RowLockImpl();
- private final RowLockContext context;
- private final Lock lock;
-
- private RowLockImpl() {
- context = null;
- lock = null;
- }
-
- RowLockImpl(RowLockContext context, Lock lock) {
- this.context = context;
- this.lock = lock;
- }
-
- Lock getLock() {
- return lock;
+ private void cleanUp() {
+ synchronized (this) {
+ count--;
+ if (count == 0) {
+ RowLockImpl removed = lockedRows.remove(rowKey);
+ assert removed == this : "We should never remove a
different lock";
+ usable = false;
+ } else {
+ assert count > 0 : "Reference count should never be less
than zero";
+ }
+ }
}
-
@Override
public void release() {
lock.unlock();
- context.cleanUp();
+ cleanUp();
}
@Override
public ImmutableBytesPtr getRowKey() {
- return context.rowKey;
+ return rowKey;
}
@Override
public String toString() {
return "RowLockImpl{" +
- "context=" + context +
+ "row=" + rowKey +
+ ", count=" + count +
+ ", threadName=" + threadName +
", lock=" + lock +
- '}';
+ ", usable=" + usable +
+ "}";
}
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
index 4bf5ffacc1..95639d284a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
@@ -18,6 +18,8 @@
package org.apache.phoenix.end2end;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
+import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
@@ -107,6 +109,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
IndexTool indexTool = IndexToolIT.runIndexTool(false, "", tableName,
indexName, null, 0, IndexTool.IndexVerifyType.ONLY);
System.out.println(indexTool.getJob().getCounters());
+ TestUtil.dumpTable(conn,
TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
assertEquals(0,
indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
assertEquals(0,
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
assertEquals(0,
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());