This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 654929dda0 fix(plc4j/opcua): Fix keepalive threads are never shut down (#1139)
654929dda0 is described below
commit 654929dda0047b65c984379eca2adb43fb77252c
Author: Rajmund Takács <[email protected]>
AuthorDate: Fri Oct 13 08:33:19 2023 +0200
fix(plc4j/opcua): Fix keepalive threads are never shut down (#1139)
* plc4j-driver-opcua: Fix keepalive threads are never shut down
* plc4j-driver-opcua: Remove superfluous log, and set interrupt flag
---
.../plc4x/java/opcua/context/SecureChannel.java | 40 +++++++----
.../opcua/protocol/OpcuaSubscriptionHandle.java | 5 +-
.../plc4x/java/opcua/OpcuaPlcDriverTest.java | 81 ++++++++++++++++++++++
3 files changed, 113 insertions(+), 13 deletions(-)
diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java
index 16e3e36616..9d80a7b4e6 100644
--- a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java
+++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java
@@ -18,6 +18,9 @@
*/
package org.apache.plc4x.java.opcua.context;
+import static java.lang.Thread.currentThread;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
@@ -45,6 +48,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
@@ -122,6 +126,8 @@ public class SecureChannel {
private CompletableFuture<Void> keepAlive;
private final List<String> endpoints = new ArrayList<>();
private final AtomicLong senderSequenceNumber = new AtomicLong();
+ private final AtomicBoolean enableKeepalive = new AtomicBoolean(true);
+ private double sessionTimeout = 120000L;
public SecureChannel(OpcuaDriverContext driverContext, OpcuaConfiguration
configuration, PlcAuthentication authentication) {
this.configuration = configuration;
@@ -415,7 +421,7 @@ public class SecureChannel {
new PascalString(sessionName),
new PascalByteString(clientNonce.length, clientNonce),
NULL_BYTE_STRING,
- 120000L,
+ sessionTimeout,
0L
);
@@ -452,6 +458,7 @@ public class SecureChannel {
responseMessage = (CreateSessionResponse)
unknownExtensionObject;
authenticationToken =
responseMessage.getAuthenticationToken().getNodeId();
+ sessionTimeout =
responseMessage.getRevisedSessionTimeout();
onConnectActivateSessionRequest(context,
responseMessage, (CreateSessionResponse) message.getBody());
} else {
@@ -601,7 +608,7 @@ public class SecureChannel {
int requestHandle = getRequestHandle();
if (keepAlive != null) {
- keepAlive.complete(null);
+ enableKeepalive.set(false);
}
ExpandedNodeId expandedNodeId = new ExpandedNodeId(
@@ -995,14 +1002,7 @@ public class SecureChannel {
private void keepAlive() {
keepAlive = CompletableFuture.supplyAsync(() -> {
- while (true) {
-
- try {
- Thread.sleep((long) Math.ceil(this.lifetime * 0.75f));
- } catch (InterruptedException e) {
- LOGGER.trace("Interrupted Exception");
- }
-
+ while (enableKeepalive.get()) {
int transactionId =
channelTransactionManager.getTransactionIdentifier();
RequestHeader requestHeader = new RequestHeader(new
NodeId(authenticationToken),
@@ -1071,7 +1071,14 @@ public class SecureChannel {
.unwrap(apuMessage ->
encryptionHandler.decodeMessage(apuMessage))
.check(p -> p.getMessage() instanceof
OpcuaOpenResponse)
.unwrap(p -> (OpcuaOpenResponse) p.getMessage())
- .check(p -> p.getRequestId() == transactionId)
+ .check(p -> {
+ if (p.getRequestId() == transactionId) {
+ senderSequenceNumber.incrementAndGet();
+ return true;
+ } else {
+ return false;
+ }
+ })
.handle(opcuaOpenResponse -> {
try {
ReadBufferByteBased readBuffer = new
ReadBufferByteBased(opcuaOpenResponse.getMessage(),
org.apache.plc4x.java.spi.generation.ByteOrder.LITTLE_ENDIAN);
@@ -1096,8 +1103,17 @@ public class SecureChannel {
} catch (SerializationException | ParseException e) {
LOGGER.error("Unable to to Parse Open Secure Request");
}
+
+ try {
+ Thread.sleep((long) Math.ceil(this.sessionTimeout *
0.25f));
+ } catch (InterruptedException e) {
+ LOGGER.trace("Interrupted Exception");
+ currentThread().interrupt();
+ }
}
- }
+ return null;
+ },
+ newSingleThreadExecutor()
);
}
diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java
index bde3faac13..e90031be58 100644
--- a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java
+++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java
@@ -18,6 +18,8 @@
*/
package org.apache.plc4x.java.opcua.protocol;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+
import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
@@ -332,7 +334,8 @@ public class OpcuaSubscriptionHandle extends
DefaultPlcSubscriptionHandle {
LOGGER.error("Failed to start subscription", e);
}
return null;
- });
+ },
+ newSingleThreadExecutor());
}
diff --git a/plc4j/drivers/opcua/src/test/java/org/apache/plc4x/java/opcua/OpcuaPlcDriverTest.java b/plc4j/drivers/opcua/src/test/java/org/apache/plc4x/java/opcua/OpcuaPlcDriverTest.java
index 3c8fa77889..6f729d2dd8 100644
--- a/plc4j/drivers/opcua/src/test/java/org/apache/plc4x/java/opcua/OpcuaPlcDriverTest.java
+++ b/plc4j/drivers/opcua/src/test/java/org/apache/plc4x/java/opcua/OpcuaPlcDriverTest.java
@@ -19,15 +19,25 @@
package org.apache.plc4x.java.opcua;
import io.vavr.collection.List;
+import java.util.ArrayList;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.TimeUnit;
import org.apache.plc4x.java.DefaultPlcDriverManager;
import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.PlcConnectionManager;
+import org.apache.plc4x.java.api.PlcDriverManager;
import
org.apache.plc4x.java.api.authentication.PlcUsernamePasswordAuthentication;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
import org.apache.plc4x.java.api.messages.PlcWriteRequest;
import org.apache.plc4x.java.api.messages.PlcWriteResponse;
import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.opcua.tag.OpcuaTag;
import org.assertj.core.api.Condition;
import org.eclipse.milo.examples.server.ExampleServer;
import org.junit.jupiter.api.*;
@@ -41,6 +51,7 @@ import java.nio.file.Paths;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.assertj.core.api.Assertions.assertThat;
public class OpcuaPlcDriverTest {
@@ -142,6 +153,76 @@ public class OpcuaPlcDriverTest {
}
}
+ @Nested
+ class SmokeTest {
+ @Test
+ public void manyReconnectionsWithSingleSubscription() throws Exception
{
+ PlcDriverManager driverManager = new DefaultPlcDriverManager();
+ PlcConnectionManager connectionManager =
driverManager.getConnectionManager();
+
+ for (int i = 0; i < 25; i++) {
+ try (PlcConnection connection =
connectionManager.getConnection(tcpConnectionAddress)) {
+
+ PlcSubscriptionRequest request =
connection.subscriptionRequestBuilder()
+ .addChangeOfStateTag("Demo",
OpcuaTag.of(INTEGER_IDENTIFIER_READ_WRITE))
+ .build();
+
+ PlcSubscriptionResponse response =
request.execute().get(60, TimeUnit.SECONDS);
+
assertThat(response.getResponseCode("Demo")).isEqualTo(PlcResponseCode.OK);
+
+ connection.unsubscriptionRequestBuilder()
+ .addHandles(response.getSubscriptionHandles())
+ .build()
+ .execute();
+ }
+ }
+ }
+ @Test
+ public void manySubscriptionsOnSingleConnection() throws Exception {
+ PlcDriverManager driverManager = new DefaultPlcDriverManager();
+ PlcConnectionManager connectionManager =
driverManager.getConnectionManager();
+
+ ArrayList<PlcSubscriptionResponse> plcSubscriptionResponses = new
ArrayList<>();
+ ConcurrentLinkedDeque<PlcSubscriptionEvent> events = new
ConcurrentLinkedDeque<>();
+
+ try (PlcConnection connection =
connectionManager.getConnection(tcpConnectionAddress)) {
+ for (int i = 0; i < 25; i++) {
+ PlcSubscriptionRequest request =
connection.subscriptionRequestBuilder()
+ .addChangeOfStateTag("Demo",
OpcuaTag.of(INTEGER_IDENTIFIER_READ_WRITE))
+ .build();
+
+ PlcSubscriptionResponse response =
request.execute().get(60, TimeUnit.SECONDS);
+
assertThat(response.getResponseCode("Demo")).isEqualTo(PlcResponseCode.OK);
+
+ plcSubscriptionResponses.add(response);
+
+ response.getSubscriptionHandles().forEach(handle ->
handle.register(events::add));
+ }
+
+ CompletableFuture.supplyAsync(() -> {
+ for (int i = 0; i < 60; i++) {
+ if (events.size() == 25) {
+ break;
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return null;
+ }, newSingleThreadExecutor()).get(60, TimeUnit.SECONDS);
+
+ for (PlcSubscriptionResponse response :
plcSubscriptionResponses) {
+ connection.unsubscriptionRequestBuilder()
+ .addHandles(response.getSubscriptionHandles())
+ .build()
+ .execute();
+ }
+ }
+ }
+ }
+
@Nested
class ConnectionRelated {
@TestFactory