takraj opened a new issue, #1138: URL: https://github.com/apache/plc4x/issues/1138
### What happened?

This issue affects both `v0.10.0` and the pre-release `v0.11.0`. It also highlights a conceptual, architectural problem in the `plc4j` library in general, one that applies to multiple drivers.

## Base problem

The OPC-UA driver starts a keepalive thread once a connection is successfully established, one keepalive thread per connection: [SecureChannel.java:577](https://github.com/apache/plc4x/blob/v0.11.0/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java#L577).

The keepalive loop itself runs in the common thread pool (via `supplyAsync()`), which is limited in size and shared between the library and the user application: [SecureChannel.java:998](https://github.com/apache/plc4x/blob/v0.11.0/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java#L998). The loop has no defined exit condition, so it keeps running forever and holds (blocks) a common-pool thread even after the connection is closed. As the common pool is size-limited (`nr_cpus - 1`), after a few connect/disconnect iterations the pool is exhausted and can no longer execute any new tasks, neither from the `plc4j` library nor from the user application.

Because `supplyAsync()` is also used to create the subscription monitor loop, that loop never gets started while the keepalive threads occupy the pool: [OpcuaSubscriptionHandle.java:227](https://github.com/apache/plc4x/blob/v0.11.0/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java#L227). In practice, the future returned by `PlcSubscriptionRequest.execute()` never completes.

Now assume for a moment that the base problem above is fixed and the keepalive thread has a proper exit condition. Even then, using the common pool limits the number of concurrent subscriptions in the application, as each subscription thread occupies a slot in the pool.
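The starvation mechanism itself does not depend on PLC4X. The following is a minimal sketch, not taken from the driver, that reproduces it with plain `CompletableFuture` calls. The names `PoolStarvationDemo`, `keepaliveLoop` and `probeCompletes` are hypothetical, and a private `ForkJoinPool` with 2 workers stands in for the common pool so the outcome does not depend on the CPU count.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public class PoolStarvationDemo {

    // Hypothetical stand-in for a keepalive loop: it only exits once `running` is cleared.
    static void keepaliveLoop(AtomicBoolean running) {
        while (running.get()) {
            try {
                Thread.sleep(50); // blocks the worker; ForkJoinPool does not add a thread to compensate
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Starts `loops` keepalive loops on a pool with `parallelism` workers, then checks
    // whether a trivial supplyAsync() task still completes within 500 ms.
    static boolean probeCompletes(int parallelism, int loops) {
        // A private pool stands in for the common pool, so the result does not depend on the CPU count.
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        AtomicBoolean running = new AtomicBoolean(true);
        try {
            for (int i = 0; i < loops; i++) {
                CompletableFuture.runAsync(() -> keepaliveLoop(running), pool);
            }
            CompletableFuture<String> probe = CompletableFuture.supplyAsync(() -> "done", pool);
            try {
                probe.get(500, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                return false; // every worker is held by a loop, so the probe is never picked up
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        } finally {
            running.set(false); // the exit condition: loops return and free their workers
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("1 loop, 2 workers -> probe completes: " + probeCompletes(2, 1));
        System.out.println("2 loops, 2 workers -> probe completes: " + probeCompletes(2, 2));
    }
}
```

With one loop a worker is still free, so the probe runs and the first line prints `true`. With two loops both workers are stuck sleeping, the probe stays queued past the timeout, and the second line prints `false`. Clearing the flag in `finally` plays the role of the exit condition that the keepalive loop lacks.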
With 4 CPU cores, for example, the common pool has 3 slots, so the user cannot have more than 2 active subscriptions: 1 slot is held by the keepalive loop and 2 slots are held by the subscription threads.

## Conceptual problem

Thread pools are not meant for long-running tasks. They are meant for frequent, short-lived computations, to save the cost of spawning a new thread every time. I think the current (ab)use of the common pool should be redesigned so that it no longer hosts infinite loops, or these loops should be moved to dedicated threads.

## Example application to reproduce the Base problem

```java
import static java.lang.String.format;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

import org.apache.plc4x.java.DefaultPlcDriverManager;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.PlcConnectionManager;
import org.apache.plc4x.java.api.PlcDriverManager;
import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.opcua.tag.OpcuaTag;

public class ForkJoinPoolDemo {

    public static final String FJP_PARALLELISM = "java.util.concurrent.ForkJoinPool.common.parallelism";

    public static void main(String[] args) throws Exception {
        System.out.println(FJP_PARALLELISM + ": " + System.getProperty(FJP_PARALLELISM));
        System.out.println("CPU Core: " + Runtime.getRuntime().availableProcessors());
        printDetails();

        PlcDriverManager driverManager = new DefaultPlcDriverManager();
        PlcConnectionManager connectionManager = driverManager.getConnectionManager();

        // Each iteration opens one connection, subscribes and unsubscribes once, then closes it.
        for (int i = 0; i < 50; i++) {
            Thread.sleep(1000);
            try (PlcConnection connection = connectionManager.getConnection("opcua:tcp://127.0.0.1:12686/milo")) {
                printStats(format("Iteration: %d, Place: Before Subscribe", i));
                PlcSubscriptionRequest request = connection.subscriptionRequestBuilder()
                    .addChangeOfStateTag("Demo", OpcuaTag.of("ns=2;s=HelloWorld/ScalarTypes/Integer"))
                    .build();
                PlcSubscriptionResponse response = request.execute().get(60, TimeUnit.SECONDS);
                if (response.getResponseCode("Demo") != PlcResponseCode.OK) {
                    throw new RuntimeException("Not OK.");
                }

                printStats(format("Iteration: %d, Place: Before Unsubscribe", i));
                PlcUnsubscriptionRequest unsubscriptionRequest = connection.unsubscriptionRequestBuilder()
                    .addHandles(response.getSubscriptionHandles())
                    .build();
                unsubscriptionRequest.execute();

                printStats(format("Iteration: %d, Place: Before Close", i));
            } finally {
                printStats(format("Iteration: %d, Place: After Close", i));
            }
        }

        printDetails();
    }

    // Prints the configuration and current counters of the common pool.
    private static void printDetails() {
        System.out.println("CommonPool Parallelism: " + ForkJoinPool.commonPool().getParallelism());
        System.out.println("CommonPool Common Parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("CommonPool Pool Size: " + ForkJoinPool.commonPool().getPoolSize());
        System.out.println("CommonPool Queued Tasks: " + ForkJoinPool.commonPool().getQueuedTaskCount());
        System.out.println("CommonPool Queued Submissions: " + ForkJoinPool.commonPool().getQueuedSubmissionCount());
        System.out.println("CommonPool Active Threads: " + ForkJoinPool.commonPool().getActiveThreadCount());
    }

    // Prints a label followed by the common pool's one-line summary (size, active, running, ...).
    private static void printStats(String comment) {
        System.out.println(comment);
        System.out.println(ForkJoinPool.commonPool().toString());
    }
}
```

The program above locks up and eventually exits with a timeout exception. I took a thread dump during the lockup and found 11 common-pool threads in `TIMED_WAITING` state, each with the following stack trace:

```
"ForkJoinPool.commonPool-worker-9" prio=0 tid=0x0 nid=0x0 waiting on condition
     java.lang.Thread.State: TIMED_WAITING
        at java.base@11.0.20.1/java.lang.Thread.sleep(Native Method)
        at app//org.apache.plc4x.java.opcua.context.SecureChannel.lambda$50(SecureChannel.java:1001)
        at app//org.apache.plc4x.java.opcua.context.SecureChannel$$Lambda$227/0x0000000840218c40.get(Unknown Source)
        at java.base@11.0.20.1/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
        at java.base@11.0.20.1/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1692)
        at java.base@11.0.20.1/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
        at java.base@11.0.20.1/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
        at java.base@11.0.20.1/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
        at java.base@11.0.20.1/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
        at java.base@11.0.20.1/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
```

All of them were keepalive threads: [SecureChannel.java:1001](https://github.com/apache/plc4x/blob/v0.11.0/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java#L1001)

## Logs

* [with 0.10.zip](https://github.com/apache/plc4x/files/12857649/with.0.10.zip)
* [with 0.11.zip](https://github.com/apache/plc4x/files/12857657/with.0.11.zip)

I would like to highlight the following lines:

```
with 0.11$ grep -B1 "java.util.concurrent.ForkJoinPool@" 0.11.log
Iteration: 0, Place: Before Subscribe
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 1, active = 1, running = 0, steals = 0, tasks = 0, submissions = 0]
--
Iteration: 0, Place: Before Unsubscribe
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 4, active = 2, running = 1, steals = 1, tasks = 0, submissions = 0]
--
Iteration: 0, Place: Before Close
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 4, active = 2, running = 0, steals = 1, tasks = 0, submissions = 0]
--
Iteration: 0, Place: After Close
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 4, active = 1, running = 0, steals = 2, tasks = 0, submissions = 0]
--
Iteration: 1, Place: Before Subscribe
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 4, active = 2, running = 0, steals = 2, tasks = 0, submissions = 0]
--
Iteration: 1, Place: Before Unsubscribe
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 6, active = 3, running = 1, steals = 3, tasks = 0, submissions = 0]
--
Iteration: 1, Place: Before Close
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 6, active = 3, running = 0, steals = 3, tasks = 0, submissions = 0]
--
Iteration: 1, Place: After Close
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 6, active = 2, running = 0, steals = 4, tasks = 0, submissions = 0]
--
Iteration: 2, Place: Before Subscribe
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 6, active = 3, running = 0, steals = 4, tasks = 0, submissions = 0]
--
Iteration: 2, Place: Before Unsubscribe
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 8, active = 4, running = 1, steals = 4, tasks = 0, submissions = 0]
--
Iteration: 2, Place: Before Close
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 8, active = 4, running = 0, steals = 4, tasks = 0, submissions = 0]
--
Iteration: 2, Place: After Close
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 8, active = 3, running = 0, steals = 5, tasks = 0, submissions = 0]
--
Iteration: 3, Place: Before Subscribe
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 8, active = 4, running = 0, steals = 5, tasks = 0, submissions = 0]
--
Iteration: 3, Place: Before Unsubscribe
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 10, active = 5, running = 1, steals = 5, tasks = 0, submissions = 0]
--
Iteration: 3, Place: Before Close
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 10, active = 5, running = 0, steals = 5, tasks = 0, submissions = 0]
--
Iteration: 3, Place: After Close
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 10, active = 4, running = 0, steals = 6, tasks = 0, submissions = 0]
--
Iteration: 4, Place: Before Subscribe
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 10, active = 5, running = 0, steals = 6, tasks = 0, submissions = 0]
--
Iteration: 4, Place: Before Unsubscribe
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 11, active = 6, running = 1, steals = 6, tasks = 0, submissions = 0]
--
Iteration: 4, Place: Before Close
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 11, active = 6, running = 0, steals = 6, tasks = 0, submissions = 0]
--
Iteration: 4, Place: After Close
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 11, active = 5, running = 0, steals = 7, tasks = 0, submissions = 0]
--
Iteration: 5, Place: Before Subscribe
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 11, active = 6, running = 0, steals = 7, tasks = 0, submissions = 0]
--
Iteration: 5, Place: Before Unsubscribe
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 11, active = 7, running = 1, steals = 8, tasks = 0, submissions = 0]
--
Iteration: 5, Place: Before Close
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 11, active = 7, running = 0, steals = 8, tasks = 0, submissions = 0]
--
Iteration: 5, Place: After Close
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 11, active = 6, running = 0, steals = 9, tasks = 0, submissions = 0]
--
Iteration: 6, Place: Before Subscribe
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 11, active = 7, running = 0, steals = 9, tasks = 0, submissions = 0]
--
Iteration: 6, Place: Before Unsubscribe
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 11, active = 8, running = 1, steals = 10, tasks = 0, submissions = 0]
--
Iteration: 6, Place: Before Close
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 11, active = 8, running = 0, steals = 10, tasks = 0, submissions = 0]
--
Iteration: 6, Place: After Close
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 11, active = 7, running = 0, steals = 11, tasks = 0, submissions = 0]
--
Iteration: 7, Place: Before Subscribe
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 11, active = 8, running = 0, steals = 11, tasks = 0, submissions = 0]
--
Iteration: 7, Place: Before Unsubscribe
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 11, active = 9, running = 1, steals = 12, tasks = 0, submissions = 0]
--
Iteration: 7, Place: Before Close
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 11, active = 9, running = 0, steals = 12, tasks = 0, submissions = 0]
--
Iteration: 7, Place: After Close
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 11, active = 8, running = 0, steals = 13, tasks = 0, submissions = 0]
--
Iteration: 8, Place: Before Subscribe
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 11, active = 9, running = 0, steals = 13, tasks = 0, submissions = 0]
--
Iteration: 8, Place: Before Unsubscribe
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 11, active = 10, running = 1, steals = 14, tasks = 0, submissions = 0]
--
Iteration: 8, Place: Before Close
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 11, active = 10, running = 0, steals = 14, tasks = 0, submissions = 0]
--
Iteration: 8, Place: After Close
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 11, active = 9, running = 0, steals = 15, tasks = 0, submissions = 0]
--
Iteration: 9, Place: Before Subscribe
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 11, active = 10, running = 0, steals = 15, tasks = 0, submissions = 0]
--
Iteration: 9, Place: Before Unsubscribe
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 11, active = 11, running = 1, steals = 15, tasks = 0, submissions = 0]
--
Iteration: 9, Place: Before Close
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 11, active = 11, running = 0, steals = 15, tasks = 0, submissions = 0]
--
Iteration: 9, Place: After Close
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 11, active = 10, running = 0, steals = 16, tasks = 0, submissions = 0]
--
Iteration: 10, Place: Before Subscribe
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 11, active = 11, running = 0, steals = 16, tasks = 0, submissions = 0]
--
Iteration: 10, Place: After Close
java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 11, active = 11, running = 0, steals = 16, tasks = 0, submissions = 1]
```

### Version

v0.10.0

### Programming Languages

- [X] plc4j
- [ ] plc4go
- [ ] plc4c
- [ ] plc4net

### Protocols

- [ ] AB-Ethernet
- [ ] ADS /AMS
- [ ] BACnet/IP
- [ ] CANopen
- [ ] DeltaV
- [ ] DF1
- [ ] EtherNet/IP
- [ ] Firmata
- [ ] KNXnet/IP
- [ ] Modbus
- [X] OPC-UA
- [ ] S7

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@plc4x.apache.org.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org