This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 15e90019d0 Improve timekeeping code (#6041)
15e90019d0 is described below
commit 15e90019d06f5a7704f593436e6f067d897c2daa
Author: Dom G. <[email protected]>
AuthorDate: Fri Jan 9 10:07:00 2026 -0500
Improve timekeeping code (#6041)
* Refactors elapsed-time tracking code to use Timer and/or Duration instead
of raw System.nanoTime() arithmetic, for improved safety (it is harder to make
mistakes with Timer than with nanoTime) and stronger types (Duration vs. long)
---
.../core/clientImpl/ThriftTransportPool.java | 21 +++++++-----
.../apache/accumulo/core/zookeeper/ZooSession.java | 8 ++---
.../miniclusterImpl/MiniAccumuloClusterImpl.java | 7 ++--
.../server/conf/ServerConfigurationFactory.java | 7 ++--
.../server/conf/store/impl/ReadyMonitorTest.java | 8 ++---
.../apache/accumulo/gc/SimpleGarbageCollector.java | 17 +++++-----
.../apache/accumulo/gc/metrics/GcCycleMetrics.java | 21 ++++++------
.../org/apache/accumulo/gc/metrics/GcMetrics.java | 5 +--
.../java/org/apache/accumulo/manager/Manager.java | 10 +++---
.../accumulo/manager/tableOps/compact/CleanUp.java | 12 +++----
.../accumulo/tserver/TabletClientHandler.java | 12 +++----
.../org/apache/accumulo/tserver/log/DfsLogger.java | 18 +++++-----
.../org/apache/accumulo/tserver/tablet/Tablet.java | 8 ++---
.../apache/accumulo/tserver/tablet/TabletBase.java | 19 +++++------
.../main/java/org/apache/accumulo/shell/Shell.java | 16 ++++-----
.../accumulo/test/functional/CompactionIT.java | 9 ++---
.../apache/accumulo/test/functional/ScannerIT.java | 38 +++++++++++++---------
.../org/apache/accumulo/test/util/SlowOps.java | 28 ++++++++--------
.../java/org/apache/accumulo/test/util/Wait.java | 25 +++++++++++---
19 files changed, 156 insertions(+), 133 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
index df1174d022..b426fec23b 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
@@ -22,6 +22,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
+import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
@@ -44,6 +45,7 @@ import java.util.function.Supplier;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.thrift.TConfiguration;
import org.apache.thrift.transport.TTransport;
@@ -52,6 +54,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Comparators;
import com.google.common.net.HostAndPort;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -84,18 +87,20 @@ public class ThriftTransportPool {
private Runnable thriftConnectionPoolChecker() {
return () -> {
try {
- final long minNanos = MILLISECONDS.toNanos(250);
- final long maxNanos = MINUTES.toNanos(1);
- long lastRun = System.nanoTime();
+ final Duration minInterval = Duration.ofMillis(250);
+ final Duration maxInterval = Duration.ofMinutes(1);
+ Timer lastRunTimer = Timer.startNew();
// loop often, to detect shutdowns quickly
while (!connectionPool.awaitShutdown(250)) {
// don't close on every loop; instead, check based on configured max
age, within bounds
- var threshold = Math.min(maxNanos,
- Math.max(minNanos,
MILLISECONDS.toNanos(maxAgeMillis.getAsLong()) / 2));
- long currentNanos = System.nanoTime();
- if ((currentNanos - lastRun) >= threshold) {
+
+ Duration threshold = Duration.ofMillis(maxAgeMillis.getAsLong() / 2);
+ threshold = Comparators.max(threshold, minInterval);
+ threshold = Comparators.min(maxInterval, threshold);
+
+ if (lastRunTimer.hasElapsed(threshold)) {
closeExpiredConnections();
- lastRun = currentNanos;
+ lastRunTimer.restart();
}
}
} catch (InterruptedException e) {
diff --git
a/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
b/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
index 41b501a9ae..ce66cc6a4a 100644
--- a/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
+++ b/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
@@ -20,7 +20,7 @@ package org.apache.accumulo.core.zookeeper;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
import java.io.IOException;
@@ -38,6 +38,7 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.fate.zookeeper.ZooReader;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.zookeeper.AddWatchMode;
import org.apache.zookeeper.AsyncCallback.StringCallback;
@@ -199,7 +200,7 @@ public class ZooSession implements AutoCloseable {
boolean tryAgain = true;
long sleepTime = 100;
- long startTime = System.nanoTime();
+ Timer timer = Timer.startNew();
ZooKeeper zk = null;
@@ -235,8 +236,7 @@ public class ZooSession implements AutoCloseable {
}
}
- long stopTime = System.nanoTime();
- long duration = NANOSECONDS.toMillis(stopTime - startTime);
+ long duration = timer.elapsed(MILLISECONDS);
if (duration > 2L * timeout) {
throw new IllegalStateException("Failed to connect to zookeeper (" +
connectString
diff --git
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
index 3560dffdce..ab496e0694 100644
---
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
+++
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
@@ -21,7 +21,6 @@ package org.apache.accumulo.miniclusterImpl;
import static
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.stream.Collectors.toList;
import java.io.BufferedWriter;
@@ -34,6 +33,7 @@ import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -96,6 +96,7 @@ import
org.apache.accumulo.core.spi.compaction.CompactionPlanner;
import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.ConfigurationImpl;
+import org.apache.accumulo.core.util.CountDownTimer;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams;
import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
@@ -804,9 +805,9 @@ public class MiniAccumuloClusterImpl implements
AccumuloCluster {
// wait up to 10 seconds for the process to start
private static void waitForProcessStart(Process p, String name) throws
InterruptedException {
- long start = System.nanoTime();
+ CountDownTimer maxWaitTimer =
CountDownTimer.startNew(Duration.ofSeconds(10));
while (p.info().startInstant().isEmpty()) {
- if (NANOSECONDS.toSeconds(System.nanoTime() - start) > 10) {
+ if (maxWaitTimer.isExpired()) {
throw new IllegalStateException(
"Error starting " + name + " - instance not started within 10
seconds");
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
b/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
index ea3523fd07..8c9e7c3cf9 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
@@ -19,8 +19,8 @@
package org.apache.accumulo.server.conf;
import static com.google.common.base.Suppliers.memoize;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
@@ -37,6 +37,7 @@ import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.ResourceGroupId;
import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.cache.Caches;
import org.apache.accumulo.core.util.cache.Caches.CacheName;
import org.apache.accumulo.core.util.threads.ThreadPools;
@@ -229,7 +230,7 @@ public class ServerConfigurationFactory extends
ServerConfiguration {
*/
private void verifySnapshotVersions() {
- long refreshStart = System.nanoTime();
+ Timer refreshTimer = Timer.startNew();
int keyCount = 0;
int keyChangedCount = 0;
@@ -270,7 +271,7 @@ public class ServerConfigurationFactory extends
ServerConfiguration {
}
log.debug("data version sync: Total runtime {} ms for {} entries,
changes detected: {}",
- NANOSECONDS.toMillis(System.nanoTime() - refreshStart), keyCount,
keyChangedCount);
+ refreshTimer.elapsed(MILLISECONDS), keyCount, keyChangedCount);
}
/**
diff --git
a/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ReadyMonitorTest.java
b/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ReadyMonitorTest.java
index 25a94c839c..e6cd9a6bb4 100644
---
a/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ReadyMonitorTest.java
+++
b/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ReadyMonitorTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
+import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -168,13 +169,12 @@ public class ReadyMonitorTest {
public Long call() throws Exception {
// signal ready to run
readyToRunLatch.countDown();
- // time waiting for isReady to complete++
- long start = System.nanoTime();
+
+ Timer isReadyTimer = Timer.startNew();
readyMonitor.isReady();
finishedLatch.countDown();
- // returning nanoseconds.
- return System.nanoTime() - start;
+ return isReadyTimer.elapsed(NANOSECONDS);
}
}
}
diff --git
a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index bc5edfa168..6a5a548ba1 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -24,6 +24,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.UnknownHostException;
+import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -216,7 +217,7 @@ public class SimpleGarbageCollector extends AbstractServer
implements Iface {
try (Scope outerScope = outerSpan.makeCurrent()) {
Span innerSpan = TraceUtil.startSpan(this.getClass(), "loop");
try (Scope innerScope = innerSpan.makeCurrent()) {
- final long tStart = System.nanoTime();
+ final Timer timer = Timer.startNew();
try {
System.gc(); // make room
@@ -250,9 +251,8 @@ public class SimpleGarbageCollector extends AbstractServer
implements Iface {
status.current = new GcCycleStats();
}
- final long tStop = System.nanoTime();
log.info(String.format("Collect cycle took %.2f seconds",
- (TimeUnit.NANOSECONDS.toMillis(tStop - tStart) / 1000.0)));
+ timer.elapsed(MILLISECONDS) / 1000.0));
// Clean up any unused write-ahead logs
Span walSpan = TraceUtil.startSpan(this.getClass(), "walogs");
@@ -279,7 +279,7 @@ public class SimpleGarbageCollector extends AbstractServer
implements Iface {
try {
AccumuloClient accumuloClient = getContext();
- final long actionStart = System.nanoTime();
+ final Timer actionTimer = Timer.startNew();
String action =
getConfiguration().get(Property.GC_USE_FULL_COMPACTION);
log.debug("gc post action {} started", action);
@@ -301,12 +301,11 @@ public class SimpleGarbageCollector extends
AbstractServer implements Iface {
log.trace("'none - no action' or invalid value provided: {}",
action);
}
- final long actionComplete = System.nanoTime();
+ final Duration actionDuration = actionTimer.elapsed();
+ gcCycleMetrics.setPostOpDuration(actionDuration);
- gcCycleMetrics.setPostOpDurationNanos(actionComplete -
actionStart);
-
- log.info("gc post action {} completed in {} seconds", action,
String.format("%.2f",
- (TimeUnit.NANOSECONDS.toMillis(actionComplete - actionStart) /
1000.0)));
+ log.info("gc post action {} completed in {} seconds", action,
+ String.format("%.2f", actionDuration.toMillis() / 1000.0));
} catch (Exception e) {
TraceUtil.setException(outerSpan, e, false);
diff --git
a/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcCycleMetrics.java
b/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcCycleMetrics.java
index 20ac00322a..deaf8a1176 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcCycleMetrics.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcCycleMetrics.java
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.gc.metrics;
+import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -34,7 +35,7 @@ public class GcCycleMetrics {
private final AtomicReference<GcCycleStats> lastWalCollect =
new AtomicReference<>(new GcCycleStats());
- private final AtomicLong postOpDurationNanos = new AtomicLong(0);
+ private final AtomicReference<Duration> postOpDuration = new
AtomicReference<>(Duration.ZERO);
private final AtomicLong runCycleCount = new AtomicLong(0);
public GcCycleMetrics() {}
@@ -77,21 +78,21 @@ public class GcCycleMetrics {
}
/**
- * Duration of post operation (compact, flush, none) in nanoseconds.
+ * Duration of post operation (compact, flush, none).
*
- * @return duration in nanoseconds.
+ * @return duration.
*/
- long getPostOpDurationNanos() {
- return postOpDurationNanos.get();
+ Duration getPostOpDuration() {
+ return postOpDuration.get();
}
/**
- * Set the duration of post operation (compact, flush, none) in nanoseconds.
+ * Set the duration of post operation (compact, flush, none).
*
- * @param postOpDurationNanos the duration, in nanoseconds.
+ * @param postOpDuration the duration.
*/
- public void setPostOpDurationNanos(long postOpDurationNanos) {
- this.postOpDurationNanos.set(postOpDurationNanos);
+ public void setPostOpDuration(Duration postOpDuration) {
+ this.postOpDuration.set(postOpDuration);
}
/**
@@ -115,7 +116,7 @@ public class GcCycleMetrics {
final StringBuilder sb = new StringBuilder("GcMetricsValues{");
sb.append("lastCollect=").append(lastCollect.get());
sb.append(", lastWalCollect=").append(lastWalCollect.get());
- sb.append(", postOpDuration=").append(postOpDurationNanos.get());
+ sb.append(", postOpDuration=").append(postOpDuration.get().toNanos());
sb.append('}');
return sb.toString();
}
diff --git
a/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcMetrics.java
b/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcMetrics.java
index 699ac82567..43248efd3b 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcMetrics.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcMetrics.java
@@ -33,8 +33,6 @@ import static
org.apache.accumulo.core.metrics.Metric.GC_WAL_FINISHED;
import static org.apache.accumulo.core.metrics.Metric.GC_WAL_IN_USE;
import static org.apache.accumulo.core.metrics.Metric.GC_WAL_STARTED;
-import java.util.concurrent.TimeUnit;
-
import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.gc.SimpleGarbageCollector;
@@ -81,8 +79,7 @@ public class GcMetrics implements MetricsProducer {
Gauge.builder(GC_WAL_ERRORS.getName(), metricValues, v ->
v.getLastWalCollect().getErrors())
.description(GC_WAL_ERRORS.getDescription()).register(registry);
Gauge
- .builder(GC_POST_OP_DURATION.getName(), metricValues,
- v -> TimeUnit.NANOSECONDS.toMillis(v.getPostOpDurationNanos()))
+ .builder(GC_POST_OP_DURATION.getName(), metricValues, v ->
v.getPostOpDuration().toMillis())
.description(GC_POST_OP_DURATION.getDescription()).register(registry);
Gauge.builder(GC_RUN_CYCLE.getName(), metricValues,
GcCycleMetrics::getRunCycleCount)
.description(GC_RUN_CYCLE.getDescription()).register(registry);
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 894e5ead94..99c9feee57 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -24,7 +24,6 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.emptySortedMap;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.IMPORT_TABLE_RENAME_POOL;
@@ -1289,7 +1288,7 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener,
* @throws InterruptedException if interrupted while blocking, propagated
for caller to handle.
*/
private void blockForTservers() throws InterruptedException {
- long waitStart = System.nanoTime();
+ Timer waitTimer = Timer.startNew();
long minTserverCount =
getConfiguration().getCount(Property.MANAGER_STARTUP_TSERVER_AVAIL_MIN_COUNT);
@@ -1341,8 +1340,7 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener,
if (needTservers) {
tserverRetry.logRetry(log, String.format(
"Blocking for tserver availability - need to reach %s servers.
Have %s Time spent blocking %s seconds.",
- minTserverCount, tserverSet.size(),
- NANOSECONDS.toSeconds(System.nanoTime() - waitStart)));
+ minTserverCount, tserverSet.size(), waitTimer.elapsed(SECONDS)));
}
tserverRetry.useRetry();
}
@@ -1351,13 +1349,13 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener,
log.warn(
"tserver availability check time expired - continuing. Requested {},
have {} tservers on line. "
+ " Time waiting {} sec",
- tserverSet.size(), minTserverCount,
NANOSECONDS.toSeconds(System.nanoTime() - waitStart));
+ tserverSet.size(), minTserverCount, waitTimer.elapsed(SECONDS));
} else {
log.info(
"tserver availability check completed. Requested {}, have {}
tservers on line. "
+ " Time waiting {} sec",
- tserverSet.size(), minTserverCount,
NANOSECONDS.toSeconds(System.nanoTime() - waitStart));
+ tserverSet.size(), minTserverCount, waitTimer.elapsed(SECONDS));
}
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java
index 9b283fe773..d7ce8f7882 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java
@@ -22,7 +22,7 @@ import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType
import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.USER_COMPACTION_REQUESTED;
-import java.time.Duration;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@@ -34,6 +34,7 @@ import org.apache.accumulo.core.fate.zookeeper.LockRange;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.manager.tableOps.AbstractFateOperation;
import org.apache.accumulo.manager.tableOps.FateEnv;
import org.apache.accumulo.manager.tableOps.Utils;
@@ -72,8 +73,7 @@ public class CleanUp extends AbstractFateOperation {
}
};
- long t1;
- long t2;
+ long scanTime;
long submitted = 0;
long total = 0;
@@ -82,7 +82,7 @@ public class CleanUp extends AbstractFateOperation {
.fetch(PREV_ROW, COMPACTED,
USER_COMPACTION_REQUESTED).checkConsistency().build();
var tabletsMutator = ample.conditionallyMutateTablets(resultConsumer))
{
- t1 = System.nanoTime();
+ Timer timer = Timer.startNew();
for (TabletMetadata tablet : tablets) {
total++;
if (tablet.getCompacted().contains(fateId)
@@ -101,11 +101,9 @@ public class CleanUp extends AbstractFateOperation {
}
}
- t2 = System.nanoTime();
+ scanTime = timer.elapsed(TimeUnit.MILLISECONDS);
}
- long scanTime = Duration.ofNanos(t2 - t1).toMillis();
-
log.debug("{} removed {} of {} compacted markers for {} tablets in {}ms",
fateId,
submitted - rejectedCount.get(), submitted, total, scanTime);
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
index 97ea23793c..e2ece2a752 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
@@ -92,6 +92,7 @@ import
org.apache.accumulo.core.tabletserver.thrift.TabletStats;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.ByteBufferUtil;
import org.apache.accumulo.core.util.Halt;
+import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.core.util.time.SteadyTime;
import org.apache.accumulo.server.ServerContext;
@@ -741,17 +742,16 @@ public class TabletClientHandler implements
TabletServerClientService.Iface,
ConditionalMutationSet.deferDuplicatesRows(updates, deferred);
// get as many locks as possible w/o blocking... defer any rows that are
locked
- long lt1 = System.nanoTime();
+ Timer timer = Timer.startNew();
List<RowLock> locks = rowLocks.acquireRowlocks(updates, deferred);
- long lt2 = System.nanoTime();
- updateAverageLockTime(lt2 - lt1, TimeUnit.NANOSECONDS, numMutations);
+ updateAverageLockTime(timer.elapsed(TimeUnit.NANOSECONDS),
TimeUnit.NANOSECONDS, numMutations);
try {
Span span = TraceUtil.startSpan(this.getClass(),
"conditionalUpdate::Check conditions");
try (Scope scope = span.makeCurrent()) {
- long t1 = System.nanoTime();
+ timer.restart();
checkConditions(updates, results, cs, symbols);
- long t2 = System.nanoTime();
- updateAverageCheckTime(t2 - t1, TimeUnit.NANOSECONDS, numMutations);
+ updateAverageCheckTime(timer.elapsed(TimeUnit.NANOSECONDS),
TimeUnit.NANOSECONDS,
+ numMutations);
} catch (Exception e) {
TraceUtil.setException(span, e, true);
throw e;
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 7c0c4a0f49..bb1d868231 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -21,7 +21,6 @@ package org.apache.accumulo.tserver.log;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_FINISH;
import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START;
import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
@@ -35,6 +34,7 @@ import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.ClosedChannelException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -61,6 +61,7 @@ import org.apache.accumulo.core.spi.crypto.FileEncrypter;
import org.apache.accumulo.core.spi.fs.VolumeChooserEnvironment;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl;
@@ -133,12 +134,12 @@ public final class DfsLogger implements
Comparable<DfsLogger> {
private final AtomicLong syncCounter;
private final AtomicLong flushCounter;
- private final long slowFlushMillis;
+ private final Duration slowFlushDuration;
- LogSyncingTask(AtomicLong syncCounter, AtomicLong flushCounter, long
slowFlushMillis) {
+ LogSyncingTask(AtomicLong syncCounter, AtomicLong flushCounter, Duration
slowFlushDuration) {
this.syncCounter = syncCounter;
this.flushCounter = flushCounter;
- this.slowFlushMillis = slowFlushMillis;
+ this.slowFlushDuration = slowFlushDuration;
}
@Override
@@ -174,7 +175,7 @@ public final class DfsLogger implements
Comparable<DfsLogger> {
}
}
- long start = System.nanoTime();
+ Timer timer = Timer.startNew();
try {
if (shouldHSync.isPresent()) {
if (shouldHSync.orElseThrow()) {
@@ -188,9 +189,8 @@ public final class DfsLogger implements
Comparable<DfsLogger> {
} catch (IOException | RuntimeException ex) {
fail(work, ex, "synching");
}
- long duration = System.nanoTime() - start;
- if (duration > MILLISECONDS.toNanos(slowFlushMillis)) {
- log.info("Slow sync cost: {} ms, current pipeline: {}",
NANOSECONDS.toMillis(duration),
+ if (timer.hasElapsed(slowFlushDuration)) {
+ log.info("Slow sync cost: {} ms, current pipeline: {}",
timer.elapsed(MILLISECONDS),
Arrays.toString(getPipeLine()));
if (expectedReplication > 0) {
int current = expectedReplication;
@@ -474,7 +474,7 @@ public final class DfsLogger implements
Comparable<DfsLogger> {
}
syncThread = Threads.createCriticalThread("Accumulo WALog thread " + this,
- new LogSyncingTask(syncCounter, flushCounter, slowFlushMillis));
+ new LogSyncingTask(syncCounter, flushCounter,
Duration.ofMillis(slowFlushMillis)));
syncThread.start();
op.await();
log.debug("Got new write-ahead log: {}", this);
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index e6d61f44e0..0c82143eb2 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -168,7 +168,7 @@ public class Tablet extends TabletBase {
OPEN, REQUESTED, CLOSING, CLOSED, COMPLETE
}
- private long closeRequestTime = 0;
+ private Timer closeRequestTimer = null;
private volatile CloseState closeState = CloseState.OPEN;
private boolean updatingFlushID = false;
@@ -796,11 +796,11 @@ public class Tablet extends TabletBase {
synchronized (this) {
if (closeState == CloseState.OPEN) {
- closeRequestTime = System.nanoTime();
+ closeRequestTimer = Timer.startNew();
closeState = CloseState.REQUESTED;
} else {
- Preconditions.checkState(closeRequestTime != 0);
- long runningTime = Duration.ofNanos(System.nanoTime() -
closeRequestTime).toMinutes();
+ Preconditions.checkState(closeRequestTimer != null);
+ long runningTime = closeRequestTimer.elapsed(MINUTES);
if (runningTime >= 15) {
CLOSING_STUCK_LOGGER.info(
"Tablet {} close requested again, but has been closing for {}
minutes", this.extent,
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
index 1130553cbd..a696b517bc 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
@@ -21,13 +21,13 @@ package org.apache.accumulo.tserver.tablet;
import static java.util.Objects.requireNonNull;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
@@ -52,6 +52,7 @@ import org.apache.accumulo.core.spi.cache.CacheType;
import org.apache.accumulo.core.trace.ScanInstrumentation;
import org.apache.accumulo.core.trace.TraceAttributes;
import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.CountDownTimer;
import org.apache.accumulo.core.util.LocalityGroupUtil;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.ShutdownUtil;
@@ -298,12 +299,11 @@ public abstract class TabletBase {
long batchTimeOut = scanParams.getBatchTimeOut();
- long timeToRun = TimeUnit.MILLISECONDS.toNanos(batchTimeOut);
- long startNanos = System.nanoTime();
-
if (batchTimeOut == Long.MAX_VALUE || batchTimeOut <= 0) {
batchTimeOut = 0;
}
+
+ CountDownTimer runTimer =
CountDownTimer.startNew(Duration.ofMillis(batchTimeOut));
List<KVEntry> results = new ArrayList<>();
Key key = null;
@@ -342,7 +342,7 @@ public abstract class TabletBase {
resultSize += kvEntry.estimateMemoryUsed();
resultBytes += kvEntry.numBytes();
- boolean timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos)
>= timeToRun;
+ boolean timesUp = batchTimeOut > 0 && runTimer.isExpired();
boolean runningLowOnMemory =
context.getLowMemoryDetector().isRunningLowOnMemory(context,
DetectionScope.SCAN, () -> {
@@ -417,13 +417,12 @@ public abstract class TabletBase {
long batchTimeOut = scanParams.getBatchTimeOut();
- long timeToRun = TimeUnit.MILLISECONDS.toNanos(batchTimeOut);
- long startNanos = System.nanoTime();
-
if (batchTimeOut <= 0 || batchTimeOut == Long.MAX_VALUE) {
batchTimeOut = 0;
}
+ CountDownTimer runTimer =
CountDownTimer.startNew(Duration.ofMillis(batchTimeOut));
+
// determine if the iterator supported yielding
YieldCallback<Key> yield = new YieldCallback<>();
mmfi.enableYielding(yield);
@@ -431,7 +430,7 @@ public abstract class TabletBase {
for (Range range : ranges) {
- boolean timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) >
timeToRun;
+ boolean timesUp = batchTimeOut > 0 && runTimer.isExpired();
boolean runningLowOnMemory =
context.getLowMemoryDetector().isRunningLowOnMemory(context,
DetectionScope.SCAN, () -> {
@@ -469,7 +468,7 @@ public abstract class TabletBase {
exceededMemoryUsage = lookupResult.bytesAdded > maxResultsSize;
- timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) >
timeToRun;
+ timesUp = batchTimeOut > 0 && runTimer.isExpired();
runningLowOnMemory =
context.getLowMemoryDetector().isRunningLowOnMemory(context,
DetectionScope.SCAN, () -> {
diff --git a/shell/src/main/java/org/apache/accumulo/shell/Shell.java
b/shell/src/main/java/org/apache/accumulo/shell/Shell.java
index 18ab478206..18a469b390 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/Shell.java
+++ b/shell/src/main/java/org/apache/accumulo/shell/Shell.java
@@ -31,6 +31,7 @@ import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -45,7 +46,6 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Stream;
@@ -73,6 +73,7 @@ import
org.apache.accumulo.core.spi.common.ContextClassLoaderFactory.ContextClas
import
org.apache.accumulo.core.tabletingest.thrift.ConstraintViolationException;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.BadArgumentException;
+import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.format.DefaultFormatter;
import org.apache.accumulo.core.util.format.Formatter;
import org.apache.accumulo.core.util.format.FormatterConfig;
@@ -242,8 +243,8 @@ public class Shell extends ShellOptions implements
KeywordExecutable {
private boolean canPaginate = false;
private boolean tabCompletion;
private boolean disableAuthTimeout;
- private long authTimeout;
- private long lastUserActivity = System.nanoTime();
+ private Duration authTimeout;
+ private final Timer lastUserActivity = Timer.startNew();
private boolean logErrorsToConsole = false;
private boolean askAgain = false;
private boolean usedClientProps = false;
@@ -346,7 +347,7 @@ public class Shell extends ShellOptions implements
KeywordExecutable {
return false;
}
- authTimeout = TimeUnit.MINUTES.toNanos(options.getAuthTimeout());
+ authTimeout = Duration.ofMinutes(options.getAuthTimeout());
disableAuthTimeout = options.isAuthTimeoutDisabled();
clientProperties = options.getClientProperties();
@@ -666,7 +667,7 @@ public class Shell extends ShellOptions implements
KeywordExecutable {
sb.append("- Authorization timeout: disabled\n");
} else {
sb.append("- Authorization timeout: ")
- .append(String.format("%ds%n",
TimeUnit.NANOSECONDS.toSeconds(authTimeout)));
+ .append(String.format("%ds%n", authTimeout.toSeconds()));
}
if (!scanIteratorOptions.isEmpty()) {
for (Entry<String,List<IteratorSetting>> entry :
scanIteratorOptions.entrySet()) {
@@ -744,9 +745,8 @@ public class Shell extends ShellOptions implements
KeywordExecutable {
return;
}
- long duration = System.nanoTime() - lastUserActivity;
if (!(sc instanceof ExitCommand) && !ignoreAuthTimeout
- && (duration < 0 || duration > authTimeout)) {
+ && lastUserActivity.hasElapsed(authTimeout)) {
writer.println("Shell has been idle for too long. Please
re-authenticate.");
boolean authFailed = true;
do {
@@ -769,7 +769,7 @@ public class Shell extends ShellOptions implements
KeywordExecutable {
}
}
} while (authFailed);
- lastUserActivity = System.nanoTime();
+ lastUserActivity.restart();
}
// Get the options from the command on how to parse the string
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index 20187c78d1..20a99852e6 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -99,6 +99,7 @@ import org.apache.accumulo.core.spi.compaction.CompactionPlan;
import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
import org.apache.accumulo.core.spi.compaction.RatioBasedCompactionPlanner;
import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
+import org.apache.accumulo.core.util.CountDownTimer;
import org.apache.accumulo.core.util.compaction.CompactionJobImpl;
import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
import org.apache.accumulo.minicluster.ServerType;
@@ -504,8 +505,8 @@ public class CompactionIT extends CompactionITBase {
// This speed bump is an attempt to increase the chance that splits and
compactions run
// concurrently. Wait.waitFor() is not used here because it will throw
an exception if the
// time limit is exceeded.
- long startTime = System.nanoTime();
- while (System.nanoTime() - startTime < SECONDS.toNanos(3)
+ CountDownTimer waitTimer =
CountDownTimer.startNew(Duration.ofSeconds(3));
+ while (!waitTimer.isExpired()
&& countTablets(tableName, tabletMetadata ->
tabletMetadata.getSelectedFiles() != null)
== 0) {
Thread.sleep(10);
@@ -523,8 +524,8 @@ public class CompactionIT extends CompactionITBase {
// before this so do not wait long. Wait.waitFor() is not used here
because it will throw an
// exception if the time limit is exceeded. This is just a speed bump,
its ok if the condition
// is not met within the time limit.
- startTime = System.nanoTime();
- while (System.nanoTime() - startTime < SECONDS.toNanos(3)
+ waitTimer.restart();
+ while (!waitTimer.isExpired()
&& countTablets(tableName, tabletMetadata ->
!tabletMetadata.getCompacted().isEmpty())
== 0) {
Thread.sleep(10);
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
index 59483d6427..44b29b84a3 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
@@ -44,6 +44,7 @@ import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.test.CloseScannerIT;
import org.apache.accumulo.test.util.Wait;
@@ -74,7 +75,7 @@ public class ScannerIT extends ConfigurableMacBase {
IteratorSetting cfg;
Iterator<Entry<Key,Value>> iterator;
- long nanosWithWait = 0;
+ Duration durationWithWait = Duration.ZERO;
try (Scanner s = c.createScanner(table, new Authorizations())) {
cfg = new IteratorSetting(100, SlowIterator.class);
@@ -88,19 +89,22 @@ public class ScannerIT extends ConfigurableMacBase {
s.setRange(new Range());
iterator = s.iterator();
- long startTime = System.nanoTime();
- while (iterator.hasNext()) {
- nanosWithWait += System.nanoTime() - startTime;
+ Timer hasNextTimer = Timer.startNew();
+ while (true) {
+ hasNextTimer.restart();
+ boolean hasNext = iterator.hasNext();
+ durationWithWait = durationWithWait.plus(hasNextTimer.elapsed());
+ if (!hasNext) {
+ break;
+ }
// While we "do work" in the client, we should be fetching the next
result
Thread.sleep(100L);
iterator.next();
- startTime = System.nanoTime();
}
- nanosWithWait += System.nanoTime() - startTime;
}
- long nanosWithNoWait = 0;
+ Duration durationWithNoWait = Duration.ZERO;
try (Scanner s = c.createScanner(table, new Authorizations())) {
s.addScanIterator(cfg);
s.setRange(new Range());
@@ -108,21 +112,25 @@ public class ScannerIT extends ConfigurableMacBase {
s.setReadaheadThreshold(0L);
iterator = s.iterator();
- long startTime = System.nanoTime();
- while (iterator.hasNext()) {
- nanosWithNoWait += System.nanoTime() - startTime;
+ Timer hasNextTimer = Timer.startNew();
+ while (true) {
+ hasNextTimer.restart();
+ boolean hasNext = iterator.hasNext();
+ durationWithNoWait = durationWithNoWait.plus(hasNextTimer.elapsed());
+ if (!hasNext) {
+ break;
+ }
// While we "do work" in the client, we should be fetching the next
result
Thread.sleep(100L);
iterator.next();
- startTime = System.nanoTime();
}
- nanosWithNoWait += System.nanoTime() - startTime;
// The "no-wait" time should be much less than the "wait-time"
- assertTrue(nanosWithNoWait < nanosWithWait,
- "Expected less time to be taken with immediate readahead (" +
nanosWithNoWait
- + ") than without immediate readahead (" + nanosWithWait +
")");
+ assertTrue(durationWithNoWait.compareTo(durationWithWait) < 0,
+ "Expected less time to be taken with immediate readahead ("
+ + durationWithNoWait.toNanos() + ") than without immediate
readahead ("
+ + durationWithWait.toNanos() + ")");
}
}
}
diff --git a/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java
b/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java
index fb48e21fe3..8e7a6c14b6 100644
--- a/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java
+++ b/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java
@@ -19,9 +19,9 @@
package org.apache.accumulo.test.util;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -46,6 +46,7 @@ import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
import org.apache.accumulo.test.functional.SlowIterator;
import org.apache.hadoop.io.Text;
@@ -65,7 +66,7 @@ public class SlowOps {
private final AccumuloClient client;
private final String tableName;
- private final long maxWaitMillis;
+ private final Duration maxWait;
// private final int numRows = DEFAULT_NUM_DATA_ROWS;
@@ -76,7 +77,7 @@ public class SlowOps {
public SlowOps(final AccumuloClient client, final String tableName, final
long maxWaitMillis) {
this.client = client;
this.tableName = tableName;
- this.maxWaitMillis = maxWaitMillis;
+ this.maxWait = Duration.ofMillis(maxWaitMillis);
createData();
}
@@ -105,10 +106,9 @@ public class SlowOps {
}
private void verifyRows() {
- long startTimestamp = System.nanoTime();
+ Timer timer = Timer.startNew();
int count = scanCount();
- log.trace("Scan time for {} rows {} ms", NUM_DATA_ROWS,
- NANOSECONDS.toMillis(System.nanoTime() - startTimestamp));
+ log.trace("Scan time for {} rows {} ms", NUM_DATA_ROWS,
timer.elapsed(MILLISECONDS));
if (count != NUM_DATA_ROWS) {
throw new IllegalStateException(
String.format("Number of rows %1$d does not match expected %2$d",
count, NUM_DATA_ROWS));
@@ -152,7 +152,7 @@ public class SlowOps {
@Override
public void run() {
- long startTimestamp = System.nanoTime();
+ Timer timer = Timer.startNew();
IteratorSetting slow = new IteratorSetting(30, "slow",
SlowIterator.class);
SlowIterator.setSleepTime(slow, SLOW_SCAN_SLEEP_MS);
@@ -188,18 +188,18 @@ public class SlowOps {
log.debug("Compaction wait is complete");
log.trace("Slow compaction of {} rows took {} ms", NUM_DATA_ROWS,
- NANOSECONDS.toMillis(System.nanoTime() - startTimestamp));
+ timer.elapsed(MILLISECONDS));
// validate that number of rows matches expected.
- startTimestamp = System.nanoTime();
+ timer.restart();
// validate expected data created and exists in table.
int count = scanCount();
log.trace("After compaction, scan time for {} rows {} ms", NUM_DATA_ROWS,
- NANOSECONDS.toMillis(System.nanoTime() - startTimestamp));
+ timer.elapsed(MILLISECONDS));
if (count != NUM_DATA_ROWS) {
throw new IllegalStateException(
@@ -215,8 +215,7 @@ public class SlowOps {
* @return true if compaction and associate fate found.
*/
private boolean blockUntilCompactionRunning() {
- long startWaitNanos = System.nanoTime();
- long maxWaitNanos = MILLISECONDS.toNanos(maxWaitMillis);
+ Timer timer = Timer.startNew();
/*
* wait for compaction to start on table - The compaction will acquire a
fate transaction lock
@@ -238,10 +237,9 @@ public class SlowOps {
} catch (InterruptedException ex) {
throw new IllegalStateException("interrupted during sleep", ex);
}
- } while ((System.nanoTime() - startWaitNanos) < maxWaitNanos);
+ } while (!timer.hasElapsed(maxWait));
- log.debug("Could not find compaction for {} after {} seconds", tableName,
- MILLISECONDS.toSeconds(maxWaitMillis));
+ log.debug("Could not find compaction for {} after {} seconds", tableName,
maxWait.toSeconds());
return false;
}
diff --git a/test/src/main/java/org/apache/accumulo/test/util/Wait.java
b/test/src/main/java/org/apache/accumulo/test/util/Wait.java
index d73bd52866..85ae2f5c9d 100644
--- a/test/src/main/java/org/apache/accumulo/test/util/Wait.java
+++ b/test/src/main/java/org/apache/accumulo/test/util/Wait.java
@@ -23,6 +23,8 @@ import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.function.ToIntFunction;
+import org.apache.accumulo.core.util.CountDownTimer;
+
public class Wait {
public static final long MAX_WAIT_MILLIS = SECONDS.toMillis(30);
@@ -103,14 +105,15 @@ public class Wait {
final String failMessage) {
final int timeoutFactor = getTimeoutFactor(e -> 1); // default to factor
of 1
- final long scaledDurationNanos = MILLISECONDS.toNanos(duration) *
timeoutFactor;
- final long scaledSleepMillis = sleepMillis * timeoutFactor;
- final long startNanos = System.nanoTime();
+ final long scaledWaitMillis = multiplyClampToRange(duration,
timeoutFactor);
+ final long scaledSleepMillis = multiplyClampToRange(sleepMillis,
timeoutFactor);
+
+ final CountDownTimer maxWaitTimer =
CountDownTimer.startNew(scaledWaitMillis, MILLISECONDS);
boolean success;
try {
success = condition.isSatisfied();
- while (!success && System.nanoTime() - startNanos < scaledDurationNanos)
{
+ while (!success && !maxWaitTimer.isExpired()) {
MILLISECONDS.sleep(scaledSleepMillis);
success = condition.isSatisfied();
}
@@ -125,4 +128,18 @@ public class Wait {
throw new IllegalStateException(failMessage + ". Timeout exceeded");
}
}
+
+ /**
+ * Multiplies a value by a scaling factor, keeping the result non-negative and free of overflow.
+ * If either input is zero or negative the result is 0. If the exact product does not fit in a
+ * long (signalled by the ArithmeticException from Math.multiplyExact) the result saturates at
+ * Long.MAX_VALUE.
+ *
+ * @param value the amount to scale
+ * @param factor the multiplier to apply
+ * @return the product of the inputs, clamped to the range [0, Long.MAX_VALUE]
+ */
+ private static long multiplyClampToRange(long value, int factor) {
+ if (value <= 0 || factor <= 0) {
+ return 0;
+ }
+ try {
+ return Math.multiplyExact(value, factor);
+ } catch (ArithmeticException e) {
+ return Long.MAX_VALUE;
+ }
+ }
}