This is an automated email from the ASF dual-hosted git repository.

kenhuuu pushed a commit to branch tx-idle-to
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit e5736c3332b7086b1285552073d349961a9edffa
Author: Ken Hu <[email protected]>
AuthorDate: Wed Jun 24 19:39:11 2026 -0700

    Suspend the HTTP transaction idle timer while an operation is running

    The idle timeout was armed on request arrival rather than when the
    transaction went idle, so a single operation running longer than the
    timeout tripped it mid-execution, contradicting the documented promise
    that active transactions are unaffected. A long operation should be
    bounded by evaluationTimeout; the idle timer should only reclaim
    abandoned transactions.

    The per-transaction executor is now a ThreadPoolExecutor(1,1) whose
    before/afterExecute hooks suspend the idle timer while work runs and
    re-arm it only once the worker parks with an empty queue. This gives a
    reliable running-vs-idle signal without wrapping submitted tasks, which
    would break the evaluation-timeout interrupt that relies on cancelling
    the real FutureTask.

    transactionTimeout is renamed to idleTransactionTimeout to reflect its
    actual meaning (renamed outright as the feature is unreleased), and now
    honors 0 as "disabled" to match its documentation.

Assisted-by: Claude Code:claude-opus-4-8 --- .../apache/tinkerpop/gremlin/server/Settings.java | 9 +- .../server/transaction/TransactionManager.java | 14 +- .../server/transaction/UnmanagedTransaction.java | 136 +++++++--- .../gremlin/server/util/ServerGremlinExecutor.java | 2 +- .../GremlinDriverTransactionIntegrateTest.java | 4 +- .../GremlinServerHttpTransactionIntegrateTest.java | 53 +++- .../transaction/UnmanagedTransactionTest.java | 291 +++++++++++++++++++++ .../util/ManualScheduledExecutorService.java | 279 ++++++++++++++++++++ 8 files changed, 728 insertions(+), 60 deletions(-) diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java index 8d6111ef27..236a4b4431 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java @@ -185,10 +185,13 @@ public class Settings { public boolean strictTransactionManagement = false; /** - * Time in milliseconds that a transaction can remain idle before it is automatically rolled back. - * This prevents resource leaks from abandoned transactions. Default is 600000 (10 minutes). + * Time in milliseconds that a transaction can remain idle (no operation running or queued) before it is + * automatically rolled back. This prevents resource leaks from abandoned transactions. The idle timer is suspended + * while an operation is in progress, so a long-running operation does not trip it (its duration is instead bounded + * by {@link #evaluationTimeout}). Set to {@code 0} to disable idle reclamation entirely. Default is 600000 + * (10 minutes). */ - public long transactionTimeout = 600000L; + public long idleTransactionTimeout = 600000L; /** * Time in milliseconds to wait for a transaction commit or rollback operation to complete. 
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManager.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManager.java index 6901664c15..ec203e820e 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManager.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManager.java @@ -43,7 +43,7 @@ public class TransactionManager { private final ConcurrentMap<String, UnmanagedTransaction> transactions = new ConcurrentHashMap<>(); private final ScheduledExecutorService scheduledExecutorService; private final GraphManager graphManager; - private final long transactionTimeoutMs; + private final long idleTransactionTimeoutMs; private final int maxConcurrentTransactions; private final long perGraphCloseMs; @@ -52,23 +52,23 @@ public class TransactionManager { * * @param scheduledExecutorService Scheduler for timeout management * @param graphManager The graph manager for accessing traversal sources - * @param transactionTimeoutMs Timeout in milliseconds before auto-rollback + * @param idleTransactionTimeoutMs Inactivity timeout in milliseconds before auto-rollback; {@code 0} disables it * @param maxConcurrentTransactions Maximum number of concurrent transactions allowed */ public TransactionManager(final ScheduledExecutorService scheduledExecutorService, final GraphManager graphManager, - final long transactionTimeoutMs, + final long idleTransactionTimeoutMs, final int maxConcurrentTransactions, final long perGraphCloseMs) { this.scheduledExecutorService = scheduledExecutorService; this.graphManager = graphManager; - this.transactionTimeoutMs = transactionTimeoutMs; + this.idleTransactionTimeoutMs = idleTransactionTimeoutMs; this.maxConcurrentTransactions = maxConcurrentTransactions; this.perGraphCloseMs = perGraphCloseMs; MetricManager.INSTANCE.getGauge(transactions::size, 
name(GremlinServer.class, "transactions")); - logger.info("TransactionManager initialized with timeout={}ms, maxTransactions={}", - transactionTimeoutMs, maxConcurrentTransactions); + logger.info("TransactionManager initialized with idleTransactionTimeout={}ms, maxTransactions={}", + idleTransactionTimeoutMs, maxConcurrentTransactions); } /** @@ -123,7 +123,7 @@ public class TransactionManager { traversalSourceName, graph, scheduledExecutorService, - transactionTimeoutMs, + idleTransactionTimeoutMs, perGraphCloseMs ); } while (transactions.putIfAbsent(txId, ctx) != null); diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransaction.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransaction.java index 3dfc394204..367e8f963b 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransaction.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransaction.java @@ -23,12 +23,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Optional; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -42,6 +43,12 @@ import java.util.concurrent.atomic.AtomicReference; * the complete request lifecycle (graph operation, error handling, response writing), * following the same pattern as the non-transactional HTTP path and the legacy * {@code SessionOpProcessor}. 
+ * <p> + * The single-threaded executor is a {@link SingleThreadTransactionExecutor} (a {@code ThreadPoolExecutor} with one + * core/max thread) rather than {@link java.util.concurrent.Executors#newSingleThreadExecutor}. It is behaviorally + * identical for task execution but exposes the {@code beforeExecute}/{@code afterExecute} lifecycle hooks and the task + * queue, which the idle-timer management relies on to tell "an operation is running" apart from "the worker is idle + * with an empty queue". The {@code Executors} factory hides those behind a sealed wrapper. */ public class UnmanagedTransaction { private static final Logger logger = LoggerFactory.getLogger(UnmanagedTransaction.class); @@ -51,16 +58,16 @@ public class UnmanagedTransaction { private final TransactionManager manager; private final Graph graph; private final ScheduledExecutorService scheduledExecutorService; - private final long timeout; + private final long idleTimeout; private final long perGraphClose; - private final AtomicReference<ScheduledFuture<?>> timeoutFuture = new AtomicReference<>(); + private final AtomicReference<ScheduledFuture<?>> idleFuture = new AtomicReference<>(); // Controls whether the executor is still accepting tasks. private final AtomicBoolean accepting = new AtomicBoolean(true); /** * Single-threaded executor ensures all operations for this transaction run on * the same thread, preserving the ThreadLocal nature of Graph transactions. */ - private final ExecutorService executor; + private final SingleThreadTransactionExecutor executor; /** * Creates a new {@code UnmanagedTransaction} for managing an HTTP transaction. 
@@ -70,14 +77,14 @@ public class UnmanagedTransaction { * @param traversalSourceName The traversal source name bound at begin time * @param graph The graph instance for this transaction * @param scheduledExecutorService Scheduler for timeout management - * @param transactionTimeout Timeout in milliseconds before auto-rollback + * @param idleTransactionTimeout Inactivity timeout in milliseconds before auto-rollback; {@code 0} disables it */ public UnmanagedTransaction(final String transactionId, final TransactionManager transactionManager, final String traversalSourceName, final Graph graph, final ScheduledExecutorService scheduledExecutorService, - final long transactionTimeout, + final long idleTransactionTimeout, final long perGraphClose) { logger.debug("New transaction context established for {}", transactionId); this.transactionId = transactionId; @@ -85,12 +92,15 @@ public class UnmanagedTransaction { this.manager = transactionManager; this.graph = graph; this.scheduledExecutorService = scheduledExecutorService; - this.timeout = transactionTimeout; + this.idleTimeout = idleTransactionTimeout; this.perGraphClose = perGraphClose; - // Create single-threaded executor with named thread for debugging - this.executor = Executors.newSingleThreadExecutor( - r -> new Thread(r, "tx-" + transactionId.substring(0, Math.min(8, transactionId.length())))); + // Create single-threaded executor with named thread for debugging. A ThreadPoolExecutor(1,1) is used (rather + // than Executors.newSingleThreadExecutor) so the before/afterExecute hooks and the task queue are accessible + // for idle-timer management; see SingleThreadTransactionExecutor. 
+ final ThreadFactory threadFactory = + r -> new Thread(r, "tx-" + transactionId.substring(0, Math.min(8, transactionId.length()))); + this.executor = new SingleThreadTransactionExecutor(threadFactory); } /** @@ -107,36 +117,6 @@ public class UnmanagedTransaction { return traversalSourceName; } - /** - * Resets the timeout for this transaction. Called on each request. - */ - public void touch() { - timeoutFuture.updateAndGet(future -> { - if (future != null) future.cancel(false); - return scheduledExecutorService.schedule(() -> { - logger.info("Transaction {} timed out after {} ms of inactivity", transactionId, timeout); - close(false); - }, timeout, TimeUnit.MILLISECONDS); - }); - } - - /** - * Opens the underlying graph transaction and starts the inactivity timeout. - * Should be called on the transaction's single-threaded executor to preserve - * ThreadLocal affinity. On failure the exception is re-thrown and the caller - * is responsible for cleanup (e.g. via {@link #close(boolean)}). - */ - public void open() { - try { - graph.tx().open(); - touch(); - logger.debug("Transaction {} opened", transactionId); - } catch (Exception e) { - logger.warn("Failed to begin transaction {}: {}", transactionId, e.getMessage()); - throw e; - } - } - /** * Closes this transaction and releases its resources. When {@code force} is {@code false}, * any open graph transaction is rolled back before shutdown. When {@code force} is {@code true}, @@ -183,7 +163,7 @@ public class UnmanagedTransaction { // reorder these two statements. 
manager.destroy(transactionId); executor.shutdown(); - Optional.ofNullable(timeoutFuture.get()).ifPresent(f -> f.cancel(true)); + Optional.ofNullable(idleFuture.get()).ifPresent(f -> f.cancel(true)); logger.debug("Transaction {} closed", transactionId); } @@ -199,7 +179,77 @@ public class UnmanagedTransaction { public Future<?> submit(final FutureTask<Void> task) { if (!accepting.get()) throw new IllegalStateException("Transaction " + transactionId + " is closed"); - touch(); + // Insurance backstop: cancel (do NOT arm) the idle timer on submit. Arming is the executor's job, done in + // afterExecute once the worker parks with an empty queue. beforeExecute will also cancel when the task starts; + // cancelling here too closes the small window between accepting a task and the worker picking it up. + cancelIdleTimer(); return executor.submit(task); } + + /** + * Suspends the inactivity timer because an operation is running (or about to run) on the transaction thread. + * Invoked from {@link SingleThreadTransactionExecutor#beforeExecute} and, as a backstop, from {@link #submit}. + * <p> + * A long-running operation must not trip the idle timeout: while an operation is in progress the idle timer is + * simply not armed (the operation's own duration is bounded by the per-request {@code evaluationTimeout} instead). + */ + private void cancelIdleTimer() { + idleFuture.updateAndGet(future -> { + if (future != null) future.cancel(false); + return null; + }); + } + + /** + * (Re)arms the inactivity timer, but only when the transaction is genuinely idle. Invoked from + * {@link SingleThreadTransactionExecutor#afterExecute} once an operation has finished and the worker is about to + * look for more work. 
+ * <p> + * "Idle" means: still {@link #accepting} new work (not closing), the executor queue is empty (no sibling request is + * already waiting — on a single thread there is a brief instant between one task finishing and the next starting), + * and the idle timeout is enabled ({@code idleTimeout > 0}; {@code 0} disables idle reclamation entirely). When all + * hold, a fresh {@code close(false)} is scheduled {@code idleTimeout} ms out, replacing any previously scheduled one. + */ + private void maybeScheduleIdleTimer() { + if (!accepting.get()) return; // closing/closed: never re-arm a dying transaction + if (idleTimeout <= 0) return; // 0 (or negative) disables idle reclamation + if (!executor.getQueue().isEmpty()) return; // a sibling task is already queued -> not idle yet + + idleFuture.updateAndGet(future -> { + if (future != null) future.cancel(false); + return scheduledExecutorService.schedule(() -> { + logger.info("Transaction {} timed out after {} ms of inactivity", transactionId, idleTimeout); + close(false); + }, idleTimeout, TimeUnit.MILLISECONDS); + }); + } + + /** + * A single-threaded {@link ThreadPoolExecutor} (one core and max thread) that runs all operations for a single + * transaction on the same worker thread, preserving the ThreadLocal nature of graph transactions. + * <p> + * It is used in place of {@link java.util.concurrent.Executors#newSingleThreadExecutor} solely to expose the + * {@link #beforeExecute}/{@link #afterExecute} lifecycle hooks (and, via {@link #getQueue()}, the pending-task + * queue), which the enclosing {@link UnmanagedTransaction} needs to distinguish "an operation is running" from + * "the worker is idle with nothing queued". Task-execution semantics are otherwise identical to a single-thread + * executor: one worker, FIFO ordering. Submitted {@link FutureTask}s are returned unwrapped so callers can + * {@code cancel(true)} the real work (e.g. the per-request evaluation timeout interrupting a running operation). 
+ */ + private final class SingleThreadTransactionExecutor extends ThreadPoolExecutor { + private SingleThreadTransactionExecutor(final ThreadFactory threadFactory) { + super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), threadFactory); + } + + @Override + protected void beforeExecute(final Thread t, final Runnable r) { + super.beforeExecute(t, r); + cancelIdleTimer(); + } + + @Override + protected void afterExecute(final Runnable r, final Throwable t) { + super.afterExecute(r, t); + maybeScheduleIdleTimer(); + } + } } diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java index 3995337566..ab6be093e2 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java @@ -306,7 +306,7 @@ public class ServerGremlinExecutor { transactionManager = new TransactionManager( scheduledExecutorService, graphManager, - settings.transactionTimeout, + settings.idleTransactionTimeout, settings.maxConcurrentTransactions, settings.perGraphCloseTimeout ); diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java index 69f387b17e..8a6c4700cd 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java @@ -85,10 +85,10 @@ public class GremlinDriverTransactionIntegrateTest extends AbstractGremlinServer case "shouldTimeoutIdleTransaction": case "shouldTimeoutIdleTransactionWithNoOperations": case "shouldRejectLateCommitAfterTimeout": - 
settings.transactionTimeout = 1000; + settings.idleTransactionTimeout = 1000; break; case "shouldTimeoutOnlyIdleTransactionNotActiveOne": - settings.transactionTimeout = 2000; + settings.idleTransactionTimeout = 2000; break; } return settings; diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java index 95895e66b4..c9a05b82ab 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java @@ -93,17 +93,21 @@ public class GremlinServerHttpTransactionIntegrateTest extends AbstractGremlinSe break; case "shouldTimeoutFreeSlotUnderMaxConcurrentTransactions": settings.maxConcurrentTransactions = 1; - settings.transactionTimeout = 1000; + settings.idleTransactionTimeout = 1000; break; case "shouldTimeoutIdleTransactionWithNoOperations": - settings.transactionTimeout = 500; + settings.idleTransactionTimeout = 500; break; case "shouldTimeoutAndRejectLateCommit": case "shouldTrackTransactionCountAccurately": - settings.transactionTimeout = 1000; + settings.idleTransactionTimeout = 1000; break; case "shouldRollbackAbandonedTransaction": - settings.transactionTimeout = 300; + settings.idleTransactionTimeout = 300; + break; + case "shouldNotIdleTimeoutLongRunningOperation": + // Short idle timeout, but a single long operation must NOT trip it (idle suspended while busy). 
+ settings.idleTransactionTimeout = 500; break; case "shouldRejectMismatchedGraphAliasInTransaction": { final Settings.GraphSettings gs = new Settings.GraphSettings(); @@ -528,6 +532,47 @@ public class GremlinServerHttpTransactionIntegrateTest extends AbstractGremlinSe } } + @Test + public void shouldNotIdleTimeoutLongRunningOperation() throws Exception { + // With a short idle timeout (500ms), a single operation that runs LONGER than the idle timeout must still + // succeed -- the idle timer is suspended while an operation is in progress, so a long-running op is not + // reclaimed mid-execution (it is instead bounded by evaluationTimeout, left at its default here). + final String txId = beginTx(client, GTX); + + // Seed two vertices and an edge so repeat(both()) has something to traverse and keeps the executor busy. Each + // response body is fully consumed before the next request is sent: the chunked stream is only complete once the + // server has finished processing, so consuming guarantees these requests are strictly ordered. + try (final CloseableHttpResponse r = submitInTx(client, txId, "g.addV().property(T.id, 1)", GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + EntityUtils.consume(r.getEntity()); + } + try (final CloseableHttpResponse r = submitInTx(client, txId, "g.addV().property(T.id, 2)", GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + EntityUtils.consume(r.getEntity()); + } + try (final CloseableHttpResponse r = submitInTx(client, txId, "g.V(1).addE('self').to(__.V(2))", GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + EntityUtils.consume(r.getEntity()); + } + + // A traversal that runs well past the 500ms idle timeout. Under the old arm-on-arrival behavior the idle timer + // would have fired mid-execution; under suspend-while-busy it does not. 
+ try (final CloseableHttpResponse r = submitInTx(client, txId, "g.V().repeat(both()).times(2000)", GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + EntityUtils.consume(r.getEntity()); + } + + // The transaction is still alive and usable after the long op (it was not reclaimed mid-flight). + try (final CloseableHttpResponse r = submitInTx(client, txId, "g.V().count()", GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + assertEquals(2, extractCount(r)); // extractCount fully reads the body + } + try (final CloseableHttpResponse r = commitTx(client, txId, GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + EntityUtils.consume(r.getEntity()); + } + } + @Test public void shouldRejectMismatchedGraphAliasInTransaction() throws Exception { final String txId = beginTx(client, GTX); diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransactionTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransactionTest.java new file mode 100644 index 0000000000..81a8a65949 --- /dev/null +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransactionTest.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.server.transaction; + +import org.apache.tinkerpop.gremlin.server.util.ManualScheduledExecutorService; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.Transaction; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link UnmanagedTransaction}, driven by a deterministic {@link ManualScheduledExecutorService} so the + * inactivity-timeout behaviour can be asserted without real wall-clock waits. + * <p> + * These are <em>specification</em> tests for the reworked idle timer (suspend-while-busy): the idle timer is armed only + * when the transaction goes idle (no operation running, empty queue) and is suspended while an operation runs. The idle + * timer is (re)armed from the executor's {@code afterExecute} hook, which runs on the transaction worker thread, so + * timer assertions poll the scheduler with a bounded wait via {@link #awaitPendingTimer(boolean)}. 
+ */ +public class UnmanagedTransactionTest { + + private static final String TX_ID = "test-tx-0001"; + private static final long TIMEOUT_MS = 600000L; + private static final long PER_GRAPH_CLOSE_MS = 10000L; + private static final long AWAIT_MS = 5000L; + + private TransactionManager manager; + private Graph graph; + private ManualScheduledExecutorService scheduler; + private UnmanagedTransaction tx; + + @Before + public void setUp() { + manager = mock(TransactionManager.class); + graph = mock(Graph.class); + final Transaction graphTx = mock(Transaction.class); + when(graph.tx()).thenReturn(graphTx); + when(graphTx.isOpen()).thenReturn(false); // rollback path is a no-op during close(false) + + scheduler = new ManualScheduledExecutorService(); + tx = new UnmanagedTransaction(TX_ID, manager, "g", graph, scheduler, TIMEOUT_MS, PER_GRAPH_CLOSE_MS); + + // close() short-circuits unless the manager still knows about the transaction. + when(manager.get(TX_ID)).thenReturn(Optional.of(tx)); + } + + /** + * Submits a no-op task and blocks until it has finished running on the worker thread. + */ + private void runOp() throws Exception { + tx.submit(new FutureTask<>(() -> null)).get(AWAIT_MS, TimeUnit.MILLISECONDS); + } + + /** + * Waits (bounded) for the idle timer to reach the expected armed/not-armed state, since it is (re)armed on the + * worker thread from afterExecute slightly after the submitted task's Future completes. Returns once the condition + * holds or the wait elapses; the caller asserts on the final state. 
+ */ + private void awaitPendingTimer(final boolean expectArmed) throws InterruptedException { + final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(AWAIT_MS); + while (System.nanoTime() < deadline) { + if ((scheduler.getPendingTaskCount() == 1) == expectArmed) return; + Thread.sleep(5); + } + } + + @Test + public void shouldNotScheduleAnyCloseAtConstruction() { + assertEquals(0, scheduler.getScheduledTaskCount()); + assertEquals(0, scheduler.getPendingTaskCount()); + } + + @Test + public void shouldArmIdleTimerWhenWorkerGoesIdleAfterAnOperation() throws Exception { + runOp(); + + awaitPendingTimer(true); + assertEquals("idle timer should be armed once the worker parks with an empty queue", + 1, scheduler.getPendingTaskCount()); + assertEquals(TIMEOUT_MS, scheduler.nextPendingDelayMillis()); + } + + @Test + public void shouldNotArmIdleTimerWhileAnOperationIsRunning() throws Exception { + // Hold an operation "running" and assert no idle timer is armed during that window. + final CountDownLatch started = new CountDownLatch(1); + final CountDownLatch release = new CountDownLatch(1); + final Future<?> running = tx.submit(new FutureTask<>(() -> { + started.countDown(); + release.await(); + return null; + })); + + assertTrue(started.await(AWAIT_MS, MILLISECONDS)); + // While the op runs, the idle timer must not be armed (a long op must not trip the idle timeout). + assertEquals(0, scheduler.getPendingTaskCount()); + + release.countDown(); + running.get(AWAIT_MS, MILLISECONDS); + + // Once the worker goes idle, the timer arms. + awaitPendingTimer(true); + assertEquals(1, scheduler.getPendingTaskCount()); + } + + @Test + public void shouldNotFireIdleCloseForALongRunningOperation() throws Exception { + // A single operation that runs longer than the idle timeout must not be reclaimed mid-execution: no timer is + // armed while it runs, so advancing the clock far past the timeout fires nothing. 
+ final CountDownLatch started = new CountDownLatch(1); + final CountDownLatch release = new CountDownLatch(1); + final Future<?> running = tx.submit(new FutureTask<>(() -> { + started.countDown(); + release.await(); + return null; + })); + assertTrue(started.await(AWAIT_MS, MILLISECONDS)); + + scheduler.advanceTimeBy(TIMEOUT_MS * 2, MILLISECONDS); + + verify(manager, never()).destroy(TX_ID); + release.countDown(); + running.get(AWAIT_MS, MILLISECONDS); + } + + @Test + public void shouldCloseTransactionWhenIdleTimeoutFires() throws Exception { + runOp(); + awaitPendingTimer(true); + + scheduler.advanceTimeBy(TIMEOUT_MS, MILLISECONDS); + + // The scheduled close(false) removes the transaction from the manager. + verify(manager).destroy(TX_ID); + assertEquals(0, scheduler.getPendingTaskCount()); + } + + @Test + public void shouldNotCloseBeforeIdleTimeoutElapses() throws Exception { + runOp(); + awaitPendingTimer(true); + + scheduler.advanceTimeBy(TIMEOUT_MS - 1, MILLISECONDS); + + verify(manager, never()).destroy(TX_ID); + assertEquals(1, scheduler.getPendingTaskCount()); + } + + @Test + public void shouldReArmIdleTimerAfterEachOperation() throws Exception { + runOp(); + awaitPendingTimer(true); + assertEquals(1, scheduler.getScheduledTaskCount()); + + runOp(); + awaitPendingTimer(true); + + // A second operation cancels the prior idle timer and arms a fresh one. + assertEquals(2, scheduler.getScheduledTaskCount()); + assertEquals(1, scheduler.getPendingTaskCount()); + } + + @Test + public void shouldNotArmIdleTimerWhenIdleTimeoutDisabled() throws Exception { + // idleTransactionTimeout == 0 disables idle reclamation entirely: the timer is never armed. 
+ final UnmanagedTransaction disabledTx = + new UnmanagedTransaction(TX_ID, manager, "g", graph, scheduler, 0L, PER_GRAPH_CLOSE_MS); + + disabledTx.submit(new FutureTask<>(() -> null)).get(AWAIT_MS, TimeUnit.MILLISECONDS); + + awaitPendingTimer(false); + assertEquals(0, scheduler.getPendingTaskCount()); + } + + @Test + public void shouldCancelScheduledCloseOnExplicitClose() throws Exception { + runOp(); + awaitPendingTimer(true); + + tx.close(true); + + verify(manager).destroy(TX_ID); + // The pending inactivity close must be cancelled so it cannot fire after the transaction is gone. + assertEquals(0, scheduler.getPendingTaskCount()); + } + + @Test + public void shouldNotReArmIdleTimerAfterClose() throws Exception { + runOp(); + awaitPendingTimer(true); + + tx.close(false); + + verify(manager).destroy(TX_ID); + // Advancing the clock must not resurrect a close on a transaction that is already gone. + scheduler.advanceTimeBy(TIMEOUT_MS * 2, MILLISECONDS); + assertEquals(0, scheduler.getPendingTaskCount()); + } + + // ---- Step 2: SingleThreadTransactionExecutor invariants (executor swap) ---- + + @Test + public void shouldRunSubmittedTasksOnASingleNamedTransactionThreadInOrder() throws Exception { + final List<String> executionOrder = new CopyOnWriteArrayList<>(); + final List<String> threadNames = new CopyOnWriteArrayList<>(); + + Future<?> last = null; + for (int i = 0; i < 5; i++) { + final int n = i; + last = tx.submit(new FutureTask<>(() -> { + threadNames.add(Thread.currentThread().getName()); + executionOrder.add("task-" + n); + return null; + })); + } + last.get(5, TimeUnit.SECONDS); // FIFO single thread: the last task completing means all ran + + assertEquals(List.of("task-0", "task-1", "task-2", "task-3", "task-4"), executionOrder); + // All ran on one thread, and that thread is the named transaction worker. 
+ assertEquals(1, threadNames.stream().distinct().count()); + assertTrue("expected tx-* thread but was " + threadNames.get(0), + threadNames.get(0).startsWith("tx-")); + } + + @Test + public void shouldInterruptRunningTaskWhenReturnedFutureIsCancelled() throws Exception { + // Guards the "do NOT wrap submitted tasks" invariant: cancel(true) on the Future returned by submit() must + // interrupt the real work, exactly as the per-request evaluation timeout relies on in the handler. + final CountDownLatch started = new CountDownLatch(1); + final AtomicBoolean interrupted = new AtomicBoolean(false); + final AtomicReference<Throwable> unexpected = new AtomicReference<>(); + + final Future<?> running = tx.submit(new FutureTask<>(() -> { + started.countDown(); + try { + Thread.sleep(30000); // block until interrupted by cancel(true) + } catch (InterruptedException e) { + interrupted.set(true); + throw e; + } catch (Throwable t) { + unexpected.set(t); + } + return null; + })); + + assertTrue("task did not start", started.await(5, TimeUnit.SECONDS)); + running.cancel(true); + + // Give the worker a moment to observe the interrupt and record it. + final long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5); + while (!interrupted.get() && System.nanoTime() < deadline) { + Thread.sleep(10); + } + assertTrue("cancel(true) did not interrupt the running task", interrupted.get()); + assertEquals(null, unexpected.get()); + } +} diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/util/ManualScheduledExecutorService.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/util/ManualScheduledExecutorService.java new file mode 100644 index 0000000000..21d93a2e08 --- /dev/null +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/util/ManualScheduledExecutorService.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.server.util; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * A deterministic, single-threaded test double for {@link ScheduledExecutorService} backed by a virtual clock. + * <p> + * It exists so timer-driven behavior (such as the {@code gremlin-server} transaction idle / lifetime timeouts) can be + * exercised without {@link Thread#sleep(long)} and without real wall-clock waits, which are slow and flaky. Time only + * advances when the test calls {@link #advanceTimeBy(long, TimeUnit)} (or {@link #runDueTasks()}), at which point any + * scheduled task whose trigger time has been reached is run synchronously on the calling thread, in trigger-time order. + * <p> + * Only the one-shot {@link #schedule(Runnable, long, TimeUnit)} overload is implemented, because that is all the + * transaction code under test uses. 
/**
 * A deterministic test double for {@link ScheduledExecutorService} that is driven by a virtual clock rather than
 * wall-clock time.
 * <p>
 * Only the one-shot {@link #schedule(Runnable, long, TimeUnit)} overload is implemented. Every other
 * {@link ScheduledExecutorService} method throws {@link UnsupportedOperationException} with an explanatory message
 * so that an unsupported use is loud rather than silently wrong.
 * <p>
 * It is thread-safe: the transaction idle timer is armed/cancelled on the transaction's worker thread (via the
 * executor's before/afterExecute hooks) while a test thread advances the clock and reads counts. All shared state,
 * including the cancelled/done flags of each task, is mutated only while holding {@code lock}. Fired task commands
 * are run <em>outside</em> the lock — a fired idle close calls {@code close(false)}, which submits a rollback to the
 * transaction executor and blocks on it; running it under the lock could deadlock against the worker thread
 * re-entering {@link #schedule}.
 * <p>
 * The clock and all trigger times are kept in milliseconds. Additions saturate at the {@code long} bounds instead of
 * wrapping, so a "practically never" delay such as {@link Long#MAX_VALUE} stays in the future.
 */
public class ManualScheduledExecutorService implements ScheduledExecutorService {

    private final Object lock = new Object();

    /** Tasks that are still pending (neither fired nor cancelled), in scheduling order. Guarded by {@code lock}. */
    private final List<ScheduledTask> tasks = new ArrayList<>();

    /** Current virtual time in milliseconds. Guarded by {@code lock}. */
    private long nowMillis = 0L;

    /** Number of {@code schedule(...)} calls ever accepted. Guarded by {@code lock}. */
    private int scheduledCount = 0;

    /**
     * Schedules a one-shot task to run when the virtual clock advances by at least {@code delay}. Returns a
     * {@link ScheduledFuture} whose {@link Future#cancel(boolean)} prevents the task from running on a later advance.
     *
     * @param command the task to run; must not be {@code null}
     * @param delay   how far in the virtual future the task becomes due; zero or negative means "already due"
     * @param unit    the unit of {@code delay}; must not be {@code null}
     * @return a handle that can be used to cancel the task or inspect its state
     * @throws NullPointerException if {@code command} or {@code unit} is {@code null}
     */
    @Override
    public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) {
        // Fail at the call site rather than much later when the clock is advanced.
        if (command == null) {
            throw new NullPointerException("command");
        }
        final long delayMillis = unit.toMillis(delay);
        synchronized (lock) {
            // Saturating add: toMillis() clamps to Long.MAX_VALUE, and a plain '+' on top of an advanced clock would
            // wrap negative and make a far-future task fire immediately.
            final ScheduledTask task = new ScheduledTask(command, saturatedAdd(nowMillis, delayMillis));
            tasks.add(task);
            scheduledCount++;
            return task;
        }
    }

    /**
     * Advances the virtual clock by the given amount and runs every task that is now due (trigger time {@code <=} the
     * new current time) and not cancelled, in ascending trigger-time order.
     *
     * @param amount how far to move the clock forward
     * @param unit   the unit of {@code amount}
     */
    public void advanceTimeBy(final long amount, final TimeUnit unit) {
        final long amountMillis = unit.toMillis(amount);
        synchronized (lock) {
            nowMillis = saturatedAdd(nowMillis, amountMillis);
        }
        runDueTasks();
    }

    /**
     * Runs every currently-due, non-cancelled task without advancing the clock. Useful for firing a zero-delay task.
     * Each due task is selected under the lock but executed outside it (see class Javadoc). Tasks with equal trigger
     * times run in the order they were scheduled. An exception thrown by a task propagates to the caller; that task
     * is still considered fired.
     */
    public void runDueTasks() {
        // Re-scan after every fired task: a fired task may schedule another task that is itself immediately due.
        while (true) {
            final ScheduledTask next;
            synchronized (lock) {
                next = pollSoonestDue();
            }
            if (next == null) {
                return;
            }
            next.command.run(); // run OUTSIDE the lock
        }
    }

    /**
     * Removes and returns the due task with the smallest trigger time, marking it done so that a concurrent
     * {@code cancel} reports failure. Must be called while holding {@code lock}.
     *
     * @return the next task to fire, or {@code null} if nothing is due
     */
    private ScheduledTask pollSoonestDue() {
        ScheduledTask soonest = null;
        for (final ScheduledTask candidate : tasks) {
            if (candidate.triggerAtMillis > nowMillis) {
                continue;
            }
            // Strict '<' keeps scheduling order among tasks that share a trigger time.
            if (soonest == null || candidate.triggerAtMillis < soonest.triggerAtMillis) {
                soonest = candidate;
            }
        }
        if (soonest != null) {
            soonest.done = true;
            tasks.remove(soonest);
        }
        return soonest;
    }

    /**
     * The number of tasks that are still scheduled to run (not cancelled, not yet fired).
     */
    public int getPendingTaskCount() {
        synchronized (lock) {
            // Fired and cancelled tasks are pruned eagerly, so everything left in the list is pending.
            return tasks.size();
        }
    }

    /**
     * The total number of tasks ever scheduled, including ones later cancelled or already fired. Lets a test assert
     * that a reschedule actually issued a fresh {@code schedule(...)} call.
     */
    public int getScheduledTaskCount() {
        synchronized (lock) {
            return scheduledCount;
        }
    }

    /**
     * The remaining delay (ms) until the soonest still-pending task fires, or {@code -1} if none is pending. A task
     * that is already due reports {@code 0}, so {@code -1} always and only means "nothing pending".
     */
    public long nextPendingDelayMillis() {
        synchronized (lock) {
            if (tasks.isEmpty()) {
                return -1L;
            }
            long soonestTrigger = Long.MAX_VALUE;
            for (final ScheduledTask pending : tasks) {
                soonestTrigger = Math.min(soonestTrigger, pending.triggerAtMillis);
            }
            if (soonestTrigger <= nowMillis) {
                return 0L;
            }
            final long remaining = soonestTrigger - nowMillis;
            // Only reachable if the clock was rewound below zero; report "very far away" instead of wrapping.
            return remaining < 0 ? Long.MAX_VALUE : remaining;
        }
    }

    /**
     * Adds two {@code long} values, clamping to {@link Long#MIN_VALUE} / {@link Long#MAX_VALUE} instead of wrapping.
     */
    private static long saturatedAdd(final long a, final long b) {
        final long sum = a + b;
        // Overflow occurred iff both operands have a sign that differs from the sign of the sum.
        if (((a ^ sum) & (b ^ sum)) < 0) {
            return a < 0 ? Long.MIN_VALUE : Long.MAX_VALUE;
        }
        return sum;
    }

    /**
     * Handle for one scheduled command. State changes ({@code cancelled}, {@code done}) happen only under the
     * enclosing instance's {@code lock}; the flags are {@code volatile} so the read-only accessors need no lock.
     */
    private final class ScheduledTask implements ScheduledFuture<Object> {
        private final Runnable command;
        private final long triggerAtMillis;
        private volatile boolean cancelled = false;
        private volatile boolean done = false;

        private ScheduledTask(final Runnable command, final long triggerAtMillis) {
            this.command = command;
            this.triggerAtMillis = triggerAtMillis;
        }

        /**
         * Remaining virtual delay in the requested unit; negative once the trigger time has passed.
         */
        @Override
        public long getDelay(final TimeUnit unit) {
            synchronized (lock) {
                return unit.convert(triggerAtMillis - nowMillis, TimeUnit.MILLISECONDS);
            }
        }

        @Override
        public int compareTo(final Delayed o) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }

        /**
         * Cancels the task if it has neither fired nor been cancelled already.
         *
         * @param mayInterruptIfRunning ignored; a task that has been selected to fire can no longer be cancelled
         * @return {@code true} if this call cancelled the task, in which case it is guaranteed never to run
         */
        @Override
        public boolean cancel(final boolean mayInterruptIfRunning) {
            // Taken under the same lock that runDueTasks() uses to select a task, so "cancelled" and "selected to
            // fire" are mutually exclusive.
            synchronized (lock) {
                if (done || cancelled) {
                    return false;
                }
                cancelled = true;
                tasks.remove(this);
                return true;
            }
        }

        @Override
        public boolean isCancelled() {
            return cancelled;
        }

        /**
         * {@code true} once the task has been cancelled or selected to fire (it may still be executing).
         */
        @Override
        public boolean isDone() {
            return done || cancelled;
        }

        /**
         * Always returns {@code null} immediately. This test double neither blocks until the task has fired nor
         * reports cancellation through this method; use {@link #isDone()} / {@link #isCancelled()} instead.
         */
        @Override
        public Object get() {
            return null;
        }

        /**
         * Always returns {@code null} immediately; see {@link #get()}.
         */
        @Override
        public Object get(final long timeout, final TimeUnit unit) {
            return null;
        }
    }

    // ---- Unsupported ScheduledExecutorService surface: fail loudly rather than behave unexpectedly. ----

    private static UnsupportedOperationException unsupported(final String method) {
        return new UnsupportedOperationException(
                ManualScheduledExecutorService.class.getSimpleName() + " does not support " + method
                        + "; only schedule(Runnable, long, TimeUnit) is implemented for tests.");
    }

    @Override
    public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, final TimeUnit unit) {
        throw unsupported("schedule(Callable, long, TimeUnit)");
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period, final TimeUnit unit) {
        throw unsupported("scheduleAtFixedRate");
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, final TimeUnit unit) {
        throw unsupported("scheduleWithFixedDelay");
    }

    @Override
    public void execute(final Runnable command) {
        throw unsupported("execute");
    }

    @Override
    public void shutdown() {
        throw unsupported("shutdown");
    }

    @Override
    public List<Runnable> shutdownNow() {
        throw unsupported("shutdownNow");
    }

    @Override
    public boolean isShutdown() {
        throw unsupported("isShutdown");
    }

    @Override
    public boolean isTerminated() {
        throw unsupported("isTerminated");
    }

    @Override
    public boolean awaitTermination(final long timeout, final TimeUnit unit) {
        throw unsupported("awaitTermination");
    }

    @Override
    public <T> Future<T> submit(final Callable<T> task) {
        throw unsupported("submit(Callable)");
    }

    @Override
    public <T> Future<T> submit(final Runnable task, final T result) {
        throw unsupported("submit(Runnable, T)");
    }

    @Override
    public Future<?> submit(final Runnable task) {
        throw unsupported("submit(Runnable)");
    }

    @Override
    public <T> List<Future<T>> invokeAll(final java.util.Collection<? extends Callable<T>> tasks) {
        throw unsupported("invokeAll");
    }

    @Override
    public <T> List<Future<T>> invokeAll(final java.util.Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit) {
        throw unsupported("invokeAll");
    }

    @Override
    public <T> T invokeAny(final java.util.Collection<? extends Callable<T>> tasks) throws ExecutionException {
        throw unsupported("invokeAny");
    }

    @Override
    public <T> T invokeAny(final java.util.Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit) throws ExecutionException {
        throw unsupported("invokeAny");
    }
}
