This is an automated email from the ASF dual-hosted git repository.

hutcheb pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new b0bc847e92 fix(opcua): Await `writeAndFlush(msg)` & send next msg 
async (#1147)
b0bc847e92 is described below

commit b0bc847e923a2a59c15868b642d377738a082402
Author: Rajmund Takács <[email protected]>
AuthorDate: Wed Oct 18 07:04:28 2023 +0200

    fix(opcua): Await `writeAndFlush(msg)` & send next msg async (#1147)
    
    * plc4j-driver-opcua: Await writeAndFlush(msg) & send next msg async
    
      All OPC-UA messages contain a sequence number, which is independent
    from the normal TCP sequence numbers. This sequence number is checked
    by the server, that may refuse to respond, if the sequence number is
    lower than the last received message's. Packetization itself happens
    asynchronously in netty, some time after `writeAndFlush(msg)` has been
    invoked by the application code. If there are concurrent calls to this
    function, there is no guarantee that messages are packeted in the same
    order as they have been added to `writeAndFlush(msg)`. This, in some
    cases, can cause OPC-UA messages being delivered to the server in
    different order, than it is specified by their sequence numbers, and
    the server may drop these messages, and the client eventually times out.
    
      As TCP guarantees delivering packets in the correct order, a trival
    solution is to simply wait for packetization to complete, before adding
    the next message to the pipeline.
    
      But yet comes a design problem: OPC-UA response handlers were written
    in a way, that they may send new messages, while processing the response.
    These handlers are executed on the netty event loop thread, which is
    shared among all Netty I/O operations, such as receiving/sending messages.
    This essentially means, that if a response handler is currently being
    executed, you cannot start packetizing a new message, because that would
    require the response handler to finish, which is waiting for guess what,
    the packetization to complete.
    
      Solution to this is to execute response handlers asynchronously, so
    they don't occupy the Netty event handler thread.
    
      This commit also fixes the `OpcuaPlcDriverTest` flakyness, experienced
    after https://github.com/apache/plc4x/pull/1139 had been merged.
    
    See also: https://ci-builds.apache.org/job/PLC4X/job/PLC4X/job/develop/1695/
    
    * Put back a line, disappered after `git merge`
    
    ---------
    
    Co-authored-by: Ben Hutcheson <[email protected]>
---
 .../plc4x/java/opcua/context/SecureChannel.java    | 46 ++++++++++++----------
 .../java/opcua/protocol/OpcuaProtocolLogic.java    |  8 ++--
 .../apache/plc4x/java/spi/Plc4xNettyWrapper.java   |  2 +-
 .../spi/internal/DefaultConversationContext.java   |  2 +-
 .../plc4x/java/spi/Plc4xNettyWrapperTest.java      |  4 ++
 5 files changed, 37 insertions(+), 25 deletions(-)

diff --git 
a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java
 
b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java
index df9ba53a99..3574fdfd1b 100644
--- 
a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java
+++ 
b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java
@@ -20,6 +20,7 @@ package org.apache.plc4x.java.opcua.context;
 
 import static java.lang.Thread.currentThread;
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static java.util.concurrent.ForkJoinPool.commonPool;
 
 import java.time.Instant;
 import org.apache.commons.lang3.RandomStringUtils;
@@ -179,7 +180,7 @@ public class SecureChannel {
         }
     }
 
-    public void submit(ConversationContext<OpcuaAPU> context, 
Consumer<TimeoutException> onTimeout, BiConsumer<OpcuaAPU, Throwable> error, 
Consumer<byte[]> consumer, WriteBufferByteBased buffer) {
+    public synchronized void submit(ConversationContext<OpcuaAPU> context, 
Consumer<TimeoutException> onTimeout, BiConsumer<OpcuaAPU, Throwable> error, 
Consumer<byte[]> consumer, WriteBufferByteBased buffer) {
         int transactionId = 
channelTransactionManager.getTransactionIdentifier();
 
         //TODO: We need to split large messages up into chunks if it is larger 
than the sendBufferSize
@@ -236,7 +237,7 @@ public class SecureChannel {
                             tokenId.set(opcuaResponse.getSecureTokenId());
                             channelId.set(opcuaResponse.getSecureChannelId());
 
-                            consumer.accept(messageBuffer.toByteArray());
+                            commonPool().submit(() -> 
consumer.accept(messageBuffer.toByteArray()));
                         }
                     });
             } catch (Exception e) {
@@ -267,7 +268,7 @@ public class SecureChannel {
             .expectResponse(OpcuaAPU.class, REQUEST_TIMEOUT)
             .check(p -> p.getMessage() instanceof OpcuaAcknowledgeResponse)
             .unwrap(p -> (OpcuaAcknowledgeResponse) p.getMessage())
-            .handle(opcuaAcknowledgeResponse -> 
onConnectOpenSecureChannel(context, opcuaAcknowledgeResponse));
+            .handle(opcuaAcknowledgeResponse -> commonPool().submit(() -> 
onConnectOpenSecureChannel(context, opcuaAcknowledgeResponse)));
         channelTransactionManager.submit(requestConsumer, 
channelTransactionManager.getTransactionIdentifier());
     }
 
@@ -361,16 +362,18 @@ public class SecureChannel {
                             LOGGER.error("Failed to connect to opc ua server 
for the following reason:- {}, {}", ((ResponseHeader) 
fault.getResponseHeader()).getServiceResult().getStatusCode(), 
OpcuaStatusCode.enumForValue(((ResponseHeader) 
fault.getResponseHeader()).getServiceResult().getStatusCode()));
                         } else {
                             LOGGER.debug("Got Secure Response Connection 
Response");
-                            try {
-                                OpenSecureChannelResponse 
openSecureChannelResponse = (OpenSecureChannelResponse) message.getBody();
-                                ChannelSecurityToken securityToken = 
(ChannelSecurityToken) openSecureChannelResponse.getSecurityToken();
-                                tokenId.set((int) securityToken.getTokenId());
-                                channelId.set((int) 
securityToken.getChannelId());
-                                lifetime = securityToken.getRevisedLifetime();
-                                onConnectCreateSessionRequest(context);
-                            } catch (PlcConnectionException e) {
-                                LOGGER.error("Error occurred while connecting 
to OPC UA server", e);
-                            }
+                            OpenSecureChannelResponse 
openSecureChannelResponse = (OpenSecureChannelResponse) message.getBody();
+                            ChannelSecurityToken securityToken = 
(ChannelSecurityToken) openSecureChannelResponse.getSecurityToken();
+                            tokenId.set((int) securityToken.getTokenId());
+                            channelId.set((int) securityToken.getChannelId());
+                            lifetime = securityToken.getRevisedLifetime();
+                            commonPool().submit(() -> {
+                                try {
+                                    onConnectCreateSessionRequest(context);
+                                } catch (PlcConnectionException e) {
+                                    LOGGER.error("Error occurred while 
connecting to OPC UA server", e);
+                                }
+                            });
                         }
                     } catch (ParseException e) {
                         LOGGER.error("Error parsing", e);
@@ -760,7 +763,7 @@ public class SecureChannel {
             .unwrap(p -> (OpcuaAcknowledgeResponse) p.getMessage())
             .handle(opcuaAcknowledgeResponse -> {
                 LOGGER.debug("Got Hello Response Connection Response");
-                onDiscoverOpenSecureChannel(context, opcuaAcknowledgeResponse);
+                commonPool().submit(() -> onDiscoverOpenSecureChannel(context, 
opcuaAcknowledgeResponse));
             });
 
         channelTransactionManager.submit(requestConsumer, 
channelTransactionManager.getTransactionIdentifier());
@@ -831,11 +834,14 @@ public class SecureChannel {
                             LOGGER.error("Failed to connect to opc ua server 
for the following reason:- {}, {}", ((ResponseHeader) 
fault.getResponseHeader()).getServiceResult().getStatusCode(), 
OpcuaStatusCode.enumForValue(((ResponseHeader) 
fault.getResponseHeader()).getServiceResult().getStatusCode()));
                         } else {
                             LOGGER.debug("Got Secure Response Connection 
Response");
-                            try {
-                                onDiscoverGetEndpointsRequest(context, 
opcuaOpenResponse, (OpenSecureChannelResponse) message.getBody());
-                            } catch (PlcConnectionException e) {
-                                LOGGER.error("Error occurred while connecting 
to OPC UA server");
-                            }
+                            commonPool().submit(() -> {
+                                try {
+                                    onDiscoverGetEndpointsRequest(context, 
opcuaOpenResponse,
+                                            (OpenSecureChannelResponse) 
message.getBody());
+                                } catch (PlcConnectionException e) {
+                                    LOGGER.error("Error occurred while 
connecting to OPC UA server");
+                                }
+                            });
                         }
                     } catch (ParseException e) {
                         LOGGER.debug("error caught", e);
@@ -940,7 +946,7 @@ public class SecureChannel {
                         } catch (NoSuchAlgorithmException e) {
                             LOGGER.error("Failed to find hashing algorithm");
                         }
-                        onDiscoverCloseSecureChannel(context, response);
+                        commonPool().submit(() -> 
onDiscoverCloseSecureChannel(context, response));
                     } catch (ParseException e) {
                         LOGGER.error("Error parsing", e);
                     }
diff --git 
a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java
 
b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java
index 5412b68e6a..071914a351 100644
--- 
a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java
+++ 
b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java
@@ -18,6 +18,8 @@
  */
 package org.apache.plc4x.java.opcua.protocol;
 
+import static java.util.concurrent.ForkJoinPool.commonPool;
+
 import java.nio.ByteBuffer;
 import org.apache.plc4x.java.api.authentication.PlcAuthentication;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
@@ -98,7 +100,7 @@ public class OpcuaProtocolLogic extends 
Plc4xProtocolBase<OpcuaAPU> implements H
         for (Map.Entry<Long, OpcuaSubscriptionHandle> subscriber : 
subscriptions.entrySet()) {
             subscriber.getValue().stopSubscriber();
         }
-        channel.onDisconnect(context);
+        commonPool().submit(() -> channel.onDisconnect(context));
     }
 
     @Override
@@ -118,7 +120,7 @@ public class OpcuaProtocolLogic extends 
Plc4xProtocolBase<OpcuaAPU> implements H
                 return;
             }
         }
-        this.channel.onConnect(context);
+        commonPool().submit(() -> this.channel.onConnect(context));
     }
 
     @Override
@@ -133,7 +135,7 @@ public class OpcuaProtocolLogic extends 
Plc4xProtocolBase<OpcuaAPU> implements H
                 return;
             }
         }
