imbajin commented on code in PR #2941:
URL:
https://github.com/apache/incubator-hugegraph/pull/2941#discussion_r2715667630
##########
hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java:
##########
@@ -104,9 +104,23 @@ public void start(String name) {
LOG.info("Starting {} workers[{}] with queue size {}...",
this.workers, name, this.queueSize);
for (int i = 0; i < this.workers; i++) {
- this.runningFutures.add(
- this.executor.submit(new
ContextCallable<>(this::runAndDone)));
+ this.runningFutures.add(this.executor.submit(this::safeRun));
}
+
+ }
+ private Void safeRun() {
+ try {
+ new ContextCallable<>(this::runAndDone).call();
Review Comment:
**‼️ Critical:** The `safeRun()` method calls `new
ContextCallable<>(this::runAndDone).call()` directly, which bypasses the
context propagation mechanism that `ContextCallable` is designed to provide
when submitted via `ExecutorService.submit()`.
```suggestion
private Void safeRun() {
try {
this.runAndDone();
```
Since `safeRun()` is already submitted to the executor, the context should
be captured at submission time, not inside the worker. The current approach may
lose thread-local context that `ContextCallable` is meant to preserve.
##########
hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java:
##########
@@ -104,9 +104,23 @@ public void start(String name) {
LOG.info("Starting {} workers[{}] with queue size {}...",
this.workers, name, this.queueSize);
for (int i = 0; i < this.workers; i++) {
- this.runningFutures.add(
- this.executor.submit(new
ContextCallable<>(this::runAndDone)));
+ this.runningFutures.add(this.executor.submit(this::safeRun));
}
+
+ }
+ private Void safeRun() {
+ try {
+ new ContextCallable<>(this::runAndDone).call();
+ } catch (Throwable e) {
+ LOG.error("Worker failed before runAndDone()", e);
+ if (this.exception == null) {
+ this.exception = e;
+ }
+ exceptionHandle(e);
Review Comment:
**⚠️ Important:** Duplicate exception logging - both `safeRun()` and
`runAndDone()` call `exceptionHandle(e)`, which may result in the same
exception being logged/handled twice.
```suggestion
} catch (Throwable e) {
LOG.error("Worker failed before runAndDone()", e);
if (this.exception == null) {
this.exception = e;
}
// exceptionHandle(e) will be called inside runAndDone()
```
Consider removing one of the `exceptionHandle()` calls to avoid redundant
processing.
##########
hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java:
##########
@@ -104,9 +104,23 @@ public void start(String name) {
LOG.info("Starting {} workers[{}] with queue size {}...",
this.workers, name, this.queueSize);
for (int i = 0; i < this.workers; i++) {
- this.runningFutures.add(
- this.executor.submit(new
ContextCallable<>(this::runAndDone)));
+ this.runningFutures.add(this.executor.submit(this::safeRun));
}
+
+ }
+ private Void safeRun() {
Review Comment:
**⚠️ Important:** Missing blank line before method declaration violates Java
code style conventions.
```suggestion
private Void safeRun() {
```
##########
hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java:
##########
@@ -104,9 +104,23 @@ public void start(String name) {
LOG.info("Starting {} workers[{}] with queue size {}...",
this.workers, name, this.queueSize);
for (int i = 0; i < this.workers; i++) {
- this.runningFutures.add(
- this.executor.submit(new
ContextCallable<>(this::runAndDone)));
+ this.runningFutures.add(this.executor.submit(this::safeRun));
}
+
+ }
+ private Void safeRun() {
+ try {
+ new ContextCallable<>(this::runAndDone).call();
+ } catch (Throwable e) {
Review Comment:
**🧹 Minor:** The catch block catches `Throwable` which includes `Error`
subclasses. In the original code via `ContextCallable.call()`, only `Exception`
was caught. This change broadens exception handling - is this intentional?
Consider whether catching `Error` (e.g., `OutOfMemoryError`,
`StackOverflowError`) is appropriate here, as these typically indicate
unrecoverable JVM states.
##########
hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/util/ConsumersTest.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.unit.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hugegraph.testutil.Assert;
+import org.apache.hugegraph.util.Consumers;
+import org.junit.Test;
+
+public class ConsumersTest {
+
Review Comment:
**⚠️ Important:** Test timeout of 5 seconds may be too long for a unit test.
The actual execution should complete in milliseconds if there's no deadlock.
```suggestion
@Test(timeout = 1000)
```
Consider reducing to 1 second to fail faster if there's a regression.
##########
hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/util/ConsumersTest.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.unit.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hugegraph.testutil.Assert;
+import org.apache.hugegraph.util.Consumers;
+import org.junit.Test;
+
+public class ConsumersTest {
+
+ @Test(timeout = 5000)
+ public void testStartProvideAwaitNormal() throws Throwable {
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ try {
+ AtomicInteger processed = new AtomicInteger();
+
+ Consumers<Integer> consumers = new Consumers<>(executor, v -> {
+ processed.incrementAndGet();
+ });
+
+ consumers.start("test");
+ for (int i = 0; i < 50; i++) {
+ consumers.provide(i);
+ }
+ consumers.await();
+
+ Assert.assertEquals("Should process all provided elements",
+ 50, processed.get());
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ @Test(timeout = 5000)
Review Comment:
**🧹 Minor:** The test doesn't verify the specific deadlock scenario
mentioned in the PR description - where `ContextCallable` fails *before*
entering `runAndDone()`.
Consider adding a test case that explicitly simulates the failure scenario,
for example:
```java
@Test(timeout = 1000)
public void testAwaitDoesNotHangWhenContextCallableFails() throws Throwable {
// Test that simulates ContextCallable constructor/call() failure
// before runAndDone() is entered
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]