This is an automated email from the ASF dual-hosted git repository.
hutcheb pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new b0bc847e92 fix(opcua): Await `writeAndFlush(msg)` & send next msg
async (#1147)
b0bc847e92 is described below
commit b0bc847e923a2a59c15868b642d377738a082402
Author: Rajmund Takács <[email protected]>
AuthorDate: Wed Oct 18 07:04:28 2023 +0200
fix(opcua): Await `writeAndFlush(msg)` & send next msg async (#1147)
* plc4j-driver-opcua: Await writeAndFlush(msg) & send next msg async
All OPC-UA messages contain a sequence number, which is independent
from the normal TCP sequence numbers. This sequence number is checked
by the server, which may refuse to respond if the sequence number is
lower than the last received message's. Packetization itself happens
asynchronously in netty, some time after `writeAndFlush(msg)` has been
invoked by the application code. If there are concurrent calls to this
function, there is no guarantee that messages are packetized in the same
order as they have been added to `writeAndFlush(msg)`. This, in some
cases, can cause OPC-UA messages to be delivered to the server in a
different order than specified by their sequence numbers, and
the server may drop these messages, and the client eventually times out.
As TCP guarantees delivering packets in the correct order, a trivial
solution is to simply wait for packetization to complete, before adding
the next message to the pipeline.
But yet comes a design problem: OPC-UA response handlers were written
in a way, that they may send new messages, while processing the response.
These handlers are executed on the netty event loop thread, which is
shared among all Netty I/O operations, such as receiving/sending messages.
This essentially means, that if a response handler is currently being
executed, you cannot start packetizing a new message, because that would
require the response handler to finish, which is waiting for guess what,
the packetization to complete.
Solution to this is to execute response handlers asynchronously, so
they don't occupy the Netty event handler thread.
This commit also fixes the `OpcuaPlcDriverTest` flakiness, experienced
after https://github.com/apache/plc4x/pull/1139 had been merged.
See also: https://ci-builds.apache.org/job/PLC4X/job/PLC4X/job/develop/1695/
* Put back a line that disappeared after `git merge`
---------
Co-authored-by: Ben Hutcheson <[email protected]>
---
.../plc4x/java/opcua/context/SecureChannel.java | 46 ++++++++++++----------
.../java/opcua/protocol/OpcuaProtocolLogic.java | 8 ++--
.../apache/plc4x/java/spi/Plc4xNettyWrapper.java | 2 +-
.../spi/internal/DefaultConversationContext.java | 2 +-
.../plc4x/java/spi/Plc4xNettyWrapperTest.java | 4 ++
5 files changed, 37 insertions(+), 25 deletions(-)
diff --git
a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java
b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java
index df9ba53a99..3574fdfd1b 100644
---
a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java
+++
b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java
@@ -20,6 +20,7 @@ package org.apache.plc4x.java.opcua.context;
import static java.lang.Thread.currentThread;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static java.util.concurrent.ForkJoinPool.commonPool;
import java.time.Instant;
import org.apache.commons.lang3.RandomStringUtils;
@@ -179,7 +180,7 @@ public class SecureChannel {
}
}
- public void submit(ConversationContext<OpcuaAPU> context,
Consumer<TimeoutException> onTimeout, BiConsumer<OpcuaAPU, Throwable> error,
Consumer<byte[]> consumer, WriteBufferByteBased buffer) {
+ public synchronized void submit(ConversationContext<OpcuaAPU> context,
Consumer<TimeoutException> onTimeout, BiConsumer<OpcuaAPU, Throwable> error,
Consumer<byte[]> consumer, WriteBufferByteBased buffer) {
int transactionId =
channelTransactionManager.getTransactionIdentifier();
//TODO: We need to split large messages up into chunks if it is larger
than the sendBufferSize
@@ -236,7 +237,7 @@ public class SecureChannel {
tokenId.set(opcuaResponse.getSecureTokenId());
channelId.set(opcuaResponse.getSecureChannelId());
- consumer.accept(messageBuffer.toByteArray());
+ commonPool().submit(() ->
consumer.accept(messageBuffer.toByteArray()));
}
});
} catch (Exception e) {
@@ -267,7 +268,7 @@ public class SecureChannel {
.expectResponse(OpcuaAPU.class, REQUEST_TIMEOUT)
.check(p -> p.getMessage() instanceof OpcuaAcknowledgeResponse)
.unwrap(p -> (OpcuaAcknowledgeResponse) p.getMessage())
- .handle(opcuaAcknowledgeResponse ->
onConnectOpenSecureChannel(context, opcuaAcknowledgeResponse));
+ .handle(opcuaAcknowledgeResponse -> commonPool().submit(() ->
onConnectOpenSecureChannel(context, opcuaAcknowledgeResponse)));
channelTransactionManager.submit(requestConsumer,
channelTransactionManager.getTransactionIdentifier());
}
@@ -361,16 +362,18 @@ public class SecureChannel {
LOGGER.error("Failed to connect to opc ua server
for the following reason:- {}, {}", ((ResponseHeader)
fault.getResponseHeader()).getServiceResult().getStatusCode(),
OpcuaStatusCode.enumForValue(((ResponseHeader)
fault.getResponseHeader()).getServiceResult().getStatusCode()));
} else {
LOGGER.debug("Got Secure Response Connection
Response");
- try {
- OpenSecureChannelResponse
openSecureChannelResponse = (OpenSecureChannelResponse) message.getBody();
- ChannelSecurityToken securityToken =
(ChannelSecurityToken) openSecureChannelResponse.getSecurityToken();
- tokenId.set((int) securityToken.getTokenId());
- channelId.set((int)
securityToken.getChannelId());
- lifetime = securityToken.getRevisedLifetime();
- onConnectCreateSessionRequest(context);
- } catch (PlcConnectionException e) {
- LOGGER.error("Error occurred while connecting
to OPC UA server", e);
- }
+ OpenSecureChannelResponse
openSecureChannelResponse = (OpenSecureChannelResponse) message.getBody();
+ ChannelSecurityToken securityToken =
(ChannelSecurityToken) openSecureChannelResponse.getSecurityToken();
+ tokenId.set((int) securityToken.getTokenId());
+ channelId.set((int) securityToken.getChannelId());
+ lifetime = securityToken.getRevisedLifetime();
+ commonPool().submit(() -> {
+ try {
+ onConnectCreateSessionRequest(context);
+ } catch (PlcConnectionException e) {
+ LOGGER.error("Error occurred while
connecting to OPC UA server", e);
+ }
+ });
}
} catch (ParseException e) {
LOGGER.error("Error parsing", e);
@@ -760,7 +763,7 @@ public class SecureChannel {
.unwrap(p -> (OpcuaAcknowledgeResponse) p.getMessage())
.handle(opcuaAcknowledgeResponse -> {
LOGGER.debug("Got Hello Response Connection Response");
- onDiscoverOpenSecureChannel(context, opcuaAcknowledgeResponse);
+ commonPool().submit(() -> onDiscoverOpenSecureChannel(context,
opcuaAcknowledgeResponse));
});
channelTransactionManager.submit(requestConsumer,
channelTransactionManager.getTransactionIdentifier());
@@ -831,11 +834,14 @@ public class SecureChannel {
LOGGER.error("Failed to connect to opc ua server
for the following reason:- {}, {}", ((ResponseHeader)
fault.getResponseHeader()).getServiceResult().getStatusCode(),
OpcuaStatusCode.enumForValue(((ResponseHeader)
fault.getResponseHeader()).getServiceResult().getStatusCode()));
} else {
LOGGER.debug("Got Secure Response Connection
Response");
- try {
- onDiscoverGetEndpointsRequest(context,
opcuaOpenResponse, (OpenSecureChannelResponse) message.getBody());
- } catch (PlcConnectionException e) {
- LOGGER.error("Error occurred while connecting
to OPC UA server");
- }
+ commonPool().submit(() -> {
+ try {
+ onDiscoverGetEndpointsRequest(context,
opcuaOpenResponse,
+ (OpenSecureChannelResponse)
message.getBody());
+ } catch (PlcConnectionException e) {
+ LOGGER.error("Error occurred while
connecting to OPC UA server");
+ }
+ });
}
} catch (ParseException e) {
LOGGER.debug("error caught", e);
@@ -940,7 +946,7 @@ public class SecureChannel {
} catch (NoSuchAlgorithmException e) {
LOGGER.error("Failed to find hashing algorithm");
}
- onDiscoverCloseSecureChannel(context, response);
+ commonPool().submit(() ->
onDiscoverCloseSecureChannel(context, response));
} catch (ParseException e) {
LOGGER.error("Error parsing", e);
}
diff --git
a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java
b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java
index 5412b68e6a..071914a351 100644
---
a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java
+++
b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java
@@ -18,6 +18,8 @@
*/
package org.apache.plc4x.java.opcua.protocol;
+import static java.util.concurrent.ForkJoinPool.commonPool;
+
import java.nio.ByteBuffer;
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
@@ -98,7 +100,7 @@ public class OpcuaProtocolLogic extends
Plc4xProtocolBase<OpcuaAPU> implements H
for (Map.Entry<Long, OpcuaSubscriptionHandle> subscriber :
subscriptions.entrySet()) {
subscriber.getValue().stopSubscriber();
}
- channel.onDisconnect(context);
+ commonPool().submit(() -> channel.onDisconnect(context));
}
@Override
@@ -118,7 +120,7 @@ public class OpcuaProtocolLogic extends
Plc4xProtocolBase<OpcuaAPU> implements H
return;
}
}
- this.channel.onConnect(context);
+ commonPool().submit(() -> this.channel.onConnect(context));
}
@Override
@@ -133,7 +135,7 @@ public class OpcuaProtocolLogic extends
Plc4xProtocolBase<OpcuaAPU> implements H
return;
}
}
- channel.onDiscover(context);
+ commonPool().submit(() -> channel.onDiscover(context));
}
private SecureChannel createSecureChannel(PlcAuthentication
authentication) {
diff --git
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
index a97d39ac7b..27abd9324d 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
@@ -93,7 +93,7 @@ public class Plc4xNettyWrapper<T> extends
MessageToMessageCodec<T, Object> {
@Override
public void sendToWire(T msg) {
- pipeline.writeAndFlush(msg);
+ pipeline.writeAndFlush(msg).syncUninterruptibly();
}
@Override
diff --git
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultConversationContext.java
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultConversationContext.java
index 96de091dd0..d382ba8d61 100644
---
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultConversationContext.java
+++
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultConversationContext.java
@@ -67,7 +67,7 @@ public class DefaultConversationContext<T1> implements
ConversationContext<T1> {
@Override
public void sendToWire(T1 msg) {
logger.trace("Sending to wire {}", msg);
- channelHandlerContext.channel().writeAndFlush(msg);
+
channelHandlerContext.channel().writeAndFlush(msg).syncUninterruptibly();
}
@Override
diff --git
a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java
b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java
index 26ca73c9d5..07aa5a0a1c 100644
---
a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java
+++
b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java
@@ -19,6 +19,7 @@
package org.apache.plc4x.java.spi;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import org.apache.plc4x.java.spi.events.ConnectEvent;
@@ -35,7 +36,9 @@ import java.util.Date;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@@ -65,6 +68,7 @@ class Plc4xNettyWrapperTest {
doNothing().when(protocol).onConnect(captor.capture());
when(channelHandlerContext.channel()).thenReturn(channel);
+
when(channel.writeAndFlush(any())).thenReturn(mock(ChannelFuture.class));
wrapper.userEventTriggered(channelHandlerContext, new ConnectEvent());
conversationContext = captor.getValue();