This is an automated email from the ASF dual-hosted git repository.

kenhuuu pushed a commit to branch tx-idle-to
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 986ab97a1b88d849e09d158e095726b8c92fe575
Author: Ken Hu <[email protected]>
AuthorDate: Wed Jun 24 19:39:11 2026 -0700

    Suspend the HTTP transaction idle timer while an operation is running
    
    The idle timeout was armed on request arrival rather than when the
    transaction went idle, so a single operation running longer than the timeout
    tripped it mid-execution, contradicting the documented promise that active
    transactions are unaffected. A long operation should be bounded by
    evaluationTimeout; the idle timer should only reclaim abandoned 
transactions.
    
    The per-transaction executor is now a ThreadPoolExecutor(1,1) whose
    before/afterExecute hooks suspend the idle timer while work runs and re-arm 
it
    only once the worker parks with an empty queue. This gives a reliable
    running-vs-idle signal without wrapping submitted tasks, which would break 
the
    evaluation-timeout interrupt that relies on cancelling the real FutureTask.
    
    transactionTimeout is renamed to idleTransactionTimeout to reflect its 
actual
    meaning (renamed outright as the feature is unreleased), and now honors 0 as
    "disabled" to match its documentation.
    
    Assisted-by: Claude Code:claude-opus-4-8
---
 docs/src/dev/provider/index.asciidoc               |  10 +-
 docs/src/reference/gremlin-applications.asciidoc   |  10 +-
 .../apache/tinkerpop/gremlin/server/Settings.java  |   9 +-
 .../server/transaction/TransactionManager.java     |  14 +-
 .../server/transaction/UnmanagedTransaction.java   | 142 +++++++---
 .../gremlin/server/util/ServerGremlinExecutor.java |   2 +-
 .../GremlinDriverTransactionIntegrateTest.java     |   4 +-
 .../GremlinServerHttpTransactionIntegrateTest.java |  53 +++-
 .../transaction/UnmanagedTransactionTest.java      | 291 +++++++++++++++++++++
 .../util/ManualScheduledExecutorService.java       | 279 ++++++++++++++++++++
 10 files changed, 744 insertions(+), 70 deletions(-)

diff --git a/docs/src/dev/provider/index.asciidoc 
b/docs/src/dev/provider/index.asciidoc
index a09c67b915..1f6ed71945 100644
--- a/docs/src/dev/provider/index.asciidoc
+++ b/docs/src/dev/provider/index.asciidoc
@@ -1430,14 +1430,14 @@ rejects it with HTTP 400. This prevents cross-graph 
operations within a single t
 
 ==== Transaction Timeout and Idle Reclamation
 
-Servers implement a configurable transaction timeout (`transactionTimeout`, 
default 600000ms). The timeout represents
-how long a transaction can sit idle with no requests before the server 
forcibly rolls it back and removes it. The
-timeout resets on each request received for that transaction, so active 
transactions are not affected. After a timeout
-fires, any subsequent request with that transaction ID receives a 404 response.
+Servers implement a configurable transaction timeout 
(`idleTransactionTimeout`, default 600000ms). The timeout
+represents how long a transaction can sit idle with no requests before the 
server forcibly rolls it back and removes it.
+The timeout resets on each request received for that transaction, so active 
transactions are not affected. After a
+timeout fires, any subsequent request with that transaction ID receives a 404 
response.
 
 ==== Transaction Capacity Limits
 
