This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 5.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.2 by this push:
     new 8848c3052c PHOENIX-7326 Simplify LockManager and make it more 
efficient (#1902)
8848c3052c is described below

commit 8848c3052c27c6926e0183bdfa3d77f5cdcc74e8
Author: Kadir Ozdemir <[email protected]>
AuthorDate: Sun Jun 9 17:12:49 2024 +0300

    PHOENIX-7326 Simplify LockManager and make it more efficient (#1902)
---
 .../apache/phoenix/hbase/index/LockManager.java    | 212 ++++++++-------------
 .../end2end/ConcurrentMutationsExtendedIT.java     |   3 +
 2 files changed, 81 insertions(+), 134 deletions(-)

diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/LockManager.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/LockManager.java
index cb5fd22518..ec189d3d0e 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/LockManager.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/LockManager.java
@@ -21,224 +21,168 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * 
- * Class, copied for the most part from HRegion.getRowLockInternal 
implementation
- * that manages reentrant row locks based on the row key. Phoenix needs to 
manage
- * it's own locking due to secondary indexes needing a consistent snapshot from
+ * Manages reentrant row locks based on row keys. Phoenix needs to manage
+ * its own locking due to secondary indexes needing a consistent snapshot from
  * the time the mvcc is acquired until the time it is advanced (PHOENIX-4053).
  *
  */
 public class LockManager {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LockManager.class);
 
-    private final ConcurrentHashMap<ImmutableBytesPtr, RowLockContext> 
lockedRows =
-            new ConcurrentHashMap<ImmutableBytesPtr, RowLockContext>();
+    private final ConcurrentHashMap<ImmutableBytesPtr, RowLockImpl> lockedRows 
=
+            new ConcurrentHashMap<>();
 
     public LockManager () {
     }
 
     /**
      * Lock the row or throw otherwise
-     * @param rowKey the row key
-     * @return RowLock used to eventually release the lock 
+     * @param rowKey
+     * @param waitDurationMs
+     * @return RowLock used to eventually release the lock
      * @throws TimeoutIOException if the lock could not be acquired within the
      * allowed rowLockWaitDuration and InterruptedException if interrupted 
while
      * waiting to acquire lock.
      */
-    public RowLock lockRow(ImmutableBytesPtr rowKey, int waitDuration) throws 
IOException {
-        RowLockContext rowLockContext = null;
-        RowLockImpl result = null;
+    public RowLock lockRow(ImmutableBytesPtr rowKey, long waitDurationMs) 
throws IOException {
+        RowLockImpl rowLock = new RowLockImpl(rowKey);
         TraceScope traceScope = null;
 
         // If we're tracing start a span to show how long this took.
         if (Trace.isTracing()) {
-            traceScope = Trace.startSpan("LockManager.getRowLock");
-            traceScope.getSpan().addTimelineAnnotation("Getting a lock");
+            traceScope = Trace.startSpan("LockManager.lockRow");
+            traceScope.getSpan().addTimelineAnnotation("Getting a row lock");
         }
-
         boolean success = false;
         try {
-            // Keep trying until we have a lock or error out.
-            // TODO: do we need to add a time component here?
-            while (result == null) {
-
-                // Try adding a RowLockContext to the lockedRows.
-                // If we can add it then there's no other transactions 
currently running.
-                rowLockContext = new RowLockContext(rowKey);
-                RowLockContext existingContext = 
lockedRows.putIfAbsent(rowKey, rowLockContext);
-
-                // if there was a running transaction then there's already a 
context.
-                if (existingContext != null) {
-                    rowLockContext = existingContext;
+            while (true) {
+                RowLockImpl existingRowLock = lockedRows.putIfAbsent(rowKey, 
rowLock);
+                if (existingRowLock == null) {
+                    // The row was not locked
+                    success = true;
+                    return rowLock;
                 }
-
-                result = rowLockContext.newRowLock();
-            }
-            if (!result.getLock().tryLock(waitDuration, 
TimeUnit.MILLISECONDS)) {
-                if (traceScope != null) {
-                    traceScope.getSpan().addTimelineAnnotation("Failed to get 
row lock");
+                // The row is already locked by a different thread. Wait for 
the lock to be released
+                // for waitDurationMs time
+                long startTime = EnvironmentEdgeManager.currentTimeMillis();
+                RowLockImpl usableRowLock = 
existingRowLock.lock(waitDurationMs);
+                if (usableRowLock != null) {
+                    success = true;
+                    return usableRowLock;
                 }
-                throw new TimeoutIOException("Timed out waiting for lock for 
row: " + rowKey);
+                // The existing lock was released and removed from the hash 
map before the current
+                // thread attempt to lock
+                long now = EnvironmentEdgeManager.currentTimeMillis();
+                long timePassed = now - startTime;
+                if (timePassed > waitDurationMs) {
+                    throw new TimeoutIOException("Timed out waiting for lock 
for row: " + rowKey);
+                }
+                waitDurationMs -= timePassed;
             }
-            rowLockContext.setThreadName(Thread.currentThread().getName());
-            success = true;
-            return result;
         } catch (InterruptedException ie) {
             LOGGER.warn("Thread interrupted waiting for lock on row: " + 
rowKey);
             InterruptedIOException iie = new InterruptedIOException();
             iie.initCause(ie);
-            if (traceScope != null) {
-                traceScope.getSpan().addTimelineAnnotation("Interrupted 
exception getting row lock");
-            }
             Thread.currentThread().interrupt();
             throw iie;
         } finally {
-            // On failure, clean up the counts just in case this was the thing 
keeping the context alive.
-            if (!success && rowLockContext != null) rowLockContext.cleanUp();
             if (traceScope != null) {
+                if (!success) {
+                    traceScope.getSpan().addTimelineAnnotation("Failed to get 
row lock");
+                }
                 traceScope.close();
             }
         }
     }
 
-    public RowLock lockRow(byte[] row, int waitDuration) throws IOException {
-        // create an object to use a a key in the row lock map
+    public RowLock lockRow(byte[] row, long waitDurationMs) throws IOException 
{
         ImmutableBytesPtr rowKey = new ImmutableBytesPtr(row);
-        return lockRow(rowKey, waitDuration);
+        return lockRow(rowKey, waitDurationMs);
     }
 
     /**
-     * Unlock the row. We need this stateless way of unlocking because
-     * we have no means of passing the RowLock instances between
-     * coprocessor calls (see HBASE-18482). Once we have that, we
-     * can have the caller collect RowLock instances and free when
-     * needed.
-     * @param row the row key
-     * @throws IOException
+     * Class used to represent a lock on a row.
      */
-    public void unlockRow(byte[] row) throws IOException {
-        ImmutableBytesPtr rowKey = new ImmutableBytesPtr(row);
-        RowLockContext lockContext = lockedRows.get(rowKey);
-        if (lockContext != null) {
-            lockContext.releaseRowLock();
-        }
-    }
-
-    class RowLockContext {
+    public class RowLockImpl implements RowLock {
         private final ImmutableBytesPtr rowKey;
-        // TODO: consider making this non atomic. It's only saving one
-        // synchronization in the case of cleanup() when more than one
-        // thread is holding on to the lock.
-        private final AtomicInteger count = new AtomicInteger(0);
-        private final ReentrantLock reentrantLock = new ReentrantLock(true);
-        // TODO: remove once we can pass List<RowLock> as needed through
-        // coprocessor calls.
-        private volatile RowLockImpl rowLock = RowLockImpl.UNINITIALIZED;
+        private int count = 1;
+        private boolean usable = true;
+        private final ReentrantLock lock = new ReentrantLock(true);
         private String threadName;
 
-        RowLockContext(ImmutableBytesPtr rowKey) {
+        private RowLockImpl(ImmutableBytesPtr rowKey) {
             this.rowKey = rowKey;
+            lock.lock();
+            threadName = Thread.currentThread().getName();
         }
 
-        RowLockImpl newRowLock() {
-            count.incrementAndGet();
+        RowLockImpl lock(long waitDuration) throws InterruptedException, 
TimeoutIOException {
             synchronized (this) {
-                if (rowLock != null) {
-                    rowLock = new RowLockImpl(this, reentrantLock);
-                    return rowLock;
-                } else {
+                if (!usable) {
                     return null;
                 }
+                count++;
             }
-        }
-
-        void releaseRowLock() {
-            synchronized (this) {
-                if (rowLock != null) {
-                    rowLock.release();
+            boolean success = false;
+            threadName = Thread.currentThread().getName();
+            try {
+                if (!lock.tryLock(waitDuration, TimeUnit.MILLISECONDS)) {
+                    throw new TimeoutIOException("Timed out waiting for lock 
for row: " + rowKey);
                 }
-            }
-        }
-        
-        void cleanUp() {
-            long c = count.decrementAndGet();
-            if (c <= 0) {
-                synchronized (this) {
-                    if (count.get() <= 0 && rowLock != null){
-                        rowLock = null;
-                        RowLockContext removed = lockedRows.remove(rowKey);
-                        assert removed == this: "we should never remove a 
different context";
-                    }
+                success = true;
+            } finally {
+                if (!success) {
+                    cleanUp();
+                    return null;
                 }
             }
+            return this;
         }
 
-        void setThreadName(String threadName) {
-            this.threadName = threadName;
-        }
-
-        @Override
-        public String toString() {
-            return "RowLockContext{" +
-                    "row=" + rowKey +
-                    ", readWriteLock=" + reentrantLock +
-                    ", count=" + count +
-                    ", threadName=" + threadName +
-                    '}';
-        }
-    }
-
-    /**
-     * Class used to represent a lock on a row.
-     */
-    public static class RowLockImpl implements RowLock {
-        static final RowLockImpl UNINITIALIZED = new RowLockImpl();
-        private final RowLockContext context;
-        private final Lock lock;
-
-        private RowLockImpl() {
-            context = null;
-            lock = null;
-        }
-        
-        RowLockImpl(RowLockContext context, Lock lock) {
-            this.context = context;
-            this.lock = lock;
-        }
-
-        Lock getLock() {
-            return lock;
+        private void cleanUp() {
+            synchronized (this) {
+                count--;
+                if (count == 0) {
+                    RowLockImpl removed = lockedRows.remove(rowKey);
+                    assert removed == this : "We should never remove a 
different lock";
+                    usable = false;
+                } else {
+                    assert count > 0 : "Reference count should never be less 
than zero";
+                }
+            }
         }
-
         @Override
         public void release() {
             lock.unlock();
-            context.cleanUp();
+            cleanUp();
         }
 
         @Override
         public ImmutableBytesPtr getRowKey() {
-            return context.rowKey;
+            return rowKey;
         }
 
         @Override
         public String toString() {
             return "RowLockImpl{" +
-                    "context=" + context +
+                    "row=" + rowKey +
+                    ", count=" + count +
+                    ", threadName=" + threadName +
                     ", lock=" + lock +
-                    '}';
+                    ", usable=" + usable +
+                    "}";
         }
     }
 
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
index 4bf5ffacc1..95639d284a 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
@@ -18,6 +18,8 @@
 package org.apache.phoenix.end2end;
 
 import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
+import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
@@ -107,6 +109,7 @@ public class ConcurrentMutationsExtendedIT extends 
ParallelStatsDisabledIT {
         IndexTool indexTool = IndexToolIT.runIndexTool(false, "", tableName,
                 indexName, null, 0, IndexTool.IndexVerifyType.ONLY);
         System.out.println(indexTool.getJob().getCounters());
+        TestUtil.dumpTable(conn, 
TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
         assertEquals(0, 
indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
         assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
         assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());

Reply via email to