This is an automated email from the ASF dual-hosted git repository.

cgarcia pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 1f1485f113 Fix/s7hmux (#2109)
1f1485f113 is described below

commit 1f1485f11360494c892d47fba44c2fa9954da26a
Author: César José García León <[email protected]>
AuthorDate: Sat May 24 17:00:32 2025 -0400

    Fix/s7hmux (#2109)
    
    * S7HA restart problem.
    
    * Try to detect buffer release.
---
 .../java/s7/readwrite/protocol/S7HMuxImpl.java     | 17 ++++++++--
 .../s7/readwrite/protocol/S7HPlcConnection.java    | 38 ++++++++++++++++++----
 .../s7/readwrite/protocol/S7ProtocolLogic.java     |  7 ++--
 3 files changed, 49 insertions(+), 13 deletions(-)

diff --git a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HMuxImpl.java b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HMuxImpl.java
index 33c961509e..6d0c3c0d7c 100644
--- a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HMuxImpl.java
+++ b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HMuxImpl.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
 
 import java.time.LocalTime;
 import java.util.List;
+import java.util.logging.Level;
 
 /**
  * Implementation of a multiplexing channel, from an embedded channel to two
@@ -201,7 +202,7 @@ public class S7HMuxImpl extends MessageToMessageCodec<ByteBuf, ByteBuf> implemen
         }
 
         if (evt instanceof DisconnectEvent) {
-            logger.info("DisconnectEvent");
+//            logger.info("userEventTriggered -> DisconnectEvent");
         }
         
         // trigger other event handlers after IS_CONNECTED was set
@@ -304,18 +305,28 @@ public class S7HMuxImpl extends MessageToMessageCodec<ByteBuf, ByteBuf> implemen
 
         } else if (((!this.primaryChannel.isActive()) && (tcpChannel == 
this.primaryChannel)) ||
             (primary_channel.isActive())) {
-            synchronized (tcpChannel) {
+            synchronized (tcpChannel) {                
                 tcpChannel.close();
                 this.primaryChannel = primary_channel;
                 tcpChannel = primary_channel;
                 embededChannel.attr(IS_PRIMARY).set(true);
 
                 if (tcpChannel.isActive()) {
+                    logger.info("Reassigns the inactive primary channel and 
send ConnectEvent..");
                     embedCtx.fireUserEventTriggered(new ConnectEvent());
                 }
             }
         } else if (primary_channel.isActive()) {
-
+            synchronized (tcpChannel) {
+                tcpChannel.close();
+                this.primaryChannel = primary_channel;
+                tcpChannel = primary_channel;
+                embededChannel.attr(IS_PRIMARY).set(true);
+                logger.info("Reassigns the primary channel and send 
ConnectEvent.");
+                if (tcpChannel.isActive()) {
+                    embedCtx.fireUserEventTriggered(new ConnectEvent());
+                }
+            }            
         }
     }
 
diff --git a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HPlcConnection.java b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HPlcConnection.java
index aee3bb927d..dbf749b4f8 100644
--- a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HPlcConnection.java
+++ b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HPlcConnection.java
@@ -20,6 +20,11 @@ package org.apache.plc4x.java.s7.readwrite.protocol;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
 import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.handler.codec.MessageToMessageCodec;
 import io.netty.handler.logging.LogLevel;
@@ -50,9 +55,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.*;
+import java.util.stream.Stream;
 import org.apache.plc4x.java.api.exceptions.PlcUnsupportedOperationException;
+import org.apache.plc4x.java.api.listener.ConnectionStateListener;
 import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
 import org.apache.plc4x.java.s7.readwrite.utils.S7PlcSubscriptionRequest;
+import org.apache.plc4x.java.spi.events.ConnectEvent;
+import org.apache.plc4x.java.spi.events.DisconnectedEvent;
+import org.apache.plc4x.java.spi.events.DiscoverEvent;
+import org.apache.plc4x.java.spi.events.DiscoveredEvent;
 
 /**
  * This object generates the main connection and includes the management
@@ -85,6 +96,8 @@ public class S7HPlcConnection extends DefaultNettyPlcConnection implements Runna
 
     protected int slicePing = 0;
     protected int sliceRetryTime = 0;
+    
+    protected int retrysPing = 0;
 
     public S7HPlcConnection(
         boolean canPing,
@@ -147,13 +160,13 @@ public class S7HPlcConnection extends DefaultNettyPlcConnection implements Runna
                     getChannelHandler(sessionSetupCompleteFuture,
                         sessionDisconnectCompleteFuture,
                         sessionDiscoveredCompleteFuture));
-
+                channel.pipeline().removeLast();
                 channel.pipeline().addFirst("Multiplexor", s7hmux);
 
             }
 
             ((S7HMux) s7hmux).setEmbededhannel(channel, configuration);
-//            channel.pipeline().addFirst((new LoggingHandler("CEOS"))); 
+            //channel.pipeline().addFirst((new LoggingHandler("CEOS"))); 
             /*
             channel.closeFuture().addListener(future -> {
                 if (!sessionSetupCompleteFuture.isDone()) {
@@ -205,15 +218,18 @@ public class S7HPlcConnection extends DefaultNettyPlcConnection implements Runna
             connected = true;
             //((EmbeddedChannel) channel).runPendingTasks();
         } catch (InterruptedException e) {
+            logger.error(e.getMessage());
             Thread.currentThread().interrupt();
             throw new PlcConnectionException(e);
         } catch (ExecutionException e) {
+            logger.error(e.getMessage());            
             throw new PlcConnectionException(e);
         }
     }
 
     @Override
     public void close() throws PlcConnectionException {
+        logger.info("Close connection.");
         if (closed) {
             return;
         }
@@ -228,6 +244,7 @@ public class S7HPlcConnection extends DefaultNettyPlcConnection implements Runna
                     primaryChannel.pipeline().remove(MULTIPLEXER);
                     primaryChannel.pipeline().fireUserEventTriggered(new 
CloseConnectionEvent());
                     primaryChannel.eventLoop().shutdownGracefully();
+                    logger.info("Close primary channel.");                    
                 } catch (Exception ex) {
                     logger.info(ex.toString());
                 }
@@ -239,15 +256,17 @@ public class S7HPlcConnection extends DefaultNettyPlcConnection implements Runna
                 secondaryChannel.pipeline().remove(MULTIPLEXER);
                 secondaryChannel.pipeline().fireUserEventTriggered(new 
CloseConnectionEvent());
                 secondaryChannel.eventLoop().shutdownGracefully();
+                logger.info("Close secondary channel.");                 
             }
         }
 
+        
         channel.pipeline().fireUserEventTriggered(new DisconnectEvent());
         scf.cancel(true);
         executor.shutdown();
         closed = true;
     }
-
+    
     @Override
     public boolean isConnected() {
         return !closed && channel.attr(S7HMuxImpl.IS_CONNECTED).get();
@@ -270,7 +289,7 @@ public class S7HPlcConnection extends DefaultNettyPlcConnection implements Runna
             primaryChannel = channelFactory.createChannel(new 
LoggingHandler(LogLevel.TRACE));
         } catch (Exception ex) {
             primaryChannel = null;
-            logger.info(ex.toString());
+            logger.error("doPrimaryTcpConnections: " + ex.toString());
         }
         if (primaryChannel != null) {
             if (primaryChannel.isActive()) {
@@ -285,7 +304,7 @@ public class S7HPlcConnection extends DefaultNettyPlcConnection implements Runna
             secondaryChannel = secondaryChannelFactory.createChannel(new 
LoggingHandler(LogLevel.TRACE));
         } catch (Exception ex) {
             secondaryChannel = null;
-            logger.info(ex.toString());
+            logger.info("doSecondaryTcpConnections(): " + ex.toString());
         }
         if (secondaryChannel != null) {
             if (secondaryChannel.isActive()) {
@@ -358,7 +377,7 @@ public class S7HPlcConnection extends DefaultNettyPlcConnection implements Runna
                 } else if (null == secondaryChannel) {
                     if (channel.attr(S7HMuxImpl.WAS_CONNECTED).get() &&
                         !channel.attr(S7HMuxImpl.IS_CONNECTED).get()) {
-                        logger.info("Reconnecting primary channel.");
+                        logger.info("Reconnecting primary channel.");          
             
                         if (null != ((S7HMux) s7hmux).getTCPChannel()) {
                             ((S7HMux) 
s7hmux).setPrimaryChannel(primaryChannel);
                         }
@@ -377,7 +396,7 @@ public class S7HPlcConnection extends DefaultNettyPlcConnection implements Runna
                 } else if (null == primaryChannel) {
                     if ((channel.attr(S7HMuxImpl.WAS_CONNECTED).get()) &&
                         (!channel.attr(S7HMuxImpl.IS_CONNECTED).get())) {
-                        logger.info("Reconnecting secondary channel.");
+                        logger.info("Reconnecting secondary channel.");        
               
                         if (null != ((S7HMux) s7hmux).getTCPChannel()) {
                             ((S7HMux) 
s7hmux).setSecondaryChannel(secondaryChannel);
                         }
@@ -426,6 +445,11 @@ public class S7HPlcConnection extends DefaultNettyPlcConnection implements Runna
                     logger.debug("PING: " + 
readResponse.getResponseCode("value"));
                 } catch (Exception ex) {
                     logger.info("PING: " + ex);
+                    retrysPing++;
+                    if (retrysPing > 
channel.attr(S7HMuxImpl.RETRY_TIME).get()){
+                        channel.attr(S7HMuxImpl.IS_CONNECTED).set(false);
+                        retrysPing = 0;
+                    }
                 }
             });
         }
diff --git a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
index bffdb4ec30..883f592aab 100644
--- a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
+++ b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
@@ -163,6 +163,7 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> {
 
     @Override
     public void onConnect(ConversationContext<TPKTPacket> context) {
+        logger.info("onConnect");
         if (context.isPassive()) {
             logger.info("S7 Driver running in PASSIVE mode.");
             s7DriverContext.setPassiveMode(true);
@@ -257,8 +258,9 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> {
      */
     @Override
     public void onDisconnect(ConversationContext<TPKTPacket> context) {
+        logger.info("onDisconnect");        
         // 1. Here we shut down the local task executor.
-        clientExecutorService.shutdown();
+        clientExecutorService.shutdownNow();
         // 2. Performs the shutdown of the transaction executor.
         tm.shutdown();
         // 3. Finish the execution of the tasks for the handling of Events.
@@ -1532,9 +1534,8 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> {
             .check(p -> p.getTpduReference() == tpduId)
             .handle(p -> {
                 // Finish the request-transaction.
-                transaction.endRequest();
-
                 try {
+                    transaction.endRequest();                    
                     future.complete(p);
                 } catch (Exception e) {
                     logger.warn("Error sending 'write' message: '{}'", 
e.getMessage(), e);

Reply via email to