This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new a1efd91039 Use Timer object to improve timekeeping code (#4380)
a1efd91039 is described below
commit a1efd91039e856eb7c0360b8023ba02fe698962e
Author: Dom G. <[email protected]>
AuthorDate: Tue Jan 6 11:34:46 2026 -0500
Use Timer object to improve timekeeping code (#4380)
* Use Timer in more places to improve safety and maintainability of
timekeeping code
---------
Co-authored-by: Keith Turner <[email protected]>
---
.../core/clientImpl/TabletServerBatchWriter.java | 6 +--
.../fate/zookeeper/DistributedReadWriteLock.java | 7 ++--
.../java/org/apache/accumulo/core/util/Retry.java | 43 ++++++++++------------
.../accumulo/server/mem/LowMemoryDetector.java | 15 ++++----
.../apache/accumulo/server/rpc/TimedProcessor.java | 17 +++++----
.../org/apache/accumulo/tserver/tablet/Tablet.java | 11 +++---
6 files changed, 48 insertions(+), 51 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
index ab3bb1a1a5..358d46a886 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
@@ -45,7 +45,6 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -80,6 +79,7 @@ import org.apache.accumulo.core.tabletingest.thrift.TabletIngestClientService;
import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.Retry;
+import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.thrift.TApplicationException;
@@ -1094,7 +1094,7 @@ public class TabletServerBatchWriter implements AutoCloseable {
final HostAndPort parsedServer = HostAndPort.fromString(location);
- long startTime = System.nanoTime();
+ Timer timer = Timer.startNew();
// If somethingFailed is true then the batch writer will throw an exception on close or
// flush, so no need to close this session. Only want to close the session for retryable
@@ -1147,7 +1147,7 @@ public class TabletServerBatchWriter implements AutoCloseable {
}
// if a timeout is set on the batch writer, then do not retry longer than the timeout
- if (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) > timeout) {
+ if (timer.hasElapsed(timeout, MILLISECONDS)) {
log.debug("Giving up on canceling session {} {} and timing out.", location, usid);
throw new TimedOutException(Set.of(location));
}
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java
index 116c5850e0..afa55e73b7 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java
@@ -32,6 +32,7 @@ import java.util.function.Supplier;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.zookeeper.FateLock.FateLockEntry;
+import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -147,15 +148,13 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
- long now = System.currentTimeMillis();
- long returnTime = now + MILLISECONDS.convert(time, unit);
- while (returnTime > now) {
+ Timer timer = Timer.startNew();
+ while (!timer.hasElapsed(time, unit)) {
if (tryLock()) {
return true;
}
// TODO: do something better than poll - ACCUMULO-1310
UtilWaitThread.sleep(100);
- now = System.currentTimeMillis();
}
return false;
}
diff --git a/core/src/main/java/org/apache/accumulo/core/util/Retry.java b/core/src/main/java/org/apache/accumulo/core/util/Retry.java
index 96f4452382..8fc7c78a9f 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/Retry.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/Retry.java
@@ -41,9 +41,8 @@ public class Retry {
private Duration currentWait;
private Duration initialWait;
- private boolean hasNeverLogged;
private boolean hasLoggedWarn = false;
- private long lastRetryLog;
+ private Timer lastRetryLogTimer;
private double currentBackOffFactor;
private boolean doTimeJitter = true;
@@ -63,8 +62,7 @@ public class Retry {
this.currentWait = startWait;
this.initialWait = startWait;
this.logInterval = logInterval;
- this.hasNeverLogged = true;
- this.lastRetryLog = -1;
+ this.lastRetryLogTimer = null;
this.backOffFactor = backOffFactor;
this.currentBackOffFactor = this.backOffFactor;
@@ -201,16 +199,14 @@ public class Retry {
public void logRetry(Logger log, String message, Throwable t) {
// log the first time as debug, and then after every logInterval as a warning
- long now = System.nanoTime();
- if (hasNeverLogged) {
+ if (lastRetryLogTimer == null) {
if (log.isDebugEnabled()) {
log.debug(getMessage(message, t));
}
- hasNeverLogged = false;
- lastRetryLog = now;
- } else if ((now - lastRetryLog) > logInterval.toNanos()) {
+ lastRetryLogTimer = Timer.startNew();
+ } else if (lastRetryLogTimer.hasElapsed(logInterval)) {
log.warn(getMessage(message), t);
- lastRetryLog = now;
+ lastRetryLogTimer.restart();
hasLoggedWarn = true;
} else {
if (log.isTraceEnabled()) {
@@ -221,16 +217,14 @@ public class Retry {
public void logRetry(Logger log, String message) {
// log the first time as debug, and then after every logInterval as a warning
- long now = System.nanoTime();
- if (hasNeverLogged) {
+ if (lastRetryLogTimer == null) {
if (log.isDebugEnabled()) {
log.debug(getMessage(message));
}
- hasNeverLogged = false;
- lastRetryLog = now;
- } else if ((now - lastRetryLog) > logInterval.toNanos()) {
+ lastRetryLogTimer = Timer.startNew();
+ } else if (lastRetryLogTimer.hasElapsed(logInterval)) {
log.warn(getMessage(message));
- lastRetryLog = now;
+ lastRetryLogTimer.restart();
hasLoggedWarn = true;
} else {
if (log.isTraceEnabled()) {
@@ -250,14 +244,15 @@ public class Retry {
}
public void logCompletion(Logger log, String operationDescription) {
- if (!hasNeverLogged) {
- var message = operationDescription + " completed after " + (retriesDone + 1)
- + " retries and is no longer retrying.";
- if (hasLoggedWarn) {
- log.info(message);
- } else {
- log.debug(message);
- }
+ if (lastRetryLogTimer == null) { // have never logged a retry
+ return;
+ }
+ var message = operationDescription + " completed after " + (retriesDone + 1)
+ + " retries and is no longer retrying.";
+ if (hasLoggedWarn) {
+ log.info(message);
+ } else {
+ log.debug(message);
}
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java b/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java
index 6f20ec6b30..514ed22081 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java
@@ -31,6 +31,7 @@ import java.util.function.Supplier;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.util.Halt;
+import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.server.ServerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +41,7 @@ public class LowMemoryDetector {
private static class LowMemDetectorState {
private long lastMemorySize = 0;
private int lowMemCount = 0;
- private long lastMemoryCheckTime = 0;
+ private Timer lastMemoryCheckTimer = null;
private boolean runningLowOnMemory = false;
}
@@ -101,7 +102,6 @@ public class LowMemoryDetector {
memCheckTimeLock.lock();
try {
LowMemDetectorState localState = state.get();
- final long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans();
@@ -171,15 +171,16 @@ public class LowMemoryDetector {
}
final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
- if (localState.lastMemoryCheckTime > 0 && localState.lastMemoryCheckTime < now) {
- final long diff = now - localState.lastMemoryCheckTime;
- if (diff > keepAliveTimeout + 1000) {
+ if (localState.lastMemoryCheckTimer != null) {
+ if (localState.lastMemoryCheckTimer.hasElapsed(keepAliveTimeout + 1000,
+ TimeUnit.MILLISECONDS)) {
+ final long diff = localState.lastMemoryCheckTimer.elapsed(TimeUnit.MILLISECONDS);
LOG.warn(String.format(
"GC pause checker not called in a timely"
+ " fashion. Expected every %.1f seconds but was %.1f seconds since last check",
keepAliveTimeout / 1000., diff / 1000.));
}
- localState.lastMemoryCheckTime = now;
+ localState.lastMemoryCheckTimer.restart();
return;
}
@@ -188,7 +189,7 @@ public class LowMemoryDetector {
}
localState.lastMemorySize = freeMemory;
- localState.lastMemoryCheckTime = now;
+ localState.lastMemoryCheckTimer = Timer.startNew();
} finally {
memCheckTimeLock.unlock();
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
index 4148cfb0ed..7810d137c3 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
@@ -18,9 +18,10 @@
*/
package org.apache.accumulo.server.rpc;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import org.apache.accumulo.core.metrics.MetricsInfo;
+import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.server.metrics.ThriftMetrics;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
@@ -33,25 +34,25 @@ public class TimedProcessor implements TProcessor {
private final TProcessor other;
private final ThriftMetrics thriftMetrics;
- private long idleStart;
+ private final Timer idleTimer;
public TimedProcessor(final TProcessor next, final MetricsInfo metricsInfo) {
this.other = next;
thriftMetrics = new ThriftMetrics();
metricsInfo.addMetricsProducers(thriftMetrics);
- idleStart = System.nanoTime();
+ idleTimer = Timer.startNew();
}
@Override
public void process(TProtocol in, TProtocol out) throws TException {
- long processStart = System.nanoTime();
- thriftMetrics.addIdle(NANOSECONDS.toMillis(processStart - idleStart));
+ thriftMetrics.addIdle(idleTimer.elapsed(MILLISECONDS));
+ Timer processTimer = Timer.startNew();
try {
other.process(in, out);
} finally {
- // set idle to now, calc time in process
- idleStart = System.nanoTime();
- thriftMetrics.addExecute(NANOSECONDS.toMillis(idleStart - processStart));
+ // calc time in process, restart idle timer
+ thriftMetrics.addExecute(processTimer.elapsed(MILLISECONDS));
+ idleTimer.restart();
}
}
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 1db20cf550..e6d61f44e0 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.tserver.tablet;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.stream.Collectors.toList;
import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
@@ -86,6 +87,7 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.Halt;
import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.server.compaction.CompactionStats;
import org.apache.accumulo.server.fs.VolumeManager;
@@ -928,7 +930,7 @@ public class Tablet extends TabletBase {
return currentlyUnreserved;
});
- long lastLogTime = System.nanoTime();
+ Timer lastLogTimer = Timer.startNew();
// wait for reads and writes to complete
while (writesInProgress > 0 || !runningScans.isEmpty()) {
@@ -940,13 +942,12 @@ public class Tablet extends TabletBase {
return currentlyUnreserved;
});
- if (log.isDebugEnabled()
- && System.nanoTime() - lastLogTime > TimeUnit.SECONDS.toNanos(60)) {
+ if (log.isDebugEnabled() && lastLogTimer.hasElapsed(1, MINUTES)) {
for (ScanDataSource activeScan : runningScans) {
log.debug("Waiting on scan in completeClose {} {}", extent, activeScan);
}
- lastLogTime = System.nanoTime();
+ lastLogTimer.restart();
}
try {
@@ -1587,7 +1588,7 @@ public class Tablet extends TabletBase {
} catch (IOException ioe) {
log.warn("Tablet {} failed to rename {} after MinC, will retry in 60 secs...", getExtent(),
newDatafile, ioe);
- sleepUninterruptibly(1, TimeUnit.MINUTES);
+ sleepUninterruptibly(1, MINUTES);
}
} while (true);