This is an automated email from the ASF dual-hosted git repository. cgarcia pushed a commit to branch fix/s7hmux in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit bc74310481ecf1ee4dbe3b879c6190274a28ea42 Author: César José García León <[email protected]> AuthorDate: Mon Apr 14 23:14:05 2025 -0400 S7HA restart problem. --- .../java/s7/readwrite/protocol/S7HMuxImpl.java | 17 ++++++++-- .../s7/readwrite/protocol/S7HPlcConnection.java | 36 ++++++++++++++++++---- .../s7/readwrite/protocol/S7ProtocolLogic.java | 7 +++-- 3 files changed, 48 insertions(+), 12 deletions(-) diff --git a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HMuxImpl.java b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HMuxImpl.java index 33c961509e..6d0c3c0d7c 100644 --- a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HMuxImpl.java +++ b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HMuxImpl.java @@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory; import java.time.LocalTime; import java.util.List; +import java.util.logging.Level; /** * Implementation of a multiplexing channel, from an embedded channel to two @@ -201,7 +202,7 @@ public class S7HMuxImpl extends MessageToMessageCodec<ByteBuf, ByteBuf> implemen } if (evt instanceof DisconnectEvent) { - logger.info("DisconnectEvent"); +// logger.info("userEventTriggered -> DisconnectEvent"); } // trigger other event handlers after IS_CONNECTED was set @@ -304,18 +305,28 @@ public class S7HMuxImpl extends MessageToMessageCodec<ByteBuf, ByteBuf> implemen } else if (((!this.primaryChannel.isActive()) && (tcpChannel == this.primaryChannel)) || (primary_channel.isActive())) { - synchronized (tcpChannel) { + synchronized (tcpChannel) { tcpChannel.close(); this.primaryChannel = primary_channel; tcpChannel = primary_channel; embededChannel.attr(IS_PRIMARY).set(true); if (tcpChannel.isActive()) { + logger.info("Reassigns the inactive primary channel and send ConnectEvent.."); embedCtx.fireUserEventTriggered(new ConnectEvent()); } } } else if (primary_channel.isActive()) { - + synchronized (tcpChannel) { + tcpChannel.close(); + this.primaryChannel = primary_channel; + tcpChannel = primary_channel; + embededChannel.attr(IS_PRIMARY).set(true); + logger.info("Reassigns the primary channel and send ConnectEvent."); + if (tcpChannel.isActive()) { + embedCtx.fireUserEventTriggered(new ConnectEvent()); + } + } } } diff --git a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HPlcConnection.java b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HPlcConnection.java index aee3bb927d..821fc6c1c9 100644 --- a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HPlcConnection.java +++ b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HPlcConnection.java @@ -20,6 +20,11 @@ package org.apache.plc4x.java.s7.readwrite.protocol; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.MessageToMessageCodec; import io.netty.handler.logging.LogLevel; @@ -50,9 +55,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.*; +import java.util.stream.Stream; import org.apache.plc4x.java.api.exceptions.PlcUnsupportedOperationException; +import org.apache.plc4x.java.api.listener.ConnectionStateListener; import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest; import org.apache.plc4x.java.s7.readwrite.utils.S7PlcSubscriptionRequest; +import org.apache.plc4x.java.spi.events.ConnectEvent; +import org.apache.plc4x.java.spi.events.DisconnectedEvent; +import org.apache.plc4x.java.spi.events.DiscoverEvent; +import org.apache.plc4x.java.spi.events.DiscoveredEvent; /** * This object generates the main connection and includes the management @@ -85,6 +96,8 @@ public class S7HPlcConnection extends DefaultNettyPlcConnection implements Runna protected int slicePing = 0; protected int sliceRetryTime = 0; + + protected int retrysPing = 0; public S7HPlcConnection( boolean canPing, @@ -147,7 +160,7 @@ public class S7HPlcConnection extends DefaultNettyPlcConnection implements Runna getChannelHandler(sessionSetupCompleteFuture, sessionDisconnectCompleteFuture, sessionDiscoveredCompleteFuture)); - + channel.pipeline().removeLast(); channel.pipeline().addFirst("Multiplexor", s7hmux); } @@ -205,15 +218,18 @@ public class S7HPlcConnection extends DefaultNettyPlcConnection implements Runna connected = true; //((EmbeddedChannel) channel).runPendingTasks(); } catch (InterruptedException e) { + logger.error(e.getMessage()); Thread.currentThread().interrupt(); throw new PlcConnectionException(e); } catch (ExecutionException e) { + logger.error(e.getMessage()); throw new PlcConnectionException(e); } } @Override public void close() throws PlcConnectionException { + logger.info("Close connection."); if (closed) { return; } @@ -228,6 +244,7 @@ public class S7HPlcConnection extends DefaultNettyPlcConnection implements Runna primaryChannel.pipeline().remove(MULTIPLEXER); primaryChannel.pipeline().fireUserEventTriggered(new CloseConnectionEvent()); primaryChannel.eventLoop().shutdownGracefully(); + logger.info("Close primary channel."); } catch (Exception ex) { logger.info(ex.toString()); } @@ -239,15 +256,17 @@ public class S7HPlcConnection extends DefaultNettyPlcConnection implements Runna secondaryChannel.pipeline().remove(MULTIPLEXER); secondaryChannel.pipeline().fireUserEventTriggered(new CloseConnectionEvent()); secondaryChannel.eventLoop().shutdownGracefully(); + logger.info("Close secondary channel."); } } + channel.pipeline().fireUserEventTriggered(new DisconnectEvent()); scf.cancel(true); executor.shutdown(); closed = true; } - + @Override public boolean isConnected() { return !closed && channel.attr(S7HMuxImpl.IS_CONNECTED).get(); @@ -270,7 +289,7 @@ public class S7HPlcConnection extends DefaultNettyPlcConnection implements Runna primaryChannel = channelFactory.createChannel(new LoggingHandler(LogLevel.TRACE)); } catch (Exception ex) { primaryChannel = null; - logger.info(ex.toString()); + logger.error("doPrimaryTcpConnections: " + ex.toString()); } if (primaryChannel != null) { if (primaryChannel.isActive()) { @@ -285,7 +304,7 @@ public class S7HPlcConnection extends DefaultNettyPlcConnection implements Runna secondaryChannel = secondaryChannelFactory.createChannel(new LoggingHandler(LogLevel.TRACE)); } catch (Exception ex) { secondaryChannel = null; - logger.info(ex.toString()); + logger.info("doSecondaryTcpConnections(): " + ex.toString()); } if (secondaryChannel != null) { if (secondaryChannel.isActive()) { @@ -358,7 +377,7 @@ public class S7HPlcConnection extends DefaultNettyPlcConnection implements Runna } else if (null == secondaryChannel) { if (channel.attr(S7HMuxImpl.WAS_CONNECTED).get() && !channel.attr(S7HMuxImpl.IS_CONNECTED).get()) { - logger.info("Reconnecting primary channel."); + logger.info("Reconnecting primary channel."); if (null != ((S7HMux) s7hmux).getTCPChannel()) { ((S7HMux) s7hmux).setPrimaryChannel(primaryChannel); } @@ -377,7 +396,7 @@ public class S7HPlcConnection extends DefaultNettyPlcConnection implements Runna } else if (null == primaryChannel) { if ((channel.attr(S7HMuxImpl.WAS_CONNECTED).get()) && (!channel.attr(S7HMuxImpl.IS_CONNECTED).get())) { - logger.info("Reconnecting secondary channel."); + logger.info("Reconnecting secondary channel."); if (null != ((S7HMux) s7hmux).getTCPChannel()) { ((S7HMux) s7hmux).setSecondaryChannel(secondaryChannel); } @@ -426,6 +445,11 @@ public class S7HPlcConnection extends DefaultNettyPlcConnection implements Runna logger.debug("PING: " + readResponse.getResponseCode("value")); } catch (Exception ex) { logger.info("PING: " + ex); + retrysPing++; + if (retrysPing > channel.attr(S7HMuxImpl.RETRY_TIME).get()){ + channel.attr(S7HMuxImpl.IS_CONNECTED).set(false); + retrysPing = 0; + } } }); } diff --git a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java index bffdb4ec30..883f592aab 100644 --- a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java +++ b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java @@ -163,6 +163,7 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> { @Override public void onConnect(ConversationContext<TPKTPacket> context) { + logger.info("onConnect"); if (context.isPassive()) { logger.info("S7 Driver running in PASSIVE mode."); s7DriverContext.setPassiveMode(true); @@ -257,8 +258,9 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> { */ @Override public void onDisconnect(ConversationContext<TPKTPacket> context) { + logger.info("onDisconnect"); // 1. Here we shut down the local task executor. - clientExecutorService.shutdown(); + clientExecutorService.shutdownNow(); // 2. Performs the shutdown of the transaction executor. tm.shutdown(); // 3. Finish the execution of the tasks for the handling of Events. @@ -1532,9 +1534,8 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> { .check(p -> p.getTpduReference() == tpduId) .handle(p -> { // Finish the request-transaction. - transaction.endRequest(); - try { + transaction.endRequest(); future.complete(p); } catch (Exception e) { logger.warn("Error sending 'write' message: '{}'", e.getMessage(), e);
