This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 5adc92bce1 Replace semaphore with ReentrantLock in Scanner to help
clean up scan sessions (#3644)
5adc92bce1 is described below
commit 5adc92bce11ecd9847af0f510a327b5cd2af4418
Author: Dom G <[email protected]>
AuthorDate: Wed Jul 26 11:15:51 2023 -0400
Replace semaphore with ReentrantLock in Scanner to help clean up scan sessions (#3644)
* Use ReentrantLock.getOwner() to help clean up scan sessions
---------
Co-authored-by: Keith Turner <[email protected]>
---
.../apache/accumulo/tserver/tablet/Scanner.java | 57 ++++++++++++++++------
1 file changed, 41 insertions(+), 16 deletions(-)
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
index 311492b822..2b89a2005b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
@@ -20,9 +20,9 @@ package org.apache.accumulo.tserver.tablet;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
@@ -35,6 +35,8 @@ import org.apache.accumulo.tserver.scan.ScanParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
public class Scanner {
private static final Logger log = LoggerFactory.getLogger(Scanner.class);
@@ -46,20 +48,22 @@ public class Scanner {
private boolean sawException = false;
private boolean scanClosed = false;
/**
- * A fair semaphore of one is used since explicitly know the access pattern will be one thread to
- * read and another to call close if the session becomes idle. Since we're explicitly preventing
- * re-entrance, we're currently using a Semaphore. If at any point we decide read needs to be
- * re-entrant, we can switch to a Reentrant lock.
+ * An interruptible, re-entrant lock is used since we know the access pattern will be one thread
+ * to read and another to call close if the session becomes idle. This lock allows the closing
+ * thread to interrupt the reading thread if it can't obtain the lock immediately. This way, the
+ * reading thread can finish its current operation and release the lock promptly.
*/
- private Semaphore scannerSemaphore;
+ private final InterruptibleLock lock;
+
+ private final AtomicBoolean interruptFlag;
- private AtomicBoolean interruptFlag;
+ private boolean readInProgress = false;
Scanner(TabletBase tablet, Range range, ScanParameters scanParams,
AtomicBoolean interruptFlag) {
this.tablet = tablet;
this.range = range;
this.scanParams = scanParams;
- this.scannerSemaphore = new Semaphore(1, true);
+ this.lock = new InterruptibleLock();
this.interruptFlag = interruptFlag;
}
@@ -72,7 +76,11 @@ public class Scanner {
try {
try {
- scannerSemaphore.acquire();
+ lock.lockInterruptibly();
+ Preconditions.checkState(!readInProgress);
+ // Simple check to ensure the same thread never calls this method
recursively. This code
+ // would not handle that well.
+ readInProgress = true;
} catch (InterruptedException e) {
sawException = true;
}
@@ -162,22 +170,39 @@ public class Scanner {
tablet.updateQueryStats(results.getResults().size(),
results.getNumBytes());
}
} finally {
- scannerSemaphore.release();
+ readInProgress = false;
+ lock.unlock();
}
}
}
- // close and read are synchronized because can not call close on the data source while it is in
- // use
- // this could lead to the case where file iterators that are in use by a thread are returned
- // to the pool... this would be bad
+ /**
+ * ReentrantLock subclass whose only addition is a public accessor for the current owner thread.
+ * ReentrantLock.getOwner() is protected, so this subclass exposes it so that close() can find
+ * the thread holding the lock and interrupt it.
+ */
+ private static class InterruptibleLock extends ReentrantLock {
+ // ReentrantLock is Serializable; pin an explicit serialVersionUID for this subclass.
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Returns the thread that currently owns this lock, or null if it is not owned.
+ * NOTE(review): the result is only a snapshot; the owner may release the lock (and go on to
+ * unrelated work) before the caller acts on the returned thread.
+ */
+ public Thread getLockOwner() {
+ return getOwner();
+ }
+ }
+
+ /*
+ * close and read are controlled by an InterruptibleLock because we cannot call close on the data
+ * source while it is in use. Without this lock, there could be a situation where file iterators
+ * that are in use by a thread are returned to the pool, which would be bad. With the lock, a
+ * thread can attempt to close the Scanner. If it can't immediately acquire the lock (because a
+ * read is in progress), it interrupts the reading thread. This ensures the reading thread can
+ * finish its current operation and release the lock, allowing close to finish.
+ */
public boolean close() {
interruptFlag.set(true);
boolean obtainedLock = false;
try {
- obtainedLock = scannerSemaphore.tryAcquire(10, TimeUnit.MILLISECONDS);
+ obtainedLock = lock.tryLock(10, TimeUnit.MILLISECONDS);
if (!obtainedLock) {
+ Thread ownerThread = lock.getLockOwner();
+ if (ownerThread != null) {
+ ownerThread.interrupt();
+ }
return false;
}
@@ -189,7 +214,7 @@ public class Scanner {
return false;
} finally {
if (obtainedLock) {
- scannerSemaphore.release();
+ lock.unlock();
}
}
return true;