-        channel.onDiscover(context);
+        commonPool().submit(() -> channel.onDiscover(context));
     }
 
     private SecureChannel createSecureChannel(PlcAuthentication 
authentication) {
diff --git 
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java 
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
index a97d39ac7b..27abd9324d 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
@@ -93,7 +93,7 @@ public class Plc4xNettyWrapper<T> extends 
MessageToMessageCodec<T, Object> {
 
             @Override
             public void sendToWire(T msg) {
-                pipeline.writeAndFlush(msg);
+                pipeline.writeAndFlush(msg).syncUninterruptibly();
             }
 
             @Override
diff --git 
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultConversationContext.java
 
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultConversationContext.java
index 96de091dd0..d382ba8d61 100644
--- 
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultConversationContext.java
+++ 
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultConversationContext.java
@@ -67,7 +67,7 @@ public class DefaultConversationContext<T1> implements 
ConversationContext<T1> {
     @Override
     public void sendToWire(T1 msg) {
         logger.trace("Sending to wire {}", msg);
-        channelHandlerContext.channel().writeAndFlush(msg);
+        
channelHandlerContext.channel().writeAndFlush(msg).syncUninterruptibly();
     }
 
     @Override
diff --git 
a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java 
b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java
index 26ca73c9d5..07aa5a0a1c 100644
--- 
a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java
+++ 
b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java
@@ -19,6 +19,7 @@
 package org.apache.plc4x.java.spi;
 
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPipeline;
 import org.apache.plc4x.java.spi.events.ConnectEvent;
@@ -35,7 +36,9 @@ import java.util.Date;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
@@ -65,6 +68,7 @@ class Plc4xNettyWrapperTest {
         doNothing().when(protocol).onConnect(captor.capture());
 
         when(channelHandlerContext.channel()).thenReturn(channel);
+        
when(channel.writeAndFlush(any())).thenReturn(mock(ChannelFuture.class));
 
         wrapper.userEventTriggered(channelHandlerContext, new ConnectEvent());
         conversationContext = captor.getValue();

Reply via email to