This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 654929dda0 fix(plc4j/opcua): Fix keepalive threads are never shut down (#1139)
654929dda0 is described below

commit 654929dda0047b65c984379eca2adb43fb77252c
Author: Rajmund Takács <[email protected]>
AuthorDate: Fri Oct 13 08:33:19 2023 +0200

    fix(plc4j/opcua): Fix keepalive threads are never shut down (#1139)
    
    * plc4j-driver-opcua: Fix keepalive threads are never shut down
    
    * plc4j-driver-opcua: Remove superfluous log, and set interrupt flag
---
 .../plc4x/java/opcua/context/SecureChannel.java    | 40 +++++++----
 .../opcua/protocol/OpcuaSubscriptionHandle.java    |  5 +-
 .../plc4x/java/opcua/OpcuaPlcDriverTest.java       | 81 ++++++++++++++++++++++
 3 files changed, 113 insertions(+), 13 deletions(-)

diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java
index 16e3e36616..9d80a7b4e6 100644
--- a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java
+++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java
@@ -18,6 +18,9 @@
  */
 package org.apache.plc4x.java.opcua.context;
 
+import static java.lang.Thread.currentThread;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.plc4x.java.api.authentication.PlcAuthentication;
@@ -45,6 +48,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
@@ -122,6 +126,8 @@ public class SecureChannel {
     private CompletableFuture<Void> keepAlive;
     private final List<String> endpoints = new ArrayList<>();
     private final AtomicLong senderSequenceNumber = new AtomicLong();
+    private final AtomicBoolean enableKeepalive = new AtomicBoolean(true);
+    private double sessionTimeout = 120000L;
 
     public SecureChannel(OpcuaDriverContext driverContext, OpcuaConfiguration configuration, PlcAuthentication authentication) {
         this.configuration = configuration;
@@ -415,7 +421,7 @@ public class SecureChannel {
             new PascalString(sessionName),
             new PascalByteString(clientNonce.length, clientNonce),
             NULL_BYTE_STRING,
-            120000L,
+            sessionTimeout,
             0L
         );
 
@@ -452,6 +458,7 @@ public class SecureChannel {
                                 responseMessage = (CreateSessionResponse) unknownExtensionObject;
 
                                 authenticationToken = responseMessage.getAuthenticationToken().getNodeId();
+                                sessionTimeout = responseMessage.getRevisedSessionTimeout();
 
                                 onConnectActivateSessionRequest(context, responseMessage, (CreateSessionResponse) message.getBody());
                             } else {
@@ -601,7 +608,7 @@ public class SecureChannel {
         int requestHandle = getRequestHandle();
 
         if (keepAlive != null) {
-            keepAlive.complete(null);
+            enableKeepalive.set(false);
         }
 
         ExpandedNodeId expandedNodeId = new ExpandedNodeId(
@@ -995,14 +1002,7 @@ public class SecureChannel {
 
     private void keepAlive() {
         keepAlive = CompletableFuture.supplyAsync(() -> {
-                while (true) {
-
-                    try {
-                        Thread.sleep((long) Math.ceil(this.lifetime * 0.75f));
-                    } catch (InterruptedException e) {
-                        LOGGER.trace("Interrupted Exception");
-                    }
-
+                while (enableKeepalive.get()) {
                     int transactionId = channelTransactionManager.getTransactionIdentifier();
 
                     RequestHeader requestHeader = new RequestHeader(new NodeId(authenticationToken),
@@ -1071,7 +1071,14 @@ public class SecureChannel {
                             .unwrap(apuMessage -> encryptionHandler.decodeMessage(apuMessage))
                             .check(p -> p.getMessage() instanceof OpcuaOpenResponse)
                             .unwrap(p -> (OpcuaOpenResponse) p.getMessage())
-                            .check(p -> p.getRequestId() == transactionId)
+                            .check(p -> {
+                                if (p.getRequestId() == transactionId) {
+                                    senderSequenceNumber.incrementAndGet();
+                                    return true;
+                                } else {
+                                    return false;
+                                }
+                            })
                             .handle(opcuaOpenResponse -> {
                                 try {
                                     ReadBufferByteBased readBuffer = new ReadBufferByteBased(opcuaOpenResponse.getMessage(), org.apache.plc4x.java.spi.generation.ByteOrder.LITTLE_ENDIAN);
@@ -1096,8 +1103,17 @@ public class SecureChannel {
                     } catch (SerializationException | ParseException e) {
                         LOGGER.error("Unable to to Parse Open Secure Request");
                     }
+
+                    try {
+                        Thread.sleep((long) Math.ceil(this.sessionTimeout * 0.25f));
+                    } catch (InterruptedException e) {
+                        LOGGER.trace("Interrupted Exception");
+                        currentThread().interrupt();
+                    }
                 }
-            }
+                return null;
+            },
+            newSingleThreadExecutor()
         );
     }
 
diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java
index bde3faac13..e90031be58 100644
--- a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java
+++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java
@@ -18,6 +18,8 @@
  */
 package org.apache.plc4x.java.opcua.protocol;
 
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+
 import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
 import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
 import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
@@ -332,7 +334,8 @@ public class OpcuaSubscriptionHandle extends DefaultPlcSubscriptionHandle {
                 LOGGER.error("Failed to start subscription", e);
             }
             return null;
-        });
+        },
+        newSingleThreadExecutor());
     }
 
 
diff --git a/plc4j/drivers/opcua/src/test/java/org/apache/plc4x/java/opcua/OpcuaPlcDriverTest.java b/plc4j/drivers/opcua/src/test/java/org/apache/plc4x/java/opcua/OpcuaPlcDriverTest.java
index 3c8fa77889..6f729d2dd8 100644
--- a/plc4j/drivers/opcua/src/test/java/org/apache/plc4x/java/opcua/OpcuaPlcDriverTest.java
+++ b/plc4j/drivers/opcua/src/test/java/org/apache/plc4x/java/opcua/OpcuaPlcDriverTest.java
@@ -19,15 +19,25 @@
 package org.apache.plc4x.java.opcua;
 
 import io.vavr.collection.List;
+import java.util.ArrayList;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.TimeUnit;
 import org.apache.plc4x.java.DefaultPlcDriverManager;
 import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.PlcConnectionManager;
+import org.apache.plc4x.java.api.PlcDriverManager;
 import org.apache.plc4x.java.api.authentication.PlcUsernamePasswordAuthentication;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.apache.plc4x.java.api.messages.PlcWriteResponse;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.opcua.tag.OpcuaTag;
 import org.assertj.core.api.Condition;
 import org.eclipse.milo.examples.server.ExampleServer;
 import org.junit.jupiter.api.*;
@@ -41,6 +51,7 @@ import java.nio.file.Paths;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Stream;
 
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class OpcuaPlcDriverTest {
@@ -142,6 +153,76 @@ public class OpcuaPlcDriverTest {
         }
     }
 
+    @Nested
+    class SmokeTest {
+        @Test
+        public void manyReconnectionsWithSingleSubscription() throws Exception {
+            PlcDriverManager driverManager = new DefaultPlcDriverManager();
+            PlcConnectionManager connectionManager = driverManager.getConnectionManager();
+
+            for (int i = 0; i < 25; i++) {
+                try (PlcConnection connection = connectionManager.getConnection(tcpConnectionAddress)) {
+
+                    PlcSubscriptionRequest request = connection.subscriptionRequestBuilder()
+                            .addChangeOfStateTag("Demo", OpcuaTag.of(INTEGER_IDENTIFIER_READ_WRITE))
+                            .build();
+
+                    PlcSubscriptionResponse response = request.execute().get(60, TimeUnit.SECONDS);
+                    assertThat(response.getResponseCode("Demo")).isEqualTo(PlcResponseCode.OK);
+
+                    connection.unsubscriptionRequestBuilder()
+                            .addHandles(response.getSubscriptionHandles())
+                            .build()
+                            .execute();
+                }
+            }
+        }
+        @Test
+        public void manySubscriptionsOnSingleConnection() throws Exception {
+            PlcDriverManager driverManager = new DefaultPlcDriverManager();
+            PlcConnectionManager connectionManager = driverManager.getConnectionManager();
+
+            ArrayList<PlcSubscriptionResponse> plcSubscriptionResponses = new ArrayList<>();
+            ConcurrentLinkedDeque<PlcSubscriptionEvent> events = new ConcurrentLinkedDeque<>();
+
+            try (PlcConnection connection = connectionManager.getConnection(tcpConnectionAddress)) {
+                for (int i = 0; i < 25; i++) {
+                    PlcSubscriptionRequest request = connection.subscriptionRequestBuilder()
+                            .addChangeOfStateTag("Demo", OpcuaTag.of(INTEGER_IDENTIFIER_READ_WRITE))
+                            .build();
+
+                    PlcSubscriptionResponse response = request.execute().get(60, TimeUnit.SECONDS);
+                    assertThat(response.getResponseCode("Demo")).isEqualTo(PlcResponseCode.OK);
+
+                    plcSubscriptionResponses.add(response);
+
+                    response.getSubscriptionHandles().forEach(handle -> handle.register(events::add));
+                }
+
+                CompletableFuture.supplyAsync(() -> {
+                    for (int i = 0; i < 60; i++) {
+                        if (events.size() == 25) {
+                            break;
+                        }
+                        try {
+                            Thread.sleep(1000);
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                    return null;
+                }, newSingleThreadExecutor()).get(60, TimeUnit.SECONDS);
+
+                for (PlcSubscriptionResponse response : plcSubscriptionResponses) {
+                    connection.unsubscriptionRequestBuilder()
+                            .addHandles(response.getSubscriptionHandles())
+                            .build()
+                            .execute();
+                }
+            }
+        }
+    }
+
     @Nested
     class ConnectionRelated {
         @TestFactory

Reply via email to