This is an automated email from the ASF dual-hosted git repository.
jin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
The following commit(s) were added to refs/heads/master by this push:
new fc391a7c6 fix(server): prevent await deadlock on ContextCallable failure (#2941)
fc391a7c6 is described below
commit fc391a7c661ade3a9cbcf1ba27d947cc6aca56d6
Author: Himanshu Verma <[email protected]>
AuthorDate: Mon Jan 26 12:29:18 2026 +0530
fix(server): prevent await deadlock on ContextCallable failure (#2941)
Add a unit test that explicitly covers the failure scenario described in
the PR, where ContextCallable fails before entering runAndDone().
The test verifies that Consumers.await() does not hang when the worker task
fails during ContextCallable execution, relying on safeRun() to always
decrement the latch in its finally block.
This test would deadlock on the previous implementation and passes with the
current fix, ensuring the issue cannot regress.
---
.../java/org/apache/hugegraph/util/Consumers.java | 42 ++++++--
.../apache/hugegraph/unit/util/ConsumersTest.java | 111 +++++++++++++++++++++
2 files changed, 145 insertions(+), 8 deletions(-)
diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java
index daa54ee95..f4a7671f3 100644
--- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java
+++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java
@@ -101,30 +101,56 @@ public final class Consumers<V> {
if (this.executor == null) {
return;
}
+
LOG.info("Starting {} workers[{}] with queue size {}...",
this.workers, name, this.queueSize);
+
for (int i = 0; i < this.workers; i++) {
- this.runningFutures.add(
- this.executor.submit(new
ContextCallable<>(this::runAndDone)));
+ // capture submission thread context HERE
+ ContextCallable<Void> task = new
ContextCallable<>(this::runAndDone);
+
+ // wrapper ensures latch always decremented even if
ContextCallable fails
+ this.runningFutures.add(this.executor.submit(() ->
this.safeRun(task)));
+ }
+ }
+
+    /**
+     * Runs one worker task and guarantees that the worker latch is released.
+     *
+     * Anything that escapes task.call() is recorded here as the consumer
+     * exception (if none has been recorded yet) and passed to
+     * exceptionHandle(). This includes failures of the ContextCallable
+     * wrapper itself (setContext/resetContext/delegate dispatch). The latch
+     * is counted down in every case, so a worker that dies cannot leave
+     * await() blocked.
+     *
+     * NOTE(review): if the wrapper fails before runAndDone() starts, done()
+     * is not called for this worker. Confirm that this is acceptable.
+     *
+     * @param task the context-capturing wrapper around runAndDone()
+     * @return always null
+     */
+    private Void safeRun(ContextCallable<Void> task) {
+        try {
+            return task.call();
+        } catch (Throwable e) {
+            // Catch Throwable rather than Exception. Otherwise an Error
+            // escaping the task would never be stored in this.exception and
+            // could not be reported to the caller of await().
+            if (this.exception == null) {
+                this.exception = e;
+                LOG.error("Consumer worker failed in ContextCallable wrapper", e);
+            } else {
+                LOG.warn("Additional worker failure in ContextCallable wrapper; " +
+                         "first exception already recorded", e);
+            }
+            this.exceptionHandle(e);
+        } finally {
+            this.latch.countDown();
         }
+        return null;
     }
+    /**
+     * Worker body: calls run(), records the first failure and always calls
+     * done(). Releasing the worker latch is done by the safeRun() wrapper
+     * that invokes this method.
+     *
+     * @return always null
+     */
     private Void runAndDone() {
         try {
             this.run();
         } catch (Throwable e) {
+            // Keep catching Throwable: narrowing this to Exception would let
+            // Errors thrown by the consumer escape without being stored in
+            // this.exception, so await() could no longer report them.
             if (e instanceof StopExecution) {
                 this.queue.clear();
                 putQueueEnd();
             } else {
-                // Only the first exception to one thread can be stored
-                this.exception = e;
-                LOG.error("Error when running task", e);
+                // Best effort "first failure wins": the null check and the
+                // assignment are not atomic across workers.
+                if (this.exception == null) {
+                    this.exception = e;
+                    LOG.error("Unhandled exception in consumer task", e);
+                } else {
+                    LOG.warn("Additional exception in consumer task; " +
+                             "first exception already recorded", e);
+                }
             }
-            exceptionHandle(e);
+            this.exceptionHandle(e);
         } finally {
             this.done();
-            this.latch.countDown();
         }
         return null;
     }
diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/util/ConsumersTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/util/ConsumersTest.java
new file mode 100644
index 000000000..1bb152ec7
--- /dev/null
+++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/util/ConsumersTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.unit.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hugegraph.testutil.Assert;
+import org.apache.hugegraph.util.Consumers;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link Consumers}: normal start/provide/await flow and the
+ * behavior of await() when a consumer fails.
+ */
+public class ConsumersTest {
+
+    @Test(timeout = 1000)
+    public void testStartProvideAwaitNormal() throws Throwable {
+        ExecutorService executor = Executors.newFixedThreadPool(2);
+        try {
+            AtomicInteger processed = new AtomicInteger();
+
+            Consumers<Integer> consumers = new Consumers<>(executor, v -> {
+                processed.incrementAndGet();
+            });
+
+            consumers.start("test");
+            for (int i = 0; i < 50; i++) {
+                consumers.provide(i);
+            }
+            consumers.await();
+
+            Assert.assertEquals("Should process all provided elements",
+                                50, processed.get());
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+
+    /**
+     * Verifies that await() returns instead of hanging when the consumer
+     * throws an Error (a Throwable that is not an Exception).
+     *
+     * This does not inject a failure into ContextCallable itself. It covers
+     * the related case of a non-Exception throwable escaping a worker, which
+     * must still release the latch that await() waits on.
+     */
+    @Test(timeout = 1000)
+    public void testAwaitDoesNotHangWhenConsumerThrowsError() throws Throwable {
+        ExecutorService executor = Executors.newFixedThreadPool(1);
+        try {
+            final String msg = "Simulated fatal error (OOM/StackOverflow/etc)";
+            AtomicInteger invoked = new AtomicInteger();
+
+            Consumers<Integer> consumers = new Consumers<>(executor, v -> {
+                invoked.incrementAndGet();
+                throw new AssertionError(msg);
+            });
+            consumers.start("test-fatal-error");
+            consumers.provide(1);
+
+            // Returning within the timeout is the main check. If await()
+            // reports the failure, it must be the simulated error and not
+            // some unrelated problem.
+            Throwable thrown = null;
+            try {
+                consumers.await();
+            } catch (Throwable t) {
+                thrown = t;
+            }
+
+            Assert.assertEquals("Consumer should have received the element",
+                                1, invoked.get());
+            if (thrown != null) {
+                Throwable root = thrown.getCause() != null ?
+                                 thrown.getCause() : thrown;
+                Assert.assertTrue("Unexpected failure from await(): " + root,
+                                  root instanceof AssertionError &&
+                                  msg.equals(root.getMessage()));
+            }
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+
+    @Test(timeout = 1000)
+    public void testAwaitThrowsWhenConsumerThrows() throws Throwable {
+        ExecutorService executor = Executors.newFixedThreadPool(2);
+        try {
+            final String msg = "Injected exception for test";
+
+            Consumers<Integer> consumers = new Consumers<>(executor, v -> {
+                throw new RuntimeException(msg);
+            });
+
+            consumers.start("test");
+            consumers.provide(1);
+
+            // Capture the failure first and assert outside the catch block.
+            // Otherwise the AssertionError of a "did not throw" failure would
+            // itself be caught and misreported by the type check below.
+            Throwable thrown = null;
+            try {
+                consumers.await();
+            } catch (Throwable t) {
+                thrown = t;
+            }
+
+            Assert.assertTrue("Expected await() to throw when consumer throws",
+                              thrown != null);
+            Throwable root = thrown.getCause() != null ?
+                             thrown.getCause() : thrown;
+            Assert.assertTrue("Expected RuntimeException, but got: " + root,
+                              root instanceof RuntimeException);
+            Assert.assertTrue("Exception message should contain injected message",
+                              root.getMessage() != null &&
+                              root.getMessage().contains(msg));
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+}