-Servers enforce a configurable maximum number of concurrent open transactions 
(`maxConcurrentTransactions`, default
+Servers may enforce a configurable maximum number of concurrent open 
transactions (`maxConcurrentTransactions`, default
 1000). When the limit is reached, new begin requests are rejected with HTTP 
503. Slots are freed when transactions
 close via commit, rollback, or timeout.
 
diff --git a/docs/src/reference/gremlin-applications.asciidoc 
b/docs/src/reference/gremlin-applications.asciidoc
index 6494972cdf..3b1989ab2f 100644
--- a/docs/src/reference/gremlin-applications.asciidoc
+++ b/docs/src/reference/gremlin-applications.asciidoc
@@ -930,6 +930,7 @@ The following table describes the various YAML 
configuration options that Gremli
 |gremlinPool |The number of "Gremlin" threads available to execute actual 
scripts in a `ScriptEngine`. This pool represents the workers available to 
handle blocking operations in Gremlin Server. When set to `0`, Gremlin Server 
will use the value provided by `Runtime.availableProcessors()`. |0
 |host |The name of the host to bind the server to. |localhost
 |idleConnectionTimeout |Time in milliseconds that the server will allow a 
channel to not receive requests from a client before it automatically closes. 
If enabled, the value provided should typically exceed the amount of time given 
to `keepAliveInterval`. Note that while this value is to be provided as 
milliseconds it will resolve to second precision. Set this value to `0` to 
disable this feature. |0
+|idleTransactionTimeout |Time in milliseconds that a transaction can sit idle 
(no requests) before the server forcibly rolls it back and removes it. The 
timeout resets on each request received for that transaction. Set to `0` to 
disable this feature. |600000
 |keepAliveInterval |Time in milliseconds that the server will allow a channel 
to not send responses to a client before it sends a "ping" to see if it is 
still present. If it is present, the client should respond with a "pong" which 
will thus reset the `idleConnectionTimeout` and keep the channel open. If 
enabled, this number should be smaller than the value provided to the 
`idleConnectionTimeout`. Note that while this value is to be provided as 
milliseconds it will resolve to second prec [...]
 |maxAccumulationBufferComponents |Maximum number of request components that 
can be aggregated for a message. |1024
 |maxChunkSize |The maximum length of the content or each chunk.  If the 
content length exceeds this value, the transfer encoding of the decoded request 
will be converted to 'chunked' and the content will be split into multiple 
`HttpContent` objects.  If the transfer encoding of the HTTP request is 
'chunked' already, each chunk will be split into smaller chunks if the length 
of the chunk exceeds this value. |8192
@@ -983,7 +984,6 @@ The following table describes the various YAML 
configuration options that Gremli
 |strictTransactionManagement |Set to `true` to require `aliases` to be 
submitted on every requests, where the `aliases` become the scope of 
transaction management. |false
 |threadPoolBoss |The number of threads available to Gremlin Server for 
accepting connections. Should always be set to `1`. |1
 |threadPoolWorker |The number of threads available to Gremlin Server for 
processing non-blocking reads and writes. |1
-|transactionTimeout |Time in milliseconds that a transaction can sit idle (no 
requests) before the server forcibly rolls it back and removes it. The timeout 
resets on each request received for that transaction. Set to `0` to disable 
this feature. |600000
 |useEpollEventLoop |Try to use epoll event loops (works only on Linux os) 
instead of netty NIO. |false
 |writeBufferHighWaterMark | If the number of bytes in the network send buffer 
exceeds this value then the channel is no longer writeable, accepting no 
additional writes until buffer is drained and the `writeBufferLowWaterMark` is 
met. |65536
 |writeBufferLowWaterMark | Once the number of bytes queued in the network send 
buffer exceeds the `writeBufferHighWaterMark`, the channel will not become 
writeable again until the buffer is drained and it drops below this value. 
|32768
@@ -2261,14 +2261,14 @@ an explicit transaction on a graph that does not 
support them will result in an
 
 Two settings in the Gremlin Server YAML control transaction resource usage:
 
-* `transactionTimeout` (default: 600000ms) -- How long a transaction can sit 
idle before the server forcibly rolls it
-  back. The timeout resets on each request received for that transaction.
+* `idleTransactionTimeout` (default: 600000ms) -- How long a transaction can 
sit idle before the server forcibly rolls
+   it back. The timeout resets on each request received for that transaction.
 * `maxConcurrentTransactions` (default: 1000) -- The maximum number of open 
transactions allowed. When the limit is
   reached, new begin requests are rejected with HTTP 503.
 
 Each open transaction consumes a dedicated thread on the server to maintain 
thread-local transaction state for the
-underlying graph. Ensure that clients close transactions promptly and that the 
`transactionTimeout` is set to reclaim
-abandoned ones. The `transactions` gauge metric can be used to monitor usage.
+underlying graph. Ensure that clients close transactions promptly and that the 
`idleTransactionTimeout` is set to
+reclaim abandoned ones. The `transactions` gauge metric can be used to monitor 
usage.
 
 In load-balanced deployments, all requests within a transaction must reach the 
same server instance because
 transaction state is local to the server that created it. The 
`X-Transaction-Id` header is available for load
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
index 8d6111ef27..236a4b4431 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
@@ -185,10 +185,13 @@ public class Settings {
     public boolean strictTransactionManagement = false;
 
     /**
-     * Time in milliseconds that a transaction can remain idle before it is 
automatically rolled back.
-     * This prevents resource leaks from abandoned transactions. Default is 
600000 (10 minutes).
+     * Time in milliseconds that a transaction can remain idle (no operation 
running or queued) before it is
+     * automatically rolled back. This prevents resource leaks from abandoned 
transactions. The idle timer is suspended
+     * while an operation is in progress, so a long-running operation does not 
trip it (its duration is instead bounded
+     * by {@link #evaluationTimeout}). Set to {@code 0} to disable idle 
reclamation entirely. Default is 600000
+     * (10 minutes).
      */
-    public long transactionTimeout = 600000L;
+    public long idleTransactionTimeout = 600000L;
 
     /**
      * Time in milliseconds to wait for a transaction commit or rollback 
operation to complete.
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManager.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManager.java
index 6901664c15..ec203e820e 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManager.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManager.java
@@ -43,7 +43,7 @@ public class TransactionManager {
     private final ConcurrentMap<String, UnmanagedTransaction> transactions = 
new ConcurrentHashMap<>();
     private final ScheduledExecutorService scheduledExecutorService;
     private final GraphManager graphManager;
-    private final long transactionTimeoutMs;
+    private final long idleTransactionTimeoutMs;
     private final int maxConcurrentTransactions;
     private final long perGraphCloseMs;
 
@@ -52,23 +52,23 @@ public class TransactionManager {
      *
      * @param scheduledExecutorService Scheduler for timeout management
      * @param graphManager The graph manager for accessing traversal sources
-     * @param transactionTimeoutMs Timeout in milliseconds before auto-rollback
+     * @param idleTransactionTimeoutMs Inactivity timeout in milliseconds 
before auto-rollback; {@code 0} disables it
      * @param maxConcurrentTransactions Maximum number of concurrent 
transactions allowed
      */
     public TransactionManager(final ScheduledExecutorService 
scheduledExecutorService,
                               final GraphManager graphManager,
-                              final long transactionTimeoutMs,
+                              final long idleTransactionTimeoutMs,
                               final int maxConcurrentTransactions,
                               final long perGraphCloseMs) {
         this.scheduledExecutorService = scheduledExecutorService;
         this.graphManager = graphManager;
-        this.transactionTimeoutMs = transactionTimeoutMs;
+        this.idleTransactionTimeoutMs = idleTransactionTimeoutMs;
         this.maxConcurrentTransactions = maxConcurrentTransactions;
         this.perGraphCloseMs = perGraphCloseMs;
 
         MetricManager.INSTANCE.getGauge(transactions::size, 
name(GremlinServer.class, "transactions"));
-        logger.info("TransactionManager initialized with timeout={}ms, 
maxTransactions={}",
-                transactionTimeoutMs, maxConcurrentTransactions);
+        logger.info("TransactionManager initialized with 
idleTransactionTimeout={}ms, maxTransactions={}",
+                idleTransactionTimeoutMs, maxConcurrentTransactions);
     }
 
     /**
@@ -123,7 +123,7 @@ public class TransactionManager {
                     traversalSourceName,
                     graph,
                     scheduledExecutorService,
-                    transactionTimeoutMs,
+                    idleTransactionTimeoutMs,
                     perGraphCloseMs
             );
         } while (transactions.putIfAbsent(txId, ctx) != null);
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransaction.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransaction.java
index 3dfc394204..c462dfa41b 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransaction.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransaction.java
@@ -23,12 +23,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Optional;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -42,6 +43,12 @@ import java.util.concurrent.atomic.AtomicReference;
  * the complete request lifecycle (graph operation, error handling, response 
writing),
  * following the same pattern as the non-transactional HTTP path and the legacy
  * {@code SessionOpProcessor}.
+ * <p>
+ * The single-threaded executor is a {@link SingleThreadTransactionExecutor} 
(a {@code ThreadPoolExecutor} with one
+ * core/max thread) rather than {@link 
java.util.concurrent.Executors#newSingleThreadExecutor}. It is behaviorally
+ * identical for task execution but exposes the {@code beforeExecute}/{@code 
afterExecute} lifecycle hooks and the task
+ * queue, which the idle-timer management relies on to tell "an operation is 
running" apart from "the worker is idle
+ * with an empty queue". The {@code Executors} factory hides those behind a 
sealed wrapper.
  */
 public class UnmanagedTransaction {
     private static final Logger logger = 
LoggerFactory.getLogger(UnmanagedTransaction.class);
@@ -51,16 +58,16 @@ public class UnmanagedTransaction {
     private final TransactionManager manager;
     private final Graph graph;
     private final ScheduledExecutorService scheduledExecutorService;
-    private final long timeout;
+    private final long idleTimeout;
     private final long perGraphClose;
-    private final AtomicReference<ScheduledFuture<?>> timeoutFuture = new 
AtomicReference<>();
+    private final AtomicReference<ScheduledFuture<?>> idleFuture = new 
AtomicReference<>();
     // Controls whether the executor is still accepting tasks.
     private final AtomicBoolean accepting = new AtomicBoolean(true);
     /**
      * Single-threaded executor ensures all operations for this transaction 
run on
      * the same thread, preserving the ThreadLocal nature of Graph 
transactions.
      */
-    private final ExecutorService executor;
+    private final SingleThreadTransactionExecutor executor;
 
     /**
      * Creates a new {@code UnmanagedTransaction} for managing an HTTP 
transaction.
@@ -70,14 +77,14 @@ public class UnmanagedTransaction {
      * @param traversalSourceName The traversal source name bound at begin time
      * @param graph The graph instance for this transaction
      * @param scheduledExecutorService Scheduler for timeout management
-     * @param transactionTimeout Timeout in milliseconds before auto-rollback
+     * @param idleTransactionTimeout Inactivity timeout in milliseconds before 
auto-rollback; {@code 0} disables it
      */
     public UnmanagedTransaction(final String transactionId,
                                 final TransactionManager transactionManager,
                                 final String traversalSourceName,
                                 final Graph graph,
                                 final ScheduledExecutorService 
scheduledExecutorService,
-                                final long transactionTimeout,
+                                final long idleTransactionTimeout,
                                 final long perGraphClose) {
         logger.debug("New transaction context established for {}", 
transactionId);
         this.transactionId = transactionId;
@@ -85,12 +92,15 @@ public class UnmanagedTransaction {
         this.manager = transactionManager;
         this.graph = graph;
         this.scheduledExecutorService = scheduledExecutorService;
-        this.timeout = transactionTimeout;
+        this.idleTimeout = idleTransactionTimeout;
         this.perGraphClose = perGraphClose;
 
-        // Create single-threaded executor with named thread for debugging
-        this.executor = Executors.newSingleThreadExecutor(
-            r -> new Thread(r, "tx-" + transactionId.substring(0, Math.min(8, 
transactionId.length()))));
+        // Create single-threaded executor with named thread for debugging. A 
ThreadPoolExecutor(1,1) is used (rather
+        // than Executors.newSingleThreadExecutor) so the before/afterExecute 
hooks and the task queue are accessible
+        // for idle-timer management; see SingleThreadTransactionExecutor.
+        final ThreadFactory threadFactory =
+            r -> new Thread(r, "tx-" + transactionId.substring(0, Math.min(8, 
transactionId.length())));
+        this.executor = new SingleThreadTransactionExecutor(threadFactory);
     }
 
     /**
@@ -107,36 +117,6 @@ public class UnmanagedTransaction {
         return traversalSourceName;
     }
 
-    /**
-     * Resets the timeout for this transaction. Called on each request.
-     */
-    public void touch() {
-        timeoutFuture.updateAndGet(future -> {
-            if (future != null) future.cancel(false);
-            return scheduledExecutorService.schedule(() -> {
-                logger.info("Transaction {} timed out after {} ms of 
inactivity", transactionId, timeout);
-                close(false);
-            }, timeout, TimeUnit.MILLISECONDS);
-        });
-    }
-
-    /**
-     * Opens the underlying graph transaction and starts the inactivity 
timeout.
-     * Should be called on the transaction's single-threaded executor to 
preserve
-     * ThreadLocal affinity. On failure the exception is re-thrown and the 
caller
-     * is responsible for cleanup (e.g. via {@link #close(boolean)}).
-     */
-    public void open() {
-        try {
-            graph.tx().open();
-            touch();
-            logger.debug("Transaction {} opened", transactionId);
-        } catch (Exception e) {
-            logger.warn("Failed to begin transaction {}: {}", transactionId, 
e.getMessage());
-            throw e;
-        }
-    }
-
     /**
      * Closes this transaction and releases its resources. When {@code force} 
is {@code false},
      * any open graph transaction is rolled back before shutdown. When {@code 
force} is {@code true},
@@ -183,7 +163,7 @@ public class UnmanagedTransaction {
         // reorder these two statements.
         manager.destroy(transactionId);
         executor.shutdown();
-        Optional.ofNullable(timeoutFuture.get()).ifPresent(f -> 
f.cancel(true));
+        Optional.ofNullable(idleFuture.get()).ifPresent(f -> f.cancel(true));
         logger.debug("Transaction {} closed", transactionId);
     }
 
@@ -199,7 +179,83 @@ public class UnmanagedTransaction {
     public Future<?> submit(final FutureTask<Void> task) {
         if (!accepting.get()) throw new IllegalStateException("Transaction " + 
transactionId + " is closed");
 
-        touch();
+        // Insurance backstop: cancel (do NOT arm) the idle timer on submit. 
Arming is the executor's job, done in
+        // afterExecute once the worker parks with an empty queue. 
beforeExecute will also cancel when the task starts;
+        // cancelling here too closes the small window between accepting a 
task and the worker picking it up.
+        cancelIdleTimer();
         return executor.submit(task);
     }
+
+    /**
+     * Suspends the inactivity timer because an operation is running (or about 
to run) on the transaction thread.
+     * Invoked from {@link SingleThreadTransactionExecutor#beforeExecute} and, 
as a backstop, from {@link #submit}.
+     * <p>
+     * A long-running operation must not trip the idle timeout: while an 
operation is in progress the idle timer is
+     * simply not armed (the operation's own duration is bounded by the 
per-request {@code evaluationTimeout} instead).
+     */
+    private void cancelIdleTimer() {
+        idleFuture.updateAndGet(future -> {
+            if (future != null) future.cancel(false);
+            return null;
+        });
+    }
+
+    /**
+     * (Re)arms the inactivity timer, but only when the transaction is 
genuinely idle. Invoked from
+     * {@link SingleThreadTransactionExecutor#afterExecute} once an operation 
has finished and the worker is about to
+     * look for more work.
+     * <p>
+     * "Idle" means: still {@link #accepting} new work (not closing), the 
executor queue is empty (no sibling request is
+     * already waiting — on a single thread there is a brief instant between 
one task finishing and the next starting),
+     * and the idle timeout is enabled ({@code idleTimeout > 0}; {@code 0} 
disables idle reclamation entirely). When all
+     * hold, a fresh {@code close(false)} is scheduled {@code idleTimeout} ms 
out, replacing any previously scheduled one.
+     */
+    private void maybeScheduleIdleTimer() {
+        if (!accepting.get()) return;            // closing/closed: never 
re-arm a dying transaction
+        if (idleTimeout <= 0) return;            // 0 (or negative) disables 
idle reclamation
+        if (!executor.getQueue().isEmpty()) return; // a sibling task is 
already queued -> not idle yet
+
+        idleFuture.updateAndGet(future -> {
+            if (future != null) future.cancel(false);
+            return scheduledExecutorService.schedule(() -> {
+                logger.info("Transaction {} timed out after {} ms of 
inactivity", transactionId, idleTimeout);
+                close(false);
+            }, idleTimeout, TimeUnit.MILLISECONDS);
+        });
+
+        // The accepting check above and the arm below are not atomic: a 
concurrent close() could have flipped
+        // accepting=false and cancelled idleFuture in between, leaving the 
timer we just armed orphaned (it would fire
+        // ~idleTimeout later and call close() on an already-gone 
transaction). Re-check after arming and cancel if so,
+        // so the "never re-arm a dying transaction" invariant actually holds.
+        if (!accepting.get()) cancelIdleTimer();
+    }
+
+    /**
+     * A single-threaded {@link ThreadPoolExecutor} (one core and max thread) 
that runs all operations for a single
+     * transaction on the same worker thread, preserving the ThreadLocal 
nature of graph transactions.
+     * <p>
+     * It is used in place of {@link 
java.util.concurrent.Executors#newSingleThreadExecutor} solely to expose the
+     * {@link #beforeExecute}/{@link #afterExecute} lifecycle hooks (and, via 
{@link #getQueue()}, the pending-task
+     * queue), which the enclosing {@link UnmanagedTransaction} needs to 
distinguish "an operation is running" from
+     * "the worker is idle with nothing queued". Task-execution semantics are 
otherwise identical to a single-thread
+     * executor: one worker, FIFO ordering. Submitted {@link FutureTask}s are 
returned unwrapped so callers can
+     * {@code cancel(true)} the real work (e.g. the per-request evaluation 
timeout interrupting a running operation).
+     */
+    private final class SingleThreadTransactionExecutor extends 
ThreadPoolExecutor {
+        private SingleThreadTransactionExecutor(final ThreadFactory 
threadFactory) {
+            super(1, 1, 0L, TimeUnit.MILLISECONDS, new 
LinkedBlockingQueue<>(), threadFactory);
+        }
+
+        @Override
+        protected void beforeExecute(final Thread t, final Runnable r) {
+            super.beforeExecute(t, r);
+            cancelIdleTimer();
+        }
+
+        @Override
+        protected void afterExecute(final Runnable r, final Throwable t) {
+            super.afterExecute(r, t);
+            maybeScheduleIdleTimer();
+        }
+    }
 }
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java
index 3995337566..ab6be093e2 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java
@@ -306,7 +306,7 @@ public class ServerGremlinExecutor {
         transactionManager = new TransactionManager(
                 scheduledExecutorService,
                 graphManager,
-                settings.transactionTimeout,
+                settings.idleTransactionTimeout,
                 settings.maxConcurrentTransactions,
                 settings.perGraphCloseTimeout
         );
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java
index 69f387b17e..8a6c4700cd 100644
--- 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java
@@ -85,10 +85,10 @@ public class GremlinDriverTransactionIntegrateTest extends 
AbstractGremlinServer
             case "shouldTimeoutIdleTransaction":
             case "shouldTimeoutIdleTransactionWithNoOperations":
             case "shouldRejectLateCommitAfterTimeout":
-                settings.transactionTimeout = 1000;
+                settings.idleTransactionTimeout = 1000;
                 break;
             case "shouldTimeoutOnlyIdleTransactionNotActiveOne":
-                settings.transactionTimeout = 2000;
+                settings.idleTransactionTimeout = 2000;
                 break;
         }
         return settings;
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java
index 95895e66b4..c9a05b82ab 100644
--- 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java
@@ -93,17 +93,21 @@ public class GremlinServerHttpTransactionIntegrateTest 
extends AbstractGremlinSe
                 break;
             case "shouldTimeoutFreeSlotUnderMaxConcurrentTransactions":
                 settings.maxConcurrentTransactions = 1;
-                settings.transactionTimeout = 1000;
+                settings.idleTransactionTimeout = 1000;
                 break;
             case "shouldTimeoutIdleTransactionWithNoOperations":
-                settings.transactionTimeout = 500;
+                settings.idleTransactionTimeout = 500;
                 break;
             case "shouldTimeoutAndRejectLateCommit":
             case "shouldTrackTransactionCountAccurately":
-                settings.transactionTimeout = 1000;
+                settings.idleTransactionTimeout = 1000;
                 break;
             case "shouldRollbackAbandonedTransaction":
-                settings.transactionTimeout = 300;
+                settings.idleTransactionTimeout = 300;
+                break;
+            case "shouldNotIdleTimeoutLongRunningOperation":
+                // Short idle timeout, but a single long operation must NOT 
trip it (idle suspended while busy).
+                settings.idleTransactionTimeout = 500;
                 break;
             case "shouldRejectMismatchedGraphAliasInTransaction": {
                 final Settings.GraphSettings gs = new Settings.GraphSettings();
@@ -528,6 +532,47 @@ public class GremlinServerHttpTransactionIntegrateTest 
extends AbstractGremlinSe
         }
     }
 
+    @Test
+    public void shouldNotIdleTimeoutLongRunningOperation() throws Exception {
+        // With a short idle timeout (500ms), a single operation that runs 
LONGER than the idle timeout must still
+        // succeed -- the idle timer is suspended while an operation is in 
progress, so a long-running op is not
+        // reclaimed mid-execution (it is instead bounded by 
evaluationTimeout, left at its default here).
+        final String txId = beginTx(client, GTX);
+
+        // Seed two vertices and an edge so repeat(both()) has something to 
traverse and keeps the executor busy. Each
+        // response body is fully consumed before the next request is sent: 
the chunked stream is only complete once the
+        // server has finished processing, so consuming guarantees these 
requests are strictly ordered.
+        try (final CloseableHttpResponse r = submitInTx(client, txId, 
"g.addV().property(T.id, 1)", GTX)) {
+            assertEquals(200, r.getStatusLine().getStatusCode());
+            EntityUtils.consume(r.getEntity());
+        }
+        try (final CloseableHttpResponse r = submitInTx(client, txId, 
"g.addV().property(T.id, 2)", GTX)) {
+            assertEquals(200, r.getStatusLine().getStatusCode());
+            EntityUtils.consume(r.getEntity());
+        }
+        try (final CloseableHttpResponse r = submitInTx(client, txId, 
"g.V(1).addE('self').to(__.V(2))", GTX)) {
+            assertEquals(200, r.getStatusLine().getStatusCode());
+            EntityUtils.consume(r.getEntity());
+        }
+
+        // A traversal that runs well past the 500ms idle timeout. Under the 
old arm-on-arrival behavior the idle timer
+        // would have fired mid-execution; under suspend-while-busy it does 
not.
+        try (final CloseableHttpResponse r = submitInTx(client, txId, 
"g.V().repeat(both()).times(2000)", GTX)) {
+            assertEquals(200, r.getStatusLine().getStatusCode());
+            EntityUtils.consume(r.getEntity());
+        }
+
+        // The transaction is still alive and usable after the long op (it was 
not reclaimed mid-flight).
+        try (final CloseableHttpResponse r = submitInTx(client, txId, 
"g.V().count()", GTX)) {
+            assertEquals(200, r.getStatusLine().getStatusCode());
+            assertEquals(2, extractCount(r)); // extractCount fully reads the 
body
+        }
+        try (final CloseableHttpResponse r = commitTx(client, txId, GTX)) {
+            assertEquals(200, r.getStatusLine().getStatusCode());
+            EntityUtils.consume(r.getEntity());
+        }
+    }
+
     @Test
     public void shouldRejectMismatchedGraphAliasInTransaction() throws 
Exception {
         final String txId = beginTx(client, GTX);
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransactionTest.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransactionTest.java
new file mode 100644
index 0000000000..81a8a65949
--- /dev/null
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransactionTest.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.transaction;
+
+import org.apache.tinkerpop.gremlin.server.util.ManualScheduledExecutorService;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link UnmanagedTransaction}, driven by a deterministic 
{@link ManualScheduledExecutorService} so the
+ * inactivity-timeout behaviour can be asserted without real wall-clock waits.
+ * <p>
+ * These are <em>specification</em> tests for the reworked idle timer 
(suspend-while-busy): the idle timer is armed only
+ * when the transaction goes idle (no operation running, empty queue) and is 
suspended while an operation runs. The idle
+ * timer is (re)armed from the executor's {@code afterExecute} hook, which 
runs on the transaction worker thread, so
+ * timer assertions poll the scheduler with a bounded wait via {@link 
#awaitPendingTimer(boolean)}.
+ */
+public class UnmanagedTransactionTest {
+
+    private static final String TX_ID = "test-tx-0001";
+    private static final long TIMEOUT_MS = 600000L;
+    private static final long PER_GRAPH_CLOSE_MS = 10000L;
+    private static final long AWAIT_MS = 5000L;
+
+    private TransactionManager manager;
+    private Graph graph;
+    private ManualScheduledExecutorService scheduler;
+    private UnmanagedTransaction tx;
+
+    @Before
+    public void setUp() {
+        manager = mock(TransactionManager.class);
+        graph = mock(Graph.class);
+        final Transaction graphTx = mock(Transaction.class);
+        when(graph.tx()).thenReturn(graphTx);
+        when(graphTx.isOpen()).thenReturn(false); // rollback path is a no-op 
during close(false)
+
+        scheduler = new ManualScheduledExecutorService();
+        tx = new UnmanagedTransaction(TX_ID, manager, "g", graph, scheduler, 
TIMEOUT_MS, PER_GRAPH_CLOSE_MS);
+
+        // close() short-circuits unless the manager still knows about the 
transaction.
+        when(manager.get(TX_ID)).thenReturn(Optional.of(tx));
+    }
+
+    /**
+     * Submits a no-op task and blocks until it has finished running on the 
worker thread.
+     */
+    private void runOp() throws Exception {
+        tx.submit(new FutureTask<>(() -> null)).get(AWAIT_MS, 
TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Waits (bounded) for the idle timer to reach the expected 
armed/not-armed state, since it is (re)armed on the
+     * worker thread from afterExecute slightly after the submitted task's 
Future completes. Returns once the condition
+     * holds or the wait elapses; the caller asserts on the final state.
+     */
+    private void awaitPendingTimer(final boolean expectArmed) throws 
InterruptedException {
+        final long deadline = System.nanoTime() + 
TimeUnit.MILLISECONDS.toNanos(AWAIT_MS);
+        while (System.nanoTime() < deadline) {
+            if ((scheduler.getPendingTaskCount() == 1) == expectArmed) return;
+            Thread.sleep(5);
+        }
+    }
+
+    @Test
+    public void shouldNotScheduleAnyCloseAtConstruction() {
+        assertEquals(0, scheduler.getScheduledTaskCount());
+        assertEquals(0, scheduler.getPendingTaskCount());
+    }
+
+    @Test
+    public void shouldArmIdleTimerWhenWorkerGoesIdleAfterAnOperation() throws 
Exception {
+        runOp();
+
+        awaitPendingTimer(true);
+        assertEquals("idle timer should be armed once the worker parks with an 
empty queue",
+                1, scheduler.getPendingTaskCount());
+        assertEquals(TIMEOUT_MS, scheduler.nextPendingDelayMillis());
+    }
+
+    @Test
+    public void shouldNotArmIdleTimerWhileAnOperationIsRunning() throws 
Exception {
+        // Hold an operation "running" and assert no idle timer is armed 
during that window.
+        final CountDownLatch started = new CountDownLatch(1);
+        final CountDownLatch release = new CountDownLatch(1);
+        final Future<?> running = tx.submit(new FutureTask<>(() -> {
+            started.countDown();
+            release.await();
+            return null;
+        }));
+
+        assertTrue(started.await(AWAIT_MS, MILLISECONDS));
+        // While the op runs, the idle timer must not be armed (a long op must 
not trip the idle timeout).
+        assertEquals(0, scheduler.getPendingTaskCount());
+
+        release.countDown();
+        running.get(AWAIT_MS, MILLISECONDS);
+
+        // Once the worker goes idle, the timer arms.
+        awaitPendingTimer(true);
+        assertEquals(1, scheduler.getPendingTaskCount());
+    }
+
+    @Test
+    public void shouldNotFireIdleCloseForALongRunningOperation() throws 
Exception {
+        // A single operation that runs longer than the idle timeout must not 
be reclaimed mid-execution: no timer is
+        // armed while it runs, so advancing the clock far past the timeout 
fires nothing.
+        final CountDownLatch started = new CountDownLatch(1);
+        final CountDownLatch release = new CountDownLatch(1);
+        final Future<?> running = tx.submit(new FutureTask<>(() -> {
+            started.countDown();
+            release.await();
+            return null;
+        }));
+        assertTrue(started.await(AWAIT_MS, MILLISECONDS));
+
+        scheduler.advanceTimeBy(TIMEOUT_MS * 2, MILLISECONDS);
+
+        verify(manager, never()).destroy(TX_ID);
+        release.countDown();
+        running.get(AWAIT_MS, MILLISECONDS);
+    }
+
+    @Test
+    public void shouldCloseTransactionWhenIdleTimeoutFires() throws Exception {
+        runOp();
+        awaitPendingTimer(true);
+
+        scheduler.advanceTimeBy(TIMEOUT_MS, MILLISECONDS);
+
+        // The scheduled close(false) removes the transaction from the manager.
+        verify(manager).destroy(TX_ID);
+        assertEquals(0, scheduler.getPendingTaskCount());
+    }
+
+    @Test
+    public void shouldNotCloseBeforeIdleTimeoutElapses() throws Exception {
+        runOp();
+        awaitPendingTimer(true);
+
+        scheduler.advanceTimeBy(TIMEOUT_MS - 1, MILLISECONDS);
+
+        verify(manager, never()).destroy(TX_ID);
+        assertEquals(1, scheduler.getPendingTaskCount());
+    }
+
+    @Test
+    public void shouldReArmIdleTimerAfterEachOperation() throws Exception {
+        runOp();
+        awaitPendingTimer(true);
+        assertEquals(1, scheduler.getScheduledTaskCount());
+
+        runOp();
+        awaitPendingTimer(true);
+
+        // A second operation cancels the prior idle timer and arms a fresh 
one.
+        assertEquals(2, scheduler.getScheduledTaskCount());
+        assertEquals(1, scheduler.getPendingTaskCount());
+    }
+
+    @Test
+    public void shouldNotArmIdleTimerWhenIdleTimeoutDisabled() throws 
Exception {
+        // idleTransactionTimeout == 0 disables idle reclamation entirely: the 
timer is never armed.
+        final UnmanagedTransaction disabledTx =
+                new UnmanagedTransaction(TX_ID, manager, "g", graph, 
scheduler, 0L, PER_GRAPH_CLOSE_MS);
+
+        disabledTx.submit(new FutureTask<>(() -> null)).get(AWAIT_MS, 
TimeUnit.MILLISECONDS);
+
+        awaitPendingTimer(false);
+        assertEquals(0, scheduler.getPendingTaskCount());
+    }
+
+    @Test
+    public void shouldCancelScheduledCloseOnExplicitClose() throws Exception {
+        runOp();
+        awaitPendingTimer(true);
+
+        tx.close(true);
+
+        verify(manager).destroy(TX_ID);
+        // The pending inactivity close must be cancelled so it cannot fire 
after the transaction is gone.
+        assertEquals(0, scheduler.getPendingTaskCount());
+    }
+
+    @Test
+    public void shouldNotReArmIdleTimerAfterClose() throws Exception {
+        runOp();
+        awaitPendingTimer(true);
+
+        tx.close(false);
+
+        verify(manager).destroy(TX_ID);
+        // Advancing the clock must not resurrect a close on a transaction 
that is already gone.
+        scheduler.advanceTimeBy(TIMEOUT_MS * 2, MILLISECONDS);
+        assertEquals(0, scheduler.getPendingTaskCount());
+    }
+
+    // ---- Step 2: SingleThreadTransactionExecutor invariants (executor swap) 
----
+
+    @Test
+    public void 
shouldRunSubmittedTasksOnASingleNamedTransactionThreadInOrder() throws 
Exception {
+        final List<String> executionOrder = new CopyOnWriteArrayList<>();
+        final List<String> threadNames = new CopyOnWriteArrayList<>();
+
+        Future<?> last = null;
+        for (int i = 0; i < 5; i++) {
+            final int n = i;
+            last = tx.submit(new FutureTask<>(() -> {
+                threadNames.add(Thread.currentThread().getName());
+                executionOrder.add("task-" + n);
+                return null;
+            }));
+        }
+        last.get(5, TimeUnit.SECONDS); // FIFO single thread: the last task 
completing means all ran
+
+        assertEquals(List.of("task-0", "task-1", "task-2", "task-3", 
"task-4"), executionOrder);
+        // All ran on one thread, and that thread is the named transaction 
worker.
+        assertEquals(1, threadNames.stream().distinct().count());
+        assertTrue("expected tx-* thread but was " + threadNames.get(0),
+                threadNames.get(0).startsWith("tx-"));
+    }
+
+    @Test
+    public void shouldInterruptRunningTaskWhenReturnedFutureIsCancelled() 
throws Exception {
+        // Guards the "do NOT wrap submitted tasks" invariant: cancel(true) on 
the Future returned by submit() must
+        // interrupt the real work, exactly as the per-request evaluation 
timeout relies on in the handler.
+        final CountDownLatch started = new CountDownLatch(1);
+        final AtomicBoolean interrupted = new AtomicBoolean(false);
+        final AtomicReference<Throwable> unexpected = new AtomicReference<>();
+
+        final Future<?> running = tx.submit(new FutureTask<>(() -> {
+            started.countDown();
+            try {
+                Thread.sleep(30000); // block until interrupted by cancel(true)
+            } catch (InterruptedException e) {
+                interrupted.set(true);
+                throw e;
+            } catch (Throwable t) {
+                unexpected.set(t);
+            }
+            return null;
+        }));
+
+        assertTrue("task did not start", started.await(5, TimeUnit.SECONDS));
+        running.cancel(true);
+
+        // Give the worker a moment to observe the interrupt and record it.
+        final long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
+        while (!interrupted.get() && System.nanoTime() < deadline) {
+            Thread.sleep(10);
+        }
+        assertTrue("cancel(true) did not interrupt the running task", 
interrupted.get());
+        assertEquals(null, unexpected.get());
+    }
+}
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/util/ManualScheduledExecutorService.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/util/ManualScheduledExecutorService.java
new file mode 100644
index 0000000000..21d93a2e08
--- /dev/null
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/util/ManualScheduledExecutorService.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A deterministic, single-threaded test double for {@link 
ScheduledExecutorService} backed by a virtual clock.
+ * <p>
+ * It exists so timer-driven behavior (such as the {@code gremlin-server} 
transaction idle / lifetime timeouts) can be
+ * exercised without {@link Thread#sleep(long)} and without real wall-clock 
waits, which are slow and flaky. Time only
+ * advances when the test calls {@link #advanceTimeBy(long, TimeUnit)} (or 
{@link #runDueTasks()}), at which point any
+ * scheduled task whose trigger time has been reached is run synchronously on 
the calling thread, in trigger-time order.
+ * <p>
+ * Only the one-shot {@link #schedule(Runnable, long, TimeUnit)} overload is 
implemented, because that is all the
+ * transaction code under test uses. Every other {@link 
ScheduledExecutorService} method throws
+ * {@link UnsupportedOperationException} with an explanatory message so that 
an unsupported use is loud rather than
+ * silently wrong.
+ * <p>
+ * It is thread-safe: the transaction idle timer is armed/cancelled on the 
transaction's worker thread (via the
+ * executor's before/afterExecute hooks) while a test thread advances the 
clock and reads counts. All shared state is
+ * guarded by {@code lock}. Fired task commands are run <em>outside</em> the 
lock — a fired idle close calls
+ * {@code close(false)}, which submits a rollback to the transaction executor 
and blocks on it; running it under the
+ * lock could deadlock against the worker thread re-entering {@link #schedule}.
+ */
+public class ManualScheduledExecutorService implements 
ScheduledExecutorService {
+
+    private final Object lock = new Object();
+    private final List<ScheduledTask> tasks = new ArrayList<>();
+    private long nowMillis = 0L;
+    private int scheduledCount = 0;
+
+    /**
+     * Schedules a one-shot task to run when the virtual clock advances by at 
least {@code delay}. Returns a
+     * {@link ScheduledFuture} whose {@link Future#cancel(boolean)} prevents 
the task from running on a later advance.
+     */
+    @Override
+    public ScheduledFuture<?> schedule(final Runnable command, final long 
delay, final TimeUnit unit) {
+        synchronized (lock) {
+            final ScheduledTask task = new ScheduledTask(command, nowMillis + 
unit.toMillis(delay));
+            tasks.add(task);
+            scheduledCount++;
+            return task;
+        }
+    }
+
+    /**
+     * Advances the virtual clock by the given amount and runs every task that 
is now due (trigger time {@code <=} the
+     * new current time) and not cancelled, in ascending trigger-time order.
+     */
+    public void advanceTimeBy(final long amount, final TimeUnit unit) {
+        synchronized (lock) {
+            nowMillis += unit.toMillis(amount);
+        }
+        runDueTasks();
+    }
+
+    /**
+     * Runs every currently-due, non-cancelled task without advancing the 
clock. Useful for firing a zero-delay task.
+     * Each due task is selected under the lock but executed outside it (see 
class Javadoc).
+     */
+    public void runDueTasks() {
+        while (true) {
+            final ScheduledTask next;
+            synchronized (lock) {
+                ScheduledTask soonest = null;
+                // Loop because a fired task may schedule another task that is 
itself immediately due.
+                for (final ScheduledTask t : tasks) {
+                    if (!t.cancelled && !t.done && t.triggerAtMillis <= 
nowMillis) {
+                        if (soonest == null || t.triggerAtMillis < 
soonest.triggerAtMillis) soonest = t;
+                    }
+                }
+                if (soonest == null) return;
+                soonest.done = true; // mark done under lock so it is not 
re-selected
+                next = soonest;
+            }
+            next.command.run(); // run OUTSIDE the lock
+        }
+    }
+
+    /**
+     * The number of tasks that are still scheduled to run (not cancelled, not 
yet fired).
+     */
+    public int getPendingTaskCount() {
+        synchronized (lock) {
+            int count = 0;
+            for (final ScheduledTask t : tasks) {
+                if (!t.cancelled && !t.done) count++;
+            }
+            return count;
+        }
+    }
+
+    /**
+     * The total number of tasks ever scheduled, including ones later 
cancelled or already fired. Lets a test assert
+     * that a reschedule actually issued a fresh {@code schedule(...)} call.
+     */
+    public int getScheduledTaskCount() {
+        synchronized (lock) {
+            return scheduledCount;
+        }
+    }
+
+    /**
+     * The remaining delay (ms) until the soonest still-pending task fires, or 
{@code -1} if none is pending.
+     */
+    public long nextPendingDelayMillis() {
+        synchronized (lock) {
+            long soonest = Long.MAX_VALUE;
+            for (final ScheduledTask t : tasks) {
+                if (!t.cancelled && !t.done) soonest = Math.min(soonest, 
t.triggerAtMillis - nowMillis);
+            }
+            return soonest == Long.MAX_VALUE ? -1L : soonest;
+        }
+    }
+
+    private final class ScheduledTask implements ScheduledFuture<Object> {
+        private final Runnable command;
+        private final long triggerAtMillis;
+        private volatile boolean cancelled = false;
+        private volatile boolean done = false;
+
+        private ScheduledTask(final Runnable command, final long 
triggerAtMillis) {
+            this.command = command;
+            this.triggerAtMillis = triggerAtMillis;
+        }
+
+        @Override
+        public long getDelay(final TimeUnit unit) {
+            synchronized (lock) {
+                return unit.convert(triggerAtMillis - nowMillis, 
TimeUnit.MILLISECONDS);
+            }
+        }
+
+        @Override
+        public int compareTo(final Delayed o) {
+            return Long.compare(getDelay(TimeUnit.MILLISECONDS), 
o.getDelay(TimeUnit.MILLISECONDS));
+        }
+
+        @Override
+        public boolean cancel(final boolean mayInterruptIfRunning) {
+            if (done || cancelled) return false;
+            cancelled = true;
+            return true;
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return cancelled;
+        }
+
+        @Override
+        public boolean isDone() {
+            return done || cancelled;
+        }
+
+        @Override
+        public Object get() {
+            return null;
+        }
+
+        @Override
+        public Object get(final long timeout, final TimeUnit unit) {
+            return null;
+        }
+    }
+
+    // ---- Unsupported ScheduledExecutorService surface: fail loudly rather 
than behave unexpectedly. ----
+
+    private static UnsupportedOperationException unsupported(final String 
method) {
+        return new UnsupportedOperationException(
+                ManualScheduledExecutorService.class.getSimpleName() + " does 
not support " + method
+                        + "; only schedule(Runnable, long, TimeUnit) is 
implemented for tests.");
+    }
+
+    @Override
+    public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final 
long delay, final TimeUnit unit) {
+        throw unsupported("schedule(Callable, long, TimeUnit)");
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, 
final long initialDelay, final long period, final TimeUnit unit) {
+        throw unsupported("scheduleAtFixedRate");
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, 
final long initialDelay, final long delay, final TimeUnit unit) {
+        throw unsupported("scheduleWithFixedDelay");
+    }
+
+    @Override
+    public void execute(final Runnable command) {
+        throw unsupported("execute");
+    }
+
+    @Override
+    public void shutdown() {
+        throw unsupported("shutdown");
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+        throw unsupported("shutdownNow");
+    }
+
+    @Override
+    public boolean isShutdown() {
+        throw unsupported("isShutdown");
+    }
+
+    @Override
+    public boolean isTerminated() {
+        throw unsupported("isTerminated");
+    }
+
+    @Override
+    public boolean awaitTermination(final long timeout, final TimeUnit unit) {
+        throw unsupported("awaitTermination");
+    }
+
+    @Override
+    public <T> Future<T> submit(final Callable<T> task) {
+        throw unsupported("submit(Callable)");
+    }
+
+    @Override
+    public <T> Future<T> submit(final Runnable task, final T result) {
+        throw unsupported("submit(Runnable, T)");
+    }
+
+    @Override
+    public Future<?> submit(final Runnable task) {
+        throw unsupported("submit(Runnable)");
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(final java.util.Collection<? extends 
Callable<T>> tasks) {
+        throw unsupported("invokeAll");
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(final java.util.Collection<? extends 
Callable<T>> tasks, final long timeout, final TimeUnit unit) {
+        throw unsupported("invokeAll");
+    }
+
+    @Override
+    public <T> T invokeAny(final java.util.Collection<? extends Callable<T>> 
tasks) throws ExecutionException {
+        throw unsupported("invokeAny");
+    }
+
+    @Override
+    public <T> T invokeAny(final java.util.Collection<? extends Callable<T>> 
tasks, final long timeout, final TimeUnit unit) throws ExecutionException {
+        throw unsupported("invokeAny");
+    }
+}

Reply via email to