This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new a1efd91039 Use Timer object to improve timekeeping code (#4380)
a1efd91039 is described below

commit a1efd91039e856eb7c0360b8023ba02fe698962e
Author: Dom G. <[email protected]>
AuthorDate: Tue Jan 6 11:34:46 2026 -0500

    Use Timer object to improve timekeeping code (#4380)
    
    * Use NanoTime in more places to improve safety and maintainability of 
timekeeping code
    
    ---------
    
    Co-authored-by: Keith Turner <[email protected]>
---
 .../core/clientImpl/TabletServerBatchWriter.java   |  6 +--
 .../fate/zookeeper/DistributedReadWriteLock.java   |  7 ++--
 .../java/org/apache/accumulo/core/util/Retry.java  | 43 ++++++++++------------
 .../accumulo/server/mem/LowMemoryDetector.java     | 15 ++++----
 .../apache/accumulo/server/rpc/TimedProcessor.java | 17 +++++----
 .../org/apache/accumulo/tserver/tablet/Tablet.java | 11 +++---
 6 files changed, 48 insertions(+), 51 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
index ab3bb1a1a5..358d46a886 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
@@ -45,7 +45,6 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -80,6 +79,7 @@ import 
org.apache.accumulo.core.tabletingest.thrift.TabletIngestClientService;
 import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.util.Retry;
+import org.apache.accumulo.core.util.Timer;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.thrift.TApplicationException;
@@ -1094,7 +1094,7 @@ public class TabletServerBatchWriter implements 
AutoCloseable {
 
         final HostAndPort parsedServer = HostAndPort.fromString(location);
 
-        long startTime = System.nanoTime();
+        Timer timer = Timer.startNew();
 
         // If somethingFailed is true then the batch writer will throw an 
exception on close or
         // flush, so no need to close this session. Only want to close the 
session for retryable
@@ -1147,7 +1147,7 @@ public class TabletServerBatchWriter implements 
AutoCloseable {
           }
 
           // if a timeout is set on the batch writer, then do not retry longer 
than the timeout
-          if (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) > 
timeout) {
+          if (timer.hasElapsed(timeout, MILLISECONDS)) {
             log.debug("Giving up on canceling session {} {} and timing out.", 
location, usid);
             throw new TimedOutException(Set.of(location));
           }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java
 
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java
index 116c5850e0..afa55e73b7 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java
@@ -32,6 +32,7 @@ import java.util.function.Supplier;
 
 import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.zookeeper.FateLock.FateLockEntry;
+import org.apache.accumulo.core.util.Timer;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -147,15 +148,13 @@ public class DistributedReadWriteLock implements 
java.util.concurrent.locks.Read
 
     @Override
     public boolean tryLock(long time, TimeUnit unit) throws 
InterruptedException {
-      long now = System.currentTimeMillis();
-      long returnTime = now + MILLISECONDS.convert(time, unit);
-      while (returnTime > now) {
+      Timer timer = Timer.startNew();
+      while (!timer.hasElapsed(time, unit)) {
         if (tryLock()) {
           return true;
         }
         // TODO: do something better than poll - ACCUMULO-1310
         UtilWaitThread.sleep(100);
-        now = System.currentTimeMillis();
       }
       return false;
     }
diff --git a/core/src/main/java/org/apache/accumulo/core/util/Retry.java 
b/core/src/main/java/org/apache/accumulo/core/util/Retry.java
index 96f4452382..8fc7c78a9f 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/Retry.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/Retry.java
@@ -41,9 +41,8 @@ public class Retry {
   private Duration currentWait;
   private Duration initialWait;
 
-  private boolean hasNeverLogged;
   private boolean hasLoggedWarn = false;
-  private long lastRetryLog;
+  private Timer lastRetryLogTimer;
   private double currentBackOffFactor;
   private boolean doTimeJitter = true;
 
@@ -63,8 +62,7 @@ public class Retry {
     this.currentWait = startWait;
     this.initialWait = startWait;
     this.logInterval = logInterval;
-    this.hasNeverLogged = true;
-    this.lastRetryLog = -1;
+    this.lastRetryLogTimer = null;
     this.backOffFactor = backOffFactor;
     this.currentBackOffFactor = this.backOffFactor;
 
@@ -201,16 +199,14 @@ public class Retry {
 
   public void logRetry(Logger log, String message, Throwable t) {
     // log the first time as debug, and then after every logInterval as a 
warning
-    long now = System.nanoTime();
-    if (hasNeverLogged) {
+    if (lastRetryLogTimer == null) {
       if (log.isDebugEnabled()) {
         log.debug(getMessage(message, t));
       }
-      hasNeverLogged = false;
-      lastRetryLog = now;
-    } else if ((now - lastRetryLog) > logInterval.toNanos()) {
+      lastRetryLogTimer = Timer.startNew();
+    } else if (lastRetryLogTimer.hasElapsed(logInterval)) {
       log.warn(getMessage(message), t);
-      lastRetryLog = now;
+      lastRetryLogTimer.restart();
       hasLoggedWarn = true;
     } else {
       if (log.isTraceEnabled()) {
@@ -221,16 +217,14 @@ public class Retry {
 
   public void logRetry(Logger log, String message) {
     // log the first time as debug, and then after every logInterval as a 
warning
-    long now = System.nanoTime();
-    if (hasNeverLogged) {
+    if (lastRetryLogTimer == null) {
       if (log.isDebugEnabled()) {
         log.debug(getMessage(message));
       }
-      hasNeverLogged = false;
-      lastRetryLog = now;
-    } else if ((now - lastRetryLog) > logInterval.toNanos()) {
+      lastRetryLogTimer = Timer.startNew();
+    } else if (lastRetryLogTimer.hasElapsed(logInterval)) {
       log.warn(getMessage(message));
-      lastRetryLog = now;
+      lastRetryLogTimer.restart();
       hasLoggedWarn = true;
     } else {
       if (log.isTraceEnabled()) {
@@ -250,14 +244,15 @@ public class Retry {
   }
 
   public void logCompletion(Logger log, String operationDescription) {
-    if (!hasNeverLogged) {
-      var message = operationDescription + " completed after " + (retriesDone 
+ 1)
-          + " retries and is no longer retrying.";
-      if (hasLoggedWarn) {
-        log.info(message);
-      } else {
-        log.debug(message);
-      }
+    if (lastRetryLogTimer == null) { // have never logged a retry
+      return;
+    }
+    var message = operationDescription + " completed after " + (retriesDone + 
1)
+        + " retries and is no longer retrying.";
+    if (hasLoggedWarn) {
+      log.info(message);
+    } else {
+      log.debug(message);
     }
   }
 
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java
 
b/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java
index 6f20ec6b30..514ed22081 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java
@@ -31,6 +31,7 @@ import java.util.function.Supplier;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.util.Halt;
+import org.apache.accumulo.core.util.Timer;
 import org.apache.accumulo.server.ServerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,7 +41,7 @@ public class LowMemoryDetector {
   private static class LowMemDetectorState {
     private long lastMemorySize = 0;
     private int lowMemCount = 0;
-    private long lastMemoryCheckTime = 0;
+    private Timer lastMemoryCheckTimer = null;
     private boolean runningLowOnMemory = false;
   }
 
@@ -101,7 +102,6 @@ public class LowMemoryDetector {
     memCheckTimeLock.lock();
     try {
       LowMemDetectorState localState = state.get();
-      final long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
 
       List<GarbageCollectorMXBean> gcmBeans = 
ManagementFactory.getGarbageCollectorMXBeans();
 
@@ -171,15 +171,16 @@ public class LowMemoryDetector {
       }
 
       final long keepAliveTimeout = 
conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
-      if (localState.lastMemoryCheckTime > 0 && localState.lastMemoryCheckTime 
< now) {
-        final long diff = now - localState.lastMemoryCheckTime;
-        if (diff > keepAliveTimeout + 1000) {
+      if (localState.lastMemoryCheckTimer != null) {
+        if (localState.lastMemoryCheckTimer.hasElapsed(keepAliveTimeout + 1000,
+            TimeUnit.MILLISECONDS)) {
+          final long diff = 
localState.lastMemoryCheckTimer.elapsed(TimeUnit.MILLISECONDS);
           LOG.warn(String.format(
               "GC pause checker not called in a timely"
                   + " fashion. Expected every %.1f seconds but was %.1f 
seconds since last check",
               keepAliveTimeout / 1000., diff / 1000.));
         }
-        localState.lastMemoryCheckTime = now;
+        localState.lastMemoryCheckTimer.restart();
         return;
       }
 
@@ -188,7 +189,7 @@ public class LowMemoryDetector {
       }
 
       localState.lastMemorySize = freeMemory;
-      localState.lastMemoryCheckTime = now;
+      localState.lastMemoryCheckTimer = Timer.startNew();
     } finally {
       memCheckTimeLock.unlock();
     }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java 
b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
index 4148cfb0ed..7810d137c3 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
@@ -18,9 +18,10 @@
  */
 package org.apache.accumulo.server.rpc;
 
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 import org.apache.accumulo.core.metrics.MetricsInfo;
+import org.apache.accumulo.core.util.Timer;
 import org.apache.accumulo.server.metrics.ThriftMetrics;
 import org.apache.thrift.TException;
 import org.apache.thrift.TProcessor;
@@ -33,25 +34,25 @@ public class TimedProcessor implements TProcessor {
 
   private final TProcessor other;
   private final ThriftMetrics thriftMetrics;
-  private long idleStart;
+  private final Timer idleTimer;
 
   public TimedProcessor(final TProcessor next, final MetricsInfo metricsInfo) {
     this.other = next;
     thriftMetrics = new ThriftMetrics();
     metricsInfo.addMetricsProducers(thriftMetrics);
-    idleStart = System.nanoTime();
+    idleTimer = Timer.startNew();
   }
 
   @Override
   public void process(TProtocol in, TProtocol out) throws TException {
-    long processStart = System.nanoTime();
-    thriftMetrics.addIdle(NANOSECONDS.toMillis(processStart - idleStart));
+    thriftMetrics.addIdle(idleTimer.elapsed(MILLISECONDS));
+    Timer processTimer = Timer.startNew();
     try {
       other.process(in, out);
     } finally {
-      // set idle to now, calc time in process
-      idleStart = System.nanoTime();
-      thriftMetrics.addExecute(NANOSECONDS.toMillis(idleStart - processStart));
+      // calc time in process, restart idle timer
+      thriftMetrics.addExecute(processTimer.elapsed(MILLISECONDS));
+      idleTimer.restart();
     }
   }
 }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 1db20cf550..e6d61f44e0 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.tserver.tablet;
 
 import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.stream.Collectors.toList;
 import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
 
@@ -86,6 +87,7 @@ import 
org.apache.accumulo.core.tabletserver.thrift.TabletStats;
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.util.Halt;
 import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.Timer;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.compaction.CompactionStats;
 import org.apache.accumulo.server.fs.VolumeManager;
@@ -928,7 +930,7 @@ public class Tablet extends TabletBase {
         return currentlyUnreserved;
       });
 
-      long lastLogTime = System.nanoTime();
+      Timer lastLogTimer = Timer.startNew();
 
       // wait for reads and writes to complete
       while (writesInProgress > 0 || !runningScans.isEmpty()) {
@@ -940,13 +942,12 @@ public class Tablet extends TabletBase {
           return currentlyUnreserved;
         });
 
-        if (log.isDebugEnabled()
-            && System.nanoTime() - lastLogTime > TimeUnit.SECONDS.toNanos(60)) 
{
+        if (log.isDebugEnabled() && lastLogTimer.hasElapsed(1, MINUTES)) {
           for (ScanDataSource activeScan : runningScans) {
             log.debug("Waiting on scan in completeClose {} {}", extent, 
activeScan);
           }
 
-          lastLogTime = System.nanoTime();
+          lastLogTimer.restart();
         }
 
         try {
@@ -1587,7 +1588,7 @@ public class Tablet extends TabletBase {
       } catch (IOException ioe) {
         log.warn("Tablet {} failed to rename {} after MinC, will retry in 60 
secs...", getExtent(),
             newDatafile, ioe);
-        sleepUninterruptibly(1, TimeUnit.MINUTES);
+        sleepUninterruptibly(1, MINUTES);
       }
     } while (true);
 

Reply via email to