This is an automated email from the ASF dual-hosted git repository.
cgarcia pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 1f1485f113 Fix/s7hmux (#2109)
1f1485f113 is described below
commit 1f1485f11360494c892d47fba44c2fa9954da26a
Author: César José García León <[email protected]>
AuthorDate: Sat May 24 17:00:32 2025 -0400
Fix/s7hmux (#2109)
* S7HA restart problem.
* Try to detect buffer release.
---
.../java/s7/readwrite/protocol/S7HMuxImpl.java | 17 ++++++++--
.../s7/readwrite/protocol/S7HPlcConnection.java | 38 ++++++++++++++++++----
.../s7/readwrite/protocol/S7ProtocolLogic.java | 7 ++--
3 files changed, 49 insertions(+), 13 deletions(-)
diff --git
a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HMuxImpl.java
b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HMuxImpl.java
index 33c961509e..6d0c3c0d7c 100644
---
a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HMuxImpl.java
+++
b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HMuxImpl.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
import java.time.LocalTime;
import java.util.List;
+import java.util.logging.Level;
/**
* Implementation of a multiplexing channel, from an embedded channel to two
@@ -201,7 +202,7 @@ public class S7HMuxImpl extends
MessageToMessageCodec<ByteBuf, ByteBuf> implemen
}
if (evt instanceof DisconnectEvent) {
- logger.info("DisconnectEvent");
+// logger.info("userEventTriggered -> DisconnectEvent");
}
// trigger other event handlers after IS_CONNECTED was set
@@ -304,18 +305,28 @@ public class S7HMuxImpl extends
MessageToMessageCodec<ByteBuf, ByteBuf> implemen
} else if (((!this.primaryChannel.isActive()) && (tcpChannel ==
this.primaryChannel)) ||
(primary_channel.isActive())) {
- synchronized (tcpChannel) {
+ synchronized (tcpChannel) {
tcpChannel.close();
this.primaryChannel = primary_channel;
tcpChannel = primary_channel;
embededChannel.attr(IS_PRIMARY).set(true);
if (tcpChannel.isActive()) {
+ logger.info("Reassigns the inactive primary channel and
send ConnectEvent..");
embedCtx.fireUserEventTriggered(new ConnectEvent());
}
}
} else if (primary_channel.isActive()) {
-
+ synchronized (tcpChannel) {
+ tcpChannel.close();
+ this.primaryChannel = primary_channel;
+ tcpChannel = primary_channel;
+ embededChannel.attr(IS_PRIMARY).set(true);
+ logger.info("Reassigns the primary channel and send
ConnectEvent.");
+ if (tcpChannel.isActive()) {
+ embedCtx.fireUserEventTriggered(new ConnectEvent());
+ }
+ }
}
}
diff --git
a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HPlcConnection.java
b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HPlcConnection.java
index aee3bb927d..dbf749b4f8 100644
---
a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HPlcConnection.java
+++
b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HPlcConnection.java
@@ -20,6 +20,11 @@ package org.apache.plc4x.java.s7.readwrite.protocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.MessageToMessageCodec;
import io.netty.handler.logging.LogLevel;
@@ -50,9 +55,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.*;
+import java.util.stream.Stream;
import org.apache.plc4x.java.api.exceptions.PlcUnsupportedOperationException;
+import org.apache.plc4x.java.api.listener.ConnectionStateListener;
import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
import org.apache.plc4x.java.s7.readwrite.utils.S7PlcSubscriptionRequest;
+import org.apache.plc4x.java.spi.events.ConnectEvent;
+import org.apache.plc4x.java.spi.events.DisconnectedEvent;
+import org.apache.plc4x.java.spi.events.DiscoverEvent;
+import org.apache.plc4x.java.spi.events.DiscoveredEvent;
/**
* This object generates the main connection and includes the management
@@ -85,6 +96,8 @@ public class S7HPlcConnection extends
DefaultNettyPlcConnection implements Runna
protected int slicePing = 0;
protected int sliceRetryTime = 0;
+
+ protected int retrysPing = 0;
public S7HPlcConnection(
boolean canPing,
@@ -147,13 +160,13 @@ public class S7HPlcConnection extends
DefaultNettyPlcConnection implements Runna
getChannelHandler(sessionSetupCompleteFuture,
sessionDisconnectCompleteFuture,
sessionDiscoveredCompleteFuture));
-
+ channel.pipeline().removeLast();
channel.pipeline().addFirst("Multiplexor", s7hmux);
}
((S7HMux) s7hmux).setEmbededhannel(channel, configuration);
-// channel.pipeline().addFirst((new LoggingHandler("CEOS")));
+ //channel.pipeline().addFirst((new LoggingHandler("CEOS")));
/*
channel.closeFuture().addListener(future -> {
if (!sessionSetupCompleteFuture.isDone()) {
@@ -205,15 +218,18 @@ public class S7HPlcConnection extends
DefaultNettyPlcConnection implements Runna
connected = true;
//((EmbeddedChannel) channel).runPendingTasks();
} catch (InterruptedException e) {
+ logger.error(e.getMessage());
Thread.currentThread().interrupt();
throw new PlcConnectionException(e);
} catch (ExecutionException e) {
+ logger.error(e.getMessage());
throw new PlcConnectionException(e);
}
}
@Override
public void close() throws PlcConnectionException {
+ logger.info("Close connection.");
if (closed) {
return;
}
@@ -228,6 +244,7 @@ public class S7HPlcConnection extends
DefaultNettyPlcConnection implements Runna
primaryChannel.pipeline().remove(MULTIPLEXER);
primaryChannel.pipeline().fireUserEventTriggered(new
CloseConnectionEvent());
primaryChannel.eventLoop().shutdownGracefully();
+ logger.info("Close primary channel.");
} catch (Exception ex) {
logger.info(ex.toString());
}
@@ -239,15 +256,17 @@ public class S7HPlcConnection extends
DefaultNettyPlcConnection implements Runna
secondaryChannel.pipeline().remove(MULTIPLEXER);
secondaryChannel.pipeline().fireUserEventTriggered(new
CloseConnectionEvent());
secondaryChannel.eventLoop().shutdownGracefully();
+ logger.info("Close secondary channel.");
}
}
+
channel.pipeline().fireUserEventTriggered(new DisconnectEvent());
scf.cancel(true);
executor.shutdown();
closed = true;
}
-
+
@Override
public boolean isConnected() {
return !closed && channel.attr(S7HMuxImpl.IS_CONNECTED).get();
@@ -270,7 +289,7 @@ public class S7HPlcConnection extends
DefaultNettyPlcConnection implements Runna
primaryChannel = channelFactory.createChannel(new
LoggingHandler(LogLevel.TRACE));
} catch (Exception ex) {
primaryChannel = null;
- logger.info(ex.toString());
+ logger.error("doPrimaryTcpConnections: " + ex.toString());
}
if (primaryChannel != null) {
if (primaryChannel.isActive()) {
@@ -285,7 +304,7 @@ public class S7HPlcConnection extends
DefaultNettyPlcConnection implements Runna
secondaryChannel = secondaryChannelFactory.createChannel(new
LoggingHandler(LogLevel.TRACE));
} catch (Exception ex) {
secondaryChannel = null;
- logger.info(ex.toString());
+ logger.info("doSecondaryTcpConnections(): " + ex.toString());
}
if (secondaryChannel != null) {
if (secondaryChannel.isActive()) {
@@ -358,7 +377,7 @@ public class S7HPlcConnection extends
DefaultNettyPlcConnection implements Runna
} else if (null == secondaryChannel) {
if (channel.attr(S7HMuxImpl.WAS_CONNECTED).get() &&
!channel.attr(S7HMuxImpl.IS_CONNECTED).get()) {
- logger.info("Reconnecting primary channel.");
+ logger.info("Reconnecting primary channel.");
if (null != ((S7HMux) s7hmux).getTCPChannel()) {
((S7HMux)
s7hmux).setPrimaryChannel(primaryChannel);
}
@@ -377,7 +396,7 @@ public class S7HPlcConnection extends
DefaultNettyPlcConnection implements Runna
} else if (null == primaryChannel) {
if ((channel.attr(S7HMuxImpl.WAS_CONNECTED).get()) &&
(!channel.attr(S7HMuxImpl.IS_CONNECTED).get())) {
- logger.info("Reconnecting secondary channel.");
+ logger.info("Reconnecting secondary channel.");
if (null != ((S7HMux) s7hmux).getTCPChannel()) {
((S7HMux)
s7hmux).setSecondaryChannel(secondaryChannel);
}
@@ -426,6 +445,11 @@ public class S7HPlcConnection extends
DefaultNettyPlcConnection implements Runna
logger.debug("PING: " +
readResponse.getResponseCode("value"));
} catch (Exception ex) {
logger.info("PING: " + ex);
+ retrysPing++;
+ if (retrysPing >
channel.attr(S7HMuxImpl.RETRY_TIME).get()){
+ channel.attr(S7HMuxImpl.IS_CONNECTED).set(false);
+ retrysPing = 0;
+ }
}
});
}
diff --git
a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
index bffdb4ec30..883f592aab 100644
---
a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
+++
b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
@@ -163,6 +163,7 @@ public class S7ProtocolLogic extends
Plc4xProtocolBase<TPKTPacket> {
@Override
public void onConnect(ConversationContext<TPKTPacket> context) {
+ logger.info("onConnect");
if (context.isPassive()) {
logger.info("S7 Driver running in PASSIVE mode.");
s7DriverContext.setPassiveMode(true);
@@ -257,8 +258,9 @@ public class S7ProtocolLogic extends
Plc4xProtocolBase<TPKTPacket> {
*/
@Override
public void onDisconnect(ConversationContext<TPKTPacket> context) {
+ logger.info("onDisconnect");
// 1. Here we shut down the local task executor.
- clientExecutorService.shutdown();
+ clientExecutorService.shutdownNow();
// 2. Performs the shutdown of the transaction executor.
tm.shutdown();
// 3. Finish the execution of the tasks for the handling of Events.
@@ -1532,9 +1534,8 @@ public class S7ProtocolLogic extends
Plc4xProtocolBase<TPKTPacket> {
.check(p -> p.getTpduReference() == tpduId)
.handle(p -> {
// Finish the request-transaction.
- transaction.endRequest();
-
try {
+ transaction.endRequest();
future.complete(p);
} catch (Exception e) {
logger.warn("Error sending 'write' message: '{}'",
e.getMessage(), e);