takraj opened a new issue, #1138:
URL: https://github.com/apache/plc4x/issues/1138

   ### What happened?
   
   This issue affects both `v0.10.0` and the pre-release `v0.11.0`, and it also 
highlights a conceptual, architectural problem in the `plc4j` library in 
general, one that applies to multiple drivers.
   
   ## Base problem
   
   The OPC-UA driver starts a keepalive thread once a connection is 
successfully established, one keepalive thread per connection: 
[SecureChannel.java:577](https://github.com/apache/plc4x/blob/v0.11.0/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java#L577).
 The keepalive loop itself is scheduled on the common thread pool (via 
`supplyAsync()`), which is limited in size and shared between the library and 
the user application: 
[SecureChannel.java:998](https://github.com/apache/plc4x/blob/v0.11.0/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java#L998).
 The loop has no exit condition, so it runs forever and holds a thread in the 
common pool even after the connection is closed. Because the common pool is 
size-limited (`nr_cpus - 1`), a few connect/disconnect iterations exhaust it, 
and it can no longer execute any new tasks, neither from the plc4j library nor 
from the user application. As `supplyAsync()` is also used to start the 
subscription monitor loop, that loop never gets started once the keepalive 
threads occupy all workers: 
[OpcuaSubscriptionHandle.java:227](https://github.com/apache/plc4x/blob/v0.11.0/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java#L227).
 In practice, this means the future returned by 
`PlcSubscriptionRequest.execute()` never completes.
   
   Even if the base problem above were fixed and the keepalive loop had a 
proper exit condition, the use of the common pool would still limit the number 
of concurrent subscriptions in the application, as each subscription thread 
occupies a slot in the pool. With 4 CPU cores, the common pool has 3 workers, 
so the user cannot have more than 2 active subscriptions: 1 slot is held by 
the keepalive thread and 2 slots by the subscription threads.
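   The mechanism can be demonstrated without plc4x at all. The sketch below is 
my own illustration (not plc4x code): it fills a small, fixed-size 
`ForkJoinPool` with never-completing tasks, standing in for the keepalive 
loops, and shows that a later task cannot even start until the blockers are 
released:
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.ForkJoinPool;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   
   public class PoolStarvationDemo {
   
       // Returns "starved-then-ran": the late task cannot start while all
       // workers are blocked, and only completes once they are released.
       static String demo() throws Exception {
           ForkJoinPool pool = new ForkJoinPool(2); // stand-in for the common pool
           CountDownLatch started = new CountDownLatch(2);
           CountDownLatch release = new CountDownLatch(1);
   
           // Occupy every worker, like one endless keepalive loop per connection.
           for (int i = 0; i < 2; i++) {
               CompletableFuture.runAsync(() -> {
                   started.countDown();
                   try {
                       release.await();
                   } catch (InterruptedException e) {
                       Thread.currentThread().interrupt();
                   }
               }, pool);
           }
           started.await(); // both workers are now blocked
   
           // Equivalent of the subscription monitor loop: no free worker for it.
           CompletableFuture<String> late = CompletableFuture.supplyAsync(() -> "ran", pool);
           try {
               late.get(500, TimeUnit.MILLISECONDS);
               pool.shutdownNow();
               return "ran-immediately"; // would mean no starvation happened
           } catch (TimeoutException expected) {
               release.countDown();                            // free the workers...
               String result = late.get(5, TimeUnit.SECONDS);  // ...now the task runs
               pool.shutdown();
               return "starved-then-" + result;
           }
       }
   
       public static void main(String[] args) throws Exception {
           System.out.println(demo());
       }
   }
   ```
   
   A plain `CountDownLatch.await()` blocks a ForkJoin worker without any 
compensation (that only happens via `ManagedBlocker`), which is exactly why 
the sleeping keepalive loops pin their slots.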
   
   ## Conceptual problem
   
   Thread pools are not meant for long-running tasks. They exist for frequent, 
short-lived computations, to save the cost of spawning a new thread every 
time. I think the current (ab)use of the common pool should either be 
redesigned to avoid hosting infinite loops, or these loops should be moved to 
dedicated threads.
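   For comparison, here is a minimal sketch of the dedicated-thread 
alternative. All names (`ConnectionKeepAlive`, `sendKeepAliveRequest`) are 
hypothetical, not the actual plc4j API; a real fix would also have to hook 
`stop()` into the channel teardown:
   
   ```java
   import java.util.concurrent.atomic.AtomicBoolean;
   
   // Hypothetical sketch: a keepalive loop with an explicit exit condition,
   // running on its own daemon thread instead of a common-pool worker.
   public class ConnectionKeepAlive {
   
       private final AtomicBoolean running = new AtomicBoolean(true);
       private final Thread worker;
   
       public ConnectionKeepAlive(Runnable sendKeepAliveRequest, long intervalMillis) {
           worker = new Thread(() -> {
               while (running.get()) {
                   sendKeepAliveRequest.run();
                   try {
                       Thread.sleep(intervalMillis);
                   } catch (InterruptedException e) {
                       Thread.currentThread().interrupt();
                       return; // stop() interrupts the sleep, so we exit promptly
                   }
               }
           }, "opcua-keepalive");
           worker.setDaemon(true); // never keeps the JVM alive on its own
           worker.start();
       }
   
       // To be called from the connection's close()/disconnect path.
       public void stop() {
           running.set(false);
           worker.interrupt();
       }
   
       public boolean isAlive() {
           return worker.isAlive();
       }
   }
   ```
   
   One dedicated thread per connection costs little, terminates 
deterministically with the connection, and cannot starve unrelated work.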
   
   ## Example application to reproduce the Base problem
   
   ```java
   import java.util.concurrent.ForkJoinPool;
   import java.util.concurrent.TimeUnit;
   
   import org.apache.plc4x.java.DefaultPlcDriverManager;
   import org.apache.plc4x.java.api.PlcConnection;
   import org.apache.plc4x.java.api.PlcConnectionManager;
   import org.apache.plc4x.java.api.PlcDriverManager;
   import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
   import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
   import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
   import org.apache.plc4x.java.api.types.PlcResponseCode;
   import org.apache.plc4x.java.opcua.tag.OpcuaTag;
   
   import static java.lang.String.format;
   
   public class ForkJoinPoolDemo {
   
       public static final String FJP_PARALLELISM =
               "java.util.concurrent.ForkJoinPool.common.parallelism";
   
       public static void main(String[] args) throws Exception {
           System.out.println(FJP_PARALLELISM + ": " + System.getProperty(FJP_PARALLELISM));
           System.out.println("CPU Cores: " + Runtime.getRuntime().availableProcessors());
   
           printDetails();
   
           PlcDriverManager driverManager = new DefaultPlcDriverManager();
           PlcConnectionManager connectionManager = driverManager.getConnectionManager();
   
           for (int i = 0; i < 50; i++) {
               Thread.sleep(1000);
   
               try (PlcConnection connection =
                       connectionManager.getConnection("opcua:tcp://127.0.0.1:12686/milo")) {
                   printStats(format("Iteration: %d, Place: Before Subscribe", i));
   
                   PlcSubscriptionRequest request = connection.subscriptionRequestBuilder()
                           .addChangeOfStateTag("Demo",
                                   OpcuaTag.of("ns=2;s=HelloWorld/ScalarTypes/Integer"))
                           .build();
   
                   PlcSubscriptionResponse response = request.execute().get(60, TimeUnit.SECONDS);
                   if (response.getResponseCode("Demo") != PlcResponseCode.OK) {
                       throw new RuntimeException("Not OK.");
                   }
   
                   printStats(format("Iteration: %d, Place: Before Unsubscribe", i));
   
                   PlcUnsubscriptionRequest unsubscriptionRequest = connection
                           .unsubscriptionRequestBuilder()
                           .addHandles(response.getSubscriptionHandles())
                           .build();
                   unsubscriptionRequest.execute();
   
                   printStats(format("Iteration: %d, Place: Before Close", i));
               } finally {
                   printStats(format("Iteration: %d, Place: After Close", i));
               }
           }
   
           printDetails();
       }
   
       private static void printDetails() {
           System.out.println("CommonPool Parallelism: " + ForkJoinPool.commonPool().getParallelism());
           System.out.println("CommonPool Common Parallelism: " + ForkJoinPool.getCommonPoolParallelism());
           System.out.println("CommonPool Pool Size: " + ForkJoinPool.commonPool().getPoolSize());
           System.out.println("CommonPool Queued Tasks: " + ForkJoinPool.commonPool().getQueuedTaskCount());
           System.out.println("CommonPool Queued Submissions: " + ForkJoinPool.commonPool().getQueuedSubmissionCount());
           System.out.println("CommonPool Active Threads: " + ForkJoinPool.commonPool().getActiveThreadCount());
       }
   
       private static void printStats(String comment) {
           System.out.println(comment);
           System.out.println(ForkJoinPool.commonPool());
       }
   }
   ```
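   Side note: the common-pool size can be pinned via the standard JVM property 
the demo prints, which should make the exhaustion reproduce after just a 
couple of connect/disconnect iterations instead of ~10:
   
   ```shell
   # Pin the common pool to 2 workers; each leaked keepalive thread then
   # permanently occupies half of the pool.
   java -Djava.util.concurrent.ForkJoinPool.common.parallelism=2 ForkJoinPoolDemo
   ```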
   
   The program above locks up and eventually exits with a timeout exception. I 
took a thread dump during the lockup and saw 11 common-pool threads in 
`TIMED_WAITING`, all with the following stack trace:
   ```
   "ForkJoinPool.commonPool-worker-9" prio=0 tid=0x0 nid=0x0 waiting on 
condition
        java.lang.Thread.State: TIMED_WAITING
        at java.base@11.0.20.1/java.lang.Thread.sleep(Native Method)
        at 
app//org.apache.plc4x.java.opcua.context.SecureChannel.lambda$50(SecureChannel.java:1001)
        at 
app//org.apache.plc4x.java.opcua.context.SecureChannel$$Lambda$227/0x0000000840218c40.get(Unknown
 Source)
        at 
java.base@11.0.20.1/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
        at 
java.base@11.0.20.1/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1692)
        at 
java.base@11.0.20.1/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
        at 
java.base@11.0.20.1/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
        at 
java.base@11.0.20.1/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
        at 
java.base@11.0.20.1/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
        at 
java.base@11.0.20.1/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
   ```
   They were all keepalive threads: 
[SecureChannel.java:1001](https://github.com/apache/plc4x/blob/v0.11.0/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java#L1001)
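   As a cheap way to watch for this leak at runtime without taking a full 
thread dump, one can count the live common-pool workers by name. This is plain 
JDK API, matching the `ForkJoinPool.commonPool-worker-*` names seen in the 
dump above:
   
   ```java
   public class CommonPoolWorkerCount {
   
       // Counts live common-pool worker threads. A count that grows with every
       // connect/disconnect cycle and never shrinks back is a strong signal
       // of leaked loops parked on the pool.
       static long commonPoolWorkers() {
           return Thread.getAllStackTraces().keySet().stream()
                   .filter(t -> t.getName().startsWith("ForkJoinPool.commonPool-worker"))
                   .count();
       }
   
       public static void main(String[] args) {
           System.out.println("live common-pool workers: " + commonPoolWorkers());
       }
   }
   ```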
   
   ## Logs
   
   * [with 
0.10.zip](https://github.com/apache/plc4x/files/12857649/with.0.10.zip)
   * [with 
0.11.zip](https://github.com/apache/plc4x/files/12857657/with.0.11.zip)
   
   I would like to highlight the following lines:
   ```
   with 0.11$ grep -B1 "java.util.concurrent.ForkJoinPool@" 0.11.log 
   Iteration: 0, Place: Before Subscribe
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
1, active = 1, running = 0, steals = 0, tasks = 0, submissions = 0]
   --
   Iteration: 0, Place: Before Unsubscribe
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
4, active = 2, running = 1, steals = 1, tasks = 0, submissions = 0]
   --
   Iteration: 0, Place: Before Close
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
4, active = 2, running = 0, steals = 1, tasks = 0, submissions = 0]
   --
   Iteration: 0, Place: After Close
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
4, active = 1, running = 0, steals = 2, tasks = 0, submissions = 0]
   --
   Iteration: 1, Place: Before Subscribe
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
4, active = 2, running = 0, steals = 2, tasks = 0, submissions = 0]
   --
   Iteration: 1, Place: Before Unsubscribe
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
6, active = 3, running = 1, steals = 3, tasks = 0, submissions = 0]
   --
   Iteration: 1, Place: Before Close
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
6, active = 3, running = 0, steals = 3, tasks = 0, submissions = 0]
   --
   Iteration: 1, Place: After Close
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
6, active = 2, running = 0, steals = 4, tasks = 0, submissions = 0]
   --
   Iteration: 2, Place: Before Subscribe
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
6, active = 3, running = 0, steals = 4, tasks = 0, submissions = 0]
   --
   Iteration: 2, Place: Before Unsubscribe
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
8, active = 4, running = 1, steals = 4, tasks = 0, submissions = 0]
   --
   Iteration: 2, Place: Before Close
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
8, active = 4, running = 0, steals = 4, tasks = 0, submissions = 0]
   --
   Iteration: 2, Place: After Close
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
8, active = 3, running = 0, steals = 5, tasks = 0, submissions = 0]
   --
   Iteration: 3, Place: Before Subscribe
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
8, active = 4, running = 0, steals = 5, tasks = 0, submissions = 0]
   --
   Iteration: 3, Place: Before Unsubscribe
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
10, active = 5, running = 1, steals = 5, tasks = 0, submissions = 0]
   --
   Iteration: 3, Place: Before Close
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
10, active = 5, running = 0, steals = 5, tasks = 0, submissions = 0]
   --
   Iteration: 3, Place: After Close
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
10, active = 4, running = 0, steals = 6, tasks = 0, submissions = 0]
   --
   Iteration: 4, Place: Before Subscribe
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
10, active = 5, running = 0, steals = 6, tasks = 0, submissions = 0]
   --
   Iteration: 4, Place: Before Unsubscribe
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
11, active = 6, running = 1, steals = 6, tasks = 0, submissions = 0]
   --
   Iteration: 4, Place: Before Close
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
11, active = 6, running = 0, steals = 6, tasks = 0, submissions = 0]
   --
   Iteration: 4, Place: After Close
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
11, active = 5, running = 0, steals = 7, tasks = 0, submissions = 0]
   --
   Iteration: 5, Place: Before Subscribe
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
11, active = 6, running = 0, steals = 7, tasks = 0, submissions = 0]
   --
   Iteration: 5, Place: Before Unsubscribe
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
11, active = 7, running = 1, steals = 8, tasks = 0, submissions = 0]
   --
   Iteration: 5, Place: Before Close
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
11, active = 7, running = 0, steals = 8, tasks = 0, submissions = 0]
   --
   Iteration: 5, Place: After Close
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
11, active = 6, running = 0, steals = 9, tasks = 0, submissions = 0]
   --
   Iteration: 6, Place: Before Subscribe
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
11, active = 7, running = 0, steals = 9, tasks = 0, submissions = 0]
   --
   Iteration: 6, Place: Before Unsubscribe
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
11, active = 8, running = 1, steals = 10, tasks = 0, submissions = 0]
   --
   Iteration: 6, Place: Before Close
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
11, active = 8, running = 0, steals = 10, tasks = 0, submissions = 0]
   --
   Iteration: 6, Place: After Close
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
11, active = 7, running = 0, steals = 11, tasks = 0, submissions = 0]
   --
   Iteration: 7, Place: Before Subscribe
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
11, active = 8, running = 0, steals = 11, tasks = 0, submissions = 0]
   --
   Iteration: 7, Place: Before Unsubscribe
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
11, active = 9, running = 1, steals = 12, tasks = 0, submissions = 0]
   --
   Iteration: 7, Place: Before Close
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
11, active = 9, running = 0, steals = 12, tasks = 0, submissions = 0]
   --
   Iteration: 7, Place: After Close
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
11, active = 8, running = 0, steals = 13, tasks = 0, submissions = 0]
   --
   Iteration: 8, Place: Before Subscribe
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
11, active = 9, running = 0, steals = 13, tasks = 0, submissions = 0]
   --
   Iteration: 8, Place: Before Unsubscribe
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
11, active = 10, running = 1, steals = 14, tasks = 0, submissions = 0]
   --
   Iteration: 8, Place: Before Close
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
11, active = 10, running = 0, steals = 14, tasks = 0, submissions = 0]
   --
   Iteration: 8, Place: After Close
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
11, active = 9, running = 0, steals = 15, tasks = 0, submissions = 0]
   --
   Iteration: 9, Place: Before Subscribe
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
11, active = 10, running = 0, steals = 15, tasks = 0, submissions = 0]
   --
   Iteration: 9, Place: Before Unsubscribe
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
11, active = 11, running = 1, steals = 15, tasks = 0, submissions = 0]
   --
   Iteration: 9, Place: Before Close
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
11, active = 11, running = 0, steals = 15, tasks = 0, submissions = 0]
   --
   Iteration: 9, Place: After Close
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
11, active = 10, running = 0, steals = 16, tasks = 0, submissions = 0]
   --
   Iteration: 10, Place: Before Subscribe
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
11, active = 11, running = 0, steals = 16, tasks = 0, submissions = 0]
   --
   Iteration: 10, Place: After Close
   java.util.concurrent.ForkJoinPool@2de23121[Running, parallelism = 11, size = 
11, active = 11, running = 0, steals = 16, tasks = 0, submissions = 1]
   ```
   
   ### Version
   
   v0.10.0
   
   ### Programming Languages
   
   - [X] plc4j
   - [ ] plc4go
   - [ ] plc4c
   - [ ] plc4net
   
   ### Protocols
   
   - [ ] AB-Ethernet
   - [ ] ADS /AMS
   - [ ] BACnet/IP
   - [ ] CANopen
   - [ ] DeltaV
   - [ ] DF1
   - [ ] EtherNet/IP
   - [ ] Firmata
   - [ ] KNXnet/IP
   - [ ] Modbus
   - [X] OPC-UA
   - [ ] S7

