This is an automated email from the ASF dual-hosted git repository.
arp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0cfff16 HADOOP-16833. InstrumentedLock should log lock queue time.
Contributed by Stephen O'Donnell.
0cfff16 is described below
commit 0cfff16ac040bd5fb6783333d0d027369c68dead
Author: Arpit Agarwal <[email protected]>
AuthorDate: Tue Feb 18 09:50:11 2020 -0800
HADOOP-16833. InstrumentedLock should log lock queue time. Contributed by
Stephen O'Donnell.
Change-Id: Idddff05051b6f642b88e51694b40c5bb1bef0026
---
.../org/apache/hadoop/util/InstrumentedLock.java | 121 ++++++++++++++++++---
.../apache/hadoop/util/InstrumentedReadLock.java | 2 +-
.../apache/hadoop/util/TestInstrumentedLock.java | 111 ++++++++++++++++++-
.../hadoop/util/TestInstrumentedReadWriteLock.java | 9 +-
4 files changed, 222 insertions(+), 21 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedLock.java
index 2c1f591..cc0ebdf 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedLock.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedLock.java
@@ -55,8 +55,10 @@ public class InstrumentedLock implements Lock {
// Tracking counters for lock statistics.
private volatile long lockAcquireTimestamp;
- private final AtomicLong lastLogTimestamp;
- private final AtomicLong warningsSuppressed = new AtomicLong(0);
+ private final AtomicLong lastHoldLogTimestamp;
+ private final AtomicLong lastWaitLogTimestamp;
+ private final SuppressedStats holdStats = new SuppressedStats();
+ private final SuppressedStats waitStats = new SuppressedStats();
/**
* Create a instrumented lock instance which logs a warning message
@@ -91,19 +93,24 @@ public class InstrumentedLock implements Lock {
this.logger = logger;
minLoggingGap = minLoggingGapMs;
lockWarningThreshold = lockWarningThresholdMs;
- lastLogTimestamp = new AtomicLong(
+ lastHoldLogTimestamp = new AtomicLong(
clock.monotonicNow() - Math.max(minLoggingGap, lockWarningThreshold));
+ lastWaitLogTimestamp = new AtomicLong(lastHoldLogTimestamp.get());
}
@Override
public void lock() {
+ long waitStart = clock.monotonicNow();
lock.lock();
+ check(waitStart, clock.monotonicNow(), false);
startLockTiming();
}
@Override
public void lockInterruptibly() throws InterruptedException {
+ long waitStart = clock.monotonicNow();
lock.lockInterruptibly();
+ check(waitStart, clock.monotonicNow(), false);
startLockTiming();
}
@@ -118,11 +125,14 @@ public class InstrumentedLock implements Lock {
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
{
+ long waitStart = clock.monotonicNow();
+ boolean retval = false;
if (lock.tryLock(time, unit)) {
startLockTiming();
- return true;
+ retval = true;
}
- return false;
+ check(waitStart, clock.monotonicNow(), false);
+ return retval;
}
@Override
@@ -130,7 +140,7 @@ public class InstrumentedLock implements Lock {
long localLockReleaseTime = clock.monotonicNow();
long localLockAcquireTime = lockAcquireTimestamp;
lock.unlock();
- check(localLockAcquireTime, localLockReleaseTime);
+ check(localLockAcquireTime, localLockReleaseTime, true);
}
@Override
@@ -139,12 +149,25 @@ public class InstrumentedLock implements Lock {
}
@VisibleForTesting
- void logWarning(long lockHeldTime, long suppressed) {
+ void logWarning(long lockHeldTime, SuppressedSnapshot stats) {
logger.warn(String.format("Lock held time above threshold: " +
"lock identifier: %s " +
"lockHeldTimeMs=%d ms. Suppressed %d lock warnings. " +
+ "Longest suppressed LockHeldTimeMs=%d. " +
"The stack trace is: %s" ,
- name, lockHeldTime, suppressed,
+ name, lockHeldTime, stats.getSuppressedCount(),
+ stats.getMaxSuppressedWait(),
+ StringUtils.getStackTrace(Thread.currentThread())));
+ }
+
+ @VisibleForTesting
+ void logWaitWarning(long lockWaitTime, SuppressedSnapshot stats) {
+ logger.warn(String.format("Waited above threshold to acquire lock: " +
+ "lock identifier: %s " +
+ "waitTimeMs=%d ms. Suppressed %d lock wait warnings. " +
+ "Longest suppressed WaitTimeMs=%d. " +
+ "The stack trace is: %s", name, lockWaitTime,
+ stats.getSuppressedCount(), stats.getMaxSuppressedWait(),
StringUtils.getStackTrace(Thread.currentThread())));
}
@@ -163,27 +186,41 @@ public class InstrumentedLock implements Lock {
* @param acquireTime - timestamp just after acquiring the lock.
* @param releaseTime - timestamp just before releasing the lock.
*/
- protected void check(long acquireTime, long releaseTime) {
+ protected void check(long acquireTime, long releaseTime,
+ boolean checkLockHeld) {
if (!logger.isWarnEnabled()) {
return;
}
final long lockHeldTime = releaseTime - acquireTime;
if (lockWarningThreshold - lockHeldTime < 0) {
+ AtomicLong lastLogTime;
+ SuppressedStats stats;
+ if (checkLockHeld) {
+ lastLogTime = lastHoldLogTimestamp;
+ stats = holdStats;
+ } else {
+ lastLogTime = lastWaitLogTimestamp;
+ stats = waitStats;
+ }
long now;
long localLastLogTs;
do {
now = clock.monotonicNow();
- localLastLogTs = lastLogTimestamp.get();
+ localLastLogTs = lastLogTime.get();
long deltaSinceLastLog = now - localLastLogTs;
// check should print log or not
if (deltaSinceLastLog - minLoggingGap < 0) {
- warningsSuppressed.incrementAndGet();
+ stats.incrementSuppressed(lockHeldTime);
return;
}
- } while (!lastLogTimestamp.compareAndSet(localLastLogTs, now));
- long suppressed = warningsSuppressed.getAndSet(0);
- logWarning(lockHeldTime, suppressed);
+ } while (!lastLogTime.compareAndSet(localLastLogTs, now));
+ SuppressedSnapshot statsSnapshot = stats.snapshot();
+ if (checkLockHeld) {
+ logWarning(lockHeldTime, statsSnapshot);
+ } else {
+ logWaitWarning(lockHeldTime, statsSnapshot);
+ }
}
}
@@ -194,4 +231,60 @@ public class InstrumentedLock implements Lock {
protected Timer getTimer() {
return clock;
}
+
+ /**
+ * Internal class to track statistics about suppressed log messages in an
+ * atomic way.
+ */
+ private static class SuppressedStats {
+ private long suppressedCount = 0;
+ private long maxSuppressedWait = 0;
+
+ /**
+ * Increments the suppressed counter and increases the max wait time if the
+ * passed wait is greater than the current maxSuppressedWait.
+ * @param wait The wait time for this suppressed message
+ */
+ synchronized public void incrementSuppressed(long wait) {
+ suppressedCount++;
+ if (wait > maxSuppressedWait) {
+ maxSuppressedWait = wait;
+ }
+ }
+
+ /**
+ * Captures the current value of the counts into a SuppressedSnapshot object
+ * and resets the values to zero.
+ *
+ * @return SuppressedSnapshot containing the current value of the counters
+ */
+ synchronized public SuppressedSnapshot snapshot() {
+ SuppressedSnapshot snap =
+ new SuppressedSnapshot(suppressedCount, maxSuppressedWait);
+ suppressedCount = 0;
+ maxSuppressedWait = 0;
+ return snap;
+ }
+ }
+
+ /**
+ * Immutable class to capture a snapshot of suppressed log message stats.
+ */
+ protected static class SuppressedSnapshot {
+ private long suppressedCount = 0;
+ private long maxSuppressedWait = 0;
+
+ public SuppressedSnapshot(long suppressedCount, long maxWait) {
+ this.suppressedCount = suppressedCount;
+ this.maxSuppressedWait = maxWait;
+ }
+
+ public long getMaxSuppressedWait() {
+ return maxSuppressedWait;
+ }
+
+ public long getSuppressedCount() {
+ return suppressedCount;
+ }
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadLock.java
index e115718..8ab392e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadLock.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadLock.java
@@ -75,7 +75,7 @@ public class InstrumentedReadLock extends InstrumentedLock {
getLock().unlock();
if (needReport) {
readLockHeldTimeStamp.remove();
- check(localLockAcquireTime, localLockReleaseTime);
+ check(localLockAcquireTime, localLockReleaseTime, true);
}
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedLock.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedLock.java
index 44158ec..c47ff07 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedLock.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedLock.java
@@ -17,9 +17,11 @@
*/
package org.apache.hadoop.util;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -117,12 +119,14 @@ public class TestInstrumentedLock {
final AtomicLong wlogged = new AtomicLong(0);
final AtomicLong wsuppresed = new AtomicLong(0);
+ final AtomicLong wMaxWait = new AtomicLong(0);
InstrumentedLock lock = new InstrumentedLock(
testname, LOG, mlock, 2000, 300, mclock) {
@Override
- void logWarning(long lockHeldTime, long suppressed) {
+ void logWarning(long lockHeldTime, SuppressedSnapshot stats) {
wlogged.incrementAndGet();
- wsuppresed.set(suppressed);
+ wsuppresed.set(stats.getSuppressedCount());
+ wMaxWait.set(stats.getMaxSuppressedWait());
}
};
@@ -132,12 +136,14 @@ public class TestInstrumentedLock {
lock.unlock(); // t = 200
assertEquals(0, wlogged.get());
assertEquals(0, wsuppresed.get());
+ assertEquals(0, wMaxWait.get());
lock.lock(); // t = 200
time.set(700);
lock.unlock(); // t = 700
assertEquals(1, wlogged.get());
assertEquals(0, wsuppresed.get());
+ assertEquals(0, wMaxWait.get());
// despite the lock held time is greater than threshold
// suppress the log warning due to the logging gap
@@ -147,6 +153,7 @@ public class TestInstrumentedLock {
lock.unlock(); // t = 1100
assertEquals(1, wlogged.get());
assertEquals(0, wsuppresed.get());
+ assertEquals(0, wMaxWait.get());
// log a warning message when the lock held time is greater the threshold
// and the logging time gap is satisfied. Also should display suppressed
@@ -157,6 +164,106 @@ public class TestInstrumentedLock {
lock.unlock(); // t = 2800
assertEquals(2, wlogged.get());
assertEquals(1, wsuppresed.get());
+ assertEquals(400, wMaxWait.get());
+ }
+
+ /**
+ * Test the lock logs warning when lock wait / queue time is greater than
+ * threshold and not log warning otherwise.
+ * @throws Exception
+ */
+ @Test(timeout=10000)
+ public void testLockLongWaitReport() throws Exception {
+ String testname = name.getMethodName();
+ final AtomicLong time = new AtomicLong(0);
+ Timer mclock = new Timer() {
+ @Override
+ public long monotonicNow() {
+ return time.get();
+ }
+ };
+ Lock mlock = new ReentrantLock(true); //mock(Lock.class);
+
+ final AtomicLong wlogged = new AtomicLong(0);
+ final AtomicLong wsuppresed = new AtomicLong(0);
+ final AtomicLong wMaxWait = new AtomicLong(0);
+ InstrumentedLock lock = new InstrumentedLock(
+ testname, LOG, mlock, 2000, 300, mclock) {
+ @Override
+ void logWaitWarning(long lockHeldTime, SuppressedSnapshot stats) {
+ wlogged.incrementAndGet();
+ wsuppresed.set(stats.getSuppressedCount());
+ wMaxWait.set(stats.getMaxSuppressedWait());
+ }
+ };
+
+ // do not log warning when the lock held time is short
+ lock.lock(); // t = 0
+
+ Thread competingThread = lockUnlockThread(lock);
+ time.set(200);
+ lock.unlock(); // t = 200
+ competingThread.join();
+ assertEquals(0, wlogged.get());
+ assertEquals(0, wsuppresed.get());
+ assertEquals(0, wMaxWait.get());
+
+
+ lock.lock(); // t = 200
+ competingThread = lockUnlockThread(lock);
+ time.set(700);
+ lock.unlock(); // t = 700
+ competingThread.join();
+
+ // The competing thread will have waited for 500ms, so it should log
+ assertEquals(1, wlogged.get());
+ assertEquals(0, wsuppresed.get());
+ assertEquals(0, wMaxWait.get());
+
+ // despite the lock wait time is greater than threshold
+ // suppress the log warning due to the logging gap
+ // (not recorded in wsuppressed until next log message)
+ lock.lock(); // t = 700
+ competingThread = lockUnlockThread(lock);
+ time.set(1100);
+ lock.unlock(); // t = 1100
+ competingThread.join();
+ assertEquals(1, wlogged.get());
+ assertEquals(0, wsuppresed.get());
+ assertEquals(0, wMaxWait.get());
+
+ // log a warning message when the lock held time is greater the threshold
+ // and the logging time gap is satisfied. Also should display suppressed
+ // previous warnings.
+ time.set(2400);
+ lock.lock(); // t = 2400
+ competingThread = lockUnlockThread(lock);
+ time.set(2800);
+ lock.unlock(); // t = 2800
+ competingThread.join();
+ assertEquals(2, wlogged.get());
+ assertEquals(1, wsuppresed.get());
+ assertEquals(400, wMaxWait.get());
+ }
+
+ private Thread lockUnlockThread(Lock lock) throws InterruptedException {
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ Thread t = new Thread(() -> {
+ try {
+ assertFalse(lock.tryLock());
+ countDownLatch.countDown();
+ lock.lock();
+ } finally {
+ lock.unlock();
+ }
+ });
+ t.start();
+ countDownLatch.await();
+ // Even with the countdown latch, the main thread releases the lock
+ // before this thread actually starts waiting on it, so introducing a
+ // short sleep so the competing thread can block on the lock as intended.
+ Thread.sleep(3);
+ return t;
}
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedReadWriteLock.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedReadWriteLock.java
index 3e1a88b..1ea3ef1 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedReadWriteLock.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedReadWriteLock.java
@@ -146,9 +146,10 @@ public class TestInstrumentedReadWriteLock {
InstrumentedReadLock readLock = new InstrumentedReadLock(testname, LOG,
readWriteLock, 2000, 300, mclock) {
@Override
- protected void logWarning(long lockHeldTime, long suppressed) {
+ protected void logWarning(
+ long lockHeldTime, SuppressedSnapshot stats) {
wlogged.incrementAndGet();
- wsuppresed.set(suppressed);
+ wsuppresed.set(stats.getSuppressedCount());
}
};
@@ -200,9 +201,9 @@ public class TestInstrumentedReadWriteLock {
InstrumentedWriteLock writeLock = new InstrumentedWriteLock(testname, LOG,
readWriteLock, 2000, 300, mclock) {
@Override
- protected void logWarning(long lockHeldTime, long suppressed) {
+ protected void logWarning(long lockHeldTime, SuppressedSnapshot stats) {
wlogged.incrementAndGet();
- wsuppresed.set(suppressed);
+ wsuppresed.set(stats.getSuppressedCount());
}
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]